zoukankan      html  css  js  c++  java
  • Rabbit使用CorrelationId进行可靠性消息回调

      先放一张使用CorrelationId相关ID进行消息回调处理的流程图

      

    • 客户端启动时,它将创建一个匿名回调队列
    • 对于 RPC 请求,客户端发送一条消息,该消息具有两个属性: reply_to(设置为回调队列)和 correlation_id(设置为每个请求的唯一值)
    • 该请求被发送到 rpc_queue 队列
    • RPC 工作程序等待该队列上的请求,出现请求时,它将使用 reply_to 字段中的队列来完成工作,并将消息和结果发送回客户端
    • 客户端等待回调队列上的数据,出现消息时,它将检查 correlation_id 属性。 如果它与请求中的值匹配,则将响应返回给应用程序(TODO)

    发送回调

            public void SendCallback<T>(string topic, T value, Action<byte[]> callback, byte priority = 1)
            {
                EnsureDeclare(topic);
                var replyQueue = _model.QueueDeclare().QueueName;
                var props = _model.CreateBasicProperties();
                props.Priority = priority;
                props.ReplyTo = replyQueue;
                var correlationId = Guid.NewGuid().ToString("N");
                props.CorrelationId = correlationId;
    
                //发布消息
                _model.BasicPublish(_exchange, topic, props, value.GetBuf());
    
                //创建消费者用于消息回调
                var callbackConsumer = new EventingBasicConsumer(_model);
                _model.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer);
    
                callbackConsumer.Received += (model, ea) =>
                {
                    if (ea.BasicProperties.CorrelationId == correlationId)
                        callback(ea.Body);
                };
            }

    消息接收,在接受消息后,通过ReplyTo来进行消息回执

            public void SetReceiveCallback(string topic, Action<byte[], IBasicProperties> received)
            {
                EnsureDeclare(topic);
                var consumer = new EventingBasicConsumer(_model);
                consumer.Received += (model, ea) =>
                {
                    var properties = ea.BasicProperties;
                    var replyProerties = _model.CreateBasicProperties();
                    replyProerties.CorrelationId = properties.CorrelationId;
                    replyProerties.ReplyTo = properties.ReplyTo;
                    received(ea.Body, replyProerties);
                    _model.BasicAck(ea.DeliveryTag, false);
                };
                _model.BasicConsume(topic, false, consumer);
            }

    单元测试代码

        [Order(1)]
        public class RabbitCallbackTest
        {
            [Fact, Order(1)]
            public void ServerCallback()
            {
                _rabbitSvr = new Aster.Itms.Core.Data.Provider.Rabbit(new Connection()
                {
                    Ip = "localhost",
                    Port = 5672,
                    User = "guest",
                    Password = "guest"
                }, "");
    
                _rabbitSvr.SetReceiveCallback("topic", ServerReceiveCommand);
            }
    
    
            private static volatile Aster.Itms.Core.Data.Provider.Rabbit _rabbitSvr;
            [Fact, Order(2)]
            public void ClientCallback()
            {
                var _rabbitSvr = new Aster.Itms.Core.Data.Provider.Rabbit(new Connection()
                {
                    Ip = "localhost",
                    Port = 5672,
                    User = "guest",
                    Password = "guest"
                }, "");
    
                string val = $"发送消息{Guid.NewGuid().ToString("N")}";
    
                _rabbitSvr.SendCallback<string>("topic", val, ClientReceiveCommand, 1);
            }
    
            private static void ClientReceiveCommand(byte[] body)
            {
                var result = body.To<string>();
    
                //拿到结果再发送回去
    
                Console.WriteLine($"接收服务端返回消息:{result}");
    
                Assert.NotEmpty(result);
            }
    
            private static void ServerReceiveCommand(byte[] body, IBasicProperties replyProps)
            {
                var result = body.To<string>();
    
                Assert.NotEmpty(result);
    
                Console.WriteLine($"服务端接收:{result},CorrelationId:{replyProps.CorrelationId}");
    
                int.TryParse(result, out int newResult);
    
                if (!string.IsNullOrEmpty(replyProps.ReplyTo))
                    _rabbitSvr.Send(replyProps.ReplyTo, $"{Convert.ToInt32(newResult) + 1000}", replyProps);
            }
        }
  • 相关阅读:
    JNI--java调用C&C++
    不能在utf8和UCS2之间转换:failUTF8Conv
    java生成随机数
    itext操作pdf文件
    Adobe Acrobat的安装时出现:服务print spooler启动失败,请确认您有足够权限启动系统服务
    lob类型数据处理 java.lang.IllegalStateException: No LobHandler found for configuration
    No Dialect mapping for JDBC type–4 hibernate查询MySQL中longBlob数据
    JAVA中的继承初始化
    Log4j简单使用
    深入浅出hibernate 学习
  • 原文地址:https://www.cnblogs.com/leeolevis/p/RabbitmqCallback.html
Copyright © 2011-2022 走看看