zoukankan      html  css  js  c++  java
  • flume 到 kafka整合

    fflume 版本为 1.6 cdh 5.13 注意启动flume是 --name 要和配置文件的 前缀一直 否知启动失败

    flume 目录为

     /opt/cloudera/parcels/CDH/lib/flume-ng

    flume 配置文件exec-memory-avro.conf

    vim /opt/test/exec-memory-avro.conf

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1

    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /opt/test/data.log
    a1.sources.r1.shell = /bin/sh -c

    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname = udap69a165
    a1.sinks.k1.port = 44444

    a1.channels.c1.type = memory
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1

    flume 配置文件avro-memory-kafka.conf

    vim /opt/test/avro-memory-kafka.conf

    avro-memory-kafka.sources = avro-source
    avro-memory-kafka.sinks = kafka-sink
    avro-memory-kafka.channels = memory-channel

    avro-memory-kafka.sources.avro-source.type = avro
    avro-memory-kafka.sources.avro-source.bind = udap69a165
    avro-memory-kafka.sources.avro-source.port = 44444

    avro-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    avro-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = udap69a166:9092
    avro-memory-kafka.sinks.kafka-sink.topic = hh_test
    avro-memory-kafka.sinks.kafka-sink.batchSize = 5
    avro-memory-kafka.sinks.kafka-sink.requiredAcks = 1

    avro-memory-kafka.channels.memory-channel.type = memory

    avro-memory-kafka.sources.avro-source.channels = memory-channel
    avro-memory-kafka.sinks.kafka-sink.channel = memory-channel

    进入到flume 的bin目录下,启动flume agent

    ./flume-ng agent --name a1 --conf-file /opt/test/exec-memory-avro.conf -Dflume.root.logger=INFO,console
    
    ./flume-ng agent --name avro-memeory-kafka --conf-file /opt/test/avro-memeory-kafka.conf -Dflume.root.logger=INFO,console

    查看日志是否启动成功,然后干掉程序,选择后台启动

    jps -m
    nohup sh flume-ng agent --name a1 --conf-file /opt/test/exec-memory-avro.conf -Dflume.root.logger=INFO,console &
    nohup sh flume-ng agent --name avro-memeory-kafka --conf-file /opt/test/avro-memeory-kafka.conf -Dflume.root.logger=INFO,console &

    启动kafka消费的脚本

    kafka-console-consumer --zookeeper udap69a166:2181/kafka --topic hh_test

    写入一些数据 

    echo "lisi" >> /opt/test/data.log
    
    echo "wangwu" >> /opt/test/data.log
    
    echo "zhangdan" >> /opt/test/data.log

    查看kafka 消费者是否消费到数据 问题解决!

  • 相关阅读:
    动态规划(0-1背包)---划分数组为和相等的两部分
    动态规划(0-1背包)
    动态规划(最长递增子序列)---最长公共子序列
    动态规划(最长递增子序列)---最长摆动子序列
    动态规划(最长递增子序列)---最长递增子序列
    动态规划(最长递增子序列)
    动态规划(分割整数)---分割整数构成字母字符串
    浅谈进程同步和互斥的概念
    如何由Height Map生成Normal Map
    3D中的切线空间简介
  • 原文地址:https://www.cnblogs.com/erlou96/p/13030149.html
Copyright © 2011-2022 走看看