zoukankan      html  css  js  c++  java
  • kafka_2.11-0.8.2.1+java 生产消费程序demo示例

     
    Kafka学习8_kafka java 生产消费程序demo示例

    kafka是吞吐量巨大的一个消息系统,它是用scala写的,和普通的消息的生产消费还有所不同,写了个demo程序供大家参考。kafka的安装请参考官方文档。

    首先我们需要新建一个maven项目,然后在pom中引用kafka jar包,引用依赖如下:

    
    
    1. <dependency>
    2.     <groupId>org.apache.kafka</groupId>
    3.     <artifactId>kafka_2.11</artifactId>
    4.     <version>0.8.2.1</version>
    5. </dependency>

    我们用的版本是0.8, 下面我们看下生产消息的代码:

    
    
    1. package com.telewave.kafka.util;
    2.  
    3. import java.util.Properties;
    4.  
    5. import kafka.javaapi.producer.Producer;
    6.  
    7. import kafka.producer.KeyedMessage;
    8.  
    9. import kafka.producer.ProducerConfig;
    10.  
    11. /**
    12.  * 
    13.  * Hello world!
    14.  * 
    15.  * 
    16.  */
    17.  
    18. public class KafkaProducer
    19.  
    20. {
    21.  
    22. private final Producer<String, String> producer;
    23.  
    24. public final static String TOPIC = "TestTopic";
    25.  
    26. private KafkaProducer() {
    27.  
    28. Properties props = new Properties();
    29.  
    30. // 此处配置的是kafka的端口
    31.  
    32. props.put("metadata.broker.list", "192.168.168.200:9092");
    33.  
    34. // 配置value的序列化类
    35.  
    36. props.put("serializer.class", "kafka.serializer.StringEncoder");
    37.  
    38. // 配置key的序列化类
    39.  
    40. props.put("key.serializer.class", "kafka.serializer.StringEncoder");
    41.  
    42. // request.required.acks
    43.  
    44. // 0, which means that the producer never waits for an acknowledgement
    45. // from the broker (the same behavior as 0.7). This option provides the
    46. // lowest latency but the weakest durability guarantees (some data will
    47. // be lost when a server fails).
    48.  
    49. // 1, which means that the producer gets an acknowledgement after the
    50. // leader replica has received the data. This option provides better
    51. // durability as the client waits until the server acknowledges the
    52. // request as successful (only messages that were written to the
    53. // now-dead leader but not yet replicated will be lost).
    54.  
    55. // -1, which means that the producer gets an acknowledgement after all
    56. // in-sync replicas have received the data. This option provides the
    57. // best durability, we guarantee that no messages will be lost as long
    58. // as at least one in sync replica remains.
    59.  
    60. props.put("request.required.acks", "-1");
    61.  
    62. producer = new Producer<String, String>(new ProducerConfig(props));
    63.  
    64. }
    65.  
    66. void produce() {
    67.  
    68. int messageNo = 1000;
    69.  
    70. final int COUNT = 10000;
    71.  
    72. while (messageNo < COUNT) {
    73.  
    74. String key = String.valueOf(messageNo);
    75.  
    76. String data = "hello kafka message " + key;
    77.  
    78. producer.send(new KeyedMessage<String, String>(TOPIC, key, data));
    79.  
    80. System.out.println(data);
    81.  
    82. messageNo++;
    83.  
    84. }
    85.  
    86. }
    87.  
    88. public static void main(String[] args)
    89.  
    90. {
    91.  
    92. new KafkaProducer().produce();
    93.  
    94. }
    95.  
    96. }

    下面是消费端的代码实现:

    
    
    1. package com.telewave.kafka.util;
    2.  
    3. import java.util.HashMap;
    4.  
    5. import java.util.List;
    6.  
    7. import java.util.Map;
    8.  
    9. import java.util.Properties;
    10.  
    11. import org.apache.kafka.clients.producer.KafkaProducer;
    12.  
    13. import kafka.consumer.ConsumerConfig;
    14.  
    15. import kafka.consumer.ConsumerIterator;
    16.  
    17. import kafka.consumer.KafkaStream;
    18.  
    19. import kafka.javaapi.consumer.ConsumerConnector;
    20.  
    21. import kafka.serializer.StringDecoder;
    22.  
    23. import kafka.utils.VerifiableProperties;
    24.  
    25. public class KafkaConsumer {
    26.  
    27. private final ConsumerConnector consumer;
    28.  
    29. public KafkaConsumer() {
    30.  
    31. Properties props = new Properties();
    32.  
    33. // zookeeper 配置
    34.  
    35. props.put("zookeeper.connect", "192.168.168.200:2181");
    36.  
    37. // group 代表一个消费组
    38.  
    39. props.put("group.id", "jd-group");
    40.  
    41. // zk连接超时
    42.  
    43. props.put("zookeeper.session.timeout.ms", "4000");
    44.  
    45. props.put("zookeeper.sync.time.ms", "200");
    46.  
    47. props.put("auto.commit.interval.ms", "1000");
    48.  
    49. props.put("auto.offset.reset", "largest");
    50.  
    51. // 序列化类
    52.  
    53. props.put("serializer.class", "kafka.serializer.StringEncoder");
    54.  
    55. ConsumerConfig config = new ConsumerConfig(props);
    56.  
    57. consumer = kafka.consumer.Consumer.createJavaConsumerConnector(config);
    58.  
    59. }
    60.  
    61. public void consume() {
    62.  
    63. Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    64.  
    65. topicCountMap.put("TestTopic", new Integer(1));
    66.  
    67. StringDecoder keyDecoder = new StringDecoder(new VerifiableProperties());
    68.  
    69. StringDecoder valueDecoder = new StringDecoder(
    70. new VerifiableProperties());
    71.  
    72. Map<String, List<KafkaStream<String, String>>> consumerMap =
    73.  
    74. consumer.createMessageStreams(topicCountMap, keyDecoder, valueDecoder);
    75.  
    76. KafkaStream<String, String> stream = consumerMap.get(
    77. "TestTopic").get(0);
    78.  
    79. ConsumerIterator<String, String> it = stream.iterator();
    80.  
    81. while (it.hasNext())
    82.  
    83. System.out.println(it.next().message());
    84.  
    85. }
    86.  
    87. public static void main(String[] args) {
    88.  
    89. new KafkaConsumer().consume();
    90.  
    91. }
    92.  
    93. }
    
    
  • 相关阅读:
    如何撤销Git操作?
    SpringBoot Controller接收参数的几种方式盘点
    全面解析Spring中@ModelAttribute注解的用法
    Java中将字符串转为驼峰格式
    如何将Map键值的下划线转为驼峰
    JS如何获取地址栏url后面的参数?
    解决wordpress 5.3更新后Uncaught Typeerror: $ is not a function
    小程序如何判断用户(后台使用Django)
    服务器 Web服务器 应用服务器区别联系
    C语言和Python语言在存储变量方面的不同
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723854.html
Copyright © 2011-2022 走看看