RabbitMQHelper
public static class RabbitMQHelper
{
// 定义 RabbitMQ 基本参数
private static string HostName = "127.0.0.1";
private static int Port = 5672;
private static string UserName = "admin";
private static string Password = "admin";
private static byte[] ToBytes(string str) => Encoding.UTF8.GetBytes(str);
/// <summary>
/// 设置交换机
/// </summary>
/// <param name="exchange">交换机名称</param>
/// <param name="type">direct fanout headers topic</param>
public static void SetExchange(string exchange, string type)
{
using (var channel = BuildChannel())
{
channel.ExchangeDeclare(exchange, type, true, false);
}
}
/// <summary>
/// 设置队列
/// </summary>
/// <param name="queue">队列名称</param>
/// <param name="exchange">交换机</param>
public static void SetQueue(string queue, string exchange)
{
using (var channel = BuildChannel())
{
//定义队列名称 持久化:是
channel.QueueDeclare(queue, true, false, false, null);
// 绑定队列
channel.QueueBind(queue, exchange, queue, null);
}
}
/// <summary>
/// 设置队列
/// </summary>
/// <param name="queues">队列集合</param>
/// <param name="exchange">交换机</param>
public static void SetQueue(List<string> queues, string exchange)
{
using (var channel = BuildChannel())
{
foreach (var queue in queues)
{
//定义队列名称 持久化:是
channel.QueueDeclare(queue, true, false, false, null);
// 绑定队列
channel.QueueBind(queue, exchange, queue, null);
}
}
}
/// <summary>
/// 发布
/// </summary>
/// <param name="exchange">交换机</param>
/// <param name="queue">队列</param>
/// <param name="message">消息体</param>
public static void Publish(string exchange, string queue, string message)
{
using (var channel = BuildChannel())
{
channel.BasicPublish(exchange, queue, null, ToBytes(message));
}
}
/// <summary>
/// 订阅
/// </summary>
/// <param name="queue">队列</param>
/// <param name="func">func</param>
public static void Subscribe(string queue, Func<string, bool> func)
{
var channel = BuildChannel();
//事件基本消费者
EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
//接收到消息事件
consumer.Received += (ch, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body);
func.Invoke(message);
//Task.Run( () => { func.Invoke(message); });
//确认该消息已被消费
channel.BasicAck(ea.DeliveryTag, false);
};
//启动消费者 设置为手动应答消息
channel.BasicConsume(queue, false, consumer);
}
private static IModel BuildChannel()
{
return new ConnectionFactory()
{
HostName = HostName,
Port = Port,
UserName = UserName,
Password = Password
}.CreateConnection().CreateModel();
}
}
具体使用
class Program
{
static void Main(string[] args)
{
//初始化交换机
RabbitMQHelper.SetExchange("ServerDirect", "direct");
// 初始化队列
RabbitMQHelper.SetQueue("LogCenter", "ServerDirect");
RabbitMQHelper.SetQueue("SMSCenter", "ServerDirect");
RabbitMQHelper.SetQueue("EmailCenter", "ServerDirect");
//发布消息
for (int i = 0; i < 100; i++)
{
RabbitMQHelper.Publish("ServerDirect", "LogCenter", "log"+i);
}
RabbitMQHelper.Publish("ServerDirect", "SMSCenter", "sms111111111");
Console.ReadKey();
}
}