zoukankan      html  css  js  c++  java
  • flume自定义sink

    package me;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    
    /**
     * Minimal custom Flume sink that takes events from its channel and prints
     * each event body to standard output.
     *
     * <p>Each call to {@link #process()} handles at most one event inside its own
     * channel transaction.
     */
    public class MySink extends AbstractSink implements Configurable {

        /**
         * Runs once when the sink is shut down. No sink-specific cleanup is
         * needed, so this only delegates to {@link AbstractSink#stop()}.
         */
        @Override
        public synchronized void stop() {
            super.stop();
        }

        /**
         * Runs once when the sink is started. No sink-specific setup is needed,
         * so this only delegates to {@link AbstractSink#start()}.
         */
        @Override
        public synchronized void start() {
            super.start();
        }

        /**
         * Called repeatedly by the sink runner. Takes a single event from the
         * channel and prints its body.
         *
         * @return {@code Status.READY} if an event was taken and printed, or
         *         {@code Status.BACKOFF} if the channel had no event available
         * @throws EventDeliveryException if taking or handling the event fails
         *         with anything other than an {@link Error}; the transaction is
         *         rolled back first
         */
        @Override
        public Status process() throws EventDeliveryException {
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            txn.begin();
            try {
                // Take inside the try block so that a failing take() still gets
                // rolled back and the transaction is always closed.
                Event event = ch.take();
                if (event == null) {
                    // Empty channel: finish the (empty) transaction and ask the
                    // runner to back off instead of spinning on take() forever.
                    txn.commit();
                    return Status.BACKOFF;
                }

                // Decode with an explicit charset rather than the platform default.
                String body = new String(event.getBody(), java.nio.charset.StandardCharsets.UTF_8);
                System.out.println("event.getBody()-----" + body);
                txn.commit();
                return Status.READY;
            } catch (Throwable th) {
                txn.rollback();

                // Errors (e.g. OutOfMemoryError) are propagated unchanged;
                // everything else is wrapped with its cause preserved.
                if (th instanceof Error) {
                    throw (Error) th;
                }
                throw new EventDeliveryException(th);
            } finally {
                txn.close();
            }
        }

        /**
         * Receives the sink's configuration. This implementation only prints the
         * supplied context; it does not read or store any property from it.
         *
         * @param arg0 the configuration context passed to this sink
         */
        @Override
        public void configure(Context arg0) {
            System.out.println("configure-------" + arg0);
        }

    }
    
    # Flume agent "agent": one source (s1), one channel (c1), one sink (sk1).
    agent.sources = s1    
    agent.channels = c1  
    agent.sinks = sk1  
    
    # Source s1: netcat source bound to localhost:5678, writing into channel c1.
    agent.sources.s1.type = netcat  
    agent.sources.s1.bind = localhost  
    agent.sources.s1.port = 5678  
    agent.sources.s1.channels = c1  
    
    # Sink sk1: the custom me.MySink class, reading from channel c1.
    # NOTE(review): the hostname/port/databaseName/tableName/user/password/
    # column_name/field_separator keys look like MySQL connection settings, but
    # the MySink.configure shown with this config only prints its Context and
    # never reads them -- confirm whether they are still needed.
    # NOTE(review): the password is stored here in plain text.
    agent.sinks.sk1.type = me.MySink
    agent.sinks.sk1.hostname=192.168.16.33
    agent.sinks.sk1.port=3306
    agent.sinks.sk1.databaseName=test
    agent.sinks.sk1.tableName=user
    agent.sinks.sk1.user=root
    agent.sinks.sk1.password=WoChu@123
    agent.sinks.sk1.column_name=id, username, password
    # NOTE(review): presumably intended as a literal "|" separator; verify how
    # the backslash is interpreted when this properties file is loaded.
    agent.sinks.sk1.field_separator=\|
    agent.sinks.sk1.channel = c1  
    
    # Channel c1: in-memory channel with capacity 1000 and transactionCapacity 100.
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000     
    agent.channels.c1.transactionCapacity = 100
    
    lihudeMacBook-Pro:~ SunAndLi$ telnet  localhost 5678
    

     

  • 相关阅读:
    生产型企业原材料采购及入库的处理
    OpenERP财务管理若干概念讲解
    OE context 传参数
    Openerp 中打开 URL 的三种 方法
    view xml 中的 button 调用web客户端事件
    一招解决OpenERP8.0安装旧版模块报错
    ubuntu server激活即时通讯IM服务 Instant Messaging is not activated on this server
    error: command 'gcc' failed with exit status 1 while installing eventlet
    OpenERP函數字段的應用
    Docker容器之间连接
  • 原文地址:https://www.cnblogs.com/sunyaxue/p/6644466.html
Copyright © 2011-2022 走看看