zoukankan      html  css  js  c++  java
  • Kafka 安装及入门

    什么是Kafka?

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 

    Kafka基本概念

    • Broker:物理概念,Kafka集群中的每个Kafka节点;

    • Topic:逻辑概念,Kafka消息的类别,对数据进行区分、隔离;

    • Partition:物理概念,Kafka下数据存储的基本单元。一个Topic数据,会被分散存储到多个Partition,每一个Partition是有序的;

    • Replication(副本、备份):同一个Partition可能会有多个Replica,多个Replica之间数据是一样的;

    • Replication Leader:一个Partitionn的多个Replica上,需要一个Leader负责该Partition上与Producer和Consumer交互;

    • ReplicaManager:负责管理当前broker所有分区和副本的信息,处理KafkaController发起的一些请求,副本状态的切换、添加/读取消息、Leader的选举等。

     

    Kafka概念延伸

    Partition(最小存储单元)

    • 每一个Topic被切分为多个Partitions(Partition属于消费者存储的基本单位);

    • 消费者数目小于或等于Partition的数目(多个消费者若消费同个Partition会出现数据错误,所有Kafka如此设计);

    • Broker Group中的每一个Broker保存Topic的一个或多个Partitions(一个Broker只会保存一个Partition,若Partition太大则多个Broker保存同个Partition);

    • Consumer Group中的仅有一个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer(避免同一个Partition被多个Consumer消费)。

    Replication

    • 当集群中有Broker挂掉的情况,系统可以主动地使Replicas提供服务;

    • 系统默认设置每一个Topic的replication系数为1(即默认没有副本,节省资源),可以在创建Topic时单独设置。

    特点:

    1. Replication的基本单位是Topic的Partition;

    2. 所有的读和写都从Leader进,Followers只是做为备份(只有Leader管理读写,其他的Replication只做备份);

    3. Follower必须能够及时复制Leader的数据;

    4. 增加容错性与可拓展性。

    Kafka基本结构

    Kafka消息结构

    Kafka特点

    • 分布式(多分区,多副本,多消费者,基于ZooKeeper调度);
    • 高性能(高吞吐,低延时,高并发,时间复杂度为O(1));
    • 持久性和扩展性(数据可持久化,容错率,支持在线水平扩展,消息自动平衡)。

    Kafka应用场景

    消息队列,行为跟踪,元信息监控,日志收集,流处理,事件源,持久性日志(commit log)等。

    Kafka安装(Linux下)

      需要安装JDK(步骤略过)与ZooKeeper(高版本Kafka自带ZooKeeper)。

    ZooKeeper安装: 

    1. 下载,解压,配置:

    wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
    tar -zxvf zookeeper-3.4.12.tar.gz
    # zookeeper-3.4.12/conf中复制zoo_sample.cfg为zoo.cfg
    cp zoo_sample.cfg zoo.cfg
    # 修改zoo.cfg文件中下面两行(dataDir和dataLogDir后面所指的文件夹必须要存在如果不存在的话,在启动ZooKeeper服务端的时候会报错。这里是单机情况下的配置情况,如果是集群的话,要在clientPort下面添加服务器的ip。如server.1=192.168.180.132:2888:3888
    server.2=192.168.180.133:2888:3888...等等。)
    dataDir=/tmp/zookeeper  
    dataLogDir=/tmp/zookeeper/log 

     2. 配置环境变量(全用户永久更改方式):

    修改/etc/profile文件,尾部添加:

    ZOOKEEPER_INSTALL=/usr/local/zookeeper-3.4.12 
    PATH=$PATH:$ZOOKEEPER_INSTALL/bin  
    
    export ZOOKEEPER_INSTALL
    export PATH

    3. 启动检验:

    # 进入zookeeper的bin目录下,启动
    ./zkServer.sh start  
    # 查看状态
    ./zkServer.sh status
    # 启动zookeeper的客户端(本地不需要-server参数)
    ./zkCli.sh -server 192.168.229.128:2181

    注:如拒绝连接,检查防火墙配置。

    Kafka安装: 

    1. 下载,解压:

    # curl 文件传输工具,-L 支持重定向链接,-O 把输出写到该文件中,保留远程文件的文件名
    curl -L -O http://mirror.bit.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
    # -x 解压,-z 调用gzip执行,-v 显示细节,-f 指定文件
    tar -xzvf kafka_2.11-2.0.0.tgz

    2. 配置单节点ZooKeeper(使用自带ZooKeeper时配置):

    cd /usr/local/kafka_2.11-2.0.0  # 进入kafka主目录
    mkdir -p zk/data    # 创建zookeeper数据存放目录
    mkdir -p zk/logs    # 创建zookeeper存放日志目录
    cd config       # 进入配置文件所在目录
    mv zookeeper.properties zookeeper.properties.bak  # 将原配置文件移位.bak备份文件
    cat > zookeeper.properties << EOF
    tickTime
    =2000
    dataDir
    =/usr/local/kafka_2.11-2.0.0/zk/data
    dataLogDir
    =/usr/local/kafka_2.11-2.0.0/zk/logs
    clientPort
    =2181
    EOF

    3. 配置单结点Kafka:

    cd /usr/local/kafka_2.11-2.0.0  # 进入kafka主目录
    mkdir logs      # 创建logs目录用于存放日志
    cd config       # 进入配置文件所在目录
    mv server.properties server.properties.bak  # 将原配置文件移位.bak备份文件
    
    cat > server.properties << EOF
    broker.id=1
    listeners=PLAINTEXT://192.168.229.128:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/kafka_2.11-2.0.0/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.229.128:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    EOF

    server.properties:  基本为原server.properties的默认配置,安装时主要修改;

    broker.id--broker的id:  修改为任意想要的数值(和zookeeper中的id类似);

    listeners--监听址址:  修改为kafka要监听的地址;

    log.dirs--日志文件存放目录:  修改为要存放日志的目录;

    zookeeper.connect--zookeeper监听地址:  修改为zookeeper的监听地址,如果是集群所有地址全写上用逗号(半角)隔开即可。

    4. 启动和停止:

    启动:

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties    # 启动zookeeper  (-daemon在后台执行,后接配置文件)
    ./kafka-server-start.sh -daemon ../config/server.properties           # 启动kafka

    停止:

    ./zookeeper-server-stop.sh    # 停止zookeeper
    ./kafka-server-stop.sh        # 停止kafka,centos7上可能关不了用kill -9直接杀掉

    Kafka基本命令

    创建topic:

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-kafka-topic

    查看topic:

    ./kafka-topics.sh --list --zookeeper localhost:2181

    生产者命令:

    ./kafka-console-producer.sh --broker-list 192.168.229.128:9092 --topic test-kafka-topic        # 更换成自己的IP

    消费者命令:

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-kafka-topic --from-beginning         # 更换成自己的IP,--from-beginning表示重头开始消费

    命令效果:

    生产:

    消费:

  • 相关阅读:
    unexpected inconsistency;run fsck manually esxi断电后虚拟机启动故障
    centos 安装mysql 5.7
    centos 7 卸载mysql
    centos7 在线安装mysql5.6,客户端远程连接mysql
    ubuntu 14.04配置ip和dns
    centos7 上搭建mqtt服务
    windows eclipse IDE打开当前类所在文件路径
    git 在非空文件夹clone新项目
    eclipse中java build path下 allow output folders for source folders 无法勾选,该如何解决 eclipse中java build path下 allow output folders for source folders 无法勾选,
    Eclipse Kepler中配置JadClipse
  • 原文地址:https://www.cnblogs.com/weswes/p/9864182.html
Copyright © 2011-2022 走看看