说明
为了方便使用,做了简单的封装,生产消息可以使用泛型等,可以每次创建,可以使用单例模式,或者IOC使配合单例模式使用。这里就不一一介绍,请大家根据自己的业务场景设计。
测试代码:只有简单队列的代码作为参考
- RabbitMQHelper
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System;
using System.Text;
namespace RabbitMQTest.Common
{
public class RabbitMQHelper
{
/// <summary>
/// 连接工厂
/// </summary>
ConnectionFactory connectionFactory;
/// <summary>
/// 连接
/// </summary>
IConnection connection;
/// <summary>
/// 通道
/// </summary>
IModel channel;
/// <summary>
/// 交换机名称
/// </summary>
string exchangeName;
/// <summary>
/// 构造函数
/// </summary>
private RabbitMQHelper()
{
//创建连接工厂
connectionFactory = new ConnectionFactory()
{
HostName = "192.168.1.101",
Port = 5672,
UserName = "admin",
Password = "admin",
VirtualHost = "testhost"
};
//创建连接
connection = connectionFactory.CreateConnection();
//创建通道
channel = connection.CreateModel();
}
private static readonly Lazy<RabbitMQHelper> _singletonLock = new Lazy<RabbitMQHelper>(() => new RabbitMQHelper());
public static RabbitMQHelper Instance
{
get
{
return _singletonLock.Value;
}
}
/// <summary>
/// 生产消息
/// <para>简单队列(一对一模式)、Worker队列(一对多模式)</para>
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="msg">消息内容</param>
public void SendMsg(string queueName, string msg)
{
//声明一个队列
channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
byte[] body = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
}
/// <summary>
/// 消费消息
/// <para>简单队列(一对一模式)、Worker队列(一对多模式)</para>
/// </summary>
/// <param name="queueName">队列名称</param>
/// <param name="received">消费消息</param>
public void Receive(string queueName, Action<string> received)
{
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
consumer.Received += (ch, ea) =>
{
string message = Encoding.Default.GetString(ea.Body.ToArray());
received(message);
channel.BasicAck(ea.DeliveryTag, false);
};
channel.BasicConsume(queueName, false, consumer);
}
}
}
- 使用方式
using RabbitMQTest.Common;
using System;
using System.Threading;
using System.Threading.Tasks;
namespace RabbitMQTest.Con
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("请输入消息队列测试数据:");
string input = string.Empty;
Task.Run(() =>
{
do
{
input = Console.ReadLine();
RabbitMQHelper.Instance.SendMsg("test_queue_1", input);
} while (input.Trim().ToLower() != "exit");
});
Task.Run(() =>
{
RabbitMQHelper.Instance.Receive("test_queue_1", item =>
{
Console.WriteLine($"消费消息:{item}");
});
});
//主线程不死
while (true)
{
Thread.Sleep(10000);
if (input == "exit") return;
}
}
}
}