zoukankan      html  css  js  c++  java
  • RabbitMQ及延时队列

    一、简介

    我用过RabbitMQ的发布订阅模式,以及一对一的延迟队列。

    1、RabbitMQ有消息确认机制:消费者确认一条消息后,队列中才会少一条;也有对应的自动确认模式,即消息一旦被消费者取到就认为消费成功。一般使用前者。

    发布订阅我是在处理大量数据的更新及与其他系统有数据来往时使用的。在本地程序处理一条则发送一条到队列,保证本地处理成功并发送到其他系统。

    延迟队列这种模式同样用于与其他系统有交互的场景:我这边系统接收成功后不能马上发给其他系统,只有在规定时间内本地没有接到“不发送”的指令时,才由延迟队列(消息过期后通过死信路由)转发到其他系统。

    安装部署简介及可视化页面:

    https://www.cnblogs.com/Leo_wl/p/5402125.html

    2、在c#中的使用

    NuGet引用:RabbitMQ.Client

    2.1 发布消息

    2.1.1、发布订阅

    /// <summary>
    /// Declares the exchange, queue and binding described by <paramref name="model"/>,
    /// publishes <paramref name="message"/> as a UTF-8 text body and waits for the
    /// broker's publisher confirm. The outcome is written to the log; exceptions are
    /// logged and not rethrown.
    /// </summary>
    /// <param name="conn">Open connection used to create a short-lived channel.</param>
    /// <param name="message">Text payload; encoded as UTF-8.</param>
    /// <param name="model">Exchange, queue, routing key and durability settings.</param>
    /// <param name="headerDict">Optional message headers; may be null.</param>
    public static void PublishMsgString(IConnection conn, string message, PublishMsgModel model, IDictionary<string, object> headerDict)
    {
        try
        {
            using (var channel = conn.CreateModel())
            {
                channel.ExchangeDeclare(exchange: model.ExchangeName, type: model.ExchangeType, durable: model.Durable);
                channel.QueueDeclare(queue: model.QueueName, durable: model.Durable, exclusive: false, autoDelete: false, arguments: null);
                channel.QueueBind(queue: model.QueueName, exchange: model.ExchangeName, routingKey: model.RouteKey);

                var properties = channel.CreateBasicProperties();
                properties.DeliveryMode = 2; // AMQP delivery mode 2 = persistent

                // The content type and encoding describe the body, which is always
                // UTF-8 text, so they are set whether or not headers were supplied.
                properties.ContentType = "text/plain";
                properties.ContentEncoding = "UTF-8";
                if (headerDict != null)
                {
                    properties.Headers = headerDict;
                }

                byte[] body = Encoding.UTF8.GetBytes(message);

                // Confirm mode must be enabled before publishing so that
                // WaitForConfirms covers this message.
                channel.ConfirmSelect();
                channel.BasicPublish(model.ExchangeName, model.RouteKey, properties, body);

                if (channel.WaitForConfirms())
                {
                    Common.WriteLog($"【Success】【发布消息】[MsgType]{model.MsgType}[messsage]{message}成功", "publish", model.QueueName);
                }
                else
                {
                    Common.WriteLog($"【Error】【发布消息】[MsgType]{model.MsgType}[messsage]{message}失败", "publish", model.QueueName);
                }
            }
        }
        catch (Exception ex)
        {
            // Null-conditional access: a null model must not raise a second
            // exception from inside the handler and hide the original failure.
            Common.WriteLog($"【Error_Ex】【发布消息】[MsgType]{model?.MsgType}发布消息异常:{ex.Message}[message]{message}", "publish", model?.QueueName);
        }
    }

    2.1.2、延迟队列

    /// <summary>
    /// Publishes <paramref name="message"/> to a delay queue described by
    /// <paramref name="modelExpiration"/>. The queue is declared with a message TTL and
    /// a dead-letter exchange/routing key taken from <paramref name="model"/>, so
    /// messages that expire in the delay queue are re-routed to that target.
    /// Opens its own connection from application settings; exceptions are logged and
    /// not rethrown.
    /// </summary>
    /// <param name="message">Text payload; encoded as UTF-8.</param>
    /// <param name="model">Target exchange and routing key for expired (dead-lettered) messages.</param>
    /// <param name="modelExpiration">Exchange, queue and routing key of the delay queue itself.</param>
    /// <param name="headerDict">Optional message headers; may be null.</param>
    /// <param name="isTest">When true, the "*.Test" variants of the MQ settings are used.</param>
    public static void PublishExpirationMsgString(string message, PublishMsgModel model, PublishMsgModel modelExpiration, Dictionary<string, object> headerDict, bool isTest)
    {
        // Queue idle-expiry and per-message TTL, in milliseconds.
        // The message TTL should be shorter than the queue expiry.
        const int queueExpiresMs = 4 * 1000 * 60;
        const int messageTtlMs = 3 * 1000 * 60;

        try
        {
            // Test settings use the same keys with a ".Test" suffix.
            string suffix = isTest ? ".Test" : "";
            ConnectionFactory connFactory = new ConnectionFactory()
            {
                HostName = ConfigurationManager.AppSettings["MQ.HostName" + suffix],
                UserName = ConfigurationManager.AppSettings["MQ.UserName" + suffix],
                Password = ConfigurationManager.AppSettings["MQ.Password" + suffix],
                VirtualHost = ConfigurationManager.AppSettings["MQ.VirtualHostLog" + suffix],
                Port = int.Parse(ConfigurationManager.AppSettings["MQ.Port" + suffix]),
                AutomaticRecoveryEnabled = true,
                RequestedHeartbeat = 30
            };

            using (IConnection conn = connFactory.CreateConnection())
            {
                using (var channel = conn.CreateModel())
                {
                    Dictionary<string, object> dic = new Dictionary<string, object>();
                    dic.Add("x-expires", queueExpiresMs);
                    dic.Add("x-message-ttl", messageTtlMs);
                    dic.Add("x-dead-letter-exchange", model.ExchangeName);   // exchange that receives expired messages
                    dic.Add("x-dead-letter-routing-key", model.RouteKey);    // routing key used when dead-lettering

                    channel.QueueDeclare(queue: modelExpiration.QueueName, durable: modelExpiration.Durable, exclusive: false, autoDelete: false, arguments: dic);
                    channel.ExchangeDeclare(exchange: modelExpiration.ExchangeName, type: modelExpiration.ExchangeType, durable: modelExpiration.Durable);
                    channel.QueueBind(queue: modelExpiration.QueueName, exchange: modelExpiration.ExchangeName, routingKey: modelExpiration.RouteKey);

                    var properties = channel.CreateBasicProperties();
                    properties.DeliveryMode = 2; // AMQP delivery mode 2 = persistent

                    // The content type and encoding describe the body, which is always
                    // UTF-8 text, so they are set whether or not headers were supplied.
                    properties.ContentType = "text/plain";
                    properties.ContentEncoding = "UTF-8";
                    if (headerDict != null)
                    {
                        properties.Headers = headerDict;
                    }

                    byte[] body = Encoding.UTF8.GetBytes(message);

                    // Confirm mode must be enabled before publishing so that
                    // WaitForConfirms covers this message.
                    channel.ConfirmSelect();
                    channel.BasicPublish(modelExpiration.ExchangeName, modelExpiration.RouteKey, properties, body);

                    if (channel.WaitForConfirms())
                    {
                        Common.WriteLog($"【Success】【发布消息】[MsgType]{modelExpiration.MsgType}[messsage]{message}", "publish", modelExpiration.QueueName);
                    }
                    else
                    {
                        Common.WriteLog($"【Error】【发布消息】[MsgType]{modelExpiration.MsgType}[messsage]{message}失败", "publish", modelExpiration.QueueName);
                    }
                }
            }
        }
        catch (Exception ex)
        {
            // Null-conditional access: a null modelExpiration must not raise a second
            // exception from inside the handler and hide the original failure.
            Common.WriteLog($"【Error_Ex】【发布消息】[MsgType]{modelExpiration?.MsgType}发布消息异常:{ex.Message}[message]{message}", "publish", modelExpiration?.QueueName);
        }
    }

    2.2消费消息

    /// <summary>
    /// Looks up <paramref name="key"/> in the application's appSettings section.
    /// </summary>
    /// <param name="key">Name of the setting to read.</param>
    /// <returns>The value stored under <paramref name="key"/> in appSettings.</returns>
    public static string GetConfig(string key) => ConfigurationManager.AppSettings[key];

    /// <summary>
    /// True when the "IsTest" application setting is exactly "1"; false for any
    /// other value, including a missing setting.
    /// </summary>
    public static bool IsTest => ConfigurationManager.AppSettings["IsTest"] == "1";

    // Connection settings for the consumer side. Each value is read from the
    // "*.Test" key when IsTest is true, otherwise from the plain key.
    private static ConnectionFactory connFactory = new ConnectionFactory()
    {
        HostName = IsTest ? GetConfig("MQ.HostName.Test") : GetConfig("MQ.HostName"),
        UserName = IsTest ? GetConfig("MQ.UserName.Test") : GetConfig("MQ.UserName"),
        Password = IsTest ? GetConfig("MQ.Password.Test") : GetConfig("MQ.Password"),
        VirtualHost = IsTest ? GetConfig("MQ.VirtualHost.Test") : GetConfig("MQ.VirtualHost"),
        Port = int.Parse(IsTest ? GetConfig("MQ.Port.Test") : GetConfig("MQ.Port")),
        AutomaticRecoveryEnabled = true,
        RequestedHeartbeat = 30
    };

    // Static field initializers run in textual order, so the factory must stay
    // above the connection, and the connection above the channel.
    private static IConnection conn = connFactory.CreateConnection();

    // Single channel used by ListenMQ for QoS, queue declaration and consuming.
    private static IModel channel = conn.CreateModel();

    // NOTE(review): ListenMQ switches on `queueOrder`, which is not declared in the
    // visible code; confirm whether it is meant to be this constant.
    private const string queueAllOrder = "user_order_idex_to_log";

    // Fixed: the original declaration ended with a dangling comma
    // ("consumerOrder,;"), which does not compile.
    private static EventingBasicConsumer consumerOrder;

    /// <summary>
    /// Starts consuming from a queue on the shared channel.
    /// </summary>
    /// <param name="queue">
    /// A string[] whose element 0 is the queue name; element 1 is read but not
    /// otherwise used here.
    /// </param>
    private static void ListenMQ(object queue)
    {
        var parts = (string[])queue;
        string queueName = parts[0];
        string nowThread = parts[1];

        // The exchange, queue and routing key all share the queue's name.
        var model = new PublishMsgModel();
        model.Durable = true;
        model.ExchangeName = queueName;
        model.ExchangeType = ExchangeType.Direct;
        model.QueueName = queueName;
        model.RouteKey = queueName;
        model.MsgType = "";

        // Prefetch one message at a time for each consumer.
        channel.BasicQos(0, 1, false);
        channel.QueueDeclare(queue: model.QueueName, durable: model.Durable, exclusive: false, autoDelete: false, arguments: null);

        // NOTE(review): queueOrder is not declared in the visible code — confirm
        // where it is defined.
        if (queueName == queueOrder)
        {
            model.MsgType = "test";
            consumerOrder = new EventingBasicConsumer(channel);
            consumerOrder.Received += ConsumOrderMsg;
            // autoAck is false: each message is acknowledged individually.
            channel.BasicConsume(model.QueueName, false, consumerOrder);
        }
    }

    /// <summary>
    /// Received-event handler for the order consumer. Decodes the UTF-8 body and the
    /// "messageType", "messageId" and "userId" headers, then settles the delivery:
    /// ack when handling completed, nack (without requeue) when it threw.
    /// </summary>
    /// <param name="sender">The consumer that raised the event.</param>
    /// <param name="eventArgs">Delivery data: body, properties and delivery tag.</param>
    private static void ConsumOrderMsg(object sender, BasicDeliverEventArgs eventArgs)
    {
        string logName = "order_idex_to_devcenter";
        bool handled = false;

        try
        {
            byte[] body = eventArgs.Body;
            if (body != null && body.Length > 0)
            {
                string message = Encoding.UTF8.GetString(body);
                ++consumOrderMessageTotalOrder;

                // Header values are cast to byte[] and decoded as UTF-8.
                string msgType = Encoding.UTF8.GetString((byte[])eventArgs.BasicProperties.Headers["messageType"]),
                       msgId = Encoding.UTF8.GetString((byte[])eventArgs.BasicProperties.Headers["messageId"]),
                       userId = Encoding.UTF8.GetString((byte[])eventArgs.BasicProperties.Headers["userId"]);

                // NOTE(review): the original snippet ends here; the business
                // processing of message/msgType/msgId/userId is not shown.
            }

            handled = true;
        }
        catch (Exception ex)
        {
            // The original swallowed every exception with an empty catch.
            // NOTE(review): the "consume" category argument is a guess modelled on
            // the "publish" calls elsewhere in this file — confirm against Common.WriteLog.
            Common.WriteLog($"【Error_Ex】【消费消息】消费消息异常:{ex.Message}", "consume", logName);
        }

        try
        {
            // The consumer is registered with autoAck=false and prefetch=1, so an
            // unsettled delivery blocks every later message on this channel.
            if (handled)
            {
                channel.BasicAck(eventArgs.DeliveryTag, false);
            }
            else
            {
                // NOTE(review): requeue=false avoids an endless redelivery loop for a
                // poison message, but it discards (or dead-letters) the message —
                // confirm this is the desired policy.
                channel.BasicNack(eventArgs.DeliveryTag, false, false);
            }
        }
        catch (Exception ex)
        {
            Common.WriteLog($"【Error_Ex】【消费消息】消息确认异常:{ex.Message}", "consume", logName);
        }
    }

  • 相关阅读:
    利用qt打开一张图片并转成灰度矩阵
    适配手机端浏览器
    ps常用快捷键(供自己学习查看)
    用选框工具画圆角矩形
    ps制作有背景图片的字体
    所有iOS 设备的屏幕尺寸
    九宫格有规律高亮滚动效果
    移动端点击事件全攻略
    移动端ios升级到11及以上时,手机弹框输入光标出现错位问题
    linux下截取整个网页
  • 原文地址:https://www.cnblogs.com/NikkiLiu/p/12916665.html
Copyright © 2011-2022 走看看