zoukankan      html  css  js  c++  java
  • C#完成RabbitMQ数据简单生成和消费-RabbitMQHelper

    1.RabbitMQHelper.cs

    public class RabbitMQHelper
        {
            string exchangeName = "demoexchange";
            string queueName = "demoqueue";
            string exchangeType = ExchangeType.Direct;
            string routingKey = "demoqueue";
    
            string userName = "test";
            string password = "test";
            string hostName = "127.0.0.1"; 
            int port = 5672;
            string virtualHost = "vhost";
    
            public delegate void MQMsgDelegate(string msg);
            public event MQMsgDelegate MQMsg;
    
            public delegate void MQErrorDeletegate(string error);
            public event MQErrorDeletegate MQError;
    
            /// <summary>
            /// 发布消息队列
            /// </summary>
            private Queue<string> ProducerQueue = new Queue<string>();
    
            private object obj = new object();
            /// <summary>
            /// 发布消息
            /// </summary>
            /// <param name="msg"></param>
            public void SendMsg(string msg)
            {
                lock (obj)
                {
                    ProducerQueue.Enqueue(msg);
                }
            }
    
            /// <summary>
            /// RabbitMQ
            /// </summary>
            /// <param name="exchangeName">消息交换机</param>
            /// <param name="queueName">消息队列</param>
            /// <param name="exchangeType">交换器类型</param>
            /// <param name="routingKey">路由关键字</param>
            /// <param name="userName">用户名</param>
            /// <param name="password">密码</param>
            /// <param name="hostName">IP地址</param>
            /// <param name="port">端口</param>
            /// <param name="virtualHost">虚拟主机</param>
            public RabbitMQHelper(string exchangeName, string queueName, string exchangeType, string routingKey, string userName, string password, string hostName, int port, string virtualHost)
            {
                this.exchangeName = exchangeName;
                this.queueName = queueName;
                this.exchangeType = exchangeType;
                this.routingKey = routingKey;
                this.userName = userName;
                this.password = password;
                this.hostName = hostName;
                this.port = port;
                this.virtualHost = virtualHost;
            }
    
            /// <summary>
            /// 开始消费
            /// </summary>
            public void Consumer()
            {
                try
                {
    
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.UserName = userName;
                    factory.Password = password;
                    factory.HostName = hostName;
                    factory.Port = port;
                    factory.VirtualHost = virtualHost;
    
                    //factory.AutomaticRecoveryEnabled = true;
                    using (var connection = factory.CreateConnection())
                    {
    
                        using (var channel = connection.CreateModel())
                        {
                            //设置交换器的类型
                            channel.ExchangeDeclare(exchangeName, exchangeType);
    
                            //声明一个队列,设置队列是否持久化,排他性,与自动删除
                            channel.QueueDeclare(queueName, false, false, false, null);
    
                            //绑定消息队列,交换器,routingkey
                            channel.QueueBind(queueName, exchangeName, routingKey, null);
    
                            //流量控制
                            channel.BasicQos(0, 2, false);
    
                            while (true)
                            {
                                //消费数据
                                var consumer = new EventingBasicConsumer(channel);
    
                                //false为手动应答,true为自动应答
                                channel.BasicConsume(queueName, false, consumer);
    
                                consumer.Received += (ch, ea) =>
                                {
                                    var body = ea.Body.ToArray();
    
                                    MQMsg(Encoding.UTF8.GetString(body));
    
                                    //Console.WriteLine("已接收: {0}", Encoding.UTF8.GetString(body));
    
                                    //手动应答时使用
                                    channel.BasicAck(ea.DeliveryTag, false);
                                };
    
                                string consumerTag = channel.BasicConsume(queueName, false, consumer);
                                channel.BasicCancel(consumerTag);
    
                                Thread.Sleep(1);
                            }
                        }
    
                    }
                }
                catch (Exception ex)
                {
                    MQError(ex.Message);
                    Console.WriteLine(ex.Message);
                }
            }
    
            /// <summary>
            /// 开始发布
            /// </summary>
            public void Producer()
            {
                try
                {
                    ConnectionFactory factory = new ConnectionFactory();
                    factory.UserName = userName;
                    factory.Password = password;
                    factory.HostName = hostName;
                    factory.Port = port;
                    factory.VirtualHost = virtualHost;
    
                    using (var connection = factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            //设置交换器的类型
                            channel.ExchangeDeclare(exchangeName, exchangeType);
    
                            //声明一个队列,设置队列是否持久化,排他性,与自动删除
                            channel.QueueDeclare(queueName, false, false, false, null);
    
                            //绑定消息队列,交换器,routingkey
                            channel.QueueBind(queueName, exchangeName, routingKey, null);
    
                            //消息特点
                            var properties = channel.CreateBasicProperties();
                            properties.ContentType = "text/plain";
                            properties.DeliveryMode = 2;
    
                            while (true)
                            {
                                System.Diagnostics.Stopwatch watch = new System.Diagnostics.Stopwatch();
                                watch.Start();//开始计时
    
                                Console.WriteLine("队列内数据量:" + (ProducerQueue.Count));//输出时间 毫秒
                                lock (obj)
                                {
                                    if (ProducerQueue.Count > 0)
                                    {
                                        while (ProducerQueue.Count > 0)
                                        {
                                            var sendMsg = ProducerQueue.Dequeue();
    
                                            //发送消息
                                            byte[] messageBodyBytes = System.Text.Encoding.UTF8.GetBytes(sendMsg);
                                            channel.BasicPublish(exchangeName, routingKey, properties, messageBodyBytes);
                                            //Console.WriteLine("写入数据:" + sendMsg);
    
                                            //MQMsg(sendMsg +"待写入:"+ ProducerQueue.Count);
    
                                            Thread.Sleep(1);
                                        }
                                    }
                                }
                                watch.Stop();//停止计时
    
                                Console.WriteLine("耗时:" + (watch.ElapsedMilliseconds));//输出时间 毫秒
                    
    
                                Thread.Sleep(1);
                            }
                        }
                    }
    
                }
                catch (Exception ex)
                {
                    MQError(ex.Message);
                    Console.WriteLine(ex.Message);
                }
            }
        }
    View Code

    2.Producer.cs

     class Program
        {
    
            private static System.Timers.Timer timer;
    
            static RabbitMQHelper helper;
    
            static void Main(string[] args)
            {
    
                //大概 400*3*7000/s 字节的写入速度
    
                var len = "10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。".Length;
    
                helper = new RabbitMQHelper("demoexchange", "demoqueue", ExchangeType.Direct, "demoqueue", "test", "test", "127.0.0.1", 5672, "vhost");
                helper.MQMsg += Helper_MQMsg;
                helper.MQError += Helper_MQError;
    
                timer = new System.Timers.Timer(1);
                timer.Elapsed += Timer_Elapsed;
                timer.Start();
    
                helper.Producer();      
            }
    
            private static void Helper_MQMsg(string msg)
            {
                Console.WriteLine("已发送: {0}", msg);
            }
            private static void Helper_MQError(string error)
            {
                Console.WriteLine("错误信息: {0}", error);
            }
            private static void Timer_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
            {
                int i = 7;
                while (i > 0)
                {
                    helper.SendMsg("10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。10日,中国人民银行发布《关于开展大额现金管理试点的通知》。《通知》指出,该试点为期2年,先在河北省开展,再推广至浙江省、广东省深圳市。");
                    i--;
                }
            }
        }
    View Code

    3.Consumer.cs

     class Program
        {
            static void Main(string[] args)
            {
                //Consumer();
                RabbitMQHelper helper = new RabbitMQHelper("demoexchange", "demoqueue", ExchangeType.Direct, "demoqueue", "test", "test", "127.0.0.1", 5672, "vhost");
                helper.MQMsg += Helper_MQMsg;
                helper.MQError += Helper_MQError;
                helper.Consumer();
            }
    
            private static void Helper_MQError(string error)
            {
                Console.WriteLine("错误信息: {0}", error);
            }
    
            private static void Helper_MQMsg(string msg)
            {
                Console.WriteLine("已接收: {0}", msg);
            }
        }
    View Code
  • 相关阅读:
    如何快速搞定websocket
    websocket断网消息补发
    div嵌套多个点击事件,点击后如何阻止多次事件触发冒泡
    仿照 MediatR实现了一个中介者模式Publish功能,使用同MediatR
    git提交指南(超级详细)
    删除github中的文件夹
    抽取进程集成模式注册报错,OGG-08221,OCI Error ORA-44004 invalid qualified SQL Name
    Oracle-参数学习_no_or_expansion
    OGG19版本源端新增字段,目标端复制进程不报错,使用MAPALLCOLUMNS进行测试
    Oracle存储过程如何定位慢SQL?
  • 原文地址:https://www.cnblogs.com/kuangxiangnice/p/13131580.html
Copyright © 2011-2022 走看看