  • Java client fails to send messages to Kafka

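    I am using kafka-clients to talk to Kafka and it never succeeds; I cannot work out why. The relevant code and configuration are posted below. If you know what is wrong, please point me in the right direction. Thanks!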

    Environment and dependencies

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>

    The JDK is 1.8 and Kafka is 2.12-0.10.2.0 (Kafka 0.10.2.0 built for Scala 2.12). The server runs CentOS 7.

    Test code

    • TestBase.java

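    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TestBase {

        protected Logger log = LoggerFactory.getLogger(this.getClass());

        // Broker address, used as bootstrap.servers by both tests
        protected String kafka_server = "192.168.60.160:9092";

        protected String topic = "zlikun_topic";

    }
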
    • ProducerTest.java

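    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Before;
    import org.junit.Test;

    public class ProducerTest extends TestBase {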
    
        protected Properties props = new Properties();
    
        @Before
        public void init() {
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            // MyPartitioner is a custom partitioner; its source is not included in this post
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        }
    
        @Test
        public void test() throws InterruptedException {
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // Send 10 messages; the callback reports the outcome of each send
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                            System.out.printf("offset = %d, partition = %d%n", recordMetadata.offset(), recordMetadata.partition());
                        } else {
                            log.error("send error !" ,e);
                        }
                    }
                });
            }
    
            TimeUnit.SECONDS.sleep(3);
            producer.close();
    
        }
    
    }
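
    ProducerTest refers to MyPartitioner, whose source the post does not include. The console output further down shows batches of 3, 3, 2 and 2 records for partitions 0 to 3, which is what a plain "numeric key modulo partition count" rule would produce for the keys "0" to "9". The sketch below shows only that rule, using the JDK alone; it is a guess at the behaviour, not the author's class, and the names KeyModPartitionSketch and partitionFor are hypothetical. A real partitioner would implement org.apache.kafka.clients.producer.Partitioner.

```java
import java.util.Arrays;

public class KeyModPartitionSketch {

    /**
     * Hypothetical rule: numeric keys map to (key mod numPartitions);
     * non-numeric keys fall back to the String hash code.
     */
    public static int partitionFor(String key, int numPartitions) {
        int n;
        try {
            n = Integer.parseInt(key);
        } catch (NumberFormatException e) {
            n = key.hashCode();
        }
        // floorMod keeps the result in [0, numPartitions) even for negative n
        return Math.floorMod(n, numPartitions);
    }

    public static void main(String[] args) {
        int[] counts = new int[4];
        for (int i = 0; i < 10; i++) {
            counts[partitionFor(Integer.toString(i), 4)]++;
        }
        // Records per partition for keys "0".."9" on a 4-partition topic
        System.out.println(Arrays.toString(counts)); // prints [3, 3, 2, 2]
    }
}
```

    If the real MyPartitioner behaves differently, the distribution in the log would simply have another explanation; the timeouts themselves do not depend on which partition a record lands in.
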
    • ConsumerTest.java

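    import java.util.Arrays;
    import java.util.Properties;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.junit.Before;
    import org.junit.Test;

    public class ConsumerTest extends TestBase {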
    
        private Properties props = new Properties();
    
        @Before
        public void init() {
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
            props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        }
    
        @Test
        public void test() {
    
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
    //        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
    
        }
    
    }

    Problem

    # The test topic was created manually
    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

    Console output

    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
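
    A note on reading this output: in kafka-clients 0.10.2 a batch is expired once it has waited longer than request.timeout.ms (30000 ms by default), which matches the roughly 30 s figures above. Because the records were assigned to partitions 0 to 3, the producer apparently did fetch topic metadata from 192.168.60.160:9092. One thing worth ruling out, as an assumption that the post itself does not confirm, is that the address the broker advertises for its partition leaders (advertised.listeners in server.properties) cannot be resolved or reached from the client machine. The sketch below uses only the JDK to test whether a host:port accepts TCP connections; BrokerReachability and canConnect are hypothetical names, not part of kafka-clients.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class BrokerReachability {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMs. */
    public static boolean canConnect(String host, int port, int timeoutMs) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolvable host names, refused connections and timeouts
            return false;
        }
    }

    /** Same check for a "host:port" string such as the kafka_server field. */
    public static boolean canConnect(String hostPort, int timeoutMs) {
        int idx = hostPort.lastIndexOf(':');
        if (idx < 0) {
            throw new IllegalArgumentException("expected host:port, got " + hostPort);
        }
        String host = hostPort.substring(0, idx);
        int port = Integer.parseInt(hostPort.substring(idx + 1));
        return canConnect(host, port, timeoutMs);
    }

    public static void main(String[] args) {
        // Default is the bootstrap address from the post; pass the advertised
        // host:port (for example the broker's host name) as the first argument.
        String target = args.length > 0 ? args[0] : "192.168.60.160:9092";
        System.out.println(target + " reachable: " + canConnect(target, 3000));
    }
}
```

    Running it once with the bootstrap address and once with whatever host name the broker advertises shows whether the two behave differently from the client's side. A successful TCP connection only shows the port is open; it does not prove the Kafka protocol exchange works.
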

    This answer explains it quite clearly:
    http://www.goodpm.net/postreply/java/1010000008863969/Java操作Kafka执行不成功.html
  • Original post: https://www.cnblogs.com/scrumme/p/7668819.html