  • Java Kafka client operations fail

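    I cannot get kafka-clients to work against Kafka and I do not know why. The relevant code and configuration are below; I would appreciate any pointers from someone who knows this. Thanks!

    Environment and dependencies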

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.10.2.0</version>
    </dependency>

    JDK 1.8; Kafka 2.12-0.10.2.0 (Kafka 0.10.2.0 built for Scala 2.12); the server runs CentOS 7.

    Test code

    • TestBase.java

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class TestBase {
    
        protected Logger log = LoggerFactory.getLogger(this.getClass());
    
        protected String kafka_server = "192.168.60.160:9092" ;
    
        protected String topic = "zlikun_topic";
    
    }
    • ProducerTest.java

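    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.junit.Before;
    import org.junit.Test;

    import java.util.Properties;
    import java.util.concurrent.TimeUnit;

    public class ProducerTest extends TestBase {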
    
        protected Properties props = new Properties();
    
        @Before
        public void init() {
    
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
            props.put(ProducerConfig.ACKS_CONFIG, "all");
            props.put(ProducerConfig.RETRIES_CONFIG, 0);
            props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
            props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
            props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    
            props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG ,MyPartitioner.class) ;
        }
    
        @Test
        public void test() throws InterruptedException {
    
            KafkaProducer<String, String> producer = new KafkaProducer<>(props);
    
            // Send messages
            for (int i = 0; i < 10; i++) {
                producer.send(new ProducerRecord<String, String>(topic, Integer.toString(i), Integer.toString(i)), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if (e == null) {
                            System.out.printf("offset = %d, partition = %d\n", recordMetadata.offset(), recordMetadata.partition());
                        } else {
                            log.error("send error !" ,e);
                        }
                    }
                });
            }
    
            TimeUnit.SECONDS.sleep(3);
            producer.close();
    
        }
    
    }
    • ConsumerTest.java

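    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.junit.Before;
    import org.junit.Test;

    import java.util.Arrays;
    import java.util.Properties;

    public class ConsumerTest extends TestBase {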
    
        private Properties props = new Properties();
    
        @Before
        public void init() {
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka_server);
            props.put(ConsumerConfig.GROUP_ID_CONFIG ,"zlikun") ;
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        }
    
        @Test
        public void test() {
    
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Arrays.asList(topic));
    //        consumer.assign(Arrays.asList(new TopicPartition(topic, 1)));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
    
        }
    
    }

    Problem

    # The test topic was created manually
    $ bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 4 --topic zlikun_topic

    Console output

    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-3: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 2 record(s) for zlikun_topic-2: 30042 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-1: 30043 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
    [kafka-producer-network-thread | producer-1] ERROR com.zlikun.mq.ProducerTest - send error !
    org.apache.kafka.common.errors.TimeoutException: Expiring 3 record(s) for zlikun_topic-0: 30046 ms has passed since batch creation plus linger time
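
The "Expiring N record(s) ... ms has passed since batch creation plus linger time" message means the batches waited in the producer's buffer longer than request.timeout.ms (30 seconds by default) without being sent to the partition leader. The post itself does not state the cause. One common cause, offered here only as an assumption, is that the client can reach the bootstrap address but not the address the broker advertises for itself. Below is a minimal, stdlib-only sketch for checking whether a given host:port resolves and accepts TCP connections from the client machine. BrokerReachability and its methods are hypothetical helper names, not part of kafka-clients.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

/** Hypothetical diagnostic helper (not part of kafka-clients). */
final class BrokerReachability {

    /** Splits "host:port" into {host, port}; rejects malformed input. */
    public static String[] splitHostPort(String address) {
        int idx = address.lastIndexOf(':');
        if (idx <= 0 || idx == address.length() - 1) {
            throw new IllegalArgumentException("Expected host:port but got: " + address);
        }
        return new String[] { address.substring(0, idx), address.substring(idx + 1) };
    }

    /**
     * Returns true if the host resolves and a TCP connection can be opened
     * within timeoutMs. This only proves network reachability; it does not
     * speak the Kafka protocol.
     */
    public static boolean canConnect(String address, int timeoutMs) {
        String[] hostPort = splitHostPort(address);
        int port = Integer.parseInt(hostPort[1]);
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(hostPort[0], port), timeoutMs);
            return true;
        } catch (IOException e) {
            // Covers unresolvable hosts, refused connections and timeouts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the bootstrap address used in TestBase above.
        String address = args.length > 0 ? args[0] : "192.168.60.160:9092";
        System.out.println(address + " reachable: " + canConnect(address, 3000));
    }
}
```

Run it on the machine that runs the tests, once with the bootstrap address and once with whatever address the broker advertises (the advertised.listeners setting in the broker's server.properties, if it is set). If the first check passes and the second fails, that would be consistent with this kind of timeout, though it does not rule out other causes.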

    This answer explains it fairly clearly:
    http://www.goodpm.net/postreply/java/1010000008863969/Java操作Kafka执行不成功.html
  • Original article: https://www.cnblogs.com/scrumme/p/7668819.html