zoukankan      html  css  js  c++  java
  • 创建Kafka0.8.2生产者与消费者

    一、下载安装Kafka0.8.2

    二、vi config/server.properties

    三、修改为advertised.host.name=192.168.1.76

    四、rm -rf  /tmp *移除临时目录下的文件

    五、修改vi /etc/hosts中的127.0.0.1为192.168.1.76

    六、开启zookeeper 

    [html] view plain copy
     
    1. bin/zookeeper-server-start.sh config/zookeeper.properties  


    七、开启kafka

    bin/kafka-server-start.sh config/server.properties


    八、创建主题

    bin/kafka-topics.sh --create --zookeeper 192.168.1.76:2181 --replication-factor 1 --partitions 1 --topic mytesttopic


    九、开启消费者

    bin/kafka-console-consumer.sh --zookeeper 192.168.1.76:2181 --topic mytesttopic --from-beginning 回车


    十、生产者代码(0.8.2.1的jar包)

    [java] view plain copy
     
    1. import java.util.*;  
    2.   
    3. import org.apache.kafka.clients.producer.KafkaProducer;  
    4. import org.apache.kafka.clients.producer.ProducerRecord;  
    5.   
    6. public class SimpleProducer {  
    7.     public static void main(String[] args) {  
    8.         Properties properties = new Properties();  
    9.         properties.put("bootstrap.servers", "192.168.1.76:9092");  
    10.         properties.put("metadata.broker.list", "192.168.1.76:9092");  
    11.         properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
    12.         properties.put("serializer.class", "kafka.serializer.StringEncoder");  
    13.         properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");  
    14.         properties.put("request.required.acks", "1");  
    15.   
    16.         KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(properties);  
    17.         for (int iCount = 0; iCount < 100; iCount++) {  
    18.             String message = "My Test Message No " + iCount;  
    19.             ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("mytesttopic", message);  
    20.             producer.send(record);  
    21.         }  
    22.         producer.close();  
    23.     }  
    24. }  

    十一、查看结果

    [html] view plain copy
     
    1. My Test Message No 0  
    2. My Test Message No 1  
    3. My Test Message No 2  
    4. My Test Message No 3  
    5. My Test Message No 4  
    6. My Test Message No 5  
    7. My Test Message No 6  
    8. My Test Message No 7  
    9. My Test Message No 8  
    10. My Test Message No 9  
    11. My Test Message No 10  
    [html] view plain copy
     
    1. ...................  
    [html] view plain copy
     
    1. ..................  

    十、消费者代码(0.8.2.1的jar包)

    [java] view plain copy
     
    1. import kafka.consumer.ConsumerConfig;  
    2. import kafka.consumer.ConsumerIterator;  
    3. import kafka.consumer.KafkaStream;  
    4. import kafka.serializer.StringDecoder;  
    5. import kafka.utils.VerifiableProperties;  
    6. import java.util.*;  
    7. public class SimpleConsumerExample {  
    8.   
    9.     private static kafka.javaapi.consumer.ConsumerConnector consumer;  
    10.   
    11.     public static void consume() {  
    12.   
    13.         Properties props = new Properties();  
    14.         // zookeeper 配置  
    15.         props.put("zookeeper.connect", "192.168.1.76:2181");  
    16.   
    17.         // group 代表一个消费组  
    18.         props.put("group.id", "jd-group");  
    19.   
    20.         // zk连接超时  
    21.         props.put("zookeeper.session.timeout.ms", "4000");  
    22.         props.put("zookeeper.sync.time.ms", "200");  
    23.         props.put("auto.commit.interval.ms", "1000");  
    24.         props.put("auto.offset.reset", "smallest");  
    25.         // 序列化类  
    26.         props.put("serializer.class", "kafka.serializer.StringEncoder");  
    27.   
    28.         ConsumerConfig config = new ConsumerConfig(props);  
    29.   
    30.         consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);  
    31.   
    32.         Map<String, Integer> topicCountMap = new HashMap<String, Integer>();  
    33.         topicCountMap.put("mytesttopic", new Integer(1));  
    34.   
    35.         StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());  
    36.         StringDecoder valueDecoder = new StringDecoder(new VerifiableProperties());  
    37.   
    38.         Map<String, List<KafkaStream<String, String>>> consumerMap = consumer.createMessageStreams(topicCountMap,  
    39.                 keyDecoder, valueDecoder);  
    40.         KafkaStream<String, String> stream = consumerMap.get("mytesttopic").get(0);  
    41.         ConsumerIterator<String, String> it = stream.iterator();  
    42.         while (it.hasNext())  
    43.             System.out.println(it.next().message());  
    44.     }  
    45.   
    46.     public static void main(String[] args) {  
    47.         consume();  
    48.     }  
    49. }  

    十一、提供下C#版的代码

    [csharp] view plain copy
     
      1. static void Main(string[] args)  
      2. {  
      3.     //https://github.com/Jroland/kafka-net  
      4.     //生产者  
      5.     //var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));  
      6.     //var router = new BrokerRouter(options);  
      7.     //var client = new Producer(router);  
      8.   
      9.     //client.SendMessageAsync("mytesttopic", new[] { new Message("hello world") }).Wait();  
      10.   
      11.     //using (client) { }  
      12.   
      13.     //消费者  
      14.     var options = new KafkaOptions(new Uri("http://192.168.1.76:9092"), new Uri("http://192.168.1.76:9092"));  
      15.     var router = new BrokerRouter(options);  
      16.     var consumer = new Consumer(new ConsumerOptions("mytesttopic", router));  
      17.   
      18.     //Consume returns a blocking IEnumerable (ie: never ending stream)  
      19.     foreach (var message in consumer.Consume())  
      20.     {  
      21.         Console.WriteLine("Response: P{0},O{1} : {2}",  
      22.             message.Meta.PartitionId, message.Meta.Offset,System.Text.Encoding.ASCII.GetString(message.Value));  
      23.     }  
      24.     Console.ReadLine();  
      25. }  
  • 相关阅读:
    (转)写好程序注释的十三条建议
    注册表添加NoDrives隐藏盘符(禁用U盘)参数说明
    AJAX实用教程——开篇
    浅谈函数求解与人生
    C#(服务器)与Java(客户端)通过Socket传递对象
    BI开发之——Mdx基础语法(2)
    UML——序列图
    UML——序列图案例总结
    ORM内核原理解析之:延迟加载
    应用程序系统基本设计原则——SOLID
  • 原文地址:https://www.cnblogs.com/heidsoft/p/7697979.html
Copyright © 2011-2022 走看看