zoukankan      html  css  js  c++  java
  • Flume-自定义 Source

    Source 是负责接收数据到 Flume Agent 的组件。

    Source 组件可以处理各种类型、各种格式的日志数据,包括 avro、thrift、exec、jms、spooling directory、netcat、sequence generator、syslog、http、legacy。

    官方提供的 source 类型已经很多,但是有时候并不能满足实际开发当中的需求,此时我们就需要根据实际需求自定义某些 source。

    官方也提供了自定义 source 的接口:https://flume.apache.org/FlumeDeveloperGuide.html#source

    根据官方说明自定义 Source 需要继承 AbstractSource 类并实现 Configurable 和 PollableSource 接口。

    实现相应方法:

    getBackOffSleepIncrement();
    
    getMaxBackOffSleepInterval();
    
    // 初始化 context(读取配置文件内容)
    configure(Context context);
    
    // 获取数据封装成 event 并写入 channel,这个方法将被循环调用
    process();

    使用场景:读取 MySQL 数据或者其他文件系统。

    这里使用 flume 接收数据,并给每条数据添加前缀,输出到控制台。前缀可从 flume 配置文件中配置。

    一、创建自定义 Source

    1.添加 pom 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com</groupId>
        <artifactId>flume</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    2.编写自定义的 Source 类

    package source;
    
    import org.apache.flume.Context;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    
    import java.util.HashMap;
    
    public class MySource extends AbstractSource implements Configurable, PollableSource {
    
        // 定义配置文件将来要读取的字段
        private Long delay;
        private String field;
    
        // 初始化配置信息
        @Override
        public void configure(Context context) {
            delay = context.getLong("delay");
            field = context.getString("field", "Hello!");
        }
    
        @Override
        public Status process() throws EventDeliveryException {
            try {
                // 创建事件头信息
                HashMap<String, String> hearderMap = new HashMap<>();
                // 创建事件
                SimpleEvent event = new SimpleEvent();
                // 循环封装事件
                for (int i = 0; i < 5; i++) {
                    // 给事件设置头信息
                    event.setHeaders(hearderMap);
                    // 给事件设置内容
                    event.setBody((field + i).getBytes());
                    // 将事件写入 channel
                    getChannelProcessor().processEvent(event);
                    Thread.sleep(delay);
                }
            } catch (Exception e) {
                e.printStackTrace();
                return Status.BACKOFF;
            }
            return Status.READY;
        }
    
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }
    
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }
    }

    二、打包测试

    1.打包上传

    参考:https://www.cnblogs.com/jhxxb/p/11582804.html

    2.编写 flume 配置文件

    mysource.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = source.MySource
    # 代码中要获取的配置信息
    a1.sources.r1.delay = 1000
    # a1.sources.r1.field = jhxxb
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    启动

    cd /opt/apache-flume-1.9.0-bin
    bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/source/mysource.conf -Dflume.root.logger=INFO,console

  • 相关阅读:
    ubuntu配置jdk(收藏)
    ubuntu学习笔记-tar 解压缩命令详解(转)
    URL编码以及GET和POST提交乱码解决方案 (转)
    maven模块开发(转)
    Spring定时任务配置
    java调用webservice
    struts2 JSON 插件的使用
    Java 基础知识点滴(1)
    vscode语法报错
    无法加载文件 .ps1,因为在此系统中禁止执行脚本
  • 原文地址:https://www.cnblogs.com/jhxxb/p/11583778.html
Copyright © 2011-2022 走看看