zoukankan      html  css  js  c++  java
  • kafka生产者配置遇到的坑

    接入其他系统的kafka集群时,遇到了一下问题:

    org.springframework.kafka.support.LoggingProducerListener [76] [http-nio-9050-exec-1]- Exception thrown when sending a message with key='null' and payload='test' to topic lapm_notice:
    org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
    

     起初还是照抄网上给的接入的方法,按部就班的开始写客户端,。。。结果完成时测试执行到了send方法时出现了这种问题,找度娘说时因为你的topic不可用,我去,这个时调别人的集群,我哪里能管到,所以这个问题果断pass,熬了好几天,试了几种不同的方法,终于得以见天日,----版本的问题,我使用的kafka生产者版本是1.0.1,但是人家的集群的版本是0.9.0.1,我去,差了这么个版本就让我挂了,果断换了之后,发消息成功.

     下面是我的配置代码:

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.Properties;
    
    @Component
    public class KafkaProducerUtil implements InitializingBean {
    
        @Resource
        private Properties props;
        @Resource
        private KafkaConfig kafkaConfig;
    
        private KafkaProducer<String, String> producer;
    
        public void init() {
            producer = new KafkaProducer<String, String>(getProps());
        }
    
        public KafkaProducer<String, String> getProducer() {
            return producer;
        }
    
        private Properties getProps() {
            // 服务器ip:端口号,集群用逗号分隔
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getServers());
            // key序列化指定类
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            // value序列化指定类
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            return props;
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            init();
        }
    }

    导入的maven为:

    <!-- kafka客户端引入以支持kafka输出 -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.10</artifactId>
                <version>${kafka.version}</version>
                <exclusions>
                    <exclusion>
                        <artifactId>slf4j-log4j12</artifactId>
                        <groupId>org.slf4j</groupId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
            </dependency>

    另外,以log4j2的方式配置发消息的方式也是相当实用的(某司给的demo,可惜我的架构不是这种的),但是这种需要你的日志框架是log4j2,代码:

    <?xml version="1.0" encoding="UTF-8"?>
    <Configuration name="defaultConfiguration" status="off" strict="true" monitorInterval="5">
    	<properties>
    		<!-- 日志输出格式配置 -->
    		<property name="patternlayout">%date{yyyy-MM-dd HH:mm:ss,SSS}|%level|%thread|%class{36}|%M|%msg%xEx|$|$|%n</property>
    		<!-- 测试环境监控预警平台Kafka ip -->
            <property name="kafkaServers">127.0.0.1:9020</property>
    	</properties>
    
    	<Appenders>
    		<Console name="Console" target="SYSTEM_OUT">
    			<PatternLayout pattern="${patternlayout}" />
    		</Console>
    		<!-- 预警类日志KafkaAppender, lapm_notice为预警类Kafka topic -->
    		<Kafka name="NoticeKafkaAppender" topic="lapm_notice" syncSend="false">
    			<PatternLayout pattern="${patternlayout}" />
    			<Property name="bootstrap.servers">${kafkaServers}</Property>
    		</Kafka>
    		<!-- 统计类日志KafkaAppender, lapm_statistics为统计类Kafka topic -->
    		<Kafka name="StatisticsKafkaAppender" topic="lapm_statistics" syncSend="false">
    			<PatternLayout pattern="${patternlayout}" />
    			<Property name="bootstrap.servers">${kafkaServers}</Property>
    		</Kafka>
    		<!-- 场景类日志KafkaAppender, lapm_scene为场景类Kafka topic -->
    		<Kafka name="SceneKafkaAppender" topic="lapm_scene" syncSend="false">
    			<PatternLayout pattern="${patternlayout}" />
    			<Property name="bootstrap.servers">${kafkaServers}</Property>
    		</Kafka>
    	</Appenders>
    
    	<Loggers>
    		<AsyncRoot level="info" includeLocation="true">
    			<AppenderRef ref="Console" />
    		</AsyncRoot>
    		<!-- 预警类日志AsyncLogger -->
    		<AsyncLogger name="Notice" level="info" includeLocation="true" additivity="true">
    			<AppenderRef ref="NoticeKafkaAppender" />
    		</AsyncLogger>
    		<!-- 统计类日志 AsyncLogger -->
    		<AsyncLogger name="Statistics" level="info" includeLocation="true" additivity="true">
    			<AppenderRef ref="StatisticsKafkaAppender" />
    		</AsyncLogger>
    		<!-- 场景类日志AsyncLogger -->
    		<AsyncLogger name="Scene" level="info" includeLocation="true" additivity="true">
    			<AppenderRef ref="SceneKafkaAppender" />
    		</AsyncLogger>
    	</Loggers>
    </Configuration>
    
    /** 预警类日志 */
    private Logger noticeLogger = LoggerFactory.getLogger("Notice");
    
    noticeLogger.error(NoticeLogUtils.getNoticeInfo(businessCode, userCode, NoticeType.NOTICE_WARN, url, arguments, error, sysModuleCode, requestId, requestSerial)
    				+ "异常预警日志系统自定义部分");
    

     maven配置如下:

    <!-- LOG4j2引入 -->
    		<dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>${log4j2.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-core</artifactId>
                <version>${log4j2.version}</version>
            </dependency>	
            <dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-web</artifactId>
    			<version>${log4j2.version}</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.logging.log4j</groupId>
    			<artifactId>log4j-slf4j-impl</artifactId>
    			<version>${log4j2.version}</version>
    		</dependency>
    		<!-- SLF4J2引入 -->
    		<dependency>  
    		    <groupId>org.slf4j</groupId>  
    		    <artifactId>slf4j-api</artifactId>  
    		    <version>${slf4j.version}</version>  
    		</dependency>  
    		<dependency>  
    		    <groupId>org.slf4j</groupId>  
    		    <artifactId>jcl-over-slf4j</artifactId>  
    		    <version>${slf4j.version}</version>  
    		    <scope>runtime</scope>
    		</dependency>
    		<!-- LOG4J2异步日志需引入disruptor -->
     		<dependency>
    			<groupId>com.lmax</groupId>
    			<artifactId>disruptor</artifactId>
    			<version>${lmax.version}</version>
    		</dependency>
    		<!-- kafka客户端引入以支持kafka输出 -->
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka_2.10</artifactId>
    			<version>${kafka.version}</version>
    			<exclusions>
    				<exclusion>
    					<artifactId>slf4j-log4j12</artifactId>
    					<groupId>org.slf4j</groupId>
    				</exclusion>
    			</exclusions>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.kafka</groupId>
    			<artifactId>kafka-clients</artifactId>
    			<version>${kafka.version}</version>
    		</dependency>
    

      

  • 相关阅读:
    协程—gevent模块的使用
    协程—概念以及基本使用
    Python—同步和互斥
    Hugo博客搭建
    Linux编辑利器-Vim
    Linux命令与Shell
    python入门基础
    .netcore程序在linux下用supervisor守护
    .netcore中添加Swagger
    winform或wpf中全局异常捕获
  • 原文地址:https://www.cnblogs.com/otways/p/11542750.html
Copyright © 2011-2022 走看看