网上很多人已经总结的很好了,比如今天看到的这个。https://www.cnblogs.com/LipeiNet/p/9877189.html
我就不总结了,贴点代码。
RabbitMQConnect.cs
using System; using System.IO; using System.Net.Sockets; using Polly; using Polly.Retry; using RabbitMQ.Client; using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; namespace Common.Tool.RabbitMQ { public class RabbitMQConnect { static string host = "127.0.0.1"; static string UserName = "H"; static string password = "H"; public readonly static IConnectionFactory _connectionFactory; IConnection _connection; object sync_root = new object(); bool _disposed; static RabbitMQConnect() { //if (host == "localhost") //{ // _connectionFactory = new ConnectionFactory() { HostName = host }; //} //else { _connectionFactory = new ConnectionFactory() { HostName = host, UserName = UserName, Password = password }; } } public bool IsConnected => this._connection != null && this._connection.IsOpen && this._disposed; public IModel CreateModel() { if (!this.IsConnected) { this.TryConnect(); } return this._connection.CreateModel(); } public bool TryConnect() { lock (this.sync_root) { RetryPolicy policy = RetryPolicy.Handle<SocketException>()//如果我们想指定处理多个异常类型通过OR即可 .Or<BrokerUnreachableException>()//ConnectionFactory.CreateConnection期间无法打开连接时抛出异常 .WaitAndRetry(5, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) => { });// 重试次数,提供等待特定重试尝试的持续时间的函数,每次重试时调用的操作。 policy.Execute(() => { this._connection = _connectionFactory.CreateConnection(); }); if (this.IsConnected) { //当连接被破坏时引发。如果在添加事件处理程序时连接已经被销毁对于此事件,事件处理程序将立即被触发。 this._connection.ConnectionShutdown += this.OnConnectionShutdown; //在连接调用的回调中发生异常时发出信号。当ConnectionShutdown处理程序抛出异常时,此事件将发出信号。如果将来有更多的事件出现在RabbitMQ.Client.IConnection上,那么这个事件当这些事件处理程序中的一个抛出异常时,它们将被标记。 this._connection.CallbackException += this.OnCallbackException; this._connection.ConnectionBlocked += this.OnConnectionBlocked; //LogHelperNLog.Info($"RabbitMQ persistent connection acquired a connection {_connection.Endpoint.HostName} and is subscribed to failure events"); return true; } else { // LogHelperNLog.Info("FATAL ERROR: RabbitMQ connections could not be created and opened"); return false; } } } void OnConnectionShutdown(object sender, ShutdownEventArgs reason) { if (this._disposed) return; //RabbitMQ连接正在关闭。 尝试重新连接... //LogHelperNLog.Info("A RabbitMQ connection is on shutdown. Trying to re-connect..."); this.TryConnect(); } /// <summary> /// /// </summary> /// <param name="sender"></param> /// <param name="e"></param> void OnCallbackException(object sender, CallbackExceptionEventArgs e) { if (this._disposed) return; // LogHelperNLog.Info("A RabbitMQ connection throw exception. Trying to re-connect..."); this.TryConnect(); } private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e) { if (this._disposed) return; // LogHelperNLog.Info("A RabbitMQ connection is shutdown. Trying to re-connect..."); this.TryConnect(); } public void Dispose() { if (this._disposed) return; this._disposed = true; try { this._connection.Dispose(); } catch (IOException ex) { //_logger.LogCritical(ex.ToString()); // LogHelperNLog.Error(ex); } } } }
RabbitMQSend.cs
using Newtonsoft.Json; using Newtonsoft.Json.Converters; using System.Text; namespace Common.Tool.RabbitMQ { public class RabbitMQSend { /// <summary> /// Newtonsoft.Json利用IsoDateTimeConverter处理日期类型 /// </summary> static IsoDateTimeConverter dtConverter = new IsoDateTimeConverter { DateTimeFormat = "yyyy-MM-dd HH:mm:ss" }; static RabbitMQConnect connection=null; static RabbitMQSend() { connection = new RabbitMQConnect(); } /// <summary> /// 添加信息到队列 /// </summary> /// <typeparam name="T"></typeparam> /// <param name="item">信息</param> /// <param name="queueName">队列名</param> public static void PushMsgToMq<T>(T item, string queueName) { string msg = JsonConvert.SerializeObject(item, dtConverter); using (global::RabbitMQ.Client.IModel channel = connection.CreateModel()) { channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); //Construct a completely empty content header for use with the Basic content class. //构造一个完全空的内容标头,以便与Basic内容类一起使用。 global::RabbitMQ.Client.IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; byte[] body = Encoding.UTF8.GetBytes(msg); channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: properties, body: body); } } } }
RabbitMQReceive.cs
using Newtonsoft.Json; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace Common.Tool.RabbitMQ { public class RabbitMQReceive : IDisposable { IConnection connection = null; IModel channel = null; public void BindReceiveMqMsg<T>(Func<T, bool> func, Action<string> log, string queueName) { this.connection = RabbitMQConnect._connectionFactory.CreateConnection();//创建与指定端点的连接。 this.channel = this.connection.CreateModel(); //创建并返回新的频道,会话和模型。 this.channel.QueueDeclare(queue: queueName,//队列名称 durable: true,//是否持久化, 队列的声明默认是存放到内存中的,如果rabbitmq重启会丢失,如果想重启之后还存在就要使队列持久化,保存到Erlang自带的Mnesia数据库中,当rabbitmq重启之后会读取该数据库 exclusive: false,//是否排外的,有两个作用,一:当连接关闭时connection.close()该队列是否会自动删除;二:该队列是否是私有的private,如果不是排外的,可以使用两个消费者都访问同一个队列,没有任何问题,如果是排外的,会对当前队列加锁,其他通道channel是不能访问的,如果强制访问会报异常:com.rabbitmq.client.ShutdownSignalException: channel error; protocol method: #method<channel.close>(reply-code=405, reply-text=RESOURCE_LOCKED - cannot obtain exclusive access to locked queue 'queue_name' in vhost '/', class-id=50, method-id=20)一般等于true的话用于一个队列只能有一个消费者来消费的场景 autoDelete: false,//是否自动删除,当最后一个消费者断开连接之后队列是否自动被删除,可以通过RabbitMQ Management,查看某个队列的消费者数量,当consumers = 0时队列就会自动删除 arguments: null);//队列中的消息什么时候会自动被删除? this.channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);//(Spec方法)配置Basic内容类的QoS参数。 //第一个参数是可接收消息的大小的 0不受限制 //第二个参数是处理消息最大的数量 1 那如果接收一个消息,但是没有应答,则客户端不会收到下一个消息,消息只会在队列中阻塞 //第三个参数则设置了是不是针对整个Connection的,因为一个Connection可以有多个Channel,如果是false则说明只是针对于这个Channel的。 EventingBasicConsumer consumer = new EventingBasicConsumer(this.channel);//构造函数,它将Model属性设置为给定值。 consumer.Received += (model, bdea) => { byte[] body = bdea.Body; string message = Encoding.UTF8.GetString(body); log?.Invoke(message); T item = JsonConvert.DeserializeObject<T>(message); bool result = func(item); if (result) { //(Spec方法)确认一个或多个已传送的消息。 this.channel.BasicAck(deliveryTag: bdea.DeliveryTag, multiple: false); } }; this.channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer); //The consumer is started with noAck = false(i.e.BasicAck is required), an empty consumer tag (i.e. the server creates and returns a fresh consumer tag), noLocal=false and exclusive=false. } public void Dispose() { if (this.channel != null) { this.channel.Close(); } if (this.connection != null) { this.connection.Close(); } } } }