  • Data Warehouse Project 03: Data Collection Layer (Flume + Kafka)

    1. Introduction to the Data Collection Layer

    The generated log files are collected by Flume and pushed to Kafka; a second Flume agent then syncs them from Kafka to HDFS. The point is mainly to practice these technologies, so don't read too much into the design.

    2. Choosing the Flume Source

    Taildir Source

    http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#taildir-source

    Watch the specified files, and tail them in nearly real-time once detected new lines appended to the each files. If the new lines are being written, this source will retry reading them in wait for the completion of the write.

    This source is reliable and will not miss data even when the tailing files rotate. It periodically writes the last read position of each files on the given position file in JSON format. If Flume is stopped or down for some reason, it can restart tailing from the position written on the existing position file.
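
    The position file itself is just a small JSON array. A sketch of what it might look like on this setup (the positionFile path matches the agent config in section 6; the inode, pos and log file name below are made up for illustration):

    [hadoop@hadoop103 ~]$ cat /opt/module/flume/test/taildir_position.json
    [{"inode":1054321,"pos":524288,"file":"/tmp/logs/app-2020-11-20.log"}]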

    3. Choosing the Flume Channel

    kafka-channel

    http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#kafka-channel

    The events are stored in a Kafka cluster (must be installed separately). Kafka provides high availability and replication, so in case an agent or a kafka broker crashes, the events are immediately available to other sinks

    The Kafka channel can be used for multiple scenarios:

    1. With Flume source and sink - it provides a reliable and highly available channel for events
    2. With Flume source and interceptor but no sink - it allows writing Flume events into a Kafka topic, for use by other apps (this is the scenario we use; the topic-creation sketch after this list assumes it)
    3. With Flume sink, but no source - it is a low-latency, fault tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr
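
    The KafkaChannel below writes to the topics topic_start and topic_event, so unless topic auto-creation is enabled on the brokers, the two topics need to exist first. A minimal sketch (Kafka's bin/ is assumed to be on the PATH; partition and replication counts are arbitrary choices here, and on newer Kafka versions --bootstrap-server hadoop102:9092 replaces --zookeeper):

    kafka-topics.sh --zookeeper hadoop102:2181 --create --topic topic_start --partitions 3 --replication-factor 2
    kafka-topics.sh --zookeeper hadoop102:2181 --create --topic topic_event --partitions 3 --replication-factor 2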

    4. Flume Interceptors

    http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-interceptors

    Flume has the capability to modify/drop events in-flight. This is done with the help of interceptors. Interceptors are classes that implement org.apache.flume.interceptor.Interceptor interface. An interceptor can modify or even drop events based on any criteria chosen by the developer of the interceptor. Flume supports chaining of interceptors.

    A custom interceptor is used here; judging from the channel-selector settings below, it puts a topic header (topic_start or topic_event) on each event so the selector can route it to the right channel and Kafka topic:

    https://github.com/Eric-chenjy/log_collector.git
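
    Judging by the ETLInterceptor-1.0-SNAPSHOT.jar that shows up under flume/lib in section 8, the interceptor is packaged as a jar and dropped onto Flume's classpath. A rough sketch of that workflow, assuming the repository above builds with Maven:

    # inside the cloned log_collector project
    mvn clean package
    # put the jar on the Flume agent classpath
    cp target/ETLInterceptor-1.0-SNAPSHOT.jar /opt/module/flume/lib/
    # distribute it to the other nodes (xsync is the cluster sync script used below)
    xsync /opt/module/flume/lib/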

    5. Flume Channel Selectors

    http://flume.apache.org/releases/content/1.9.0/FlumeUserGuide.html#flume-channel-selectors

    If the type is not specified, then defaults to “replicating”.

    We use the Multiplexing Channel Selector: it routes each event to a channel based on the value of a configured header, here the topic header set by the interceptor.

    6. Flume Agent Configuration File

    [hadoop@hadoop103 myagents]$ pwd
    /opt/module/flume/myagents
    [hadoop@hadoop103 myagents]$ cat test 
    # a1 is the agent name; it defines one source called r1 (multiple sources would be separated by spaces)
    a1.sources = r1
    a1.channels = c1 c2
    a1.sources.r1.type = TAILDIR
    a1.sources.r1.channels = c1 c2
    a1.sources.r1.positionFile = /opt/module/flume/test/taildir_position.json
    a1.sources.r1.filegroups = f1
    a1.sources.r1.filegroups.f1 = /tmp/logs/^app.*log$
    
    # Interceptor
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = com.eric.dw.flume.MyInterceptor$Builder
    
    # Define the channel selector
    a1.sources.r1.selector.type = multiplexing
    a1.sources.r1.selector.header = topic
    a1.sources.r1.selector.mapping.topic_start = c1
    a1.sources.r1.selector.mapping.topic_event = c2
    
    #channel
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c1.kafka.topic = topic_start
    # Do not write the Flume event headers into Kafka (body only)
    a1.channels.c1.parseAsFlumeEvent=false
    
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.channels.c2.kafka.topic = topic_event
    a1.channels.c2.parseAsFlumeEvent=false
    [hadoop@hadoop103 myagents]$ 
    

     Start the agent:

    [hadoop@hadoop103 myagents]$ flume-ng agent -c conf/ -n a1 -f f1.conf -Dflume.root.logger=DEBUG

    7. Verification
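
    No verification output was captured here. A rough way to check the first tier is to generate a batch of logs and watch one of the Kafka topics (lg is presumably the log-generation script from the ~/bin listing in section 10; the consumer flags are standard Kafka CLI):

    # generate test logs on the log-producing nodes
    lg
    # tail the start-log topic to confirm events are flowing through the KafkaChannel
    kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_start --from-beginning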

    8. Distributing the Config and Writing the Collection Start/Stop Script

    Distribute the agent configuration and the interceptor jar to the other nodes:

    [hadoop@hadoop103 flume]$ xsync  myagents/
    要分发的文件目录是:/opt/module/flume/myagents
    ----------------hadoop102-----------------
    sending incremental file list
    myagents/
    myagents/test
    
    sent 1266 bytes  received 35 bytes  2602.00 bytes/sec
    total size is 1166  speedup is 0.90
    ----------------hadoop103-----------------
    sending incremental file list
    
    sent 54 bytes  received 13 bytes  44.67 bytes/sec
    total size is 1166  speedup is 17.40
    ----------------hadoop104-----------------
    sending incremental file list
    myagents/
    myagents/test
    
    sent 1266 bytes  received 35 bytes  2602.00 bytes/sec
    total size is 1166  speedup is 0.90
    
    [hadoop@hadoop103 flume]$ xsync lib/
    要分发的文件目录是:/opt/module/flume/lib
    ----------------hadoop102-----------------
    sending incremental file list
    lib/
    lib/ETLInterceptor-1.0-SNAPSHOT.jar
    
    sent 8248 bytes  received 35 bytes  16566.00 bytes/sec
    total size is 52043090  speedup is 6283.12
    ----------------hadoop103-----------------
    sending incremental file list
    
    sent 3375 bytes  received 13 bytes  6776.00 bytes/sec
    total size is 52043090  speedup is 15361.01
    ----------------hadoop104-----------------
    sending incremental file list
    lib/
    lib/ETLInterceptor-1.0-SNAPSHOT.jar
    
    sent 8248 bytes  received 35 bytes  16566.00 bytes/sec
    total size is 52043090  speedup is 6283.12
    [hadoop@hadoop103 flume]$ pwd
    /opt/module/flume
    

     Writing the f1 script:

    [hadoop@hadoop103 bin]$ cat f1
    #!/bin/bash
    #1. Check that a start or stop argument was given
    if(($#!=1))
    then
    	echo  "Please enter start|stop"
    	exit;
    fi
    #Build the start or stop command
    cmd=cmd
    if [ $1 = start ]
    then
    	cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,co
    nsole > /home/hadoop/log/f1.log  2>&1 &"elif [ $1 = stop ]
    then
    	cmd="ps -ef|grep f1.conf |grep -v grep|awk -F ' ' '{print $2}'|xargs kill -9"
    else
    	echo  "Please enter start|stop"
    	exit;
    fi
    #Run the command on each log-collection node
    for i in hadoop102 hadoop103
    do
    	echo $i
    	echo $cmd
    	ssh $i $cmd
    done
    [hadoop@hadoop103 bin]$ 
    

      

    Usage:

    [hadoop@hadoop103 bin]$ f1 stop
    hadoop102
    ps -ef|grep myagents |grep -v grep|awk -F ' ' '{print $2}'|xargs kill -9
    usage: kill [ -s signal | -p ] [ -a ] pid ...
           kill -l [ signal ]
    hadoop103
    ps -ef|grep myagents |grep -v grep|awk -F ' ' '{print $2}'|xargs kill -9
    usage: kill [ -s signal | -p ] [ -a ] pid ...
           kill -l [ signal ]
    [hadoop@hadoop103 bin]$ f1 start
    hadoop102
    nohup flume-ng agent -c /opt/module/flume/conf/ -n a1 -f /opt/module/flume/myagents/test -Dflume.root.logger=DEBUG,console > /home/hadoop/log/f1.log 2>&1 &
    hadoop103
    nohup flume-ng agent -c /opt/module/flume/conf/ -n a1 -f /opt/module/flume/myagents/test -Dflume.root.logger=DEBUG,console > /home/hadoop/log/f1.log 2>&1 &
    [hadoop@hadoop103 bin]$ xcall jps
    要执行的命令是: jps
    ----------------hadoop102-----------------
    27412 NodeManager
    7061 QuorumPeerMain
    30165 Jps
    27494 JobHistoryServer
    30088 Application
    27291 DataNode
    27181 NameNode
    11375 Kafka
    ----------------hadoop103-----------------
    5860 Application
    5940 Jps
    9590 ResourceManager
    9335 DataNode
    7690 Kafka
    2588 QuorumPeerMain
    9709 NodeManager
    ----------------hadoop104-----------------
    1489 Jps
    27498 NodeManager
    11403 Kafka
    27293 DataNode
    26670 QuorumPeerMain
    [hadoop@hadoop103 bin]$ 
    

    9. Kafka to HDFS: Basic Configuration

    The Flume configuration for this tier uses a KafkaSource, a FileChannel and an HDFSSink.

    Flume agent configuration file:

    [hadoop@hadoop103 myagents]$ pwd
    /opt/module/flume/myagents
    [hadoop@hadoop103 myagents]$ ll
    total 8
    -rw-rw-r-- 1 hadoop hadoop 2386 Nov 20 11:13 f2.conf
    -rw-rw-r-- 1 hadoop hadoop 1166 Nov 16 19:08 test
    [hadoop@hadoop103 myagents]$ cat f2.conf 
    ## Components
    a1.sources=r1 r2
    a1.channels=c1 c2
    a1.sinks=k1 k2
    
    ## source1
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r1.kafka.topics=topic_start
    a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
    a1.sources.r1.kafka.consumer.group.id=CG_Start
    ## source2
    a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r2.batchSize = 5000
    a1.sources.r2.batchDurationMillis = 2000
    a1.sources.r2.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092,hadoop104:9092
    a1.sources.r2.kafka.topics=topic_event
    a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
    a1.sources.r2.kafka.consumer.group.id=CG_Event
    ## channel1
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /opt/module/flume/checkpoint/behavior1
    a1.channels.c1.dataDirs = /opt/module/flume/data/behavior1/
    a1.channels.c1.maxFileSize = 2146435071
    a1.channels.c1.capacity = 1000000
    a1.channels.c1.keep-alive = 6
    
    ## channel2
    a1.channels.c2.type = file
    a1.channels.c2.checkpointDir = /opt/module/flume/checkpoint/behavior2
    a1.channels.c2.dataDirs = /opt/module/flume/data/behavior2/
    a1.channels.c2.maxFileSize = 2146435071
    a1.channels.c2.capacity = 1000000
    a1.channels.c2.keep-alive = 6
    
    ## sink1
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d
    a1.sinks.k1.hdfs.filePrefix = logstart-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = second
    
    ##sink2
    a1.sinks.k2.type = hdfs
    a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d
    a1.sinks.k2.hdfs.filePrefix = logevent-
    a1.sinks.k2.hdfs.round = true
    a1.sinks.k2.hdfs.roundValue = 10
    a1.sinks.k2.hdfs.roundUnit = second
    
    ## Avoid producing lots of small files
    a1.sinks.k1.hdfs.rollInterval = 10
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0
    
    a1.sinks.k2.hdfs.rollInterval = 10
    a1.sinks.k2.hdfs.rollSize = 134217728
    a1.sinks.k2.hdfs.rollCount = 0
    
    ## Write compressed output files (LZO, via the lzop codec below)
    a1.sinks.k1.hdfs.fileType = CompressedStream 
    a1.sinks.k2.hdfs.fileType = CompressedStream 
    
    a1.sinks.k1.hdfs.codeC = lzop
    a1.sinks.k2.hdfs.codeC = lzop
    
    ## Wire sources and sinks to their channels
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel= c1
    
    a1.sources.r2.channels = c2
    a1.sinks.k2.channel= c2
    [hadoop@hadoop103 myagents]$
    

     Start: flume-ng agent -c conf/ -n a1 -f myagents/f2.conf -Dflume.root.logger=DEBUG,console

    Check the output:
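
    A quick way to confirm that files are landing on HDFS (the directory layout comes from the hdfs.path settings above; with fileType=CompressedStream and codeC=lzop the files end in .lzo):

    hdfs dfs -ls /origin_data/gmall/log/topic_start/$(date +%F)
    hdfs dfs -ls /origin_data/gmall/log/topic_event/$(date +%F)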

     

    10. Kafka to HDFS: Writing the Start/Stop Script

    Because this second collection tier is planned to run on hadoop104, f2.conf needs to be synced to the other nodes:

    [hadoop@hadoop103 flume]$ xsync myagents/
    要分发的文件目录是:/opt/module/flume/myagents
    ----------------hadoop102-----------------
    sending incremental file list
    myagents/
    myagents/f1.conf
    myagents/f2.conf
    
    sent 2248 bytes  received 78 bytes  1550.67 bytes/sec
    total size is 3552  speedup is 1.53
    ----------------hadoop103-----------------
    sending incremental file list
    
    sent 77 bytes  received 13 bytes  180.00 bytes/sec
    total size is 3552  speedup is 39.47
    ----------------hadoop104-----------------
    sending incremental file list
    myagents/
    myagents/f1.conf
    myagents/f2.conf
    
    sent 2248 bytes  received 78 bytes  4652.00 bytes/sec
    total size is 3552  speedup is 1.53
    

    The f2 start/stop script:

    [hadoop@hadoop103 bin]$ pwd
    /home/hadoop/bin
    [hadoop@hadoop103 bin]$ ll
    total 32
    -rwxrw-r-- 1 hadoop hadoop 232 Nov  9 12:10 ct
    -rwxrw-r-- 1 hadoop hadoop 251 Nov 10 20:45 dt
    -rwxrw-r-- 1 hadoop hadoop 562 Nov 20 12:17 f1
    -rwxrw-r-- 1 hadoop hadoop 531 Nov 20 11:24 f2
    -rwxrw-r-- 1 hadoop hadoop 333 Nov 10 22:14 hd
    -rwxrw-r-- 1 hadoop hadoop 265 Nov 13 11:43 kf
    -rwxrw-r-- 1 hadoop hadoop 225 Nov 10 20:28 lg
    -rwxrw-r-- 1 hadoop hadoop 235 Nov 13 10:42 zk
    [hadoop@hadoop103 bin]$ cat f2
    #!/bin/bash
    #1. Check that a start or stop argument was given
    if(($#!=1))
    then
    	echo  "Please enter start|stop"
    	exit;
    fi
    #Build the start or stop command
    cmd=cmd
    if [ $1 = start ]
    then
    	cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=DEBUG
    ,console > /home/hadoop/log/f2.log  2>&1 &"elif [ $1 = stop ]
    then
    	cmd="ps -ef|grep f2.conf  |grep -v grep|awk -F ' ' '{print $2}'|xargs kill -9"
    else
    	echo  "Please enter start|stop"
    	exit;
    fi
    #Run only on hadoop104
    ssh hadoop104 $cmd
    [hadoop@hadoop103 bin]$ 
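
    Usage mirrors f1. Since this agent runs only on hadoop104, a quick sanity check after "f2 start" is to look for an extra "Application" process in jps on hadoop104 (the same process name the f1 agents show above):

    f2 start
    xcall jps    # hadoop104 should now list an "Application" process for this Flume agent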
    

    11.一键启动脚本编写

    [hadoop@hadoop103 bin]$ cat onekeyboot 
    #!/bin/bash
    #One-key start/stop for the whole cluster
    #Check the argument
    if(($#!=1))
    then
    	echo "Please enter start|stop"
    	exit;
    fi
    
    #Helper function that counts the running Kafka broker processes; the count is returned through the function's exit status
    function countKafkaBrokers()
    {
    	count=0
    	count=$(xcall jps|grep Kafka|wc -l)
    	return $count
    }
    #Start
    if [ $1 = start ]
    then
    	hd start
    	zk start
    	kf start
    	#f1 and f2 can only be started once all 3 Kafka brokers are up
    	while [ 1 ]
    	do
    		countKafkaBrokers
    		if(($?==3))
    		then
    			break
    		fi
    		sleep 2s
    	done
    	f1 start
    	f2 start
    	xcall jps
    	exit;
    elif [ $1 = stop ]
    then
    	f1 stop
    	f2 stop
    	kf stop
    	#ZooKeeper can only be stopped after all Kafka brokers have exited
    	while [ 1 ]
    	do
    		countKafkaBrokers
    		if(($?==0))
    		then
    			break
    		fi
    		sleep 2s
    	done
    	zk stop
    	hd stop
    	xcall jps
    	exit;
    else
    	echo "Please enter start|stop"
    	exit;
    fi		
    [hadoop@hadoop103 bin]$ 
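
    Typical usage, assuming ~/bin is on the PATH like the other scripts:

    onekeyboot start   # hd, zk, kf, then the f1 and f2 Flume tiers
    onekeyboot stop    # stops f1/f2 and Kafka, then ZooKeeper and Hadoop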
    

      

      

     
