zoukankan      html  css  js  c++  java
  • RabbitMQ 实践之在处理异步任务中的流程

    一、背景:

    我司的系统,用户可以创建任务,启动任务,但任务的运行需要很长的时间,所以采用消息队列的方式,后台异步处理。

    这里所用到的是 RabbitMQ ,对应的 Node.js 库为 amqplib ( 这里采用的是回调形式:require("amqplib/callback_api") )。

    二、MQ 处理任务的流程


    ① ② ③ ④ ⑤ :从前端发来 HTTP 请求,被 Producer(express) 处理,经过 Route -> Controller -> Function ,使用 amqplib 的 sendToQueue(),发送需要处理的任务的 uuid 入 MQ 队列。这时候,还要修改数据库,把该任务的状态从 "new" -> "queue"。

    ⑥ ⑦ ⑧ ⑨ ⑩ ⑪ : Consumer 消化 MQ 队列吐出来的 message,即任务的 uuid。先修改数据库该任务的状态为"runnning", 然后调用"处理"模块去执行复杂运算,执行完成后,修改数据库该任务的状态是 "success" 还是 ”fail”,然后返回 ack 信号给 MQ 。

    注:这次的需求比较简单,所以没有用到 MQ 的交换机功能。

    三、问与答


    Q:如何做好 MQ 的错误处理?

    MQ 的 connection 和 channel 对象都有 "error" 和 "close" 事件,需做好相关的日志记录。尤其是 "error",要加上 reconnect 机制,防止因为某个任务导致的错误或者 MQ 自身的原因,影响到后续任务的处理。

    connection.on("error", function(err) {
    		// reconnect 
    });
    
    channel.on("error", function(err) {
    		// reconnect 
    });
    

    最后可以根据实际需要,在全局加上 try……catch。

    Q:如何保证 MQ 自身消息的数据安全?

    为了防止 MQ server 的崩溃导致的消息损失,需要对数据做持久化。大致分两块:

    队列持久化 + 消息持久化

    channel.assertQueue(queue_name, {
    		durable: true
    		// 队列持久化
    }); 
    
    channel.sendToQueue(
              queue_name,
              Buffer.from(uuid),
              {
                persistent: true
                // 消息持久化
              },
              function(err, ok) { 
              
              }
    );
    

    Q:如何保证 DB 跟 MQ 数据的一致性?

    1、发送 message 时

    ④ 中的 sendToQueue() ,需要在 createConfirmChannel() 的基础下使用,这样 sendToQueue() 的第三个参数才有 MQ 收到 message 成功与否的回调,根据这个,去结合 ② 的 DB 操作, 绑定为事务,来保证数据的一致性。

    2、接受 message 时

    channel.consume() 需开启 ack 模式,等 Consumer 端一切确认完成后,再通知 MQ 。

    channel.consume(
            queue_name,
            function(msg) {
              const uuid = msg.content.toString();
      				// use uuid todo…… 
            },
            {
              noAck: false
            }
    );
    

    Q:如何避免 MQ 多发、少发的问题

    从上面的 如何保证 DB 跟 MQ 数据的一致性? 其实就避免了该问题的发生。

    但是额外要做的是:

    1、重试机制,例如 发送 message 失败,规定重试的次数。

    2、善用 MQ 的 Web 控制台,地址形如 http://localhost:15672。除了关注基本的服务器负载状态,还要关注任务队列是否正常吞吐,是否有卡壳。

    3、构建运维一体的后台管控系统,比上面的 2 自定义程度更高。

    4、提供用户类似"提交工单"/"问题反馈"/"错误上传"的功能,查缺补漏。

  • 相关阅读:
    streamsets 集成 cratedb 测试
    streamsets k8s 部署试用
    streamsets rest api 转换 graphql
    StreamSets sdc rpc 测试
    StreamSets 相关文章
    StreamSets 多线程 Pipelines
    StreamSets SDC RPC Pipelines说明
    StreamSets 管理 SDC Edge上的pipeline
    StreamSets 部署 Pipelines 到 SDC Edge
    StreamSets 设计Edge pipeline
  • 原文地址:https://www.cnblogs.com/xjnotxj/p/11234139.html
Copyright © 2011-2022 走看看