  • Customizing logstash-output-kafka to add extra transaction parameters

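    An early project used Logstash to process data and write it to Kafka, and the pipeline has been running for several years. After we enriched the monitoring, we found that a small amount of data was being lost. I suspected that part of the cause was Kafka's idempotence settings.

    Guaranteeing idempotent writes to Kafka requires the following 3 parameters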

    retries = Integer.MAX_VALUE
    max.in.flight.requests.per.connection = 1
    enable.idempotence=true
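
As a minimal sketch of the three settings above, the snippet below compares a set of producer properties against them and lists the ones that differ. The helper name `idempotence_mismatches` and the constant names are hypothetical and only serve this illustration; the property names and values are the ones listed above.

```ruby
# Largest value of a Java int, i.e. Integer.MAX_VALUE.
INT_MAX = 2**31 - 1

# The three settings listed above, keyed by their Kafka property names.
IDEMPOTENT_SETTINGS = {
  "retries" => INT_MAX,
  "max.in.flight.requests.per.connection" => 1,
  "enable.idempotence" => true
}.freeze

# Hypothetical helper: returns the names of the settings in `props`
# that differ from the values this article relies on.
def idempotence_mismatches(props)
  IDEMPOTENT_SETTINGS.reject { |key, wanted| props[key] == wanted }.keys
end

# Values reported by the stock plugin (retries was already set by hand).
stock = {
  "retries" => INT_MAX,
  "max.in.flight.requests.per.connection" => 5,
  "enable.idempotence" => false
}

idempotence_mismatches(stock).each { |name| puts name }
# prints:
#   max.in.flight.requests.per.connection
#   enable.idempotence
```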
    

    The official plugin can already set retries.

    The Logstash startup log shows that its defaults for the other two are

    enable.idempotence = false
    max.in.flight.requests.per.connection = 5
    
    Runtime log
     
    Sending Logstash's logs to /usr/share/logstash/logs which is now configured via log4j2.properties
    [2020-12-07T11:29:57,094][WARN ][logstash.config.source.multilocal] Ignoring the 'pipelines.yml' file because modules or command line options are specified
    [2020-12-07T11:29:57,932][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.3.0"}
    [2020-12-07T11:30:00,977][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>24, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
    [2020-12-07T11:30:01,021][WARN ][logstash.outputs.kafka   ] Kafka output is configured with finite retry. This instructs Logstash to LOSE DATA after a set number of send attempts fails. If you do
    not want to lose data if Kafka is down, then you must remove the retry setting. {:retries=>2147483647}
    [2020-12-07T11:30:01,098][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values:
    	acks = all
    	batch.size = 16384
    	bootstrap.servers = [192.168.5.100:9092, 192.168.5.101:9092, 192.168.5.102:9092]
    	buffer.memory = 33554432
    	client.id =
    	compression.type = none
    	connections.max.idle.ms = 540000
    	enable.idempotence = false
    	interceptor.classes = null
    	key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    	linger.ms = 0
    	max.block.ms = 60000
    	max.in.flight.requests.per.connection = 5
    	max.request.size = 1048576
    	metadata.max.age.ms = 300000
    	metric.reporters = []
    	metrics.num.samples = 2
    	metrics.recording.level = INFO
    	metrics.sample.window.ms = 30000
    	partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    	receive.buffer.bytes = 32768
    	reconnect.backoff.max.ms = 10
    	reconnect.backoff.ms = 10
    	request.timeout.ms = 30000
    	retries = 2147483647
    	retry.backoff.ms = 100
    	sasl.jaas.config = null
    	sasl.kerberos.kinit.cmd = /usr/bin/kinit
    	sasl.kerberos.min.time.before.relogin = 60000
    	sasl.kerberos.service.name = null
    	sasl.kerberos.ticket.renew.jitter = 0.05
    	sasl.kerberos.ticket.renew.window.factor = 0.8
    	sasl.mechanism = GSSAPI
    	security.protocol = PLAINTEXT
    	send.buffer.bytes = 131072
    	ssl.cipher.suites = null
    	ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    	ssl.endpoint.identification.algorithm = null
    	ssl.key.password = null
    	ssl.keymanager.algorithm = SunX509
    	ssl.keystore.location = null
    	ssl.keystore.password = null
    	ssl.keystore.type = JKS
    	ssl.protocol = TLS
    	ssl.provider = null
    	ssl.secure.random.implementation = null
    	ssl.trustmanager.algorithm = PKIX
    	ssl.truststore.location = null
    	ssl.truststore.password = null
    	ssl.truststore.type = JKS
    	transaction.timeout.ms = 60000
    	transactional.id = null
    	value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    [2020-12-07T11:30:01,165][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.0.0
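
Reading the two values out of a long dump by eye is error-prone, so here is a rough sketch that extracts the `key = value` lines following "ProducerConfig values:" from a log. The helper name `parse_producer_config` is hypothetical, and the sample is a shortened excerpt of the log above; the parsing assumes the dump format stays as shown.

```ruby
# Hypothetical helper: collect the "key = value" lines that follow a
# "ProducerConfig values:" header into a Hash of strings.
def parse_producer_config(log_text)
  config = {}
  in_dump = false
  log_text.each_line do |raw|
    line = raw.strip
    if line.include?("ProducerConfig values:")
      in_dump = true
    elsif in_dump && (m = line.match(/\A([a-z][\w.]*) =\s*(.*)\z/))
      config[m[1]] = m[2]
    else
      # Any other line (for example the next timestamped entry) ends the dump.
      in_dump = false
    end
  end
  config
end

# Shortened excerpt of the runtime log above.
sample = <<~LOG
  [2020-12-07T11:30:01,098][INFO ][org.apache.kafka.clients.producer.ProducerConfig] ProducerConfig values:
      acks = all
      client.id =
      enable.idempotence = false
      max.in.flight.requests.per.connection = 5
      retries = 2147483647
  [2020-12-07T11:30:01,165][INFO ][org.apache.kafka.common.utils.AppInfoParser] Kafka version : 1.0.0
LOG

config = parse_producer_config(sample)
puts config["enable.idempotence"]                     # prints false
puts config["max.in.flight.requests.per.connection"]  # prints 5
```

The same check can be rerun after any configuration change to see which values the producer actually started with.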
      

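    Official documentation: https://www.elastic.co/guide/en/logstash/current/plugins-outputs-kafka.html

    Integration version information

    Here is the problem: as of the latest version at the time of writing, the official plugins-outputs-kafka plugin supports configuring neither max.in.flight.requests.per.connection nor enable.idempotence.

    As a first attempt, I followed the plugin's option naming convention, replacing . with _, and added these two settings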

            max_in_flight_requests_per_connection => 1
            enable_idempotence => "true"
    

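    The service reported an error and failed to start.

    Official source: https://github.com/logstash-plugins/logstash-output-kafka (the official source has since moved to https://github.com/logstash-plugins/logstash-integration-kafka, which still does not support these two parameters)

    There is a similar issue on the official repository

    https://github.com/logstash-plugins/logstash-output-kafka/issues/195

    When an official component lacks a feature, there are generally two ways out of the dead end

    • Build a separate setup to replace it
    • Customize the code to add the feature (provided it is open source, or the code can be reverse engineered)

    The same applies here. Redeveloping and replacing the part of Logstash that writes to Kafka would be costly. Fortunately, I have written custom implementations of some Logstash plugins before, and since this only adds configuration options, it should be easy to solve.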


    Preparation

    Confirm the version information

    • The deployed Kafka server version is 1.0.0

    • The Logstash version currently in use

    logstash --version
    logstash 6.3.0
    
    • The logstash-output-kafka plugin bundled by default is logstash-output-kafka-7.0.10

    • logstash-output-kafka-7.0.10 ships with kafka-clients-1.0.0.jar as its Kafka jar by default

    ls -alh vendor/jar-dependencies/runtime-jars/*
    -rw-rw-r-- 1 logstash root 1.6M Jun 12  2018 vendor/jar-dependencies/runtime-jars/kafka-clients-1.0.0.jar
    -rw-rw-r-- 1 logstash root 479K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-1.2.17.jar
    -rw-rw-r-- 1 logstash root  43K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-1.2-api-2.6.2.jar
    -rw-rw-r-- 1 logstash root 195K Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-api-2.6.2.jar
    -rw-rw-r-- 1 logstash root 1.2M Jun 12  2018 vendor/jar-dependencies/runtime-jars/log4j-core-2.6.2.jar
    -rw-rw-r-- 1 logstash root 362K Jun 12  2018 vendor/jar-dependencies/runtime-jars/lz4-java-1.4.jar
    -rw-rw-r-- 1 logstash root  41K Jun 12  2018 vendor/jar-dependencies/runtime-jars/slf4j-api-1.7.25.jar
    -rw-rw-r-- 1 logstash root 9.8K Jun 12  2018 vendor/jar-dependencies/runtime-jars/slf4j-log4j12-1.7.21.jar
    -rw-rw-r-- 1 logstash root 1.5M Jun 12  2018 vendor/jar-dependencies/runtime-jars/snappy-java-1.1.4.jar
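
For scripted checks, the kafka-clients version can be read from the jar file name. This is only a sketch: the helper name `kafka_clients_version` is hypothetical, and it assumes the jar follows the `kafka-clients-<version>.jar` naming seen in the listing above.

```ruby
# Hypothetical helper: return the version embedded in the first
# kafka-clients jar file name found in `jar_paths`, or nil if none matches.
def kafka_clients_version(jar_paths)
  jar_paths.each do |path|
    m = File.basename(path).match(/\Akafka-clients-(\d+(?:\.\d+)*)\.jar\z/)
    return m[1] if m
  end
  nil
end

# Two of the paths from the listing above.
jars = [
  "vendor/jar-dependencies/runtime-jars/lz4-java-1.4.jar",
  "vendor/jar-dependencies/runtime-jars/kafka-clients-1.0.0.jar"
]

puts kafka_clients_version(jars)  # prints 1.0.0
```

Against a real installation, the list could come from `Dir.glob("vendor/jar-dependencies/runtime-jars/*.jar")` run inside the plugin directory.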
    

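    This information can also be found in the source code.

    Logstash Kafka integration information

    https://rubygems.org/gems/logstash-output-kafka/versions/7.0.10

    https://www.elastic.co/guide/en/logstash-versioned-plugins/current/v10.7.0-plugins-outputs-kafka.html

    Because the project still runs the old version, the relevant source is logstash-output-kafka rather than logstash-integration-kafka, so we add the options to logstash-output-kafka first.

    Modify the code to add the configuration options. See the commit for the change; it is simply four lines of code.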

    # Declare the two new plugin options
    config :enable_idempotence, :validate => :boolean, :required => true
    config :max_in_flight_requests_per_connection, :validate => :number, :default => 1, :required => true

    # Pass the options through to the Kafka producer properties
    props.put(kafka::MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, max_in_flight_requests_per_connection.to_s) unless max_in_flight_requests_per_connection.nil?
    props.put(kafka::ENABLE_IDEMPOTENCE_CONFIG, enable_idempotence.to_s) unless enable_idempotence.nil?
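
To make the mapping easier to follow outside the plugin, here is a standalone sketch of what the two `props.put` lines do. It is not the plugin code: a plain Hash stands in for `java.util.Properties`, the two string constants stand in for the `ProducerConfig` constants, and the helper name `build_producer_props` is hypothetical.

```ruby
# Stand-ins for the Kafka ProducerConfig constants used in the plugin;
# the values are the Kafka property names.
ENABLE_IDEMPOTENCE_CONFIG = "enable.idempotence"
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection"

# Hypothetical helper mirroring the two props.put lines: an option is copied
# into the properties only when it was set, and its value is converted to a
# String first.
def build_producer_props(enable_idempotence: nil, max_in_flight_requests_per_connection: nil)
  props = {} # plain Hash standing in for java.util.Properties
  unless max_in_flight_requests_per_connection.nil?
    props[MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION] = max_in_flight_requests_per_connection.to_s
  end
  props[ENABLE_IDEMPOTENCE_CONFIG] = enable_idempotence.to_s unless enable_idempotence.nil?
  props
end

props = build_producer_props(enable_idempotence: true, max_in_flight_requests_per_connection: 1)
props.each { |name, value| puts "#{name} = #{value}" }
# prints:
#   max.in.flight.requests.per.connection = 1
#   enable.idempotence = true
```

The `nil?` guard matters for the boolean option: `false` is a legitimate value and must still be written, which a plain truthiness check would skip.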
    

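    Build the package. On success, it produces the corresponding gem file

    logstash-output-kafka-7.0.10.gem

    Switching to a gem mirror in China makes the build faster

    https://gems.ruby-china.com/

    https://developer.aliyun.com/mirror/rubygems

    Build log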
     
    The default interactive shell is now zsh.
    To update your account to use zsh, please run `chsh -s /bin/zsh`.
    For more details, please visit https://support.apple.com/kb/HT208050.
    bjdeMacBook-Pro-Work:logstash-output-kafka cclient$ bundle install
    unsupported Java version "11", defaulting to 1.7
    WARNING: An illegal reflective access operation has occurred
    WARNING: Illegal reflective access by org.jruby.util.io.FilenoUtil to method sun.nio.ch.SelChImpl.getFD()
    WARNING: Please consider reporting this to the maintainers of org.jruby.util.io.FilenoUtil
    WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
    WARNING: All illegal access operations will be denied in a future release
    Ignoring executable-hooks-1.4.2 because its extensions are not built. Try: gem pristine executable-hooks --version 1.4.2
    Ignoring gem-wrappers-1.4.0 because its extensions are not built. Try: gem pristine gem-wrappers --version 1.4.0
    Ignoring jruby-launcher-1.1.5-java because its extensions are not built. Try: gem pristine jruby-launcher --version 1.1.5
    Ignoring rainbow-2.2.2 because its extensions are not built. Try: gem pristine rainbow --version 2.2.2
    Ignoring ruby-debug-ide-0.6.1 because its extensions are not built. Try: gem pristine ruby-debug-ide --version 0.6.1
    Ignoring executable-hooks-1.4.2 because its extensions are not built. Try: gem pristine executable-hooks --version 1.4.2
    Ignoring gem-wrappers-1.4.0 because its extensions are not built. Try: gem pristine gem-wrappers --version 1.4.0
    Ignoring jruby-launcher-1.1.5-java because its extensions are not built. Try: gem pristine jruby-launcher --version 1.1.5
    Ignoring rainbow-2.2.2 because its extensions are not built. Try: gem pristine rainbow --version 2.2.2
    Ignoring ruby-debug-ide-0.6.1 because its extensions are not built. Try: gem pristine ruby-debug-ide --version 0.6.1
    Fetching gem metadata from https://rubygems.org/.........
    Resolving dependencies..........................
    Error loading RubyGems plugin "/Users/cclient/.rvm/gems/jruby-9.1.13.0@global/gems/executable-hooks-1.4.2/lib/rubygems_plugin.rb": no such file to load -- executable-hooks/wrapper (LoadError)
    Error loading RubyGems plugin "/Users/cclient/.rvm/gems/jruby-9.1.13.0@global/gems/gem-wrappers-1.4.0/lib/rubygems_plugin.rb": no such file to load -- gem-wrappers (LoadError)
    Fetching rake 13.0.1
    Installing rake 13.0.1
    Using bundler 1.17.1
    Using numerizer 0.1.1
    Using chronic_duration 0.10.6
    Using clamp 0.6.5
    Fetching coderay 1.1.3
    Installing coderay 1.1.3
    Fetching concurrent-ruby 1.1.7
    Installing concurrent-ruby 1.1.7
    Fetching diff-lcs 1.4.4
    Installing diff-lcs 1.4.4
    Fetching multi_json 1.15.0
    Installing multi_json 1.15.0
    Using elasticsearch-api 5.0.5
    Fetching multipart-post 2.1.1
    Installing multipart-post 2.1.1
    Fetching faraday 1.0.1
    Installing faraday 1.0.1
    Using elasticsearch-transport 5.0.5
    Using elasticsearch 5.0.5
    Fetching ffi 1.13.1 (java)
    Installing ffi 1.13.1 (java)
    Using filesize 0.0.4
    Fetching fivemat 1.3.7
    Installing fivemat 1.3.7
    Using gem_publisher 1.5.0
    Using gems 0.8.3
    Using i18n 0.6.9
    Using insist 1.0.0
    Using jar-dependencies 0.3.12
    Fetching jrjackson 0.4.13 (java)
    Installing jrjackson 0.4.13 (java)
    Using jruby-openssl 0.9.19 (java)
    Using kramdown 1.14.0
    Fetching openssl_pkcs8_pure 0.0.0.2
    Installing openssl_pkcs8_pure 0.0.0.2
    Fetching manticore 0.7.0 (java)
    Installing manticore 0.7.0 (java)
    Using minitar 0.5.4
    Using method_source 0.8.2
    Using slop 3.6.0
    Using spoon 0.0.6
    Using pry 0.10.4 (java)
    Using puma 2.16.0 (java)
    Using rack 1.6.6
    Using ruby-maven-libs 3.3.9
    Using ruby-maven 3.3.12
    Using rubyzip 1.1.7
    Using rack-protection 1.5.5
    Fetching tilt 2.0.10
    Installing tilt 2.0.10
    Using sinatra 1.4.8
    Using stud 0.0.23
    Using thread_safe 0.3.6 (java)
    Using polyglot 0.3.5
    Using treetop 1.4.15
    Using logstash-core 5.6.4 (java)
    Using logstash-core-plugin-api 2.1.28 (java)
    Fetching logstash-codec-json 3.0.5
    Installing logstash-codec-json 3.0.5
    Using logstash-codec-plain 3.0.6
    Fetching rspec-support 3.10.0
    Installing rspec-support 3.10.0
    Fetching rspec-core 3.10.0
    Installing rspec-core 3.10.0
    Fetching rspec-expectations 3.10.0
    Installing rspec-expectations 3.10.0
    Fetching rspec-mocks 3.10.0
    Installing rspec-mocks 3.10.0
    Fetching rspec 3.10.0
    Installing rspec 3.10.0
    Using rspec-wait 0.0.9
    Using logstash-devutils 1.3.6 (java)
    Using logstash-output-kafka 6.2.4 from source at `.`
    Fetching poseidon 0.0.5
    Installing poseidon 0.0.5
    Fetching snappy-jars 1.1.0.1.2 (java)
    Installing snappy-jars 1.1.0.1.2 (java)
    Fetching snappy 0.1.0 (java)
    Installing snappy 0.1.0 (java)
    Bundle complete! 5 Gemfile dependencies, 59 gems now installed.
    Use `bundle info [gemname]` to see where a bundled gem is installed.
      
    Install the custom-built `logstash-output-kafka-7.0.10.gem` in place of the official plugin
    • Remove the official logstash-output-kafka plugin

      logstash-plugin remove logstash-output-kafka

    • Install the custom logstash-output-kafka plugin

      logstash-plugin install --no-verify logstash-output-kafka-7.0.10.gem

    Apply the configuration

    output{
        kafka {
            acks => "all"
            codec => "json"
            topic_id => "test_topic"
            bootstrap_servers =>"a:9092,b:9092,c:9092"
            batch_size => 2048
            max_request_size =>512000
            max_in_flight_requests_per_connection => 1
            enable_idempotence => "true"
        }
    }
    

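    Start the service and confirm that the parameters take effect.

    Problem solved.

    The modified code and the gem are available at

    https://github.com/cclient/logstash-output-kafka

    I built two Docker images that bundle this gem

    https://hub.docker.com/repository/docker/cclient/logstash

    Other notes

    Pay close attention to version compatibility between logstash-output-kafka and kafka-client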

    Logstash version    logstash-output-kafka version      kafka-client version
    6.3.0               logstash-output-kafka-7.0.10       1.0
    -                   logstash-output-kafka-6.2.4.gem    0.11
    6.8.13              logstash-output-kafka-7.3.2.gem    2.1.0

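    I used Logstash heavily for one year myself and, after handing it over, supported various business lines with it for more than two years. It is still used in some other scenarios today.

    Early on it greatly improved productivity, but we also found some pain points and bottlenecks. In our technology stack, NiFi has now replaced Logstash.

    Higher versions and other plugins can be adapted along the same lines.

    This also counts as my farewell to Logstash (Ruby).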

  • Original article: https://www.cnblogs.com/zihunqingxin/p/14459610.html