zoukankan      html  css  js  c++  java
  • (08)java程序连接kafka示例

      1、导入kafka所需要的包

      在服务器上安装kafka程序的时候,解压后就有kafka需要的jar包,如下图所示:

      2、新建生产者类

     1 package demo;
     2 
     3 import java.util.Properties;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 import kafka.javaapi.producer.Producer;
     7 import kafka.producer.KeyedMessage;
     8 import kafka.producer.ProducerConfig;
     9 import kafka.serializer.StringEncoder;
    10 
    11 public class ProducerDemo extends Thread {
    12     
    13     //指定具体的topic
    14     private String topic;
    15     
    16     public ProducerDemo(String topic){
    17         this.topic = topic;
    18     }
    19     
    20     //每隔5秒发送一条消息
    21     public void run(){
    22         //创建一个producer的对象
    23         Producer producer = createProducer();
    24         //发送消息
    25         int i = 1;
    26         while(true){
    27             String data = "message " + i++;
    28             //使用produer发送消息
    29             producer.send(new KeyedMessage(this.topic, data));
    30             //打印
    31             System.out.println("发送数据:" + data);
    32             try {
    33                 TimeUnit.SECONDS.sleep(5);
    34             } catch (Exception e) {
    35                 e.printStackTrace();
    36             }
    37         }
    38     }
    39     
    40     //创建Producer的实例
    41     private Producer createProducer() {
    42         Properties prop = new Properties();
    43         //声明zk
    44         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
    45         prop.put("serializer.class",StringEncoder.class.getName());
    46         //声明Broker的地址
    47         prop.put("metadata.broker.list","192.168.7.151:9092,192.168.7.151:9093");
    48         return new Producer(new ProducerConfig(prop));
    49     }
    50     
    51     public static void main(String[] args) {
    52         //启动线程发送消息
    53         new ProducerDemo("mydemo1").start();
    54     }
    55 }

      3、新建消费者类

     1 package demo;
     2 
     3 import java.util.HashMap;
     4 import java.util.List;
     5 import java.util.Map;
     6 import java.util.Properties;
     7 
     8 
     9 import kafka.consumer.Consumer;
    10 import kafka.consumer.ConsumerConfig;
    11 import kafka.consumer.ConsumerIterator;
    12 import kafka.consumer.KafkaStream;
    13 import kafka.javaapi.consumer.ConsumerConnector;
    14 
    15 public class ConsumerDemo extends Thread {
    16 
    17     //指定具体的topic
    18     private String topic;
    19     
    20     public ConsumerDemo(String topic){
    21         this.topic = topic;
    22     }
    23     
    24     public void run(){
    25         //构造一个consumer的对象
    26         ConsumerConnector consumer = createConsumer();
    27         //构造一个Map对象,代表topic
    28         //String: topic的名称  Integer: 从这个topic中获取多少条记录
    29         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    30         //一次从这个topic中获取一条记录
    31         topicCountMap.put(this.topic, 1);
    32         //构造一个messageStream:输入流
    33         //String: topic的名称 List: 获取的数据
    34         Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
    35         //获取每次接受到的具体的数据
    36         KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
    37         ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    38         while(iterator.hasNext()){
    39             String message = new String(iterator.next().message());
    40             System.out.println("接受数据:" + message);
    41         }
    42     }
    43     
    44     //创建具体的consumer
    45     private ConsumerConnector createConsumer() {
    46         Properties prop = new Properties();
    47         //指明zk的地址
    48         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
    49         //指明这个consumer的消费组
    50         prop.put("group.id", "group1");
    51         //时间设置的过小可能会连接超时。。。
    52         prop.put("zookeeper.connection.timeout.ms", "60000");
    53         return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    54     }
    55 
    56     public static void main(String[] args) {
    57         new ConsumerDemo("mydemo1").start();
    58     }
    59 
    60 }

      运行程序如下:

       注意:

      1、在消费者的类中,时间要设置长一些,否则可能出现连接超时的错误(我就出现了。。。)

      2、直接关闭生产者和消费者窗口,重新打开消费者窗口,会有重复数据。。。目前还没找到解决办法。。。

  • 相关阅读:
    关于找了很长时间但是找不到原因的bug的解决方法
    牛客_剑指offer题集——二叉树中和为某一值的路径(java实现)
    牛客_剑指offer题集——栈的压入弹出序列(java实现)
    牛客_剑指offer题集——顺时针打印算法(java实现)
    第 7 章 Selenium WebDriver 进阶应用 Selenium 3+Python 3 自动化测试
    第 6 章 Selenium 常用方法(二) Selenium 3+Python 3 自动化测试
    第 6 章 Selenium 常用方法(一) Selenium 3+Python 3 自动化测试
    5.2 Selenium 八大定位 Selenium 3+Python 3 自动化测试
    5.1 Python 基础知识 Selenium 3+Python 3 自动化测试
    第 4 章 前端技术简介 Selenium 3+Python 3 自动化测试
  • 原文地址:https://www.cnblogs.com/javasl/p/12273968.html
Copyright © 2011-2022 走看看