  • A summary of using Flume to send data to Kafka, HDFS and Hive, and of the http, netcat and other source/sink modes

    1. Source in http mode, sink in logger mode: print the data to the console.
    The conf file is as follows:
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    # The http source accepts events POSTed over HTTP
    a1.sources.r1.type = http
    # Hostname or IP address of the machine running Flume
    a1.sources.r1.bind = hadoop-master
    # Port to listen on
    a1.sources.r1.port = 9000
    #a1.sources.r1.fileHeader = true
     
    # Describe the sink
    # The logger sink prints events to the console
    a1.sinks.k1.type = logger
     
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    Start Flume with:
    bin/flume-ng agent -c conf -f conf/http.conf -n a1 -Dflume.root.logger=INFO,console
    Output like the following indicates that Flume started successfully:
    895 (lifecycleSupervisor-1-3) [INFO -org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
    Open another terminal and send data with an HTTP POST:
    curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:9000
    hadoop-master is the hostname the Flume config binds to, and 9000 is the configured port.
    The window running Flume then prints the following:
    2018-06-12 08:24:04,472 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{timestampe=1234567, host=master} body: 62 61 64 6F 75 20 66 6C 75 6D 65 badou flume }
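     
    The http source's default JSONHandler accepts a list of events, so several events can be posted in a single request. A minimal sketch (header values are illustrative only):
    curl -X POST -d '[{"headers":{"host":"master"},"body":"event one"},{"headers":{"host":"master"},"body":"event two"}]' hadoop-master:9000
    Each element of the array shows up as a separate Event in the logger output.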
     
    2. Source in netcat mode (UDP or TCP), sink in logger mode: print the data to the console.
    The conf file is as follows:
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    a1.sources.r1.type = netcat
    # Hostname or IP address to bind to
    a1.sources.r1.bind = hadoop-master
    a1.sources.r1.port = 44444
     
    a1.sinks.k1.type = logger
     
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    Start Flume:
    bin/flume-ng agent -c conf -f conf/netcat.conf -n a1 -Dflume.root.logger=INFO,console
     
    Then, in another terminal, send data with telnet:
    telnet hadoop-master 44444
     
    [root@hadoop-master ~]# telnet hadoop-master 44444
    Trying 192.168.194.6...
    Connected to hadoop-master.
    Escape character is '^]'.
    The output above means the connection to Flume succeeded. Then type:
    12213213213
    OK
    12321313
    OK
    Flume then receives the corresponding events:
    2018-06-12 08:38:51,129 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 32 31 33 32 31 33 32 31 33 0D 12213213213. }
    2018-06-12 08:38:51,130 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 32 33 32 31 33 31 33 0D 12321313. }
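     
    If telnet is not available, nc can push lines at the netcat source just as well; a quick sketch:
    echo "hello flume" | nc hadoop-master 44444
    Each line sent this way appears as one event in the logger output.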
     
    3. Source in netcat/http mode, sink in hdfs mode: store the data in HDFS.
    The conf file, named hdfs.conf, is as follows:
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = hadoop-master
    a1.sources.r1.port = 44444
     
    # Regex filter interceptor: drop events whose body is made up only of digits
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = regex_filter
    a1.sources.r1.interceptors.i1.regex = ^[0-9]*$
    a1.sources.r1.interceptors.i1.excludeEvents = true
     
    # Describe the sink
    #a1.sinks.k1.type = logger
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.channel = c1
    # Where the files are stored in HDFS
    a1.sinks.k1.hdfs.path = hdfs:/flume/events
    # Prefix for the generated file names
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 10
    a1.sinks.k1.hdfs.roundUnit = minute
    # DataStream writes the incoming events as plain text
    a1.sinks.k1.hdfs.fileType = DataStream
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
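     
    The configuration above keeps the HDFS sink's default roll settings (roll every 30 seconds, at 1 KB, or after 10 events), which is why several small files show up in the directory listing further below. If larger files are preferred, the roll properties can be tuned; a sketch with illustrative values:
    # Roll a new file every 10 minutes or at 128 MB, whichever comes first;
    # 0 disables rolling by event count
    a1.sinks.k1.hdfs.rollInterval = 600
    a1.sinks.k1.hdfs.rollSize = 134217728
    a1.sinks.k1.hdfs.rollCount = 0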
     
    Create the storage path in the HDFS file system:
    hadoop fs -mkdir -p /flume/events
     
    Start Flume:
    bin/flume-ng agent -c conf -f conf/hdfs.conf -n a1 -Dflume.root.logger=INFO,console
     
    Send data to Flume over telnet:
    telnet hadoop-master 44444
    Then type:
    aaaaaaaa
    bbbbbbb
    ccccccccc
    dddddddddd
     
    List the files in HDFS with hadoop fs -ls /flume/events/ ; the /flume/events directory contains files like the following:
    -rw-r--r-- 3 root supergroup 16 2018-06-05 06:02 /flume/events/events-.1528203709070
    -rw-r--r-- 3 root supergroup 5 2018-06-05 06:02 /flume/events/events-.1528203755556
    -rw-r--r-- 3 root supergroup 11 2018-06-05 06:03 /flume/events/events-.1528203755557
    -rw-r--r-- 3 root supergroup 26 2018-06-13 07:28 /flume/events/events-.1528900112215
    -rw-r--r-- 3 root supergroup 209 2018-06-13 07:29 /flume/events/events-.1528900112216
    -rw-r--r-- 3 root supergroup 72 2018-06-13 07:29 /flume/events/events-.1528900112217
    View the contents of events-.1528900112216 with hadoop fs -cat /flume/events/events-.1528900112216:
    aaaaaaaaaaaaaaaaa
    bbbbbbbbbbbbbbbb
    ccccccccccccccccccc
    dddddddddddddddd
    eeeeeeeeeeeeeeeeeee
    fffffffffffffffffffffff
    gggggggggggggggggg
    hhhhhhhhhhhhhhhhhhhhhhh
    iiiiiiiiiiiiiiiiiii
    jjjjjjjjjjjjjjjjjjj
     
    For http mode, change netcat to http in hdfs.conf (see the sketch after this paragraph) and send the data with curl instead of telnet:
    curl -X POST -d '[{"headers":{"timestampe":"1234567","host":"master"},"body":"badou flume"}]' hadoop-master:44444
    The file in HDFS then contains the body sent by the command above: badou flume.
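     
    Concretely, only the source definition in hdfs.conf changes for the http variant; a sketch (the rest of the file stays exactly as shown above):
    a1.sources.r1.type = http
    a1.sources.r1.bind = hadoop-master
    a1.sources.r1.port = 44444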
     
    4. Source in netcat/http mode, sink in hive mode: store the data in Hive, with partitioning.
    The conf file, named hive.conf, is as follows:
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = hadoop-master
    a1.sources.r1.port = 44444
     
    # Describe the sink
    #a1.sinks.k1.type = logger
    a1.sinks.k1.type = hive
    a1.sinks.k1.hive.metastore = thrift://hadoop-master:9083
    # Hive database name
    a1.sinks.k1.hive.database = default
    a1.sinks.k1.hive.table = flume_user1
    a1.sinks.k1.serializer = DELIMITED
    # With a netcat source the partition value can only be set statically, because netcat
    # carries no named fields, only ordered columns; here the age partition is fixed at 3.
    a1.sinks.k1.hive.partition = 3
    # With an http/json source the partition value can be supplied dynamically from a
    # header, because http can carry the age value:
    #a1.sinks.k1.hive.partition = %{age}
    a1.sinks.k1.serializer.delimiter = " "
    a1.sinks.k1.serializer.serdeSeparator = ' '
    a1.sinks.k1.serializer.fieldnames = user_id,user_name
    a1.sinks.k1.hive.txnsPerBatchAsk = 10
    a1.sinks.k1.hive.batchSize = 1500
     
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
     
    Create the table in Hive:
    create table flume_user1(
    user_id int
    ,user_name string
    )
    partitioned by(age int)
    clustered by (user_id) into 2 buckets
    stored as orc;
     
    Add the following to hive-site.xml:
    <property>
    <name>javax.jdo.option.ConnectionPassword</name>
    <value>hive</value>
    <description>password to use against metastore database</description>
    </property>
     
    <property>
    <name>hive.support.concurrency</name>
    <value>true</value>
    </property>
    <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>nonstrict</value>
    </property>
    <property>
    <name>hive.txn.manager</name>
    <value>org.apache.hadoop.hive.ql.lockmgr.DbTxnManager</value>
    </property>
    <property>
    <name>hive.compactor.initiator.on</name>
    <value>true</value>
    </property>
    <property>
    <name>hive.compactor.worker.threads</name>
    <value>1</value>
    </property>
     
    Copy the jar files under the hcatalog/share/hcatalog directory of the Hive installation into Flume's lib directory.
    Run Flume:
    bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
     
    Open a new window and start the metastore service:
    hive --service metastore &
    Open another client and connect to Flume with telnet:
    telnet hadoop-master 44444
    Then type:
    1 1
    3 3
    Hive then shows the following two rows of data:
    flume_user1.user_id flume_user1.user_name flume_user1.age
    1 1 3
    3 3 3
    The age column holds the value 3 that was set statically in hive.conf; a quick verification query is sketched below.
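     
    To check what actually landed in the table and its partition, a simple query in the hive CLI against the table defined above should suffice:
    select user_id, user_name, age from flume_user1 where age = 3;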
     
    Now switch the Flume source to http mode so that the Hive partition value can be passed dynamically as a parameter.
    In hive.conf, change
    a1.sources.r1.type = netcat to a1.sources.r1.type = http
    and a1.sinks.k1.hive.partition=3 to a1.sinks.k1.hive.partition=%{age}.
    Then start Flume:
    bin/flume-ng agent -c conf -f conf/hive.conf -n a1 -Dflume.root.logger=INFO,console
    In the reopened window, send data to Flume over HTTP:
    curl -X POST -d '[{"headers":{"age":"109"},"body":"11 ligongong"}]' hadoop-master:44444
    Hive then shows the following data:
    flume_user1.user_id flume_user1.user_name flume_user1.age
    11 ligongong 109
    This shows that when data is sent to Hive over HTTP, the partition field travels in the event headers while the other fields travel in the body, with columns separated by the delimiter defined in hive.conf.
     
    5. Use avro mode and print the data to the console.
    Data can only be transferred between different agents in avro mode.
    Two servers are needed to demonstrate avro: hadoop-master and hadoop-slave2.
    hadoop-master runs agent2, whose sink is avro and whose destination hostname is set to hadoop-slave2. The Flume conf file on hadoop-master, named push.conf, is as follows:
    #Name the components on this agent
    a2.sources= r1
    a2.sinks= k1
    a2.channels= c1
     
    #Describe/configure the source
    a2.sources.r1.type= netcat
    a2.sources.r1.bind= hadoop-master
    a2.sources.r1.port = 44444
    a2.sources.r1.channels= c1
     
    #Use a channel which buffers events in memory
    a2.channels.c1.type= memory
    a2.channels.c1.keep-alive= 10
    a2.channels.c1.capacity= 100000
    a2.channels.c1.transactionCapacity= 100000
     
    # Describe the sink
    # The avro sink forwards events to another agent
    a2.sinks.k1.type = avro
    a2.sinks.k1.channel = c1
    # Hostname of the server the sink sends data to
    a2.sinks.k1.hostname = hadoop-slave2
    # Port on the destination server
    a2.sinks.k1.port = 44444
     
     
    hadoop-slave2 runs agent1, whose source is avro. The Flume configuration, named pull.conf, is as follows:
    #Name the components on this agent
    a1.sources= r1
    a1.sinks= k1
    a1.channels= c1
     
    #Describe/configure the source
    a1.sources.r1.type= avro
    a1.sources.r1.channels= c1
    a1.sources.r1.bind= hadoop-slave2
    a1.sources.r1.port= 44444
     
    #Describe the sink
    a1.sinks.k1.type= logger
    a1.sinks.k1.channel = c1
     
    #Use a channel which buffers events in memory
    a1.channels.c1.type= memory
    a1.channels.c1.keep-alive= 10
    a1.channels.c1.capacity= 100000
    a1.channels.c1.transactionCapacity= 100000
    Start Flume on hadoop-slave2 first, and only then on hadoop-master. The order matters; doing it the other way round produces an error like: org.apache.flume.FlumeException: java.net.SocketException: Unresolved address
     
    Start Flume on hadoop-slave2:
    bin/flume-ng agent -c conf -f conf/pull.conf -n a1 -Dflume.root.logger=INFO,console
    Start Flume on hadoop-master:
    bin/flume-ng agent -c conf -f conf/push.conf -n a2 -Dflume.root.logger=INFO,console
     
    Open a new window and connect to hadoop-master with telnet:
    telnet hadoop-master 44444
    Then send 11111aaaa.
    The console on hadoop-slave2 then displays the 11111aaaa that was just sent, as shown below:
    2018-06-14 06:43:00,686 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:94)] Event: { headers:{} body: 31 31 31 31 31 61 61 61 61 0D 11111aaaa. }
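     
    As a side note, a file can also be pushed straight at the avro source without a second agent, using Flume's built-in avro client. A sketch, where /tmp/test.txt is a hypothetical local file:
    bin/flume-ng avro-client -H hadoop-slave2 -p 44444 -F /tmp/test.txt
    The file's contents then show up as events on the hadoop-slave2 console.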
     
     
    6. Send data to Kafka through Flume; Kafka can then be used to store the data in HDFS and Hive.
    First configure Kafka. For the Kafka setup, see: https://blog.csdn.net/zxy987872674/article/details/72466504
    Start ZooKeeper on hadoop-master, hadoop-slave1 and hadoop-slave2 with (assuming a standard ZooKeeper installation):
    bin/zkServer.sh start
    Then start Kafka: go into the Kafka installation directory and run:
    ./bin/kafka-server-start.sh config/server.properties &
    Create a topic in Kafka:
    bin/kafka-topics.sh --create --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --replication-factor 1 --partitions 2 --topic flume_kafka
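     
    Before wiring up Flume, the topic can be smoke-tested with Kafka's console producer. A sketch, assuming the broker listens on hadoop-master:9092 (the port used in the sink configuration below):
    bin/kafka-console-producer.sh --broker-list hadoop-master:9092 --topic flume_kafka
    Anything typed here should appear in the consumer started in the steps below.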
     
    List the topics in Kafka:
    bin/kafka-topics.sh --list --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181
     
    Start a Kafka consumer:
    ./kafka-console-consumer.sh --zookeeper hadoop-master:2181,hadoop-slave1:2181,hadoop-slave2:2181 --topic flume_kafka
     
    Configure the Flume conf file: set the source type to exec and the sink type to org.apache.flume.sink.kafka.KafkaSink, and point the sink at the flume_kafka topic created above. The full configuration is as follows:
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    # Describe/configure the source
    # The exec source runs a command and turns each output line into an event
    a1.sources.r1.type = exec
    # Command the source executes
    a1.sources.r1.command = tail -f /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt
     
    # Kafka sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # Kafka broker address and port
    a1.sinks.k1.brokerList = hadoop-master:9092
    # Kafka topic to write to
    a1.sinks.k1.topic = flume_kafka
    # Serialization class
    a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder
     
    # use a channel which buffers events in memory
    a1.channels.c1.type=memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 1000
     
    # Bind the source and sink to the channel
    a1.sources.r1.channels=c1
    a1.sinks.k1.channel=c1
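     
    Note that brokerList and topic are the property names understood by the Kafka sink in older Flume releases (1.6 and earlier). On Flume 1.7 or later the same sink expects the kafka.-prefixed names instead; a sketch of that variant, under the assumption of a newer Flume:
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.bootstrap.servers = hadoop-master:9092
    a1.sinks.k1.kafka.topic = flume_kafka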
     
    Start Flume:
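    Assuming the configuration above was saved as conf/kafka.conf, the startup command follows the same pattern as in the earlier sections:
    bin/flume-ng agent -c conf -f conf/kafka.conf -n a1 -Dflume.root.logger=INFO,console   # conf/kafka.conf is an assumed file name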
    Whenever /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt receives new data, Flume loads it into Kafka, where it is picked up by the consumer started above.
    The file /home/hadoop/flumeHomeWork/flumeCode/flume_exec_test.txt contains data such as:
    131,dry pasta
    131,dry pasta
    132,beauty
    133,muscles joints pain relief
    133,muscles joints pain relief
    133,muscles joints pain relief
    133,muscles joints pain relief
    134,specialty wines champagnes
    134,specialty wines champagnes
    134,specialty wines champagnes