  • Two types of Flume sink: file_roll and a custom sink (com.mycomm.MySink); even if there is only one event, the event has to be sent in an array


    mkdir /data/UnifiedLog/;
    cd /data/UnifiedLog/;
    wget http://mirror.bit.edu.cn/apache/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz;
    tar -xvf apache-flume-1.8.0-bin.tar.gz;
    ln -s apache-flume-1.8.0-bin flume;
    export FlumeHome=/data/UnifiedLog/flume;

    [root@d ~]# cat /data/UnifiedLog/flume/conf/httpSourceApp.conf
    a1.sources=r1
    a1.sinks=k1
    a1.channels=c1

    a1.sources.r1.type=http
    a1.sources.r1.bind=0.0.0.0
    a1.sources.r1.port=50000
    a1.sources.r1.channels=c1

    a1.sinks.k1.channel=c1
    # Custom sink packaged as your own jar:
    #a1.sinks.k1.type = com.product.FlumeApp
    # Built-in sink, no custom jar needed:
    a1.sinks.k1.type = file_roll
    a1.sinks.k1.sink.directory = /data/UnifiedLog/log
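    # Note: the file_roll sink reads these settings under the "sink." prefix
    # (sink.batchSize, sink.pathManager.extension, sink.pathManager.prefix,
    # sink.rollInterval). Without the prefix they appear to be ignored, which
    # matches the default file names and roughly 30-second rolls in the
    # directory listing further down.
    a1.sinks.k1.batchSize=1
    #a1.sinks.k1.pathManager=%y%m%d%H%M%S
    a1.sinks.k1.pathManager.extension=log
    a1.sinks.k1.pathManager.prefix=webTrack
    a1.sinks.k1.rollInterval=0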
    a1.sinks.k1.sink.serializer = text

    a1.channels.c1.type=memory
    #a1.channels.c1.capacity=1000
    #a1.channels.c1.transactionCapacity=100
    a1.channels.c1.capacity=8
    a1.channels.c1.transactionCapacity=4

    [root@d ~]#

    Start the agent; the process starts successfully:
    $FlumeHome/bin/flume-ng agent -c $FlumeHome/conf/ -f $FlumeHome/conf/httpSourceApp.conf -n a1 -Dflume.root.logger=INFO,console;

    [root@d ~]# $FlumeHome/bin/flume-ng agent -c $FlumeHome/conf/ -f $FlumeHome/conf/httpSourceApp.conf -n a1 -Dflume.root.logger=INFO,console;
    Info: Including Hive libraries found via () for Hive access
    + exec /usr/java/jdk1.8.0_101/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/data/UnifiedLog/flume/conf:/data/UnifiedLog/flume/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application -f /data/UnifiedLog/flume/conf/httpSourceApp.conf -n a1
    2018-10-30 16:35:30,092 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:62)] Configuration provider starting
    2018-10-30 16:35:30,097 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:134)] Reloading configuration file:/data/UnifiedLog/flume/conf/httpSourceApp.conf
    2018-10-30 16:35:30,106 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k1 Agent: a1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,107 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,108 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k1
    2018-10-30 16:35:30,124 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1]
    2018-10-30 16:35:30,124 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:147)] Creating channels
    2018-10-30 16:35:30,131 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel c1 type memory
    2018-10-30 16:35:30,136 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:201)] Created channel c1
    2018-10-30 16:35:30,137 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source r1, type http
    2018-10-30 16:35:30,201 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: k1, type: file_roll
    2018-10-30 16:35:30,208 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:116)] Channel c1 connected to [r1, k1]
    2018-10-30 16:35:30,216 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:137)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:org.apache.flume.source.http.HTTPSource{name:r1,state:IDLE} }} sinkRunners:{k1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6918ccb0 counterGroup:{ name:null counters:{} } }} channels:{c1=org.apache.flume.channel.MemoryChannel{name: c1}} }
    2018-10-30 16:35:30,227 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:144)] Starting Channel c1
    2018-10-30 16:35:30,279 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: CHANNEL, name: c1: Successfully registered new MBean.
    2018-10-30 16:35:30,279 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: CHANNEL, name: c1 started
    2018-10-30 16:35:30,280 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:171)] Starting Sink k1
    2018-10-30 16:35:30,280 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:110)] Starting org.apache.flume.sink.RollingFileSink{name:k1, channel:c1}...
    2018-10-30 16:35:30,280 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:182)] Starting Source r1
    2018-10-30 16:35:30,281 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k1: Successfully registered new MBean.
    2018-10-30 16:35:30,281 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k1 started
    2018-10-30 16:35:30,282 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.sink.RollingFileSink.start(RollingFileSink.java:142)] RollingFileSink k1 started.
    2018-10-30 16:35:30,301 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Logging to org.slf4j.impl.Log4jLoggerAdapter(org.mortbay.log) via org.mortbay.log.Slf4jLog
    2018-10-30 16:35:30,338 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] jetty-6.1.26
    2018-10-30 16:35:30,391 (lifecycleSupervisor-1-0) [INFO - org.mortbay.log.Slf4jLog.info(Slf4jLog.java:67)] Started SelectChannelConnector@0.0.0.0:50000
    2018-10-30 16:35:30,392 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
    2018-10-30 16:35:30,392 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started

    Read the console output carefully and adjust the configuration file if needed:
    $FlumeHome/conf/httpSourceApp.conf

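    POST JSON to the HTTP source. Two sample payloads follow; the single-quoted body values are not strict JSON, but they were accepted in this test: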

    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : 'str'
    }]

    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : '{
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    }'
    }]
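The request itself can be sketched in plain Java. This is a minimal sketch, not the author's client: the class name `FlumeHttpPost`, its helper methods, and the target URL passed on the command line are all assumptions made for illustration. It wraps a single event in an array, as the handler requires, and produces strict JSON by escaping the body.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper class, not part of Flume.
class FlumeHttpPost {

    /** Escapes characters that may not appear raw inside a JSON string. */
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Wraps one event in the array form [{"headers":{...},"body":"..."}]. */
    static String singleEventPayload(Map<String, String> headers, String body) {
        StringBuilder sb = new StringBuilder("[{\"headers\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : headers.entrySet()) {
            if (!first) {
                sb.append(',');
            }
            sb.append('"').append(escape(e.getKey())).append("\":\"")
              .append(escape(e.getValue())).append('"');
            first = false;
        }
        sb.append("},\"body\":\"").append(escape(body)).append("\"}]");
        return sb.toString();
    }

    /** POSTs the payload as UTF-8 JSON and returns the HTTP status code. */
    static int post(String url, String payload) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        conn.setRequestMethod("POST");
        conn.setDoOutput(true);
        conn.setRequestProperty("Content-Type", "application/json; charset=UTF-8");
        try (OutputStream out = conn.getOutputStream()) {
            out.write(payload.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("timestamp", "434324343");
        headers.put("host", "random_host.example.com");
        String payload = singleEventPayload(headers, "str");
        System.out.println(payload);
        // Assumption: pass the agent URL as the first argument,
        // for example http://localhost:50000 for the config in this post.
        if (args.length > 0) {
            System.out.println("HTTP status: " + post(args[0], payload));
        }
    }
}
```

Escaping the body means a nested JSON document can be sent as an ordinary double-quoted string instead of relying on single quotes.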

    The sink records only the body string:

    [root@d log]# cat 1540888530206-7
    {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    }
    [root@d log]# ll -as
    total 16
    4 drwxr-xr-x 2 root root 4096 Oct 30 16:40 .
    4 drwxr-xr-x 5 root root 4096 Oct 30 12:06 ..
    0 -rw-r--r-- 1 root root 0 Oct 30 16:40 1540888530206-10
    4 -rw-r--r-- 1 root root 108 Oct 30 16:38 1540888530206-7
    0 -rw-r--r-- 1 root root 0 Oct 30 16:39 1540888530206-8
    4 -rw-r--r-- 1 root root 4 Oct 30 16:39 1540888530206-9
    [root@d log]# cat 1540888530206-9
    str
    [root@d log]#
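The listing above shows one pitfall when reading the output back: a plain name sort puts `1540888530206-10` before `1540888530206-7`. The sketch below orders the rolled files by their numeric suffix and skips the empty ones. It assumes the default `<timestamp>-<sequence>` file names seen in the listing; `RolledFiles` and its methods are hypothetical names, not Flume APIs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper class, not part of Flume.
class RolledFiles {

    /** Extracts the number after the last '-', e.g. 10 from "1540888530206-10". */
    static long sequence(Path file) {
        String name = file.getFileName().toString();
        return Long.parseLong(name.substring(name.lastIndexOf('-') + 1));
    }

    /** Returns the non-empty "<digits>-<digits>" files in dir, ordered by sequence. */
    static List<Path> nonEmptyInRollOrder(Path dir) throws IOException {
        List<Path> result = new ArrayList<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir)) {
            for (Path p : files) {
                String name = p.getFileName().toString();
                if (name.matches("\\d+-\\d+") && Files.size(p) > 0) {
                    result.add(p);
                }
            }
        }
        result.sort(Comparator.comparingLong(RolledFiles::sequence));
        return result;
    }

    public static void main(String[] args) throws IOException {
        // Assumption: the sink directory from the config in this post.
        Path dir = Paths.get(args.length > 0 ? args[0] : "/data/UnifiedLog/log");
        for (Path p : nonEmptyInRollOrder(dir)) {
            String content = new String(Files.readAllBytes(p), StandardCharsets.UTF_8);
            System.out.println(p.getFileName() + ": " + content.trim());
        }
    }
}
```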

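    Custom sink

    Key point: take the body of the event as a byte array and convert it to a String (UTF-8 is assumed here, which is the JSON handler's default).

    String body = new String(event.getBody(), StandardCharsets.UTF_8);
    System.out.println("body-----" + body);
    txn.commit();

    The complete class:
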
    package com.product;
    
    import java.nio.charset.StandardCharsets;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    
    public class FlumeApp extends AbstractSink implements Configurable {
        @Override
        public void configure(Context context) {
            // This sink needs no configuration.
        }
    
        @Override
        public Status process() throws EventDeliveryException {
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                Event event = ch.take();
                if (event == null) {
                    // Channel is empty: finish the transaction and let the
                    // sink runner back off instead of spinning in a loop.
                    txn.commit();
                    return Status.BACKOFF;
                }
                // Decode the body bytes explicitly; UTF-8 is assumed here.
                String body = new String(event.getBody(), StandardCharsets.UTF_8);
                System.out.println("body-----" + body);
                txn.commit();
                return Status.READY;
            } catch (Throwable th) {
                txn.rollback();
                if (th instanceof Error) {
                    throw (Error) th;
                } else {
                    throw new EventDeliveryException(th);
                }
            } finally {
                txn.close();
            }
        }
    }
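The byte-to-String conversion is where a charset mismatch would surface, so here is a small standalone sketch of the decoding step. It uses only the JDK; `BodyDecoding` and `decode` are hypothetical names, and the UTF-8 fallback is an assumption based on the HTTP source's documented default.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, not part of Flume.
class BodyDecoding {

    /** Decodes an event body; falls back to UTF-8 when no charset is given. */
    static String decode(byte[] body, Charset charset) {
        if (body == null) {
            return "";
        }
        return new String(body, charset == null ? StandardCharsets.UTF_8 : charset);
    }

    public static void main(String[] args) {
        // "webTrack" followed by two non-ASCII characters, written as escapes
        // so the source file compiles under any default encoding.
        byte[] utf8 = "webTrack \u65e5\u5fd7".getBytes(StandardCharsets.UTF_8);

        // Decoded with the charset it was encoded with: text is intact.
        System.out.println(decode(utf8, null));

        // Decoded with a different charset: the non-ASCII part is garbled.
        System.out.println(decode(utf8, StandardCharsets.ISO_8859_1));
    }
}
```

Calling `new String(bytes)` without a charset uses the JVM's default, which can differ between machines, so naming the charset keeps the sink's output stable.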
    

      

    A handler is provided out of the box which can handle events represented in JSON format, and supports UTF-8, UTF-16 and UTF-32 character sets. The handler accepts an array of events (even if there is only one event, the event has to be sent in an array) and converts them to Flume events based on the encoding specified in the request. If no encoding is specified, UTF-8 is assumed. Events are represented as follows.

    [{
      "headers" : {
                 "timestamp" : "434324343",
                 "host" : "random_host.example.com"
                 },
      "body" : "random_body"
      },
      {
      "headers" : {
                 "namenode" : "namenode.example.com",
                 "datanode" : "random_datanode.example.com"
                 },
      "body" : "really_random_body"
      }]
    

    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).
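As a rough illustration of keeping the header and the bytes in agreement, the sketch below derives both from the same `Charset` object. `CharsetRequest` and its methods are hypothetical names; only JDK classes are used, and UTF-16 is chosen simply because it is available in `StandardCharsets`.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, not part of Flume.
class CharsetRequest {

    /** Builds the Content-Type value for the given charset. */
    static String contentType(Charset charset) {
        return "application/json; charset=" + charset.name();
    }

    /** Encodes the JSON text with the same charset announced in the header. */
    static byte[] encode(String json, Charset charset) {
        return json.getBytes(charset);
    }

    public static void main(String[] args) {
        String json = "[{\"headers\":{},\"body\":\"random_body\"}]";
        Charset[] charsets = {StandardCharsets.UTF_8, StandardCharsets.UTF_16};
        for (Charset cs : charsets) {
            // Same text, different byte lengths depending on the encoding.
            System.out.println(contentType(cs) + " -> " + encode(json, cs).length + " bytes");
        }
    }
}
```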

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#toJson(Object, Type) method. The type token to pass as the 2nd argument of this method for a list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    
     
  • Original article: https://www.cnblogs.com/rsapaper/p/9877736.html