zoukankan      html  css  js  c++  java
  • Flume基础(十三):自定义 Sink

    介绍

      Sink 不断地轮询 Channel 中的事件且批量地移除它们,并将这些事件批量写入到存储或索引系统、或者被发送到另一个 Flume Agent。
      Sink 是完全事务性的。在从 Channel 批量删除数据之前,每个 Sink 用 Channel 启动一个事务。批量事件一旦成功写出到存储系统或下一个 Flume Agent,Sink 就利用 Channel 提
      Sink 组件目的地包括 hdfs、logger、avro、thrift、ipc、file、null、HBase、solr、自定义。官方提供的 Sink 类型已经很多,但是有时候并不能满足实际开发当中的需求,此
    时我们就需要根据实际需求自定义某些 Sink。
      官方也提供了自定义 sink 的接口:
    https://flume.apache.org/FlumeDeveloperGuide.html#sink 根据官方说明自定义MySink 需要继承 AbstractSink 类并实现 Configurable 接口。
      实现相应方法:
      configure(Context context)//初始化 context(读取配置文件内容)
      process()//从 Channel 读取获取数据(event),这个方法将被循环调用。
      使用场景:读取 Channel 数据写入 MySQL 或者其他文件系统。

    需求

    使用 flume 接收数据,并在 Sink 端给每条数据添加前缀和后缀,输出到控制台。前后缀可在 flume 任务配置文件中配置。
    流程分析:

    编码

    package com.atguigu;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    public class MySink extends AbstractSink implements Configurable {
     //创建 Logger 对象
     private static final Logger LOG = 
    LoggerFactory.getLogger(AbstractSink.class);
     private String prefix;
     private String suffix;
     @Override
     public Status process() throws EventDeliveryException {
     //声明返回值状态信息
     Status status;
     //获取当前 Sink 绑定的 Channel
     Channel ch = getChannel();
     //获取事务
     Transaction txn = ch.getTransaction();
     //声明事件
     Event event;
     //开启事务
     txn.begin();
     //读取 Channel 中的事件,直到读取到事件结束循环
     while (true) {
     event = ch.take();
     if (event != null) {
     break;
     }
     }
     try {
     //处理事件(打印)
     LOG.info(prefix + new String(event.getBody()) + suffix);
     //事务提交
     txn.commit();
     status = Status.READY;
     } catch (Exception e) {
     //遇到异常,事务回滚
     txn.rollback();
     status = Status.BACKOFF;
     } finally {
     //关闭事务
     txn.close();
     }
     return status;
     }
     @Override
     public void configure(Context context) {
     //读取配置文件内容,有默认值
     prefix = context.getString("prefix", "hello:");
     //读取配置文件内容,无默认值
     suffix = context.getString("suffix");
     } }

    测试

    1.打包
    将写好的代码打包,并放到 flume 的 lib 目录(/opt/module/flume)下。
    2.配置文件
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    # Describe the sink
    a1.sinks.k1.type = com.atguigu.MySink
    #a1.sinks.k1.prefix = atguigu:
    a1.sinks.k1.suffix = :atguigu
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    3.开启任务
    [atguigu@hadoop102 flume]$ bin/flume-ng agent -c conf/ -f 
    job/mysink.conf -n a1 -Dflume.root.logger=INFO,console
    [atguigu@hadoop102 ~]$ nc localhost 44444
    hello
    OK
    atguigu
    OK
    4.结果展示

    本文来自博客园,作者:秋华,转载请注明原文链接:https://www.cnblogs.com/qiu-hua/p/13382821.html

  • 相关阅读:
    2021牛客寒假算法基础集训营4 B. 武辰延的字符串(二分/Hash/exkmp)
    2021牛客寒假算法基础集训营4 H. 吴楚月的表达式
    2021牛客寒假算法基础集训营4 J. 邬澄瑶的公约数(GCD/唯一分解定理)
    leetcode 995. K 连续位的最小翻转次数(差分)
    robot 源码解读2【run.py执行流程】
    robot 源码解读1【代码量】
    python计算代码的行数
    为什么要用yield
    任意网站添加目录
    mac 定时执行脚本
  • 原文地址:https://www.cnblogs.com/qiu-hua/p/13382821.html
Copyright © 2011-2022 走看看