创建Connection,推荐使用长连接,长连接基础上创建多个通道
public void CreateConnection()
{
this.ConnectionFactory = new ConnectionFactory
{
HostName = "xx.xx.xx.xx",
//Port = 5672,
UserName = "admin",
Password = "admin",
VirtualHost = "my_vhost"
};
this.connection = this.ConnectionFactory.CreateConnection();
//方式2
//ConnectionFactory factory = new ConnectionFactory();
//factory.Uri = new Uri("amqp://user:pass@hostName:port/vhost");
//this.connection = factory.CreateConnection();
}
Direct交换机案例
private void btnRouteKeyPublish_Click(object sender, EventArgs e)
{
string exchangeName = "myexchange1";
string queueName_logElse = "log_else";
string queueName_logError = "log_error";
//2:创建channel
using (var channel = connection.CreateModel())
{
//创建交换机
channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, true, false, null);
//创建队列
channel.QueueDeclare(queueName_logElse, true, false, false, null);
channel.QueueDeclare(queueName_logError, true, false, false, null);
//绑定
channel.QueueBind(queueName_logElse, exchangeName, "info", null);
channel.QueueBind(queueName_logElse, exchangeName, "debug", null);
channel.QueueBind(queueName_logElse, exchangeName, "warn", null);
channel.QueueBind(queueName_logError, exchangeName, "error", null);
//发布info消息
for (int i = 0; i < 10; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha-info");
channel.BasicPublish(exchangeName, "info", null, msg);
}
//发布debug消息
for (int i = 0; i < 10; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha-debug");
channel.BasicPublish(exchangeName, "debug", null, msg);
}
//发布warn消息
for (int i = 0; i < 10; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha-warn");
channel.BasicPublish(exchangeName, "warn", null, msg);
}
//发布error消息
for (int i = 0; i < 10; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha-error");
channel.BasicPublish(exchangeName, "error", null, msg);
}
}
}
Topic交换机案例
private void btnTopicPublish_Click(object sender, EventArgs e)
{
string exchangeName = "myTopicExchange1";
string queueName1 = "topic_queue1";
string queueName2 = "topic_queue2";
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("myTopicExchange1", ExchangeType.Topic, true, false, null);
channel.QueueDeclare(queueName1, true, false, false, null);
channel.QueueDeclare(queueName2, true, false, false, null);
channel.QueueBind(queueName1, exchangeName, "#.cn", null);
channel.QueueBind(queueName2, exchangeName, "*.cn", null);
//发布info消息(消息会发送到两个队列,因为都匹配)
for (int i = 0; i < 100; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha");
channel.BasicPublish(exchangeName, "fan.cn", null, msg);
}
}
}
Header交换机案例
private void btnHeadersPublish_Click(object sender, EventArgs e)
{
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("myHeadersExchange1", ExchangeType.Headers, true, false, null);
channel.QueueDeclare("headers_queue1", true, false, false, null);
channel.QueueBind("headers_queue1", "myHeadersExchange1", string.Empty, new Dictionary<string, object>() {
{"x-match","all" },//any
{ "username","fan"},
{ "password","123456"}
});
//properties
var properties = channel.CreateBasicProperties();
properties.Headers = new Dictionary<string, object>();
properties.Headers.Add("username", "fan");
properties.Headers.Add("password", "123456");
properties.Persistent = true;//消息持久化
//发布info消息
for (int i = 0; i < 100; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha");
channel.BasicPublish("myHeadersExchange1", string.Empty, properties, msg);
}
}
}
Demo1.使用默认交换机收发消息(省略了创建交换机,实际使用的是默认交换机,所有创建的队列都隐式绑定默认交换机)
//发消息
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicPublish(exchange: "",
routingKey: "hello",
basicProperties: null,
body: Encoding.UTF8.GetBytes("Hello World!"));//默认交换机的名字就是""
}
//收消息
using (var channel = connection.CreateModel())
{
channel.QueueDeclare(queue: "hello",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var result = channel.BasicGet("hello", true);
string msg = Encoding.UTF8.GetString(result.Body);
}
Default Exchange:
Default Exchange 其实是AMQP中预先声明的,属于Direct类型,Default Exchange 的名是 "";
他有一个特殊的属性,当你手动创建一个队列时,MQ会自动将这个队列绑定到Default Exchange 上,绑定时 RoutingKey 与队列名称相同
默认交换器隐式绑定到每个队列,路由键等于队列名称。不能显式地绑定到默认交换器,也不能从默认交换器解除绑定。它也不能被删除。
Demo2.消费消息两种方式
/// <summary>
/// 手动拉消息
/// </summary>
/// <param name="queueName"></param>
private void ConsumePullQueue(string queueName)
{
var channel = connection.CreateModel();
var result = channel.BasicGet(queueName, false);//autoACK:false 开启手动确认
try
{
//处理消息
MessageBox.Show(Encoding.UTF8.GetString(result.Body));
//手动确认
channel.BasicAck(result.DeliveryTag, false);
}
catch
{
//退回
channel.BasicRecover(true);
}
//直接扔了
//channel.BasicReject(result.DeliveryTag, true);
//否认确认
//channel.BasicNack()
}
/// <summary>
/// 自动推消息
/// 如果多个消费者订阅一个队列,将轮询获取消息
/// </summary>
/// <param name="queueNames"></param>
private void ConsumeEventQueue(string queueName)
{
var channel = connection.CreateModel();
//开启QOS并行限制,每次发送一条,ack后再发送一条
channel.BasicQos(0, 1, false);
//通过事件订阅方式消费队列
EventingBasicConsumer consumer1 = new EventingBasicConsumer(channel);
consumer1.Received += (sender1, e1) =>
{
MessageBox.Show(Encoding.UTF8.GetString(e1.Body));
channel.BasicAck(e1.DeliveryTag, false);//手动确认
};
channel.BasicConsume(queueName, false, consumer1);//开始消费.(autoACK:false 手动确认)
}
Demo3.生产端消息确认、事务执行
//confirm确认(推荐)
using (var channel = connection.CreateModel())
{
var properties = channel.CreateBasicProperties();
properties.Persistent = true;//消息持久化
channel.ConfirmSelect();//将信道设置成confirm模式
//5:发布消息
for (int i = 0; i < 100; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha");
channel.BasicPublish(exchangeName, routingKey, properties, msg);
}
bool isSuccess = channel.WaitForConfirms();//是否发布成功
}
//事务执行(低效,不推荐)
using (var channel = connection.CreateModel())
{
var properties = channel.CreateBasicProperties();
properties.Persistent = true;//消息持久化
channel.TxSelect();//将信道设置成事务模式
try
{
for (int i = 0; i < 100; i++)
{
var msg = Encoding.UTF8.GetBytes($"{i}:haha");
channel.BasicPublish(exchangeName, routingKey, properties, msg);
}
channel.TxCommit();
}
catch
{
channel.TxRollback();
}
}