zoukankan      html  css  js  c++  java
  • flume-agent实例

    flume
        多种适配,多样化的数据收集
        核心概念
            event:一条消息
            client:访问者
            agent:
                重要组件Sources、Channels、Sinks。Interspactor、Selecter
            
            
    kafka
        吞吐量大,高并发场景下使用

    注意:flume的agent配置文件不允许有空格。

        
        一、flume打印内容到控制台
            1、创建一个agent(使用avroSource接收网络流在flume的控制台打印)配置文件agent1.conf
                cd /usr/local/flume/
                vi /conf/agent1.conf
                    agent1.sources=as1
                    agent1.channels=c1
                    agent1.sinks=s1

                    agent1.sources.as1.type=avro
                    agent1.sources.as1.bind=0.0.0.0            ##接收任意ip发送的数据
                    agent1.sources.as1.port=21111            ##在21111端口上监听
                    agent1.sources.as1.channels=c1
                    agent1.channels.c1.type=memory

                    agent1.sinks.s1.type=logger
                    agent1.sinks.s1.channel=c1
            2、启动agent1(每30秒检查agent1.conf文件一次,检查该文件是否有变化,有变化则马上生效),将输出打印在控制台上
                bin/flume-ng agent --conf conf/ -Dflume.root.logger=DEBUG,console -n agent1 -f conf/agent1.conf
            3、使用java代码生产log4j日志输出到flume
                
            3、验证agent,一种是flume控制台测试,一种是java代码通过log4j写日志
                1)bin/flume-ng avro-client --conf conf/ -H localhost -p 21111 -F ~/a        ##将~目录下的a文件内容写入到flume
                2)使用java类将log4j的日志写入到flume的agent中
                    log4j.properties配置文件
                        log4j.rootLogger=INFO,flume
                        log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
                        log4j.appender.flume.Hostname = 192.168.1.33                                    ##flume启动agent所在的节点ip
                        log4j.appender.flume.Port = 21111                                                ##flume启动agent监听的端口号
                        log4j.appender.flume.UnsafeMode = true
                    
                    java代码    
                        public class FlumeProducer {
                            public static void main(String[] args) throws Exception {
                                final Logger logger = Logger.getLogger(FlumeProducer.class);
                                while (true) {
                                    logger.info("logger datetime :" + System.currentTimeMillis());
                                    Thread.sleep(1000);
                                }
                            }
                        }

        二、flume生成avroLog文件写入到hdfs中,存放到不同的/IP/日期/文件夹中
            1、创建一个agent(使用avroSource接收网络流写入到hdfs)配置文件agent2.conf
                cd /usr/local/flume/
                vi /conf/agent2.conf
                    agent2.sources=source1
                    agent2.channels=channel1
                    agent2.sinks=sink1

                    agent2.sources.source1.type=avro
                    agent2.sources.source1.bind=0.0.0.0
                    agent2.sources.source1.port=44444
                    agent2.sources.source1.channels=channel1
                    
                    agent2.sources.source1.interceptors = i1 i2
                    agent2.sources.source1.interceptors.i1.type = org.apache.flume.interceptor.HostInterceptor$Builder
                    agent2.sources.source1.interceptors.i1.preserveExisting = true
                    agent2.sources.source1.interceptors.i1.useIP = true
                    agent2.sources.source1.interceptors.i2.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

                    
                    agent2.channels.channel1.type=memory
                    agent2.channels.channel1.capacity=10000
                    agent2.channels.channel1.transactionCapacity=1000
                    agent2.channels.channel1.keep-alive=30

                    agent2.sinks.sink1.type=hdfs
                    agent2.sinks.sink1.channel=channel1
                    agent2.sinks.sink1.hdfs.path=hdfs://ns1/flume/events/%{host}/%Y-%m-%d            ##flume将文件写入到hdfs的路径
                    agent2.sinks.sink1.hdfs.filePrefix=avroLog-                                        ##flume生成文件的前缀
                    agent2.sinks.sink1.hdfs.fileSuffix=.log                                            ##flume生成文件的后缀
                    agent2.sinks.sink1.hdfs.fileType=DataStream                                        ##flume生成文件的类型,DataStream或SequenceFile
                    agent2.sinks.sink1.hdfs.writeFormat=Text
                    agent2.sinks.sink1.hdfs.rollInterval=0
                    agent2.sinks.sink1.hdfs.rollSize=10000
                    agent2.sinks.sink1.hdfs.rollCount=0
                    agent2.sinks.sink1.hdfs.idleTimeout=5
            2、启动agent2(每30秒检查agent1.conf文件一次,检查该文件是否有变化,有变化则马上生效),将内容写入到hdfs的/flume/events/中
                bin/flume-ng agent --conf conf/ -Dflume.monitoring.type=http -Dflume.monitoring.port=34343 -n agent2 -f conf/agent2.conf
            3、使用java代码生产log4j日志输出到flume
                log4j.properties配置文件
                    log4j.rootLogger=INFO,flume
                    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
                    log4j.appender.flume.Hostname = 192.168.1.33                                    ##flume启动agent所在的节点ip
                    log4j.appender.flume.Port = 21111                                                ##flume启动agent监听的端口号
                    log4j.appender.flume.UnsafeMode = true
                    
                java代码    
                    public class FlumeProducer {
                        public static void main(String[] args) throws Exception {
                            final Logger logger = Logger.getLogger(FlumeProducer.class);
                            while (true) {
                                logger.info("logger datetime :" + System.currentTimeMillis());
                                Thread.sleep(1000);
                            }
                        }
                    }            
            4、验证agent2是否成功写入到hdfs的/flume/events/文件夹下
                hdfs dfs -ls -h -R /flume/events/IP/yyyy-MM-dd/                                        ##如果存在一个或多个avroLog.timestamp.log文件表示成功
                
      三、使用Socket客户端写入到flume中,flume保存文件到本地

        1、创建agent_tcp.conf(接收socket客户端发送的数据然后写入到Linux本地)

          cd /usr/local/flume

          vi conf/agent_tcp.conf

            agent_tcp.sources=as1
            agent_tcp.channels=c1
            agent_tcp.sinks=s1

            agent_tcp.sources.as1.type=syslogtcp
            agent_tcp.sources.as1.bind=0.0.0.0
            agent_tcp.sources.as1.port=21111
            agent_tcp.sources.as1.channels=c1

            agent_tcp.channels.c1.type=memory
            agent_tcp.channels.c1.capacity=10000
            agent_tcp.channels.c1.transactionCapacity=10000
            agent_tcp.channels.c1.keep-alive=120
            agent_tcp.channels.c1.byteCapacityBufferPercentage=20
            agent_tcp.channels.c1.byteCapacity=800000
      
            agent_tcp.sinks.s1.type=file_roll
            agent_tcp.sinks.s1.rollSize=10000
            agent_tcp.sinks.s1.sink.directory =/home/lefuBigDataDev/clouds/flume/logs
            agent_tcp.sinks.s1.channel=c1
        2、启动flume的agent_tcp.conf

          bin/flume-ng agent -n agent_tcp -c conf/ -f conf/agent_tcp.conf -Dflume.root.logger=DEBUG,console

        3、java代码socket客户端

          package com.left.clouds.cluster.flume.test;

                  import java.io.InputStream;
                  import java.io.OutputStream;
                  import java.net.Socket;

                  import org.junit.Before;
                  import org.junit.Test;

                  public class TestFlume {

                      private Socket client = null;
                      InputStream in = null;
                      OutputStream out = null;
                    
                      @Before
                      public void before(){
                          try {
                              client = new Socket("192.168.0.218", 21111);
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                    
                      @Test
                      public void sender() {
                          try {
                              out = client.getOutputStream();
                              int i = 0;
                              while(true){
                                  out.write(("device-"+(i++)+(" ")).getBytes());
                                  Thread.sleep(4000);
                                  System.out.println("第:"+i+"次发送...");
                              }
                          } catch (Exception e) {
                              e.printStackTrace();
                          }                
                      }

                  }

     

    Flume-1.6.0中包含了kafka的source,agent配置文件实例如下
    front_agent_kafka.sources=as1
    front_agent_kafka.channels=c1
    front_agent_kafka.sinks=s1

    front_agent_kafka.sources.as1.type=org.apache.flume.source.kafka.KafkaSource
    front_agent_kafka.sources.as1.zookeeperConnect=192.168.0.20:2181
    front_agent_kafka.sources.as1.topic=test
    front_agent_kafka.sources.as1.groupId=flume
    front_agent_kafka.sources.as1.batchSize=100
    front_agent_kafka.sources.as1.channels=c1
                                  
    front_agent_kafka.channels.c1.type=memory
    front_agent_kafka.channels.c1.capacity=10000
    front_agent_kafka.channels.c1.transactionCapacity=10000
    front_agent_kafka.channels.c1.keep-alive=120
    front_agent_kafka.channels.c1.byteCapacityBufferPercentage=20
    front_agent_kafka.channels.c1.byteCapacity=800000

    front_agent_kafka.sinks.s1.type=com.lefukj.flume.sinks.JdbcSink
    front_agent_kafka.sinks.s1.channel=c1





















        
        
        
        
        
       

  • 相关阅读:
    Oracle之表空间
    Oracle 数据库实现数据更新:update、merge
    union和union all用法
    SQL Server 使用游标更新数据库中的数据(使用存储过程)
    MDX函数(官方顺序,带示例)
    开窗函数 --over()
    MySql安装与MySQL添加用户、删除用户与授权
    samba服务
    asp.net core系列 58 IS4 基于浏览器的JavaScript客户端应用程序
    asp.net core系列 57 IS4 使用混合流(OIDC+OAuth2.0)添加API访问
  • 原文地址:https://www.cnblogs.com/mengyao/p/4370984.html
Copyright © 2011-2022 走看看