zoukankan      html  css  js  c++  java
  • flume+kafka实现实时流式日志处理

    环境搭建

    一. 搭建kafka运行环境
    1.安装zookeeper :
    配置环境变量ZOOKEEPER_HOME 修改zoo.cfg dataDir=./zookeeper-3.4.14/data
    2.运行zookeeper:
    cmd: zkserver
    注:不能安装最新版 会报错 改为 zookeeper-3.4.14 之后报错消失
    3.安装kafka:
    修改config/server.properties log.dirs=/tmp/kafka-logs
    4.运行kafka:
    // bin/kafka-server-start.sh -daemon config/server.properties >kafka.log 2>&1 &
    D:kafka_2.12-2.6kafka_2.12-2.6.0>.inwindowskafka-server-start.bat .configserver.properties

    如果需要创建多个broker,创建对应的server2.properties,

    然后 执行   .inwindowskafka-server-start.bat .configserver2.properties 就可以了

    这里的节点broker我理解是存储数据的小仓库,如果创建了3个仓库,其中一个被随机选举为leader负责数据的读写,而其他两个节点作为slave 负责数据的备份,当leader挂掉时,生产者和消费者仍然正常执行,不受影响。这件事情就是由zookeeper来控制的,所以serverx.properties中需要配置zookeeper的服务器IP和端口

    5.创建Topic:
    D:kafka_2.12-2.6kafka_2.12-2.6.0inwindows> kafka-topics.bat --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic test
    提示:Created topic test. 则创建成功
    错误:找不到或无法加载主类 kafka.admin.TopicCommand
    原因:kafka的安装包为xx-src 这种是源码,需要编译后才能运行,编译需要gradle 和scala 比较麻烦 所以重新下载编译好的安装包 比较方便
    6.开启一个生产者:
    D:kafka_2.12-2.6kafka_2.12-2.6.0inwindows> kafka-console-producer.bat --broker-list 127.0.0.1:9092 --topic test
    7.开启一个消费者:
    D:kafka_2.12-2.6kafka_2.12-2.6.0inwindows> kafka-console-consumer.bat --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning
    8.实现通信:
    在消费者cmd中输入word 能够在消费者cmd中出现,完成kafka通信,到现在为止有四个窗口:
    1)zookeeper 127.0.0.1:2181
    2)kafka 127.0.0.1:9092
    3)消费者
    4)生产者
    9.关闭kafka:
    kafka-server-stop.sh

    二. 搭建flume运行环境:
    1.安装flume:
    增加配置文件:/conf/example.conf
    2.运行flume:
    // D:apache-flume-1.9.0-binin> flume-ng agent -n a1 -c ../conf/ -f kafka_spool.conf -d flume.root.logger=INFO,console
    D:apache-flume-1.9.0-binin> flume-ng agent --conf ../conf --conf-file ../conf/kafka_spool.conf --name a1 -property flume.root.logger=INFO,console
    当控制台输出:/127.0.0.1:44444 表示flume进程正常启动了
    3.conf文件:
    kafka_netcat.conf :检测端口变化 nc localhost 44444

    kafka_spool.conf :检测文件夹中的文件变化,执行后的文件名发生变化 xx.log -> xx.log.COMPLETED

    #example.conf: A single-node flume configuration
    #test kafka sink with spooldir source
     
    #Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
     
    #Describe/configue the source
    a1.sources.r1.type = spooldir
    a1.sources.r1.channels = c1
    #这里是需要监控的文件夹路径 a1.sources.r1.spoolDir = /usr/flume/logs a1.sources.r1.fileHeader = true #Describe the sink a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink #设置kafka的主题topic a1.sinks.k1.topic = kafka_spooldir_test #设置消费者编码为UTF-8 a1.sinks.k1.custom.encoding=UTF-8 #绑定kafka主机以及端口号 a1.sinks.k1.kafka.bootstrap.servers = master:9092,slave1:9092,slave3:9092 #设置kafka序列化方式 a1.sinks.k1.serializer.class = kafka.serializer.StringEncoder #use a channel which buffers events in memory a1.channels.c1.type = memory a1.channels.c1.capacity = 1000 a1.channels.c1.transactionCapacity = 100 #Bind the source and sink to the channel a1.sources.r1.channels = c1 a1.sinks.k1.channel = c1

    kafka_exec.conf :通过执行本地shell检测某文件新增内容

    flume官方文档:http://flume.apache.org/FlumeUserGuide.html

    kafka官方文档中文版:https://blog.csdn.net/ld326/article/details/78118441

  • 相关阅读:
    [单调栈] Jzoj P4260 最大子矩阵
    [前缀和] Jzoj P4259 矩形
    [欧拉回路][状压dp] Jzoj P3290 吃货JYY
    [组合数][枚举] Jzoj P3332 棋盘游戏
    [欧拉函数][dp][快速幂] Jzoj P1161 机器人M号
    [exgcd] Jzoj P1158 荒岛野人
    [带权并查集] Jzoj P1503 体育场
    [dfs][树的直径] Jzoj P1737 删边
    [差分][倍增lca][tarjan] Jzoj P3325 压力
    [dfs] Jzoj P1497 景点中心
  • 原文地址:https://www.cnblogs.com/gaoquanquan/p/13736131.html
Copyright © 2011-2022 走看看