Flume Notes -- batchSize and transactionCapacity: Analysis and Application

    Understanding Flume NG's batchSize and transactionCapacity parameters and how transport transactions work: http://blog.itpub.net/29754888/viewspace-1220545/
    A concrete analysis of the transactionCapacity and batchSize concepts in Flume: http://www.mamicode.com/info-detail-513301.html

    Implementing a custom sink and injecting properties: http://www.coderli.com/flume-ng-sink-properties/
    Writing a custom interceptor: http://blog.csdn.net/xiao_jun_0820/article/details/38333171
    A custom Kafka sink: www.itnose.net/detail/6187977.html

     

    batchSize is a concept that applies to Sources and Sinks; it caps how many events a source or sink handles as one batch.
    In other words, you can process up to batchSize events "at once", where "at once" means within a single transaction. Once the number of events reaches batchSize, the transaction is committed.
    The larger this value, the more events each transaction covers and the less often operations such as clearing the takeList occur, so throughput improves; the trade-off is that when an error occurs, the amount of work that must be rolled back grows correspondingly. Note that a sink's batchSize should not exceed the channel's transactionCapacity, otherwise the channel will reject the transaction once more events are taken than the takeList can hold.

    Official description of transactionCapacity: the maximum number of events the channel will pass in one transaction. In practice it is the capacity of the putList and takeList. In Flume 1.5, SpillableMemoryChannel sizes its putList and takeList from the largestPutTxSize and largestTakeTxSize parameters, whose default value is 5000.

    Official description of capacity: "The maximum number of events stored in the channel", i.e. the upper bound on how many events the channel can hold in total.
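
    To see how the three parameters relate in practice, here is a minimal sketch of an agent configuration (component names a1, c1, k1 and the values are placeholders, not from the original post). The intended ordering is batchSize <= transactionCapacity <= capacity:

    # memory channel: holds up to 10000 events in total,
    # and at most 1000 may move through one transaction
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    a1.channels.c1.transactionCapacity = 1000

    # a sink (such as the custom one below) then takes up to
    # batchSize events per transaction; keep it <= 1000 here
    a1.sinks.k1.batchSize = 500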

     


    Example:

    Constants.java

    package com.evor.kafkawithbatch;

    public class Constants {
        static final String BATCH_SIZE = "batchSize";
        static final int DEFAULT_BATCH_SIZE = 50; // default batch size: 50
        static final String TOPIC = "topic";
        static final String DEFAULT_TOPIC = "kafka_flume_topic"; // default topic
    }


    KafkaUtil.java
    Responsible for building the Kafka producer Properties.

    package com.evor.kafkawithbatch;

    import java.util.Properties;
    import org.apache.flume.Context;

    public class KafkaUtil {
        public static Properties getKafkaConfig(Context context) {
            // note: the context parameter is not actually used here
            Properties props = new Properties();
            props.setProperty("metadata.broker.list", "192.168.1.160:9092,192.168.1.164:9093");
            props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
            props.put("request.required.acks", "1");
            return props;
        }
    }
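
    Since getKafkaConfig receives the Context but never uses it, the broker list could be read from the agent configuration instead of being hardcoded. A minimal sketch, assuming a property named brokerList (that name and its default are illustrative, not part of the original code):

    public static Properties getKafkaConfig(Context context) {
        Properties props = new Properties();
        // hypothetical "brokerList" property, falling back to the previous hardcoded value
        props.setProperty("metadata.broker.list",
                context.getString("brokerList", "192.168.1.160:9092,192.168.1.164:9093"));
        props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
        props.put("request.required.acks", "1");
        return props;
    }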


    KafkaSinkBatch.java
    The custom Kafka sink.

    package com.evor.kafkawithbatch;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    // Simplified version of KafkaSinkBatch2
    public class KafkaSinkBatch extends AbstractSink implements Configurable {
        private static final Logger log = LoggerFactory.getLogger(KafkaSinkBatch.class);
        private Properties kafkaProps;
        private Producer<String,String> producer;
        private int batchSize; // number of events per transaction, committed as a whole
        private List<KeyedMessage<String, String>> messageList;
    
        @Override
        public Status process() throws EventDeliveryException {
            Status result = Status.READY;
            Channel channel = getChannel();
            Transaction transaction = null;
            Event event = null;
            String eventTopic = "kafka_flume_topic";
            try {
                long processedEvent = 0;
                transaction = channel.getTransaction();
                transaction.begin(); // transaction starts
                messageList.clear(); // must clear here, otherwise messages from earlier batches would be resent
                for (; processedEvent < batchSize; processedEvent++) {
                    event = channel.take(); // take one event from the channel
                    if (event == null) {
                        break;
                    }
                    // an Event consists of headers and a body
                    byte[] eventBody = event.getBody();
                    KeyedMessage<String, String> data = new KeyedMessage<String, String>(eventTopic, new String(eventBody) + " batch size: " + batchSize);
                    messageList.add(data);
                }
                if (processedEvent > 0) {
                    producer.send(messageList);
                }
                transaction.commit(); // up to batchSize events handled; commit the transaction
    
            } catch (Exception e) {
                log.error("Failed to publish events!", e);
                result = Status.BACKOFF;
                if (transaction != null) { // guard: getTransaction() itself may have thrown
                    transaction.rollback();
                    log.debug("transaction rolled back");
                }
            } finally {
                if (transaction != null) {
                    transaction.close();
                }
            }
            return result;
        }
    
        @Override
        public synchronized void start() {
            ProducerConfig config = new ProducerConfig(kafkaProps);
            producer = new Producer<String, String>(config);
            super.start();
        }
    
        @Override
        public synchronized void stop() {
            producer.close();
            super.stop();
        }
    
        @Override
        public void configure(Context context) {
            // read settings from the Flume configuration file, falling back to defaults
            batchSize = context.getInteger(Constants.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
            messageList = new ArrayList<KeyedMessage<String, String>>(batchSize);
            kafkaProps = KafkaUtil.getKafkaConfig(context); // build the producer Properties via the helper
        }
    
    }
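
    To deploy this sink, put the compiled jar (plus the Kafka client jars) on Flume's classpath, e.g. in its lib directory, and point a sink's type at the fully qualified class name. A minimal sketch of the agent configuration (agent/channel names and the value are placeholders):

    a1.sinks.k1.type = com.evor.kafkawithbatch.KafkaSinkBatch
    a1.sinks.k1.channel = c1
    a1.sinks.k1.batchSize = 100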


    Appendix: KafkaSinkBatch2.java
    (The complete version of KafkaSinkBatch.java, with fuller logging and handling.)

    package com.evor.kafkawithbatch;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    // Full-logging version of KafkaSinkBatch.java
    public class KafkaSinkBatch2 extends AbstractSink implements Configurable {
        private static final Logger log = LoggerFactory.getLogger(KafkaSinkBatch2.class);
    
        public static final String KEY_HDR = "key";
        public static final String TOPIC_HDR = "topic";
        private static final String CHARSET = "UTF-8";
        private Properties kafkaProps;
        private Producer<String, String> producer;
        private String topic;
        private int batchSize; // number of events per transaction, committed as a whole
        private List<KeyedMessage<String, String>> messageList;
    
        @Override
        public Status process() throws EventDeliveryException {
            Status result = Status.READY;
            Channel channel = getChannel();
            Transaction transaction = null;
            Event event = null;
            String eventTopic = null;
            String eventKey = null;
            try {
                long processedEvent = 0;
                transaction = channel.getTransaction();
                transaction.begin(); // transaction starts
                messageList.clear(); // reset the batch buffer for this transaction
                for (; processedEvent < batchSize; processedEvent++) {
                    event = channel.take(); // take one event from the channel
                    if (event == null) {
                        break;
                    }
                    // an Event consists of headers and a body
                    Map<String, String> headers = event.getHeaders();
                    byte[] eventBody = event.getBody();
                    if ((eventTopic = headers.get(TOPIC_HDR)) == null) { // fall back to the configured topic when the event header carries none
                        eventTopic = topic;
                    }
                    eventKey = headers.get(KEY_HDR);
    
                    if (log.isDebugEnabled()) {
                        log.debug("{Event}" + eventTopic + ":" + eventKey + ":" + new String(eventBody, CHARSET));
                        log.debug("event #{}", processedEvent);
                    }

                    KeyedMessage<String, String> data = new KeyedMessage<String, String>(eventTopic, eventKey, new String(eventBody, CHARSET));
                    messageList.add(data);
                    // producer.send(data); // per-event send, superseded by the batched send below
    
                }
                if (processedEvent > 0) {
                    producer.send(messageList);
                }
                transaction.commit(); // up to batchSize events handled; commit the transaction
    
            } catch (Exception e) {
                log.error("Failed to publish events!", e);
                result = Status.BACKOFF;
                if (transaction != null) { // guard: getTransaction() itself may have thrown
                    transaction.rollback();
                    log.debug("transaction rolled back");
                }
            } finally {
                if (transaction != null) {
                    transaction.close();
                }
            }
            return result;
        }
    
        @Override
        public synchronized void start() {
            ProducerConfig config = new ProducerConfig(kafkaProps);
            producer = new Producer<String, String>(config);
            super.start();
        }
    
        @Override
        public synchronized void stop() {
            producer.close();
            super.stop();
        }
    
        @Override
        public void configure(Context context) {
            // read settings from the Flume configuration file, falling back to defaults
            batchSize = context.getInteger(Constants.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
            messageList = new ArrayList<KeyedMessage<String, String>>(batchSize);

            log.debug("batch size: {}", batchSize);
            
            topic = context.getString(Constants.TOPIC, Constants.DEFAULT_TOPIC);
            if (topic.equals(Constants.DEFAULT_TOPIC)) {
                log.warn("Default topic name [" + Constants.DEFAULT_TOPIC + "]");
            } else {
                log.info("configured topic:[" + topic    + "], may be over-ridden by event headers");
            }
            
            kafkaProps = KafkaUtil.getKafkaConfig(context);
            if (log.isDebugEnabled()) {
                log.debug("Kafka producer properties : " + kafkaProps);
            }
        }
    
    }
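
    Because KafkaSinkBatch2 consults each event's headers for "topic" and "key" entries, an upstream source or interceptor can route individual events to different Kafka topics and partition keys. A minimal sketch of building such an event with Flume's EventBuilder (values are illustrative; in practice an interceptor would usually set these headers):

    import java.nio.charset.Charset;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;

    public class TopicHeaderExample {
        public static Event build() {
            Map<String, String> headers = new HashMap<String, String>();
            headers.put("topic", "per_event_topic"); // overrides the sink's configured topic
            headers.put("key", "some-partition-key"); // becomes the Kafka message key
            return EventBuilder.withBody("hello", Charset.forName("UTF-8"), headers);
        }
    }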
Original post: https://www.cnblogs.com/gnivor/p/4988935.html