zoukankan      html  css  js  c++  java
  • Kafka消费者生产者实例

     
     

    为了更为直观展示Kafka的消息生产消费的过程,我会从基于Console和基于Application两个方面介绍使用实例。Kafka是一个分布式流处理平台,具体来说有三层含义:

    1. 它允许发布和订阅记录流,类似于消息队列或企业消息传递系统。
    2. 它可以容错的方式存储记录流。
    3. 它可以处理记录发生时的流。

    由于主要介绍如何使用Kafka快速构建生产者消费者实例,所以不会涉及Kafka内部的原理。一个基于Kafka的生产者消费者过程通常是这样的(来自官网):

    Kafka生产者消费者

    安装Kafka

    官网下载kafka_2.11-0.11.0.0.tgz,解压后安装到指定目录:

    cd kafka_2.11-0.11.0.0
    tar -zxvf kafka_2.11-0.11.0.0.tgz -C pathToInstall
    • 1
    • 2

    启动Kafka:

    bin/kafka-server-start.sh config/server.properties
    • 1

    基于Console

    创建Topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    • 1

    Producer发送消息

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    • 1

    在控制台输入要发送的消息:

    This is a message
    This is another message
    • 1
    • 2

    Consumer接收消息

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
    • 1

    输入命令后可以看到控制台输出了刚才的消息:

    This is a message
    This is another message
    • 1
    • 2

    基于Application

    单个consumer

    生产者:

    public class SimpleKafkaProducer {
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
    
            //broker地址
            props.put("bootstrap.servers", "localhost:9092");
    
            //请求时候需要验证
            props.put("acks", "all");
    
            //请求失败时候需要重试
            props.put("retries", 0);
    
            //内存缓存区大小
            props.put("buffer.memory", 33554432);
    
            //指定消息key序列化方式
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            //指定消息本身的序列化方式
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            for (int i = 0; i < 10; i++)
                producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
            System.out.println("Message sent successfully");
            producer.close();
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34

    消费者:

    
    public class SimpleKafkaConsumer {
    
        public static void main(String[] args) {
    
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "localhost:9092");
            //每个消费者分配独立的组号
            props.put("group.id", "test");
    
            //如果value合法,则自动提交偏移量
            props.put("enable.auto.commit", "true");
    
            //设置多久一次更新被消费消息的偏移量
            props.put("auto.commit.interval.ms", "1000");
    
            //设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
            props.put("session.timeout.ms", "30000");
    
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            consumer.subscribe(Collections.singletonList("test"));
    
            System.out.println("Subscribed to topic " + "test");
            int i = 0;
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
    
                    // print the offset,key and value for the consumer records.
                    System.out.printf("offset = %d, key = %s, value = %s
    ",
                            record.offset(), record.key(), record.value());
            }
        }
    }
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 13
    • 14
    • 15
    • 16
    • 17
    • 18
    • 19
    • 20
    • 21
    • 22
    • 23
    • 24
    • 25
    • 26
    • 27
    • 28
    • 29
    • 30
    • 31
    • 32
    • 33
    • 34
    • 35
    • 36
    • 37
    • 38
    • 39
    • 40
    • 41
    • 42

    先启动生产者,发送消息到broker,这里简单发送了10条从0-9的消息,再启动消费者,控制台输出如下:

    消费结果

    集群消费

    以上的程序只是单生产者单消费者的场景,所谓集群消费就是同一个topic的消费可能有多个消费者消费,也称广播消费。集群消费只一种多线程或者多机器的消费方式。

    要实现集群消费只需要为每个消费者指定不同的group.id就可以。由于代码比较简单就不贴了。

    测试发现,当为了两个consumer(这里是两个进程)指定不同的group.id后,producer发送的消息两个consumer都能接受到,这很显然,集群消费嘛。为设置两个consumer的group.id为同一个的时候,只有一个消费者能消费者到。也就是说,kafka的消息只能由组中的单个用户读取。

  • 相关阅读:
    28.Implement strStr()【leetcod】
    35. Search Insert Position【leetcode】
    27. Remove Element【leetcode】
    20. Valid Parentheses【leetcode】
    14. Longest Common Prefix【leetcode】
    Java的String中的subString()方法
    charAt()的功能
    spring整合mybatis
    AOP
    代理模式
  • 原文地址:https://www.cnblogs.com/xiaohanlin/p/8640858.html
Copyright © 2011-2022 走看看