zoukankan      html  css  js  c++  java
  • Kafka与Logstash的数据采集对接 —— 看图说话,从运行机制到部署

    基于Logstash跑通Kafka还是需要注意很多东西,最重要的就是理解Kafka的原理。

    Logstash工作原理

    由于Kafka采用解耦的设计思想,并非原始的发布订阅,生产者负责产生消息,直接推送给消费者。而是在中间加入持久化层——broker,生产者把数据存放在broker中,消费者从broker中取数据。这样就带来了几个好处:

    • 1 生产者的负载与消费者的负载解耦
    • 2 消费者按照自己的能力fetch数据
    • 3 消费者可以自定义消费的数量

    另外,由于broker采用了主题topic-->分区的思想,使得某个分区内部的顺序可以保证有序性,但是分区间的数据不保证有序性。这样,消费者可以以分区为单位,自定义读取的位置——offset。

    Kafka采用zookeeper作为管理,记录了producer到broker的信息,以及consumer与broker中partition的对应关系。因此,生产者可以直接把数据传递给broker,broker通过zookeeper进行leader-->followers的选举管理;消费者通过zookeeper保存读取的位置offset以及读取的topic的partition分区信息。

    由于上面的架构设计,使得生产者与broker相连;消费者与zookeeper相连。有了这样的对应关系,就容易部署logstash-->kafka-->logstash的方案了。

    接下来,按照下面的步骤就可以实现logstash与kafka的对接了。

    启动kafka

    启动zookeeper:

    $zookeeper/bin/zkServer.sh start
    

    启动kafka:

    $kafka/bin/kafka-server-start.sh $kafka/config/server.properties &
    

    创建主题

    创建主题:

    $kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --create --topic hello --replication-factor 1 --partitions 1
    

    查看主题:

    $kafka/bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe
    

    测试环境

    执行生产者脚本:

    $kafka/bin/kafka-console-producer.sh --broker-list 10.0.67.101:9092 --topic hello
    

    执行消费者脚本,查看是否写入:

    $kafka/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --from-beginning --topic hello
    

    输入测试

    input{
    	stdin{}
    }
    output{
    	kafka{
    		topic_id => "hello"
    		bootstrap_servers => "192.168.0.4:9092" # kafka的地址
    		batch_size => 5
    	}
    	stdout{
    		codec => rubydebug
    	}
    }
    

    读取测试

    logstash配置文件:

    input{
        kafka {
            codec => "plain"
            group_id => "logstash1"
            auto_offset_reset => "smallest"
            reset_beginning => true
            topic_id => "hello"
            #white_list => ["hello"]
            #black_list => nil
            zk_connect => "192.168.0.5:2181" # zookeeper的地址
       }
    
    }
    output{
        stdout{
            codec => rubydebug
        }
    }
    
  • 相关阅读:
    用 tableExcel导出EXCEL数据
    个人作业——软件评测
    结对第二次作业——某次疫情统计可视化的实现
    软工实践寒假作业(1/2)
    java注解和反射
    共享密钥
    鲁棒性验证-第五小组
    维数约减报告--第五小组
    网络1911、1912 D&S第1次作业--线性表批改总结
    网络1911、1912 C语言第4次作业--函数批改总结
  • 原文地址:https://www.cnblogs.com/xing901022/p/5738485.html
Copyright © 2011-2022 走看看