zoukankan      html  css  js  c++  java
  • Kafka系列三 java API操作

    使用java API操作kafka

    1.pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>cn.itcast</groupId>
        <artifactId>KafkaDemo</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.12</artifactId>
                <version>1.0.0</version>
            </dependency>
        </dependencies>
    </project>

     2.producer和consumer配置文件

      2.1producer.properties

    #请求时候需要验证
    acks=all
    #请求失败时候需要重试
    retries=0
    #内存缓存区大小
    buffer.memory=33554432
    #分区类
    partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
    #broker地址
    bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092
    #指定消息key序列化方式
    key.serializer=org.apache.kafka.common.serialization.StringSerializer
    #指定消息本身的序列化方式
    value.serializer=org.apache.kafka.common.serialization.StringSerializer

      2.2consumer.properties

    #每个消费者分配独立的组号
    group.id=test
    #如果value合法,则自动提交偏移量
    enable.auto.commit=true
    #设置多久一次更新被消费消息的偏移量
    auto.commit.interval.ms=1000
    #设置会话响应的时间,超过这个时间kafka可以选择放弃消费或者消费下一条消息
    session.timeout.ms=30000
    #指定消息key序列化方式
    key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #指定消息本身的序列化方式
    value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
    #broker地址
    bootstrap.servers=192.168.25.151:9092,192.168.25.152:9092,192.168.25.153:9092

    3.生产者和消费者代码

      3.1 KafkaProducerSimple.java

     1 package cn.itcast.kafka;
     2 
     3 import java.io.IOException;
     4 import java.io.InputStream;
     5 import java.util.Properties;
     6 import java.util.UUID;
     7 
     8 import org.apache.kafka.clients.producer.KafkaProducer;
     9 import org.apache.kafka.clients.producer.Producer;
    10 import org.apache.kafka.clients.producer.ProducerRecord;
    11 
    12 public class KafkaProducerSimple {
    13     public static void main(String[] args) throws IOException {
    14         Properties properties = new Properties();
    15         InputStream inStream = KafkaProducerSimple.class.getClassLoader().getResourceAsStream("producer.properties");
    16 
    17         properties.load(inStream);
    18 
    19         Producer<String, String> producer = new KafkaProducer<>(properties);
    20         String TOPIC = "orderMq6";
    21         for (int messageNo = 1; messageNo < 10000; messageNo++) {
    22             producer.send(new ProducerRecord<String, String>(TOPIC,messageNo + "", UUID.randomUUID() + "itcast"));
    23         }
    24     }
    25 }

      3.2 KafkaConsumerSimple.java

     1 package cn.itcast.kafka;
     2 
     3 import java.io.InputStream;
     4 import java.util.Arrays;
     5 import java.util.Properties;
     6 
     7 import org.apache.kafka.clients.consumer.Consumer;
     8 import org.apache.kafka.clients.consumer.ConsumerRecord;
     9 import org.apache.kafka.clients.consumer.ConsumerRecords;
    10 import org.apache.kafka.clients.consumer.KafkaConsumer;
    11 
    12 public class KafkaConsumerSimple {
    13 
    14     public static void main(String[] args) throws Exception {
    15         Properties properties = new Properties();
    16         InputStream inStream = KafkaConsumerSimple.class.getClassLoader().getResourceAsStream("consumer.properties");
    17         properties.load(inStream);
    18         Consumer<String, String> consumer = new KafkaConsumer<>(properties);
    19         consumer.subscribe(Arrays.asList("orderMq6"));
    20         while (true) {
    21             ConsumerRecords<String, String> records = consumer.poll(100);
    22             if (records.count() > 0) {
    23                 for (ConsumerRecord<String, String> record : records) {
    24                     System.out.println(record.value());
    25                 }
    26 
    27             }
    28         }
    29     }
    30 }

      以上代码如果执行超时,必须在本地host文件中配置broker的hostname和ip的映射。

  • 相关阅读:
    Https、SSL/TLS相关知识及wireShark抓包分析
    谷歌浏览器如何查看当前网页使用哪个TLS版本?
    centos7 ssh启动异常时,用ssh -t 查看报错信息。
    centos7设置久静态ip
    将cmder.exe添加到右键菜单,并配置环境变量
    KeepAlive与KeepAlive的区别
    openssl笔记
    1.javascript知识点总结
    js的小练习
    7.利用canvas和js画一个渐变的
  • 原文地址:https://www.cnblogs.com/zhaobingqing/p/8579215.html
Copyright © 2011-2022 走看看