zoukankan      html  css  js  c++  java
  • flume_kafka_hdfs_hive数据的处理

         使用flume收集数据,将数据传递给kafka和hdfs,kafka上的数据可以使用storm构建实时计算,而hdfs上的数据,经过MR处理之后可以导入hive中进行处理。

         环境:hadoop1.2.1,hive 0.13.1,maven 3.2.5,flume 1.4,kafka 0.7.2,eclipse luna,jdk 1.7_75;mysql-connector-java-5.1.26.bin.jar,flume-kafka-master.zip。

         说明:所有服务都架设在一台机器上。

         1:安装hadoop:这篇文章写得比较完整,可以看看:Ubuntu 12.10 安装JDK、Hadoop全过程

    我在安装过程中出现:Does not contain a valid host:port authority: file:/// ,看了一遍自己的core-site.xml,hdfs-site.xml,mapred-site.xml没有发现错误,还特地看了些hosts配置,最后网上找到,fs.default.name中default写错了,启动hadoop。

    hadoopnew

         2:安装hive:下载解压之后,设置HIVE_HOME,将HIVE_HOME/bin加入到PATH变量中,直接输入hive即可启动。默认hive是使用嵌入模式的Derby数据库,它的特点是小巧,而且老爹也是apache,但存在单session,无法多用户共享,这里参考网上的资料将元数据存储到mysql中去:Hive集成Mysql作为元数据,这里我出现了点问题,只能使用localhost进行连接,无法使用root@myggg,试着按照文章中查找my.conf,但没有找到相关配置,在实验环境下这样也可以用:

    hive

         3:安装flume:

    Flume is a distributed, reliable, and available service for efficiently collecting, aggregating, and moving large amounts of log data. It has a simple and flexible architecture based on streaming data flows. It is robust and fault tolerant with tunable reliability mechanisms and many failover and recovery mechanisms. It uses a simple extensible data model that allows for online analytic application.

    Flume是由Cloudera推出的一个高效收集,处理,移动大量日志数据的分布式,可靠地,高可用的服务。它有简单并且灵活的架构基于数据流之上。它使用可靠地协调性,容错转移,恢复机制使它强健性并且容错。它使用简单可伸缩的数据模型能实现在线数据分析。

    Flume安agent划分,一个agent包括Source,Channel,Sink三个部分,Source从Web Server中取数据,push交给Channel,Sink将pull Channel得到数据,一个agent可以有多个Channel, Sink。

         配置FLUME_HOME,将PATH中加入Flume下的执行路径,将conf下的flume-conf.properties.template重命名为flume-conf.properties,然后进行配置,在单机情况下:

    agent.sources = r1                //agent中添加source,命名为r1
    agent.sinks = s1                  //agent中添加sink,命名为s1
    agent.channels = c1               //agent中添加channel,命名为c1
    agent.sinks.s1.channel = c1       //s1从c1中取数据
    agent.sources.r1.channels = c1    //r1将数据交给c1
    
    #describe the source
    agent.sources.r1.type = exec      //定义r1的类型为exec
    agent.sources.r1.command = tail -F /root/input/loginfo    //r1执行的命令
    
    #use a channel which buffers events in memory
    agent.channels.c1.type = memory    //定义c1的类型memory
    agent.channels.c1.capacity = 1000  //c1的容量
    agent.channels.c1.transactionCapacity = 100  //channel获取或者sink获得一次最大的数据量 
    
    #sinks type
    agent.sinks.s1.type = logger       //定义s1的类型为logger

    完成后启动flume,测试:

    bin/flume-ng agent --conf ./conf/ -f conf/flume-conf.properties -Dflume.root.logger=DEBUG,console -n agent

    flume

    4:安装kafka:关于kafka的介绍:Kafka快速入门,简单来说,kafka集群中的一台服务器就是一个broker,消息按名字分类,叫做topic,消息的产生是producer,消息的获取方为customer。kafka的安装方法同上。由于使用默认配置,kafka中的config下不需要配置了,直接启动即可进行模拟:

    bin/zookeeper-server-start.sh config/zookeeper.properties
    bin/kafka-server-start.sh config/server.properties
    bin/kafka-console-producer.sh --zookeeper localhost:2181 localhost:9092 --topic test 
    bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    相关可以看apache-kafka

    5:进行整合:数据的处理过程包括数据收集,数据清理,数据存储,数据分析,数据展现。在这里数据的收集由flume负责,定期从web server中收集log相关信息,对于实时数据的处理,将数据直接发送到kafka,然后交给后面的storm处理(这个没有做),对于离线部分,经过简单的mr处理后存储到hdfs上,然后使用hive操作。

    总的架构图:

    arch

    Flume的设计:

    flume_ng

    在搭建之前,先安装maven:安装步骤同上Flume与Kafka

    安装完后echo $PATH:

    /usr/lib/qt-3.3/bin:/usr/local/sbin:/usr/local/bin:/sbin:/bin:/usr/sbin:/usr/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/bin:/usr/java/jdk1.7.0_75/bin:/root/hadoop-1.2.1/bin:/root/apache-hive-0.13.1/bin:/root/apache-flume-1.4.0/bin:/root/kafka-0.7.2/bin:/root/Downloads/apache-maven-3.2.5/bin

    Flume与Kafka之间整合需要一个插件:这里介绍个flume-kafka插件,flume1.4,kafka0.7.2的基础上,将代码下载下来,进入目录,使用maven打包成jar文件,将生成的jar包放到flume的lib或相关目录下,依次将hadoop.1.2.1-*.jar,kafka0.7.2.jar, scala-compiler.jar(2.8),scala-library.jar(2.8),zkclient-0.1.jar导入,mvn package过程中可能会报错,找不到kafka0.7.2.jar,你需要将额外的extra-dependencies下的包放到‘~/.m2/repository/com/linkedin/kafka/kafka/0.7.2/’下,再进行package。

    对于myggg开启6个终端:

    console

    对于发送到kafka中的数据以后在处理,现在主要是针对hadoop中的数据,首先使用MR处理,格式化文本。

    6:后续:解压eclipse,将之前准备的hadoop-eclipse-plugin-1.2.1.jar放到eclipse下的plugins目录下,使用vnc连接到机器,编写MR程序。

    使用’hadoop fs -cat /myFlume/FlumeData.1426320728464’查看文件:

    1,b
    2,c
    3,d
    4,e
    5,f
    6,g
    7,z
    0,o

    编写MR,将一行记录拆分为key,value:

    public static class MyMapper
             extends Mapper<Object, Text, IntWritable, Text>{
          private IntWritable hello = new IntWritable();
          private Text world = new Text();
      
          public void map(Object key, Text value, Context context
                          ) throws IOException, InterruptedException {
            String[] array = value.toString().split(",");
            if (array.length == 2) {
              hello.set(Integer.parseInt(array[0]));
              world.set(array[1]);
      
              context.write(hello, world);
            }
          }
        }

    查看结果:

    [root@myggg eclipse]# hadoop fs -cat /myoutput/part-r-00000
    0    o
    1    b
    2    c
    3    d
    4    e
    5    f
    6    g
    7    z

    使用hive建立外部表查看数据:

    create external table employee(id int, name string)
    row format delimited fields terminated by '	'
    lines terminated by '
    '
    stored as textfile
    location '/myoutput';

    然后就可以进行相关查询与处理了。

  • 相关阅读:
    冗余链接-684-并查集
    Chrome浏览器进程
    BFC布局规则
    Front-end 前端优化总结
    Flex弹性布局
    Browse兼容性问题
    组合关系与组合模式
    YUI3组件框架之plugin
    javascript数据类型及转换
    矩阵打印
  • 原文地址:https://www.cnblogs.com/dslover/p/4337849.html
Copyright © 2011-2022 走看看