Kafka Basics (15): Integrating Flume with Kafka

    1 Simple Implementation

    1) Configure Flume

    # define
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/data/flume.log
    
    # sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sinks.k1.kafka.topic = first
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    # channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # bind
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    2 Start a Kafka consumer
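
    For example, assuming Kafka is installed under /opt/module/kafka (the path is an assumption), a console consumer on the first topic can be started with:

    # consume the topic the sink writes to; bootstrap servers match the sink config
    $ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first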

    3 From the Flume root directory, start Flume

    $ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf
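
    Here -c names the configuration directory, -n the agent to run (a1, matching the property prefix used throughout the config file), and -f the job file. To watch delivery in the terminal, the standard log4j override can be appended:

    $ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf -Dflume.root.logger=INFO,console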

    4 Append data to /opt/module/data/flume.log and watch what the Kafka consumer receives

    $ echo hello >> /opt/module/data/flume.log
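
    The consumer should print hello shortly afterwards. Because the exec source runs tail -F, any further lines appended to the file flow through the same channel and sink.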

    2 Data Separation

    0) Requirement: route the data Flume collects to different Kafka topics by content:

              log lines containing "atguigu" go to the Kafka topic first,

              log lines containing "shangguigu" go to the Kafka topic second,

      and all other lines go to the Kafka topic third.

    1) Write the interceptor, FlumeKafkaInterceptor

    package com.atguigu.kafka.flumeInterceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    import java.util.Map;
    
    public class FlumeKafkaInterceptor implements Interceptor {
        @Override
        public void initialize() {
        }
    
        /**
         * Route each event by its body: events containing "atguigu" are sent
         * to the first topic, events containing "shangguigu" to the second
         * topic, and everything else falls through to the third topic.
         */
        @Override
        public Event intercept(Event event) {
            // 1. Get the event headers
            Map<String, String> headers = event.getHeaders();
            // 2. Get the event body as a string
            String body = new String(event.getBody());
            if (body.contains("atguigu")) {
                headers.put("topic", "first");
            } else if (body.contains("shangguigu")) {
                headers.put("topic", "second");
            }
            return event;
        }
    
        @Override
        public List<Event> intercept(List<Event> events) {
            for (Event event : events) {
                intercept(event);
            }
            return events;
        }
    
        @Override
        public void close() {
        }
    
        public static class MyBuilder implements Interceptor.Builder {
    
            @Override
            public Interceptor build() {
                return new FlumeKafkaInterceptor();
            }
    
            @Override
            public void configure(Context context) {
            }
        }
    }
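
    This works because KafkaSink supports per-event topic overrides: when an event carries a topic header, the sink publishes it to that topic instead of the one configured through a1.sinks.k1.kafka.topic, so events that match neither keyword fall through to the default topic third.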

    2) Package the interceptor and copy the jar into the lib directory of the Flume installation
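
    A minimal sketch, assuming the interceptor is built with Maven and Flume is installed under /opt/module/flume (both the jar name and the paths are assumptions):

    # build the jar, then drop it where Flume picks up plugin classes
    $ mvn clean package
    $ cp target/flume-kafka-interceptor-1.0.jar /opt/module/flume/lib/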

    3) Configure Flume

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 6666
    
    
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = third
    a1.sinks.k1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    
    #Interceptor
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.atguigu.kafka.flumeInterceptor.FlumeKafkaInterceptor$MyBuilder
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    4 Start Kafka consumers
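
    To see the routing, run one console consumer per topic in three separate terminals (same assumed Kafka installation path as above):

    $ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
    $ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic second
    $ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic third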

    5 From the Flume root directory, start Flume

    $ bin/flume-ng agent -c conf/ -n a1 -f jobs/flume-kafka.conf

    6 Write data to port 6666 and watch which Kafka consumer receives each line
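
    For example, with netcat (the source is bound to 0.0.0.0:6666, so localhost works from the same host):

    $ nc localhost 6666
    hello atguigu
    hello shangguigu
    hello

    The first line should appear on the first topic's consumer, the second on second, and the last on third.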

    This article is from cnblogs, author: Qiu Hua. When reposting, please cite the original: https://www.cnblogs.com/qiu-hua/p/15224730.html
