zoukankan      html  css  js  c++  java
  • Flume 自定义Source

    自定义source类,并将相关工程打包放在flume的lib目录下

    public class MySource extends AbstractSource implements Configurable, PollableSource {
    
        //全局变量,仅做演示,无实际意义
        private String prefix;
        private String suffix;
    
        @Override
        public void configure(Context context) {
            prefix = context.getString("prefix");
            suffix = context.getString("suffix","atguigu");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
    
            Status status = null;
    
            try {
                //模拟接收数据
                for (int i = 0; i < 5; i++) {
                    SimpleEvent event = new SimpleEvent();
    
                    event.setBody((prefix+"--"+i+"--"+suffix).getBytes());
    
                    //将数据发送到channel
                    getChannelProcessor().processEvent(event);
    
                    status = Status.READY;
                }
            }catch (Exception e){
                status = Status.BACKOFF;
            }
    
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            return status;
        }
    
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }
    
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    
    
    }

    flume配置

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = com.atguigu.source.MySource
    a1.sources.r1.prefix = feiji
    a1.sources.r1.suffix = xiaxian
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    测试略

  • 相关阅读:
    ThinkPHP的ajaxReturn方法的使用
    PHP中如何获取网站根目录物理路径
    MySQL索引覆盖
    php对gzip的使用(实例)
    php对gzip的使用(开启)
    php对gzip的使用(理论)
    ThinkPHP中调用PHPExcel
    PHPExcel正确读取excel表格时间单元格(转载)
    Kubernetes pod网络解析
    vRO 添加已有磁盘到VM
  • 原文地址:https://www.cnblogs.com/noyouth/p/13094039.html
Copyright © 2011-2022 走看看