zoukankan      html  css  js  c++  java
  • flume 读取kafka 数据

    本文介绍flume读取kafka数据的方法

    代码:


    /*******************************************************************************
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *  
     * http://www.apache.org/licenses/LICENSE-2.0
     *  
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     *******************************************************************************/
    package org.apache.flume.source.kafka;

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    import kafka.message.Message;

    import kafka.message.MessageAndMetadata;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.conf.ConfigurationException;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import org.apache.flume.source.SyslogParser;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;


    /**
     * A Source for Kafka which reads messages from kafka. I use this in company production environment
     * and its performance is good. Over 100k messages per second can be read from kafka in one source.<p>
     * <tt>zookeeper.connect: </tt> the zookeeper ip kafka use.<p>
     * <tt>topic: </tt> the topic to read from kafka.<p>
     * <tt>group.id: </tt> the groupid of consumer group.<p>
     */
    public class KafkaSource extends AbstractSource implements Configurable, PollableSource {
        private static final Logger log = LoggerFactory.getLogger(KafkaSource.class);
        private ConsumerConnector consumer;
        private ConsumerIterator<byte[], byte[]> it;
        private String topic;
        
        public Status process() throws EventDeliveryException {
            List<Event> eventList = new ArrayList<Event>();
            MessageAndMetadata<byte[],byte[]> message;
            Event event;
            Map<String, String> headers;
            String strMessage;
            try {
                if(it.hasNext()) {
                    message = it.next();
                    event = new SimpleEvent();
                    headers = new HashMap<String, String>();
                    headers.put("timestamp", String.valueOf(System.currentTimeMillis()));

                    strMessage =  String.valueOf(System.currentTimeMillis()) + "|" + new String(message.message());
                    log.debug("Message: {}", strMessage);

                    event.setBody(strMessage.getBytes());
                    //event.setBody(message.message());
                    event.setHeaders(headers);
                    eventList.add(event);
                }
                getChannelProcessor().processEventBatch(eventList);
                return Status.READY;
            } catch (Exception e) {
                log.error("KafkaSource EXCEPTION, {}", e.getMessage());
                return Status.BACKOFF;
            }
        }

        public void configure(Context context) {
            topic = context.getString("topic");
            if(topic == null) {
                throw new ConfigurationException("Kafka topic must be specified.");
            }
            try {
                this.consumer = KafkaSourceUtil.getConsumer(context);
            } catch (IOException e) {
                log.error("IOException occur, {}", e.getMessage());
            } catch (InterruptedException e) {
                log.error("InterruptedException occur, {}", e.getMessage());
            }
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, new Integer(1));
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
            if(consumerMap == null) {
                throw new ConfigurationException("topicCountMap is null");
            }
            List<KafkaStream<byte[], byte[]>> topicList = consumerMap.get(topic);
            if(topicList == null || topicList.isEmpty()) {
                throw new ConfigurationException("topicList is null or empty");
            }
            KafkaStream<byte[], byte[]> stream =  topicList.get(0);
            it = stream.iterator();
        }

        @Override
        public synchronized void stop() {
            consumer.shutdown();
            super.stop();
        }

    }

    /*******************************************************************************
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *  
     * http://www.apache.org/licenses/LICENSE-2.0
     *  
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     *******************************************************************************/
    package org.apache.flume.source.kafka;


    import java.io.IOException;
    import java.util.Map;
    import java.util.Properties;

    import com.google.common.collect.ImmutableMap;
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.javaapi.consumer.ConsumerConnector;

    import org.apache.flume.Context;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;


    public class KafkaSourceUtil {
        private static final Logger log = LoggerFactory.getLogger(KafkaSourceUtil.class);

        public static Properties getKafkaConfigProperties(Context context) {
            log.info("context={}",context.toString());
            Properties props = new Properties();
            ImmutableMap<String, String> contextMap = context.getParameters();
            for (Map.Entry<String,String> entry : contextMap.entrySet()) {
                String key = entry.getKey();
                if (!key.equals("type") && !key.equals("channel")) {
                    props.setProperty(entry.getKey(), entry.getValue());
                    log.info("key={},value={}", entry.getKey(), entry.getValue());
                }
            }
            return props;
        }
        public static ConsumerConnector getConsumer(Context context) throws IOException, InterruptedException {
            ConsumerConfig consumerConfig = new ConsumerConfig(getKafkaConfigProperties(context));
            ConsumerConnector consumer = Consumer.createJavaConsumerConnector(consumerConfig);
            return consumer;
        }
    }



    配置文件:( /etc/flume/conf/flume-kafka-file.properties)

    agent_log.sources = kafka0
    agent_log.channels = ch0
    agent_log.sinks = sink0

    agent_log.sources.kafka0.channels = ch0
    agent_log.sinks.sink0.channel = ch0



    agent_log.sources.kafka0.type = org.apache.flume.source.kafka.KafkaSource
    agent_log.sources.kafka0.zookeeper.connect = node3:2181,node4:2181,node5:2181
    agent_log.sources.kafka0.topic = kkt-test-topic
    agent_log.sources.kafka0.group.id= test

    agent_log.channels.ch0.type = memory
    agent_log.channels.ch0.capacity = 2048
    agent_log.channels.ch0.transactionCapacity = 1000


    agent_log.sinks.sink0.type=file_roll
    agent_log.sinks.sink0.sink.directory=/data/flumeng/data/test
    agent_log.sinks.sink0.sink.rollInterval=300

    启动脚本:

    sudo su  -l -s /bin/bash  flume  -c '/usr/lib/flume/bin/flume-ng agent --conf /etc/flume/conf --conf-file /etc/flume/conf/flume-kafka-file.properties -name agent_log -Dflume.root.logger=INFO,console '


    注意: 红色字体的代码(即 process() 中用 System.currentTimeMillis() 加 "|" 拼接消息内容的部分)的功能是在原始数据前增加时间戳

                版本号: flume-1.4.0.2.1.1.0 + kafka_2.8.0-0.8.0

                参考资料:https://github.com/baniuyao/flume-kafka

                 编译用到的库:

                flume-ng-configuration-1.4.0.2.1.1.0-385

                flume-ng-core-1.4.0.2.1.1.0-385

                flume-ng-sdk-1.4.0.2.1.1.0-385

                flume-tools-1.4.0.2.1.1.0-385

                guava-11.0.2

                kafka_2.8.0-0.8.0

                log4j-1.2.15

                scala-compiler

                scala-library

                slf4j-api-1.6.1

                slf4j-log4j12-1.6.1

                zkclient-0.3

                zookeeper-3.3.4

                    





  • 相关阅读:
    数据库里面的表空间,用户在实际开发中的使用
    并行编程的模型机制
    临时表在开发中的使用

    HIbernate编程模型
    网络数据如何可靠传输
    Spring Security编程模型
    java的缓存框架
    sort函数使用自定义数据排序使用
    FailOver的机制
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8450023.html
Copyright © 2011-2022 走看看