zoukankan      html  css  js  c++  java
  • nodejs中的kafkajs,消费顺序,不重复消费

    参考:https://kafka.js.org/docs

    1.封装kafkaUtil类



    const { Kafka, logLevel } = require('kafkajs') //const cache = require('../conn/redis.js'); const kafka = new Kafka({ clientId: 'my-app', brokers: [ "lcoalhost:8092", "localhost:8093", "localhost:8094", "lcoalhost:8095", "localhost:8096", ], retry: { retries: 8 }, logLevel: logLevel.ERROR }) /** * 如果groupId已存在重复的,建立不同的kafka实例会报错 */ /** * kafka生产者发送消息 * messages: [{ value: 'Hello KafkaJS user!', }, { value: 'Hello KafkaJS user2!', }], */ exports.producer = async (topic, groupId, msg) => { try { const producer = kafka.producer({ groupId: groupId }) await producer.connect() await producer.send({ topic: topic, messages: msg, acks: 1 }) } catch (error) { throw error; } } exports.consumer = async (topic, groupId, callback) => { try { const consumer = kafka.consumer({ groupId: groupId }) await consumer.connect() await consumer.subscribe({ topic: topic }) await consumer.run({ autoCommit: true, eachMessage: async ({ topic, partition, message }) => {
    //防止重复消费数据 await consumer.commitOffsets([{ topic: topic, partition: partition, offset: Number(message.offset) + 1 }]) let msg = message.value.toString() console.log(72, '消费者接收到的数据为:', msg); callback(msg); } }) } catch (err) { throw err; } }

    2.producer.js

       

    const kafka = require('./kafkaUtil');
    (async function () {
        const topic = 'MY——TOPIC1'
        const groupId = 'MY——TOPIC1'
        try {
            for (let i = 0; i < 10000; i++) {
                await new Promise((resolve, reject) => {
                    setTimeout(async () => {
                        resolve(1)
                    }, 1000)
                }).then(async () => {
                    console.log('发送的数据为:', i)
                    await kafka.producer(topic, groupId, [{
                        key: "a",//key值为了保证消费者按照生产者生产的数据顺序,消费数据,key值必须一致;如果不需要消费者按照生产的顺序消费,key去掉即可,参考: https://www.zhihu.com/question/266390197
                        value: `${i}`
                    }])
                })
            }
        } catch (error) {
            console.log(14, error)
            throw error;
        }
    
    })()
    

    3.consumer.js

    const kafka = require('./kafkaUtil');
    (async function () {
        const fs = require('fs');
        let count = 1;
        const topic = 'MY——TOPIC1'
        const groupId = 'MY——TOPIC1'
        try {
            await kafka.consumer(topic, groupId, async (msg) => {
                let str = `第${count}接收到的数据为:${msg}`;
                count++;
                fs.writeFileSync(`${process.cwd()}/test01.txt`, str, {
                    flag: 'a',
                })
                console.log(str)
            })
        } catch (error) {
            console.log(14, error)
            throw error;
        }
    })()

    经实际测试,没有发现消费问题。如有发现问题,请多多指教,谢谢。。。  

     
  • 相关阅读:
    《Java TCP/IP Socket 编程 》读书笔记之十一:深入剖析socket——TCP套接字的生命周期
    c++实现二分查找
    hadoop序列化机制与java序列化机制对比
    C、C++中“*”操作符和“后++”操作符的优先级
    poj2774之最长公共子串
    Python之美[从菜鸟到高手]--urlparse源码分析
    (程序员面试题)字符串处理之寻找最大不重复子串
    hdu 4782 Beautiful Soupz
    教程Xcode 下编译发布与提交App到AppStore
    云端的ABAP Restful服务开发
  • 原文地址:https://www.cnblogs.com/qiyc/p/12898107.html
Copyright © 2011-2022 走看看