  • From Flume to Kafka: log collection

      Real-time log analysis:

    This post mainly tests log collection from Flume into Kafka, with Storm log analysis to follow. These are notes from my own learning!

    Flume configuration file

    #collector
    collector.sources=cs
    collector.sinks=ck hbaseSink
    collector.channels=cc hbaseChannel
    
    collector.sources.cs.type = exec
    collector.sources.cs.command = tail -F /data/hudonglogs/self/channel.log
    collector.sources.cs.channels=cc hbaseChannel
    
    collector.channels.cc.type = memory
    collector.channels.cc.capacity = 1000
    collector.channels.cc.transactionCapacity = 100
    
    collector.channels.hbaseChannel.type = memory
    collector.channels.hbaseChannel.capacity = 1000
    collector.channels.hbaseChannel.transactionCapacity = 100
    
    #sink kafka
    collector.sinks.ck.type = org.apache.flume.sink.kafka.KafkaSink
    collector.sinks.ck.topic = logs
    collector.sinks.ck.brokerList = localhost:9092
    collector.sinks.ck.requiredAcks = 1
    collector.sinks.ck.batchSize = 20
    collector.sinks.ck.channel = cc
    
    #hbase sink
    collector.sinks.hbaseSink.type = asynchbase
    collector.sinks.hbaseSink.channel = hbaseChannel
    collector.sinks.hbaseSink.table = logs
    collector.sinks.hbaseSink.columnFamily = content
    collector.sinks.hbaseSink.batchSize = 5
    
    

     Note: in Flume, each source can fan out to multiple channels, but each sink can read from only one channel.
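The fan-out from the single source into both channels relies on Flume's channel selector; the default is replicating, which copies every event to each configured channel. Making that default explicit in the config above would look like:

```properties
# Optional: state the default fan-out behaviour explicitly.
# "replicating" copies each event from source cs to both cc and hbaseChannel.
collector.sources.cs.selector.type = replicating
```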

    Kafka consumer (using the old high-level consumer API, which connects through ZooKeeper):

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    
    import kafka.consumer.Consumer;
    import kafka.consumer.ConsumerConfig;
    import kafka.consumer.ConsumerIterator;
    import kafka.consumer.KafkaStream;
    import kafka.javaapi.consumer.ConsumerConnector;
    
    // High-level consumer: prints every message received on the topic.
    // Usage: new KafkaConsumer("logs").start();
    public class KafkaConsumer extends Thread {
    
        private final ConsumerConnector connector;
        private final String topic;
    
        public KafkaConsumer(String topic) {
            this.topic = topic;
            this.connector = Consumer.createJavaConsumerConnector(createConsumerConfig());
        }
    
        private ConsumerConfig createConsumerConfig() {
            Properties props = new Properties();
            props.put("zookeeper.connect", KafkaConfig.zkConnect);
            props.put("group.id", KafkaConfig.groupId);
            props.put("zookeeper.session.timeout.ms", "40000");
            props.put("zookeeper.sync.time.ms", "200");
            props.put("auto.commit.interval.ms", "1000");
            return new ConsumerConfig(props);
        }
    
        @Override
        public void run() {
            // Request one stream (one consumer thread) for the topic.
            Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
            topicCountMap.put(topic, 1);
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap =
                    connector.createMessageStreams(topicCountMap);
            KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("receive:" + new String(it.next().message()));
                try {
                    sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Start the Kafka cluster, then start the producer, then start Flume.
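Here the "producer" is whatever writes the log file that the exec source tails. A minimal sketch of such a producer is below; the class name LogProducer and the one-line-per-second rate are my own assumptions, and it simply appends lines to /data/hudonglogs/self/channel.log (the path from the config above) so that Flume picks them up:

```java
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;

// Minimal log producer: appends lines to the file that the Flume exec
// source tails (tail -F /data/hudonglogs/self/channel.log).
public class LogProducer {

    // Append one line to the given file, creating it if necessary.
    public static void append(String path, String line) throws IOException {
        try (PrintWriter out = new PrintWriter(new FileWriter(path, true))) {
            out.println(line);
        }
    }

    public static void main(String[] args) throws Exception {
        String path = args.length > 0 ? args[0] : "/data/hudonglogs/self/channel.log";
        for (int i = 0; i < 10; i++) {
            append(path, System.currentTimeMillis() + " test log line " + i);
            Thread.sleep(1000); // one line per second
        }
    }
}
```

Because the file is opened in append mode on each call, `tail -F` sees every new line immediately.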

    Verify the results in HBase (e.g. with scan 'logs' in the hbase shell):

    Note: all of the environments above are single-node deployments!

  • Original post: https://www.cnblogs.com/re-myself/p/5226935.html