1. Create the Maven project
2. Add dependencies to pom.xml
Dependency coordinates and versions can be looked up at https://maven.aliyun.com/mvn/search
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>org.example</groupId> <artifactId>kafka_cnn</artifactId> <version>1.0-SNAPSHOT</version> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> </properties> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>2.2.1</version> </dependency> <dependency> <groupId> org.apache.cassandra</groupId> <artifactId>cassandra-all</artifactId> <version>0.8.1</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </project>
3. Create the consumer class ConsumerDemo.java
package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

/**
 * @program: kafka consumer test
 * @Date: 2021/05/15 21:55
 * @Author: kq
 * @Description:
 */
public class ConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.12:9092");
        // Consumer group id.
        properties.put("group.id", "zabbix_perf");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        /*
         * earliest: if a partition has a committed offset, consume from it;
         *           otherwise consume from the beginning of the partition.
         * latest:   if a partition has a committed offset, consume from it;
         *           otherwise consume only records produced after the consumer starts.
         * none:     if every partition has a committed offset, consume from them;
         *           if any partition lacks one, throw an exception.
         */
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        /*
         * Deserializers: turn the binary messages fetched from the Kafka
         * cluster back into the declared key/value types.
         */
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic5"));
        while (true) {
            // 100 ms is the poll timeout.
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
            }
        }
    }
}
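With enable.auto.commit=true and a 1000 ms interval, up to one second of already-processed records can be re-delivered after a crash, and records polled but not yet processed can be lost. If offsets should only advance after processing, auto-commit can be disabled and offsets committed explicitly. The following is a minimal sketch of that variation, not part of the original demo; the class name ManualCommitConsumerDemo is hypothetical and the connection properties mirror ConsumerDemo above.

package com.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

// Variation on ConsumerDemo: auto-commit disabled, offsets committed manually
// after each processed batch (at-least-once semantics).
public class ManualCommitConsumerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.12:9092");
        properties.put("group.id", "zabbix_perf");
        properties.put("enable.auto.commit", "false"); // commit explicitly instead
        properties.put("auto.offset.reset", "earliest");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties)) {
            kafkaConsumer.subscribe(Arrays.asList("my-kafka-topic5"));
            while (true) {
                ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, value = %s%n", record.offset(), record.value());
                }
                // Commit only after the whole batch is processed; if the process
                // dies before this line, the batch is re-delivered on restart.
                kafkaConsumer.commitSync();
            }
        }
    }
}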
4. Create the producer class ProducerDemo.java
package com.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

/**
 * @program: kafka producer test
 * @Date: 2021/05/15 21:55
 * @Author: kq
 * @Description:
 */
public class ProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /*
         * bootstrap.servers: list of host/port pairs used to establish the initial
         * connection to the Kafka cluster; comma-separate entries for two or more brokers.
         */
        properties.put("bootstrap.servers", "192.168.1.12:9092");
        /*
         * acks has three settings:
         * acks=0   send without waiting for any acknowledgment from the server;
         *          there is no guarantee the server received the record.
         * acks=1   the leader writes the record to its local log and responds
         *          without waiting for acknowledgment from all replicas.
         * acks=all the leader waits until all in-sync replicas have acknowledged
         *          the record, which guarantees the record is not lost.
         */
        properties.put("acks", "all");
        // A value greater than 0 makes the client resend any record whose send failed.
        properties.put("retries", 0);
        // 16384 bytes is the default batch buffer size.
        properties.put("batch.size", 16384);
        properties.put("linger.ms", 1);
        properties.put("buffer.memory", 33554432);
        /*
         * Serializers. Messages are sent to the Kafka cluster as key/value pairs,
         * where the key is optional and the value may be any type; before a message
         * is sent, the producer must serialize it to binary.
         */
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = null;
        try {
            producer = new KafkaProducer<String, String>(properties);
            for (int i = 0; i < 5; i++) {
                String msg = "{Message " + i
                        + "[{\"partitions\": [{\"topic\": \"my-kafka-topic5\", \"partition\": 4, \"offset\": 27}], \"version\":1 },"
                        + "{\"partitions\": [{\"topic\": \"my-kafka-topic5\", \"partition\": 4, \"offset\": 27}], \"version\":1 },"
                        + "{\"partitions\": [{\"topic\": \"my-kafka-top5\", \"partition\": 4, \"offset\": 27}], \"version\":1 }]"
                        .replace("my-kafka-top5", "my-kafka-topic5");
                producer.send(new ProducerRecord<String, String>("my-kafka-topic5", msg));
                System.out.println("Sent:" + msg);
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (producer != null) { // avoid NPE if the producer failed to construct
                producer.close();
            }
        }
    }
}
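ProducerDemo sets acks=all but sends fire-and-forget, so it never learns whether a record actually reached the broker. To surface delivery results, a Callback can be passed to send(); it is invoked asynchronously with the record's partition and offset on success, or an exception on failure. The following is a minimal sketch of that variation, not part of the original demo; the class name CallbackProducerDemo is hypothetical and the connection properties mirror ProducerDemo above.

package com.kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Properties;

// Variation on ProducerDemo: attach a Callback to each send() so delivery
// success (partition/offset) or failure is reported asynchronously.
public class CallbackProducerDemo {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "192.168.1.12:9092");
        properties.put("acks", "all");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            for (int i = 0; i < 5; i++) {
                String msg = "Message " + i;
                producer.send(new ProducerRecord<>("my-kafka-topic5", msg), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception != null) {
                            // The broker rejected the record or the send timed out.
                            exception.printStackTrace();
                        } else {
                            System.out.printf("Sent to partition %d at offset %d%n",
                                    metadata.partition(), metadata.offset());
                        }
                    }
                });
            }
        } // close() flushes buffered records before returning
    }
}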