zoukankan      html  css  js  c++  java
  • RabbitMQ

     说明

       为了方便使用,做了简单的封装,生产消息可以使用泛型等,可以每次创建,可以使用单例模式,或者IOC使配合单例模式使用。这里就不一一介绍,请大家根据自己的业务场景设计。

     测试代码:只有简单队列的代码作为参考
    •  RabbitMQHelper
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System;
    using System.Text;
    
    namespace RabbitMQTest.Common
    {
        public class RabbitMQHelper
        {
            /// <summary>
            /// 连接工厂
            /// </summary>
            ConnectionFactory connectionFactory;
    
            /// <summary>
            /// 连接
            /// </summary>
            IConnection connection;
    
            /// <summary>
            /// 通道
            /// </summary>
            IModel channel;
    
            /// <summary>
            /// 交换机名称
            /// </summary>
            string exchangeName;
    
            /// <summary>
            /// 构造函数
            /// </summary>
            private RabbitMQHelper()
            {
                //创建连接工厂
                connectionFactory = new ConnectionFactory()
                {
                    HostName = "192.168.1.101",
                    Port = 5672,
                    UserName = "admin",
                    Password = "admin",
                    VirtualHost = "testhost"
                };
                //创建连接
                connection = connectionFactory.CreateConnection();
                //创建通道
                channel = connection.CreateModel();
            }
    
            private static readonly Lazy<RabbitMQHelper> _singletonLock = new Lazy<RabbitMQHelper>(() => new RabbitMQHelper());
    
            public static RabbitMQHelper Instance
            {
                get
                {
                    return _singletonLock.Value;
                }
            }
    
            /// <summary>
            /// 生产消息
            /// <para>简单队列(一对一模式)、Worker队列(一对多模式)</para>
            /// </summary>
            /// <param name="queueName">队列名称</param>
            /// <param name="msg">消息内容</param>
            public void SendMsg(string queueName, string msg)
            {
                //声明一个队列
                channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
                byte[] body = Encoding.UTF8.GetBytes(msg);
                channel.BasicPublish(exchange: "", routingKey: queueName, basicProperties: null, body: body);
            }
    
            /// <summary>
            /// 消费消息
            /// <para>简单队列(一对一模式)、Worker队列(一对多模式)</para>
            /// </summary>
            /// <param name="queueName">队列名称</param>
            /// <param name="received">消费消息</param>
            public void Receive(string queueName, Action<string> received)
            {
                EventingBasicConsumer consumer = new EventingBasicConsumer(channel);
                consumer.Received += (ch, ea) =>
                {
                    string message = Encoding.Default.GetString(ea.Body.ToArray());
                    received(message);
                    channel.BasicAck(ea.DeliveryTag, false);
                };
                channel.BasicConsume(queueName, false, consumer);
            }
        }
    }
    
    •  使用方式
    using RabbitMQTest.Common;
    using System;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace RabbitMQTest.Con
    {
        class Program
        {
            static void Main(string[] args)
            {
                Console.WriteLine("请输入消息队列测试数据:");
                string input = string.Empty;
                Task.Run(() =>
                {
                    do
                    {
                        input = Console.ReadLine();
    
                        RabbitMQHelper.Instance.SendMsg("test_queue_1", input);
                    } while (input.Trim().ToLower() != "exit");
                });
    
                Task.Run(() =>
                {
                    RabbitMQHelper.Instance.Receive("test_queue_1", item =>
                    {
                        Console.WriteLine($"消费消息:{item}");
                    });
                });
    
                //主线程不死
                while (true)
                {
                    Thread.Sleep(10000);
                    if (input == "exit") return;
                }
            }
        }
    }
  • 相关阅读:
    java socket解析和发送二进制报文工具(附java和C++转化问题)
    hibernate缓存机制(二级缓存)
    Spring MVC中前后台数据传输小结
    NUC972 MDK NON-OS
    代码是如何控制硬件的?
    C语言位运算+实例讲解(转)
    C语言程序真正的启动函数
    51单片机的时钟及总线时序和总线扩展
    如何通过Keil将程序正确的下载进flash中
    说说M451例程讲解之串口
  • 原文地址:https://www.cnblogs.com/gygtech/p/14927515.html
Copyright © 2011-2022 走看看