zoukankan      html  css  js  c++  java
  • Kafak概述及集群部署

    一、Kafka概述

    1.1、定义

    Kafka 是一个分布式的基于发布/订阅模式的消息队列(Message Queue),主要应用于 大数据实时处理领域。

    1.2、消息队列

    1.2.1、消息队列的应用场景

    image

    1.2.2、消息队列的好处

    1)解耦

    允许你独立的扩展或修改两边的处理过程,只要确保它们遵守同样的接口约束

    2)可恢复性

    系统的一部分组件失效时,不会影响到整个系统。消息队列降低了进程间的耦合度,所以即使一个处理消息的进程挂掉,加入队列中的消息仍然可以在系统恢复后被处理。

    3)缓冲

    有助于控制和优化数据流经过系统的速度,解决生产消息和消费消息的处理速度不一致的情况。

    4)灵活性 & 峰值处理能力

    在访问量剧增的情况下,应用仍然需要继续发挥作用,但是这样的突发流量并不常见。如果为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列能够使关键组件顶住突发的访问压力,而不会因为突发的超负荷的请求而完全崩溃。

    5)异步通信

    很多时候,用户不想也不需要立即处理消息。消息队列提供了异步处理机制,允许用户把一个消息放入队列,但并不立即处理它。想向队列中放入多少消息就放多少,然后在需要的时候再去处理它们。

    1.2.3、消息队列的两种模式

    (1)点对点模式(一对一,消费者主动拉取数据,消息收到后消息清除)

    消息生产者生产消息发送到Queue中,然后消息消费者从Queue中取出并且消费消息。消息被消费以后,queue 中不再有存储,所以消息消费者不可能消费到已经被消费的消息。 Queue 支持存在多个消费者,但是对一个消息而言,只会有一个消费者可以消费。

    image

    (2)发布/订阅模式(一对多,消费者消费数据之后不会清除消息)

    消息生产者(发布)将消息发布到 topic 中,同时有多个消息消费者(订阅)消费该消息。和点对点方式不同,发布到 topic 的消息会被所有订阅者消费。

    image

    二、Kafka基础架构

    image

    1)Producer :消息生产者,就是向 kafka broker 发消息的客户端

    2)Consumer :消息消费者,向 kafka broker 取消息的客户端

    3)Consumer Group (CG):消费者组,由多个 consumer 组成。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

    4)Broker :一台 kafka 服务器就是一个 broker。一个集群由多个 broker 组成。一个 broker可以容纳多个 topic。

    5)Topic :可以理解为一个队列,生产者和消费者面向的都是一个 topic

    6)Partition:为了实现扩展性,一个非常大的 topic 可以分布到多个 broker(即服务器)上,一个 topic 可以分为多个 partition,每个 partition 是一个有序的队列

    7)Replica:副本,为保证集群中的某个节点发生故障时,该节点上的 partition 数据不丢失且 kafka 仍然能够继续工作,kafka 提供了副本机制,一个 topic 的每个分区都有若干个副本, 一个 leader 和若干个 follower。

    8)leader:每个分区多个副本的“主”,生产者发送数据的对象,以及消费者消费数据的对象都是 leader。

    9)follower:每个分区多个副本中的“从”,实时从 leader 中同步数据,保持和 leader 数据的同步。leader 发生故障时,某个 follower 会成为新的 follower。

    三、Kafka集群安装

    3.1、集群规划

    hadoop102 hadoop103 hadoop103
    zookeeper zookeeper zookeeper
    Kafka Kafka Kafka

    3.2、软件下载

    官网:http://kafka.apache.org/downloads.html

    3.3、集群部署

    1)解压安装包

    [hadoop@hadoop102 software]$ tar xf kafka_2.11-0.11.0.0.tgz -C /opt/module/
    [hadoop@hadoop102 software]$ ll /opt/module/kafka_2.11-0.11.0.0/
    total 48
    drwxr-xr-x 3 hadoop hadoop  4096 Jun 23  2017 bin
    drwxr-xr-x 2 hadoop hadoop  4096 Jun 23  2017 config
    drwxr-xr-x 2 hadoop hadoop  4096 Jan 29 15:02 libs
    -rw-r--r-- 1 hadoop hadoop 28824 Jun 23  2017 LICENSE
    -rw-r--r-- 1 hadoop hadoop   336 Jun 23  2017 NOTICE
    drwxr-xr-x 2 hadoop hadoop    47 Jun 23  2017 site-docs

    2)修改解压后的文件名称

    [hadoop@hadoop102 software]$ cd ../module/
    [hadoop@hadoop102 module]$ mv kafka_2.11-0.11.0.0/ kafka

    3)在/opt/module/kafka 目录下创建 logs 文件夹

    [hadoop@hadoop102 kafka]$ mkdir logs

    4)修改配置文件

    [hadoop@hadoop102 kafka]$ cd config/
    [hadoop@hadoop102 config]$ vim server.properties

    image

    image

    image

    配置项说明:

    #broker 的全局唯一编号,不能重复
    broker.id=2
    #删除 topic 功能使能
    delete.topic.enable=true
    #处理网络请求的线程数量
    num.network.threads=3
    #用来处理磁盘 IO 的现成数量
    num.io.threads=8
    #发送套接字的缓冲区大小
    socket.send.buffer.bytes=102400
    #接收套接字的缓冲区大小
    socket.receive.buffer.bytes=102400
    #请求套接字的缓冲区大小
    socket.request.max.bytes=104857600
    #kafka 运行日志存放的路径
    log.dirs=/opt/module/kafka/logs
    #topic 在当前 broker 上的分区个数
    num.partitions=1
    #用来恢复和清理 data 下数据的线程数量
    num.recovery.threads.per.data.dir=1
    #segment 文件保留的最长时间,超时将被删除
    log.retention.hours=168
    #配置连接 Zookeeper 集群地址
    zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

    5)配置环境变量

    [hadoop@hadoop102 config]$ sudo vim /etc/profile
    #KAFKA_HOME
    export KAFKA_HOME=/opt/module/kafka
    export PATH=$PATH:$KAFKA_HOME/bin
    
    [hadoop@hadoop102 config]$ source /etc/profile

    6)分发安装包

    [hadoop@hadoop102 config]$ cat /usr/local/bin/xsync 
    #!/bin/bash
    #1 获取输入参数个数,如果没有参数,直接退出
    pcount=$#
    if((pcount==0)); then
    echo no args;
    exit;
    fi
    
    #2 获取文件名称
    p1=$1
    fname=`basename $p1`
    echo fname=$fname
    
    #3 获取上级目录到绝对路径
    pdir=`cd -P $(dirname $p1); pwd`
    echo pdir=$pdir
    
    #4 获取当前用户名称
    user=`whoami`
    
    #5 循环
    for((host=103; host<105; host++)); do
            echo ------------------- hadoop$host --------------
            rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
    done
    
    [hadoop@hadoop102 config]$ xsync /opt/module/kafka/

    注意:分发之后记得配置其他机器的环境变量

    7)分别在 hadoop103 和 hadoop104 上修改配置文件/opt/module/kafka/config/server.properties

    中的 broker.id=3、broker.id=4。注:broker.id 不得重复

    8)启动集群

    依次在 hadoop102、hadoop103、hadoop104 节点上启动 kafka

    [hadoop@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
    [hadoop@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
    [hadoop@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties

    9)关闭集群

    [hadoop@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
    [hadoop@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
    [hadoop@hadoop104 kafka]$ bin/kafka-server-stop.sh stop

    四、Kafka 命令行操作

    1)查看当前服务器中的所有 topic

    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
    __consumer_offsets

    2)创建 topic

    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
    Created topic "first".
    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
    __consumer_offsets
    first
    

    选项说明:

    • --topic 定义 topic 名
    • --replication-factor 定义副本数
    • --partitions 定义分区数

    3) 查看某个 Topic 的详情

    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
    Topic:first	PartitionCount:1	ReplicationFactor:3	Configs:
    	Topic: first	Partition: 0	Leader: 4	Replicas: 4,3,2	Isr: 4,3,2

    4)修改分区数

    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --alter --topic first --partitions 6
    WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
    Adding partitions succeeded!
    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --describe --topic first
    Topic:first	PartitionCount:6	ReplicationFactor:3	Configs:
    	Topic: first	Partition: 0	Leader: 4	Replicas: 4,3,2	Isr: 4,3,2
    	Topic: first	Partition: 1	Leader: 2	Replicas: 2,3,4	Isr: 2,3,4
    	Topic: first	Partition: 2	Leader: 3	Replicas: 3,4,2	Isr: 3,4,2
    	Topic: first	Partition: 3	Leader: 4	Replicas: 4,3,2	Isr: 4,3,2
    	Topic: first	Partition: 4	Leader: 2	Replicas: 2,4,3	Isr: 2,4,3
    	Topic: first	Partition: 5	Leader: 3	Replicas: 3,2,4	Isr: 3,2,4

    5)删除 topic

    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --delete --topic first
    Topic first is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --list
    __consumer_offsets
    
    #需要 server.properties 中设置 delete.topic.enable=true 否则只是标记删除。
    

    6) 发送及消费消息

    #发送消息
    [hadoop@hadoop102 kafka]$ bin/kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
    
    #消费消息
    [hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --zookeeper hadoop102:2181 --topic first
    [hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    [hadoop@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first

    image

    image

    image

    image

    --from-beginning:会把主题中以往所有的数据都读取出来。

    五、数据日志分离

    1)停止kafka集群并删除logs目录

    [hadoop@hadoop102 kafka]$ bin/kafka-server-stop.sh stop
    [hadoop@hadoop103 kafka]$ bin/kafka-server-stop.sh stop
    [hadoop@hadoop104 kafka]$ bin/kafka-server-stop.sh stop
    
    #清除logs目录
    [hadoop@hadoop102 kafka]$ rm -fr logs/
    [hadoop@hadoop103 kafka]$ rm -fr logs/
    [hadoop@hadoop104 kafka]$ rm -fr logs/

    2)重装zookeeper

    zookeeper中会存在kafka相关注册信息,需要删除

    [zk: localhost:2181(CONNECTED) 0] ls /
    [cluster, controller_epoch, servers, brokers, zookeeper, atguigu, admin, isr_change_notification, consumers, latest_producer_id_block, config]
    
    #停止zookeeper集群
    [hadoop@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh stop
    [hadoop@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh stop
    [hadoop@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh stop
    
    #删除数据目录并重启
    [hadoop@hadoop102 zookeeper-3.4.10]$ rm -fr zkData/version-2/
    [hadoop@hadoop103 zookeeper-3.4.10]$ rm -fr zkData/version-2/
    [hadoop@hadoop104 zookeeper-3.4.10]$ rm -fr zkData/version-2/
    
    [hadoop@hadoop102 zookeeper-3.4.10]$ bin/zkServer.sh start
    [hadoop@hadoop103 zookeeper-3.4.10]$ bin/zkServer.sh start
    [hadoop@hadoop104 zookeeper-3.4.10]$ bin/zkServer.sh start
    

    3)修改kafka配置

    [hadoop@hadoop102 kafka]$ vim config/server.properties
    log.dirs=/opt/module/kafka/data
    
    [hadoop@hadoop103 kafka]$ vim config/server.properties
    log.dirs=/opt/module/kafka/data
    
    [hadoop@hadoop104 kafka]$ vim config/server.properties
    log.dirs=/opt/module/kafka/data
    

    4)启动kafka集群

    [hadoop@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
    [hadoop@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
    [hadoop@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
    
    #创建topic测试日志与数据分离
    [hadoop@hadoop102 kafka]$ bin/kafka-topics.sh --zookeeper hadoop102:2181 --create --replication-factor 3 --partitions 1 --topic first
    Created topic "first".
    [hadoop@hadoop102 kafka]$ ll
    total 48
    drwxr-xr-x 3 hadoop hadoop  4096 Jun 23  2017 bin
    drwxr-xr-x 2 hadoop hadoop  4096 Feb  2 14:37 config
    drwxrwxr-x 3 hadoop hadoop   202 Feb  2 14:40 data
    drwxr-xr-x 2 hadoop hadoop  4096 Feb  2 11:32 libs
    -rw-r--r-- 1 hadoop hadoop 28824 Jun 23  2017 LICENSE
    drwxrwxr-x 2 hadoop hadoop   205 Feb  2 14:39 logs
    -rw-r--r-- 1 hadoop hadoop   336 Jun 23  2017 NOTICE
    drwxr-xr-x 2 hadoop hadoop    47 Jun 23  2017 site-docs
    [hadoop@hadoop102 kafka]$ ll logs/
    total 68
    -rw-rw-r-- 1 hadoop hadoop  7309 Feb  2 14:40 controller.log
    -rw-rw-r-- 1 hadoop hadoop     0 Feb  2 14:39 kafka-authorizer.log
    -rw-rw-r-- 1 hadoop hadoop     0 Feb  2 14:39 kafka-request.log
    -rw-rw-r-- 1 hadoop hadoop  5952 Feb  2 14:39 kafkaServer-gc.log.0.current
    -rw-rw-r-- 1 hadoop hadoop 18801 Feb  2 14:40 kafkaServer.out
    -rw-rw-r-- 1 hadoop hadoop   172 Feb  2 14:39 log-cleaner.log
    -rw-rw-r-- 1 hadoop hadoop 18801 Feb  2 14:40 server.log
    -rw-rw-r-- 1 hadoop hadoop  5897 Feb  2 14:40 state-change.log
    [hadoop@hadoop102 kafka]$ ll data/
    total 12
    -rw-rw-r-- 1 hadoop hadoop   0 Feb  2 14:39 cleaner-offset-checkpoint
    drwxrwxr-x 2 hadoop hadoop 141 Feb  2 14:40 first-0
    -rw-rw-r-- 1 hadoop hadoop   0 Feb  2 14:39 log-start-offset-checkpoint
    -rw-rw-r-- 1 hadoop hadoop  54 Feb  2 14:39 meta.properties
    -rw-rw-r-- 1 hadoop hadoop  14 Feb  2 14:40 recovery-point-offset-checkpoint
    -rw-rw-r-- 1 hadoop hadoop  14 Feb  2 14:41 replication-offset-checkpoint
    [hadoop@hadoop102 kafka]$ ll data/first-0/
    total 0
    -rw-rw-r-- 1 hadoop hadoop 10485760 Feb  2 14:40 00000000000000000000.index
    -rw-rw-r-- 1 hadoop hadoop        0 Feb  2 14:40 00000000000000000000.log
    -rw-rw-r-- 1 hadoop hadoop 10485756 Feb  2 14:40 00000000000000000000.timeindex
    -rw-rw-r-- 1 hadoop hadoop        0 Feb  2 14:40 leader-epoch-checkpoint
    
    
    作者:Lawrence

    -------------------------------------------

    个性签名:独学而无友,则孤陋而寡闻。做一个灵魂有趣的人!

    扫描上面二维码关注我
    如果你真心觉得文章写得不错,而且对你有所帮助,那就不妨帮忙“推荐"一下,您的“推荐”和”打赏“将是我最大的写作动力!
    本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接.
  • 相关阅读:
    bootstrap 兼容 IE8
    在IE8的基础上安装IE11
    前台
    dll 库文件下载地址
    年轻
    linux 异常
    Navicat断网时连不上数据库
    jQuery
    破解版 Teamver 安装
    mysql
  • 原文地址:https://www.cnblogs.com/hujinzhong/p/14344213.html
Copyright © 2011-2022 走看看