zoukankan      html  css  js  c++  java
  • 04-复制和复用


    #agent1(hadoop102)
    a1.sources = r1
    a1.channels = c1 c2
    a1.sinks = k1 k2

    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/module/flume/demo/123.log

    #配置channelSelector - replicating(复制-默认不配也可以)
    #a1.sources.r1.selector.type = replicating

    #复用
    a1.sources.r1.selector.type = multiplexing
    #event(headers | body)根据headers中的key和value进行数据的发送
    #state指的是headers,key的值
    a1.sources.r1.selector.header = state
    #CZ指的是key对应的value值那么就发送到c1
    a1.sources.r1.selector.mapping.CZ = c1
    #US指的是key对应的value值那么就发送到c2
    a1.sources.r1.selector.mapping.US = c2

    #需求:给event中的headers添加数据
    #static拦截器可以给所有的eventheaders设置我们自定义的key和value
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = static
    #设置key值
    a1.sources.r1.interceptors.i1.key = state
    #设置value值
    a1.sources.r1.interceptors.i1.value = CZ


    a1.channels.c1.type = memory
    a1.channels.c2.type = memory


    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = hadoop103
    a1.sinks.k1.port = 33333
    a1.sinks.k2.type = avro
    a1.sinks.k2.hostname = hadoop104
    a1.sinks.k2.port = 44444

    #一个source对接两个channel
    a1.sources.r1.channels = c1 c2
    a1.sinks.k1.channel = c1
    a1.sinks.k2.channel = c2

    -----------------------------------------------------------------

    #agent2(hadoop103)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop103
    a1.sources.r1.port = 33333

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000

    a1.sinks.k1.type = logger

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1


    -----------------------------------------------------------------

    #agent3(hadoop104)
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1

    a1.sources.r1.type = avro
    a1.sources.r1.bind = hadoop104
    a1.sources.r1.port = 44444

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000

    #a1.sinks.k1.type = logger
    #将event数据存储到本地磁盘上
    a1.sinks.k1.type = file_roll
    #event存放的目录
    a1.sinks.k1.sink.directory = /opt/module/flume/demo
    #多久时间滚动一个新文件(30秒)
    a1.sinks.k1.sink.rollInterval = 30

    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

     

     

     

     

     

     

     

     

     

     

  • 相关阅读:
    WP7编译问题:The application could not be launched for debugging
    cocos2dxnaTweeJump学习笔记1(都是自己看别人代码后所感所想,希望有懂的人指出我的错误或者大家交流交流)
    判断datatalbe是否为空
    SOAP协议基础(转自Ksxs's )
    那些年我还不懂:IList,ICollection,IEnumerable,IEnumerator,IQueryable(转)
    MyEclipse中 智能提示 JSP 页面的html 标记属性值
    MyEclipse中设置智能提示
    MyEclipse快捷键大全
    向朋友借钱:文章值得一读,让人思索良久
    生存逼着我成功
  • 原文地址:https://www.cnblogs.com/shan13936/p/13935249.html
Copyright © 2011-2022 走看看