zoukankan      html  css  js  c++  java
  • Nginx=>Flume=>Kafka 流程总结

    nginx=>flume=>kafka

    1. 编写flume 日志收集文件

    nginx日志
    access.log====>flume

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /root/logs/access.log
    a1.sources.r1.shell = /bin/sh -c
    
    a1.channels.c1.type = memory
    
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # 这个地址是kafka的监听地址
    a1.sinks.k1.brokerList = spark001:9092
    # 注意这里的topic是zk的topic
    a1.sinks.k1.topic = test
    a1.sinks.k1.batchSize = 5
    a1.sinks.k1.requiredAcks =1
    
    #a1.sinks.k1.type = logger
    
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    1. 关于Kafka的部署

    Step 1: Start the zookeeper #得到QuorumPeerMain进程

    zkServer.sh start

    [hadoop@hadoop000 conf]$ vim zoo.cfg
    #修改数据的临时存放目录,默认是在tmp目录下的
    dataDir=/home/hadoop/app/tmp/zk
    #:wq 保存退出

    Step 2: Start the server 得到==》kafka进程

    kafka-server-start.sh -deamon $KAFKA_HOME/config/server.properties

    配置,修改解压目录下config配置文件

    配置server.properties[需要注意的地方]
    broker.id=0 解释:是kafka,一个broker
    listeners 解释:监听的端口
    host.name 解释:当前机器
    log.dirs 解释:kafka的日志存放目录
    zookeeper.connect zk的地址

    修改好之后,保存退出

    Step 3: Create a topic # 注意和之前flume的topic对应起来

    kafka-topics.sh -create -zookeeper spark001:2181 -replication-factor 1 -partitions 1 -topic test

    ps:kafka-topics.sh -list -zookeeper spark001:2181

    Step 4: 开启之前的agent (作为生产者)

    flume-ng agent --name a1 --conf . --conf-file ./lamda_imooc.conf -Dflume.root.logger=INFO,console

    步骤4:扩展:控制台作为生产者

    kafka-console-producer.sh --broker-list spark001:9092 --topic test

    Step 5: Start a consumer (消费者)

    kafka-console-consumer.sh --zookeeper spark001:2181 --topic test

    最后 确保flume监听的文件在不断产生新的日志
    文件位置 /root/logs/access.log

    #!/bin/bash
       i=1;
       j=0;
       while (test $i -le 6170967 )
         do
           j=`expr $i + 9`	
           sed -n $i,$j'p' /root/logs/all.log >> /root/logs/access.log
           i=`expr $i + 10`
           sleep 5
          # echo $i
         done
      #新建这个shell文件,把这个shell文件执行起来
    

    上面的操作执行之后,就会收到刷屏的结果,哈哈哈!!
    nginx=>flume=>kafka
    补充:
    head -n 100 大日志文件.log 100_access.log
    wc -l 100_access.log

    spark-submit --master local[5] 
    --jars $(echo /root/apps/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',') 
    --class com.csylh.spark.project.spark.ImoocStatStreamingApp 
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 
    /root/jars/streaming-1.0-SNAPSHOT.jar 
    spark001:2181 test test 1
    
    
  • 相关阅读:
    Mac下PyCharm快捷键大全
    SQL语句优化
    There was a problem confirming the ssl certificate: [SSL: TLSV1_ALERT_PROTOCOL_VERSION] tlsv1 alert protocol version (_ssl.c:661)
    Hadoop Trash回收站使用指南
    HDFS 中向 DataNode 写入数据失败了怎么办
    mapreduce 实现写出orc文件
    mapreduce读取压缩文件
    hive开窗函数over(partition by ......)用法
    Mac 安装 home Brew以及 XCTool的过程记录
    iOS开发中使用CocoaPods来管理第三方的依赖程序
  • 原文地址:https://www.cnblogs.com/liuge36/p/12614779.html
Copyright © 2011-2022 走看看