使用默认账号:guest/guest登录http://localhost:15672/#/进去,添加一个新用户(Administrator权限),并设置其Permission
新建两个控制台程序
安装RabbitMQ.Client:https://www.nuget.org/packages/RabbitMQ.Client/
生产者(producer)
static void Main(string[] args) { try { ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //声明一个队列 channel.QueueDeclare("hello", false, false, false, null); Console.WriteLine(" RabbitMQ连接成功,请输入消息,输入exit退出!"); string input; do { input = Console.ReadLine(); var sendBytes = Encoding.UTF8.GetBytes(input); //发布消息 channel.BasicPublish("", "hello", null, sendBytes); } while (input.Trim().ToLower() != "exit"); channel.Close(); connection.Close(); } catch(Exception ex) { Console.WriteLine(ex.Message); Console.Read(); } }
消费者(consumer)
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory { UserName = "admin",//用户名 Password = "123456",//密码 HostName = "localhost"//rabbitmq ip }; //创建连接 var connection = factory.CreateConnection(); //创建通道 var channel = connection.CreateModel(); //事件基本消费者 EventingBasicConsumer consumer = new EventingBasicConsumer(channel); //接收到消息事件 consumer.Received += (ch, ea) => { var message = Encoding.UTF8.GetString(ea.Body); Console.WriteLine($"收到消息: {message}");//确认该消息已被消费 channel.BasicAck(ea.DeliveryTag, false); // Console.WriteLine($"已发送回执[{ea.DeliveryTag}]"); }; //启动消费者 设置为手动应答消息 channel.BasicConsume("hello", false, consumer); Console.WriteLine("消费者已启动"); Console.ReadKey(); channel.Dispose(); connection.Close(); }
在rabbitmq中,exchange有4个类型:direct,topic,fanout,header
direct类型
在direct类型的exchange中,只有这两个routingkey完全相同,exchange才会选择对应的binging进行消息路由。
//定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Direct, false, false, null); //定义一个队列 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null);
fanout类型
此exchange的路由规则很简单直接将消息路由到所有绑定的队列中,无须对消息的routingkey进行匹配操作。
//定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Fanout, false, false, null); //定义队列1 channel.QueueDeclare(queueName1, false, false, false, null); //定义队列2 channel.QueueDeclare(queueName2, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName1, exchangeName, routeKey, null); channel.QueueBind(queueName2, exchangeName, routeKey, null);
topic类型
此类型exchange和上面的direct类型差不多,但direct类型要求routingkey完全相等,这里的routingkey可以有通配符:'*','#'.
其中'*'表示匹配一个单词, '#'则表示匹配没有或者多个单词
string routeKey = "TestRouteKey.*";
//定义一个Direct类型交换机 channel.ExchangeDeclare(exchangeName, ExchangeType.Topic, false, false, null); //定义队列1 channel.QueueDeclare(queueName, false, false, false, null); //将队列绑定到交换机 channel.QueueBind(queueName, exchangeName, routeKey, null); //发布消息 channel.BasicPublish(exchangeName, "TestRouteKey.one", null, sendBytes);
以上就是exchange 类型的总结,一般来说direct和topic用来具体的路由消息,如果要用广播的消息一般用fanout的exchange。
header类型用的比较少就不多介绍了