  • 057 Implementing a Kafka Producer in Java

    1. Services to start

      The Kafka brokers used here listen on ports 9092 and 9093. To observe the data that the producer sends, start a console consumer on the topic:

        bin/kafka-console-consumer.sh --topic beifeng --zookeeper linux-hadoop01.ibeifeng.com:2181/kafka
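
      In addition, ZooKeeper and the Kafka brokers must be running and the topic must exist before the producer is started. A minimal sketch of those commands, assuming ZooKeeper is already up at linux-hadoop01.ibeifeng.com:2181 with the /kafka chroot and that the two brokers use the config file names shown below (the file names, partition count, and replication factor are assumptions, not taken from the original post):

        # Start the two brokers, each in its own terminal (config file names assumed)
        bin/kafka-server-start.sh config/server.properties        # broker on port 9092
        bin/kafka-server-start.sh config/server-1.properties      # broker on port 9093

        # Create the topic used by the producer (partition count chosen arbitrarily)
        bin/kafka-topics.sh --create --zookeeper linux-hadoop01.ibeifeng.com:2181/kafka \
            --topic beifeng --partitions 5 --replication-factor 2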

      

    2. The producer program

        package com.jun.it;

        import kafka.javaapi.producer.Producer;
        import kafka.producer.KeyedMessage;
        import kafka.producer.ProducerConfig;

        import java.util.Properties;
        import java.util.Random;
        import java.util.concurrent.atomic.AtomicBoolean;

        public class JavaKafkaProducer {
            public static final char[] chars = "qazwsxedcrfvtgbyhnujmikolp0123456789".toCharArray();
            public static final int charsLength = chars.length;
            public static final Random random = new Random(System.currentTimeMillis());
            private Producer<String, String> producer = null;

            private String topicName = null;
            private String brokerList = null;
            private boolean isSync = true; // synchronous by default

            /**
             * Constructor
             *
             * @param topicName
             * @param brokerList
             */
            public JavaKafkaProducer(String topicName, String brokerList) {
                this(topicName, brokerList, true);
            }

            /**
             * Constructor; mainly builds the producer
             *
             * @param topicName
             * @param brokerList
             * @param isSync
             */
            public JavaKafkaProducer(String topicName, String brokerList, boolean isSync) {
                // Save the arguments
                this.topicName = topicName;
                this.brokerList = brokerList;
                this.isSync = isSync;

                // 1. Build the configuration; see http://kafka.apache.org/082/documentation.html#producerconfigs
                Properties props = new Properties();
                // Connection information for the Kafka cluster
                props.put("metadata.broker.list", this.brokerList);
                // How the producer sends data
                if (this.isSync) {
                    // Send data synchronously
                    props.put("producer.type", "sync");
                } else {
                    // Send data asynchronously
                    props.put("producer.type", "async");
                    /**
                     * 0: do not wait for any acknowledgement from the broker
                     * 1: wait until at least one broker acknowledges the write
                     * -1: wait until all brokers acknowledge that the data was received
                     */
                    props.put("request.required.acks", "0");
                }
                // Serializer class for the key/value data
                /**
                 * The default is DefaultEncoder, which sends byte[] data.
                 * When sending String data, this must be changed to StringEncoder.
                 */
                props.put("serializer.class", "kafka.serializer.StringEncoder");

                // 2. Build the Kafka producer configuration context
                ProducerConfig config = new ProducerConfig(props);

                // 3. Build the Kafka producer
                this.producer = new Producer<String, String>(config);
            }

            /**
             * Close the producer connection
             */
            public void closeProducer() {
                producer.close();
            }

            /**
             * Entry point for external callers: starts the threads that send data
             *
             * @param threadNumbers
             * @param isRunning
             */
            public void run(int threadNumbers, final AtomicBoolean isRunning) {
                for (int i = 0; i < threadNumbers; i++) {
                    new Thread(new Runnable() {
                        public void run() {
                            int count = 0;
                            while (isRunning.get()) {
                                // Only send data while in the running state
                                KeyedMessage<String, String> message = generateMessage();
                                // Send the data
                                producer.send(message);
                                count++;
                                // Print progress every 100 messages
                                if (count % 100 == 0) {
                                    System.out.println("Count = " + count + "; message:" + message);
                                }

                                // Pause briefly between sends
                                try {
                                    Thread.sleep(random.nextInt(100) + 10);
                                } catch (InterruptedException e) {
                                    // ignore
                                }
                            }
                            System.out.println("Thread:" + Thread.currentThread().getName() + " send message count is:" + count);
                        }
                    }).start();
                }
            }

            /**
             * Generate a random Kafka KeyedMessage object
             *
             * @return
             */
            public KeyedMessage<String, String> generateMessage() {
                String key = generateString(3) + "_" + random.nextInt(10);
                StringBuilder sb = new StringBuilder();
                int numWords = random.nextInt(5) + 1; // 1 to 5 words per message
                for (int i = 0; i < numWords; i++) {
                    String word = generateString(random.nextInt(5) + 1); // each word has 1 to 5 characters
                    sb.append(word).append(" ");
                }
                String message = sb.toString().trim();
                return new KeyedMessage<String, String>(this.topicName, key, message);
            }

            /**
             * Generate a random string of the given length
             *
             * @param numItems
             * @return
             */
            public static String generateString(int numItems) {
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < numItems; i++) {
                    sb.append(chars[random.nextInt(charsLength)]);
                }
                return sb.toString();
            }
        }

    3. Test class

        package com.jun.it;

        import java.util.concurrent.atomic.AtomicBoolean;

        public class JavaKafkaProducerTest {
            public static void main(String[] args) {
                String topicName = "beifeng";
                String brokerList = "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093";
                int threadNums = 10;
                AtomicBoolean isRunning = new AtomicBoolean(true);
                JavaKafkaProducer producer = new JavaKafkaProducer(topicName, brokerList);
                producer.run(threadNums, isRunning);

                // Let the sender threads run for a while (1 second here) before shutting down
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
                isRunning.set(false);

                // Close the connection (a worker thread may still be finishing its last send at this point)
                producer.closeProducer();
            }
        }

    4. Result

      

    Part 2: Using a custom partitioner

    1. The partitioner

        package com.jun.it;

        import kafka.producer.Partitioner;
        import kafka.utils.VerifiableProperties;

        public class JavaKafkaPartitioner implements Partitioner {
            /**
             * Default no-arg constructor
             */
            public JavaKafkaPartitioner() {
                this(new VerifiableProperties());
            }

            /**
             * This constructor must be provided; Kafka instantiates the partitioner through it
             *
             * @param properties the configuration given when the producer is initialized
             */
            public JavaKafkaPartitioner(VerifiableProperties properties) {
                // nothing to do
            }

            @Override
            public int partition(Object key, int numPartitions) {
                // Keys are expected to look like "xxx_N" (as generated by JavaKafkaProducer);
                // the numeric suffix, modulo the partition count, picks the partition
                String tmp = (String) key;
                int index = tmp.lastIndexOf('_');
                int number = Integer.valueOf(tmp.substring(index + 1));
                return number % numPartitions;
            }
        }
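
      As a quick illustration of how this partitioner routes keys, here is a small standalone sketch (the class name, sample keys, and partition count are hypothetical examples, not part of the original code):

        package com.jun.it;

        // Hypothetical demo class: prints the partition chosen for a few sample keys
        public class JavaKafkaPartitionerDemo {
            public static void main(String[] args) {
                JavaKafkaPartitioner partitioner = new JavaKafkaPartitioner();
                int numPartitions = 5; // example partition count
                for (String key : new String[]{"abc_0", "xyz_7", "qaz_9"}) {
                    // The numeric suffix of the key, modulo the partition count, picks the partition
                    System.out.println(key + " -> partition " + partitioner.partition(key, numPartitions));
                }
                // Expected output: abc_0 -> 0, xyz_7 -> 2, qaz_9 -> 4
            }
        }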

    2. Revised producer class

        package com.jun.it;

        import kafka.javaapi.producer.Producer;
        import kafka.producer.KeyedMessage;
        import kafka.producer.ProducerConfig;

        import java.util.Properties;
        import java.util.Random;
        import java.util.concurrent.atomic.AtomicBoolean;

        public class JavaKafkaProducer {
            public static final char[] chars = "qazwsxedcrfvtgbyhnujmikolp0123456789".toCharArray();
            public static final int charsLength = chars.length;
            public static final Random random = new Random(System.currentTimeMillis());
            private Producer<String, String> producer = null;

            private String topicName = null;
            private String brokerList = null;
            private boolean isSync = true; // synchronous by default
            private String partitionerClass = null; // class name of the data partitioner

            /**
             * Constructor
             *
             * @param topicName
             * @param brokerList
             */
            public JavaKafkaProducer(String topicName, String brokerList) {
                this(topicName, brokerList, true, null);
            }

            /**
             * Constructor
             *
             * @param topicName
             * @param brokerList
             * @param partitionerClass
             */
            public JavaKafkaProducer(String topicName, String brokerList, String partitionerClass) {
                this(topicName, brokerList, true, partitionerClass);
            }

            /**
             * Constructor; mainly builds the producer
             *
             * @param topicName
             * @param brokerList
             * @param isSync
             * @param partitionerClass
             */
            public JavaKafkaProducer(String topicName, String brokerList, boolean isSync, String partitionerClass) {
                // Save the arguments
                this.topicName = topicName;
                this.brokerList = brokerList;
                this.isSync = isSync;
                this.partitionerClass = partitionerClass;

                // 1. Build the configuration; see http://kafka.apache.org/082/documentation.html#producerconfigs
                Properties props = new Properties();
                // Connection information for the Kafka cluster
                props.put("metadata.broker.list", this.brokerList);
                // How the producer sends data
                if (this.isSync) {
                    // Send data synchronously
                    props.put("producer.type", "sync");
                } else {
                    // Send data asynchronously
                    props.put("producer.type", "async");
                    /**
                     * 0: do not wait for any acknowledgement from the broker
                     * 1: wait until at least one broker acknowledges the write
                     * -1: wait until all brokers acknowledge that the data was received
                     */
                    props.put("request.required.acks", "0");
                }
                // Serializer class for the key/value data
                /**
                 * The default is DefaultEncoder, which sends byte[] data.
                 * When sending String data, this must be changed to StringEncoder.
                 */
                props.put("serializer.class", "kafka.serializer.StringEncoder");

                // Set the partitioner class, if one was given
                if (this.partitionerClass != null && !this.partitionerClass.trim().isEmpty()) {
                    // The default is DefaultPartitioner, which partitions based on a hash of the key's hashCode
                    props.put("partitioner.class", this.partitionerClass.trim());
                }

                // 2. Build the Kafka producer configuration context
                ProducerConfig config = new ProducerConfig(props);

                // 3. Build the Kafka producer
                this.producer = new Producer<String, String>(config);
            }

            /**
             * Close the producer connection
             */
            public void closeProducer() {
                producer.close();
            }

            /**
             * Entry point for external callers: starts the threads that send data
             *
             * @param threadNumbers
             * @param isRunning
             */
            public void run(int threadNumbers, final AtomicBoolean isRunning) {
                for (int i = 0; i < threadNumbers; i++) {
                    new Thread(new Runnable() {
                        public void run() {
                            int count = 0;
                            while (isRunning.get()) {
                                // Only send data while in the running state
                                KeyedMessage<String, String> message = generateMessage();
                                // Send the data
                                producer.send(message);
                                count++;
                                // Print progress every 100 messages
                                if (count % 100 == 0) {
                                    System.out.println("Count = " + count + "; message:" + message);
                                }

                                // Pause briefly between sends
                                try {
                                    Thread.sleep(random.nextInt(100) + 10);
                                } catch (InterruptedException e) {
                                    // ignore
                                }
                            }
                            System.out.println("Thread:" + Thread.currentThread().getName() + " send message count is:" + count);
                        }
                    }).start();
                }
            }

            /**
             * Generate a random Kafka KeyedMessage object
             *
             * @return
             */
            public KeyedMessage<String, String> generateMessage() {
                String key = generateString(3) + "_" + random.nextInt(10);
                StringBuilder sb = new StringBuilder();
                int numWords = random.nextInt(5) + 1; // 1 to 5 words per message
                for (int i = 0; i < numWords; i++) {
                    String word = generateString(random.nextInt(5) + 1); // each word has 1 to 5 characters
                    sb.append(word).append(" ");
                }
                String message = sb.toString().trim();
                return new KeyedMessage<String, String>(this.topicName, key, message);
            }

            /**
             * Generate a random string of the given length
             *
             * @param numItems
             * @return
             */
            public static String generateString(int numItems) {
                StringBuilder sb = new StringBuilder();
                for (int i = 0; i < numItems; i++) {
                    sb.append(chars[random.nextInt(charsLength)]);
                }
                return sb.toString();
            }
        }

    3. Test class

        package com.jun.it;

        import java.util.concurrent.atomic.AtomicBoolean;

        public class JavaKafkaProducerTest {
            public static void main(String[] args) {
                String topicName = "beifeng";
                String brokerList = "linux-hadoop01.ibeifeng.com:9092,linux-hadoop01.ibeifeng.com:9093";
                String partitionerClass = "com.jun.it.JavaKafkaPartitioner";
                int threadNums = 10;
                AtomicBoolean isRunning = new AtomicBoolean(true);
                JavaKafkaProducer producer = new JavaKafkaProducer(topicName, brokerList, partitionerClass);
                producer.run(threadNums, isRunning);

                // Let the sender threads run for a while (1 second here) before shutting down
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
                isRunning.set(false);

                // Close the connection (a worker thread may still be finishing its last send at this point)
                producer.closeProducer();
            }
        }

    4. Result
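
      One way to observe the effect of the custom partitioner is to compare the per-partition offsets after a run: keys whose numeric suffixes are equal modulo the partition count end up in the same partition. A minimal sketch using Kafka's GetOffsetShell tool (broker host/port taken from the test class above):

        bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list linux-hadoop01.ibeifeng.com:9092 \
            --topic beifeng --time -1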

      

  • Original post: https://www.cnblogs.com/juncaoit/p/9426106.html