zoukankan      html  css  js  c++  java
  • flume与kafka结合上传文件到HDFS上

     实现如图的的效果(详细步骤请参考官方文档(http://flume.apache.org/FlumeUserGuide.html),flume更新版本比较快)

    flume1.conf的配置文件内容

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #具体定义source  
    a1.sources.r1.type = spooldir
    #先创建此目录,保证里面空的  
    a1.sources.r1.spoolDir = /logs 
    
    #sink到kafka里面
    a1.sinks.k1.type =org.apache.flume.sink.kafka.KafkaSink
    #设置Kafka的Topic
    a1.sinks.k1.kafka.topic = haha1
    #设置Kafka的broker地址和端口号
    a1.sinks.k1.kafka.bootstrap.servers = zhiyou01:9092,zhiyou02:9092,zhiyou03:9092
    #配置批量提交的数量
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.ki.kafka.producer.compression.type= snappy
    
    #对于channel的配置描述 使用文件做数据的临时缓存 这种的安全性要高
    a1.channels.c1.type = file
    a1.channels.c1.checkpointDir = /home/uplooking/data/flume/checkpoint
    a1.channels.c1.dataDirs = /home/uplooking/data/flume/data
    
    #通过channel c1将source r1和sink k1关联起来
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    flume2.conf的配置文件内容

    参考官方文档截图如下

    详细配置如下

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    #对于source的配置描述 监听avro
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize=5000
    a1.sources.r1.batchDurationMillis=2000
    a1.sources.r1.kafka.bootstrap.servers = han01:9092,han02:9092,han03:9092
    a1.sources.r1.kafka.topics=test
    
    #定义拦截器,为消息添加时间戳
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder
    #对于sink的配置描述 传递到hdfs上面
    a1.sinks.k1.type = hdfs
    #集群的nameservers名字
    #单节点的直接写:hdfs://han01/xxx
    a1.sinks.k1.hdfs.path = hdfs://ns/flume/%Y%m%d
    a1.sinks.k1.hdfs.filePrefix = events-
    a1.sinks.k1.hdfs.fileType = DataStream
    #不按照条数生成文件
    a1.sinks.k1.hdfs.rollCount = 0
    #HDFS上的文件达到128M时生成一个文件
    a1.sinks.k1.hdfs.rollSize = 134217728
    #HDFS上的文件达到60秒生成一个文件
    a1.sinks.k1.hdfs.rollInterval = 60
    
    
    #对于channel的配置描述 使用内存缓冲区域做数据的临时缓存
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    #通过channel c1将source r1和sink k1关联起来
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
  • 相关阅读:
    opacity背景层透明导致文字也透明
    [css3]叉叉旋转效果
    [css]当父元素的margin-top碰上子元素的margin-top
    Python_Mix*异常处理
    Python_Mix*OS模块,序列化模块种的json,pickle
    Python_Mix*random模块,time模块,sys模块,os模块
    Python_Mix*re模块基础方法,进阶,正则表达式的使用
    Python_Mix*re模块,元字符,量词
    Python_Mix*匿名函数,sorted,filter,map,递归函数,二分法查找
    Python_Mix*内置函数
  • 原文地址:https://www.cnblogs.com/han-guang-xue/p/9966078.html
Copyright © 2011-2022 走看看