zoukankan      html  css  js  c++  java
  • flume 自定义sink


    http://flume.apache.org/FlumeDeveloperGuide.html#sink


    看了 还是比较好上手的,简单翻译一下


    sink的作用是从 Channel 提取 Event 然后传给流中的下一个 Flume Agent或者把它们存储在外部的仓库中。在Flume的配置文件中,一个 Sink 和一个唯一的 Channel 关联。有一个 SinkRunner 实例与每一个配好的 Sink 关联,当 Flume 框架调用 SinkRunner 的 start() 方法时,就创建一个新的线程来驱动这个 Sink (使用  SinkRunner 的实现Runnable接口的 PollingRunner 内部静态类来运行)。这个线程管理了 Sink 的生命周期。 Sink 需要实现 start() 和 stop() 方法。Sink 的 start() 方法需要初始化 Sink 并使它能够达到向目的地发送 Event 的状态。 Sink 的 process() 方法是处理从 Channel 传过来的 Event 和 发送 Event 的核心方法。 Sink 的 Stop() 方法需要做必要的清理工作(比如释放某些资源)。 Sink 也需要实现 Configurable 接口来处理自己的一些配置。


    官网也给出了模板类:

     1 public class MySink extends AbstractSink implements Configurable {
     2     private String myProp;
     3 
     4     @Override
     5     public void configure(Context context) {
     6         String myProp = context.getString("myProp", "defaultValue");
     7 
     8         // Process the myProp value (e.g. validation)
     9 
    10         // Store myProp for later retrieval by process() method
    11         this.myProp = myProp;
    12     }
    13 
    14     @Override
    15     public void start() {
    16         // Initialize the connection to the external repository (e.g. HDFS) that
    17         // this Sink will forward Events to ..
    18     }
    19 
    20     @Override
    21     public void stop() {
    22         // Disconnect from the external respository and do any
    23         // additional cleanup (e.g. releasing resources or nulling-out
    24         // field values) ..
    25     }
    26 
    27     @Override
    28     public Status process() throws EventDeliveryException {
    29         Status status = null;
    30 
    31         // Start transaction
    32         Channel ch = getChannel();
    33         Transaction txn = ch.getTransaction();
    34         txn.begin();
    35 
    36         try {
    37             // This try clause includes whatever Channel operations you want to do
    38             Event event = ch.take();
    39 
    40             // Send the Event to the external repository.
    41             // storeSomeData(e);
    42             txn.commit();
    43             status = Status.READY;
    44         } catch (Throwable t) {
    45             txn.rollback();
    46 
    47             // Log exception, handle individual exceptions as needed
    48             status = Status.BACKOFF;
    49 
    50             // re-throw all Errors
    51             if (t instanceof Error) {
    52                 throw (Error) t;
    53             }
    54         } finally {
    55             txn.close();
    56         }
    57 
    58         return status;
    59     }
    60 }

    拿来模板直接填充自己的逻辑代码即可,详细就可以直接参考HDFSSink或者HBaseSink等


  • 相关阅读:
    laravel进阶知识大纲
    spring boot 配置多个DispatcherServlet
    RepeatReadRequestWrapper
    RestTemplate HttpClient详解及如何设置忽略SSL
    Swagger注解-@ApiModel 和 @ApiModelProperty
    SpringBoot 接收 单个String入参之解决方案
    spring boot添加 LocalDateTime 等 java8 时间类序列化和反序列化的支持
    Mybatisplus实现MetaObjectHandler接口自动更新创建时间更新时间
    关于SpringBoot 2.0,Pageable 无法注入,提示缺少默认构造方法的解决办法
    OP_REQUIRES failed at save_restore_v2_ops.cc:109 : Permission denied: model/variables/variables_t emp; Permission denied
  • 原文地址:https://www.cnblogs.com/admln/p/flume-user-defined-sink.html
Copyright © 2011-2022 走看看