zoukankan      html  css  js  c++  java
  • kafka: Java实现简单的Producer和Consumer

    Maven   pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.jj</groupId>
        <artifactId>ak02</artifactId>
        <version>1.0.0</version>
    
        <properties>
            <kafka.version>1.1.0</kafka.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
    
        <dependencies>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-simple</artifactId>
                <version>1.7.25</version>
                <scope>compile</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.7.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
    </project>
    BasicProducer.java
    package com.jj.ak02;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.util.Properties;
    
    public class BasicProducer {
        public static void main(String[] args) {
            // 步驟1. 設定要連線到Kafka集群的相關設定
            Properties props = new Properties();
    
            props.put("bootstrap.servers", "localhost:9092"); // Kafka集群在那裡?
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定msgKey的序列化器
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 指定msgValue的序列化器
    
            // 步驟2. 產生一個Kafka的Producer的實例
            Producer<String, String> producer = new KafkaProducer<>(props);
    
            // 步驟3. 指定想要發佈訊息的topic名稱
            String topicName = "test";
    
            int msgCounter = 0;
    
            try {
                System.out.println("Start sending messages ...");
    
                // 步驟4. 產生要發佈到Kafka的訊息 (把訊息封裝進一個ProducerRecord的實例中)
                //    - 參數#1: topicName
                //    - 參數#2: msgKey
                //    - 參數#3: msgValue
                producer.send(new ProducerRecord<>(topicName, null, "Hello"));
                producer.send(new ProducerRecord<>(topicName, null, "Hello2"));
                producer.send(new ProducerRecord<>(topicName, "8703147", "Hello3"));
                producer.send(new ProducerRecord<>(topicName, "8703147", "Hello4"));
    
                msgCounter+=4;
                System.out.println("Send " + msgCounter + " messages to Kafka");
    
            } catch (Exception e) {
                // 錯誤處理
                e.printStackTrace();
            }
    
            // 步驟5. 關掉Producer實例的連線
            producer.close();
    
            System.out.println("Message sending completed!");
        }
    }
    BasicConsumer.java
    package com.wistron.witlab4.ak02;
    
    import org.apache.kafka.clients.consumer.*;
    import org.apache.kafka.common.record.TimestampType;
    
    import java.util.Arrays;
    import java.util.Properties;
    
    public class BasicConsumer {
        public static void main(String[] args) {
            // 步驟1. 設定要連線到Kafka集群的相關設定
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // Kafka集群在那裡?
            props.put("group.id", "my-group"); // <-- 這就是ConsumerGroup
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定msgKey的反序列化器
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 指定msgValue的反序列化器
            props.put("auto.offset.reset", "earliest"); // 是否從這個ConsumerGroup尚未讀取的partition/offset開始讀
            // 步驟2. 產生一個Kafka的Consumer的實例
            Consumer<String, String> consumer = new KafkaConsumer<>(props);
            // 步驟3. 指定想要訂閱訊息的topic名稱
            String topicName = "test";
            // 步驟4. 讓Consumer向Kafka集群訂閱指定的topic
            consumer.subscribe(Arrays.asList(topicName));
            // 步驟5. 持續的拉取Kafka有進來的訊息
            try {
                System.out.println("Start listen incoming messages ...");
                while (true) {
                    // 請求Kafka把新的訊息吐出來
                    ConsumerRecords<String, String> records = consumer.poll(1000);
                    // 如果有任何新的訊息就會進到下面的迭代
                    for (ConsumerRecord<String, String> record : records){
                        // ** 在這裡進行商業邏輯與訊息處理 **
                        // 取出相關的metadata
                        String topic = record.topic();
                        int partition = record.partition();
                        long offset = record.offset();
                        TimestampType timestampType = record.timestampType();
                        long timestamp = record.timestamp();
                        // 取出msgKey與msgValue
                        String msgKey = record.key();
                        String msgValue = record.value();
                        // 秀出metadata與msgKey & msgValue訊息
                        System.out.println(topic + "-" + partition + "-" + offset + " : (" + record.key() + ", " + record.value() + ")");
                    }
                }
            } finally {
                // 步驟6. 如果收到結束程式的訊號時關掉Consumer實例的連線
                consumer.close();
                System.out.println("Stop listen incoming messages");
            }
        }
    }
  • 相关阅读:
    VNC远程控制软件是什么?有没有更好的远程桌面控制解决方案?
    目前国内最热门的四款远程桌面控制软件
    深入Redis命令的执行过程
    深入Redis客户端(redis客户端属性、redis缓冲区、关闭redis客户端)
    Springboot拦截器的使用
    Springboot使用Undertow
    Jenkins parallel并行构建
    Jenkins使用docker构建
    Redis 的键命令、HyperLogLog 命令、脚本命令、连接命令、服务器命令
    Redis Set和ZSet常用命令
  • 原文地址:https://www.cnblogs.com/fangjb/p/13025972.html
Copyright © 2011-2022 走看看