zoukankan      html  css  js  c++  java
  • 什么是kafka以及如何搭建kafka集群?

    一、Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。

    Kafka场景比喻

    接下来我大概比喻下Kafka的使用场景

    消息中间件:生产者和消费者

    妈妈:生产者
    你:消费者
    馒头:数据流、消息
    正常情况下: 生产一个  消费一个
    其他情况:  
    1)一直生产,你吃到某一个馒头时,你卡主(机器故障), 馒头就丢失了
    2)一直生产,做馒头速度快,你吃来不及,馒头也就丢失了
    为了放着其他生产情况的出现,我们可以拿个碗/篮子,馒头做好以后先放到篮子里,你要吃的时候去篮子里面取出来吃,而这篮子/框就可以为:Kafka。当篮子满了,馒头就装不下了,咋办? 多准备几个篮子 === Kafka的扩容

    二、Kafka的架构和核心概念

    这是张我在Kafka官网上截的图,我大概可以把Kafka的主要结构分为以下几点:

    producer:生产者,就是生产馒头(老妈)
    consumer:消费者,就是吃馒头的(你)
    broker:篮子
    topic:主题,给馒头带一个标签,topica的馒头是给你吃的,topicb的馒头是给你弟弟吃

    1.Kafka可以作为集群运行在一台或者多个服务器上面;

    2.Kafka集群可以分类地存储记录流,以打标签的方式,就是采用topics,每个broker可以打个topic,这样能保证消费者可以根据topic选择性消费;

    3.每个记录由Key、Value、timestamp构成。

    Kafka四个核心的API

    1.ProducerAPI:允许一个应用向一个或多个topic里发布记录流;

    2.ConsumerAPI:允许一个应用订阅一个或多个topics,处理topic里的数据流,就相当于消费;

    3.StreamAPI:允许应用扮演流处理的作用,从一个或多个topic里消费数据流,然后产生输出流数据到其他一个或多个topic里,对输入流数据有效传输到输出口;

    4.ConnectorAPI:允许运行和构建一个可重复利用的生产者和消费者,能将kafka的topic与其他存在的应用和数据库设备相连接,比如链接一个实时数据库,可以捕捉到每张表的变化。

    这四个API,主要应用在IDEA上对应用程序的开发中,通过代码的形式管理Kafka。在第四部分将会对前两个API写个简单DEMO演示。

    三、Kafka的快速使用

    Kafka使用到了zookeeper,所以首先你得安装zookeeper再安装kafka。

    1.单节点的broker部署

    首先我们需要修改$KAFKA_HOME/config/server.properties这个配置文件,主要以下几处需要修改:
    broker.id=0,每个broker的ID需要唯一
    listeners:监听的端口(此处笔者设置的是默认端口9092)
    host.name:当前机器
    log.dirs:存储日志的文件夹

    num.partitions:分区的数量
    zookeeper.connect:zookeeper的地址(默认为localhost:2181)

    这几处根据你自身需要进行配置,然后启动步骤如下:

    1)开启zookeeper,此处需要注意的是zookeeper的conf目录下的zoo.cfg配置文件,主要修改的也是日志存储目录那块。

    2)启动Kafka,命令为:kafka-server-start.sh $KAFKA_HOME/config/server.properties

    3)创建topic,需要指定zookeeper,命令为:kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 1 --partitions 1 --topic hello_topic。 注意指定zookeeper,后面几个属性可以根据你实际情况进行定义。另外查看所有topic的命令为:
    kafka-topics.sh --list --zookeeper hadoop000:2181

    4)发送消息,需要指定broker,命令为:kafka-console-producer.sh --broker-list hadoop000:9092 --topic hello_topic

    5)消费消息,需要指定zookeeper,命令为:kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic hello_topic --from-beginning。意思就是指定zookeeper上的topic进行消费,from-beginning的设置,可以查看之前的消息。

    2.单节点,多broker

    主要是增加多个server.properties文件,一个配置文件就相当于一个broker,我就设置三个broker:

    server-1.properties

    log.dirs=/home/hadoop/app/tmp/kafka-logs-1

    listeners=PLAINTEXT://:9093

    broker.id=1

    server-2.properties

    log.dirs=/home/hadoop/app/tmp/kafka-logs-2

    listeners=PLAINTEXT://:9094

    broker.id=2

    server-3.properties

    log.dirs=/home/hadoop/app/tmp/kafka-logs-3

    listeners=PLAINTEXT://:9095

    broker.id=3

    然后依次开启,命令如下:

     kafka-server-start.sh -daemon $KAFKA_HOME/config/server-1.properties &

    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-2.properties &
    kafka-server-start.sh -daemon $KAFKA_HOME/config/server-3.properties &

     接下来就跟上面的步骤一样:

     kafka-topics.sh --create --zookeeper hadoop000:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic

    kafka-console-producer.sh --broker-list hadoop000:9093,hadoop000:9094,hadoop000:9095 --topic my-replicated-topic

    kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic my-replicated-topic

     查看 topic的详细信息:

     kafka-topics.sh --describe --zookeeper hadoop000:2181 --topic my-replicated-topic

    要注意的是,副本中会有个leader,而多副本也实现了kafka的容错性,挂掉一个副本后,会自动在剩下副本里选出一个leader来同步操作。

    根据上面步骤操作,我们在producer窗口输入,在consumer消费窗口看到相应输出。

    四、Producer和Consumer API的使用

    接下来展示一个简单的Demo,在生产端简单创建个线程进行循环输出,然后用消费者端对输出的内容进行展示,也就是消费。

    配置文件

    /**

    * Kafka常用配置文件
    */

    public class KafkaProperties {

      public static final String ZK = "192.168.199.111:2181";

      public static final String TOPIC = "hello_topic";

      public static final String BROKER_LIST = "192.168.199.111:9092";

      public static final String GROUP_ID = "test_group1";

    }

    Producer API DEMO

     
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import java.util.Properties;

    /**

    * Kafka生产者

    */

    public class KafkaProducer extends Thread{

      private String topic;

      private Producer<Integer, String> producer;

      public KafkaProducer(String topic) {

        this.topic = topic;

        Properties properties = new Properties();

        properties.put("metadata.broker.list",KafkaProperties.BROKER_LIST);

        properties.put("serializer.class","kafka.serializer.StringEncoder");

        properties.put("request.required.acks","1");

        producer = new Producer<Integer, String>(new ProducerConfig(properties));

      }

      @Override

      public void run() {

        int messageNo = 1;

        while(true) {

          String message = "message_" + messageNo;

          producer.send(new KeyedMessage<Integer, String>(topic, message));

          System.out.println("Sent: " + message);

          messageNo ++ ;

          try{

            Thread.sleep(2000);

          } catch (Exception e){

            e.printStackTrace();

          }

        }

      }

    }

    Consumer API DEMO

    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;

    import kafka.consumer.KafkaStream;

    import kafka.javaapi.consumer.ConsumerConnector;

    import java.util.HashMap;

    import java.util.List;

    import java.util.Map;

    import java.util.Properties;

    /**

    * Kafka消费者

    */

    public class KafkaConsumer extends Thread{

      private String topic;

      public KafkaConsumer(String topic) {

        this.topic = topic;

      }

      private ConsumerConnector createConnector(){

        Properties properties = new Properties();

        properties.put("zookeeper.connect", KafkaProperties.ZK);

        properties.put("group.id",KafkaProperties.GROUP_ID);

        return Consumer.createJavaConsumerConnector(new ConsumerConfig(properties));

      }

      @Override

      public void run() {

        ConsumerConnector consumer = createConnector();

        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();

        topicCountMap.put(topic, 1);

        // topicCountMap.put(topic2, 1);

        // topicCountMap.put(topic3, 1);

        // String: topic

        // List<KafkaStream<byte[], byte[]>> 对应的数据流

        Map<String, List<KafkaStream<byte[], byte[]>>> messageStream = consumer.createMessageStreams(topicCountMap);

        KafkaStream<byte[], byte[]> stream = messageStream.get(topic).get(0); //获取我们每次接收到的暑假

        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        while (iterator.hasNext()) {

          String message = new String(iterator.next().message());

          System.out.println("rec: " + message);

        }

      }

    }

    最后在main函数对这两个类调用即可,结果如下:

     

    五、搭建kafka集群

    1)下载kafka0.8(http://kafka.apache.org/downloads.html),保存到服务器/home/wwb目录下kafka-0.8.0-beta1-src.tgz(kafka_2.8.0-0.8.0-beta1.tgz)

        2)解压 tar -zxvf kafka-0.8.0-beta1-src.tgz,产生文件夹kafka-0.8.0-beta1-src更改为kafka01   

    3)配置

        修改kafka01/config/server.properties,其中broker.id,log.dirs,zookeeper.connect必须根据实际情况进行修改,其他项根据需要自行斟酌。大致如下:

         broker.id=1  

         port=9091  

         num.network.threads=2  

         num.io.threads=2  

         socket.send.buffer.bytes=1048576  

        socket.receive.buffer.bytes=1048576  

         socket.request.max.bytes=104857600  

        log.dir=./logs  

        num.partitions=2  

        log.flush.interval.messages=10000  

        log.flush.interval.ms=1000  

        log.retention.hours=168  

        #log.retention.bytes=1073741824  

        log.segment.bytes=536870912  

        num.replica.fetchers=2  

        log.cleanup.interval.mins=10  

        zookeeper.connect=192.168.0.1:2181,192.168.0.2:2182,192.168.0.3:2183  

        zookeeper.connection.timeout.ms=1000000  

        kafka.metrics.polling.interval.secs=5  

        kafka.metrics.reporters=kafka.metrics.KafkaCSVMetricsReporter  

        kafka.csv.metrics.dir=/tmp/kafka_metrics  

        kafka.csv.metrics.reporter.enabled=false

     

    4)初始化因为kafka用scala语言编写,因此运行kafka需要首先准备scala相关环境。

        > cd kafka01  

        > ./sbt update  

        > ./sbt package  

        > ./sbt assembly-package-dependency

    在第二个命令时可能需要一定时间,由于要下载更新一些依赖包。所以请大家 耐心点。

    5) 启动kafka01

        >JMX_PORT=9997 bin/kafka-server-start.sh config/server.properties &  

    a)kafka02操作步骤与kafka01雷同,不同的地方如下

        修改kafka02/config/server.properties

        broker.id=2

        port=9092

        ##其他配置和kafka-0保持一致

        启动kafka02

        JMX_PORT=9998 bin/kafka-server-start.shconfig/server.properties &  

    b)kafka03操作步骤与kafka01雷同,不同的地方如下

        修改kafka03/config/server.properties

        broker.id=3

        port=9093

        ##其他配置和kafka-0保持一致

        启动kafka02

        JMX_PORT=9999 bin/kafka-server-start.shconfig/server.properties &

    6)创建Topic(包含一个分区,三个副本)

        >bin/kafka-create-topic.sh--zookeeper 192.168.0.1:2181 --replica 3 --partition 1 --topicmy-replicated-topic

    7)查看topic情况

        >bin/kafka-list-top.sh --zookeeper 192.168.0.1:2181

        topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0

    8)创建发送者

       >bin/kafka-console-producer.sh--broker-list 192.168.0.1:9091 --topic my-replicated-topic

        my test message1

        my test message2

        ^C

    9)创建消费者

        >bin/kafka-console-consumer.sh --zookeeper127.0.0.1:2181 --from-beginning --topic my-replicated-topic

        ...

        my test message1

        my test message2

    ^C

    10)杀掉server1上的broker

      >pkill -9 -f config/server.properties

    11)查看topic

      >bin/kafka-list-top.sh --zookeeper192.168.0.1:2181

      topic: my-replicated-topic  partition: 0 leader: 1  replicas: 1,2,0  isr: 1,2,0

    发现topic还正常的存在

    11)创建消费者,看是否能查询到消息

        >bin/kafka-console-consumer.sh --zookeeper192.168.0.1:2181 --from-beginning --topic my-replicated-topic

        ...

        my test message 1

        my test message 2

        ^C

    说明一切都是正常的。

     

    OK,以上就是对Kafka个人的理解,不对之处请大家及时指出。

     

     

    补充说明:

    1、public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap),其中该方法的参数Map的key为topic名称,value为topic对应的分区数,譬如说如果在kafka中不存在相应的topic时,则会创建一个topic,分区数为value,如果存在的话,该处的value则不起什么作用

    2、关于生产者向指定的分区发送数据,通过设置partitioner.class的属性来指定向那个分区发送数据,如果自己指定必须编写相应的程序,默认是kafka.producer.DefaultPartitioner,分区程序是基于散列的键。

    3、在多个消费者读取同一个topic的数据,为了保证每个消费者读取数据的唯一性,必须将这些消费者group_id定义为同一个值,这样就构建了一个类似队列的数据结构,如果定义不同,则类似一种广播结构的。

    4、在consumerapi中,参数设计到数字部分,类似Map<String,Integer>,

    numStream,指的都是在topic不存在的时,会创建一个topic,并且分区个数为Integer,numStream,注意如果数字大于broker的配置中num.partitions属性,会以num.partitions为依据创建分区个数的。

    5、producerapi,调用send时,如果不存在topic,也会创建topic,在该方法中没有提供分区个数的参数,在这里分区个数是由服务端broker的配置中num.partitions属性决定的

     

    关于kafka说明可以参考:http://kafka.apache.org/documentation.html

  • 相关阅读:
    联想 Vibe Shot(Z90-3) 免recovery 获取ROOT权限 救砖 VIBEUI V3.1_1625
    联想 Z5S(L78071)免解锁BL 免rec 保留数据 ROOT Magisk Xposed 救砖 ZUI 10.5.370
    联想 Z5(L78011) 免解锁BL 免rec 保留数据 ROOT Magisk Xposed 救砖 ZUI 10.5.254
    联想 S5 Pro(L78041)免解锁BL 免rec 保留数据 ROOT Magisk Xposed 救砖 ZUI 5.0.123
    第二阶段 冲刺八
    第二阶段 冲刺七
    第二阶段 冲刺六
    第二阶段 冲刺五
    代码大全阅读笔记03
    学习进度十二
  • 原文地址:https://www.cnblogs.com/liuqing576598117/p/9505961.html
Copyright © 2011-2022 走看看