zoukankan      html  css  js  c++  java
  • Flume-自定义 Source 读取 MySQL 数据

    开源实现:https://github.com/keedio/flume-ng-sql-source

    这里记录的是自己手动实现。

    测试中要读取的表

    -- Sample source table that the custom Flume source polls.
    CREATE TABLE `student` (
      -- Offset column: the helper selects rows "where id > <stored offset>", and the
      -- source stores the first column of the last returned row as the new offset.
      `id` int(11) NOT NULL AUTO_INCREMENT,
      `name` varchar(255) COLLATE utf8_bin DEFAULT NULL,
      `age` int(11) DEFAULT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

    偏移量记录表(必须):为每张源表保存已读取到的 id,告诉 Flume 下一次从哪里开始读取

    -- Offset bookkeeping table: one row per source table (keyed by source_tab).
    CREATE TABLE `flume_meta` (
      -- Name of the source table being read (the "table" setting of the Flume source).
      `source_tab` varchar(255) COLLATE utf8_bin NOT NULL,
      -- Id of the last row already delivered; the next query reads rows with a larger id.
      `current_index` bigint(255) DEFAULT NULL,
      PRIMARY KEY (`source_tab`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

    一、编写自定义 Source

    1.添加 pom 依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build for the custom Flume source (classes in package "source"). -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com</groupId>
        <artifactId>flume</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <!-- Flume API used by the source: Context, Event, AbstractSource, PollableSource. -->
            <dependency>
                <groupId>org.apache.flume</groupId>
                <artifactId>flume-ng-core</artifactId>
                <version>1.9.0</version>
            </dependency>
            <!-- MySQL JDBC driver (com.mysql.jdbc.Driver); this jar must also be uploaded to the Flume host. -->
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.27</version>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <!-- Compile as Java 8; the code uses multi-catch and the diamond operator. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    2.编写类

    MySQLSourceHelper,JDBC 工具类,主要是读取数据表和更新读取记录

    package source;
    
    import org.apache.flume.Context;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.math.BigInteger;
    import java.sql.*;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * JDBC helper for {@code MySQLSource}: reads new rows from the configured table and
     * persists the read offset in the {@code flume_meta} table.
     *
     * <p>The {@code id} column of the source table is used as the offset. Each query returns
     * the rows whose {@code id} is greater than the offset stored in {@code flume_meta}
     * (or {@code start.from} when no offset has been stored yet), ordered by {@code id}.
     *
     * <p>Instances are not synchronized; each instance owns its own JDBC connection.
     */
    public class MySQLSourceHelper {

        private static final Logger LOG = LoggerFactory.getLogger(MySQLSourceHelper.class);

        // Offset used when flume_meta has no entry for the table yet
        private static final String DEFAULT_START_VALUE = "0";
        // Columns selected when "columns.to.select" is not configured
        private static final String DEFAULT_COLUMNS_TO_SELECT = "*";

        // Initial offset (id) taken from the configuration
        private final String startFrom;
        // Source table name
        private final String table;
        // Column list supplied by the user
        private final String columnsToSelect;

        // Connection settings are per instance so that several sources can coexist in one agent
        private final String dbUrl;
        private final String dbUser;
        private final String dbPassword;
        private final String dbDriver;

        // Current connection; null when the last attempt to connect failed
        private Connection conn;

        /**
         * Reads the source settings from the Flume context and opens the JDBC connection.
         *
         * @param context Flume configuration of the source; reads {@code start.from},
         *                {@code columns.to.select}, {@code table}, {@code db.url},
         *                {@code db.user}, {@code db.password} and {@code db.driver}
         */
        MySQLSourceHelper(Context context) {
            // Parameters with defaults
            this.startFrom = context.getString("start.from", DEFAULT_START_VALUE);
            this.columnsToSelect = context.getString("columns.to.select", DEFAULT_COLUMNS_TO_SELECT);

            // Parameters without defaults
            this.table = context.getString("table");

            this.dbUrl = context.getString("db.url");
            this.dbUser = context.getString("db.user");
            this.dbPassword = context.getString("db.password");
            this.dbDriver = context.getString("db.driver");
            this.conn = getConnection();
        }

        /**
         * Opens a new JDBC connection.
         *
         * @return the connection, or {@code null} when the driver cannot be loaded or the
         *         database cannot be reached (the failure is logged)
         */
        private Connection getConnection() {
            try {
                if (dbDriver != null) {
                    Class.forName(dbDriver);
                }
                return DriverManager.getConnection(dbUrl, dbUser, dbPassword);
            } catch (SQLException | ClassNotFoundException e) {
                LOG.error("Unable to open JDBC connection to " + dbUrl, e);
                return null;
            }
        }

        /**
         * Returns a usable connection, reconnecting first if there is none or it was closed.
         *
         * @throws SQLException if no connection can be established
         */
        private Connection requireConnection() throws SQLException {
            if (conn == null || conn.isClosed()) {
                closeConnectionQuietly();
                conn = getConnection();
            }
            if (conn == null) {
                throw new SQLException("No JDBC connection available for " + dbUrl);
            }
            return conn;
        }

        /** Closes the current connection, if any, logging instead of propagating failures. */
        private void closeConnectionQuietly() {
            if (conn == null) {
                return;
            }
            try {
                conn.close();
            } catch (SQLException e) {
                LOG.warn("Failed to close JDBC connection", e);
            }
            conn = null;
        }

        /**
         * Builds the select statement for the next batch, using id as the offset.
         *
         * <p>Table and column names cannot be bound as JDBC parameters, so they are
         * concatenated; the offset is a {@link BigInteger} and therefore always numeric.
         * The result is ordered by id because the caller stores the id of the last row as
         * the new offset.
         *
         * @throws SQLException if the stored offset cannot be read
         */
        private String buildQuery() throws SQLException {
            return "select " + columnsToSelect + " from " + table
                    + " where id > " + getStatusDBIndex(startFrom)
                    + " order by id";
        }

        /**
         * Queries the rows that were added after the stored offset.
         *
         * @return one list per row holding the column values in select order; an empty list
         *         when there are no new rows or when the query failed (never {@code null})
         */
        List<List<Object>> executeQuery() {
            String customQuery = null;
            try {
                Connection connection = requireConnection();
                // The statement is rebuilt on every call because the offset changes
                customQuery = buildQuery();
                List<List<Object>> results = new ArrayList<>();
                try (PreparedStatement statement = connection.prepareStatement(customQuery);
                     ResultSet result = statement.executeQuery()) {
                    int columnCount = result.getMetaData().getColumnCount();
                    while (result.next()) {
                        List<Object> row = new ArrayList<>(columnCount);
                        for (int i = 1; i <= columnCount; i++) {
                            row.add(result.getObject(i));
                        }
                        results.add(row);
                    }
                }
                LOG.info("execSql:" + customQuery + "\tresultSize:" + results.size());
                return results;
            } catch (SQLException e) {
                LOG.error("Query failed: " + customQuery, e);
                // Drop the possibly broken connection; the next call reconnects
                closeConnectionQuietly();
                return new ArrayList<>();
            }
        }

        /**
         * Converts every row into one comma separated string.
         *
         * <p>Each value is followed by a comma (so every line ends with a trailing comma) and
         * {@code null} values are rendered as an empty field.
         *
         * @param queryResult rows as returned by {@link #executeQuery()}
         * @return one string per row, in the same order
         */
        List<String> getAllRows(List<List<Object>> queryResult) {
            List<String> allRows = new ArrayList<>(queryResult.size());
            for (List<Object> rawRow : queryResult) {
                StringBuilder row = new StringBuilder();
                for (Object value : rawRow) {
                    if (value != null) {
                        row.append(value);
                    }
                    row.append(",");
                }
                allRows.add(row.toString());
            }
            return allRows;
        }

        /**
         * Stores the offset of the last delivered row so that reading can resume after a
         * restart. Inserts the row for this table in flume_meta or updates it if it exists.
         * Failures are logged and not propagated.
         *
         * @param size id of the last row that was delivered
         */
        void updateOffset2DB(BigInteger size) {
            // source_tab is the key: insert when missing, update otherwise (one row per source table)
            String sql = "insert into flume_meta VALUES(?, ?) on DUPLICATE key update current_index=?";
            try {
                Connection connection = requireConnection();
                try (PreparedStatement statement = connection.prepareStatement(sql)) {
                    String offset = String.valueOf(size);
                    statement.setString(1, table);
                    statement.setString(2, offset);
                    statement.setString(3, offset);
                    LOG.info("updateStatus Sql:" + sql + " [" + table + ", " + offset + "]");
                    statement.execute();
                }
            } catch (SQLException e) {
                LOG.error("Failed to store offset " + size + " for table " + table, e);
            }
        }

        /**
         * Reads the current offset for this table from flume_meta.
         *
         * @param startFrom offset to use when flume_meta has no value for this table
         * @return the stored offset, or {@code startFrom} when nothing is stored yet
         * @throws SQLException if the lookup fails; the error is propagated instead of falling
         *                      back to {@code startFrom}, which would re-read the whole table
         */
        private BigInteger getStatusDBIndex(String startFrom) throws SQLException {
            BigInteger dbIndex = new BigInteger(startFrom);
            Connection connection = requireConnection();
            try (PreparedStatement statement =
                         connection.prepareStatement("select current_index from flume_meta where source_tab=?")) {
                statement.setString(1, table);
                try (ResultSet result = statement.executeQuery()) {
                    if (result.next()) {
                        String id = result.getString(1);
                        if (id != null) {
                            dbIndex = new BigInteger(id);
                        }
                    }
                }
            }
            // No row (first run) or a NULL offset: keep the configured start value
            return dbIndex;
        }

        /** Releases the JDBC connection; safe to call when no connection was ever opened. */
        void close() {
            closeConnectionQuietly();
        }

        /** @return the name of the source table */
        public String getTable() {
            return table;
        }
    }

    MySQLSource,自定义 Source 类

    package source;
    
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.PollableSource;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.SimpleEvent;
    import org.apache.flume.source.AbstractSource;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.math.BigInteger;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    
    /**
     * Custom pollable Flume source that periodically reads new rows of a MySQL table and
     * emits one event per row. The event body is the comma separated row produced by
     * {@code MySQLSourceHelper.getAllRows}, and the {@code table} header holds the table name.
     */
    public class MySQLSource extends AbstractSource implements Configurable, PollableSource {

        private static final Logger LOG = LoggerFactory.getLogger(MySQLSource.class);

        // JDBC helper; created in configure()
        private MySQLSourceHelper sqlSourceHelper;

        // Pause between two queries, in milliseconds
        private int queryDelay;
        private static final int DEFAULT_QUERY_DELAY = 10000;

        /** No extra back-off: {@link #process()} already sleeps {@code query.delay} per call. */
        @Override
        public long getBackOffSleepIncrement() {
            return 0;
        }

        /** No extra back-off: {@link #process()} already sleeps {@code query.delay} per call. */
        @Override
        public long getMaxBackOffSleepInterval() {
            return 0;
        }

        /**
         * Creates the JDBC helper from the source configuration and reads {@code query.delay}
         * (milliseconds, default 10000).
         */
        @Override
        public void configure(Context context) {
            sqlSourceHelper = new MySQLSourceHelper(context);
            queryDelay = context.getInteger("query.delay", DEFAULT_QUERY_DELAY);
        }

        /**
         * Runs one poll: queries the new rows, writes them to the channel as a batch, stores
         * the id of the last row as the new offset, then sleeps for the query delay.
         *
         * @return {@code READY} after a successful poll; {@code BACKOFF} when the query
         *         failed or the thread was interrupted
         */
        @Override
        public Status process() throws EventDeliveryException {
            try {
                List<List<Object>> result = sqlSourceHelper.executeQuery();
                if (result == null) {
                    // The helper signals a failed query with null; wait before retrying
                    // because the runner's own back-off interval is 0
                    Thread.sleep(queryDelay);
                    return Status.BACKOFF;
                }

                if (!result.isEmpty()) {
                    String table = sqlSourceHelper.getTable();
                    List<String> allRows = sqlSourceHelper.getAllRows(result);
                    List<Event> events = new ArrayList<>(allRows.size());
                    for (String row : allRows) {
                        // Every event gets its own header map so that changing the headers
                        // of one event cannot affect the others
                        HashMap<String, String> header = new HashMap<>();
                        header.put("table", table);

                        Event event = new SimpleEvent();
                        event.setHeaders(header);
                        // Explicit charset: the platform default differs between hosts
                        event.setBody(row.getBytes(java.nio.charset.StandardCharsets.UTF_8));
                        events.add(event);
                    }
                    // Write the batch to the channel
                    getChannelProcessor().processEventBatch(events);
                    // Store the new offset: first column (expected to be id) of the last row
                    List<Object> lastRow = result.get(result.size() - 1);
                    sqlSourceHelper.updateOffset2DB(new BigInteger(lastRow.get(0).toString()));
                }
                // Wait before the next query
                Thread.sleep(queryDelay);
                return Status.READY;
            } catch (InterruptedException e) {
                // Restore the flag so the runner thread can see the interruption
                Thread.currentThread().interrupt();
                LOG.error("Interrupted while waiting for the next query", e);
                return Status.BACKOFF;
            }
        }

        /** Closes the JDBC helper (if one was created) and then stops the source. */
        @Override
        public synchronized void stop() {
            LOG.info("Stopping sql source {} ...", getName());
            try {
                if (sqlSourceHelper != null) {
                    sqlSourceHelper.close();
                }
            } finally {
                super.stop();
            }
        }
    }

    二、打包测试

    1.打包上传

    除了打好的自定义 Source jar 包之外,记得把 pom 依赖中的 MySQL 驱动 jar 包(mysql-connector-java-5.1.27.jar)也一并上传到 Flume 的 lib 目录。

    参考:https://www.cnblogs.com/jhxxb/p/11582804.html

    2.编写 flume 配置文件

    mysql.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    # type is the fully-qualified class name of the custom source (package source, class MySQLSource)
    a1.sources.r1.type = source.MySQLSource
    # JDBC settings used by MySQLSourceHelper to open the connection
    a1.sources.r1.db.driver = com.mysql.jdbc.Driver
    a1.sources.r1.db.url = jdbc:mysql://192.168.8.136:3306/rbac0
    a1.sources.r1.db.user = root
    a1.sources.r1.db.password = root
    # Table to read; its name is also the key of the offset row in flume_meta
    a1.sources.r1.table = student
    # Optional settings; the values shown are the defaults used by the source
    # a1.sources.r1.columns.to.select = *
    # a1.sources.r1.start.from = 0
    # Pause between two queries, in milliseconds
    # a1.sources.r1.query.delay = 10000
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Describe the channel
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    启动

    # Run from the Flume installation directory so the relative bin/ and conf/ paths resolve
    cd /opt/apache-flume-1.9.0-bin
    
    # Start agent a1 (the name must match the a1.* prefix in mysql.conf) and log at INFO level to the console
    bin/flume-ng agent --conf conf/ --name a1 --conf-file /tmp/flume-job/mysql.conf -Dflume.root.logger=INFO,console

    向监控表插入数据

    INSERT student VALUES(NULL,'zhangsan',18);

    Flume 的控制台日志

  • 相关阅读:
    爬虫1 爬虫介绍, requests模块, 代理(正向代理,反向代理), 爬梨视频, 自动登录网站, HTTP协议复习, 伪静态概念, 301和302状态码区别, http版本0.9 1.1 和2.0的区别
    数据结构 线性结构(数组[列表] ,链表 单链表的增删改查**, 线性结构的应用 队列 栈[函数的调用**]),非线性结构 树
    算法 时间复杂度, 空间复杂度, 冒泡排序**, 选择排序, 插入算法, 快速排序**, 希尔算法,计数排序, 二分法查找**
    量化分析 在线量化分析网站
    数据分析3 matplotlib绘图, 折线图(刻度与范围,标题,注释), 曲线图例, 过滤报警信息, 柱状图, 曲线图, 饼图
    [编织消息框架][netty源码分析]6 ChannelPipeline 实现类DefaultChannelPipeline职责与实现
    [编织消息框架][netty源码分析]5 EventLoopGroup 实现类NioEventLoopGroup职责与实现
    [编织消息框架][netty源码分析]4 EventLoop 实现类NioEventLoop职责与实现
    编程之路
    [编织消息框架][netty源码分析]3 EventLoop 实现类SingleThreadEventLoop职责与实现
  • 原文地址:https://www.cnblogs.com/jhxxb/p/11589851.html
Copyright © 2011-2022 走看看