zoukankan      html  css  js  c++  java
  • 新闻实时分析系统-数据采集/存储/分发完整流程测试

    (一)idea工具开发数据生成模拟程序

    1.在idea开发工具中构建weblogs项目,编写数据生成模拟程序。

    package main.java;

    import java.io.*;

    public class ReadWrite {

          static String readFileName;

          static String writeFileName;

          public static void main(String args[]){

               readFileName = args[0];

               writeFileName = args[1];

              try {

                 // readInput();

                readFileByLines(readFileName);

              }catch(Exception e){

              }

          }

     

        public static void readFileByLines(String fileName) {

            FileInputStream fis = null;

            InputStreamReader isr = null;

            BufferedReader br = null;

            String tempString = null;

            try {

                System.out.println("以行为单位读取文件内容,一次读一整行:");

                fis = new FileInputStream(fileName);// FileInputStream

                // 从文件系统中的某个文件中获取字节

                isr = new InputStreamReader(fis,"GBK");

                br = new BufferedReader(isr);

                int count=0;

                while ((tempString = br.readLine()) != null) {

                    count++;

                    // 显示行号

                    Thread.sleep(300);

                    String str = new String(tempString.getBytes("UTF8"),"GBK");

                    System.out.println("row:"+count+">>>>>>>>"+tempString);

                    method1(writeFileName,tempString);

                    //appendMethodA(writeFileName,tempString);

                }

                isr.close();

            } catch (IOException e) {

                e.printStackTrace();

            } catch (InterruptedException e) {

                e.printStackTrace();

            } finally {

                if (isr != null) {

                    try {

                        isr.close();

                    } catch (IOException e1) {

                    }

                }

            }

        }

        public static void method1(String file, String conent) {

            BufferedWriter out = null;

            try {

                out = new BufferedWriter(new OutputStreamWriter(

                        new FileOutputStream(file, true)));

                out.write(" ");

                out.write(conent);

            } catch (Exception e) {

                e.printStackTrace();

            } finally {

                try {

                    out.close();

                } catch (IOException e) {

                    e.printStackTrace();

                }

            }

        }

    }

    2.参照前面idea工具项目打包方式,将该项目打成weblogs.jar包,然后上传至bigdata-pro01.kfk.com节点的/opt/jars目录下(目录需要提前创建)

    3.将weblogs.jar分发到另外两个节点

    1)在另外两个节点上分别创建/opt/jars目录

    mkdir /opt/jars

    2)将weblogs.jar分发到另外两个节点

    scp weblogs.jar bigdata-pro02.kfk.com:/opt/jars/

    scp weblogs.jar bigdata-pro03.kfk.com:/opt/jars/

    4.编写运行模拟程序的shell脚本

    1)在bigdata-pro02.kfk.com节点的/opt/datas目录下,创建weblog-shell.sh脚本。

    vi weblog-shell.sh

    #/bin/bash

    echo "start log......"

    #第一个参数是原日志文件,第二个参数是日志生成输出文件

    java -jar /opt/jars/weblogs.jar /opt/datas/weblog.log /opt/datas/weblog-flume.log

    修改weblog-shell.sh可执行权限

    chmod 777 weblog-shell.sh

    2)将bigdata-pro02.kfk.com节点上的/opt/datas/目录拷贝到bigdata-pro03节点.kfk.com

    scp -r /opt/datas/ bigdata-pro03.kfk.com:/opt/datas/

    3)修改bigdata-pro02.kfk.com和bigdata-pro03.kfk.com节点上面日志采集文件路径。以bigdata-pro02.kfk.com节点为例。

    vi flume-conf.properties

    agent2.sources = r1

    agent2.channels = c1

    agent2.sinks = k1

     

    agent2.sources.r1.type = exec

    #修改采集日志文件路径,bigdata-pro03.kfk.com节点也是修改此处

    agent2.sources.r1.command = tail -F /opt/datas/weblog-flume.log

    agent2.sources.r1.channels = c1

     

    agent2.channels.c1.type = memory

    agent2.channels.c1.capacity = 10000

    agent2.channels.c1.transactionCapacity = 10000

    agent2.channels.c1.keep-alive = 5

     

    agent2.sinks.k1.type = avro

    agent2.sinks.k1.channel = c1

    agent2.sinks.k1.hostname = bigdata-pro01.kfk.com

    agent2.sinks.k1.port = 5555

    (二)编写启动flume服务程序的shell脚本

    1.在bigdata-pro02.kfk.com节点的flume安装目录下编写flume启动脚本。

    vi flume-kfk-start.sh

    #/bin/bash

    echo "flume-2 start ......"

    bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent2 -Dflume.root.logger=INFO,console

    2.在bigdata-pro03.kfk.com节点的flume安装目录下编写flume启动脚本。

    vi flume-kfk-start.sh

    #/bin/bash

    echo "flume-3 start ......"

    bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent3 -Dflume.root.logger=INFO,console

    3.在bigdata-pro01.kfk.com节点的flume安装目录下编写flume启动脚本。

    vi flume-kfk-start.sh

    #/bin/bash

    echo "flume-1 start ......"

    bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n agent1 -Dflume.root.logger=INFO,console

    (三)编写Kafka Consumer执行脚本

    1.在bigdata-pro01.kfk.com节点的Kafka安装目录下编写Kafka Consumer执行脚本

    vi kfk-test-consumer.sh

    #/bin/bash

    echo "kfk-kafka-consumer.sh start ......"

    bin/kafka-console-consumer.sh --zookeeper bigdata-pro01.kfk.com:2181,bigdata-pro02.kfk.com:2181,bigdata-pro03.kfk.com:2181 --from-beginning --topic weblogs

    2.将kfk-test-consumer.sh脚本分发另外两个节点

    scp kfk-test-consumer.sh bigdata-pro02.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

    scp kfk-test-consumer.sh bigdata-pro03.kfk.com:/opt/modules/kakfa_2.11-0.8.2.1/

    (四)启动模拟程序并测试

    在bigdata-pro02.kfk.com节点启动日志产生脚本,模拟产生日志是否正常。

    /opt/datas/weblog-shell.sh

     

    (五)启动数据采集所有服务

    1.启动Zookeeper服务

    bin/zkServer.sh start

    2.启动hdfs服务

    sbin/start-dfs.sh

    3.启动HBase服务

     bin/start-hbase.sh

    创建hbase业务表

    bin/hbase shell

    create 'weblogs','info'

    4.启动Kafka服务

    bin/kafka-server-start.sh config/server.properties &

    创建业务数据topic

    bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic weblogs --replication-factor 1 --partitions 1

    5.配置flume相关环境变量

    vi flume-env.sh

    export JAVA_HOME=/opt/modules/jdk1.7.0_67

    export HADOOP_HOME=/opt/modules/hadoop-2.5.0

    export HBASE_HOME=/opt/modules/hbase-0.98.6-cdh5.3.0

    (六)完成数据采集全流程测试

    1.在bigdata-pro01.kfk.com节点上启动flume聚合脚本,将采集的数据分发到Kafka集群和hbase集群。

    ./flume-kfk-start.sh

    2.在bigdata-pro02.kfk.com节点上完成数据采集

    1)使用shell脚本模拟日志产生

    cd /opt/datas/

    ./weblog-shell.sh

    2)启动flume采集日志数据发送给聚合节点

    ./flume-kfk-start.sh

    3.在bigdata-pro03.kfk.com节点上完成数据采集

    1)使用shell脚本模拟日志产生

    cd /opt/datas/

    ./weblog-shell.sh

    2)启动flume采集日志数据发送给聚合节点

    ./flume-kfk-start.sh

    4.启动Kafka Consumer查看flume日志采集情况

    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic weblogs --from-beginning

    5.查看hbase数据写入情况

    ./hbase-shell

    count 'weblogs'

  • 相关阅读:
    git命令回退代码并同步到远程仓库
    git拉取远程指定分支
    vue动态绑定样式
    友链
    css三大特性
    CSS的背景background
    元素显示模式(块元素、行内元素、行内块元素)
    CSS复合选择器
    快速生成HTML结构+CSS样式语法
    文本属性
  • 原文地址:https://www.cnblogs.com/misliu/p/10979256.html
Copyright © 2011-2022 走看看