  • A pitfall when setting the Kafka address (hostname vs. IP) in the Logstash kafka output

    Original configuration

    output {
      kafka {
        acks => "0"
        enable_metric => false
        codec => "json"
        topic_id => "topic_test"
        bootstrap_servers => "kafka:9092"
        batch_size => 2
      }
      stdout {
        codec => "json"
      }
    }

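    Exception (note that the log records topic_id "qingbo_news" and bootstrap_servers "kafka_l:9092", which differ from the names in the configuration above)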

    ERROR logstash.pipeline - Error registering plugin {:plugin=>"#<LogStash::OutputDelegator:0x7f6968f0 @namespaced_metric=#<LogStash::Instrument::NamespacedMetric:0x6a481298 @metric=#<LogStash::Instrument::Metric:0x373adc76 @collector=#<LogStash::Instrument::Collector:0x5c413763 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x68dbfaf3 @store=#<Concurrent::Map:0x00000000061f98 entries=2 default_proc=nil>, @structured_lookup_mutex=#<Mutex:0x422de9a2>, @fast_lookup=#<Concurrent::Map:0x00000000061f9c entries=53 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"ea6e3d1fb3cb9d03054be3c38cb045c1fb3aae21-4"]>, @metric=#<LogStash::Instrument::NamespacedMetric:0x763a85c3 @metric=#<LogStash::Instrument::Metric:0x373adc76 @collector=#<LogStash::Instrument::Collector:0x5c413763 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x68dbfaf3 @store=#<Concurrent::Map:0x00000000061f98 entries=2 default_proc=nil>, @structured_lookup_mutex=#<Mutex:0x422de9a2>, @fast_lookup=#<Concurrent::Map:0x00000000061f9c entries=53 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs]>, @logger=#<LogStash::Logging::Logger:0x6ccc74ee @logger=#<Java::OrgApacheLoggingLog4jCore::Logger:0x143e31e1>>, @strategy=#<LogStash::OutputDelegatorStrategies::Shared:0x4741e2d6 @output=<LogStash::Outputs::Kafka acks=>"0", codec=><LogStash::Codecs::JSON id=>"json_f412b478-1559-45fd-9d73-7b500ed05f4a", enable_metric=>true, charset=>"UTF-8">, topic_id=>"qingbo_news", bootstrap_servers=>"kafka_l:9092", batch_size=>2, id=>"ea6e3d1fb3cb9d03054be3c38cb045c1fb3aae21-4", enable_metric=>true, workers=>1, block_on_buffer_full=>true, buffer_memory=>33554432, compression_type=>"none", key_serializer=>"org.apache.kafka.common.serialization.StringSerializer", linger_ms=>0, max_request_size=>1048576, metadata_fetch_timeout_ms=>60000, metadata_max_age_ms=>300000, receive_buffer_bytes=>32768, reconnect_backoff_ms=>10, retries=>0, 
retry_backoff_ms=>100, send_buffer_bytes=>131072, ssl=>false, security_protocol=>"PLAINTEXT", sasl_mechanism=>"GSSAPI", timeout_ms=>30000, value_serializer=>"org.apache.kafka.common.serialization.StringSerializer">>, @id="ea6e3d1fb3cb9d03054be3c38cb045c1fb3aae21-4", @metric_events=#<LogStash::Instrument::NamespacedMetric:0x9221af6 @metric=#<LogStash::Instrument::Metric:0x373adc76 @collector=#<LogStash::Instrument::Collector:0x5c413763 @agent=nil, @metric_store=#<LogStash::Instrument::MetricStore:0x68dbfaf3 @store=#<Concurrent::Map:0x00000000061f98 entries=2 default_proc=nil>, @structured_lookup_mutex=#<Mutex:0x422de9a2>, @fast_lookup=#<Concurrent::Map:0x00000000061f9c entries=53 default_proc=nil>>>>, @namespace_name=[:stats, :pipelines, :main, :plugins, :outputs, :"ea6e3d1fb3cb9d03054be3c38cb045c1fb3aae21-4", :events]>, @output_class=LogStash::Outputs::Kafka>", :error=>"Failed to construct kafka producer"}
    06:49:57.474 [[main]-pipeline-manager] ERROR logstash.agent - Pipeline aborted due to error {:exception=>org.apache.kafka.common.KafkaException: Failed to construct kafka producer, :backtrace=>["org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:335)", "org.apache.kafka.clients.producer.KafkaProducer.<init>(org/apache/kafka/clients/producer/KafkaProducer.java:188)", "java.lang.reflect.Constructor.newInstance(java/lang/reflect/Constructor.java:423)", "RUBY.create_producer(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:242)", "RUBY.register(/usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-output-kafka-5.1.7/lib/logstash/outputs/kafka.rb:178)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator_strategies/shared.rb:9)", "RUBY.register(/usr/share/logstash/logstash-core/lib/logstash/output_delegator.rb:41)", "RUBY.register_plugin(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:281)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:292)", "org.jruby.RubyArray.each(org/jruby/RubyArray.java:1613)", "RUBY.register_plugins(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:292)", "RUBY.start_workers(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:301)", "RUBY.run(/usr/share/logstash/logstash-core/lib/logstash/pipeline.rb:226)", "RUBY.start_pipeline(/usr/share/logstash/logstash-core/lib/logstash/agent.rb:398)", "java.lang.Thread.run(java/lang/Thread.java:748)"]}
    06:49:57.611 [Api Webserver] INFO logstash.agent - Successfully started Logstash API endpoint {:port=>9600}

    I could not find any relevant information on this error.

    https://discuss.elastic.co/t/failed-to-construct-kafka-producer/76195

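    The thread above is the closest match, but it never received an answer, and it has since expired and been closed to new replies.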

    The hostname kafka is defined in /etc/hosts, and the host is reachable over the network:

    bash-4.3# ping kafka
    PING kafka (172.32.255.81): 56 data bytes
    64 bytes from 172.32.255.81: seq=0 ttl=64 time=0.301 ms
    64 bytes from 172.32.255.81: seq=1 ttl=64 time=0.093 ms
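The ping above only shows that the shell can resolve the name and that ICMP gets through; it says nothing about whether the broker port accepts TCP connections. A minimal sketch of a slightly stronger check, assuming Python is available on the Logstash host (the helper names `resolve_host` and `port_open` are made up for this illustration, and Logstash itself resolves names inside the JVM, so this remains an approximation):

```python
import socket


def resolve_host(host):
    """Return the sorted, unique IPv4 addresses the OS resolver gives for host.

    Returns an empty list when the name cannot be resolved.
    """
    try:
        infos = socket.getaddrinfo(
            host, None, family=socket.AF_INET, type=socket.SOCK_STREAM
        )
    except socket.gaierror:
        return []
    # Each entry is (family, type, proto, canonname, (address, port)).
    return sorted({info[4][0] for info in infos})


def port_open(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers resolution failures, refused connections and timeouts.
        return False
```

Running `resolve_host("kafka")` and `port_open("kafka", 9092)` on the machine (or inside the container) where Logstash runs would show whether the name and the port are usable from there.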

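    I once ran into the opposite problem with Hadoop: it was configured with an IP address and failed with a bizarre error, and it only worked after switching to a hostname.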

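    So it seemed worth swapping the hostname for the IP. After changing bootstrap_servers to the IP address, the plugin registered successfully: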

    output {
      kafka {
        acks => "0"
        enable_metric => false
        codec => "json"
        topic_id => "topic_test"
        bootstrap_servers => "172.32.255.81:9092"
        batch_size => 2
      }
      stdout {
        codec => "json"
      }
    }
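If several brokers are listed, rewriting each hostname by hand is error-prone. The following is a small sketch, not part of the original fix, that turns a `bootstrap_servers` value with hostnames into one with IP addresses. The function name `to_ip_servers` and the injectable `resolver` parameter are assumptions made for illustration; by default it uses the OS resolver via `socket.gethostbyname`.

```python
import socket


def to_ip_servers(bootstrap_servers, resolver=socket.gethostbyname):
    """Replace each host in a 'host:port,host:port' string with its IPv4 address.

    resolver is any callable mapping a hostname to an IP string; IP literals
    pass through socket.gethostbyname unchanged.
    """
    resolved = []
    for entry in bootstrap_servers.split(","):
        # Split on the last colon so the port is always the final component.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        resolved.append(f"{resolver(host)}:{port}")
    return ",".join(resolved)
```

For example, `to_ip_servers("kafka:9092")` on a host whose /etc/hosts maps kafka to 172.32.255.81 should give the value used in the working configuration. The trade-off is that a hard-coded IP has to be updated whenever the broker's address changes.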
  • Original article: https://www.cnblogs.com/zihunqingxin/p/8278693.html