1.RabbitMQHelper.cs
public class RabbitMQHelper { string exchangeName = "demoexchange"; string queueName = "demoqueue"; string exchangeType = ExchangeType.Direct; string routingKey = "demoqueue"; string userName = "test"; string password = "test"; string hostName = "127.0.0.1"; int port = 5672; string virtualHost = "vhost"; public delegate void MQMsgDelegate(string msg); public event MQMsgDelegate MQMsg; public delegate void MQErrorDeletegate(string error); public event MQErrorDeletegate MQError; /// <summary> /// 发布消息队列 /// </summary> private Queue<string> ProducerQueue = new Queue<string>(); private object obj = new object(); /// <summary> /// 发布消息 /// </summary> /// <param name="msg"></param> public void SendMsg(string msg) { lock (obj) { ProducerQueue.Enqueue(msg); } } /// <summary> /// RabbitMQ /// </summary> /// <param name="exchangeName">消息交换机</param> /// <param name="queueName">消息队列</param> /// <param name="exchangeType">交换器类型</param> /// <param name="routingKey">路由关键字</param> /// <param name="userName">用户名</param> /// <param name="password">密码</param> /// <param name="hostName">IP地址</param> /// <param name="port">端口</param> /// <param name="virtualHost">虚拟主机</param> public RabbitMQHelper(string exchangeName, string queueName, string exchangeType, string routingKey, string userName, string password, string hostName, int port, string virtualHost) { this.exchangeName = exchangeName; this.queueName = queueName; this.exchangeType = exchangeType; this.routingKey = routingKey; this.userName = userName; this.password = password; this.hostName = hostName; this.port = port; this.virtualHost = virtualHost; } /// <summary> /// 开始消费 /// </summary> public void Consumer() { try { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.HostName = hostName; factory.Port = port; factory.VirtualHost = virtualHost; //factory.AutomaticRecoveryEnabled = true; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //设置交换器的类型 channel.ExchangeDeclare(exchangeName, exchangeType); //声明一个队列,设置队列是否持久化,排他性,与自动删除 channel.QueueDeclare(queueName, false, false, false, null); //绑定消息队列,交换器,routingkey channel.QueueBind(queueName, exchangeName, routingKey, null); //流量控制 channel.BasicQos(0, 2, false); while (true) { //消费数据 var consumer = new EventingBasicConsumer(channel); //false为手动应答,true为自动应答 channel.BasicConsume(queueName, false, consumer); consumer.Received += (ch, ea) => { var body = ea.Body.ToArray(); MQMsg(Encoding.UTF8.GetString(body)); //Console.WriteLine("已接收: {0}", Encoding.UTF8.GetString(body)); //手动应答时使用 channel.BasicAck(ea.DeliveryTag, false); }; string consumerTag = channel.BasicConsume(queueName, false, consumer); channel.BasicCancel(consumerTag); Thread.Sleep(1); } } } } catch (Exception ex) { MQError(ex.Message); Console.WriteLine(ex.Message); } } /// <summary> /// 开始发布 /// </summary> public void Producer() { try { ConnectionFactory factory = new ConnectionFactory(); factory.UserName = userName; factory.Password = password; factory.HostName = hostName; factory.Port = port; factory.VirtualHost = virtualHost; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //设置交换器的类型 channel.ExchangeDeclare(exchangeName, exchangeType); //声明一个队列,设置队列是否持久化,排他性,与自动删除 channel.QueueDeclare(queueName, false, false, false, null); //绑定消息队列,交换器,routingkey channel.QueueBind(queueName, exchangeName, routingKey, null); //消息特点 var properties = channel.CreateBasicProperties(); properties.ContentType = "text/plain"; properties.DeliveryMode = 2; while (true) { System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch(); watch.Start();//开始计时 Console.WriteLine("队列内数据量:" + (ProducerQueue.Count));//输出时间 毫秒 lock (obj) { if (ProducerQueue.Count > 0) { while (ProducerQueue.Count > 0) { var sendMsg = ProducerQueue.Dequeue(); //发送消息 byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(sendMsg); channel.BasicPublish(exchangeName, routingKey, properties, messageBodyBytes); //Console.WriteLine("写入数据:" + sendMsg); //MQMsg(sendMsg +"待写入:"+ ProducerQueue.Count); Thread.Sleep(1); } } } watch.Stop();//停止计时 Console.WriteLine("耗时:" + (watch.ElapsedMilliseconds));//输出时间 毫秒 Thread.Sleep(1); } } } } catch (Exception ex) { MQError(ex.Message); Console.WriteLine(ex.Message); } } }
2.Producer.cs
class Program { private static System.Timers.Timer timer; static RabbitMQHelper helper; static void Main(string[] args) { //大概 400*3*7000/s 字节的写入速度 var len = "10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。".Length; helper = new RabbitMQHelper("demoexchange", "demoqueue", ExchangeType.Direct, "demoqueue", "test", "test", "127.0.0.1", 5672, "vhost"); helper.MQMsg += Helper_MQMsg; helper.MQError += Helper_MQError; timer = new System.Timers.Timer(1); timer.Elapsed += Timer_Elapsed; timer.Start(); helper.Producer(); } private static void Helper_MQMsg(string msg) { Console.WriteLine("已发送: {0}", msg); } private static void Helper_MQError(string error) { Console.WriteLine("错误信息: {0}", error); } private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) { int i = 7; while (i > 0) { helper.SendMsg("10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。"); i--; } } }
3.Consumer.cs
class Program { static void Main(string[] args) { //Consumer(); RabbitMQHelper helper = new RabbitMQHelper("demoexchange", "demoqueue", ExchangeType.Direct, "demoqueue", "test", "test", "127.0.0.1", 5672, "vhost"); helper.MQMsg += Helper_MQMsg; helper.MQError += Helper_MQError; helper.Consumer(); } private static void Helper_MQError(string error) { Console.WriteLine("错误信息: {0}", error); } private static void Helper_MQMsg(string msg) { Console.WriteLine("已接收: {0}", msg); } }