通过消费者去进行Exchange和Queue通过不同的RouteKey进行绑定
消费者1:
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.59.101", UserName = "admin", Password = "admin" }; string myexchange = "myexchange"; string myqueue = "myqueue"; using (var connection = factory.CreateConnection()) { var channel = connection.CreateModel(); channel.ExchangeDeclare(myexchange, ExchangeType.Direct, true, false, null); channel.QueueDeclare(myqueue, true, false, false, null); channel.QueueBind(myqueue, myexchange, "log_info", null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var msg = Encoding.UTF8.GetString(e.Body); Console.WriteLine(msg); }; channel.BasicConsume(myqueue, false, consumer); Console.ReadKey(); } }
消费者2:
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.59.101", UserName = "admin", Password = "admin" }; string myexchange = "myexchange"; string myqueue = "myqueue"; using (var connection = factory.CreateConnection()) { var channel = connection.CreateModel(); channel.ExchangeDeclare(myexchange, ExchangeType.Direct, true, false, null); channel.QueueDeclare(myqueue, true, false, false, null); channel.QueueBind(myqueue, myexchange, "log_error", null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, e) => { var msg = Encoding.UTF8.GetString(e.Body); Console.WriteLine(msg); }; channel.BasicConsume(myqueue, false, consumer); Console.ReadKey(); } }
生产者:
static void Main(string[] args) { ConnectionFactory factory = new ConnectionFactory() { HostName = "192.168.59.101", UserName = "admin", Password = "admin" }; string myexchange = "myexchange"; string myroutekey = "myrotekey"; string myqueue = "myqueue"; using (var connection = factory.CreateConnection()) { var channel = connection.CreateModel(); //channel.ExchangeDeclare(myexchange, ExchangeType.Direct, true, false, null); //channel.QueueDeclare(myqueue,true,false, false, null); //channel.QueueBind(myqueue, myexchange, myroutekey, null); for (int i = 0; i < 10; i++) { var msg = Encoding.UTF8.GetBytes($"{i},你好"); var routeKey = i % 2 == 0 ? "log_info" : "log_error"; channel.BasicPublish(myexchange, routingKey: routeKey, basicProperties: null, body: msg); Console.WriteLine(i); } Console.ReadKey(); } }
先启动消费者,进行绑定和监听,再执行生产者进行发送消息,结果是监听同一个队列,不同的routeKey结果不同