zoukankan      html  css  js  c++  java
  • Kafka 安装及入门

    什么是Kafka?

    Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 

    Kafka基本概念

    • Broker:物理概念,Kafka集群中的每个Kafka节点;

    • Topic:逻辑概念,Kafka消息的类别,对数据进行区分、隔离;

    • Partition:物理概念,Kafka下数据存储的基本单元。一个Topic数据,会被分散存储到多个Partition,每一个Partition是有序的;

    • Replication(副本、备份):同一个Partition可能会有多个Replica,多个Replica之间数据是一样的;

    • Replication Leader:一个Partitionn的多个Replica上,需要一个Leader负责该Partition上与Producer和Consumer交互;

    • ReplicaManager:负责管理当前broker所有分区和副本的信息,处理KafkaController发起的一些请求,副本状态的切换、添加/读取消息、Leader的选举等。

     

    Kafka概念延伸

    Partition(最小存储单元)

    • 每一个Topic被切分为多个Partitions(Partition属于消费者存储的基本单位);

    • 消费者数目小于或等于Partition的数目(多个消费者若消费同个Partition会出现数据错误,所有Kafka如此设计);

    • Broker Group中的每一个Broker保存Topic的一个或多个Partitions(一个Broker只会保存一个Partition,若Partition太大则多个Broker保存同个Partition);

    • Consumer Group中的仅有一个Consumer读取Topic的一个或多个Partitions,并且是唯一的Consumer(避免同一个Partition被多个Consumer消费)。

    Replication

    • 当集群中有Broker挂掉的情况,系统可以主动地使Replicas提供服务;

    • 系统默认设置每一个Topic的replication系数为1(即默认没有副本,节省资源),可以在创建Topic时单独设置。

    特点:

    1. Replication的基本单位是Topic的Partition;

    2. 所有的读和写都从Leader进,Followers只是做为备份(只有Leader管理读写,其他的Replication只做备份);

    3. Follower必须能够及时复制Leader的数据;

    4. 增加容错性与可拓展性。

    Kafka基本结构

    Kafka消息结构

    Kafka特点

    • 分布式(多分区,多副本,多消费者,基于ZooKeeper调度);
    • 高性能(高吞吐,低延时,高并发,时间复杂度为O(1));
    • 持久性和扩展性(数据可持久化,容错率,支持在线水平扩展,消息自动平衡)。

    Kafka应用场景

    消息队列,行为跟踪,元信息监控,日志收集,流处理,事件源,持久性日志(commit log)等。

    Kafka安装(Linux下)

      需要安装JDK(步骤略过)与ZooKeeper(高版本Kafka自带ZooKeeper)。

    ZooKeeper安装: 

    1. 下载,解压,配置:

    wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.12/zookeeper-3.4.12.tar.gz
    tar -zxvf zookeeper-3.4.12.tar.gz
    # zookeeper-3.4.12/conf中复制zoo_sample.cfg为zoo.cfg
    cp zoo_sample.cfg zoo.cfg
    # 修改zoo.cfg文件中下面两行(dataDir和dataLogDir后面所指的文件夹必须要存在如果不存在的话,在启动ZooKeeper服务端的时候会报错。这里是单机情况下的配置情况,如果是集群的话,要在clientPort下面添加服务器的ip。如server.1=192.168.180.132:2888:3888
    server.2=192.168.180.133:2888:3888...等等。)
    dataDir=/tmp/zookeeper  
    dataLogDir=/tmp/zookeeper/log 

     2. 配置环境变量(全用户永久更改方式):

    修改/etc/profile文件,尾部添加:

    ZOOKEEPER_INSTALL=/usr/local/zookeeper-3.4.12 
    PATH=$PATH:$ZOOKEEPER_INSTALL/bin  
    
    export ZOOKEEPER_INSTALL
    export PATH

    3. 启动检验:

    # 进入zookeeper的bin目录下,启动
    ./zkServer.sh start  
    # 查看状态
    ./zkServer.sh status
    # 启动zookeeper的客户端(本地不需要-server参数)
    ./zkCli.sh -server 192.168.229.128:2181

    注:如拒绝连接,检查防火墙配置。

    Kafka安装: 

    1. 下载,解压:

    # curl 文件传输工具,-L 支持重定向链接,-O 把输出写到该文件中,保留远程文件的文件名
    curl -L -O http://mirror.bit.edu.cn/apache/kafka/2.0.0/kafka_2.11-2.0.0.tgz
    # -x 解压,-z 调用gzip执行,-v 显示细节,-f 指定文件
    tar -xzvf kafka_2.11-2.0.0.tgz

    2. 配置单节点ZooKeeper(使用自带ZooKeeper时配置):

    cd /usr/local/kafka_2.11-2.0.0  # 进入kafka主目录
    mkdir -p zk/data    # 创建zookeeper数据存放目录
    mkdir -p zk/logs    # 创建zookeeper存放日志目录
    cd config       # 进入配置文件所在目录
    mv zookeeper.properties zookeeper.properties.bak  # 将原配置文件移位.bak备份文件
    cat > zookeeper.properties << EOF
    tickTime
    =2000
    dataDir
    =/usr/local/kafka_2.11-2.0.0/zk/data
    dataLogDir
    =/usr/local/kafka_2.11-2.0.0/zk/logs
    clientPort
    =2181
    EOF

    3. 配置单结点Kafka:

    cd /usr/local/kafka_2.11-2.0.0  # 进入kafka主目录
    mkdir logs      # 创建logs目录用于存放日志
    cd config       # 进入配置文件所在目录
    mv server.properties server.properties.bak  # 将原配置文件移位.bak备份文件
    
    cat > server.properties << EOF
    broker.id=1
    listeners=PLAINTEXT://192.168.229.128:9092
    num.network.threads=3
    num.io.threads=8
    socket.send.buffer.bytes=102400
    socket.receive.buffer.bytes=102400
    socket.request.max.bytes=104857600
    log.dirs=/usr/local/kafka_2.11-2.0.0/logs
    num.partitions=1
    num.recovery.threads.per.data.dir=1
    offsets.topic.replication.factor=1
    transaction.state.log.replication.factor=1
    transaction.state.log.min.isr=1
    log.retention.hours=168
    log.segment.bytes=1073741824
    log.retention.check.interval.ms=300000
    zookeeper.connect=192.168.229.128:2181
    zookeeper.connection.timeout.ms=6000
    group.initial.rebalance.delay.ms=0
    EOF

    server.properties:  基本为原server.properties的默认配置,安装时主要修改;

    broker.id--broker的id:  修改为任意想要的数值(和zookeeper中的id类似);

    listeners--监听址址:  修改为kafka要监听的地址;

    log.dirs--日志文件存放目录:  修改为要存放日志的目录;

    zookeeper.connect--zookeeper监听地址:  修改为zookeeper的监听地址,如果是集群所有地址全写上用逗号(半角)隔开即可。

    4. 启动和停止:

    启动:

    ./zookeeper-server-start.sh -daemon ../config/zookeeper.properties    # 启动zookeeper  (-daemon在后台执行,后接配置文件)
    ./kafka-server-start.sh -daemon ../config/server.properties           # 启动kafka

    停止:

    ./zookeeper-server-stop.sh    # 停止zookeeper
    ./kafka-server-stop.sh        # 停止kafka,centos7上可能关不了用kill -9直接杀掉

    Kafka基本命令

    创建topic:

    ./kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 3 --topic test-kafka-topic

    查看topic:

    ./kafka-topics.sh --list --zookeeper localhost:2181

    生产者命令:

    ./kafka-console-producer.sh --broker-list 192.168.229.128:9092 --topic test-kafka-topic        # 更换成自己的IP

    消费者命令:

    ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-kafka-topic --from-beginning         # 更换成自己的IP,--from-beginning表示重头开始消费

    命令效果:

    生产:

    消费:

  • 相关阅读:
    JavaScript对数组的处理(一)
    PHP数组排序函数array_multisort()函数详解(二)
    PHP数组排序函数array_multisort()函数详解(一)
    jquery html动态添加的元素绑定事件详解
    SpringBoot 通用Error设计
    SpringBoot Lombok
    VMware Big Data Extensions 安装步骤
    SpringBoot 通用Validator
    SpringBoot Mybatis keyProperty和useGeneratedKeys的作用
    Introduction to vSphere Integrated Containers
  • 原文地址:https://www.cnblogs.com/weswes/p/9864182.html
Copyright © 2011-2022 走看看