zoukankan      html  css  js  c++  java
  • Flume+HBase+Kafka集成与开发

     


       今天的内容是完成Flume+HBase+Kafka的集成开发。如下图红框中所示,节点1的Flume的source有两个:节点2和节点3的sink输出。节点1接收后进行预处理然后分别以AsyncHBaseSink(HBaseSink)和Kafka Sink的方式推送给HBase和Kafka进行离线数据处理和实时数据处理。

    1.下载Flume源码并导入Idea开发工具

      1)将apache-flume-1.7.0-src.tar.gz源码下载到本地解压

      2)通过idea导入flume源码

      打开idea开发工具,选择File—>Open

     

      然后找到flume源码解压文件,选中flume-ng-hbase-sink,点击ok加载相应模块的源码。

     

                                 

     

    2.官方flume与hbase集成的参数介绍

    http://flume.apache.org/FlumeUserGuide.html 下的Flume Sink -> AsyncHBaseSink

     

      其中,加粗的属性是必须配置的,其它则作为优化参数。payloadColumn属性是告知HBase有多少个列要写入列簇columnFamily下。

     

    3.下载日志数据并分析

      到搜狗实验室下载用户查询日志(该过程在前面HBase环境部署中已经完成,有问题的可以回去查看一下:HBase分布式集群部署与设计

     1)介绍

      搜索引擎查询日志库设计为包括约1个月(2008年6月)Sogou搜索引擎部分网页查询需求及用户点击情况的网页查询日志数据集合。为进行中文搜索引擎用户行为分析的研究者提供基准研究语料

     2)格式说明

      数据格式为:访问时间 用户ID [查询词] 该URL在返回结果中的排名 用户点击的顺序号 用户点击的URL

      其中,用户ID是根据用户使用浏览器访问搜索引擎时的Cookie信息自动赋值,即同一次使用浏览器输入的不同查询对应同一个用户ID

     

      该数据将作为本项目的源数据,存放于节点2和节点3。

     

    4.flume agent-3聚合节点与HBase集成的配置

      用notepad++连接节点1,对配置文件进行重命名。

     

      配置fulme-env.sh文件

     

      配置flume-conf.properties文件

     

      原模板格式凌乱,直接全部干掉,输入以下内容:

    agent1.sources = r1
    agent1.channels = kafkaC hbaseC
    agent1.sinks = kafkaSink hbaseSink
    
    agent1.sources.r1.type = avro
    agent1.sources.r1.channels = hbaseC
    agent1.sources.r1.bind = bigdata-pro01.kfk.com
    agent1.sources.r1.port = 5555
    agent1.sources.r1.threads = 5
    
    agent1.channels.hbaseC.type = memory
    agent1.channels.hbaseC.capacity = 100000
    agent1.channels.hbaseC.transactionCapacity = 100000
    agent1.channels.hbaseC.keep-alive = 20
    
    agent1.sinks.hbaseSink.type = asynchbase
    agent1.sinks.hbaseSink.table = weblogs
    agent1.sinks.hbaseSink.columnFamily = info
    agent1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
    agent1.sinks.hbaseSink.channel = hbaseC
    agent1.sinks.hbaseSink.serializer.payloadColumn = datatime,userid,searchname,retorder,cliorder,cliurl

     

    5.对日志数据进行格式处理

     1)将文件中的tab更换成逗号

    cat weblog.log|tr "	" "," > weblog2.log

     2)将文件中的空格更换成逗号

    cat weblog2.log|tr " " "," > weblog3.log

     

    [kfk@bigdata-pro01 datas]$ rm -f weblog2.log
    [kfk@bigdata-pro01 datas]$ rm -f weblog.log
    [kfk@bigdata-pro01 datas]$ mv weblog3.log weblog.log
    [kfk@bigdata-pro01 datas]$ ls
      wc.input  weblog.log

     3)然后分发到节点2和3

    [kfk@bigdata-pro01 datas]$ scp weblog.log bigdata-pro02.kfk.com:/opt/datas/
    weblog.log                                                                                                                 100%  145MB  72.5MB/s   00:02   
    [kfk@bigdata-pro01 datas]$ scp weblog.log bigdata-pro03.kfk.com:/opt/datas/
    weblog.log

     

    6.自定义SinkHBase程序设计与开发

     1)模仿SimpleAsyncHbaseEventSerializer自定义KfkAsyncHbaseEventSerializer实现类,修改一下代码即可。

     

     

    @Override
        public List getActions() {
            List actions = new ArrayList();
            if (payloadColumn != null) {
                byte[] rowKey;
                try {
                    /*---------------------------代码修改开始---------------------------------*/
                    //解析列字段
                    String[] columns = new String(this.payloadColumn).split(",");
                    //解析flume采集过来的每行的值
                    String[] values = new String(this.payload).split(",");
                    for(int i=0;i < columns.length;i++){
                        byte[] colColumn = columns[i].getBytes();
                        byte[] colValue = values[i].getBytes(Charsets.UTF_8);
    
                        //数据校验:字段和值是否对应
                        if(colColumn.length != colValue.length) break;
                        //时间
                        String datetime = values[0].toString();
                        //用户id
                        String userid = values[1].toString();
                        //根据业务自定义Rowkey
                        rowKey = SimpleRowKeyGenerator.getKfkRowKey(userid,datetime);
                        //插入数据
                        PutRequest putRequest =  new PutRequest(table, rowKey, cf,
                                colColumn, colValue);
                        actions.add(putRequest);
                    /*---------------------------代码修改结束---------------------------------*/
                    }
                } catch (Exception e) {
                    throw new FlumeException("Could not get row key!", e);
                }
            }
            return actions;
        }

     2)在SimpleRowKeyGenerator类中,根据具体业务自定义Rowkey生成方法

    /**
       * 自定义Rowkey
       * @param userid
       * @param datetime
       * @return
       * @throws UnsupportedEncodingException
       */
    
      public static byte[] getKfkRowKey(String userid,String datetime)throws UnsupportedEncodingException {
        return (userid + datetime + String.valueOf(System.currentTimeMillis())).getBytes("UTF8");
      }

     

    7.自定义编译程序打jar包

     1)在idea工具中,选择File—>ProjectStructrue

     

     2)左侧选中Artifacts,然后点击右侧的+号,最后选择JAR—>From modules with dependencies

     

     3)然后直接点击ok

     

     4)然后依次点击apply,ok

     

     6)点击build进行编译,会自动打成jar包

     

     

     7)到项目的目录下找到刚刚打的jar包

     

     8)将打包名字替换为flume自带的包名flume-ng-hbase-sink-1.7.0.jar ,然后上传至flume/lib目录下,覆盖原有的jar包即可。

     

    8.flume聚合节点与Kafka集成的配置

      继续在flume-conf.properties文件中追加以下内容:

     

    #*****************flume+Kafka***********************
    agent1.channels.kafkaC.type = memory
    agent1.channels.kafkaC.capacity = 100000
    agent1.channels.kafkaC.transactionCapacity = 100000
    agent1.channels.kafkaC.keep-alive = 20
    
    agent1.sinks.kafkaSink.channel = kafkaC
    agent1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.kafkaSink.brokerList = bigdata-pro01.kfk.com:9092,bigdata-pro02.kfk.com:9092,bigdata-pro03.kfk.com:9092
    agent1.sinks.kafkaSink.topic = test
    agent1.sinks.kafkaSink.zookeeperConnect = bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181
    agent1.sinks.kafkaSink.requiredAcks = 1
    agent1.sinks.kafkaSink.batchSize = 1
    agent1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder

     

     以上就是博主为大家介绍的这一板块的主要内容,这都是博主自己的学习过程,希望能给大家带来一定的指导作用,有用的还望大家点个支持,如果对你没用也望包涵,有错误烦请指出。如有期待可关注博主以第一时间获取更新哦,谢谢!同时也欢迎转载,但必须在博文明显位置标注原文地址,解释权归博主所有!

  • 相关阅读:
    DB2 db2move导入导出数据及使用dblook导出表结构DDL
    【转】DB2 BLOB大字段数据通过命令行进行导入导出
    【转】【DataGuard】Oracle 11g物理Data Guard之Snapshot Standby数据库功能
    【转】Oracle 11g R2手动配置EM
    【转】Oracle Database Server 'TNS Listener'远程数据投毒漏洞(CVE-2012-1675)
    【转】ORACLE TNS Listener远程注册投毒(Poison Attack)漏洞
    【转】Oracle 11.2.0.4/12C新特性Valid Node Checking For Registration (VNCR)
    【转】使用 xtrabackup 进行MySQL数据库物理备份
    【转】MySQL-物理备份-Percona XtraBackup 备份原理
    【转】NBU expired Media,Media ID not found in EMM database
  • 原文地址:https://www.cnblogs.com/zimo-jing/p/9884305.html
Copyright © 2011-2022 走看看