  • 自定义 Kafka Sink

    package my.bigdata;
    /*
     * Created by lq on 2017/8/22.
     */
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Arrays;
    import java.util.Map;
    import java.util.Properties;
    import my.utils.PropertiesUtils;
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    public class KafkaSink2 extends AbstractSink implements Configurable {
        private static String TOPIC = null;
        private Producer<String, String> producer;
        private static Properties properties = null;
        static {
            final String topicCfg ="topic.cfg";
            final String myKafkaSinkCfg ="myKafkaSink.cfg";
            TOPIC = (String) PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,topicCfg).get("topic");
            properties = PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,myKafkaSinkCfg);
        public Status process() throws EventDeliveryException {
            // TODO Auto-generated method stub
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
            try {
                Event event = channel.take();
                if (event == null) {
                    return Status.BACKOFF;
                Map<String, String> headers = event.getHeaders();
                String logtype = headers.get("logtype");
                String random = System.currentTimeMillis() + "";//随机数,key,避免写热点问题
                String kafkaKey = random + "_" + logtype;
                // public ProducerRecord(String topic, K key, V value)
                ProducerRecord<String, String> data = new ProducerRecord<String, String>(
                        TOPIC, kafkaKey, new String(event.getBody()));
                return Status.READY;
            } catch (Exception e) {
                return Status.BACKOFF;
            } finally {
        public void configure(Context arg0) {
            producer = new KafkaProducer<>(properties);
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    public class kafkaSinkPartitioner implements Partitioner {
        public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
            int parNums = cluster.partitionsForTopic(topic).size();
            try {
                String randomInKey = ((String) key).split("_")[0];
                return (int) Math.abs(Long.parseLong(randomInKey) % parNums);
            } catch (Exception e) {
                return Math.abs(key.hashCode() % parNums);
        public void close() {
        public void configure(Map<String, ?> map) {
