zoukankan      html  css  js  c++  java
  • 单独KafkaConsumer实例and多worker线程。

    1、单独KafkaConsumer实例and多worker线程。
    将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,同时维护一个或者若各干consumer实例执行消息获取任务。
    本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,之后worker线程完成处理后上报位移状态,由全局consumer提交位移。

      1 package com.bie.kafka.kafkaWorker;
      2 
      3 import java.time.Duration;
      4 import java.util.Arrays;
      5 import java.util.Collection;
      6 import java.util.Collections;
      7 import java.util.HashMap;
      8 import java.util.Map;
      9 import java.util.Properties;
     10 import java.util.concurrent.ArrayBlockingQueue;
     11 import java.util.concurrent.ExecutorService;
     12 import java.util.concurrent.ThreadPoolExecutor;
     13 import java.util.concurrent.TimeUnit;
     14 
     15 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
     16 import org.apache.kafka.clients.consumer.ConsumerRecords;
     17 import org.apache.kafka.clients.consumer.KafkaConsumer;
     18 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
     19 import org.apache.kafka.common.TopicPartition;
     20 import org.apache.kafka.common.errors.WakeupException;
     21 
     22 /**
     23  * 
     24  * @Description TODO
     25  * @author biehl
     26  * @Date 2019年6月1日 下午3:28:53
     27  * 
     28  * @param <K>
     29  * @param <V>
     30  * 
     31  *            1、consumer多线程管理类,用于创建线程池以及为每个线程分配消息集合。 另外consumer位移提交也在该类中完成。
     32  * 
     33  */
     34 public class ConsumerThreadHandler<K, V> {
     35 
     36     // KafkaConsumer实例
     37     private final KafkaConsumer<K, V> consumer;
     38     // ExecutorService实例
     39     private ExecutorService executors;
     40     // 位移信息offsets
     41     private final Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
     42 
     43     /**
     44      * 
     45      * @param brokerList
     46      *            kafka列表
     47      * @param groupId
     48      *            消费组groupId
     49      * @param topic
     50      *            主题topic
     51      */
     52     public ConsumerThreadHandler(String brokerList, String groupId, String topic) {
     53         Properties props = new Properties();
     54         // broker列表
     55         props.put("bootstrap.servers", brokerList);
     56         // 消费者组编号Id
     57         props.put("group.id", groupId);
     58         // 非自动提交位移信息
     59         props.put("enable.auto.commit", "false");
     60         // 从最早的位移处开始消费消息
     61         props.put("auto.offset.reset", "earliest");
     62         // key反序列化
     63         props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     64         // value反序列化
     65         props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     66         // 将配置信息装配到消费者实例里面
     67         consumer = new KafkaConsumer<>(props);
     68         // 消费者订阅消息,并实现重平衡rebalance
     69         // rebalance监听器,创建一个匿名内部类。使用rebalance监听器前提是使用消费者组(consumer group)。
     70         // 监听器最常见用法就是手动提交位移到第三方存储以及在rebalance前后执行一些必要的审计操作。
     71         consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
     72 
     73             /**
     74              * 在coordinator开启新一轮rebalance前onPartitionsRevoked方法会被调用。
     75              */
     76             @Override
     77             public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
     78                 // 提交位移
     79                 consumer.commitSync(offsets);
     80             }
     81 
     82             /**
     83              * rebalance完成后会调用onPartitionsAssigned方法。
     84              */
     85             @Override
     86             public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
     87                 // 清除位移信息
     88                 offsets.clear();
     89             }
     90         });
     91     }
     92 
     93     /**
     94      * 消费主方法
     95      * 
     96      * @param threadNumber
     97      *            线程池中的线程数
     98      */
     99     public void consume(int threadNumber) {
    100         executors = new ThreadPoolExecutor(
    101                 threadNumber, 
    102                 threadNumber, 
    103                 0L, 
    104                 TimeUnit.MILLISECONDS,
    105                 new ArrayBlockingQueue<Runnable>(1000), 
    106                 new ThreadPoolExecutor.CallerRunsPolicy());
    107         try {
    108             // 消费者一直处于等待状态,等待消息消费
    109             while (true) {
    110                 // 从主题中获取消息
    111                 ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1000L));
    112                 // 如果获取到的消息不为空
    113                 if (!records.isEmpty()) {
    114                     // 将消息信息、位移信息封装到ConsumerWorker中进行提交
    115                     executors.submit(new ConsumerWorker<>(records, offsets));
    116                 }
    117                 // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
    118                 this.commitOffsets();
    119             }
    120         } catch (WakeupException e) {
    121             // 此处忽略此异常的处理.WakeupException异常是从poll方法中抛出来的异常
    122             //如果不忽略异常信息,此处会打印错误哦,亲
    123             //e.printStackTrace();
    124         } finally {
    125             // 调用提交位移信息、尽量降低synchronized块对offsets锁定的时间
    126             this.commitOffsets();
    127             // 关闭consumer
    128             consumer.close();
    129         }
    130     }
    131 
    132     /**
    133      * 尽量降低synchronized块对offsets锁定的时间
    134      */
    135     private void commitOffsets() {
    136         // 尽量降低synchronized块对offsets锁定的时间
    137         Map<TopicPartition, OffsetAndMetadata> unmodfiedMap;
    138         // 保证线程安全、同步锁,锁住offsets
    139         synchronized (offsets) {
    140             // 判断如果offsets位移信息为空,直接返回,节省同步锁对offsets的锁定的时间
    141             if (offsets.isEmpty()) {
    142                 return;
    143             }
    144             // 如果offsets位移信息不为空,将位移信息offsets放到集合中,方便同步
    145             unmodfiedMap = Collections.unmodifiableMap(new HashMap<>(offsets));
    146             // 清除位移信息offsets
    147             offsets.clear();
    148         }
    149         // 将封装好的位移信息unmodfiedMap集合进行同步提交
    150         // 手动提交位移信息
    151         consumer.commitSync(unmodfiedMap);
    152     }
    153 
    154     /**
    155      * 关闭消费者
    156      */
    157     public void close() {
    158         // 在另一个线程中调用consumer.wakeup();方法来触发consume的关闭。
    159         // KafkaConsumer不是线程安全的,但是另外一个例外,用户可以安全的在另一个线程中调用consume.wakeup()。
    160         // wakeup()方法是特例,其他KafkaConsumer方法都不能同时在多线程中使用
    161         consumer.wakeup();
    162         // 关闭ExecutorService实例
    163         executors.shutdown();
    164     }
    165 
    166 }
     1 package com.bie.kafka.kafkaWorker;
     2 
     3 import java.util.List;
     4 import java.util.Map;
     5 
     6 import org.apache.kafka.clients.consumer.ConsumerRecord;
     7 import org.apache.kafka.clients.consumer.ConsumerRecords;
     8 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
     9 import org.apache.kafka.common.TopicPartition;
    10 
    11 /**
    12  * 
    13  * @Description TODO
    14  * @author biehl
    15  * @Date 2019年6月1日 下午3:45:38
    16  * 
    17  * @param <K>
    18  * @param <V>
    19  * 
    20  *            1、本质上是一个Runnable,执行真正的消费逻辑并且上报位移信息给ConsumerThreadHandler。
    21  * 
    22  */
    23 public class ConsumerWorker<K, V> implements Runnable {
    24 
    25     // 获取到的消息
    26     private final ConsumerRecords<K, V> records;
    27     // 位移信息
    28     private final Map<TopicPartition, OffsetAndMetadata> offsets;
    29 
    30     /**
    31      * ConsumerWorker有参构造方法
    32      * 
    33      * @param records
    34      *            获取到的消息
    35      * @param offsets
    36      *            位移信息
    37      */
    38     public ConsumerWorker(ConsumerRecords<K, V> records, Map<TopicPartition, OffsetAndMetadata> offsets) {
    39         this.records = records;
    40         this.offsets = offsets;
    41     }
    42 
    43     /**
    44      * 
    45      */
    46     @Override
    47     public void run() {
    48         // 获取到分区的信息
    49         for (TopicPartition partition : records.partitions()) {
    50             // 获取到分区的消息记录
    51             List<ConsumerRecord<K, V>> partConsumerRecords = records.records(partition);
    52             // 遍历获取到的消息记录
    53             for (ConsumerRecord<K, V> record : partConsumerRecords) {
    54                 // 打印消息
    55                 System.out.println("topic: " + record.topic() + ",partition: " + record.partition() + ",offset: "
    56                         + record.offset() 
    57                         + ",消息记录: " + record.value());
    58             }
    59             // 上报位移信息。获取到最后的位移消息,由于位移消息从0开始,所以最后位移减一获取到位移位置
    60             long lastOffset = partConsumerRecords.get(partConsumerRecords.size() - 1).offset();
    61             // 同步锁,锁住offsets位移
    62             synchronized (offsets) {
    63                 // 如果offsets位移不包含partition这个key信息
    64                 if (!offsets.containsKey(partition)) {
    65                     // 就将位移信息设置到map集合里面
    66                     offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    67                 } else {
    68                     // 否则,offsets位移包含partition这个key信息
    69                     // 获取到offsets的位置信息
    70                     long curr = offsets.get(partition).offset();
    71                     // 如果获取到的位置信息小于等于上一次位移信息大小
    72                     if (curr <= lastOffset + 1) {
    73                         // 将这个partition的位置信息设置到map集合中。并保存到broker中。
    74                         offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
    75                     }
    76                 }
    77             }
    78         }
    79     }
    80 
    81 }
     1 package com.bie.kafka.kafkaWorker;
     2 
     3 /**
     4  * 
     5  * @Description TODO
     6  * @author biehl
     7  * @Date 2019年6月1日 下午4:13:25
     8  *
     9  *       1、单独KafkaConsumer实例和多worker线程。
    10  *       2、将获取的消息和消息的处理解耦,将消息的处理放入单独的工作者线程中,即工作线程中,
    11  *       同时维护一个或者若各干consumer实例执行消息获取任务。
    12  *       3、本例使用全局的KafkaConsumer实例执行消息获取,然后把获取到的消息集合交给线程池中的worker线程执行工作,
    13  *       之后worker线程完成处理后上报位移状态,由全局consumer提交位移。
    14  * 
    15  * 
    16  */
    17 
    18 public class ConsumerMain {
    19 
    20     public static void main(String[] args) {
    21         // broker列表
    22         String brokerList = "slaver1:9092,slaver2:9092,slaver3:9092";
    23         // 主题信息topic
    24         String topic = "topic1";
    25         // 消费者组信息group
    26         String groupId = "group2";
    27         // 根据ConsumerThreadHandler构造方法构造出消费者
    28         final ConsumerThreadHandler<byte[], byte[]> handler = new ConsumerThreadHandler<>(brokerList, groupId, topic);
    29         final int cpuCount = Runtime.getRuntime().availableProcessors();
    30         System.out.println("cpuCount : " + cpuCount);
    31         // 创建线程的匿名内部类
    32         Runnable runnable = new Runnable() {
    33 
    34             @Override
    35             public void run() {
    36                 // 执行consume,在此线程中执行消费者消费消息。
    37                 handler.consume(cpuCount);
    38             }
    39         };
    40         // 直接调用runnable此线程,并运行
    41         new Thread(runnable).start();
    42 
    43         try {
    44             // 此线程休眠20000
    45             Thread.sleep(20000L);
    46         } catch (InterruptedException e) {
    47             e.printStackTrace();
    48         }
    49         System.out.println("Starting to close the consumer...");
    50         // 关闭消费者
    51         handler.close();
    52     }
    53 
    54 }

    待续......

  • 相关阅读:
    [LeetCode] Remove Duplicates from Sorted List
    [LeetCode] Partition List
    oracle字符串载取及判断是否包含指定字符串
    oracle 添加序号
    Oracle的decode、sign、trunc函数
    Oracle行列转换
    java计算今天是今年的第几天
    Oracle 增加 修改 删除 列
    java 获取本机ip
    float类型数保留一位小数
  • 原文地址:https://www.cnblogs.com/biehongli/p/10962164.html
Copyright © 2011-2022 走看看