  • flume-ng-sql-source: reading incremental data from Oracle

    I. Download and build flume-ng-sql-source

    Download: https://github.com/keedio/flume-ng-sql-source.git ; build it and copy the jar by following the project's installation instructions.

    If you would rather skip the build, a prebuilt jar is available from CSDN: http://download.csdn.net/detail/chongxin1/9892184

    At the time of writing the latest version is flume-ng-sql-source-1.4.3.jar; this is the jar Flume relies on to connect to the database.

    II. Copy flume-ng-sql-source-1.4.3.jar into Flume's lib directory

     

    III. Copy the Oracle JDBC driver into Flume's lib directory (an Oracle database is used here)

    The Oracle JDBC driver ships with the Oracle installation, under: D:\app\product\11.2.0\dbhome_1\jdbc\lib


    Copy ojdbc5.jar into Flume's lib directory.

    IV. Run the demo

    1. Create the database table

    
    
    create table flume_ng_sql_source (
        id         varchar2(32) primary key,
        msg        varchar2(32),
        createTime date not null
    );
    
    
    insert into flume_ng_sql_source(id,msg,createTime) values('1','Test increment Data',to_date('2017-08-01 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('2','Test increment Data',to_date('2017-08-02 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('3','Test increment Data',to_date('2017-08-03 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('4','Test increment Data',to_date('2017-08-04 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('5','Test increment Data',to_date('2017-08-05 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('6','Test increment Data',to_date('2017-08-06 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    commit;

    2. Create flume-sql.conf

    Create flume-sql.conf in the /usr/local/flume directory:
    
    
    touch /usr/local/flume/flume-sql.conf
    sudo gedit /usr/local/flume/flume-sql.conf

    Put the following into flume-sql.conf:
    
    
    agentTest.channels = channelTest
    agentTest.sources = sourceTest
    agentTest.sinks = sinkTest
    ###########sql source#################
    # For each Test of the sources, the type is defined
    agentTest.sources.sourceTest.type = org.keedio.flume.source.SQLSource
    agentTest.sources.sourceTest.hibernate.connection.url = jdbc:oracle:thin:@192.168.168.100:1521/orcl
    # Hibernate Database connection properties
    agentTest.sources.sourceTest.hibernate.connection.user = flume
    agentTest.sources.sourceTest.hibernate.connection.password = 1234
    agentTest.sources.sourceTest.hibernate.connection.autocommit = true
    agentTest.sources.sourceTest.hibernate.dialect = org.hibernate.dialect.Oracle10gDialect
    agentTest.sources.sourceTest.hibernate.connection.driver_class = oracle.jdbc.driver.OracleDriver
    agentTest.sources.sourceTest.run.query.delay=1
    agentTest.sources.sourceTest.status.file.path = /usr/local/flume
    agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
    # Custom query
    agentTest.sources.sourceTest.start.from = '2017-07-31 07:06:20'

    agentTest.sources.sourceTest.custom.query = SELECT CHR(39)||TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39),ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE CREATETIME > TO_DATE($@$,'YYYY-MM-DD HH24:MI:SS') ORDER BY CREATETIME ASC

    agentTest.sources.sourceTest.batch.size = 6000
    agentTest.sources.sourceTest.max.rows = 1000
    agentTest.sources.sourceTest.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    agentTest.sources.sourceTest.hibernate.c3p0.min_size=1
    agentTest.sources.sourceTest.hibernate.c3p0.max_size=10
    ##############################
    agentTest.channels.channelTest.type = memory
    agentTest.channels.channelTest.capacity = 10000
    agentTest.channels.channelTest.transactionCapacity = 10000
    agentTest.channels.channelTest.byteCapacityBufferPercentage = 20
    agentTest.channels.channelTest.byteCapacity = 1600000

    agentTest.sinks.sinkTest.type = org.apache.flume.sink.kafka.KafkaSink
    agentTest.sinks.sinkTest.topic = TestTopic
    agentTest.sinks.sinkTest.brokerList = 192.168.168.200:9092
    agentTest.sinks.sinkTest.requiredAcks = 1
    agentTest.sinks.sinkTest.batchSize = 20

    agentTest.sinks.sinkTest.channel = channelTest
    agentTest.sources.sourceTest.channels = channelTest
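    The last two lines wire the source and the sink to the channel; if those names do not line up, the agent starts with no data path. A small illustrative sanity check in Python (the parser below is written just for this post, it is not a Flume tool):

    ```python
    def parse_props(text):
        """Parse simple 'key = value' lines, skipping blanks and # comments."""
        props = {}
        for line in text.splitlines():
            line = line.strip()
            if not line or line.startswith("#"):
                continue
            key, _, value = line.partition("=")
            props[key.strip()] = value.strip()
        return props

    conf = parse_props("""
    agentTest.channels = channelTest
    agentTest.sources.sourceTest.channels = channelTest
    agentTest.sinks.sinkTest.channel = channelTest
    """)

    # Source and sink must both reference the declared channel.
    declared = conf["agentTest.channels"]
    assert conf["agentTest.sources.sourceTest.channels"] == declared
    assert conf["agentTest.sinks.sinkTest.channel"] == declared
    ```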

    3. Start flume-ng with flume-sql.conf and test

    Start a Kafka consumer listening on the topic:

    
    
    kafka-console-consumer.sh --zookeeper 192.168.168.200:2181 --topic TestTopic

    Start flume-ng with flume-sql.conf:

    
    
    flume-ng agent --conf conf --conf-file /usr/local/flume/flume-sql.conf --name agentTest -Dflume.root.logger=INFO,console

    The TestTopic consumer console prints:

    
    
    [root@master ~]# kafka-console-consumer.sh --zookeeper 192.168.168.200:2181 --topic TestTopic
    "'2017-08-01 07:06:20'","1","Test increment Data"
    "'2017-08-02 07:06:20'","2","Test increment Data"
    "'2017-08-03 07:06:20'","3","Test increment Data"
    "'2017-08-04 07:06:20'","4","Test increment Data"
    "'2017-08-05 07:06:20'","5","Test increment Data"
    "'2017-08-06 07:06:20'","6","Test increment Data"
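    The quoting in this output comes from the source's serialization: with the defaults enclose.by.quotes = true and delimiter.entry = ",", each column value is wrapped in double quotes and joined with commas, while the inner single quotes around the timestamp come from the CHR(39) concatenation in the custom query itself. A minimal Python sketch of that formatting (the function name is hypothetical, not part of the plugin):

    ```python
    def serialize_row(values, delimiter=",", enclose_by_quotes=True):
        """Mimic flume-ng-sql-source's CSV-style event body: each value
        optionally wrapped in double quotes, joined by the delimiter."""
        if enclose_by_quotes:
            values = ['"%s"' % v for v in values]
        return delimiter.join(values)

    # The query's first column is CHR(39)||...||CHR(39), i.e. the timestamp
    # already wrapped in single quotes by the SQL itself.
    row = ["'2017-08-01 07:06:20'", "1", "Test increment Data"]
    print(serialize_row(row))
    # "'2017-08-01 07:06:20'","1","Test increment Data"
    ```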

     

    As configured, check the status file /usr/local/flume/agentTest.sqlSource.status:

    
    
    agentTest.sources.sourceTest.status.file.path = /usr/local/flume
    agentTest.sources.sourceTest.status.file.name = agentTest.sqlSource.status
    
    
    {"SourceName":"sourceTest","URL":"jdbc:oracle:thin:@192.168.168.100:1521/orcl","LastIndex":"'2017-08-06 07:06:20'","Query":"SELECT CHR(39)||TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39) AS INCREMENTAL,ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS') > $@$ ORDER BY INCREMENTAL ASC"}

    "LastIndex":"'2017-08-06 07:06:20'" shows that the most recent incremental row read has date '2017-08-06 07:06:20'. In other words, on the next run of WHERE TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS') > $@$, the value substituted for $@$ will be '2017-08-06 07:06:20'.
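    The mechanics can be sketched as follows. This is an illustrative Python model of the source's behavior (the function and the simulated rows are hypothetical, not the plugin's actual code): on each run, $@$ in custom.query is replaced by the saved LastIndex, and after a successful batch the first column of the last returned row becomes the new LastIndex.

    ```python
    import json

    def run_incremental_query(query_template, status, execute):
        """Substitute $@$ with the saved index, run the query, and advance
        the index to the first column of the last returned row."""
        sql = query_template.replace("$@$", status["LastIndex"])
        rows = execute(sql)  # in reality, a JDBC call through Hibernate
        if rows:
            status["LastIndex"] = rows[-1][0]  # first column = incremental field
        return rows, status

    # Simulated run: two new rows exist beyond the saved index.
    status = {"SourceName": "sourceTest", "LastIndex": "'2017-08-06 07:06:20'"}
    fake_rows = [("'2017-08-07 07:06:20'", "7", "Test increment Data"),
                 ("'2017-08-08 07:06:20'", "8", "Test increment Data")]
    rows, status = run_incremental_query(
        "SELECT ... WHERE CREATETIME > TO_DATE($@$,'YYYY-MM-DD HH24:MI:SS')",
        status,
        lambda sql: fake_rows)
    print(json.dumps(status))  # LastIndex is now "'2017-08-08 07:06:20'"
    ```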

    Insert more incremental rows into flume_ng_sql_source:

     

    
    
    insert into flume_ng_sql_source(id,msg,createTime) values('7','Test increment Data',to_date('2017-08-07 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('8','Test increment Data',to_date('2017-08-08 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('9','Test increment Data',to_date('2017-08-09 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    insert into flume_ng_sql_source(id,msg,createTime) values('10','Test increment Data',to_date('2017-08-10 07:06:20','yyyy-mm-dd hh24:mi:ss'));
    commit;

     

    The TestTopic consumer console now prints:

    
    
    [root@master ~]# kafka-console-consumer.sh --zookeeper 192.168.168.200:2181 --topic TestTopic
    "'2017-08-01 07:06:20'","1","Test increment Data"
    "'2017-08-02 07:06:20'","2","Test increment Data"
    "'2017-08-03 07:06:20'","3","Test increment Data"
    "'2017-08-04 07:06:20'","4","Test increment Data"
    "'2017-08-05 07:06:20'","5","Test increment Data"
    "'2017-08-06 07:06:20'","6","Test increment Data"
    "'2017-08-07 07:06:20'","7","Test increment Data"
    "'2017-08-08 07:06:20'","8","Test increment Data"
    "'2017-08-09 07:06:20'","9","Test increment Data"
    "'2017-08-10 07:06:20'","10","Test increment Data"

    Check the status file /usr/local/flume/agentTest.sqlSource.status again:

    
    
    {"SourceName":"sourceTest","URL":"jdbc:oracle:thin:@192.168.168.100:1521/orcl","LastIndex":"'2017-08-10 07:06:20'","Query":"SELECT CHR(39)||TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS')||CHR(39) AS INCREMENTAL,ID,MSG FROM FLUME_NG_SQL_SOURCE WHERE TO_CHAR(CREATETIME,'YYYY-MM-DD HH24:MI:SS') > $@$ ORDER BY INCREMENTAL ASC"}

    "LastIndex":"'2017-08-10 07:06:20'" shows the saved index has advanced to the newest row.

    With that, flume-ng-sql-source successfully reads incremental data from Oracle.

    V. Configuration parameter reference

    https://github.com/keedio/flume-ng-sql-source

    Configuration of SQL Source:

    Mandatory properties are marked (required).

    Property Name | Default | Description
    channels (required) | - | Connected channel names
    type (required) | - | The component type name; must be org.keedio.flume.source.SQLSource
    hibernate.connection.url (required) | - | URL to connect to the remote database
    hibernate.connection.user (required) | - | Username to connect to the database
    hibernate.connection.password (required) | - | Password to connect to the database
    table (required) | - | Table to export data from
    status.file.name (required) | - | Local file name to save the last row number read
    status.file.path | /var/lib/flume | Path to save the status file
    start.from | 0 | Start value to import data from
    delimiter.entry | , | Delimiter of the incoming entry
    enclose.by.quotes | true | Whether quotes are applied to all values in the output
    columns.to.select | * | Which columns of the table will be selected
    run.query.delay | 10000 | Milliseconds to wait between query runs
    batch.size | 100 | Batch size for sending events to the Flume channel
    max.rows | 10000 | Max rows to import per query
    read.only | false | Sets a read-only session with the database
    custom.query | - | Custom query to force a special request to the DB; be careful, and see the explanation of this property below
    hibernate.connection.driver_class | - | Driver class used by Hibernate; if not specified the framework will auto-assign one
    hibernate.dialect | - | Dialect used by Hibernate; if not specified the framework will auto-assign one. See https://docs.jboss.org/hibernate/orm/4.3/manual/en-US/html/ch03.html#configuration-optional-dialects for a complete list of available dialects
    hibernate.connection.provider_class | - | Set to org.hibernate.connection.C3P0ConnectionProvider to use the C3P0 connection pool (recommended for production)
    hibernate.c3p0.min_size | - | Min connection pool size
    hibernate.c3p0.max_size | - | Max connection pool size

     

    Custom Query

    A custom query is supported, which opens up the full SQL language. This is powerful but risky; be careful with the custom queries you use.

    To avoid exporting the same rows repeatedly, use the special $@$ placeholder in the WHERE clause so that only unprocessed and newly inserted rows are exported incrementally.

    IMPORTANT: for the custom query to work properly, ensure the incremental field is returned in the first position of the query result.

    Example:

    
    
    agent.sources.sql-source.custom.query = SELECT incrementalField,field2 FROM table1 WHERE incrementalField > $@$

    In short: to avoid problems, put the incremental field in the first position of the SELECT.
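    To see why the position matters: the source simply persists the first column of the last row as the next LastIndex, so a query that puts another column first silently corrupts the saved state. An illustrative sketch (not the plugin's actual code):

    ```python
    def next_index(last_row):
        """The source persists the FIRST column of the last row as LastIndex."""
        return last_row[0]

    # Incremental field first -> the correct timestamp is saved.
    assert next_index(("2017-08-10 07:06:20", "10", "Test increment Data")) \
        == "2017-08-10 07:06:20"

    # Incremental field second -> the id "10" would be saved instead,
    # breaking the next WHERE incrementalField > $@$ comparison.
    assert next_index(("10", "2017-08-10 07:06:20", "Test increment Data")) == "10"
    ```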

  • Original post: https://www.cnblogs.com/yangcx666/p/8723849.html