zoukankan      html  css  js  c++  java
  • Kafka介绍及集群搭建

    简介

      Kafka是一个开源的,分布式的,高吞吐量的消息系统。随着Kafka的版本迭代,日趋成熟。大家对它的使用也逐步从日志系统衍生到其他关键业务领域。特别是其超高吞吐量的特性,在互联网领域,使用越来越广泛,生态系统也越来的完善。同时,其设计思路也是其他消息中间件重要的设计参考。

      Kafka原先的开发初衷是构建一个处理海量日志的框架,基于高吞吐量为第一原则,所以它对消息的可靠性以及消息的持久化机制考虑的并不是特别的完善。0.8版本后,陆续加入了一些复制、应答和故障转移等相关机制以后,才可以让我们在其他关键性业务中使用。

    Kafka的运行架构如下图,各组件之间通过TCP协议通信:

    Topic:

    主题,或者说是一类消息。类似于RabbitMQ中的queue。可以理解为一个队列。

    Broker:

    一个Kafka服务称之为Broker。Kafka可以集群部署,每一个Kafka部署就是一个Broker。

    Producer & Consumer:

    生产者和消费者。一般消息系统都有生产者和消费者的概念。生产者产生消息,即把消息放入Topic中,而消费者则从Topic中获取消息处理。一个生产者可以向多个Topic发送消息,一个消费者也可以同时从几个Topic接收消息。同样的,一个Topic也可以被多个消费者来接收消息。

    Partition:

    分区,或者说分组。分组是Kafka提升吞吐量的一个关键设计。这样可以让消费者多线程并行接收消息。创建Topic时可指定Parition数量。一个Topic可以分为多个Partition,也可以只有一个Partition。每一个Partition是一个有序的,不可变的消息序列。每一个消息在各自的Partition中有唯一的ID。这些ID是有序的。称之为offset,offset在不同的Partition中是可以重复的,但是在一个Partition中是不可能重复的。越大的offset的消息是最新的。Kafka只保证在每个Partition中的消息是有序的,就会带来一个问题,即如果一个Consumer在不同的Partition中获取消息,那么消息的顺序也许是和Producer发送到Kafka中的消息的顺序是不一致的。这个在后续会讨论。

    如果是多Partition,生产者在把消息放到Topic中时,可以决定放到哪一个Patition。这个可以使用简单的轮训方法,也可以使用一些Hash算法。

    一个Topic的多个Partition可以分布式部署在不同的Server上,一个Kafka集群。配置项为:num.partitions,默认是1。每一个Partition也可以在Broker上复制多分,用来做容错。详细信息见下面创建Topic一节。

    Consumer Group:

    顾名思义,定义了一组消费者。一般来说消息中间件都有两种模式:队列模式和发布订阅模式。队列模式及每一个消息都会给其中一个消费者,而发布订阅模式则是每个消息都广播给所有的消费者。Kafka就是使用了Consumer Group来实现了这两种模式。

    如果所有的消费者都是同一个Consumer Group的话,就是队列模式,每个消息都会负载均衡的分配到所有的消费者。

    如果所有的消息者都在不同的Consumer Group的话,就是发布订阅模式,每个消费者都会得到这个消息。

    下图是一个Topic,配置了4个Patition,分布在2个Broker上。由于有2个Consumer Group,Group A和Group B都可以得到P0-P3的所有消息,是一个订阅发布模式。两个Group中的Consumer则负载均衡的接收了这个Topic的消息。如果Group中的Consumer的总线程数量超过了Partition的数量,则会出现空闲状态。

    Zookeeper:

    Kafka的运行依赖于Zookeeper。Topic、Consumer、Patition、Broker等注册信息都存储在ZooKeeper中。

    消息的持久化:

    Kafka可以通过配置时间和大小来持久化所有的消息,不管是否被消费(消费者收掉)。举例来说,如果消息保留被配置为1天,那么,消息就会在磁盘保留一天的时间,也就是说,一天以内,任意消费这个消息。一天以后,这个消息就会被删除。保留多少时间就取决于业务和磁盘的大小。

    Kafka主要有两种方式:时间和大小。在Broker中的配置参数为:

    log.retention.bytes:最多保留的文件字节大小。默认-1。

    log.retention.hours:最多保留的时间,小时。优先级最低。默认168。

    log.retention.minutes:最多保留的时间,分钟。如果为空,则看log.retention.hours。默认null。

    log.retention.ms:最多保留的时间,毫秒。如果为空,则看log.retention.minutes。默认null。

    集群的搭建

    # 解压配置文件
    tar -zxvf kafka_2.11-1.0.0.tgz -C /export/servers/
    # 文件夹重命名
    cd /export/servers/
    mv kafka_2.11-1.0.0 kafka
    # 修改配置文件
    cd /export/servers/kafka/config/
    cat server.properties |grep -v "#"
    # 得到以下内容,并修改
    vi /export/servers/kafka/config/server.properties --- # 1 需要修改 每个机器上的kafka实例的编号,应该不一样 broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 # 2 需要修改 存储路径需要指定 log.dirs=/export/logs/kafka num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 transaction.state.log.replication.factor=1 transaction.state.log.min.isr=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 # 3 zookeeper地址需要修改 zookeeper.connect=bigdata-01:2181,bigdata-02:2181,bigdata-03:2181 zookeeper.connection.timeout.ms=6000 group.initial.rebalance.delay.ms=0 # 4 需要修改 hostname 每个机器上不一样 delete.topic.enable=true host.name=bigata-01 --- # 分发到其他节点上 scp -r /export/servers/kafka node02:/export/servers/ scp -r /export/servers/kafka node03:/export/servers/ # 启动服务------前台启动,当控制台输入ctrl+c 和控制台失效,连接变灰,kafka进程就会死掉。 cd /export/servers/kafka/bin ./kafka-server-start.sh /export/servers/kafka/config/server.properties # 启动服务 ------后台启动 nohup /export/servers/kafka/bin/kafka-server-start.sh /export/servers/kafka/config/server.properties >/dev/null 2>
    &1 &

    创建topic

    /export/servers/kafka/bin/kafka-topics.sh --create --topic zytest --zookeeper bigdata-01:2181,bigdata-02:2181,bigdata-03:2181 --partitions 3 --replication-factor 2
    - --topic 主题的名字 
    - --zookeeper zookeeper地址
    - --partitions  分片,分片的作用是分治。就是讲一个topic的数据分成多份,存放在不同的broker上。
    - --replication-factor  副本,副本的作用是备份

    发送数据

    ./kafka-console-producer.sh --topic zytest  --broker-list bigdata-01:9092,bigdata-02:9092,bigdata-03:9092 
    --topic 主题的名字
    --broker-list kafka节点的地址

    消费数据

    从最新的位置消费

    ./kafka-console-consumer.sh --topic zytest --bootstrap-server bigdata-01:9092
    --topic 主题的名字
    --bootstrap-server kafka节点的地址

    从最开始的位置消费

    ./kafka-console-consumer.sh --topic zytest --bootstrap-server bigdata-01:9092 --from-beginning
    --topic 主题的名字
    --bootstrap-server kafka节点的地址
    --from-beginning 从最开始的位置消费

    显示所有topic

     ./kafka-topics.sh --list --zookeeper bigdata-01:2181

    删除topic

    ./kafka-topics.sh --delete --topic zytest --zookeeper bigdata-01:2181,bigdata-02:2181,bigdata-03:2181 

    查看topic

    ./kafka-topics.sh --describe --topic zytest  --zookeeper bigdata-01:2181
  • 相关阅读:
    Realtime crowdsourcing
    maven 常用插件汇总
    fctix
    sencha extjs4 command tools sdk
    首次吃了一颗带奶糖味的消炎药,不知道管用不
    spring mvc3 example
    ubuntu ati driver DO NOT INSTALL recommand driver
    yet another js editor on windows support extjs
    how to use springsource tools suite maven3 on command
    ocr service
  • 原文地址:https://www.cnblogs.com/blazeZzz/p/9767489.html
Copyright © 2011-2022 走看看