
    Project 01: flume, kafka, and hdfs log pipeline

    1. Start the kafka cluster

    $>xkafka.sh start
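
    Note: xkafka.sh is a custom cluster-control script, not part of the kafka distribution. A minimal sketch of what such a wrapper might look like, assuming kafka is installed under /soft/kafka and the brokers run on s102, s103, and s104 (the host list and install path are assumptions, adjust them to your cluster):

    #!/bin/bash
    # Hypothetical wrapper: start or stop the kafka broker on every node via ssh.
    # Host list and install path are assumptions, not taken from the original post.
    cmd=$1
    for host in s102 s103 s104 ; do
      if [ "$cmd" = "start" ] ; then
        ssh $host "/soft/kafka/bin/kafka-server-start.sh -daemon /soft/kafka/config/server.properties"
      elif [ "$cmd" = "stop" ] ; then
        ssh $host "/soft/kafka/bin/kafka-server-stop.sh"
      fi
    done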
    

    3. Create the kafka topic

    kafka-topics.sh --zookeeper s102:2181 \
                    --create \
                    --topic topic-umeng-raw-logs2 \
                    --replication-factor 3 \
                    --partitions 4
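
    To confirm the partition and replica layout of the new topic, describe it:

    $>kafka-topics.sh --zookeeper s102:2181 --describe --topic topic-umeng-raw-logs2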
    

    Note: do not use "_" in kafka topic names; use "-" instead. Kafka embeds topic names in metric names, where '.' and '_' can collide, so it warns about topics containing either character.

    4. Configure flume to collect logs into kafka

    Install flume on the nginx web server nodes (here s101 and s102) and write the configuration file.

    Create the file umeng_nginx_to_kafka.conf under /soft/flume/conf with the following content:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # exec source: tail the nginx access log and turn each line into an event
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /usr/local/openresty/nginx/logs/access.log
    
    # memory channel buffering up to 10000 events
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 10000
    
    # kafka sink: publish events to the raw-logs topic
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = topic-umeng-raw-logs2
    a1.sinks.k1.kafka.bootstrap.servers = s102:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 0
    
    # wire the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
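
    Before starting the agent (step 5), you can verify the log path by running the same tail command the source uses; hitting nginx with any request should print new lines:

    $>tail -F /usr/local/openresty/nginx/logs/access.log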
    

    5. Start the flume agent

    $>flume-ng agent -f /soft/flume/conf/umeng_nginx_to_kafka.conf -n a1
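
    While debugging, it helps to run the agent in the foreground with console logging (a standard flume option):

    $>flume-ng agent -f /soft/flume/conf/umeng_nginx_to_kafka.conf -n a1 -Dflume.root.logger=INFO,console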
    

    6. Start a kafka console consumer to check whether the logs come through

    $>kafka-console-consumer.sh --zookeeper s102:2181 --topic topic-umeng-raw-logs2
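
    The --zookeeper flag belongs to the old consumer; on kafka 0.10+ the same check can be run against the brokers directly. With a consumer attached, appending a test line to the access log on s101 or s102 should echo through within a few seconds:

    $>kafka-console-consumer.sh --bootstrap-server s102:9092 --topic topic-umeng-raw-logs2 --from-beginning
    $>echo "smoke-test $(date)" >> /usr/local/openresty/nginx/logs/access.log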
    

    7. Configure flume to move kafka messages to hdfs

    Create the file /soft/flume/conf/umeng-kafka-to-hdfs.conf with the following content:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    # kafka source: consume the raw-logs topic
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 5000
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = s102:9092
    a1.sources.r1.kafka.topics = topic-umeng-raw-logs2
    a1.sources.r1.kafka.consumer.group.id = g10
    
    a1.channels.c1.type = memory
    
    # hdfs sink: write events as plain text under time-bucketed directories
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/centos/umeng_big11/raw-logs/%Y%m/%d/%H%M
    a1.sinks.k1.hdfs.filePrefix = events-
    # round* settings control directory rotation (one directory per minute)
    a1.sinks.k1.hdfs.round = true
    a1.sinks.k1.hdfs.roundValue = 1
    a1.sinks.k1.hdfs.roundUnit = minute
    # roll* settings control when the current file is closed and a new one started
    a1.sinks.k1.hdfs.rollInterval = 30
    a1.sinks.k1.hdfs.rollSize = 10240
    a1.sinks.k1.hdfs.rollCount = 500
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    a1.sinks.k1.hdfs.fileType = DataStream
    
    # wire the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
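
    The %Y%m/%d/%H%M escapes in hdfs.path are strftime-style and, since useLocalTimeStamp is true, are resolved from the local wall clock; with round = true and a one-minute roundValue, each minute's events land in their own directory. You can preview what a path component looks like with the same format string:

    $>date +%Y%m/%d/%H%M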
    

    8. Start the flume agent to move kafka messages to hdfs

    8.1 Start the hdfs cluster

    $>start-dfs.sh
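
    Confirm the namenode and datanodes are up before starting the agent:

    $>hdfs dfsadmin -report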
    

    8.2 Start flume, specifying the collection config file

    $>flume-ng agent -f /soft/flume/conf/umeng-kafka-to-hdfs.conf -n a1
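
    After a minute or two, the time-bucketed files should appear under the configured path:

    $>hdfs dfs -ls -R /user/centos/umeng_big11/raw-logs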
    

    9. Start the mobile client to send logs and verify that kafka receives them
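
    If the phone client is not at hand, any request that nginx writes to access.log exercises the same path; for example (the URL is hypothetical, any path your nginx serves will do):

    $>curl -s http://s101/ >/dev/null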

