消费者Code:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.IO; using System.Threading; namespace Consumer { class Program { static void Main(string[] args) { new Thread(Consumer.Read).Start(); Console.ReadLine(); } public class Consumer { public static void Read() { var qName = "writeLogQueue"; var exchangeName = "fanoutExchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "writeLogKey"; var factory = new ConnectionFactory() { HostName = "10.40.3.65", UserName = "qhong", Password = "hongdada", VirtualHost = "rongzi_dev" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.ExchangeDeclare(exchangeName, exchangeType); channel.QueueDeclare(qName, false, false, false, null); channel.QueueBind(qName, exchangeName, routingKey); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); ExcuateWriteFile(message); Console.WriteLine(" Receiver Received {0}", message); }; channel.BasicConsume(queue: qName, noAck: true, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } public static void ExcuateWriteFile(string i) { //using (FileStream fs = new FileStream(@"d:\test.txt", FileMode.Append)) //{ // using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode)) // { // sw.WriteLine(i); // } //} Console.WriteLine(i); } } } }
查看Wireshark抓包:
"No.","Time","Source","Destination","Protocol","Length","Info" "410","2017-08-22 10:13:19.649167","10.40.3.65","10.51.0.204","AMQP","544","Connection.Start " "411","2017-08-22 10:13:19.672524","10.51.0.204","10.40.3.65","AMQP","466","Connection.Start-Ok " "412","2017-08-22 10:13:19.679205","10.40.3.65","10.51.0.204","AMQP","74","Connection.Tune " "413","2017-08-22 10:13:19.681379","10.51.0.204","10.40.3.65","AMQP","74","Connection.Tune-Ok " "414","2017-08-22 10:13:19.682598","10.51.0.204","10.40.3.65","AMQP","79","Connection.Open vhost=rongzi_dev " "416","2017-08-22 10:13:19.688692","10.40.3.65","10.51.0.204","AMQP","67","Connection.Open-Ok " "417","2017-08-22 10:13:19.694663","10.51.0.204","10.40.3.65","AMQP","67","Channel.Open " "418","2017-08-22 10:13:19.697618","10.40.3.65","10.51.0.204","AMQP","70","Channel.Open-Ok " "419","2017-08-22 10:13:19.699423","10.51.0.204","10.40.3.65","AMQP","96","Exchange.Declare x=fanoutExchange1 " "420","2017-08-22 10:13:19.702360","10.40.3.65","10.51.0.204","AMQP","66","Exchange.Declare-Ok " "421","2017-08-22 10:13:19.704335","10.51.0.204","10.40.3.65","AMQP","87","Queue.Declare q=writeLogQueue " "422","2017-08-22 10:13:19.709073","10.40.3.65","10.51.0.204","AMQP","88","Queue.Declare-Ok q=writeLogQueue " "423","2017-08-22 10:13:19.711838","10.51.0.204","10.40.3.65","AMQP","115","Queue.Bind q=writeLogQueue x=fanoutExchange1 bk=writeLogKey " "424","2017-08-22 10:13:19.714497","10.40.3.65","10.51.0.204","AMQP","66","Queue.Bind-Ok " "425","2017-08-22 10:13:19.716549","10.51.0.204","10.40.3.65","AMQP","88","Basic.Consume q=writeLogQueue " "426","2017-08-22 10:13:19.722084","10.40.3.65","10.51.0.204","AMQP","98","Basic.Consume-Ok " "429","2017-08-22 10:13:19.884511","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "670","2017-08-22 10:13:34.883534","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "990","2017-08-22 10:13:49.891534","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "1209","2017-08-22 10:14:04.899543","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "1419","2017-08-22 10:14:19.687198","10.40.3.65","10.51.0.204","AMQP","62","Heartbeat " "1422","2017-08-22 10:14:19.904023","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "1676","2017-08-22 10:14:34.912579","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "1947","2017-08-22 10:14:49.922623","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "2136","2017-08-22 10:15:04.930537","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "2481","2017-08-22 10:15:19.686306","10.40.3.65","10.51.0.204","AMQP","62","Heartbeat " "2483","2017-08-22 10:15:19.932026","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "2832","2017-08-22 10:15:34.941526","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "3054","2017-08-22 10:15:49.950055","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "3418","2017-08-22 10:16:04.958528","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "3850","2017-08-22 10:16:19.688245","10.40.3.65","10.51.0.204","AMQP","62","Heartbeat " "3859","2017-08-22 10:16:19.960069","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "4050","2017-08-22 10:16:34.968529","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "4736","2017-08-22 10:16:49.977085","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat "
生产者Code:
using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client; using RabbitMQ.Client.Events; using System.IO; using Newtonsoft.Json; using System.ComponentModel; using System.Text.RegularExpressions; namespace ConsoleDemo { class Program { static void Main(string[] args) { new Thread(Provider.Write).Start(); Console.ReadLine(); } } public class Provider { public static void Write() { var qName = "writeLogQueue"; var exchangeName = "fanoutExchange1"; var exchangeType = "fanout";//topic、fanout var routingKey = "writeLogKey"; var factory = new ConnectionFactory() { HostName = "10.40.3.65", UserName = "qhong", Password = "hongdada", VirtualHost = "rongzi_dev" }; //var factory = new ConnectionFactory() { HostName = "10.51.0.204", UserName = "qhong", Password = "hongdada",VirtualHost="rongzi_qa" }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { ////设置交换器的类型 //channel.ExchangeDeclare(exchangeName, exchangeType); ////声明一个队列,设置队列是否持久化,排他性,与自动删除 //channel.QueueDeclare(qName, true, false, false, null); ////绑定消息队列,交换器,routingkey //channel.QueueBind(qName, exchangeName, routingKey); channel.ExchangeDeclare(exchangeName, exchangeType); channel.QueueDeclare(queue:qName, durable: false, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(qName, exchangeName, routingKey); for (int i = 0; i < 20; i++) { string message = i.ToString(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange:exchangeName, routingKey:routingKey, basicProperties: null, body: body); Console.WriteLine("Program Sent {0}", message); } } } } }
抓包,太长就不截图了
"32119","2017-08-22 10:33:20.584321","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "32454","2017-08-22 10:33:35.585035","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "32673","2017-08-22 10:33:45.475110","10.40.3.65","10.51.0.204","AMQP","544","Connection.Start " "32674","2017-08-22 10:33:45.492512","10.51.0.204","10.40.3.65","AMQP","466","Connection.Start-Ok " "32675","2017-08-22 10:33:45.495352","10.40.3.65","10.51.0.204","AMQP","74","Connection.Tune " "32676","2017-08-22 10:33:45.497523","10.51.0.204","10.40.3.65","AMQP","74","Connection.Tune-Ok " "32677","2017-08-22 10:33:45.498767","10.51.0.204","10.40.3.65","AMQP","79","Connection.Open vhost=rongzi_dev " "32679","2017-08-22 10:33:45.504320","10.40.3.65","10.51.0.204","AMQP","67","Connection.Open-Ok " "32680","2017-08-22 10:33:45.507044","10.51.0.204","10.40.3.65","AMQP","67","Channel.Open " "32681","2017-08-22 10:33:45.510357","10.40.3.65","10.51.0.204","AMQP","70","Channel.Open-Ok " "32682","2017-08-22 10:33:45.511938","10.51.0.204","10.40.3.65","AMQP","96","Exchange.Declare x=fanoutExchange1 " "32683","2017-08-22 10:33:45.515660","10.40.3.65","10.51.0.204","AMQP","66","Exchange.Declare-Ok " "32684","2017-08-22 10:33:45.517792","10.51.0.204","10.40.3.65","AMQP","87","Queue.Declare q=writeLogQueue " "32686","2017-08-22 10:33:45.521655","10.40.3.65","10.51.0.204","AMQP","88","Queue.Declare-Ok q=writeLogQueue " "32687","2017-08-22 10:33:45.523771","10.51.0.204","10.40.3.65","AMQP","115","Queue.Bind q=writeLogQueue x=fanoutExchange1 bk=writeLogKey " "32688","2017-08-22 10:33:45.527364","10.40.3.65","10.51.0.204","AMQP","66","Queue.Bind-Ok " "32689","2017-08-22 10:33:45.533525","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32690","2017-08-22 10:33:45.534040","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32691","2017-08-22 10:33:45.534170","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32692","2017-08-22 10:33:45.534290","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32693","2017-08-22 10:33:45.534612","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32694","2017-08-22 10:33:45.535047","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32695","2017-08-22 10:33:45.535142","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32696","2017-08-22 10:33:45.535206","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32697","2017-08-22 10:33:45.535266","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32698","2017-08-22 10:33:45.535318","10.51.0.204","10.40.3.65","AMQP","128","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32699","2017-08-22 10:33:45.535373","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32700","2017-08-22 10:33:45.535440","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32701","2017-08-22 10:33:45.535492","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32702","2017-08-22 10:33:45.535545","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32703","2017-08-22 10:33:45.535601","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32704","2017-08-22 10:33:45.535652","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32705","2017-08-22 10:33:45.535723","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32706","2017-08-22 10:33:45.535780","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32707","2017-08-22 10:33:45.535840","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32708","2017-08-22 10:33:45.535891","10.51.0.204","10.40.3.65","AMQP","129","Basic.Publish x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32710","2017-08-22 10:33:45.537191","10.51.0.204","10.40.3.65","AMQP","80","Channel.Close reply=Goodbye " "32711","2017-08-22 10:33:45.538158","10.40.3.65","10.51.0.204","AMQP","502","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32713","2017-08-22 10:33:45.539595","10.40.3.65","10.51.0.204","AMQP","390","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32715","2017-08-22 10:33:45.539597","10.40.3.65","10.51.0.204","AMQP","390","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32717","2017-08-22 10:33:45.539597","10.40.3.65","10.51.0.204","AMQP","393","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32719","2017-08-22 10:33:45.539598","10.40.3.65","10.51.0.204","AMQP","280","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32721","2017-08-22 10:33:45.539598","10.40.3.65","10.51.0.204","AMQP","506","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32722","2017-08-22 10:33:45.539599","10.40.3.65","10.51.0.204","AMQP","167","Basic.Deliver x=fanoutExchange1 rk=writeLogKey Content-Header Content-Body " "32726","2017-08-22 10:33:45.543441","10.40.3.65","10.51.0.204","AMQP","66","Channel.Close-Ok " "32730","2017-08-22 10:33:45.547820","10.51.0.204","10.40.3.65","AMQP","96","Connection.Close reply=Connection close forced " "32731","2017-08-22 10:33:45.551016","10.40.3.65","10.51.0.204","AMQP","66","Connection.Close-Ok " "32969","2017-08-22 10:33:50.587474","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat " "33382","2017-08-22 10:34:05.596326","10.51.0.204","10.40.3.65","AMQP","62","Heartbeat "
通过info可以很清晰的看出rabbitmq的进程!
可以看出,发布者发布了20条信息,请求了20次
但是消费者在消费的时候,并不是二十次,而是七次,可以看出Basic.Deliver在一次AMQP中最多被调用了4次,可见一次消费可以消费多条message。