  • Flume jobs

    file-flume-kafka (tail the app logs and route start/event records into two Kafka topics):

    a1.sources=r1 
    a1.channels=c1 c2 
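    # no sinks are declared: the Kafka channels below deliver events to the brokers themselves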
    
    # configure source 
    a1.sources.r1.type = TAILDIR 
    a1.sources.r1.positionFile = /apps/flume/test/log_position.json 
    a1.sources.r1.filegroups = f1 
    a1.sources.r1.filegroups.f1 = /tmp/logs/app.+ 
    a1.sources.r1.fileHeader = true 
    a1.sources.r1.channels = c1 c2 
    
    # interceptors (a sketch of the LogTypeInterceptor follows this config)
    a1.sources.r1.interceptors = i1 i2 
    a1.sources.r1.interceptors.i1.type = com.atguigu.flume.interceptor.LogETLInterceptor$Builder 
    a1.sources.r1.interceptors.i2.type = com.atguigu.flume.interceptor.LogTypeInterceptor$Builder 
    a1.sources.r1.selector.type = multiplexing 
    a1.sources.r1.selector.header = topic 
    a1.sources.r1.selector.mapping.topic_start = c1 
    a1.sources.r1.selector.mapping.topic_event = c2 
    
    # configure channel 
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel 
    a1.channels.c1.kafka.bootstrap.servers = ubuntu-00:9092 
    a1.channels.c1.kafka.topic = topic_start 
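    # store the raw log line in Kafka rather than an Avro-serialized Flume event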
    a1.channels.c1.parseAsFlumeEvent = false 
    a1.channels.c1.kafka.consumer.group.id = flume-consumer 
    a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel 
    a1.channels.c2.kafka.bootstrap.servers = ubuntu-00:9092 
    a1.channels.c2.kafka.topic = topic_event 
    a1.channels.c2.parseAsFlumeEvent = false 
    a1.channels.c2.kafka.consumer.group.id = flume-consumer
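
    The two interceptor classes above are referenced by name but not listed in the post. The LogTypeInterceptor is what makes the multiplexing selector work: it sets the topic header that the selector then maps to c1 or c2. Below is a minimal sketch of such an interceptor; the "start"-marker rule is an assumption, not necessarily the project's real detection logic. The LogETLInterceptor, presumably dropping malformed lines, follows the same Interceptor/Builder pattern, returning null from intercept(Event) for events to discard.

    package com.atguigu.flume.interceptor;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.interceptor.Interceptor;

    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;

    public class LogTypeInterceptor implements Interceptor {

        @Override
        public void initialize() { }

        @Override
        public Event intercept(Event event) {
            // Assumed rule: start-up logs carry a "start" marker in the body;
            // everything else is treated as an event log.
            String body = new String(event.getBody(), StandardCharsets.UTF_8);
            Map<String, String> headers = event.getHeaders();
            if (body.contains("start")) {
                headers.put("topic", "topic_start");   // selector routes these to c1
            } else {
                headers.put("topic", "topic_event");   // selector routes these to c2
            }
            return event;
        }

        @Override
        public List<Event> intercept(List<Event> events) {
            List<Event> out = new ArrayList<>(events.size());
            for (Event e : events) {
                out.add(intercept(e));
            }
            return out;
        }

        @Override
        public void close() { }

        // Flume instantiates interceptors through this nested Builder,
        // hence the "...LogTypeInterceptor$Builder" in the config.
        public static class Builder implements Interceptor.Builder {
            @Override
            public Interceptor build() {
                return new LogTypeInterceptor();
            }

            @Override
            public void configure(Context context) { }
        }
    }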

    kafka-flume-hdfs (consume both topics from Kafka and write lzop-compressed files to HDFS):

    ## components
    a1.sources=r1 r2 
    a1.channels=c1 c2 
    a1.sinks=k1 k2 
    
    ## source1 
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource 
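    # pull batches of up to 5000 records, flushed to the channel at least every 2000 ms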
    a1.sources.r1.batchSize = 5000 
    a1.sources.r1.batchDurationMillis = 2000 
    a1.sources.r1.kafka.bootstrap.servers = ubuntu-00:9092 
    a1.sources.r1.kafka.topics=topic_start 
    
    ## source2 
    a1.sources.r2.type = org.apache.flume.source.kafka.KafkaSource 
    a1.sources.r2.batchSize = 5000 
    a1.sources.r2.batchDurationMillis = 2000 
    a1.sources.r2.kafka.bootstrap.servers = ubuntu-00:9092 
    a1.sources.r2.kafka.topics=topic_event 
    
    ## channel1 
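    # durable file-backed channel: buffered events survive an agent restart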
    a1.channels.c1.type = file 
    a1.channels.c1.checkpointDir = /apps/flume/checkpoint/behavior1 
    a1.channels.c1.dataDirs = /apps/flume/data/behavior1/ 
    a1.channels.c1.maxFileSize = 2146435071 
    a1.channels.c1.capacity = 1000000 
    a1.channels.c1.keep-alive = 6 
    
    ## channel2 
    a1.channels.c2.type = file 
    a1.channels.c2.checkpointDir = /apps/flume/checkpoint/behavior2 
    a1.channels.c2.dataDirs = /apps/flume/data/behavior2/ 
    a1.channels.c2.maxFileSize = 2146435071 
    a1.channels.c2.capacity = 1000000 
    a1.channels.c2.keep-alive = 6 
    
    ## sink1 
    a1.sinks.k1.type = hdfs 
    a1.sinks.k1.hdfs.path = /origin_data/gmall/log/topic_start/%Y-%m-%d 
    a1.sinks.k1.hdfs.filePrefix = logstart- 
    
    ##sink2 
    a1.sinks.k2.type = hdfs 
    a1.sinks.k2.hdfs.path = /origin_data/gmall/log/topic_event/%Y-%m-%d 
    a1.sinks.k2.hdfs.filePrefix = logevent- 
    
    ## avoid producing many small files: roll hourly or at 128 MB, never by event count
    a1.sinks.k1.hdfs.rollInterval = 3600 
    a1.sinks.k1.hdfs.rollSize = 134217728 
    a1.sinks.k1.hdfs.rollCount = 0
    
    a1.sinks.k2.hdfs.rollInterval = 3600 
    a1.sinks.k2.hdfs.rollSize = 134217728 
    a1.sinks.k2.hdfs.rollCount = 0 
    
    ## compress the output files with lzop
    a1.sinks.k1.hdfs.fileType = CompressedStream 
    a1.sinks.k2.hdfs.fileType = CompressedStream 
    a1.sinks.k1.hdfs.codeC = lzop 
    a1.sinks.k2.hdfs.codeC = lzop 
    
    ## wiring: bind sources and sinks to channels
    a1.sources.r1.channels = c1 
    a1.sinks.k1.channel= c1 
    a1.sources.r2.channels = c2 
    a1.sinks.k2.channel= c2
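
    With both property files in place (the file names are assumptions, e.g. file-flume-kafka.conf and kafka-flume-hdfs.conf after the headings above), each agent starts with the stock launcher, for example flume-ng agent --conf $FLUME_HOME/conf --conf-file file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console. Both files name their agent a1, which is fine because the two agents run as separate processes, typically on different hosts.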