zoukankan      html  css  js  c++  java
  • 对于amqplib的使用心得

         最近在nodejs使用了amqplib--rabbitmq的nodejs客户端。封装在了express中,先来代码。

       

     1 var amqp = require('amqplib/callback_api');
     2 var config=require('../config/config');
     3 var log=require('../util/loghelp');
     4 function fail(err, conn) {
     5     log.error(err);
     6     if (conn) conn.close();
     7 }
     8 exports.StartConsumer=function (action,qname) {
     9     function on_connect(err, conn) {
    10         if (err !== null) return fail(err);
    11         function on_channel_open(err, ch) {
    12             ch.assertQueue(qname, {durable: true}, function(err, ok) {
    13                 if (err !== null) return bail(err, conn);
    14                 ch.consume(qname, function(msg) {
    15 
    16                     log.info(`Received ${msg.content.toString()},start process`);
    17                     action(JSON.parse(msg.content))
    18                         .then(d=> {
    19                             log.info("mq 处理成功,确认");ch.ack(msg)
    20                         }
    21                            )
    22                         .catch(err=>
    23                         ch.nack(msg));
    24                 }, {noAck: false} );
    25             });
    26         }
    27         conn.createChannel(on_channel_open);
    28     }
    29     amqp.connect(config.amqp.url,on_connect);
    30 };
    31 
    32 exports.enqueue=function (data,qname) {
    33     function on_connect(err, conn) {
    34         if (err !== null) return bail(err);
    35 
    36         function on_channel_open(err, ch) {
    37             if (err !== null) return bail(err, conn);
    38             ch.assertQueue(qname, {durable: true}, function(err, ok) {
    39                 if (err !== null) return bail(err, conn);
    40                 var msg=JSON.stringify(data);
    41                 ch.sendToQueue(qname, new Buffer(msg));
    42                 log.info(`mq send ${msg}`);
    43                 ch.close(function() { conn.close(); });
    44             });
    45         }
    46         conn.createChannel(on_channel_open);
    47     }
    48     amqp.connect(config.amqp.url,on_connect);
    49 };

        其中StartConsumer 会在项目启动时启动,在整个生命周期中一直保持监听状态,在程序结束时mq的链接关闭。需要注意的是 noAck 这个参数,当为false是表示消息出队后不会自动删除,如果设置成true,则无论消息处理成功与否此消息会被删除。注意到在消息不成功是,调用了ch.nack(msg)),此方法是将消息重新入队。

       而enqueue 则是消息入队列后连接立刻关闭,以免占用资源。

    本人全手工打造的dotnetcore webapi 框架,可实现快速开发。地址:https://github.com/ryansecret/WebApiCore.git。 1 采用DDD模式开发,充血模型 2 添加Dapper扩展,默认实现增删改查基本操作。利用AutoMapper 做实体转换,减少重复劳动。 3 依赖注入融合Autofac,仓储层和应用层自动注入 4 实现JWT验证 5 加入swagger 文档 6 单元测试添加了xunit,MyMvc 可以方便对webapi测试 7 数据库版本控制
  • 相关阅读:
    java线程的几种状态
    java事务的处理
    Java多线程中Sleep与Wait的区别
    分享一百多套开发视频教程的下载地址
    [Java]读取文件方法大全
    Android开发人员必备的10 个开发工具
    CentOS 安装MySQL rpm方式安装
    记录一些经典的算法
    CentOS 7安装Redis服务
    linux查看文件大小,磁盘占用情况 du df命令
  • 原文地址:https://www.cnblogs.com/ryansecreat/p/6030246.html
Copyright © 2011-2022 走看看