  • Kafka + Flume + HDFS log collection project framework

    1. The project diagram is as follows:

    2. Implementation

    Start HDFS:

    sbin/start-dfs.sh

    Start ZooKeeper (on all three nodes):

     bin/zkServer.sh start

    Start Kafka (on all three nodes):

    root@Ubuntu-1:/usr/local/kafka# bin/kafka-server-start.sh config/server.properties >logs/kafka3.log 2>&1

    On 131, create the topic access:

    root@Ubuntu-1:/usr/local/kafka# bin/kafka-topics.sh --create --topic access --zookeeper 192.168.22.131:2181,192.168.22.132:2181,192.168.22.135:2181 --replication-factor 3 --partitions 2 
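With --partitions 2 and --replication-factor 3, every partition is replicated on all three brokers, so the topic survives the loss of any single broker. The total number of partition replicas the cluster hosts is simply the product:

```shell
# total partition replicas = partitions * replication-factor
partitions=2
replication=3
echo $((partitions * replication))   # → 6
```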

    List topics to verify creation:

    root@Ubuntu-1:/usr/local/kafka# bin/kafka-topics.sh --list --zookeeper localhost:2181

    Start Flume on 131:

    bin/flume-ng agent --conf conf/ --conf-file conf/access.conf  --name a1 -Dflume.root.logger=INFO,console &

    Contents of conf/access.conf:

    # define the components of the agent
    a1.sources = exec
    a1.sinks = hdfs_sink kafka_sink
    a1.channels = hdfs_channel kafka_channel
    
    # exec source: tail the application log
    a1.sources.exec.type = exec
    a1.sources.exec.command = tail -F /usr/local/apache-flume/logs/hu.log
    # timestamp interceptor: sets the timestamp header used by %Y%m%d in the HDFS path
    a1.sources.exec.interceptors = i1
    a1.sources.exec.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    
    # channels
    a1.channels.hdfs_channel.type = memory
    a1.channels.hdfs_channel.capacity = 100000
    a1.channels.hdfs_channel.transactionCapacity = 10000
    
    a1.channels.kafka_channel.type = memory
    a1.channels.kafka_channel.capacity = 100000
    a1.channels.kafka_channel.transactionCapacity = 10000
    
    
    # HDFS sink
    a1.sinks.hdfs_sink.type = hdfs
    a1.sinks.hdfs_sink.hdfs.path = hdfs://Ubuntu-1:9000/source/%{type}/%Y%m%d
    a1.sinks.hdfs_sink.hdfs.filePrefix = events-
    a1.sinks.hdfs_sink.hdfs.fileType = DataStream
    #a1.sinks.hdfs_sink.hdfs.fileType = CompressedStream
    #a1.sinks.hdfs_sink.hdfs.codeC = gzip
    # do not roll files by event count
    a1.sinks.hdfs_sink.hdfs.rollCount = 0
    # roll a new HDFS file once it reaches 64 MB
    a1.sinks.hdfs_sink.hdfs.rollSize = 67108864
    a1.sinks.hdfs_sink.hdfs.rollInterval = 0
    a1.sinks.hdfs_sink.hdfs.batchSize = 100
    
    # Kafka sink
    a1.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.kafka_sink.topic = access
    a1.sinks.kafka_sink.brokerList = 192.168.22.131:9092,192.168.22.132:9092,192.168.22.135:9092
    a1.sinks.kafka_sink.requiredAcks = 1
    a1.sinks.kafka_sink.batchSize = 5
    
    
    
    # bind the sources and sinks to their channels
    a1.sources.exec.channels = hdfs_channel kafka_channel
    a1.sinks.hdfs_sink.channel = hdfs_channel
    a1.sinks.kafka_sink.channel = kafka_channel
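    One caveat: hdfs.path above interpolates %{type} from an event header named type, but the timestamp interceptor does not set that header, so the path segment comes out empty. A hedged sketch of fixing this with Flume's static interceptor (the value access is an assumption chosen to match the topic name):

    a1.sources.exec.interceptors = i1 i2
    a1.sources.exec.interceptors.i2.type = static
    a1.sources.exec.interceptors.i2.key = type
    a1.sources.exec.interceptors.i2.value = access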

    On 132, start a Kafka console consumer for the access topic (note this is a consumer, used to watch the messages, not a producer):

    root@Ubuntu-2:/usr/local/kafka# bin/kafka-console-consumer.sh --zookeeper 192.168.22.131:2181,192.168.22.132:2181,192.168.22.135:2181 --topic access
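With everything running, the pipeline can be smoke-tested by appending a line to the tailed file; the same line should appear in the console consumer on 132 and, after the next roll, in an events- file under /source/.../YYYYMMDD in HDFS. A minimal sketch (the default path is the one from access.conf; pass a different path as the first argument when testing elsewhere):

```shell
#!/bin/sh
# Append a tagged test event to the file tailed by the Flume exec source,
# then print the last line to confirm the write.
LOG="${1:-/usr/local/apache-flume/logs/hu.log}"
echo "test-event $(date +%s)" >> "$LOG"
tail -n 1 "$LOG"
```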
  • Original post: https://www.cnblogs.com/huxinga/p/7465290.html