zoukankan      html  css  js  c++  java
  • (08)java程序连接kafka示例

      1、导入kafka所需要的包

      在服务器上安装kafka程序的时候,解压后就有kafka需要的jar包,如下图所示:

      2、新建生产者类

     1 package demo;
     2 
     3 import java.util.Properties;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 import kafka.javaapi.producer.Producer;
     7 import kafka.producer.KeyedMessage;
     8 import kafka.producer.ProducerConfig;
     9 import kafka.serializer.StringEncoder;
    10 
    11 public class ProducerDemo extends Thread {
    12     
    13     //指定具体的topic
    14     private String topic;
    15     
    16     public ProducerDemo(String topic){
    17         this.topic = topic;
    18     }
    19     
    20     //每隔5秒发送一条消息
    21     public void run(){
    22         //创建一个producer的对象
    23         Producer producer = createProducer();
    24         //发送消息
    25         int i = 1;
    26         while(true){
    27             String data = "message " + i++;
    28             //使用produer发送消息
    29             producer.send(new KeyedMessage(this.topic, data));
    30             //打印
    31             System.out.println("发送数据:" + data);
    32             try {
    33                 TimeUnit.SECONDS.sleep(5);
    34             } catch (Exception e) {
    35                 e.printStackTrace();
    36             }
    37         }
    38     }
    39     
    40     //创建Producer的实例
    41     private Producer createProducer() {
    42         Properties prop = new Properties();
    43         //声明zk
    44         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
    45         prop.put("serializer.class",StringEncoder.class.getName());
    46         //声明Broker的地址
    47         prop.put("metadata.broker.list","192.168.7.151:9092,192.168.7.151:9093");
    48         return new Producer(new ProducerConfig(prop));
    49     }
    50     
    51     public static void main(String[] args) {
    52         //启动线程发送消息
    53         new ProducerDemo("mydemo1").start();
    54     }
    55 }

      3、新建消费者类

     1 package demo;
     2 
     3 import java.util.HashMap;
     4 import java.util.List;
     5 import java.util.Map;
     6 import java.util.Properties;
     7 
     8 
     9 import kafka.consumer.Consumer;
    10 import kafka.consumer.ConsumerConfig;
    11 import kafka.consumer.ConsumerIterator;
    12 import kafka.consumer.KafkaStream;
    13 import kafka.javaapi.consumer.ConsumerConnector;
    14 
    15 public class ConsumerDemo extends Thread {
    16 
    17     //指定具体的topic
    18     private String topic;
    19     
    20     public ConsumerDemo(String topic){
    21         this.topic = topic;
    22     }
    23     
    24     public void run(){
    25         //构造一个consumer的对象
    26         ConsumerConnector consumer = createConsumer();
    27         //构造一个Map对象,代表topic
    28         //String: topic的名称  Integer: 从这个topic中获取多少条记录
    29         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    30         //一次从这个topic中获取一条记录
    31         topicCountMap.put(this.topic, 1);
    32         //构造一个messageStream:输入流
    33         //String: topic的名称 List: 获取的数据
    34         Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
    35         //获取每次接受到的具体的数据
    36         KafkaStream<byte[], byte[]> stream = messageStreams.get(this.topic).get(0);
    37         ConsumerIterator<byte[], byte[]> iterator = stream.iterator();
    38         while(iterator.hasNext()){
    39             String message = new String(iterator.next().message());
    40             System.out.println("接受数据:" + message);
    41         }
    42     }
    43     
    44     //创建具体的consumer
    45     private ConsumerConnector createConsumer() {
    46         Properties prop = new Properties();
    47         //指明zk的地址
    48         prop.put("zookeeper.connect", "192.168.7.151:2181,192.168.7.152:2181,192.168.7.153:2181");
    49         //指明这个consumer的消费组
    50         prop.put("group.id", "group1");
    51         //时间设置的过小可能会连接超时。。。
    52         prop.put("zookeeper.connection.timeout.ms", "60000");
    53         return Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    54     }
    55 
    56     public static void main(String[] args) {
    57         new ConsumerDemo("mydemo1").start();
    58     }
    59 
    60 }

      运行程序如下:

       注意:

      1、在消费者的类中,时间要设置长一些,否则可能出现连接超时的错误(我就出现了。。。)

      2、直接关闭生产者和消费者窗口,重新打开消费者窗口,会有重复数据。。。目前还没找到解决办法。。。

  • 相关阅读:
    How to create jar for Android Library Project
    Very large tabs in eclipse panes on Ubuntu
    64bit Ubuntu, Android AAPT, R.java
    Linux(Ubuntu)下如何安装JDK
    Configure xterm Fonts and Colors for Your Eyeball
    建立、配置和使用Activity——启动其他Activity并返回结果
    建立、配置和使用Activity——使用Bundle在Activity之间交换数据
    建立、配置和使用Activity——启动、关闭Activity
    建立、配置和使用Activity——Activity
    异步任务(AsyncTask)
  • 原文地址:https://www.cnblogs.com/javasl/p/12273968.html
Copyright © 2011-2022 走看看