zoukankan      html  css  js  c++  java
  • Flume_企业中日志处理

    企业中的日志存放_1

    201611/20161112.log.tmp
      第二天文件变为20161112.log与20161113.log.tmp
    拷贝一份flume-conf.properties.template改名为dir-mem-hdfs.properties
    实现监控某一目录,如有新文件产生则上传至hdfs,另外过滤掉新文件中tmp文件
    dir-mem-hdfs.properties
      a1.sources = s1
      a1.channels = c1
      a1.sinks = k1
      # defined the source
      a1.sources.s1.type = spooldir
      a1.sources.s1.spoolDir = /opt/data/log_hive/20161109
      a1.sources.s1.includePattern = ([^ ]*.log$) # 包含某些字段
      a1.sources.s1.ignorePattern = ([^ ]*.tmp$)  # 忽略某些字段
      # defined the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 1000
      # defined the sink
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.useLocalTimeStamp = true
      a1.sinks.k1.hdfs.path = /flume/spdir
      a1.sinks.k1.hdfs.fileType = DataStream 
      a1.sinks.k1.hdfs.rollInterval = 0
      a1.sinks.k1.hdfs.rollSize = 20480
      a1.sinks.k1.hdfs.rollCount = 0
      # The channel can be defined as follows.
      a1.sources.s1.channels = c1
      a1.sinks.k1.channel = c1
    flmue目录下执行
      bin/flume-ng agent -c conf/ -n a1 -f conf/dir-mem-hdfs.properties -Dflume.root.logger=INFO,console
      这里使用了memory channel,可以使用file channel更加安全
    

     企业中的日志存放_2

    201611/20161112.log
      第二天文件继续往20161112.log写
    这样,既要使用exec和spoolingdir,如何处理
    编译flume1.7版tail dir source,并集成到我们已有的flume环境
      1. window上下载安装git 
      2. 在某个目录下加一个空的文件夹(文件夹路径尽量不要有中文),例GitHub
      3. 使用github常用命令
        $ pwd
        $ ls
        $ cd /C/Users/Administrator/Desktop/GitHub
        $ git clone (https|git)://github.com/apache/flume.git
        $ cd flume
        $ git branch -r # 查看有哪些分支
        $ git branch -r # 查看当前属于哪个分支
        $ git checkout origin/flume-1.7 #别换分支
      拷贝flumeflume-ng-sourcesflume-taildir-source
      使用eclipse导入flume-taildir-source项目
      修改pom.xml
      <repositories>
        <repository>
          <id>cloudera</id>
          <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
      </repositories>
      <modelVersion>4.0.0</modelVersion>
      <groupId>org.apache.flume.flume-ng-sources</groupId>
      <artifactId>flume-taildir-source</artifactId>
      <version>1.5.0-cdh5.3.6</version>
      <name>Flume Taildir Source</name>
      <build>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
              <source>1.7</source>
              <target>1.7</target>
            </configuration>
          </plugin>
        </plugins>
      </build>
      <dependencies>
        <dependency>
          <groupId>org.apache.flume</groupId>
          <artifactId>flume-ng-core</artifactId>
          <version>1.5.0-cdh5.3.6</version>
        </dependency>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.10</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
      4. MAVEN_BULID项目,获取jar包并放到当前flume的环境中(lib目录) 
      5. 创建文件夹和文件
        $ mkdir -p /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/position
        $ mkdir -p /opt/data/tail/hadoop-dir/
        $ echo "" > /opt/data/tail/hadoop.log
        拷贝一份flume-conf.properties.template改名为tail-mem-hdfs.properties
        可从源码看出需要的参数
          a1.sources = s1
          a1.channels = c1
          a1.sinks = k1
          # defined the source
          a1.sources.s1.type = org.apache.flume.source.taildir.TaildirSource
          a1.sources.s1.positionFile = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/position/taildir_position.json
          a1.sources.s1.filegroups = f1 f2
          a1.sources.s1.filegroups.f1 = /opt/data/tail/hadoop.log
          a1.sources.s1.filegroups.f2 = /opt/data/tail/hadoop-dir/.*
          a1.sources.s1.headers.f1.headerKey1 = value1
          a1.sources.s1.headers.f2.headerKey1 = value2-1
          a1.sources.s1.headers.f2.headerKey2 = value2-2
          a1.sources.s1.fileHeader = true
          # defined the channel
          a1.channels.c1.type = memory
          a1.channels.c1.capacity = 1000
          a1.channels.c1.transactionCapacity = 1000
          # defined the sink
          a1.sinks.k1.type = hdfs
          a1.sinks.k1.hdfs.useLocalTimeStamp = true
          a1.sinks.k1.hdfs.path = /flume/spdir
          a1.sinks.k1.hdfs.fileType = DataStream 
          a1.sinks.k1.hdfs.rollInterval = 0
          a1.sinks.k1.hdfs.rollSize = 20480
          a1.sinks.k1.hdfs.rollCount = 0
          # The channel can be defined as follows.
          a1.sources.s1.channels = c1
          a1.sinks.k1.channel = c1
      flmue目录下执行
        bin/flume-ng agent -c conf/ -n a1 -f conf/tail-mem-hdfs.properties -Dflume.root.logger=INFO,console
        测试文件或新数据
    

     企业中常用架构 Flume多sink

    同一份数据采集到不同框架处理
    采集source: 一份数据
    管道channel: 多个
    目标sink: 多个
    如果多个sink从一个channel取数据将取不完整,而source会针对channel分别发送
    设计: source--hive.log channel--file sink--hdfs(不同路径)
    拷贝一份flume-conf.properties.template改名为hive-file-sinks.properties
    hive-file-sinks.properties
      a1.sources = s1
      a1.channels = c1 c2
      a1.sinks = k1 k2
      # defined the source
      a1.sources.s1.type = exec
      a1.sources.s1.command = tail -F /opt/cdh-5.6.3/hive-0.13.1-cdh5.3.6/logs/hive.log
      a1.sources.s1.shell = /bin/sh -c
      # defined the channel 1
      a1.channels.c1.type = file
      a1.channels.c1.checkpointDir = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/checkp1
      a1.channels.c1.dataDirs = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/data1
      # defined the channel 2
      a1.channels.c2.type = file
      a1.channels.c2.checkpointDir = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/checkp2
      a1.channels.c2.dataDirs = /opt/cdh-5.6.3/apache-flume-1.5.0-cdh5.3.6-bin/datas/data2
      # defined the sink 1
      a1.sinks.k1.type = hdfs
      a1.sinks.k1.hdfs.path = /flume/hdfs/sink1
      a1.sinks.k1.hdfs.fileType = DataStream 
      # defined the sink 2
      a1.sinks.k2.type = hdfs
      a1.sinks.k2.hdfs.path = /flume/hdfs/sink2
      a1.sinks.k2.hdfs.fileType = DataStream 
      # The channel can be defined as follows.
      a1.sources.s1.channels = c1 c2
      a1.sinks.k1.channel = c1
      a1.sinks.k2.channel = c2		
    flmue目录下执行
      bin/flume-ng agent -c conf/ -n a1 -f conf/hive-file-sinks.properties -Dflume.root.logger=INFO,console
    hive目录下执行
      bin/hive -e "show databases"		
    
  • 相关阅读:
    sqlite表结构动态读取工具(Chole ORM框架)
    tomcat可以访问默认页面,但是无法访问webapp下的指定项目
    C#连接solr时提示 java内存异常 (jetty和tomcat哪个更High) java.lang.OutOfMemoryError
    Service Mesh
    Java 使用 UnixSocket 调用 Docker API
    ffmpeg-python 任意提取视频帧
    应用性能测试神器 Gatling,你用过吗?
    多语言应用性能监控系统:Elastic APM
    Ceph Plugin
    MAC iterm2 常用快捷键大全
  • 原文地址:https://www.cnblogs.com/eRrsr/p/6097355.html
Copyright © 2011-2022 走看看