
                    Hadoop Ecosystem - Flume Components: Custom Interceptors (interceptor)

                                                  Author: 尹正杰 (Yin Zhengjie)

    Copyright notice: original work, reproduction prohibited! Violators will be held legally liable.

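       This post walks through a single example of a custom interceptor: one that measures how fast bytes pass through the source and throttles events when that rate is too high.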

    1>.Write the custom interceptor

    /*
    @author :yinzhengjie
    Blog:http://www.cnblogs.com/yinzhengjie/tag/Hadoop%E7%94%9F%E6%80%81%E5%9C%88/
    EMAIL:y1053419035@qq.com
    */
    package cn.org.yinzhengjie.interceptor;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    import java.util.List;

    /**
     * Rate-limiting interceptor.
     * <p>
     * Measures throughput as bytes per unit of time: if too many bytes
     * arrive within a short time, sleep for a while before passing the event on.
     */
    public class MyInterceptor implements Interceptor {

        // Maximum allowed throughput, in bytes per second
        private final int speed;

        // Time (ms) at which the previous event was passed on; -1 = no event yet
        private long lastTime = -1;
        // Body size (bytes) of the previous event
        private long lastBodySize = 0;

        private MyInterceptor(int speed) {
            this.speed = speed;
        }

        @Override
        public void initialize() {
            // do nothing
        }

        /**
         * 1. Subtract the previous event's time from the current time to get the interval since it.
         * 2. Take the body size (bytes) of the previous event.
         * 3. Divide to get the previous event's speed; if it is too high, pause here before returning this event.
         *
         * @param event the incoming event
         * @return the same event, unmodified
         */
        @Override
        public Event intercept(Event event) {
            int len = event.getBody().length;
            // Use one clock (milliseconds) for every timestamp
            long current = System.currentTimeMillis();

            // Every event except the first: check the speed of the previous event
            if (lastTime != -1) {
                long interval = current - lastTime;
                System.out.println("=========================" + current + "/" + lastTime + "/" + interval + "=========================");
                // Speed of the previous event in bytes/s (treat a 0 ms interval as 1 ms)
                double nowSpeed = lastBodySize * 1000.0 / Math.max(interval, 1);
                if (nowSpeed > speed) {
                    System.out.println("=========================" + nowSpeed + "=========================");
                    // Sleep time = shouldTime - interval, where shouldTime is how long
                    // lastBodySize bytes should take at the configured speed
                    long sleepMs = lastBodySize * 1000 / speed - interval;
                    if (sleepMs > 0) {
                        try {
                            Thread.sleep(sleepMs);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the agent can shut down cleanly
                            Thread.currentThread().interrupt();
                        }
                    }
                }
            }
            lastBodySize = len;
            // Record the time after any sleep, using the same clock as above
            lastTime = System.currentTimeMillis();
            return event;
        }

        // Iterate over List<Event> and hand every Event to intercept(Event)
        @Override
        public List<Event> intercept(List<Event> events) {
            for (Event event : events) {
                intercept(event);
            }
            return events;
        }

        @Override
        public void close() {
            // do nothing
        }

        public static class Builder implements Interceptor.Builder {

            private int speed;

            @Override
            public void configure(Context context) {
                speed = context.getInteger(Constants.SPEED, Constants.DEFAULT_SPEED);
                // speed is used as a divisor, so reject 0 and negative values early
                if (speed <= 0) {
                    throw new IllegalArgumentException("speed must be a positive number of bytes per second");
                }
            }

            @Override
            public Interceptor build() {
                return new MyInterceptor(speed);
            }
        }

        public static class Constants {
            public static final String SPEED = "speed";
            public static final int DEFAULT_SPEED = 1;
        }
    }
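The throttling arithmetic in intercept() is easy to get wrong (mixed clocks, integer division, negative sleep times), so it can help to check it in isolation. The following is a minimal sketch that is not part of the original project: the class `ThrottleMath` and its method `sleepMillis` are hypothetical, and it assumes `speed` is measured in bytes per second and all times in milliseconds.

```java
/**
 * Hypothetical helper that isolates the throttling arithmetic of
 * MyInterceptor so it can be checked without Flume on the classpath.
 * Assumes: speed in bytes per second, times in milliseconds.
 */
public class ThrottleMath {

    /**
     * How long to sleep (ms) after an event of lastBodySize bytes that was
     * passed on interval ms ago, so the observed rate stays at or below
     * speed bytes/s. Returns 0 when no sleep is needed.
     */
    static long sleepMillis(long lastBodySize, long interval, int speed) {
        if (speed <= 0) {
            throw new IllegalArgumentException("speed must be > 0");
        }
        // Time the previous event should have taken at the configured speed
        long shouldTime = lastBodySize * 1000 / speed;
        // Never return a negative value: Thread.sleep rejects negative arguments
        return Math.max(0, shouldTime - interval);
    }

    public static void main(String[] args) {
        // 1 byte at 1 byte/s should take 1000 ms; 200 ms have passed -> sleep 800 ms
        System.out.println(sleepMillis(1, 200, 1));
        // 100 bytes at 1000 bytes/s should take 100 ms; 150 ms have passed -> no sleep
        System.out.println(sleepMillis(100, 150, 1000));
        // 5 bytes at 2 bytes/s should take 2500 ms; 0 ms have passed -> sleep 2500 ms
        System.out.println(sleepMillis(5, 0, 2));
    }
}
```

Returning 0 instead of a negative number is what keeps a fast-but-legal event from crashing the source with an IllegalArgumentException from Thread.sleep.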

    2>.Package the project and upload the jar to /soft/flume/lib

    [yinzhengjie@s101 ~]$ cd /soft/flume/lib/
    [yinzhengjie@s101 lib]$ 
    [yinzhengjie@s101 lib]$ ll | grep MyFlume
    -rw-r--r--  1 yinzhengjie yinzhengjie    5231 Jun 20 18:53 MyFlume-1.0-SNAPSHOT.jar
    [yinzhengjie@s101 lib]$ 
    [yinzhengjie@s101 lib]$ rm -rf MyFlume-1.0-SNAPSHOT.jar 
    [yinzhengjie@s101 lib]$ 
    [yinzhengjie@s101 lib]$ rz
    
    [yinzhengjie@s101 lib]$ 
    [yinzhengjie@s101 lib]$ ll | grep MyFlume
    -rw-r--r--  1 yinzhengjie yinzhengjie    8667 Jun 20 21:02 MyFlume-1.0-SNAPSHOT.jar
    [yinzhengjie@s101 lib]$ 
    [yinzhengjie@s101 lib]$ 
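Before starting the agent, it can be worth confirming that the uploaded jar really contains the class named in the configuration, since Flume loads the interceptor by that class name. A minimal sketch using the JDK's `java.util.jar.JarFile`; the `JarCheck` class is hypothetical, and its default jar path and class name are taken from this post.

```java
import java.io.IOException;
import java.util.jar.JarFile;

/**
 * Hypothetical pre-flight check: does the jar contain the class that the
 * agent's "interceptors.i1.type" property refers to?
 */
public class JarCheck {

    static boolean containsClass(String jarPath, String className) throws IOException {
        // "a.b.Outer$Inner" is stored in the jar as "a/b/Outer$Inner.class"
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jar = new JarFile(jarPath)) {
            return jar.getEntry(entryName) != null;
        }
    }

    public static void main(String[] args) throws IOException {
        String jarPath = args.length > 0 ? args[0] : "/soft/flume/lib/MyFlume-1.0-SNAPSHOT.jar";
        String className = "cn.org.yinzhengjie.interceptor.MyInterceptor$Builder";
        System.out.println(className + " present: " + containsClass(jarPath, className));
    }
}
```

The same information is available from `jar tf MyFlume-1.0-SNAPSHOT.jar`; the Java version is handy when the check should run as part of a build or test.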

    3>.Write the agent configuration file

    [yinzhengjie@s101 ~]$ more /soft/flume/conf/yinzhengjie_myInterceptor.conf 
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
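    # Define the source: seq (sequence generator)
    a1.sources.r1.type = seq
    # Number of events the source tries to process per batch
    a1.sources.r1.batchSize = 1024
    
    
    # Attach the custom interceptor (a nested class is referenced by its binary name, Outer$Inner)
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = cn.org.yinzhengjie.interceptor.MyInterceptor$Builder
    # Throughput limit, read by the Builder under the key "speed"
    a1.sources.r1.interceptors.i1.speed = 1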
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 10000
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    [yinzhengjie@s101 ~]$ 
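Only the keys under `a1.sources.r1.interceptors.i1.` concern this interceptor, and the Builder looks up the short key `speed` (`Constants.SPEED`). The sketch below illustrates that scoping with `java.util.Properties`; `InterceptorProps` and `subProperties` are hypothetical names, and the code mimics the idea only rather than reproducing Flume's own configuration code.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;

/**
 * Hypothetical sketch: keep the keys under a prefix and strip the prefix,
 * so a component can look up "speed" instead of the full property name.
 */
public class InterceptorProps {

    static Map<String, String> subProperties(Reader config, String prefix) throws IOException {
        Properties props = new Properties();
        props.load(config);
        Map<String, String> result = new TreeMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(prefix)) {
                result.put(key.substring(prefix.length()), props.getProperty(key));
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        String conf = String.join("\n",
                "a1.sources.r1.interceptors = i1",
                "a1.sources.r1.interceptors.i1.type = cn.org.yinzhengjie.interceptor.MyInterceptor$Builder",
                "a1.sources.r1.interceptors.i1.speed = 1");
        Map<String, String> sub = subProperties(new StringReader(conf), "a1.sources.r1.interceptors.i1.");
        // Mirrors Builder.configure(): fall back to the default of 1 when "speed" is absent
        int speed = Integer.parseInt(sub.getOrDefault("speed", "1"));
        System.out.println(sub);
        System.out.println("speed = " + speed);
    }
}
```

This is why the Builder can read `context.getInteger("speed", ...)` even though the file spells out the full `a1.sources.r1.interceptors.i1.speed` key.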

    4>.Start Flume and test

    [yinzhengjie@s101 ~]$ flume-ng agent -f /soft/flume/conf/yinzhengjie_myInterceptor.conf -n a1

      [Figure: part of the output from the running agent]
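To judge whether the output looks right, a rough lower bound on the run time helps. This is a sketch under stated assumptions: the seq source's event bodies are the decimal counter values "0", "1", "2" and so on, and each event is held back until the previous event's bytes fit the `speed` limit in bytes per second. `ThrottleEstimate` and `minSeconds` are hypothetical names.

```java
/**
 * Hypothetical back-of-the-envelope estimate of the minimum time needed to
 * pass the first n seq-source events through the throttling interceptor.
 * Assumes bodies are "0", "1", "2", ... and a limit of speed bytes/s.
 */
public class ThrottleEstimate {

    static double minSeconds(long n, int speed) {
        long bytes = 0;
        // The first event passes immediately; event i waits for event i-1's bytes
        for (long i = 0; i < n - 1; i++) {
            bytes += Long.toString(i).length();
        }
        return (double) bytes / speed;
    }

    public static void main(String[] args) {
        // speed = 1 as in the agent configuration
        System.out.println(minSeconds(10, 1));   // bodies "0".."8" are waited on: 9 bytes
        System.out.println(minSeconds(100, 1));
        System.out.println(minSeconds(1024, 1)); // one full batch at batchSize = 1024
    }
}
```

Under these assumptions a full batch of 1024 events needs at least 2959 seconds at `speed = 1`. If the seq source passes each whole batch through the interceptor chain before putting it into the channel (an assumption about Flume's internals worth checking against its source), the logger sink would print nothing for roughly 49 minutes, while the interceptor's own `System.out` lines appear much earlier.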

  Original post: https://www.cnblogs.com/yinzhengjie/p/9208268.html