zoukankan      html  css  js  c++  java
  • kafka_2.11-0.10.0.1生产者producer的Java实现

    转载自:http://blog.csdn.net/qq_26479655/article/details/52555283

    首先导入包

    1. 将kafka目录下的libs中的jar包导入
    2. 用maven建立
    
    
    1. <dependency>
    2.     <groupId>org.apache.kafka</groupId>
    3.     <artifactId>kafka-clients</artifactId>
    4.     <version>0.10.0.1</version>
    5. </dependency>

    写好properties配置文件 
    一下为项目结构 
    这里写图片描述

    
    
    1. #kafka集群地址
    2. bootstrap.servers = 192.168.168.200:9092
    3. client.id = testProducer
    4. key.serializer = org.apache.kafka.common.serialization.IntegerSerializer
    5. value.serializer = org.apache.kafka.common.serialization.StringSerializer

    然后上代码

    
    
    1. package kafka.producer;
    2.  
    3. import java.io.IOException;
    4. import java.util.Properties;
    5.  
    6. import java.util.concurrent.ExecutionException;
    7.  
    8. import org.apache.kafka.clients.producer.Callback;
    9. import org.apache.kafka.clients.producer.KafkaProducer;
    10. import org.apache.kafka.clients.producer.ProducerRecord;
    11. import org.apache.kafka.clients.producer.RecordMetadata;
    12.  
    13. public class ProducerTest extends Thread{
    14.      private final KafkaProducer<Integer, String> producer;
    15.         private final String topic;
    16.         private final Boolean isAsync;
    17.  
    18. /*isAsync同步、异步*/
    19.         public ProducerTest(String topic, Boolean isAsync) {
    20.             Properties properties = new Properties();
    21.             /*加载配置文件*/
    22.             try {
    23. properties.load(ProducerTest.class.getClassLoader().getResourceAsStream("conf/kafka.producer.properties"));
    24.             } catch (IOException e) {
    25.  
    26.                 e.printStackTrace();
    27.             }
    28.             producer = new KafkaProducer<>(properties);
    29.             this.topic = topic;
    30.             this.isAsync = isAsync;
    31.         }
    32.  
    33.         public void run() {
    34.             int messageNo = 1;
    35.             while (true) {
    36.                 String messageStr = "Message_" + messageNo;
    37.                 long startTime = System.currentTimeMillis();
    38.                 if (isAsync) { // Send asynchronously
    39.                     producer.send(new ProducerRecord<>(topic,
    40.                         messageNo,
    41.                         messageStr), new DemoCallBack(startTime, messageNo, messageStr));
    42.                 } else { // Send synchronously
    43.                     try {
    44.                         producer.send(new ProducerRecord<>(topic,
    45.                             messageNo,
    46.                             messageStr)).get();
    47.                         System.out.println("Sent message: (" + messageNo + ", " + messageStr + ")");
    48.                     } catch (InterruptedException | ExecutionException e) {
    49.                         e.printStackTrace();
    50.                     }
    51.                 }
    52.                 ++messageNo;
    53.             }
    54.         }
    55.     }
    56.  
    57.     class DemoCallBack implements Callback {
    58.  
    59.         private final long startTime;
    60.         private final int key;
    61.         private final String message;
    62.  
    63.         public DemoCallBack(long startTime, int key, String message) {
    64.             this.startTime = startTime;
    65.             this.key = key;
    66.             this.message = message;
    67.         }
    68.  
    69.         /**
    70.          * @param metadata  The metadata for the record that was sent (i.e. the partition and offset). Null if an error
    71.          *                  occurred.
    72.          * @param exception The exception thrown during processing of this record. Null if no error occurred.
    73.          */
    74.         public void onCompletion(RecordMetadata metadata, Exception exception) {
    75.             long elapsedTime = System.currentTimeMillis() - startTime;
    76.             if (metadata != null) {
    77.                 System.out.println(
    78.                     "message(" + key + ", " + message + ") sent to partition(" + metadata.partition() +
    79.                         "), " +
    80.                         "offset(" + metadata.offset() + ") in " + elapsedTime + " ms");
    81.             } else {
    82.                 exception.printStackTrace();
    83.             }
    84.         }
    85.  
    86.  
    87. }

    测试代码

    
    
    1. package kafka.producer;
    2.  
    3. public class Main {
    4.  
    5.     public static void main(String[] args) {
    6.         ProducerTest test = new ProducerTest("TestTopic", true);
    7.         test.start();
    8.     }
    9. }

     


    运行结果

    
    
    1. message(51215, Message_51215) sent to partition(0), offset(161830) in 3497 ms
    2. message(51216, Message_51216) sent to partition(0), offset(161831) in 3497 ms
    3. message(51217, Message_51217) sent to partition(0), offset(161832) in 3497 ms
    4. message(51218, Message_51218) sent to partition(0), offset(161833) in 3497 ms
    5. message(51219, Message_51219) sent to partition(0), offset(161834) in 3497 ms
    6. message(51220, Message_51220) sent to partition(0), offset(161835) in 3497 ms
    7. message(51221, Message_51221) sent to partition(0), offset(161836) in 3497 ms
    8. message(51222, Message_51222) sent to partition(0), offset(161837) in 3497 ms
    9. message(51223, Message_51223) sent to partition(0), offset(161838) in 3497 ms
    10. message(51224, Message_51224) sent to partition(0), offset(161839) in 3497 ms
    11. message(51225, Message_51225) sent to partition(0), offset(161840) in 3497 ms
    12. message(51226, Message_51226) sent to partition(0), offset(161841) in 3497 ms
    13. message(51227, Message_51227) sent to partition(0), offset(161842) in 3497 ms
    14. message(51228, Message_51228) sent to partition(0), offset(161843) in 3497 ms
    15.  
    16. .............​
  • 相关阅读:
    【六校联合训练 省选 #20】快递
    IOError: cannot open resource
    [已解决]运行gunicorn失败:[ERROR] Connection in use 127.0.0.1 8080
    windows下通过navicat for mysql连接centos6.3-64bit下的MySQL数据库
    在centos7中使用yum安装mysql数据库并使用navicat连接
    centos出现“FirewallD is not running”怎么办
    [linux]centos7下解决yum install mysql-server没有可用包
    CentOS 7安装Python3
    flask——CSRFToken保护
    python 获取当前文件夹下所有文件名
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723852.html
Copyright © 2011-2022 走看看