20140310补充:
rabbitmq有requeue属性,可以选择消息是否返回队列,另,本文的解决方式非常之山寨,只能应用于发送和接收方式。
这几天在折腾消息队列,在.Net环境下有基于RabbitMQ有很多有API的选择,最后选择了比较简单的EasyNetQ(http://easynetq.com/)
在测试使用的时候发现一个问题,对于处理错误的消息,EasyNetQ默认会放到一个错误队列中并提供了一个工具可以对错误队列的消息进行重新分发。RabiitMQ是有一个事务功能的,但是貌似在EasyNetQ的实现中,没有发现相关的操作方式
现在的需求是,在某种特定的情况下,需要将处理不成功的消息,保留在消息原队列
做了一个变通处理,出现错误消息的时候,将其再送回原队列。从客户端上可以直接send回队列,但这样不是一个很好的方式
研究了一下EasyNetQ的相关代码,发现其定义了一个IConsumerErrorStrategy接口,我们只要自己自行实现,并注册一个自定义方法就可以
ComponentRegistration.cs原注册方法相关,默认错误消息处理方式在DefaultConsumerErrorStrategy.cs中
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 public class ComponentRegistration 2 { 3 public static void RegisterServices(IContainer container) 4 { 5 Preconditions.CheckNotNull(container, "container"); 6 7 // Note: IConnectionConfiguration gets registered when MQContext.CreateBus(..) is run. 8 #region DefaultConsumerErrorStrategy 9 container 10 .Register(_ => container) 11 .Register<IMessageQueueQLogger, ConsoleLogger>() 12 .Register<ISerializer, JsonSerializerN>() 13 .Register<IConventions, Conventions>() 14 .Register<IEventBus, EventBus>() 15 .Register<ITypeNameSerializer, TypeNameSerializer>() 16 .Register<Func<string>>(x => CorrelationIdGenerator.GetCorrelationId) 17 .Register<IClusterHostSelectionStrategy<ConnectionFactoryInfo>, DefaultClusterHostSelectionStrategy<ConnectionFactoryInfo>>() 18 .Register<IConsumerDispatcherFactory, ConsumerDispatcherFactory>() 19 .Register<IPublishExchangeDeclareStrategy, PublishExchangeDeclareStrategy>() 20 .Register<IPublisherConfirms, PublisherConfirms>() 21 .Register<IConsumerErrorStrategy, DefaultConsumerErrorStrategy>() 22 .Register<IHandlerRunner, HandlerRunner>() 23 .Register<IInternalConsumerFactory, InternalConsumerFactory>() 24 .Register<IConsumerFactory, ConsumerFactory>() 25 .Register<IConnectionFactory, ConnectionFactoryWrapper>() 26 .Register<IPersistentChannelFactory, PersistentChannelFactory>() 27 .Register<IClientCommandDispatcherFactory, ClientCommandDispatcherFactory>() 28 .Register<IHandlerCollectionFactory, HandlerCollectionFactory>() 29 .Register<IAdvancedBus, RabbitAdvancedBus>() 30 .Register<IRpc, Rpc>() 31 .Register<ISendReceive, SendReceive>() 32 .Register<IBus, RabbitBus>(); 33 #endregion 34 } 35 }
对DefaultConsumerErrorStrategy的代码进行研究后发现,EasyNetQ是产生一个处理错误消息队列的Exchanges,将消息二次封装后推送到默认的错误队列
只要将消息队列改为源消息队列,取消消息二次封装,直接推送到队列
队列绑定
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 private string DeclareErrorExchangeAndBindToDefaultErrorQueue(IModel model, ConsumerExecutionContext context) 2 { 3 var originalRoutingKey = context.Info.RoutingKey; 4 5 return errorExchanges.GetOrAdd(originalRoutingKey, _ => 6 { 7 var exchangeName = conventions.ErrorExchangeNamingConvention(context.Info); 8 model.ExchangeDeclare(exchangeName, ExchangeType.Direct, durable: true); 9 //更改第一个参数 10 model.QueueBind(originalRoutingKey, exchangeName, originalRoutingKey); 11 return exchangeName; 12 }); 13 }
消息处理
![](https://images.cnblogs.com/OutliningIndicators/ContractedBlock.gif)
1 public virtual PostExceptionAckStrategy HandleConsumerError(ConsumerExecutionContext context, Exception exception) 2 { 3 Preconditions.CheckNotNull(context, "context"); 4 Preconditions.CheckNotNull(exception, "exception"); 5 6 try 7 { 8 Connect(); 9 10 using (var model = connection.CreateModel()) 11 { 12 var errorExchange = DeclareErrorExchangeQueueStructure(model, context); 13 var messageBody = context.Body; 14 var properties = model.CreateBasicProperties(); 15 context.Properties.CopyTo(properties); 16 properties.Type = context.Properties.Type; 17 //消息持久化 18 properties.SetPersistent(true); 19 20 model.BasicPublish(errorExchange, context.Info.RoutingKey, properties, messageBody); 21 22 } 23 } 24 catch (Exception unexpectedException) 25 { 26 // Something else unexpected has gone wrong :( 27 logger.ErrorWrite("EasyNetQMessageQueue Consumer Error Handler: Failed to publish error message Exception is: " 28 + unexpectedException); 29 } 30 return Consumer.PostExceptionAckStrategy.ShouldAck; 31 }
因为EasyNetQ在CreateBus的时候就会将相关事件注册,所以我们只需最后自行在config中加入有关配置,在注册事件的位置加入判断即可让异常抛回
由于刚接触RabbitMQ,对于一些概念的理解可能还不是非常到位,这是我目前所能找到的解决方案
当然可能以后的版本中,EasyNetQ会加入对事务的操作,现在EasyNetQ支持了一种叫Publisher Confirms的方法,貌似还是不是我想要的