zoukankan      html  css  js  c++  java
  • Flume 自定义Source

    自定义source类,并将相关工程打包放在flume的lib目录下

    public class MySource extends AbstractSource implements Configurable, PollableSource {
    
        //全局变量,仅做演示,无实际意义
        private String prefix;
        private String suffix;
    
        @Override
        public void configure(Context context) {
            prefix = context.getString("prefix");
            suffix = context.getString("suffix","atguigu");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
    
            Status status = null;
    
            try {
                //模拟接收数据
                for (int i = 0; i < 5; i++) {
                    SimpleEvent event = new SimpleEvent();
    
                    event.setBody((prefix+"--"+i+"--"+suffix).getBytes());
    
                    //将数据发送到channel
                    getChannelProcessor().processEvent(event);
    
                    status = Status.READY;
                }
            }catch (Exception e){
                status = Status.BACKOFF;
            }
    
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return status;
        }
    
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }
    
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    
    
    }

    flume配置

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = com.atguigu.source.MySource
    a1.sources.r1.prefix = feiji
    a1.sources.r1.suffix = xiaxian
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    测试略

  • 相关阅读:
    2020软件工程作业04
    2020软件工程作业03
    2020软件工程作业02
    2020软件工程作业01
    2020软件工程个人作业06——软件工程实践总结作业
    2020软件工程作业05
    2020软件工程作业02
    软件工程作业01
    2020软件工程个人作业06——软件工程实践总结作业
    2020软件工程作业05
  • 原文地址:https://www.cnblogs.com/noyouth/p/13094039.html
Copyright © 2011-2022 走看看