zoukankan      html  css  js  c++  java
  • kafka的生产者类

    kafka的生产者类

      1 import com.*.message.Configuration;
      2 import org.apache.kafka.clients.producer.Callback;
      3 import org.apache.kafka.clients.producer.ProducerRecord;
      4 import org.apache.kafka.clients.producer.RecordMetadata;
      5 import org.apache.kafka.common.serialization.ByteArraySerializer;
      6 import org.slf4j.Logger;
      7 import org.slf4j.LoggerFactory;
      8 
      9 import java.io.IOException;
     10 import java.util.List;
     11 import java.util.concurrent.Future;
     12 import java.util.concurrent.atomic.AtomicInteger;
     13 
     14 /**
     15  * kafka消息生产者
     16  * 
     17  * @author 
     18  * @version V1.0
     19  * @param <T>
     20  * @modify by user: {修改人} 2015-4-14
     21  * @modify by reason:{方法名}:{原因}
     22  */
     23 public abstract class KafkaProducer<T> implements IProducer<T>{
     24 
     25     private final Logger LOGGER = LoggerFactory.getLogger(this.getClass());
     26     public AtomicInteger sendNum = new AtomicInteger();
     27 
     28     private String topic;
     29     private Integer partition = null;
     30     private org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]> producer;
     31 
     32     /**
     33      * 创建一个新的实例KafkaProducer.
     34      * 
     35      * @param configuration
     36      *            配置
     37      * @param topic
     38      *            消息主题
     39      */
     40     public KafkaProducer(Configuration configuration, String topic) {
     41         this.topic = topic;
     42         producer = new org.apache.kafka.clients.producer.KafkaProducer<byte[], byte[]>(configuration.getConfig(),
     43                 new ByteArraySerializer(), new ByteArraySerializer());
     44     }
     45 
     46     /**
     47      * 创建一个新的实例KafkaProducer.
     48      * 
     49      * @param configuration
     50      *            配置
     51      * @param topic
     52      *            消息主题
     53      * @param partition
     54      *            分区数
     55      */
     56     public KafkaProducer(Configuration configuration, String topic, Integer partition) {
     57         this(configuration, topic);
     58         this.partition = partition;
     59     }
     60 
     61     /**
     62      * 消息序列化方法
     63      * 
     64      * @author 
     65      * @param msgInfo
     66      * @return
     67      */
     68     protected abstract byte[] serialize(T msgInfo);
     69 
     70     /*
     71      * (non-Javadoc)
     72      * 
     73      * @see com.hikvision.bsp.message.producer.IProducer#send(T)
     74      */
     75     @Override
     76     public boolean send(final T msg) {
     77         return send(msg, new Callback() {
     78             @Override
     79             public void onCompletion(RecordMetadata metadata, Exception exception) {
     80                 if (exception != null) {
     81                     LOGGER.error("send message ,error info:{}", exception.toString());
     82                 }
     83             }
     84         });
     85     }
     86 
     87     /*
     88      * (non-Javadoc)
     89      * 
     90      * @see com.hikvision.bsp.message.producer.IProducer#send(T,
     91      * org.apache.kafka.clients.producer.Callback)
     92      */
     93     @Override
     94     public boolean send(final T msg, final Callback callback) {
     95         if (msg == null) {
     96             LOGGER.error("send msg is null. ");
     97             return false;
     98         }
     99 
    100         // serialize list to protostuff, and send msg
    101         byte[] data = serialize(msg);
    102         try {
    103             producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, data), callback);
    104         } catch (Throwable e) {
    105             LOGGER.error("send msg faild.", e);
    106             return false;
    107         }
    108         return true;
    109     }
    110 
    111     @Override
    112     public boolean synSend(final T msg) {
    113         if (msg == null) {
    114             LOGGER.error("send msg is null. ");
    115             return false;
    116         }
    117 
    118         // serialize list to protostuff, and send msg
    119         byte[] data = serialize(msg);
    120         Future<?> future = null;
    121         try {
    122             future = producer.send(new ProducerRecord<byte[], byte[]>(topic, partition, null, data));
    123         } catch (Throwable e) {
    124             LOGGER.error("send msg faild.", e);
    125             return false;
    126         } finally {
    127             try {
    128                 future.get();
    129             } catch (Exception e) {
    130                 LOGGER.error("send msg faild.", e);
    131                 return false;
    132             }
    133         }
    134         return true;
    135     }
    136     
    137     /*
    138      * (non-Javadoc)
    139      * 
    140      * @see com.hikvision.bsp.message.producer.IProducer#send(java.util.List)
    141      */
    142     @Override
    143     public boolean send(List<T> msgList) {
    144         // check arguments validation
    145         if (msgList.size() == 0) {
    146             LOGGER.error("send msg is null. ");
    147             return false;
    148         }
    149         long curTime = System.currentTimeMillis();
    150 
    151         for (final T msg : msgList) {
    152             send(msg);
    153         }
    154         LOGGER.info("Send {} messages to kafka take: {}ms", msgList.size(), (System.currentTimeMillis() - curTime));
    155         return true;
    156     }
    157 
    158     @Override
    159     public void close() throws IOException {
    160         if (producer != null) {
    161             producer.close();
    162         }
    163     }
    164 }
  • 相关阅读:
    Android三种菜单的使用方式
    Express无法解析POST请求的JSON参数
    reids数据备份与恢复
    docker获取数据库时间相差8小时
    centos添加新用户
    创建一个新的容器并运行一个命令
    docker启动容器时报错unknown shorthand flag: ‘n‘ in -name
    linux查看cpu详细信息
    ValueError: Shapes (None, 1) and (None, 2) are incompatible
    Python:IOError: image file is truncated 的解决办法
  • 原文地址:https://www.cnblogs.com/jinniezheng/p/6383844.html
Copyright © 2011-2022 走看看