zoukankan      html  css  js  c++  java
  • 使用java创建kafka的生产者和消费者

        创建一个Kafka的主题,连接到zk集群,副本因子3,分区3,主题名是test111
            [root@h5 kafka]# bin/kafka-topics.sh --create --zookeeper h5:2181 --topic test111 --replication-factor 3 --partitions 3
        查看Kafka的主题详情
            [root@h5 kafka]# bin/kafka-topics.sh --describe --zookeeper h5:2181 --topic test111
        查看Kafka所有的主题    
            [root@h5 kafka]# bin/kafka-topics.sh --list --zookeeper h5:2181
        删除Kafka指定的主题    
            [root@h5 kafka]# bin/kafka-topics.sh --delete --zookeeper h5:2181,h6:2181,h7:2181 --topic test111
            如果删除时出现如下提示:
            Topic test111 is marked for deletion.
            Note: This will have no impact if delete.topic.enable is not set to true.
            请在Kafka的config/server.properties中新增delete.topic.enable=true,重启Kafka后再重新执行删除命令
        Java工程需要引入以下jar包:
            kafka_2.10-0.8.2.0.jar
            kafka-clients-0.8.2.0.jar
            metrics-core-2.2.0.jar
            scala-library-2.10.4.jar
            zkclient-0.3.jar
            zookeeper-3.4.6.jar
        
        1、生产者        

     1 package storm.test.kafka;
     2 
     3         import java.util.Properties;
     4 
     5         import kafka.javaapi.producer.Producer;
     6         import kafka.producer.KeyedMessage;
     7         import kafka.producer.ProducerConfig;
     8         import kafka.serializer.StringEncoder;
     9 
    10         public class TestProducer {
    11 
    12             public static void main(String[] args) throws Exception {
    13                 Properties prop = new Properties();
    14                 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
    15                 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
    16                 prop.put("serializer.class", StringEncoder.class.getName());
    17                 Producer<String, String> producer = new Producer<String, String>(new ProducerConfig(prop));
    18                 int i = 0;
    19                 while(true){
    20                     producer.send(new KeyedMessage<String, String>("test111", "msg:"+i++));
    21                     Thread.sleep(1000);
    22                 }
    23             }
    24 
    25         }


        2、消费者        

     1 package storm.test.kafka;
     2 
     3         import java.util.HashMap;
     4         import java.util.List;
     5         import java.util.Map;
     6         import java.util.Properties;
     7 
     8         import kafka.consumer.Consumer;
     9         import kafka.consumer.ConsumerConfig;
    10         import kafka.consumer.ConsumerIterator;
    11         import kafka.consumer.KafkaStream;
    12         import kafka.javaapi.consumer.ConsumerConnector;
    13         import kafka.serializer.StringEncoder;
    14 
    15         public class TestConsumer {
    16 
    17             static final String topic = "test111";
    18             
    19             public static void main(String[] args) {
    20                 Properties prop = new Properties();
    21                 prop.put("zookeeper.connect", "h5:2181,h6:2181,h7:2181");
    22                 prop.put("serializer.class", StringEncoder.class.getName());
    23                 prop.put("metadata.broker.list", "h5:9092,h6:9092,h7:9092");
    24                 prop.put("group.id", "group1");
    25                 ConsumerConnector consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(prop));
    26                 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    27                 topicCountMap.put(topic, 1);
    28                 Map<String, List<KafkaStream<byte[], byte[]>>> messageStreams = consumer.createMessageStreams(topicCountMap);
    29                 final KafkaStream<byte[], byte[]> kafkaStream = messageStreams.get(topic).get(0);
    30                 ConsumerIterator<byte[], byte[]> iterator = kafkaStream.iterator();
    31                 while (iterator.hasNext()) {
    32                     String msg = new String(iterator.next().message());
    33                     System.out.println("收到消息:"+msg);
    34                 }
    35             }
    36 
    37         }
  • 相关阅读:
    mysql5.7 linux安装参考
    谈谈微服务中的 API 网关(API Gateway)
    十大Intellij IDEA快捷键
    SqoopFlume、Flume、HDFS之间比较
    PostgreSQL-存储过程(一)基础篇
    spark调优篇-oom 优化(汇总)
    spark调优篇-数据倾斜(汇总)
    spark调优篇-Spark ON Yarn 内存管理(汇总)
    spark异常篇-OutOfMemory:GC overhead limit exceeded
    spark异常篇-Removing executor 5 with no recent heartbeats: 120504 ms exceeds timeout 120000 ms 可能的解决方案
  • 原文地址:https://www.cnblogs.com/mengyao/p/4526075.html
Copyright © 2011-2022 走看看