zoukankan      html  css  js  c++  java
  • flume中自定义sink InterCeptor

    SinkProcessor:
    ============================
        FailOver:
    
    
        Load balancing :    //负载均衡处理器
                    //round_robin 轮询 1-2-3-1-2-3-...
                    //random      随机 1-3-2-3-1-...
    
            1、round_robin 轮询 1-2-3-1-2-3-...
            
    
    
    
            2、random 随机:
                
            
    
    
    自定义Sink && InterCeptor
    =============================================
        1、pom
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.8.0</version>
            </dependency>
    
        2、编写sink:
            public class MySink extends AbstractSink {
                public Status process() throws EventDeliveryException {
                //初始化status
                Status result = Status.READY;
                //得到channel对象
                Channel channel = getChannel();
                Transaction transaction = channel.getTransaction();
                Event event = null;
                try {
                    //开启事务
                    transaction.begin();
                    //从channel中获取事件
                    event = channel.take();
                    if (event != null) {
                    //在事件中手动添加头部
                    Map<String,String> map = new HashMap<String, String>();
                    map.put("TimeStamp",System.currentTimeMillis() + "");
                    event.setHeaders(map);
                    //获取body
                    byte[] body = event.getBody();
                    //获取head值
                    String headVal = event.getHeaders().get("TimeStamp");
                    System.out.println("head: "+ headVal + "	body: " + new String(body));
                    } else {
                    //没有事件,即为backoff
                    result = Status.BACKOFF;
                    }
                    //提交事务
                    transaction.commit();
                } catch (Exception ex) {
                    //回滚事务
                    transaction.rollback();
                    throw new EventDeliveryException("Failed to log event: " + event, ex);
                } finally {
                    //关闭事务
                    transaction.close();
                }
                return result;
                }
            }
            
        3、打包并放在/soft/flume/lib下
            
        
        4、使用自定义sink
            
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            # 配置source
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            # 配置sink
            a1.sinks.k1.type = com.oldboy.flume.MySink
    
            # 配置channel
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            # 绑定channel-source, channel-sink
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
    
    
    自定义拦截器:InterCeptor    event大小拦截器
    ================================
            自定义限速拦截器:
                1、设置参数
                2、设置带参构造
                3、创建Builder内部类,通过Builder构造对象
                    1)通过configure得到参数的值或默认值
                    2)通过build方法,构建InterCeptor对象
                4、在Constants内部类设置常量值
        
            public class SpeedInterceptor implements Interceptor {
    
                private int speed;
    
                public SpeedInterceptor(int speed) {
                this.speed = speed;
                }
                public void initialize() {
                }
    
                /**
                 * 对event进行修改
                 * 限速拦截器,限速范围,单个事件
                 *            时间范围需注意第一个时间
                 *            speed = bodySize / time
                 *
                 * 对上一个事件进行速度计算,如果速度过快,sleep
                 * lastTime
                 * lastBodySize
                 */
                private long lastTime = -1 ;
                private long lastBodySize = 0;
                public Event intercept(Event event) {
                Map<String, String> headers = event.getHeaders();
                //获取body的长度
                long bodySize = event.getBody().length;
                //获取当前时间
                long current = System.currentTimeMillis();
                //第一个事件
                if(lastTime == -1){
                    lastTime = current;
                    lastBodySize = bodySize;
                }
                //非第一个事件
                else {
                    long duration = current - lastTime;
                    int currSpeed = (int) ((double)lastBodySize / duration * 1000);
                    //速度没超
                    if( speed >= currSpeed){
                    return event;
                    }
                    //速度超了
                    else {
                    try {
                        Thread.sleep(lastBodySize/speed * 1000 - duration);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    }
                    lastBodySize = bodySize;
                    lastTime = System.currentTimeMillis();
                }
                return event;
                }
                public List<Event> intercept(List<Event> events) {
                for (Event event : events) {
                    intercept(event);
                }
                return events;
                }
                public void close() {
                }
    
                public static class Builder implements Interceptor.Builder {
    
                private int speed;
                public void configure(Context context) {
                    //相当于 context.getInteger("speed",1024);
                    speed = context.getInteger(Constants.SPEED, Constants.SPEED_DEFAULT);
                }
    
                public Interceptor build() {
                    return new SpeedInterceptor(this.speed);
                }
                }
    
                public static class Constants {
                public static final String SPEED = "speed";
                public static final int SPEED_DEFAULT = 1024;
    
                }
            }
            
                
        自定义拦截器使用方法:
        ==============================
            1、编写代码,打包并放入/soft/flume/lib下
            2、编写配置文件i_speed.conf
                # 将agent组件起名
                a1.sources = r1
                a1.sinks = k1
                a1.channels = c1
    
                # 配置source
                a1.sources.r1.type = seq
                # 给拦截器起名
                a1.sources.r1.interceptors = i1
                # 指定拦截器类型
                a1.sources.r1.interceptors.i1.type = com.oldboy.flume.SpeedInterceptor$Builder
                a1.sources.r1.interceptors.i1.speed = 1
                a1.sources.r1.interceptors.i1.speed2 = 10
    
                # 配置sink
                a1.sinks.k1.type = logger
    
                # 配置channel
                a1.channels.c1.type = memory
                a1.channels.c1.capacity = 1000
                a1.channels.c1.transactionCapacity = 100
    
                # 绑定channel-source, channel-sink
                a1.sources.r1.channels = c1
                a1.sinks.k1.channel = c1
    
            3、flume-ng agent -n a1 -f i_speed.conf
    
    
    
    
            
    
    
    
    注意:SinkProcessor和ChannelSelector
    
        ChannelSelector:挑选通道 ----> sink
          
        SinkProcessor:   挑选sink
    
        在配置SinkProcessor的时候,注意ChannelSelector要设为默认(不配置)
        
        
    
    
    使用ZK进行flume配置管理:
    ============================================
        1、在zk客户端创建节点(/flume/a1)    //注意:节点a1是agent名称
            zkCli.sh -server s102:2181
    
    
        2、在/flume/a1节点添加数据,使用zooInspector即可    //中文字符会出现乱码
                       可以使用idea插件(zookeeper)
            a1.sources = r1
            a1.sinks = k1
            a1.channels = c1
    
            a1.sources.r1.type = netcat
            a1.sources.r1.bind = localhost
            a1.sources.r1.port = 8888
    
            a1.sinks.k1.type = logger
    
            a1.channels.c1.type = memory
            a1.channels.c1.capacity = 1000
            a1.channels.c1.transactionCapacity = 100
    
            a1.sources.r1.channels = c1
            a1.sinks.k1.channel = c1
        
        3、尝试启动flume
            flume-ng agent -n a1 -z s102:2181 -p /flume
  • 相关阅读:
    网络编程最佳实践一
    【NYOJ】[599]奋斗的小蜗牛
    【NYOJ】[599]奋斗的小蜗牛
    【NYOJ】[477]A+B Problem III
    【NYOJ】[477]A+B Problem III
    【NYOJ】[811]变态最大值
    【NYOJ】[811]变态最大值
    【POJ】[1417]True Liars
    【POJ】[1417]True Liars
    【杭电】[3038]How Many Answers Are Wrong
  • 原文地址:https://www.cnblogs.com/zyde/p/8946626.html
Copyright © 2011-2022 走看看