package my.bigdata; /** * Created by lq on 2017/8/22. */ import java.io.File; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.util.Map; import java.util.Properties; import my.utils.PropertiesUtils; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; public class KafkaSink2 extends AbstractSink implements Configurable { private static String TOPIC = null; private Producer<String, String> producer; private static Properties properties = null; static { final String topicCfg ="topic.cfg"; final String myKafkaSinkCfg ="myKafkaSink.cfg"; TOPIC = (String) PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,topicCfg).get("topic"); properties = PropertiesUtils.getPropertiesFromClass(KafkaSink2.class,myKafkaSinkCfg); } public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); try { transaction.begin(); Event event = channel.take(); if (event == null) { transaction.rollback(); return Status.BACKOFF; } Map<String, String> headers = event.getHeaders(); String logtype = headers.get("logtype"); //随机 String random = System.currentTimeMillis() + "";//随机数,key,避免写热点问题 String kafkaKey = random + "_" + logtype; // public ProducerRecord(String topic, K key, V value) ProducerRecord<String, String> data = new ProducerRecord<String, String>( TOPIC, kafkaKey, new String(event.getBody())); producer.send(data); transaction.commit(); return Status.READY; } catch (Exception e) { transaction.rollback(); return Status.BACKOFF; } finally { transaction.close(); } } public void configure(Context arg0) { producer = new KafkaProducer<>(properties); } }
package my.bigdata; import org.apache.kafka.clients.producer.Partitioner; import org.apache.kafka.common.Cluster; import java.util.Map; /** * Created by lq on 2017/8/22. */ public class kafkaSinkPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { int parNums = cluster.partitionsForTopic(topic).size(); try { String randomInKey = ((String) key).split("_")[0]; return (int) Math.abs(Long.parseLong(randomInKey) % parNums); } catch (Exception e) { return Math.abs(key.hashCode() % parNums); } } @Override public void close() { } @Override public void configure(Map<String, ?> map) { } }