zoukankan      html  css  js  c++  java
  • flume interceptors flume拦截器

    flume用户自定义拦截器.创建flume-demo的maven项目.

    在项目的pom.xml文件中添加flume-ng-core依赖:

    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.7.0</version>
    </dependency>
    package com.kpwong.flume.interceptor;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;
    
    import java.util.List;
    import java.util.Map;
    
    /**
     * Flume interceptor that tags every event with a {@code topic} header derived from its body.
     *
     * <p>Events whose body contains the text {@code hello} get {@code topic=letter}; all other
     * events get {@code topic=number}. The header is meant to be consumed by a channel selector
     * configured on the same header name.
     */
    public class CustomInterceptor implements Interceptor {

        /** Name of the header written on every intercepted event. */
        private static final String TOPIC_HEADER = "topic";

        /** Header value used when the body contains {@link #LETTER_MARKER}. */
        private static final String LETTER_TOPIC = "letter";

        /** Header value used for every other body. */
        private static final String NUMBER_TOPIC = "number";

        /** Substring that marks an event as belonging to the "letter" topic. */
        private static final String LETTER_MARKER = "hello";

        /**
         * Charset used to decode event bodies. Fixed explicitly so the routing decision does not
         * depend on the platform default charset of the host running the agent.
         */
        private static final java.nio.charset.Charset BODY_CHARSET =
                java.nio.charset.StandardCharsets.UTF_8;

        /** No resources to set up. */
        @Override
        public void initialize() {
            // Intentionally empty: this interceptor is stateless.
        }

        /**
         * Tags a single event with the {@code topic} header.
         *
         * @param event the event to inspect; its header map is modified in place
         * @return the same event instance, with the {@code topic} header set
         */
        @Override
        public Event intercept(Event event) {
            Map<String, String> headers = event.getHeaders();

            // A missing body is treated as empty instead of failing inside the String constructor.
            byte[] rawBody = event.getBody();
            String body = (rawBody == null) ? "" : new String(rawBody, BODY_CHARSET);

            if (body.contains(LETTER_MARKER)) {
                headers.put(TOPIC_HEADER, LETTER_TOPIC);
            } else {
                headers.put(TOPIC_HEADER, NUMBER_TOPIC);
            }

            return event;
        }

        /**
         * Tags every event in a batch by delegating to {@link #intercept(Event)}.
         *
         * @param list the batch of events; each event is modified in place
         * @return the same list instance that was passed in
         */
        @Override
        public List<Event> intercept(List<Event> list) {
            for (Event event : list) {
                intercept(event);
            }
            return list;
        }

        /** No resources to release. */
        @Override
        public void close() {
            // Intentionally empty: this interceptor is stateless.
        }

        /**
         * Builder used to instantiate the interceptor; this is the class that the interceptor
         * {@code type} property refers to (as {@code CustomInterceptor$Builder}).
         */
        public static class Builder implements Interceptor.Builder {

            /**
             * Creates a new interceptor instance.
             *
             * @return a fresh {@link CustomInterceptor}
             */
            @Override
            public Interceptor build() {
                return new CustomInterceptor();
            }

            /**
             * Accepts the interceptor configuration; no properties are read.
             *
             * @param context the configuration context (unused)
             */
            @Override
            public void configure(Context context) {
                // Intentionally empty: the interceptor has no configurable properties.
            }
        }
    }

    将项目打包为jar文件,并将该jar文件拷贝到/flume/lib目录下。

     配置conf文件.准备三台机器(hadoop202,hadoop203,hadoop204)

    在hadoop202上。配置flume2.conf

    # Name the components on this agent
    a2.sources = r1
    a2.sinks = k1 k2
    a2.channels = c1 c2
    
    # Describe/configure the source
    a2.sources.r1.type = netcat
    a2.sources.r1.bind = localhost
    a2.sources.r1.port = 44444
    
    # Source interceptor (sets the "topic" header) and multiplexing channel selector keyed on that header
    a2.sources.r1.interceptors = i1
    a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder
    a2.sources.r1.selector.type = multiplexing
    a2.sources.r1.selector.header = topic
    a2.sources.r1.selector.mapping.letter = c1
    a2.sources.r1.selector.mapping.number = c2
    
    # Describe the sink
    a2.sinks.k1.type = avro
    a2.sinks.k1.hostname = hadoop203
    a2.sinks.k1.port = 4141
    
    a2.sinks.k2.type=avro
    a2.sinks.k2.hostname = hadoop204
    a2.sinks.k2.port = 4142
    
    # Use a channel which buffers events in memory
    a2.channels.c1.type = memory
    a2.channels.c1.capacity = 1000
    a2.channels.c1.transactionCapacity = 100
    
    # Use a channel which buffers events in memory
    a2.channels.c2.type = memory
    a2.channels.c2.capacity = 1000
    a2.channels.c2.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a2.sources.r1.channels = c1 c2
    a2.sinks.k1.channel = c1
    a2.sinks.k2.channel = c2

     拦截器配置代码:

    a2.sources.r1.interceptors = i1
    a2.sources.r1.interceptors.i1.type =com.kpwong.flume.interceptor.CustomInterceptor$Builder
    a2.sources.r1.selector.type = multiplexing
    a2.sources.r1.selector.header = topic
    a2.sources.r1.selector.mapping.letter = c1
    a2.sources.r1.selector.mapping.number = c2
    在hadoop203上配置flume3.conf:
    a3.sources = r1
    a3.sinks = k1
    a3.channels = c1
    a3.sources.r1.type = avro
    a3.sources.r1.bind = hadoop203
    a3.sources.r1.port = 4141
    a3.sinks.k1.type = logger
    a3.channels.c1.type = memory
    a3.channels.c1.capacity = 1000
    a3.channels.c1.transactionCapacity = 100
    a3.sinks.k1.channel = c1
    a3.sources.r1.channels = c1
    在hadoop204上配置flume4.conf:
    a4.sources = r1
    a4.sinks = k1
    a4.channels = c1
    a4.sources.r1.type = avro
    a4.sources.r1.bind = hadoop204
    a4.sources.r1.port = 4142
    a4.sinks.k1.type = logger
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 1000
    a4.channels.c1.transactionCapacity = 100
    a4.sinks.k1.channel = c1
    a4.sources.r1.channels = c1

    在hadoop204上运行:

    bin/flume-ng agent -c conf/ -f job/interceptor/flume4.conf  -n a4 -Dflume.root.logger=INFO,console

    在hadoop203上运行:

    bin/flume-ng agent -c conf/ -f job/interceptor/flume3.conf -n a3 -Dflume.root.logger=INFO,console

    在hadoop202上运行:

    bin/flume-ng agent -c conf/ -f job/interceptor/flume2.conf -n a2

     nc localhost 44444

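    实验结果: 在nc中输入包含hello的内容时,事件的topic头被设置为letter,经c1发送到hadoop203并由logger输出;输入其他内容时,topic头被设置为number,经c2发送到hadoop204并由logger输出。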

  • 相关阅读:
    算法竞赛入门经典习题2-3 韩信点兵
    ios入门之c语言篇——基本函数——5——素数判断
    ios入门之c语言篇——基本函数——4——数值交换函数
    144. Binary Tree Preorder Traversal
    143. Reorder List
    142. Linked List Cycle II
    139. Word Break
    138. Copy List with Random Pointer
    137. Single Number II
    135. Candy
  • 原文地址:https://www.cnblogs.com/kpwong/p/14504079.html
Copyright © 2011-2022 走看看