zoukankan      html  css  js  c++  java
  • Flume 自定义Sink

    自定义sink类,并将相关工程打包放在flume的lib目录下

    public class MySink extends AbstractSink implements Configurable {
    
        private static final Logger logger = LoggerFactory.getLogger(MySink.class);
    
        //全局变量,仅做演示,无实际意义
        private String prefix;
        private String suffix;
    
        @Override
        public void configure(Context context) {
    
            prefix = context.getString("prefix");
            suffix = context.getString("suffix","atguigu");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
    
            Status status = null;
    
            Channel channel = getChannel();
            Transaction transaction = channel.getTransaction();
            transaction.begin();
    
            try {
                Event event = channel.take();
    
                //核心业务逻辑,输出到日志
                String body = new String(event.getBody());
                logger.info(prefix+body+suffix);
    
                transaction.commit();
                status = Status.READY;
    
            }catch (Exception e){
                transaction.rollback();
                status = Status.BACKOFF;
            }finally {
                transaction.close();
            }
    
    
    
            return status;
        }
    
    }

    flume配置文件

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = com.atguigu.sink.MySink
    a1.sinks.k1.prefix = sleep--
    a1.sinks.k1.suffix = --banzhang
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 相关阅读:
    find ./ -type d ! -name "."
    Linux入门-进程、计划任务
    Linux入门-用户管理
    Linux入门-shell使用技巧
    Linux入门-压缩、解压
    Linux入门-常用命令
    MySQL杂项(索引注意事项 快速导入导出数据 锁 字符集 慢查询)
    MySQL分区实验
    Lvs网络负载均衡 直接路由(dr)
    Lvs网络负载均衡 隧道(ip tunl)
  • 原文地址:https://www.cnblogs.com/noyouth/p/13094059.html
Copyright © 2011-2022 走看看