zoukankan      html  css  js  c++  java
  • 对于amqplib的使用心得

         最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

       

     1 var amqp = require('amqplib/callback_api');
     2 var config=require('../config/config');
     3 var log=require('../util/loghelp');
     4 function fail(err, conn) {
     5     log.error(err);
     6     if (conn) conn.close();
     7 }
     8 exports.StartConsumer=function (action,qname) {
     9     function on_connect(err, conn) {
    10         if (err !== null) return fail(err);
    11         function on_channel_open(err, ch) {
    12             ch.assertQueue(qname, {durable: true}, function(err, ok) {
    13                 if (err !== null) return bail(err, conn);
    14                 ch.consume(qname, function(msg) {
    15 
    16                     log.info(`Received ${msg.content.toString()},start process`);
    17                     action(JSON.parse(msg.content))
    18                         .then(d=> {
    19                             log.info("mq 处理成功,确认");ch.ack(msg)
    20                         }
    21                            )
    22                         .catch(err=>
    23                         ch.nack(msg));
    24                 }, {noAck: false} );
    25             });
    26         }
    27         conn.createChannel(on_channel_open);
    28     }
    29     amqp.connect(config.amqp.url,on_connect);
    30 };
    31 
    32 exports.enqueue=function (data,qname) {
    33     function on_connect(err, conn) {
    34         if (err !== null) return bail(err);
    35 
    36         function on_channel_open(err, ch) {
    37             if (err !== null) return bail(err, conn);
    38             ch.assertQueue(qname, {durable: true}, function(err, ok) {
    39                 if (err !== null) return bail(err, conn);
    40                 var msg=JSON.stringify(data);
    41                 ch.sendToQueue(qname, new Buffer(msg));
    42                 log.info(`mq send ${msg}`);
    43                 ch.close(function() { conn.close(); });
    44             });
    45         }
    46         conn.createChannel(on_channel_open);
    47     }
    48     amqp.connect(config.amqp.url,on_connect);
    49 };

        其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

       而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。

    本人全手工打造的dotnetcore webapi 框架,可实现快速开发。地址:https://github.com/ryansecret/WebApiCore.git。 1 采用DDD模式开发,充血模型 2 添加Dapper扩展,默认实现增删改查基本操作。利用AutoMapper 做实体转换,减少重复劳动。 3 依赖注入融合Autofac,仓储层和应用层自动注入 4 实现JWT验证 5 加入swagger 文档 6 单元测试添加了xunit,MyMvc 可以方便对webapi测试 7 数据库版本控制
  • 相关阅读:
    ajax配置项中的type与method
    解决 eclipse出现 Address already in use: bind,以及tomcat端口占用
    网络流定理总结
    题解说明
    sol
    题解P4201: [NOI2008]设计路线
    题解 Luogu P5434: 有标号荒漠计数
    题解 Luogu P2499: [SDOI2012]象棋
    JZOJ-2019-11-8 A组
    JZOJ-2019-11-7 A组
  • 原文地址:https://www.cnblogs.com/ryansecreat/p/6030246.html
Copyright © 2011-2022 走看看