先放一张使用CorrelationId相关ID进行消息回调处理的流程图
- 客户端启动时,它将创建一个匿名回调队列
- 对于 RPC 请求,客户端发送一条消息,该消息具有两个属性: reply_to(设置为回调队列)和 correlation_id(设置为每个请求的唯一值)
- 该请求被发送到 rpc_queue 队列
- RPC 工作程序等待该队列上的请求,出现请求时,它将使用 reply_to 字段中的队列来完成工作,并将消息和结果发送回客户端
- 客户端等待回调队列上的数据,出现消息时,它将检查 correlation_id 属性。 如果它与请求中的值匹配,则将响应返回给应用程序(TODO)
发送回调
public void SendCallback<T>(string topic, T value, Action<byte[]> callback, byte priority = 1) { EnsureDeclare(topic); var replyQueue = _model.QueueDeclare().QueueName; var props = _model.CreateBasicProperties(); props.Priority = priority; props.ReplyTo = replyQueue; var correlationId = Guid.NewGuid().ToString("N"); props.CorrelationId = correlationId; //发布消息 _model.BasicPublish(_exchange, topic, props, value.GetBuf()); //创建消费者用于消息回调 var callbackConsumer = new EventingBasicConsumer(_model); _model.BasicConsume(queue: replyQueue, autoAck: true, consumer: callbackConsumer); callbackConsumer.Received += (model, ea) => { if (ea.BasicProperties.CorrelationId == correlationId) callback(ea.Body); }; }
消息接收,在接受消息后,通过ReplyTo来进行消息回执
public void SetReceiveCallback(string topic, Action<byte[], IBasicProperties> received) { EnsureDeclare(topic); var consumer = new EventingBasicConsumer(_model); consumer.Received += (model, ea) => { var properties = ea.BasicProperties; var replyProerties = _model.CreateBasicProperties(); replyProerties.CorrelationId = properties.CorrelationId; replyProerties.ReplyTo = properties.ReplyTo; received(ea.Body, replyProerties); _model.BasicAck(ea.DeliveryTag, false); }; _model.BasicConsume(topic, false, consumer); }
单元测试代码
[Order(1)] public class RabbitCallbackTest { [Fact, Order(1)] public void ServerCallback() { _rabbitSvr = new Aster.Itms.Core.Data.Provider.Rabbit(new Connection() { Ip = "localhost", Port = 5672, User = "guest", Password = "guest" }, ""); _rabbitSvr.SetReceiveCallback("topic", ServerReceiveCommand); } private static volatile Aster.Itms.Core.Data.Provider.Rabbit _rabbitSvr; [Fact, Order(2)] public void ClientCallback() { var _rabbitSvr = new Aster.Itms.Core.Data.Provider.Rabbit(new Connection() { Ip = "localhost", Port = 5672, User = "guest", Password = "guest" }, ""); string val = $"发送消息{Guid.NewGuid().ToString("N")}"; _rabbitSvr.SendCallback<string>("topic", val, ClientReceiveCommand, 1); } private static void ClientReceiveCommand(byte[] body) { var result = body.To<string>(); //拿到结果再发送回去 Console.WriteLine($"接收服务端返回消息:{result}"); Assert.NotEmpty(result); } private static void ServerReceiveCommand(byte[] body, IBasicProperties replyProps) { var result = body.To<string>(); Assert.NotEmpty(result); Console.WriteLine($"服务端接收:{result},CorrelationId:{replyProps.CorrelationId}"); int.TryParse(result, out int newResult); if (!string.IsNullOrEmpty(replyProps.ReplyTo)) _rabbitSvr.Send(replyProps.ReplyTo, $"{Convert.ToInt32(newResult) + 1000}", replyProps); } }