zoukankan      html  css  js  c++  java
  • Hadoop辅助工具——Flume、Sqoop

      

    前言

      在一个完整的离线大数据处理系统中,除了hdfs+mapreduce+hive组成分析系统的核心之外,还需要数据采集、结果数据导出、任务调度等不可或缺的辅助系统,而这些辅助工具在hadoop生态体系中都有便捷的开源框架,如图所示:

    1. Flume日志采集框架

    1.1 Flume介绍

    1.1.1 概述

    • Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。
    • Flume可以采集文件,socket数据包、文件、文件夹、kafka等各种形式源数据,又可以将采集到的数据(下沉sink)输出到HDFS、hbase、hive、kafka等众多外部存储系统中
    • 一般的采集需求,通过对flume的简单配置即可实现
    • Flume针对特殊场景也具备良好的自定义扩展能力

      因此,flume可以适用于大部分的日常数据采集场景。

    1.1.2 运行机制

    1. Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成。
    2. 每一个agent相当于一个数据传递员(Source 到 Channel 到 Sink之间传递数据的形式是Event事件,Event事件是一个数据流单元),内部有3个组件:

        a) Source:采集组件,用于跟数据源对接,以获取数据

        b) Sink:下沉组件,用于往下一级agent传递数据或者往最终存储系统传递数据

        c) Channel:传输通道组件,用于从source将数据传递到sink

     

    1.1.3 Flume采集系统结构图

     1. 简单结构——单个agent采集数据

     

    2. 复杂结构——多级agent之间串联

     

    1.2 Flume实战案例

    1.2.1 Flume的安装部署

      1、Flume的安装非常简单,只需要解压即可,当然,前提是已有hadoop环境。上传安装包到数据源所在节点上

      然后解压  tar -zxvf apache-flume-1.6.0-bin.tar.gz

      然后进入flume的目录,修改conf下的flume-env.sh,在里面配置JAVA_HOME

      2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)

      3、指定采集方案配置文件,在相应的节点上启动flume agent

      先用一个最简单的例子来测试一下程序环境是否正常

      1、先在flume的conf目录下新建一个配置文件(采集方案)

      vi   netcat-logger.properties

    # 定义这个agent中各组件的名字
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # 描述和配置source组件:r1
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # 描述和配置sink组件:k1
    a1.sinks.k1.type = logger
    
    # 描述和配置channel组件,此处使用是内存缓存的方式
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # 描述和配置source  channel   sink之间的连接关系
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

      2、启动agent去采集数据

    bin/flume-ng agent -c conf -f conf/netcat-logger.conf -n a1  -Dflume.root.logger=INFO,console

      -c conf   指定flume自身的配置文件所在目录

      -f conf/netcat-logger.con  指定我们所描述的采集方案

      -n a1  指定我们这个agent的名字

      3、测试

      先要往agent的source所监听的端口上发送数据,让agent有数据可采。随便在一个能跟agent节点联网的机器上。

      telnet anget-hostname  port   (telnet localhost 44444) 

    1.2.2 采集案例

      1、采集日志目录中的文件到HDFS

      结构示意图:

      采集需求:某服务器的某特定目录下,会不断产生新的文件,每当有新文件出现,就需要把文件采集到HDFS中去

      根据需求,首先定义以下3大要素

    1. 数据源组件,即source ——监控文件目录 :  spooldir

        spooldir特性:

          1、监视一个目录,只要目录中出现新文件,就会采集文件中的内容

          2、采集完成的文件,会被agent自动添加一个后缀:COMPLETED

          3、所监视的目录中不允许重复出现相同文件名的文件

    1. 下沉组件,即sink——HDFS文件系统  :  hdfs sink
    2. 通道组件,即channel——可用file channel 也可以用内存channel

      配置文件编写:

    #定义三大组件的名称
    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # 配置source组件
    agent1.sources.source1.type = spooldir
    agent1.sources.source1.spoolDir = /home/hadoop/logs/
    agent1.sources.source1.fileHeader = false
    
    #配置拦截器
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # 配置sink组件
    agent1.sinks.sink1.type = hdfs
    agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    #agent1.sinks.sink1.hdfs.round = true
    #agent1.sinks.sink1.hdfs.roundValue = 10
    #agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1

      Channel参数解释:

      capacity:默认该通道中最大的可以存储的event数量

      trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量

      keep-alive:event添加到通道中或者移出的允许时间

      测试阶段,启动flume agent的命令:

    bin/flume-ng  agent  -c  ./conf  -f ./dir-hdfs.conf -n  agent1 -Dflume.root.logger=DEBUG,console

      -D后面跟的是log4j的参数,用于测试观察

      生产中,启动flume,应该把flume启动在后台:

    nohup bin/flume-ng  agent  -c  ./conf  -f ./dir-hdfs.conf -n  agent1 1>/dev/null 2>&1 &

    2、采集文件到HDFS

      采集需求:比如业务系统使用log4j生成的日志,日志内容不断增加,需要把追加到日志文件中的数据实时采集到hdfs

      根据需求,首先定义以下3大要素

    1. 采集源,即source——监控文件内容更新 :  exec  ‘tail -F file’
    2. 下沉目标,即sink——HDFS文件系统  :  hdfs sink
    3. Source和sink之间的传递通道——channel,可用file channel 也可以用 内存channel

      配置文件编写:

    agent1.sources = source1
    agent1.sinks = sink1
    agent1.channels = channel1
    
    # Describe/configure tail -F source1
    agent1.sources.source1.type = exec
    agent1.sources.source1.command = tail -F /home/hadoop/logs/access_log
    agent1.sources.source1.channels = channel1
    
    #configure host for source
    agent1.sources.source1.interceptors = i1
    agent1.sources.source1.interceptors.i1.type = host
    agent1.sources.source1.interceptors.i1.hostHeader = hostname
    
    # Describe sink1
    agent1.sinks.sink1.type = hdfs
    #a1.sinks.k1.channel = c1
    agent1.sinks.sink1.hdfs.path =hdfs://hdp-node-01:9000/weblog/flume-collection/%y-%m-%d/%H-%M
    agent1.sinks.sink1.hdfs.filePrefix = access_log
    agent1.sinks.sink1.hdfs.maxOpenFiles = 5000
    agent1.sinks.sink1.hdfs.batchSize= 100
    agent1.sinks.sink1.hdfs.fileType = DataStream
    agent1.sinks.sink1.hdfs.writeFormat =Text
    agent1.sinks.sink1.hdfs.rollSize = 102400
    agent1.sinks.sink1.hdfs.rollCount = 1000000
    agent1.sinks.sink1.hdfs.rollInterval = 60
    agent1.sinks.sink1.hdfs.round = true
    agent1.sinks.sink1.hdfs.roundValue = 10
    agent1.sinks.sink1.hdfs.roundUnit = minute
    agent1.sinks.sink1.hdfs.useLocalTimeStamp = true
    
    # Use a channel which buffers events in memory
    agent1.channels.channel1.type = memory
    agent1.channels.channel1.keep-alive = 120
    agent1.channels.channel1.capacity = 500000
    agent1.channels.channel1.transactionCapacity = 600
    
    # Bind the source and sink to the channel
    agent1.sources.source1.channels = channel1
    agent1.sinks.sink1.channel = channel1

    3、两个agent级联

    1.3 更多source和sink组件

      Flume支持众多的source和sink类型,详细手册可参考官方文档 http://flume.apache.org/FlumeUserGuide.html

    2. sqoop数据迁移工具

    2.1 概述

      sqoop是apache旗下一款“Hadoop和关系数据库服务器之间传送数据”的工具。

      导入数据:MySQL,Oracle导入数据到Hadoop的HDFS、HIVE、HBASE等数据存储系统;

      导出数据:从Hadoop的文件系统中导出数据到关系数据库mysql等

    2.2 工作机制

      将导入或导出命令翻译成mapreduce程序来实现,在翻译出的mapreduce中主要是对inputformat和outputformat进行定制。

    2.3 sqoop实战及原理

    2.3.1 sqoop安装

      安装sqoop的前提是已经具备java和hadoop的环境

      1、下载并解压

      最新版下载地址http://ftp.wayne.edu/apache/sqoop/1.4.6/

      2、修改配置文件

        $ cd $SQOOP_HOME/conf

        $ mv sqoop-env-template.sh sqoop-env.sh

      打开sqoop-env.sh并编辑下面几行:

    export HADOOP_COMMON_HOME=/home/hadoop/apps/hadoop-2.6.1/ 
    export HADOOP_MAPRED_HOME=/home/hadoop/apps/hadoop-2.6.1/
    export HIVE_HOME=/home/hadoop/apps/hive-1.2.1

      3、加入mysql的jdbc驱动包

        cp  ~/app/hive/lib/mysql-connector-java-5.1.28.jar   $SQOOP_HOME/lib/

      4、验证启动

        $ cd $SQOOP_HOME/bin

        $ sqoop-version

      预期的输出:

        15/12/17 14:52:32 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6

        Sqoop 1.4.6 git commit id 5b34accaca7de251fc91161733f906af2eddbe83

        Compiled by abe on Fri Aug 1 11:19:26 PDT 2015

      到这里,整个Sqoop安装工作完成。

     

      验证sqoop到mysql业务库之间的连通性:

        bin/sqoop-list-databases --connect jdbc:mysql://localhost:3306 --username root --password root

        bin/sqoop-list-tables --connect jdbc:mysql://localhost:3306/userdb --username root --password root

    2.4 Sqoop的数据导入

      “导入工具”导入单个表从RDBMS到HDFS。表中的每一行被视为HDFS的记录。所有记录都存储为文本文件的文本数据(或者Avro、sequence文件等二进制数据) 

    2.4.1 语法

      下面的语法用于将数据导入HDFS

    $ sqoop import (generic-args) (import-args) 

    2.4.2 示例

    表数据

      在mysql中有一个库userdb中三个表:emp, emp_add和emp_conn

      表emp:

    id

    name

    deg

    salary

    dept

    1201

    gopal

    manager

    50,000

    TP

    1202

    manisha

    Proof reader

    50,000

    TP

    1203

    khalil

    php dev

    30,000

    AC

    1204

    prasanth

    php dev

    30,000

    AC

    1205

    kranthi

    admin

    20,000

    TP

      表emp_add:

    id

    hno

    street

    city

    1201

    288A

    vgiri

    jublee

    1202

    108I

    aoc

    sec-bad

    1203

    144Z

    pgutta

    hyd

    1204

    78B

    old city

    sec-bad

    1205

    720X

    hitec

    sec-bad

      表emp_conn:

    id

    phno

    email

    1201

    2356742

    gopal@tp.com

    1202

    1661663

    manisha@tp.com

    1203

    8887776

    khalil@ac.com

    1204

    9988774

    prasanth@ac.com

    1205

    1231231

    kranthi@tp.com

    导入表表数据到HDFS

      下面的命令用于从MySQL数据库服务器中的emp表导入HDFS

    bin/sqoop import   
    --connect jdbc:mysql://hdp-node-01:3306/test   
    --username root  
    --password root   
    --table emp   
    --m 1  

      如果成功执行,那么会得到下面的输出

    14/12/22 15:24:54 INFO sqoop.Sqoop: Running Sqoop version: 1.4.5
    14/12/22 15:24:56 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
    INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-hadoop/compile/cebe706d23ebb1fd99c1f063ad51ebd7/emp.jar
    -----------------------------------------------------
    O mapreduce.Job: map 0% reduce 0%
    14/12/22 15:28:08 INFO mapreduce.Job: map 100% reduce 0%
    14/12/22 15:28:16 INFO mapreduce.Job: Job job_1419242001831_0001 completed successfully
    -----------------------------------------------------
    -----------------------------------------------------
    14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Transferred 145 bytes in 177.5849 seconds (0.8165 bytes/sec)
    14/12/22 15:28:17 INFO mapreduce.ImportJobBase: Retrieved 5 records.

      为了验证在HDFS导入的数据,请使用以下命令查看导入的数据

    $ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-00000

      emp表的数据和字段之间用逗号(,)表示。

    1201, gopal,    manager, 50000, TP
    1202, manisha,  preader, 50000, TP
    1203, kalil,    php dev, 30000, AC
    1204, prasanth, php dev, 30000, AC
    1205, kranthi,  admin,   20000, TP

    导入到HDFS指定目录

      在导入表数据到HDFS使用Sqoop导入工具,我们可以指定目标目录。

      以下是指定目标目录选项的Sqoop导入命令的语法。

    --target-dir <new or exist directory in HDFS>

    下面的命令是用来导入emp_add表数据到'/queryresult'目录。

    bin/sqoop import 
    --connect jdbc:mysql://hdp-node-01:3306/test 
    --username root 
    --password root 
    --target-dir /queryresult 
    --fields-terminated-by ‘001--table emp 
    --split-by id
    --m 1

      注意:如果报错,说emp类找不到,则可以手动从sqoop生成的编译目录(/tmp/sqoop-root/compile)中,找到这个emp.class和emp.jar,拷贝到sqoop的lib目录下:

      如果设置了 --m 1,则意味着只会启动一个maptask执行数据导入

      如果不设置 --m 1,则默认为启动4个map task执行数据导入,则需要指定一个列来作为划分map task任务的依据

      下面的命令是用来验证 /queryresult 目录中 emp_add表导入的数据形式。

     $HADOOP_HOME/bin/hadoop fs -cat /queryresult/part-m-*

      它会用逗号(,)分隔emp_add表的数据和字段。

    1201, 288A, vgiri,   jublee
    1202, 108I, aoc,     sec-bad
    1203, 144Z, pgutta,  hyd
    1204, 78B,  oldcity, sec-bad
    1205, 720C, hitech,  sec-bad

    导入关系表到HIVE

    bin/sqoop import --connect jdbc:mysql://hdp-node-01:3306/test --username root --password root --table emp --hive-import  --split-by id  --m 1

    导入表数据子集

      我们可以导入表的使用Sqoop导入工具,"where"子句的一个子集。它执行在各自的数据库服务器相应的SQL查询,并将结果存储在HDFS的目标目录。

      where子句的语法如下。

    --where <condition>

      下面的命令用来导入emp_add表数据的子集。子集查询检索员工ID和地址,居住城市为:Secunderabad

    bin/sqoop import 
    --connect jdbc:mysql://hdp-node-01:3306/test 
    --username root 
    --password root 
    --where "city ='sec-bad'" 
    --target-dir /wherequery 
    --table emp_add 
     --m 1

      按需导入

    bin/sqoop import 
    --connect jdbc:mysql://hdp-node-01:3306/test 
    --username root 
    --password root 
    --target-dir /wherequery2 
    --query 'select id,name,deg from emp WHERE id>1207 and $CONDITIONS' 
    --split-by id 
    --fields-terminated-by '	' 
    --m 2

      下面的命令用来验证数据从emp_add表导入/wherequery目录

    $HADOOP_HOME/bin/hadoop fs -cat /wherequery/part-m-*

      它用逗号(,)分隔 emp_add表数据和字段。

    1202, 108I, aoc, sec-bad
    1204, 78B, oldcity, sec-bad
    1205, 720C, hitech, sec-bad

    增量导入

      增量导入是仅导入新添加的表中的行的技术。

      sqoop支持两种增量MySql导入到hive的模式,

      一种是append,即通过指定一个递增的列,比如:

      --incremental append  --check-column num_id --last-value 0

      另种是可以根据时间戳,比如:

      --incremental lastmodified --check-column created --last-value '2012-02-01 11:0:00'

      就是只导入created 比'2012-02-01 11:0:00'更大的数据。

     

      1/ append模式

      它需要添加‘incremental’, ‘check-column’, 和 ‘last-value’选项来执行增量导入。

      下面的语法用于Sqoop导入命令增量选项。

    --incremental <mode>
    --check-column <column name>
    --last value <last check column value>

      假设新添加的数据转换成emp表如下:

      1206, satish p, grp des, 20000, GR

      下面的命令用于在EMP表执行增量导入。

    bin/sqoop import 
    --connect jdbc:mysql://hdp-node-01:3306/test 
    --username root 
    --password root 
    --table emp --m 1 
    --incremental append 
    --check-column id 
    --last-value 1208

      以下命令用于从emp表导入HDFS emp/ 目录的数据验证。

    $ $HADOOP_HOME/bin/hadoop fs -cat /user/hadoop/emp/part-m-*
    1201, gopal,    manager, 50000, TP
    1202, manisha,  preader, 50000, TP
    1203, kalil,    php dev, 30000, AC
    1204, prasanth, php dev, 30000, AC
    1205, kranthi,  admin,   20000, TP
    1206, satish p, grp des, 20000, GR

      下面的命令是从表emp 用来查看修改或新添加的行

    $HADOOP_HOME/bin/hadoop fs -cat /emp/part-m-*1
    1206, satish p, grp des, 20000, GR

    2.5 Sqoop的数据导出

      1/ 将数据从HDFS把文件导出到RDBMS数据库

      导出前,目标表必须存在于目标数据库中。

    1. 默认操作是从将文件中的数据使用INSERT语句插入到表中
    2. 更新模式下,是生成UPDATE语句更新表数据

      语法

      以下是export命令语法。

    $ sqoop export (generic-args) (export-args) 

      示例

      数据是在HDFS 中“EMP/”目录的emp_data文件中。所述emp_data如下:

    1201, gopal,     manager, 50000, TP
    1202, manisha,   preader, 50000, TP
    1203, kalil,     php dev, 30000, AC
    1204, prasanth,  php dev, 30000, AC
    1205, kranthi,   admin,   20000, TP
    1206, satish p,  grp des, 20000, GR

      1、首先需要手动创建mysql中的目标表

    $ mysql
    mysql> USE db;
    mysql> CREATE TABLE employee ( 
       id INT NOT NULL PRIMARY KEY, 
       name VARCHAR(20), 
       deg VARCHAR(20),
       salary INT,
       dept VARCHAR(10));

      2、然后执行导出命令

    bin/sqoop export 
    --connect jdbc:mysql://hdp-node-01:3306/test 
    --username root 
    --password root 
    --table employee 
    --export-dir /user/hadoop/emp/

      3、验证表mysql命令行。

    mysql>select * from employee;
    如果给定的数据存储成功,那么可以找到数据在如下的employee表。
    +------+--------------+-------------+-------------------+--------+
    | Id   | Name         | Designation | Salary            | Dept   |
    +------+--------------+-------------+-------------------+--------+
    | 1201 | gopal        | manager     | 50000             | TP     |
    | 1202 | manisha      | preader     | 50000             | TP     |
    | 1203 | kalil        | php dev     | 30000               | AC     |
    | 1204 | prasanth     | php dev     | 30000             | AC     |
    | 1205 | kranthi      | admin       | 20000             | TP     |
    | 1206 | satish p     | grp des     | 20000             | GR     |
    +------+--------------+-------------+-------------------+--------+
  • 相关阅读:
    Maven 分模块,启动父工程时异常
    Maven SSH三大框架整合的加载流程
    Maven配置 和创建一个Maven项目
    课程8:《Maven精品教程视频》--视频目录
    salesforce零基础学习(九十三)Email To Case的简单实现
    salesforce零基础学习(九十二)使用Ant Migration Tool 实现Metadata迁移
    Salesforce Sales Cloud 零基础学习(四) Chatter
    Salesforce Sales Cloud 零基础学习(三) Lead & Opportunity & Quote
    Salesforce Sales Cloud 零基础学习(二) Account 和 Contact
    Salesforce Sales Cloud 零基础学习(一) Product 和 Price Book
  • 原文地址:https://www.cnblogs.com/zhangchao162/p/9896805.html
Copyright © 2011-2022 走看看