zoukankan      html  css  js  c++  java
  • Flume(二) —— 自定义拦截器、Source、Sink

    自定义拦截器

    自定义Source

    自定义Sink

    引入依赖

            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.7.0</version>
            </dependency>
    

    代码

    
    /**
     * @author Michael Fang
     * @since 2019-12-30
     */
    public class MySink extends AbstractSink implements Configurable {
    
        Logger logger = LoggerFactory.getLogger(MySink.class);
    
        private String prefix;
        private String subfix;
    
        @Override
        public synchronized void start() {
            super.start();
        }
    
        @Override
        public synchronized void stop() {
            super.stop();
        }
    
        public Status process() throws EventDeliveryException {
            Status status = null;
    
            // Start transaction
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                // This try clause includes whatever Channel operations you want to do
    
                Event event = ch.take();
    
                // Send the Event to the external repository.
                // storeSomeData(e);
                if(event != null){
                    String body = new String(event.getBody());
                    logger.info(prefix + " " + body + " " + subfix);
                }
                txn.commit();
                status = Status.READY;
            } catch (Throwable t) {
                txn.rollback();
    
                // Log exception, handle individual exceptions as needed
    
                status = Status.BACKOFF;
    
                // re-throw all Errors
                if (t instanceof Error) {
                    throw (Error) t;
                }
            } finally {
                txn.close();
            }
            return status;
        }
    
        public void configure(Context context) {
            prefix = context.getString("prefix");
            subfix = context.getString("subfix", "bigdata---");
        }
    
    }
    
    
    

    编译好的jar上传到flume/lib下

    创建配置文件mysink.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = com.fonxian.bigdata.flume.MySink
    a1.sinks.k1.prefix = thinkhard
    a1.sinks.k1.subfix = flume
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    ## 事件容量
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    ## channel 与 sink 的关系是 1对多 的关系。1个sink只可以绑定1个channel,1个channel可以绑定多个sink。
    a1.sinks.k1.channel = c1
    

    运行

    ./flume-ng agent --conf ../conf --conf-file ../job/mysink.conf --name a1 -Dflume.root.logger=INFO,console
    
    nc localhost 44444
    

    运行结果

  • 相关阅读:
    网线 ------ 交叉线
    ubuntu ------ 网络 ifconfig 不显示IP地址
    STM32L011D4 ----- 低功耗
    List 集合 使用 remove 踩得坑
    Map 集合遍历的4种方法
    elasticsearch 集群详解
    谷歌浏览器添加插件时显示程序包无效:"CRX_HEADER_INVALID" 解决办法
    MySql数据库 优化
    MySql 索引
    Kibana 安装
  • 原文地址:https://www.cnblogs.com/fonxian/p/12118272.html
Copyright © 2011-2022 走看看