zoukankan      html  css  js  c++  java
  • 二十分钟快速上手Kafka开发(Java示例)

    引子

    Kafka 是一个高性能、高可用、高可靠的支持事件数据流的消息队列系统,是实时计算的可靠数据源。

    本文给出使用 Kafka 的基本示例。关于 Kafka 的更多原理性的信息,可阅读《Kafka权威指南》一书。

    基本知识

    基本概念

    • 事件(Event):包含 [key, value, timestamp, headers] ,是写入 Kafka 或从 Kafka 读取的数据对象。通常是由其它数据源或设备源触发而来;事件可以看做是消息或记录;
    • 主题(Topic):将同一类事件对象组织在一起的名字空间。生产者写入指定的 Topic ,消费者订阅并从 Topic 中读取数据;
    • 分区(Partition):分区是事件在主题中的存储单元。同一个主题下的所有事件会存放在多个分区里;分区可以保证 Kafka 的高可用和可伸缩性。Kafka 保证数据写入分区和从分区读取的顺序是一致的;
    • 偏移量(Offset):消费者读取消息时会向 _consumer_offset 的特殊主题提交消息偏移量,便于追踪消息读取进度;如果发生分区再均衡(消费者群组中的消费者上线或下线),可以控制消息读取的不丢失和一致性;

    Kafka 遵循生产者-消费者模型(P-C):

    • 生产者(Producer): 往 Kafka 分区写数据的数据生产者;
    • 消费者(Consumer): 从 Kafka 分区读取数据的数据消费者;消费者通常会处于某个消费者群组里。

    消息系统的基本原理见: “【整理】互联网服务端技术体系:服务解耦之消息系统”

    流程图

    Kafka 生产者写入示意图:

    Kafka 消费者读取示意图:


    准备工作

    Zookeeper 和 Kafka

    “Zookeeper Download” 下载 zookeeper 压缩包,从 “Kafka Download” 下载 Kafka 压缩包,使用 tar xzvf xxx.tar.gz 解压即可。

    启动服务

    启动 Zookeeper 服务。切换到 Zookeeper 解压目录下,执行如下命令:

    bin/zkServer.sh start-foreground
    

    启动 Kafka 服务。切换到 Kafka 解压目录下,执行如下命令:

    bin/kafka-server-start.sh config/server.properties
    

    创建和查看消息主题

    执行如下命令,创建了一个 order-events 的消息主题:

    bin/kafka-topics.sh --create --topic order-events --bootstrap-server localhost:9092
    

    查看主题 order-events 的信息:

    bin/kafka-topics.sh --describe --topic order-events --bootstrap-server localhost:9092
    

    Java示例

    步骤一:引入 POM 依赖

           <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.13</artifactId>
                <version>2.7.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-streams</artifactId>
                <version>2.7.0</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.7.0</version>
            </dependency>
    

    步骤二:创建Kafka消息发送组件

    package cc.lovesq.kafkamsg;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.Properties;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @Description kafka消息发送
     * @Date 2021/2/4 10:47 上午
     * @Created by qinshu
     */
    @Component
    public class KafkaMessageProducer {
    
        private static Log log = LogFactory.getLog(KafkaMessageProducer.class);
    
        private KafkaProducer producer;
    
        @PostConstruct
        public void init() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","localhost:9092");    // 指定 Broker
            properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");  // 将 key 的 Java 对象转成字节数组
            properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // 将 value 的 Java 对象转成字节数组
            properties.put("acks", "1");       // 消息至少成功发给一个副本后才返回成功
            properties.put("retries", "5");    // 消息重试 5 次
    
            producer = new KafkaProducer<String,String>(properties);
    
        }
    
        /**
         * 同步发送消息
         */
        public void send(ProducerRecord record) {
            try {
                producer.send(record).get(200, TimeUnit.MILLISECONDS);
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
    
        }
    
        /**
         * 异步发送消息
         */
        public void sendAsync(ProducerRecord record, Callback callback) {
            try {
                producer.send(record, callback);
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
    
        }
    }
    

    步骤三: 创建Kafka消息消费组件

    package cc.lovesq.kafkamsg;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    /**
     * @Description kafka消息接收
     * @Date 2021/2/4 11:04 上午
     * @Created by qinshu
     */
    @Component
    public class KafkaMessageConsumer {
    
        private static Log log = LogFactory.getLog(KafkaMessageConsumer.class);
    
        private KafkaConsumer consumer;
    
        @PostConstruct
        public void init() {
            Properties properties = new Properties();
            properties.put("bootstrap.servers","localhost:9092");  // 指定 Broker
            properties.put("group.id", "experiment");              // 指定消费组群 ID
            properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 将 key 的字节数组转成 Java 对象
            properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  // 将 value 的字节数组转成 Java 对象
    
            consumer = new KafkaConsumer(properties);
            consumer.subscribe(Collections.singleton("order-events"));  // 订阅主题 order-events
    
            new Thread(this::consumer).start();
        }
    
        public void consumer() {
            try {
                while (true) {
                    ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(100));
                    for (ConsumerRecord<String,String> record: records) {
                        String info = String.format("[Topic: %s][Partition:%d][Offset:%d][Key:%s][Message:%s]",
                                record.topic(), record.partition(), record.offset(), record.key(), record.value());
                        log.info("Received:" + info);
                    }
                }
            } finally {
                consumer.close();
            }
    
        }
    }
    

    步骤四:创建消息发送者(业务),这里借用了“后端简易实验框架” 的功能。这里的消息对象可以替换成自己工程里的对象哈,稍加改动即可。

    package cc.lovesq.experiments;
    
    import cc.lovesq.constants.DeliveryType;
    import cc.lovesq.controller.GoodsSnapshotController;
    import cc.lovesq.kafkamsg.KafkaMessageProducer;
    import cc.lovesq.model.BookInfo;
    import cc.lovesq.model.GoodsInfo;
    import cc.lovesq.model.Order;
    import cc.lovesq.model.transfer.BookInfoToMessageTransfer;
    import cc.lovesq.result.BaseResult;
    import com.alibaba.fastjson.JSON;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.clients.producer.Callback;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.Resource;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * @Description 下单实验
     * @Date 2021/1/4 10:50 上午
     * @Created by qinshu
     */
    @Component
    public class BookExperiment implements IExperiment {
    
        private static Log log = LogFactory.getLog(BookExperiment.class);
    
        @Resource
        private GoodsSnapshotController goodsSnapshotController;
    
        @Resource
        private KafkaMessageProducer producer;
    
        private ExecutorService es = Executors.newFixedThreadPool(10);
    
        @Override
        public void test() {
            generateOrders();
        }
    
        // 模拟并发下单
        public void generateOrders() {
            for (int i=1; i < 1000; i++) {
                es.submit(() -> {
                    book();
                });
            }
        }
    
        Random random = new Random(System.currentTimeMillis());
    
        private BaseResult book() {
            BookInfo bookInfo = new BookInfo();
            Order order = new Order();
    
            Long shopId = 654321L + random.nextInt(10000);
            Long userId = 1234L + random.nextInt(1000);
            Long goodsId = 5678L + random.nextInt(4000);
            order.setShopId(shopId);
            order.setUserId(userId);
            order.setDeliveryType(DeliveryType.express);
            order.setIsCodPay(false);
            bookInfo.setOrder(order);
    
            GoodsInfo goods = new GoodsInfo();
            goods.setGoodsId(goodsId);
            goods.setShopId(shopId);
            goods.setTitle("认养一头牛");
            goods.setDesc("2箱*250g");
            bookInfo.setGoods(goods);
    
            BaseResult bookResult = goodsSnapshotController.save(bookInfo);
            log.info("下单结果:" + JSON.toJSONString(bookResult));
    
            // 下单成功后发送消息
            producer.sendAsync(
                    BookInfoToMessageTransfer.transfer(bookInfo),
                    (metadata, exception) -> callback(bookInfo, metadata, exception));
    
            return bookResult;
        }
    
        // 消息发送后的回调函数
        private void callback(BookInfo bookInfo, RecordMetadata metadata, Exception ex) {
            if (metadata != null) {
                log.info("发送订单消息:" + bookInfo.getOrder().getOrderNo() + " 偏移量: " + metadata.offset() + " 主题: " + metadata.topic());
            } else {
                log.error("发送订单消息失败: " + ex.getMessage(), ex);
            }
        }
    }
    

    至此,就可以实现 Kafka 的消息发送和消息消费示例了。

    Kafka 还可以用于可靠的数据源,为实时计算组件提供事件流,如下图所示代码:

    package cc.lovesq.kafkamsg;
    
    import cc.lovesq.model.BookInfo;
    import cc.lovesq.util.TimeUtil;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Materialized;
    import org.apache.kafka.streams.kstream.Printed;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.Properties;
    
    /**
     * @Description Kafka 事件流
     * @Date 2021/2/4 8:17 下午
     * @Created by qinshu
     */
    @Component
    public class KafkaMessageStream {
    
        private static Log log = LogFactory.getLog(KafkaMessageStream.class);
    
        @PostConstruct
        public void init() {
            Properties properties = new Properties();
            properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "orderCount");
            properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    
            StreamsBuilder streamBuilder = new StreamsBuilder();
            KStream<String,String> source = streamBuilder.stream("order-events");
    
            // 计算下单中每个 goodsId 出现的次数
            KStream result = source.filter(
                    (key, value) -> value.startsWith("{") && value.endsWith("}")
            ).mapValues(
                    value -> JSONObject.parseObject(value, BookInfo.class)
            ).mapValues(
                    bookInfo -> bookInfo.getGoods().getGoodsId().toString()
            ).groupBy((key,value) -> value).count(Materialized.as("goods-order-count")
            ).mapValues(value -> Long.toString(value)).toStream();
    
            result.print(Printed.toSysOut());
    
            new Thread(
                    () -> {
                        TimeUtil.sleepInSecs(10);
                        KafkaStreams streams = new KafkaStreams(streamBuilder.build(), properties);
                        streams.start();
                        log.info("stream-start ...");
                        TimeUtil.sleepInSecs(10);
                        streams.close();
                    }
            ).start();
        }
    }
    
    

    这里还必须事先创建一个 Topic = goods-order-count 的主题:

    bin/kafka-topics.sh --create --topic goods-order-count --bootstrap-server localhost:9092
    

    小结

    Kafka 是一个很有潜力的用于业务系统和大数据系统的消息系统。本文给出了使用 Kafka 进行消息发送、消息消费以及事件流处理的基本示例,方便 Kafka 初学者(包括我自己)更好滴上手,进一步去探索 Kafka.

    参考资料

  • 相关阅读:
    mysql索引
    数据库修复
    数据库取值 三级分类后台遍历
    创建数据库!
    mysql按条件 导出sql
    nodejs 简单安装环境
    C++ 性能剖析 (一)
    C++ 性能剖析 (二):值语义 (value semantics)
    JavaScript Nested Function 的时空和身份属性
    C++ Reference 的“三位一体”诠释
  • 原文地址:https://www.cnblogs.com/lovesqcc/p/14379440.html
Copyright © 2011-2022 走看看