zoukankan      html  css  js  c++  java
  • 关于kafka-clients JAVA API的基本使用

    首先老规矩, 引入maven依赖

    1 <dependency>
    2     <groupId>org.apache.kafka</groupId>
    3     <artifactId>kafka-clients</artifactId>
    4     <version>1.0.0</version>
    5 </dependency>

    关于kafka-clients的消息生产者: 

     1 @Slf4j
     2 public class KafkaProducerClient {
     3 
     4     public static void pushMsg(String msg) throws Exception {
     5         Properties props = new Properties();
     6         props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);
     7         props.put("acks", "0");
     8         props.put("retries", 0);
     9         props.put("batch.size", 16384);
    10         props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    11         props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    12         Producer producer = new KafkaProducer(props);
    13         ProducerRecord<String, String> record = new ProducerRecord<>(KafkaConstant.KAFKA_TOPIC_NAME, 0, "123", msg);
    14         producer.send(record, new Callback() {
    15             @Override
    16             public void onCompletion(RecordMetadata metadata, Exception e) {
    17                 if (e != null) {
    18                     e.printStackTrace();
    19                 }
    20                 log.info("pushMsg of msg: {}, metadata: {}", msg, metadata);
    21             }
    22         });
    23         producer.close();
    24     }
    25     
    26 }

    关于kafka-clients的消息消费者

     1 @Slf4j
     2 public class KafkaConsumerClient extends Thread {
     3     
     4     private KafkaConsumerClient() {
     5     }
     6 
     7     /**
     8      * 初始化consumer
     9      */
    10     public void initKafkaConsumer () {
    11         log.info("init Kafka Consumer");
    12         new KafkaConsumerClient().start();
    13     }
    14     
    15     @Override
    16     public void run() {
    17         Properties props = new Properties();
    18 
    19         props.put("bootstrap.servers", KafkaConstant.KAFKA_SERVER_ADDRESS);
    20         props.put("group.id", "1");
    21         props.put("enable.auto.commit", "true");
    22         props.put("auto.commit.interval.ms", "1000");
    23         props.put("session.timeout.ms", "30000");
    24         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    25         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    26 
    27         Consumer<String, String> consumer = new KafkaConsumer<>(props);
    28         consumer.subscribe(Arrays.asList(KafkaConstant.KAFKA_TOPIC_NAME));
    29         consumer.seekToBeginning(new ArrayList<>());
    30 
    31         // ===== 拿到所有的topic ===== //
    32         Map<String, List<PartitionInfo>> listTopics = consumer.listTopics();
    33         Set<Map.Entry<String, List<PartitionInfo>>> entries = listTopics.entrySet();
    34 
    35         while (true) {
    36             ConsumerRecords<String, String> records = consumer.poll(1000 * 60);
    37             for(ConsumerRecord<String, String> record : records) {
    38                 System.out.println("[fetched from partition " + record.partition() + ", offset: " + record.offset() + ", message: " + record.value() + "]");
    39             }
    40         }
    41     }
    42 }

    我们需要在项目启动的时候将消费者consumer启动起来

    1 <bean id="initKafkaConsumer" class="com.dywl.zhoushan.kafka.KafkaConsumerClient" init-method="initKafkaConsumer"></bean>

    然后调用生产者producer时, 消费者consumer就能拿到消息

    如: 

     1 @Override
     2 public void pushMsgById(Long id) throws Exception {
     3     User user = new User();
     4     user.setId(id);
     5     user.setUsername("test11111111");
     6     user.setPassword("test22222222");
     7     String str = JsonUtil.toCompactJsonString(user);
     8     log.info("pushMsgById is user: {}", str);
     9     KafkaProducerClient.pushMsg(str);
    10 }

    得到结果: 

  • 相关阅读:
    IOS开发创建开发证书及发布App应用(六)——打包应用
    IOS开发创建开发证书及发布App应用(四)——创建配置概要文件
    IOS开发创建开发证书及发布App应用(五)——编译应用
    如何查看SQL Server的版本、补丁包信息?以及如何鉴别是否需要升级自己的SQL Server?
    如何用PowerShell列出你机器上的.NET Framework的版本号和SP服务补丁
    Linux 开放服务端口
    在CentOS/RHEL/Scientific Linux 6下安装 LAMP
    使用ownCloud在Linux安装你的个人云服务
    升級 Centos 6.5 的 php 版本
    reStructuredText(.rst)语法规则快速入门
  • 原文地址:https://www.cnblogs.com/yanwu0527/p/9083386.html
Copyright © 2011-2022 走看看