zoukankan      html  css  js  c++  java
  • Flume

    Flume

    第一节:简介

    一、概念

    flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。支持在日志系统中定制各类数据发送方,用于收集数据;同时,Flume提供对数据进行简单处理,并写到各种数据接受方(比如文本、HDFS、Hbase等)的能力 。

    做数据收集的工具,主要用于日志文件的收集。是一个单独的数据的采集的系统的,不在web项目controller,service,dao的三层中的。

    二、flume的架构

    1、NG架构(flume1.X

    source:数据源,数据的来源,相当于对应用系统的日志文件进行读取。

    channel:通道相当于一个缓冲区,作用就是缓冲source和sink的数据的速度不一致。

    sink:数据的目的地,相当于将收集的数据写出到最终的目的地。

    2、OG架构(flume0.9

    Agent->collector->master

    第二节:flume的核心概念

    一、event

    数据收集,流的基本单位,flume进行数据读写的基本单元。

    包含header(用以描述数据),body收集的数据(用以存储数据的,里面包含键值对,以及16进制的数字)

    二、agent

    数据收集代理,这里可以理解为一个数据收集服务器,一个代理包含flume的一套的组件的source、channel、sink。

    三、三大组件

    1、source

    数据源,需要进行收集的日志信息的来源。

    (1)avro source

    监听avro协议的端口,当监听的端口中有数据流入的时候会被avro source监听到。

    节点之间的传输一般也用avro协议。

    (2)exec source

    监听一个linux命令,指定一个需要监听的命令,当这个命令产生的数据就会被exec source监听到---进行采集。

    (3)Spooling Directory Source

    监听的是一个目录,这个目录是不能有子目录的。监听当前目录下的文件的数据的变化,一旦当前目录下文件的数据有变化就会被Spooling Directory Source监听到,就可以进行数据读取,一旦数据读取完成,当前的这个文件的名字就会加一个后缀.complete代表当前文件的数据读取完成。

    (4)NetCat TCP Source

    监听TCP协议的端口,一旦端口中有数据就可以被采集。

    2、channel

    数据通道,临时存储读取(source)的数据信息,给sink使用。可以使用内存、磁盘。

    (1)Memory Channel

    将数据存储在内存中,内存队列。

    (2)FileChannel

    event存储在文件中

    3、sink

    数据收集的目的地,将采集的数据进行最终的处理。

    (1)hdfs sink

    将采集的数据,写入到hdfs下

    (2)Logger sink

    将我们采集的数据,放在控制台打印,一般用于测试。

    (3)avro sink

    将采集的数据写入到制定的端口中

    第三节:flume的搭建

    第四节:flume的案例

    一、简介

    Source、channel、sink依赖于配置文件,使用flume必须写配置文件,这个配置文件必须是以 .conf ,.properties这两种类型的文件。

    二、运行命令

    flume-ng agent --conf conf --conf-file /home/hadoop/case_one_flume.conf --name a1 -Dflume.root.logger=INFO,console

    --conf 指定运行的方式

    --conf-file 指定配置文件的路径

    --name指定agent的名字,和配置文件中对应的

    三、案例

    # example.conf: A single-node Flume configuration

    # 这里的a1指的是agent的名字  标识agent的  自定义的  但是注意:同一个机器上的两个agent一定不能重复  不同节点上的agent是可以一样的

    # Name the components on this agent

    # 定义的是当前的agent的  source  sink  channel的名字

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # 设定source的类型和相关参数

    a1.sources.r1.type=spooldir

    a1.sources.r1.spoolDir=/home/hadoop/flumetest

    # 设定channel

    a1.channels.c1.type=memory

    # 设定sink

    a1.sinks.k1.type=logger

    # Bind the source and sink to the channel

    # 设定source的通道

    a1.sources.r1.channels = c1

    # 设定sink的通道

    a1.sinks.k1.channel = c1

    第五节:flume的拦截器

    一、简介

    数据流: source---channel(缓冲)---sink

    拦截器就是用于拦截source--sink的数据,对数据进行包装,对数据添加一个数据头信息 headers中封装的,sink在接受到数据的时候经过拦截器处理的数据,有头信息数据的,sink就可以根据不同数据的头信息进行筛选,分类。

    二、类型

    1、时间戳

    Timestamp Interceptor

    添加的表头信息

    headers:{timestamp=1558759325176}

    # 指定当前agent a1的 sources sinks  channels 的别名

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # agent的数据源的

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

    # 指定agent的通道

    a1.channels.c1.type = memory

    # 指定agent的sink的

    a1.sinks.k1.type = logger

    # 指定拦截器

    # 指定拦截器的别名

    a1.sources.r1.interceptors = i1

    # 指定类型的

    a1.sources.r1.interceptors.i1.type = host

    # 绑定agent的  r1   c1   k1

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    2、主机名拦截器

    Host Interceptor

    添加的表头信息

    headers:{timestamp=1558759325176}

    # 指定当前agent a1的 sources sinks  channels 的别名

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # agent的数据源的

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

    # 指定agent的通道

    a1.channels.c1.type = memory

    # 指定agent的sink的

    a1.sinks.k1.type = logger

    # 指定拦截器

    # 指定拦截器的别名

    a1.sources.r1.interceptors = i1

    # 指定类型的

    a1.sources.r1.interceptors.i1.type = host

    # 绑定agent的  r1   c1   k1

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

    3、静态拦截器

    Static Interceptor

    静态拦截器,自己静态指定,自己静态指定拦截器的 key value的值

    # 指定当前agent a1的 sources sinks  channels 的别名

    a1.sources = r1

    a1.sinks = k1

    a1.channels = c1

    # agent的数据源的

    a1.sources.r1.type = netcat

    a1.sources.r1.bind = localhost

    a1.sources.r1.port = 44444

    # 指定agent的通道

    a1.channels.c1.type = memory

    # 指定agent的sink的

    a1.sinks.k1.type = logger

    # 指定拦截器

    # 指定拦截器的别名

    a1.sources.r1.interceptors = i1

    # 指定类型的

    a1.sources.r1.interceptors.i1.type = static

    a1.sources.r1.interceptors.i1.key = country

    a1.sources.r1.interceptors.i1.value = CN

    # 绑定agent的  r1   c1   k1

    a1.sources.r1.channels = c1

    a1.sinks.k1.channel = c1

  • 相关阅读:
    Web网页数据抽取软件的设计与实现
    以Groovy的方式更稳定地解析HTML(转载)
    HTML 资讯汲取(上篇) 使用 JDOM 、 TagSoup 及 XPath
    html解析
    HTML 資訊汲取(下篇) TagSoup 輸出 namespace 問題的解決方案
    国外免费主机空间
    ASP.NET获取客户端IP地址、系统版本、浏览器版本
    阁下莫非就是当年华山论剑武功独步天下罕有其匹号称一朵梨花压海棠的少林寺智障大师收养的小沙弥低能的爱犬旺财踩扁的蟑螂小强曾滚过的一个粪球?
    html中table里的col标签
    Visual Studio 2010 中的 TODO
  • 原文地址:https://www.cnblogs.com/lizm166/p/13354834.html
Copyright © 2011-2022 走看看