zoukankan      html  css  js  c++  java
  • Kafka学习(一) Kafka入门和安装

    1、了解 Apache Kafka

    1.1、简介

    file

    官网:http://kafka.apache.org/

    • Apache Kafka 是一个开源消息系统,由Scala 写成。是由Apache 软件基金会开发的一个开源消息系统项目。
    • Kafka 最初是由LinkedIn 开发,并于2011 年初开源。2012 年10 月从Apache Incubator 毕业。该项目的目标是为处理实时数据提供一个统一、高通量、低等待(低延时)的平台。
    • Kafka 是一个分布式消息系统:具有生产者、消费者的功能。它提供了类似于JMS 的特性,但是在设计实现上完全不同,此外它并不是JMS 规范的实现。【重点】

    1.2、kafka的基本结构

    file

    • Producer:消息的发送者

    • Consumer:消息的接收者

    • kafka cluster:kafka的集群。

    • Topic:就是消息类别名,一个topic中通常放置一类消息。每个topic都有一个或者多个订阅者(消费者)。

    消息的生产者将消息推送到kafka集群,消息的消费者从kafka集群中拉取消息。

    1.3、kafka的完整架构

    file

    说明:

    • broker:集群中的每一个kafka实例,称之为broker;
    • ZooKeeper:Kafka 利用ZooKeeper 保存相应元数据信息, Kafka 元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。
    • ConsumerGroup:在Kafka 中每一个消费者都属于一个特定消费组( ConsumerGroup ),我们可以为每个消费者指定一个消费组,以groupld 代表消费组名称,通过group.id 配置设置。如果不指定消费组,则该消费者属于默认消费组test-consumer-group 。

    1.4、kafka的特性

    • 消息持久化
      • Kafka 基于文件系统来存储和缓存消息。
    • 高吞吐量
      • Kafka 将数据写到磁盘,充分利用磁盘的顺序读写。同时, Kafka 在数据写入及数据同步采用了零拷贝( zero-copy )技术,采用sendFile()函数调用,sendFile()函数是在两个文件描述符之间直接传递数据,完全在内核中操作,从而避免了内核缓冲区与用户缓冲区之间数据的拷贝,操作效率极高。
      • Kafka 还支持数据压缩及批量发送,同时Kafka 将每个主题划分为多个分区,这一系列的优化及实现方法使得Kafka 具有很高的吞吐量。经大多数公司对Kafka 应用的验证, Kafka 支持每秒数百万级别的消息
    • 高扩展性
      • Kafka 依赖ZooKeeper来对集群进行协调管理,这样使得Kafka 更加容易进行水平扩展,生产者、消费者和代理都为分布式,可配置多个。
      • 同时在机器扩展时无需将整个集群停机,集群能够自动感知,重新进行负责均衡及数据复制。
    • 多客户端支持
      • Kafka 核心模块用Scala 语言开发,Kafka 提供了多种开发语言的接入,如Java 、Scala、C 、C++、Python 、Go 、Erlang 、Ruby 、Node. 等。
    • 安全机制
      • Kafka 支持以下几种安全措施:
        • 通过SSL 和SASL(Kerberos), SASL/PLA时验证机制支持生产者、消费者与broker连接时的身份认证;
        • 支持代理与ZooKeeper 连接身份验证;
        • 通信时数据加密;
        • 客户端读、写权限认证;
        • Kafka 支持与外部其他认证授权服务的集成;
    • 数据备份
      • Kafka 可以为每个topic指定副本数,对数据进行持久化备份,这可以一定程度上防止数据丢失,提高可用性。
    • 轻量级
      • Kafka 的实例是无状态的,即broker不记录消息是否被消费,消费偏移量的管理交由消费者自己或组协调器来维护。
      • 同时集群本身几乎不需要生产者和消费者的状态信息,这就使得Kafka非常轻量级,同时生产者和消费者客户端实现也非常轻量级。
    • 消息压缩
      • Kafka 支持Gzip, Snappy 、LZ4 这3 种压缩方式,通常把多条消息放在一起组成MessageSet,然后再把Message Set 放到一条消息里面去,从而提高压缩比率进而提高吞吐量。

    1.5、kafka的应用场景

    • 消息系统。
      • Kafka 作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模消息处理提供了一种很好的解决方案。
    • 应用监控。
      • 利用Kafka 采集应用程序和服务器健康相关的指标,如CPU 占用率、IO 、内存、连接数、TPS 、QPS 等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用Kafka 与ELK (Elastic Search 、Logstash 和Kibana)整合构建应用服务监控系统。
    • 网站用户行为追踪。
      • 为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到Kafka 集群上,通过Hadoop 、Spark 或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。
    • 流处理。
      • 需要将己收集的流数据提供给其他流式计算框架进行处理,用Kafka 收集流数据是一个不错的选择。
    • 持久性日志。
      • Kafka 可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份, Kafka 为故障节点数据恢复提供了一种重新同步的机制。同时, Kafka很方便与HDFS 和Flume 进行整合,这样就方便将Kafka 采集的数据持久化到其他外部系统。

    2、Kafka的安装与配置

    准备三台虚拟机,分别是node01,node02,node03,并且修改hosts文件如下:

    vim /etc/hosts
    #注意: 前面的ip地址改成自己的ip地址
    
    192.168.40.133 node01
    192.168.40.134 node02
    192.168.40.135 node03
    
    #3台服务器的时间要一致
    #时间更新:
    yum install -y rdate
    rdate -s  time-b.nist.gov
    

      

    2.1、基础环境配置

    2.1.1、JDK环境

    由于Kafka 是用Scala 语言开发的,运行在JVM上,因此在安装Kafka 之前需要先安装JDK 。

    安装过程略过,我这里使用的是jdk1.8。

    file

    2.1.2、ZooKeeper环境

    2.1.2.1、安装ZooKeeper

    Kafka 依赖ZooKeeper ,通过ZooKeeper 来对服务节点、消费者上下线管理、集群、分区元数据管理等,因此ZooKeeper 也是Kafka 得以运行的基础环境之一。

    #上传zookeeper-3.4.9.tar.gz到/export/software
    cd /export/software
    mkdir -p /export/servers/
    tar -xvf zookeeper-3.4.9.tar.gz -C /export/servers/
    #创建ZooKeeper的data目录
    mkdir /export/data/zookeeper -p
    cd /export/servers/zookeeper-3.4.9/conf/
    #修改配置文件
    mv zoo_sample.cfg zoo.cfg
    vim zoo.cfg
    #设置data目录
    dataDir=/export/data/zookeeper
    #启动ZooKeeper
    ./zkServer.sh start
    #检查是否启动成功
    jps
    

      

    2.1.2.3、搭建ZooKeeper集群
    #在/export/data/zookeeper目录中创建myid文件
    vim /export/data/zookeeper/myid
    #写入对应的节点的id,如:1,2等,保存退出
    
    #在conf下,修改zoo.cfg文件
    vim zoo.cfg
    #添加如下内容
    server.1=node01:2888:3888
    server.2=node02:2888:3888
    server.3=node03:2888:3888
    2.1.2.3、配置环境变量
    vim /etc/profile
    export ZK_HOME=/export/servers/zookeeper-3.4.9
    export PATH=${ZK_HOME}/bin:$PATH
    
    #立即生效
    source /etc/profile
    2.1.2.4、分发到其它机器
    scp /etc/profile node02:/etc/
    scp /etc/profile node03:/etc/
    
    cd /export/servers
    scp -r zookeeper-3.4.9 node02:/export/servers/
    scp -r zookeeper-3.4.9 node03:/export/servers/
    2.1.2.5、一键启动、停止脚本
    mkdir -p /export/servers/onekey/zk
    vim slave
    #输入如下内容
    node01
    node02
    node03
    #保存退出
    
    vim startzk.sh
    #输入如下内容
    cat /export/servers/onekey/zk/slave | while read line
    do
    {
     echo "开始启动 --> "$line
     ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh start >/dev/null 2>&1 &"
    }&
    wait
    done
    echo "★★★启动完成★★★"
    #保存退出
    
    vim stopzk.sh
    #输入如下内容
    cat /export/servers/onekey/zk/slave | while read line
    do
    {
     echo "开始停止 --> "$line
     ssh $line "source /etc/profile;nohup sh ${ZK_HOME}/bin/zkServer.sh stop >/dev/null 2>&1 &"
    }&
    wait
    done
    echo "★★★停止完成★★★"
    #保存退出
    
    #设置可执行权限
    chmod +x startzk.sh stopzk.sh
    
    #添加到环境变量中
    export ZK_ONEKEY=/export/servers/onekey
    export PATH=${ZK_ONEKEY}/zk:$PATH
    

      

    2.1.2.6、检查启动是否成功

    file

    发现三台机器都有“QuorumPeerMain”进程,说明机器已经启动成功了。

    检查集群是否正常:

    zkServer.sh status
    

      

    file

    file

    file

    发现,集群运行一切正常。

    2.2、安装Kafka

    2.2.1、单机版Kafka安装

    第一步:上传Kafka安装包并且解压
    
    rz 上传kafka_2.11-1.1.0.tgz到 /export/software/
    cd /export/software/
    tar -xvf kafka_2.11-1.1.0.tgz -C /export/servers/
    cd /export/servers
    mv kafka_2.11-1.1.0/ kafka
    第二步:配置环境变量
    
    vim /etc/profile
    
    #输入如下内容
    export KAFKA_HOME=/export/servers/kafka
    export PATH=${KAFKA_HOME}/bin:$PATH
    
    #保存退出
    source /etc/profile
    第三步:修改配置文件
    
    cd /export/servers/kafka
    cd config
    vim server.properties
    
    # The id of the broker. This must be set to a unique integer for each broker.
    # 必须要只要一个brokerid,并且它必须是唯一的。
    broker.id=0
    
    # A comma separated list of directories under which to store log files
    # 日志数据文件存储的路径 (如不存在,需要手动创建该目录, mkdir -p /export/data/kafka/)
    log.dirs=/export/data/kafka
    
    # ZooKeeper的配置,本地模式下指向到本地的ZooKeeper服务即可
    zookeeper.connect=node01:2181
    
    # 保存退出
    第四步:启动kafka服务
    
    # 以守护进程的方式启动kafka
    kafka-server-start.sh -daemon /export/servers/kafka/config/server.properties
    第五步:检测kafka是否启动
    

      

    file

    如果进程中有名为kafka的进程,就说明kafka已经启动了。

    2.2.2、验证kafka是否安装成功

    由于kafka是将元数据保存在ZooKeeper中的,所以,可以通过查看ZooKeeper中的信息进行验证kafka是否安装成功。

    file

    file

    file

    2.2.3、部署kafka-manager

    Kafka Manager 由 yahoo 公司开发,该工具可以方便查看集群 主题分布情况,同时支持对 多个集群的管理、分区平衡以及创建主题等操作。

    源码托管于github:https://github.com/yahoo/kafka-manager

    第一步:上传Kafka-manager安装包并且解压
    
    rz上传kafka-manager-1.3.3.17.tar.gz到 /export/software/
    cd /export/software
    tar -xvf kafka-manager-1.3.3.17.tar.gz -C /export/servers/
    cd /export/servers/kafka-manager-1.3.3.17/conf
    第二步:修改配置文件
    
    #修改配置文件
    vim application.conf
    #新增项,http访问服务的端口
    http.port=19000
    #修改成自己的zk机器地址和端口
    kafka-manager.zkhosts="node01:2181"
    #保存退出
    第三步:启动服务
    
    cd /export/servers/kafka-manager-1.3.3.17/bin
    #启动服务
    ./kafka-manager -Dconfig.file=../conf/application.conf
    
    #制作启动脚本
    vim /etc/profile
    export KAFKA_MANAGE_HOME=/export/servers/kafka-manager-1.3.3.17
    export PATH=${KAFKA_MANAGE_HOME}/bin:$PATH
    
    source /etc/profile
    
    cd /export/servers/onekey/
    mkdir kafka-manager
    cd kafka-manager
    vim start-kafka-manager.sh
    nohup kafka-manager -Dconfig.file=${KAFKA_MANAGE_HOME}/conf/application.conf >/dev/null 2>&1 &
    chmod +x start-kafka-manager.sh
    vim /etc/profile
    export PATH=${ZK_ONEKEY}/kafka-manager:$PATH
    source /etc/profile
    

      

    第四步:检查是否启动成功

    打开浏览器,输入地址:http://node01:19000/,即可看到kafka-manage管理界面。

    file

    2.2.4、kafka-manager的使用

    进入管理界面,是没有显示Cluster信息的,需要添加后才能操作。

    • 添加 Cluster:

    file

    输入Cluster Name、ZooKeeper信息、以及Kafka的版本信息(这里最高只能选择1.0.0)。

    file

    点击Save按钮保存。

    file

    添加成功。

    • 查看kafka的信息
      file
    • 查看Broker信息
      file
    • 查看Topic列表
      file
    • 查看单个topic信息以及操作
      file
    • 优化副本选举
      file
    • 查看消费者信息
      file

    2.2.5、搭建kafka集群

    kafka集群的搭建是非常简单的,只需要将上面的单机版的kafka分发的其他机器,并且将ZooKeeper信息修改成集群的配置以及设置不同的broker值即可。

    第一步:将kafka分发到node02、node03
    
    cd /export/servers/
    scp -r kafka node02:/export/servers/
    scp -r kafka node03:/export/servers/
    scp /etc/profile node02:/etc/
    scp /etc/profile node03:/etc/
    # 分别到node02、node03机器上执行
    source /etc/profile
    第二步:修改node01、node02、node03上的kafka配置文件
    
    node01:
    
    cd /export/servers/kafka/config
    vim server.properties
    zookeeper.connect=node01:2181,node02:2181,node03:2181
    node02:
    
    cd /export/servers/kafka/config
    vim server.properties
    broker.id=1
    zookeeper.connect=node01:2181,node02:2181,node03:2181
    node03:
    
    cd /export/servers/kafka/config
    vim server.properties
    broker.id=2
    zookeeper.connect=node01:2181,node02:2181,node03:2181
    第三步:编写一键启动、停止脚本。注意:该脚本依赖于环境变量中的KAFKA_HOME。
    
    mkdir -p /export/servers/onekey/kafka
    vim slave
    #输入如下内容
    node01
    node02
    node03
    #保存退出
    
    vim start-kafka.sh
    #输入如下内容
    cat /export/servers/onekey/kafka/slave | while read line
    do
    {
     echo "开始启动 --> "$line
     ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-start.sh -daemon ${KAFKA_HOME}/config/server.properties >/dev/null 2>&1 &"
    }&
    wait
    done
    echo "★★★启动完成★★★"
    #保存退出
    chmod +x start-kafka.sh
    
    vim stop-kafka.sh
    #输入如下内容
    cat /export/servers/onekey/kafka/slave | while read line
    do
    {
     echo "开始停止 --> "$line
     ssh $line "source /etc/profile;nohup sh ${KAFKA_HOME}/bin/kafka-server-stop.sh >/dev/null 2>&1 &"
    }&
    wait
    done
    echo "★★★停止完成★★★"
    #保存退出
    chmod +x stop-kafka.sh
    
    #加入到环境变量中
    export PATH=${ZK_ONEKEY}/kafka:$PATH
    source /etc/profile
    

      

    第四步:通过kafka-manager管理工具查看集群信息。
    file

    由此可见,kafka集群已经启动完成。

    3、Kafka快速入门

    对kafka的操作有2种方式,一种是通过命令行方式,一种是通过API方式。

    3.1、通过命令行Kafka

    Kafka在bin目录下提供了shell脚本文件,可以对Kafka进行操作,分别是:
    file
    通过命令行的方式,我们将体验下kafka,以便我们对kafka有进一步的认知。

    3.1.1、topic的操作
    3.1.1.1、创建topic
    kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 1 --partitions 1 --topic my-kafka-topic
    
    #执行结果:
    Created topic "my-kafka-topic".
    参数说明:
    
    zookeeper:参数是必传参数,用于配置 Kafka 集群与 ZooKeeper 连接地址。至少写一个。
    partitions:参数用于设置主题分区数,该配置为必传参数。
    replication-factor:参数用来设置主题副本数 ,该配置也是必传参数。
    topic:指定topic的名称。
    3.1.1.2、查看topic列表
    kafka-topics.sh --list --zookeeper node01:2181
    
    __consumer_offsets
    my-kafka-topic
    可以查看列表。
    
    如果需要查看topic的详细信息,需要使用describe命令。
    
    kafka-topics.sh --describe --zookeeper node01:2181 --topic test-topic
    #若不指定topic,则查看所有topic的信息
    kafka-topics.sh --describe --zookeeper node01:2181
    3.1.1.3、删除topic
    通过kafka-topics.sh执行删除动作,需要在server.properties文件中配置 delete.topic.enable=true,该配置默认为 false。
    
    否则执行该脚本并未真正删除主题 ,将该topic标记为删除状态 。
    
    kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
    
    # 执行如下
    [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic
    Topic my-kafka-topic is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    
    # 如果将delete.topic.enable=true
    [root@node01 config]# kafka-topics.sh --delete --zookeeper node01:2181 --topic my-kafka-topic2
    Topic my-kafka-topic2 is marked for deletion.
    Note: This will have no impact if delete.topic.enable is not set to true.
    
    # 说明:虽然设置后,删除时依然提示没有设置为true,实际上已经删除了。
    3.1.2、生产者的操作
    kafka-console-producer.sh --broker-list node01:9092 --topic my-kafka-topic
    可以看到,已经向topic发送了消息。
    
    3.1.3、消费者的操作
    kafka-console-consumer.sh --bootstrap-server node01:9092 --topic my-kafka-topic
    # 通过以上命令,可以看到消费者可以接收生产者发送的消息
    
    # 如果需要从头开始接收数据,需要添加--from-beginning参数
    kafka-console-consumer.sh --bootstrap-server node01:9092 --from-beginning --topic my-kafka-topic
    

      

    file

    3.2、通过Java Api操作Kafka

    除了通过命令行的方式操作kafka外,还可以通过Java api的方式操作,这种方式将更加的常用。

    3.2.1、创建工程

    file

    导入依赖:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>itcast-bigdata</artifactId>
            <groupId>cn.itcast.bigdata</groupId>
            <version>1.0.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>itcast-bigdata-kafka</artifactId>
    
        <dependencies>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.1.0</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
    
        </dependencies>
        
        <build>
            <plugins>
                <!-- java编译插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    3.2.2、topic的操作
    由于主题的元数据信息是注册在 ZooKeeper 相 应节点之中,所以对主题的操作实质是对 ZooKeeper 中记录主题元数据信息相关路径的操作。 Kafka将对 ZooKeeper 的相关操作封装成一 个 ZkUtils 类 , 井封装了一个AdrninUtils 类调用 ZkClient 类的相关方法以实现对 Kafka 元数据 的操作,包括对主题、代理、消费者等相关元数据的操作。对主题操作的相关 API调用较简单, 相应操作都是通过调用 AdminUtils类的相应方法来完成的。
    
    package cn.itcast.kafka;
    
    import kafka.admin.AdminUtils;
    import kafka.utils.ZkUtils;
    import org.apache.kafka.common.security.JaasUtils;
    import org.junit.Test;
    
    import java.util.Properties;
    
    public class TestKafkaTopic {
    
        @Test
        public void testCreateTopic() {
            ZkUtils zkUtils = null;
            try {
                //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
                zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
    
                String topicName = "my-kafka-topic-test1";
                if (!AdminUtils.topicExists(zkUtils, topicName)) {
                    //参数:zkUtils,topic名称,partition数量,副本数量,参数,机架感知模式
                    AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), AdminUtils.createTopic$default$6());
                    System.out.println(topicName + " 创建成功!");
                } else {
                    System.out.println(topicName + " 已存在!");
                }
            } finally {
                if (null != zkUtils) {
                    zkUtils.close();
                }
            }
    
        }
    }
    

      

    测试结果:

    file

    3.2.2.1、删除topic
        @Test
        public void testDeleteTopic() {
            ZkUtils zkUtils = null;
            try {
                //参数:zookeeper的地址,session超时时间,连接超时时间,是否启用zookeeper安全机制
                zkUtils = ZkUtils.apply("node01:2181", 30000, 3000, JaasUtils.isZkSecurityEnabled());
                String topicName = "my-kafka-topic-test1";
                if (AdminUtils.topicExists(zkUtils, topicName)) {
                    //参数:zkUtils,topic名称
                    AdminUtils.deleteTopic(zkUtils, topicName);
                    System.out.println(topicName + " 删除成功!");
                } else {
                    System.out.println(topicName + " 不已存在!");
                }
            } finally {
                if (null != zkUtils) {
                    zkUtils.close();
                }
            }
    
        }
    

      

    测试结果:

    file

    3.2.3、生产者的操作

    package cn.itcast.kafka;
    
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Test;
    
    import java.util.Properties;
    
    public class TestProducer {
    
        @Test
        public void testProducer() throws InterruptedException {
            Properties config = new Properties();
    
            // 设置kafka服务列表,多个用逗号分隔
            config.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
            // 设置序列化消息 Key 的类
            config.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // 设置序列化消息 value 的类
            config.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    
            // 初始化
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(config);
            for (int i = 0; i < 100 ; i++) {
                ProducerRecord record = new ProducerRecord("my-kafka-topic","data-" + i);
                // 发送消息
                kafkaProducer.send(record);
                System.out.println("发送消息 --> " + i);
    
                Thread.sleep(100);
            }
    
            kafkaProducer.close();
    
        }
    
    }
    3.2.4、消费者的操作
    package cn.itcast.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Test;
    
    import javax.sound.midi.Soundbank;
    import java.util.Arrays;
    import java.util.Properties;
    
    public class TestConsumer {
    
        @Test
        public void testConsumer() {
            Properties config = new Properties();
            // 设置kafka服务列表,多个用逗号分隔
            config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092");
            // 设置消费者分组id
            config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
            // 设置序反列化消息 Key 的类
            config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
            // 设置序反列化消息 value 的类
            config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    
    
            KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(config);
            // 订阅topic
            kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic"));
    
            while (true) { // 使用死循环不断的拉取数据
                ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    String value = record.value();
                    long offset = record.offset();
                    System.out.println("value = " + value + ", offset = " + offset);
                }
            }
    
        }
    }
    

      



    原文:https://www.cnblogs.com/tree1123/p/11490122.html

  • 相关阅读:
    链表--判断一个链表是否为回文结构
    矩阵--“之”字形打印矩阵
    二叉树——平衡二叉树,二叉搜索树,完全二叉树
    链表--反转单向和双向链表
    codeforces 490C. Hacking Cypher 解题报告
    codeforces 490B.Queue 解题报告
    BestCoder19 1001.Alexandra and Prime Numbers(hdu 5108) 解题报告
    codeforces 488A. Giga Tower 解题报告
    codeforces 489C.Given Length and Sum of Digits... 解题报告
    codeforces 489B. BerSU Ball 解题报告
  • 原文地址:https://www.cnblogs.com/cdchencw/p/12404676.html
Copyright © 2011-2022 走看看