zoukankan      html  css  js  c++  java
  • Kafka java demo

    pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- Maven build descriptor for the demo project that hosts the Kafka producer/consumer samples. -->
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>org.example</groupId>
        <artifactId>SpringbootDemo</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <!-- Inherit from the Spring Boot 2.0.2 starter parent; the empty relativePath forces lookup
             in the repository instead of the parent directory. -->
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.2.RELEASE</version>
            <relativePath/>
        </parent>
    
        <!-- Compile for Java 8 source and bytecode level. -->
        <properties>
            <maven.compiler.source>8</maven.compiler.source>
            <maven.compiler.target>8</maven.compiler.target>
        </properties>
    
        <!-- Dependencies declared without a <version> presumably get it from the parent's
             dependency management - NOTE(review): confirm against the parent POM. -->
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <!-- Test-scoped only; not packaged with the application. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
    <!--        <dependency>-->
    <!--            <groupId>org.redisson</groupId>-->
    <!--            <artifactId>redisson-spring-boot-starter</artifactId>-->
    <!--            <version>3.13.1</version>-->
    <!--        </dependency>-->
            <!-- Lombok is needed at compile time only, hence the provided scope. -->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.18.16</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>1.3.2</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>8.0.11</version>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.retry</groupId>
                <artifactId>spring-retry</artifactId>
            </dependency>
            <dependency>
                <groupId>org.aspectj</groupId>
                <artifactId>aspectjweaver</artifactId>
            </dependency>
    
            <!-- Kafka client library used by ProducerDemo and ConsumerDemo. -->
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    ProducerDemo.java

    package com.gxf.kafka;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    
    import java.util.Properties;
    
    /**
     * Minimal Kafka producer demo: publishes one numbered text message per second to a topic
     * until the thread is interrupted or sending fails.
     */
    public class ProducerDemo {
        /** Broker address used by this demo. */
        private static final String BOOTSTRAP_SERVERS = "localhost:9092";
        /** Pause between two consecutive sends, in milliseconds. */
        private static final long SEND_INTERVAL_MS = 1000L;

        private final KafkaProducer<String, String> producer;
        private final String topic;

        /**
         * Creates a producer with String key/value serializers.
         *
         * @param topicName name of the topic that {@link #sendMsg()} publishes to
         */
        public ProducerDemo(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());
            this.producer = new KafkaProducer<>(props);
            this.topic = topicName;
        }

        /**
         * Sends numbered messages in an endless loop, one per {@link #SEND_INTERVAL_MS}.
         * The loop ends when the thread is interrupted or an exception is thrown; the producer
         * is always closed before this method returns.
         */
        public void sendMsg() {
            int messageNo = 1;
            boolean interrupted = false;
            try {
                for (;;) {
                    String messageStr = "你好,这是第" + messageNo + "条数据";
                    producer.send(new ProducerRecord<>(topic, messageStr));
                    System.out.println("发送的信息:" + messageStr);
                    // Advance the counter so each message carries its own sequence number.
                    messageNo++;
                    Thread.sleep(SEND_INTERVAL_MS);
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; the flag is restored only after close() so that
                // closing the producer is not itself cut short by the pending interrupt.
                interrupted = true;
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    producer.close();
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /** Starts the demo against the {@code KAFKA_TEST} topic. */
        public static void main(String[] args) {
            ProducerDemo producerDemo = new ProducerDemo("KAFKA_TEST");
            producerDemo.sendMsg();
        }
    }

    ConsumerDemo.java

    package com.gxf.kafka;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * Minimal Kafka consumer demo: subscribes to one topic as part of consumer group
     * {@code groupA} and prints every record it receives.
     */
    public class ConsumerDemo {
        private static final String GROUPID = "groupA";
        /** Broker address used by this demo. */
        private static final String BOOTSTRAP_SERVERS = "localhost:9092";
        /** Maximum time a single poll waits for records, in milliseconds. */
        private static final long POLL_TIMEOUT_MS = 1000L;
        /** Extra pause after a poll that returned no records, in milliseconds. */
        private static final long IDLE_SLEEP_MS = 1000L;

        private final KafkaConsumer<String, String> consumer;
        private final String topic;

        /**
         * Creates a consumer with String key/value deserializers, auto-commit enabled and
         * offset reset set to {@code earliest}, then subscribes it to the given topic.
         *
         * @param topicName name of the topic to subscribe to
         */
        public ConsumerDemo(String topicName) {
            Properties props = new Properties();
            props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
            props.put("group.id", GROUPID);
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("session.timeout.ms", "30000");
            props.put("auto.offset.reset", "earliest");
            props.put("key.deserializer", StringDeserializer.class.getName());
            props.put("value.deserializer", StringDeserializer.class.getName());
            this.consumer = new KafkaConsumer<>(props);
            this.topic = topicName;
            this.consumer.subscribe(Arrays.asList(topic));
        }


        /**
         * Polls the topic in an endless loop and prints each record with a running sequence
         * number. The loop ends when the thread is interrupted during the idle sleep; the
         * consumer is always closed before this method returns.
         */
        public void receiveMsg() {
            int messageNo = 1;
            boolean interrupted = false;
            System.out.println("---------开始消费---------");
            try {
                for (;;) {
                    // NOTE(review): poll(long) is kept because the kafka-clients version is not
                    // pinned in this project; switch to poll(Duration) once on a 2.x client.
                    ConsumerRecords<String, String> records = consumer.poll(POLL_TIMEOUT_MS);
                    if (records.isEmpty()) {
                        Thread.sleep(IDLE_SLEEP_MS);
                        continue;
                    }
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println(messageNo+"=======receive: key = " + record.key() + ", value = " + record.value()+" offset==="+record.offset());
                        // Advance the counter so each printed record gets its own number.
                        messageNo++;
                    }
                }
            } catch (InterruptedException e) {
                // Remember the interrupt; the flag is restored only after close() so that
                // closing the consumer is not itself cut short by the pending interrupt.
                interrupted = true;
            } finally {
                try {
                    consumer.close();
                } finally {
                    if (interrupted) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }

        /** Starts the demo against the {@code KAFKA_TEST} topic. */
        public static void main(String[] args) {
            ConsumerDemo consumerDemo = new ConsumerDemo("KAFKA_TEST");
            consumerDemo.receiveMsg();
        }
    }
    Please call me JiangYouDang!
  • 相关阅读:
    OpenJudge计算概论-四大湖
    OpenJudge计算概论-排队游戏【这个用到了栈的思想】
    OpenJudge计算概论-流感传染【这个题用二维数组】
    OpenJudge计算概论-扩号匹配问题【这个用到了栈的思想】
    Openjudge计算概论-角谷猜想
    OpenJudge计算概论-发票统计
    OpenJudge计算概论-Tomorrow never knows【输入日期计算下一天的日期】
    已知二叉树的中序和前序序列(或后序)求解树
    OpenJudge计算概论-寻找下标
    OpenJudge计算概论-校门外的树
  • 原文地址:https://www.cnblogs.com/luckygxf/p/15383489.html
Copyright © 2011-2022 走看看