zoukankan      html  css  js  c++  java
  • 自定义kafka Sink

    package my.bigdata;
    
    /**
     * Created by lq on 2017/8/22.
     */
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.util.Map;
    import java.util.Properties;
    
    
    import my.utils.PropertiesUtils;
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    public class KafkaSink2 extends AbstractSink implements Configurable {
        private static String TOPIC = null;
        private Producer<String, String> producer;
        private static Properties properties = null;
    
        static {
            final String topicCfg ="topic.cfg";
            final String myKafkaSinkCfg ="myKafkaSink.cfg";
            TOPIC = (String) PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,topicCfg).get("topic");
            properties = PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,myKafkaSinkCfg);
        }
    
        public Status process() throws EventDeliveryException {
            // TODO Auto-generated method stub
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
    
            try {
                transaction.begin();
                Event event = channel.take();
                if (event == null) {
                    transaction.rollback();
                    return Status.BACKOFF;
                }
    
                Map<String, String> headers = event.getHeaders();
                String logtype = headers.get("logtype");
                //随机
                String random = System.currentTimeMillis() + "";//随机数,key,避免写热点问题
                String kafkaKey = random + "_" + logtype;
                // public ProducerRecord(String topic, K key, V value)
                ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                        TOPIC, kafkaKey, new String(event.getBody()));
                producer.send(data);
                transaction.commit();
                return Status.READY;
            } catch (Exception e) {
                transaction.rollback();
                return Status.BACKOFF;
    
            } finally {
                transaction.close();
            }
        }
    
        public void configure(Context arg0) {
            producer = new KafkaProducer<>(properties);
        }
    }
    package my.bigdata;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * Created by lq on 2017/8/22.
     */
    public class kafkaSinkPartitioner implements Partitioner {
        @Override
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int parNums = cluster.partitionsForTopic(topic).size();
            try {
                String randomInKey = ((String) key).split("_")[0];
                return (int) Math.abs(Long.parseLong(randomInKey) % parNums);
            } catch (Exception e) {
                return Math.abs(key.hashCode() % parNums);
            }
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    
    }
  • 相关阅读:
    常见试题
    Makefile学习笔记
    安装java
    IntelliJ IDEA for mac 快捷键
    JPA--Caused by: javax.persistence.PersistenceException: [PersistenceUnit: mysqlJPA] Unable to configure EntityManagerFactory
    log4j 配置
    Numbers
    mac 命令
    Intellij IDEA for mac 快捷键
    vim命令
  • 原文地址:https://www.cnblogs.com/rocky-AGE-24/p/7497138.html
Copyright © 2011-2022 走看看