  • Flume + Kafka Integration (with Kerberos Authentication): Real-Time Data Collection

    Suppose we now want to point Flume's sink at Kafka. In a real project there may be any number of subsystems or clients whose logs Flume collects, and Kafka is the only component that can absorb that volume of collected data. One thing needs attention, though: the Kafka cluster here requires SASL authentication (the PLAIN mechanism, as configured below), so before Flume can talk to Kafka we have to take care of two things: the Kafka client JAR and the JAAS configuration.

    1. Copy the Kafka client JAR into Flume's lib directory:

    mv kafka-clients-0.10.2.1.jar D:\dev\apache-flume-1.7.0-bin\lib

     

    2. Create the JAAS configuration file under the D:\ root:
    vim D:\kafka_client_jaas.conf

    KafkaClient {  
            org.apache.kafka.common.security.plain.PlainLoginModule required  
            username="alice"  
            password="alice-pwd";  
    };
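
    On the broker side, PLAIN authentication only works if the server's own JAAS file carries a matching KafkaServer section. That file is not part of the original post; a minimal sketch, assuming an "admin" inter-broker account and reusing the alice password from above:

    KafkaServer {
            org.apache.kafka.common.security.plain.PlainLoginModule required
            username="admin"
            password="admin-pwd"
            user_alice="alice-pwd";
    };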

     

    3. Edit the flume.conf file and add the Kafka settings:
    vim D:\dev\apache-flume-1.7.0-bin\conf\flume.conf

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    #a1.sources.r1.type = netcat
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 192.168.0.106
    a1.sources.r1.port = 44444
    
    # Describe the sink
    # The Kafka channel below already delivers every event to the topic, so a
    # plain logger sink is enough here; a bare KafkaSink would refuse to start
    # (kafka.bootstrap.servers is mandatory) and, if pointed at the same topic,
    # would feed events straight back into the channel.
    a1.sinks.k1.type = logger
    # a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    
    
    # Use a channel which buffers events in memory
    # a1.channels.c1.type = memory
    a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
    a1.channels.c1.kafka.bootstrap.servers = 203.195.205.63:9092
    a1.channels.c1.kafka.topic = mldn-topic
    a1.channels.c1.kafka.producer.security.protocol = SASL_PLAINTEXT
    a1.channels.c1.kafka.producer.sasl.mechanism = PLAIN
    
    
    # capacity / transactionCapacity belong to the memory channel; the Kafka
    # channel ignores them (leftovers from the memory-channel setup)
    # a1.channels.c1.capacity = 1000
    # a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
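
    The introduction talked about "setting the sink to Kafka"; the config above uses the Kafka channel instead, which already lands every event in the topic. For completeness, a sink-based variant would pair a memory channel with a fully configured KafkaSink. A hedged sketch, reusing the same broker and topic as above:

    # Alternative: memory channel + KafkaSink (writes only the event body)
    # a1.channels.c1.type = memory
    # a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # a1.sinks.k1.kafka.bootstrap.servers = 203.195.205.63:9092
    # a1.sinks.k1.kafka.topic = mldn-topic
    # a1.sinks.k1.kafka.producer.security.protocol = SASL_PLAINTEXT
    # a1.sinks.k1.kafka.producer.sasl.mechanism = PLAIN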

    4. Start Flume on Windows:

    cd D:\dev\apache-flume-1.7.0-bin\bin
    d:
    flume-ng.cmd agent --conf D:/dev/apache-flume-1.7.0-bin/conf --conf-file D:/dev/apache-flume-1.7.0-bin/conf/flume.conf --name a1 -property "flume.root.logger=INFO,console;java.security.auth.login.config=D:/kafka_client_jaas.conf"  
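
    Optionally, before wiring up the Java consumer, the channel topic can be spot-checked with Kafka's console consumer. A hedged sketch (consumer-sasl.properties is an assumed file name, and the Kafka bin directory is assumed to be on the PATH):

    rem reuse the client JAAS file from step 2
    set KAFKA_OPTS=-Djava.security.auth.login.config=D:\kafka_client_jaas.conf
    rem consumer-sasl.properties contains just:
    rem   security.protocol=SASL_PLAINTEXT
    rem   sasl.mechanism=PLAIN
    kafka-console-consumer.bat --bootstrap-server 203.195.205.63:9092 --topic mldn-topic --consumer.config consumer-sasl.properties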

     

    5. Start the Kafka consumer: FlumeReceiveMessageConsumer.java

    package cn.mldn.mykafka.consumer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.config.SaslConfigs;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    /**
     * Flume + Kafka integration: the Kafka consumer side
     * 
     * @author hp
     *
     */
    public class FlumeReceiveMessageConsumer {
        public static final String SERVERS = "203.195.205.63:9092";
        public static final String TOPIC = "mldn-topic";
        static {
            System.setProperty("java.security.auth.login.config",
                    "d:/kafka_client_jaas.conf");    // register the JAAS file via a JVM system property
        }
        
        public static void main(String[] args) {
    
            Properties props = new Properties();
            
            props.setProperty(SaslConfigs.SASL_MECHANISM, "PLAIN");
            props.setProperty(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
    
            
            // broker address the consumer connects to
            props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, SERVERS);
            // the consumer must use deserializers that mirror the producer's serializers
            props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    StringDeserializer.class.getName());
            props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group-1");
            // create the consumer instance
            Consumer<String, String> consumer = new KafkaConsumer<String, String>(
                    props);
            consumer.subscribe(Arrays.asList(TOPIC)); // subscribe to the topic to read
            boolean flag = true; // the demo polls forever; flag is never cleared
            while (flag) { // keep reading messages
                ConsumerRecords<String, String> allRecorders = consumer.poll(200);
                for (ConsumerRecord<String, String> record : allRecorders) {
                    System.out.println(
                            "flume.key = " + record.key() + ",flume.value = " + record.value());
                }
            }
            consumer.close();
        }
    }
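
    Since kafka-clients 0.10.2, the JAAS settings can also be passed in code through the sasl.jaas.config property (KIP-85), making the external file and the static block optional. A small sketch of the substitution:

    // alternative to the external JAAS file: place this next to the other
    // SASL settings in main() and remove the static block above
    props.setProperty(SaslConfigs.SASL_JAAS_CONFIG,
            "org.apache.kafka.common.security.plain.PlainLoginModule required "
                    + "username=\"alice\" password=\"alice-pwd\";");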

     

    6. Run the business program to emit some test log messages: TestFlumeDemo.java

    package cn.mldn.myflume;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class TestFlumeDemo {
        private static final Logger LOGGER = LoggerFactory
                .getLogger(TestFlumeDemo.class);
        public static void main(String[] args) {
            
        for (int x = 0; x < 10; x++) {
            LOGGER.info("lynch.cn" + x);
        }
        }
    }
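
    The post does not show the Log4j configuration that routes these messages to Flume's Avro source, but the flume.client.log4j.* headers in the output below imply the Flume Log4jAppender (the flume-ng-log4jappender JAR must be on the demo's classpath). A minimal log4j.properties sketch, assuming the source address from step 3:

    log4j.rootLogger=INFO, flume
    log4j.appender.flume=org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname=192.168.0.106
    log4j.appender.flume.Port=44444
    log4j.appender.flume.UnsafeMode=true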

     

    7. The FlumeReceiveMessageConsumer consumer now receives the log data collected by Flume. The values look like binary noise because the Kafka channel stores each record as a complete, Avro-serialized Flume event, so headers such as flume.client.log4j.timestamp and the logger name are embedded alongside the message body:

    flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705707577<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
    20000Fflume.client.log4j.message.encodingUTF8
    
    flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705716934<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
    20000Fflume.client.log4j.message.encodingUTF8
    
    flume.key = null,flume.value = 8flume.client.log4j.timestamp1593705717194<flume.client.log4j.logger.name:cn.mldn.myflume.TestFlumeDemo8flume.client.log4j.log.level
    20000Fflume.client.log4j.message.encodingUTF8

     

     
