  • Nginx+Flume+Hadoop Log Analysis, Ngram + AutoComplete

    Configuring Nginx

    • yum install nginx (on host99 and host101)
    • service nginx start to start the service
    • ps -ef |grep nginx to check the processes
    ps -ef |grep nginx
    root     28230     1  0 14:54 ?        00:00:00 nginx: master process /usr/sbin/nginx -c /etc/nginx/nginx.conf
    nginx    28231 28230  0 14:54 ?        00:00:00 nginx: worker process                   
    nginx    28232 28230  0 14:54 ?        00:00:00 nginx: worker process                   
    nginx    28234 28230  0 14:54 ?        00:00:00 nginx: worker process                   
    nginx    28235 28230  0 14:54 ?        00:00:00 nginx: worker process 
    ....
    • From a browser on your own machine you can also reach it via IP + port (port 80 by default)

    HTML

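    • A very simple web page setup: I put an index.html file under /root/html.
    • First test that it works: enter the IP address in the browser and the page shows up.
    • Next, add an input box to simulate search data.
    • index.html is extremely simple code (I'm hopeless at front-end - -):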
    • <!DOCTYPE html>
      <html>
          <body>
      
              <form action="/result.html" method="GET">
                  Please Input:<br>
                  <input type="text" name="Input" value="Mickey">
                  <br>
                  <br>
                  <input type="submit" value="Submit">
              </form> 
      
          </body>
      </html>
    • The page the form jumps to, result.html, just contains a single sentence.
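Submitting the form above makes the browser send a GET request whose query string is the form fields encoded as application/x-www-form-urlencoded, which is why spaces later show up as '+' in the access log. A minimal Python sketch of that encoding, assuming only the single Input field from index.html (form_get_url is a hypothetical helper, not part of any library):

```python
from urllib.parse import urlencode, urlsplit, parse_qs


def form_get_url(action, **fields):
    """Build the URL a browser would request for a GET form submission (sketch).

    urlencode() uses quote_plus(), so spaces become '+' and non-ASCII text is
    percent-encoded as UTF-8, roughly matching what browsers send.
    """
    return action + '?' + urlencode(fields)


url = form_get_url('/result.html', Input='Mickey Mouse')
print(url)                           # /result.html?Input=Mickey+Mouse
# Decoding the query string recovers the original field value
print(parse_qs(urlsplit(url).query))  # {'Input': ['Mickey Mouse']}
```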

    Logs

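    • The logs are written to access.log in the /var/log/nginx directory.
    • tail -f access.log

      After typing something into the input box and submitting, you will see the log refresh. For example, after I entered "test" and submitted, this line was appended (this request went to /action_page.php, which does not exist on the server, hence the 404; the form above submits to /result.html instead):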

    • 10.109.255.90 - - [11/May/2017:14:24:16 +0800] "GET /action_page.php?Input=test HTTP/1.1" 404 169 "http://10.3.242.101/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8"
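The log lines here match nginx's predefined combined format ($remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"). Before writing any Hadoop job, it can help to check the fields locally. A Python sketch that parses one such line, assuming no custom log_format is in use (parse_access_line is a hypothetical helper):

```python
import re
from urllib.parse import urlsplit, parse_qs

# nginx "combined" format:
# $remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent
# "$http_referer" "$http_user_agent"
COMBINED_RE = re.compile(
    r'(?P<remote_addr>\S+) - (?P<remote_user>\S+) \[(?P<time_local>[^\]]+)\] '
    r'"(?P<request>[^"]*)" (?P<status>\d{3}) (?P<body_bytes_sent>\d+) '
    r'"(?P<http_referer>[^"]*)" "(?P<http_user_agent>[^"]*)"'
)


def parse_access_line(line):
    """Split one combined-format line into a dict, or return None if it doesn't match."""
    m = COMBINED_RE.match(line)
    if m is None:
        return None
    entry = m.groupdict()
    parts = entry['request'].split(' ')
    if len(parts) == 3:  # e.g. "GET /action_page.php?Input=test HTTP/1.1"
        entry['method'], target, entry['protocol'] = parts
        url = urlsplit(target)
        entry['path'] = url.path
        entry['query'] = parse_qs(url.query)  # values are lists, '+' decoded to space
    return entry
```

For the line above, status is '404', path is '/action_page.php' and query is {'Input': ['test']}.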

    Configuring Flume

    • Installation (on host101):
    • wget http://mirrors.tuna.tsinghua.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz
      tar -xvf apache-flume-1.7.0-bin.tar.gz
    • Test:
      • Create a test.conf file under the conf folder as follows:
      • # Name the components on this agent
        a1.sources = r1
        a1.sinks = k1
        a1.channels = c1
        
        # Describe/configure the source
        a1.sources.r1.type = netcat
        a1.sources.r1.bind = localhost
        a1.sources.r1.port = 44444
        
        # Describe the sink
        a1.sinks.k1.type = logger
        
        # Use a channel which buffers events in memory
        a1.channels.c1.type = memory
        a1.channels.c1.capacity = 1000
        a1.channels.c1.transactionCapacity = 100
        
        # Bind the source and sink to the channel
        a1.sources.r1.channels = c1
        a1.sinks.k1.channel = c1
      • Then run
      • bin/flume-ng agent -n a1 -c conf -f conf/test.conf
      • Then run the following in the logs folder to follow the log
      • tail -f flume.log
      • Then run telnet localhost 44444 and type some arbitrary data. The log refreshes and shows
      • 11 May 2017 15:26:55,372 INFO  [SinkRunner-PollingRunner-DefaultSinkProcessor] (org.apache.flume.sink.LoggerSink.process:95)  - Event: { headers:{} body: 64 61 6B 6A 64 63 0D                            dakjdc. }
      • That means everything works.
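The logger sink prints the event body as hex bytes followed by a printable rendering, so the trailing 0D above is the carriage return that telnet sends at the end of each line (shown as '.'). A small Python sketch that recovers the raw bytes from such a log line, assuming the exact 'body: XX XX ...' layout shown above (decode_logger_body is a hypothetical helper). Keep in mind that the logger sink only logs a prefix of the body (16 bytes by default, configurable via maxBytesToLog), so long events appear truncated.

```python
import re

# Matches the run of "XX " hex pairs that follows "body: " in a LoggerSink line
BODY_HEX_RE = re.compile(r'body: ((?:[0-9A-Fa-f]{2} )+)')


def decode_logger_body(log_line):
    """Return the logged event body as bytes, or None if the line has no body dump."""
    m = BODY_HEX_RE.search(log_line)
    if m is None:
        return None
    # bytes.fromhex() skips the spaces between the hex pairs
    return bytes.fromhex(m.group(1))
```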

    Nginx (web server) side

    • On the Nginx side, configure a source-side Flume agent to collect the access log stream
    • WebAccLog.sources = NginxAccess
      WebAccLog.sinks = AvroSink
      WebAccLog.channels = MemChannel
      
      WebAccLog.sources.NginxAccess.type = exec
      
      WebAccLog.sources.NginxAccess.command = tail -f /var/log/nginx/access.log
      WebAccLog.sources.NginxAccess.batchSize = 10
      
      WebAccLog.sources.NginxAccess.interceptors = itime
      WebAccLog.sources.NginxAccess.interceptors.itime.type = timestamp
      
      WebAccLog.sinks.AvroSink.type = avro
      WebAccLog.sinks.AvroSink.hostname = 10.3.242.99
      WebAccLog.sinks.AvroSink.port = 4545
      
      WebAccLog.channels.MemChannel.type = memory
      WebAccLog.sinks.AvroSink.channel = MemChannel
      WebAccLog.sources.NginxAccess.channels = MemChannel
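    • Note that we configured an interceptor on the source here. Events collected by a Flume source are first passed to the ChannelProcessor, where the interceptors filter and transform them; a ChannelSelector then chooses the channel(s).
    • The timestamp interceptor configured here is used later when specifying hdfs.path (the storage path is derived from the timestamp).
    • About interceptors: Flume ships with many built-in interceptors, and also implements an InterceptorChain for running events through several interceptors in sequence.
      • HostInterceptor: adds the host name or IP of the local machine to the header of every intercepted event.
      • TimestampInterceptor: adds the time at which it processed the event (in millis) to the header of every intercepted event.
    • When I first wrote this I forgot the "s" in sources, and only found the problem by checking the log. Honestly, this conf file makes my eyes glaze over; there are so many things to change - -.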
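To make the interceptor flow above concrete, here is a toy Python model of an interceptor chain: each interceptor takes an event (headers plus body) and returns it, possibly modified, or None to drop it. This only illustrates the idea and is not Flume's Java Interceptor API. The header names 'timestamp' and 'host' follow what TimestampInterceptor and HostInterceptor write by default; the regex drop filter is a hypothetical stand-in in the spirit of Flume's regex_filter interceptor.

```python
import re
import time


def make_timestamp_interceptor(clock=time.time):
    """Like TimestampInterceptor: store the processing time (millis) in 'timestamp'."""
    def intercept(event):
        event['headers']['timestamp'] = str(int(clock() * 1000))
        return event
    return intercept


def make_host_interceptor(host):
    """Like HostInterceptor: store this agent's host name or IP in 'host'."""
    def intercept(event):
        event['headers']['host'] = host
        return event
    return intercept


def make_regex_filter(pattern):
    """Hypothetical filter: drop events whose body does not match `pattern`."""
    compiled = re.compile(pattern)

    def intercept(event):
        # Bodies are str here for simplicity; Flume event bodies are byte arrays
        return event if compiled.search(event['body']) else None
    return intercept


def run_chain(interceptors, events):
    """Apply interceptors in order; an event dropped by one never reaches the next."""
    kept = []
    for event in events:
        for interceptor in interceptors:
            event = interceptor(event)
            if event is None:
                break
        if event is not None:
            kept.append(event)
    return kept
```

For example, a chain of make_regex_filter(r'\?Input='), make_timestamp_interceptor() and make_host_interceptor('10.3.242.101') keeps only search requests and stamps each surviving event with both headers.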

    Hadoop/Spark cluster side

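    • On the Hadoop/Spark cluster side, configure a receiver-side Flume agent to receive the data.
    • First use a logger sink to test that everything works:
      • Start Flume on the cluster side, then run the following on the web server side
      • bin/flume-ng avro-client -c ./conf -H 10.3.242.99 -p 4545 -F /var/log/nginx/access.log
      • The cluster-side Flume then prints a bunch of data in its log, which means it works.
    • Next, connect the two sides.
    • Below is an HDFS sink example; other requirements can be met by changing the sink section.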
    • # clusterLogAgent
      
      # Naming the components of the current agent.
      clusterLogAgent.sources = AvroSource
      clusterLogAgent.sinks = HDFS
      clusterLogAgent.channels = MemChannel
      
      # Source configuration
      clusterLogAgent.sources.AvroSource.type = avro
      # hostname or IP address to listen on
      clusterLogAgent.sources.AvroSource.bind = 0.0.0.0
      clusterLogAgent.sources.AvroSource.port = 4545
      
      # sink configuration(write to HDFS)
      clusterLogAgent.sinks.HDFS.type = hdfs
      clusterLogAgent.sinks.HDFS.hdfs.path = /logFlume/nginx/accesslog
      # File format: currently SequenceFile, DataStream or CompressedStream
      clusterLogAgent.sinks.HDFS.hdfs.fileType = DataStream
      # Number of events written to file before it is rolled (0 = never roll based on number of events)
      clusterLogAgent.sinks.HDFS.hdfs.rollCount = 0
      
      clusterLogAgent.channels.MemChannel.type = memory
      
      clusterLogAgent.sources.AvroSource.channels = MemChannel
      clusterLogAgent.sinks.HDFS.channel = MemChannel
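Since a one-character typo in these property files (like the missing "s" in sources mentioned earlier) only surfaces in Flume's log, a quick offline check of the source/channel/sink wiring can save a restart cycle. A hedged Python sketch, assuming the simple key = value layout used in the two files above (no line continuations or other Java properties features); parse_properties and check_flume_wiring are hypothetical helpers, not Flume tools:

```python
def parse_properties(text):
    """Very small key = value parser; ignores blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith('#') or '=' not in line:
            continue
        key, _, value = line.partition('=')
        props[key.strip()] = value.strip()
    return props


def check_flume_wiring(text, agent):
    """Return a list of wiring problems for `agent` (an empty list means it looks OK)."""
    props = parse_properties(text)

    def names(key):
        return props.get(key, '').split()

    problems = []
    channels = set(names(f'{agent}.channels'))
    if not channels:
        problems.append(f'no channels declared under {agent}.channels')
    for source in names(f'{agent}.sources'):
        key = f'{agent}.sources.{source}.channels'  # plural for sources
        bound = names(key)
        if not bound:
            problems.append(f'missing {key}')
        problems += [f'{key} refers to undeclared channel {ch}'
                     for ch in bound if ch not in channels]
    for sink in names(f'{agent}.sinks'):
        key = f'{agent}.sinks.{sink}.channel'  # singular for sinks
        ch = props.get(key)
        if ch is None:
            problems.append(f'missing {key}')
        elif ch not in channels:
            problems.append(f'{key} refers to undeclared channel {ch}')
    return problems
```

Running it on conf/flume-avro.conf with agent 'WebAccLog', or on conf/flume-hdfsSink.conf with 'clusterLogAgent', before starting the agents catches misspelled keys and channel names.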

    Startup

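    • First start Flume on the cluster (mind the start order: the Avro sink on the web server connects to the Avro source on the cluster, so the receiving agent must be listening first)
    • ./bin/flume-ng agent -n clusterLogAgent -c conf -f conf/flume-hdfsSink.conf
    • Then start Flume on the Nginx side
    • bin/flume-ng agent -n WebAccLog -c conf -f conf/flume-avro.conf
    • Check HDFS to verify that the streamed access log events were written successfully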
    • [root@host99 /home/hhh/apache-flume-1.7.0-bin/conf]$hadoop fs -ls /logFlume/nginx/accesslog
      17/05/11 17:25:47 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      Found 3 items
      -rw-r--r--   2 root supergroup       1146 2017-05-11 17:15 /logFlume/nginx/accesslog/FlumeData.1494494113448
      -rw-r--r--   2 root supergroup       1190 2017-05-11 17:15 /logFlume/nginx/accesslog/FlumeData.1494494113449
      -rw-r--r--   2 root supergroup        952 2017-05-11 17:16 /logFlume/nginx/accesslog/FlumeData.1494494113450

    • [root@host101 /home/hhh/apache-flume-1.7.0-bin/conf]$hadoop fs -cat /logFlume/nginx/accesslog/FlumeData.1494494113448
      17/05/11 17:19:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
      10.109.255.90 - - [11/May/2017:16:56:35 +0800] "GET /result.html?Input=ttt HTTP/1.1" 200 23 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8"
      10.109.255.90 - - [11/May/2017:16:56:45 +0800] "GET /result.html?Input=ttt HTTP/1.1" 200 23 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8"
      10.109.255.90 - - [11/May/2017:16:57:04 +0800] "GET /result.html?Input=hhhhhh HTTP/1.1" 200 23 "http://10.3.242.101/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8"
      10.109.255.90 - - [11/May/2017:16:57:07 +0800] "GET /result.html?Input=hhhhhh HTTP/1.1" 200 23 "http://10.3.242.101/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8"

    Flume conf tuning

    Options worth tuning:

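    • Avro Sink:
      • batch-size: number of events batched together per send, default 100
      • compression-type: compression type, no compression by default ("none" or "deflate"; it must match the compression-type of the receiving Avro source)
      • compression-level: compression level
      • maxIoWorkers: maximum number of I/O worker threads, default 2 * the number of available processors on the machine
    • Avro Source:

      • threads: maximum number of worker threads

    • HDFS sink:

      • The HDFS sink supports creating text and sequence files, and supports compression.

      • Files can be rolled, based on elapsed time, data size, or number of events.

      • Data can be bucketed/partitioned by event attributes (such as the timestamp or the originating machine).
      • The HDFS path can contain formatting escape sequences, which the HDFS sink replaces to generate the directory/file name used to store the events:
      • Alias  Description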
        %{host} Substitute value of event header named “host”. Arbitrary header names are supported.
        %t Unix time in milliseconds
        %a locale’s short weekday name (Mon, Tue, ...)
        %A locale’s full weekday name (Monday, Tuesday, ...)
        %b locale’s short month name (Jan, Feb, ...)
        %B locale’s long month name (January, February, ...)
        %c locale’s date and time (Thu Mar 3 23:05:25 2005)
        %d day of month (01)
        %e day of month without padding (1)
        %D date; same as %m/%d/%y
        %H hour (00..23)
        %I hour (01..12)
        %j day of year (001..366)
        %k hour ( 0..23)
        %m month (01..12)
        %n month without padding (1..12)
        %M minute (00..59)
        %p locale’s equivalent of am or pm
        %s seconds since 1970-01-01 00:00:00 UTC
        %S second (00..59)
        %y last two digits of year (00..99)
        %Y year (2010)
        %z +hhmm numeric timezone (for example, -0400)
        %[localhost] Substitute the hostname of the host where the agent is running
        %[IP] Substitute the IP address of the host where the agent is running
        %[FQDN] Substitute the canonical hostname of the host where the agent is running
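      • Some options that may be worth tuning:
        • hdfs.rollSize: file size that triggers a roll, in bytes; default 1024. Setting it to 0 disables size-based rolling.
        • hdfs.rollCount: number of events written to the file before it is rolled; default 10. Setting it to 0 disables rolling by event count.
        • hdfs.rollInterval: number of seconds to wait before rolling the current file; default 30. Setting it to 0 disables time-based rolling.
        • hdfs.batchSize: number of events written to the file before it is flushed to HDFS; default 100
        • hdfs.threadsPoolSize: number of threads per HDFS sink for HDFS IO operations; default 10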
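To see how an escape-sequence path turns into a concrete directory, here is a Python sketch that expands a subset of the escapes in the table above from an event's headers. It is an approximation for illustration, not Flume's own implementation: it supports %{header}, %t, %s, %e, %n, %k and the escapes that map directly onto strftime, and it takes the time zone as a parameter (Flume uses the agent's local time zone unless hdfs.timeZone is set). Like the real sink, it needs a 'timestamp' header for time escapes, which is why the timestamp interceptor was configured on the web server agent (alternatively, hdfs.useLocalTimeStamp = true makes the sink use its own clock).

```python
import re
from datetime import datetime, timedelta, timezone

# Escapes from the table that Python's strftime renders the same way
STRFTIME_ESCAPES = set('aAbBdHIjmMpSyYz')
ESCAPE_RE = re.compile(r'%\{([^}]+)\}|%([A-Za-z])')


def expand_hdfs_path(path, headers, tz=timezone.utc):
    """Expand a subset of HDFS-sink escape sequences using event headers (sketch)."""
    def repl(match):
        header, code = match.groups()
        if header is not None:               # %{host} and other arbitrary headers
            return headers.get(header, '')
        ts_ms = int(headers['timestamp'])    # time escapes need a timestamp header
        when = datetime.fromtimestamp(ts_ms / 1000, tz=tz)
        if code in STRFTIME_ESCAPES:
            return when.strftime('%' + code)
        if code == 't':
            return str(ts_ms)                # millis since epoch
        if code == 's':
            return str(ts_ms // 1000)        # seconds since epoch
        if code == 'e':
            return str(when.day)             # day of month without padding
        if code == 'n':
            return str(when.month)           # month without padding
        if code == 'k':
            return '%2d' % when.hour         # space-padded hour
        raise ValueError('escape not supported by this sketch: %' + code)
    return ESCAPE_RE.sub(repl, path)
```

For an event stamped at 16:56:35 on 11 May 2017 with tz set to UTC+8, '/logFlume/nginx/%y.%m.%d/' expands to '/logFlume/nginx/17.05.11/'.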

    Optimizations already done

    • HDFS file directories: one directory per date
    • # flume-hdfs.sink

      clusterLogAgent.sinks.HDFS.hdfs.path = /logFlume/nginx/%y.%m.%d/

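       After this change the files are written into per-date directories (e.g. /logFlume/nginx/17.05.11/ for 11 May 2017).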

      

    • Configure the roll file size to avoid producing lots of small files
    • #clusterLogAgent.sinks.HDFS.hdfs.rollSize =64*1024*1024
      clusterLogAgent.sinks.HDFS.hdfs.rollSize = 67108864
      clusterLogAgent.sinks.HDFS.hdfs.rollCount = 0 
      clusterLogAgent.sinks.HDFS.hdfs.rollInterval = 0
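The three settings above interact: any non-zero threshold can trigger a roll and 0 switches that trigger off, so with rollCount = 0 and rollInterval = 0 files roll only once they reach 64 MB. A simplified Python model of that decision, assuming the defaults listed in the tuning section (this is not the HDFS sink's actual code; in Flume the interval roll is driven by a timer rather than checked on each write):

```python
from dataclasses import dataclass


@dataclass
class RollPolicy:
    roll_size: int = 1024      # hdfs.rollSize in bytes, 0 = never roll on size
    roll_count: int = 10       # hdfs.rollCount in events, 0 = never roll on count
    roll_interval: int = 30    # hdfs.rollInterval in seconds, 0 = never roll on time


@dataclass
class OpenFile:
    opened_at: float           # seconds, when the current file was opened
    bytes_written: int = 0
    events_written: int = 0

    def append(self, body: bytes) -> None:
        self.bytes_written += len(body)
        self.events_written += 1


def should_roll(policy: RollPolicy, f: OpenFile, now: float) -> bool:
    """True if any enabled (non-zero) threshold has been reached."""
    if policy.roll_size and f.bytes_written >= policy.roll_size:
        return True
    if policy.roll_count and f.events_written >= policy.roll_count:
        return True
    if policy.roll_interval and now - f.opened_at >= policy.roll_interval:
        return True
    return False


# The configuration above: roll on 64 MB only
tuned = RollPolicy(roll_size=67108864, roll_count=0, roll_interval=0)
```

One consequence of size-only rolling is that on a low-traffic site the current file can stay open (with the .tmp in-use suffix) for a long time; hdfs.idleTimeout can be used to close idle files.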

    TBD

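    • Compression
    • Numbers of threads and workers
    • ...
    • Flume can do some filtering internally. In this scenario it would need regular expressions, and the main question is whether that would hurt throughput much, since Flume is not well suited to complex filtering.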

    Log Analysis

     Search-term analysis: Ngram

    • Extract the search terms with a regular expression.
    • 10.109.255.90 - - [12/May/2017:14:31:46 +0800] "GET /result.html?Input=Mickey HTTP/1.1" 304 0 "http://10.3.242.101/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_3) AppleWebKit/602.4.8 (KHTML, like Gecko) Version/10.0.3 Safari/602.4.8"
      10.30.146.74 - - [12/May/2017:14:33:55 +0800] "GET / HTTP/1.1" 304 0 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/55.0.2883.87 Safari/537.36"
      

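      What we need to extract is the "?Input=Mickey" part, which the regex "\?Input=[^\s]*\s" matches.

       Also note that some log lines have no input and a 304 status. These requests were served from the browser cache. Since search results should not be fixed, I added add_header Cache-Control no-store; to the nginx conf to disable caching.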

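    • Build the Pattern once in the Mapper (e.g. as a field) instead of compiling it in every map() call.
    • Build the ngrams from the search terms. Here I simply assume the search terms are already '+'-split (space-separated input in the search box becomes '+'-separated in the log). [TODO: proper tokenization can be implemented later]
    • MapReduce implementation:
      • Job1: SplitNgram --> match the search terms with the regex, split them, and count how often each ngram occurs;
      • Job2: GetNgram --> filter out ngrams whose count is below a given threshold, and get the topK suggestions for each origin phrase.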
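The two jobs can be prototyped in plain Python on a handful of log lines before writing the MapReduce code. The sketch below is a single-process stand-in, not the Hadoop implementation: extract_query applies an '?Input=' regex compiled once at module level (analogous to building the Pattern in the Mapper), split_ngram plays Job1 and get_ngram plays Job2. How Job2 derives suggestions is my assumption: each ngram is split into a prefix (all words but the last) and the following word, and the most frequent following words are kept per prefix. Lower-casing and the parameter names n, threshold and top_k are illustrative choices.

```python
import re
from collections import Counter, defaultdict
from urllib.parse import unquote_plus

# Compiled once at module level, like building the Pattern in the Mapper
# instead of inside every map() call. Captures the value after "?Input=".
INPUT_RE = re.compile(r'\?Input=([^\s&]*)')


def extract_query(log_line):
    """Return the decoded search query of one access-log line, or None."""
    m = INPUT_RE.search(log_line)
    if m is None or not m.group(1):
        return None
    return unquote_plus(m.group(1))  # '+' back to spaces, %XX decoded


def split_ngram(lines, n=3):
    """Job1 stand-in: count every 2..n-word ngram of every search query."""
    counts = Counter()
    for line in lines:
        query = extract_query(line)
        if query is None:
            continue
        words = query.lower().split()  # lower-casing is an assumption
        for i in range(len(words)):
            for j in range(i + 2, min(i + n, len(words)) + 1):
                counts[' '.join(words[i:j])] += 1
    return counts


def get_ngram(counts, threshold=2, top_k=5):
    """Job2 stand-in: drop rare ngrams, keep the top_k next words per prefix."""
    candidates = defaultdict(list)
    for gram, count in counts.items():
        if count < threshold:
            continue
        prefix, _, following = gram.rpartition(' ')
        candidates[prefix].append((following, count))
    # Highest count first; ties broken alphabetically for stable output
    return {prefix: sorted(pairs, key=lambda p: (-p[1], p[0]))[:top_k]
            for prefix, pairs in candidates.items()}
```

For example, two searches for "how do i" and one for "how do you" with threshold=2 yield {'how': [('do', 3)], 'how do': [('i', 2)], 'do': [('i', 2)]}.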

     Simulating searches

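    • Poor little nili, pitiful yet tough: nothing at all, zero data to work with.
    • The simulation itself is simple: just write a program (Python, Java, Scala or whatever).
    • But to get search terms that mean at least something, the only approach I can think of so far is to find a pile of data and have a script read it while sending requests to nginx.
    • A demo that simulates an HTTP request: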
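      • import http.client  # Python 3 name of Python 2's httplib
        from urllib.parse import quote_plus
        
        conn = http.client.HTTPConnection('host101')
        word = 'test'
        # The path needs a leading '/', and the form field is "Input" (capital I),
        # which is what the log-analysis regex looks for
        conn.request('GET', '/result.html?Input=' + quote_plus(word))
        print(conn.getresponse().status)  # HTTP status code of the response
        conn.close()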
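    • As for search data, the only source I have found so far is a Quora competition on Kaggle, which provides a set of Quora questions; in a sense those are search data too. hhh, feeling pleased with myself... [so to extend this later, I could consider crawling Quora 23333]
    • The full simulation demo, including the search data, is below. [Note: connection efficiency and similar issues are not considered yet.]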
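      • import csv
        import http.client  # Python 3 name of Python 2's httplib
        import time
        from urllib.parse import quote_plus
        
        def get_search_word(file_name, host='10.3.242.101', delay=5):
            # Columns: id, qid1, qid2, question1, question2, is_duplicate
            with open(file_name, newline='', encoding='utf-8') as fi:
                reader = csv.reader(fi)  # handles commas inside quoted questions
                next(reader, None)       # skip the header row
                for row in reader:
                    if len(row) < 6:
                        continue
                    for question in (row[3], row[4]):
                        conn = http.client.HTTPConnection(host)
                        # quote_plus turns spaces into '+', like a browser form submission
                        conn.request('GET', '/result.html?Input=' + quote_plus(question))
                        conn.getresponse().read()  # drain the response before closing
                        conn.close()
                    time.sleep(delay)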
        
    The ground was covered with sixpences, yet she looked up and saw the moon.
  • Original article: https://www.cnblogs.com/wttttt/p/6829807.html