zoukankan      html  css  js  c++  java
  • RabbitMQ学习工作队列模式(以C#代码为例)

    参考地址:https://mp.weixin.qq.com/s/QG3uXhhpkE_Uo6Me15mxdg

                      https://www.bilibili.com/video/BV1GU4y1w7Yq?p=7

                      https://www.rabbitmq.com/tutorials/tutorial-two-dotnet.html

    工作队列模式(Work queues)

     

    在多个消费者之间分配任务(竞争的消费者模式),一个生产者对应多个消费者,一般适用于执行资源密集型任务,单个消费者处理不过来,需要多个消费者进行处理

    应用场景: 一个订单的处理需要10s,有多个订单可以同时放到消息队列,然后让多个消费者同时处理,这样就是并行了,而不是单个消费者的串行情况

     

     实现12306短信通知用户

    项目运行环境:windows10 vs2019 RabbitMQ 

    Demo

    建立三个控制台项目(RabbitMQ.Producer,RabbitMQ.Consumer01,RabbitMQ.Consumer02)一个类库项目(RabbitMQ.Common)

    1.RabbitMQ.Common

    (1)RabbitConstant

    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace RabbitMQ.Common
    {
        public class RabbitConstant
        {
            public const string QUEUE_HELLO_WORLD = "Hello";
            public const string QUEUE_SMS = "WorkQueue";
        }
    }

    (2)RabbitUtils.cs

    using System;
    using RabbitMQ.Client;
    
    namespace RabbitMQ.Common
    {
        public class RabbitUtils
        {
            public static ConnectionFactory GetConnection()
            {
                var factory = new ConnectionFactory();
                factory.HostName = "127.0.0.1";
                factory.Port = 5672;//是服务端的端口号,与页面的端口号15672区分开
                factory.UserName = "guest";
                factory.Password = "guest";
                //factory.VirtualHost = "/";
                return factory;
            }
        }
    }

     2.RabbitMQ.Producer项目

    Producer下的SmsSender.cs

    using System;
    using System.Text;
    using Newtonsoft.Json;
    using RabbitMQ.Client;
    using RabbitMQ.Common;
    
    namespace RabbitMQ.Producer.Producer
    {
        public class SmsSender
        {
            public static void SendMessage()
            {
                using (var connection = RabbitUtils.GetConnection().CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        /*
                      *创建队列,声明并创建一个队列,如果队列存在,则使用这个队列
                      *queue:队列名称ID
                      *durable:是否持久化,false对应不持久化数据,MQ停掉数据就会数据丢失
                      *exclusive:是否队列私有化,false则代表所有的消费者都可以访问,true代表只有第一次拥有它的消费者才能一直使用
                      *exclusive:是否自动删除,false代表连接停掉后不自动删除这个队列
                      *arguments:其他额外参数为null
                      */
                        channel.QueueDeclare(queue: RabbitConstant.QUEUE_SMS,
                                             durable: true,
                                             exclusive: false,
                                             autoDelete: false,
                                             arguments: null);
                        for (int i = 0; i < 1000; i++)
                        {
                            Sms sms = new Sms($"乘客{i}", $"13600000{i}", "您的车票已预订成功");
                            string message = JsonConvert.SerializeObject(sms);
                            var body = Encoding.UTF8.GetBytes(message);
                            /*
                       * exchange:交换机,暂时用不到,在进行发布订阅时才会用到
                       * routingKey:路由key
                       * basicProperties:额外的设置属性
                       * body:要传递的消息字节数组
                       */
                            channel.BasicPublish(exchange: "",
                                                 routingKey: RabbitConstant.QUEUE_SMS,
                                                 basicProperties: null,
                                                 body: body);
                            Console.WriteLine($"正在发送内容{message}");
                        }
                        Console.WriteLine("发送数据成功");
                        Console.WriteLine("Press [Enter] to exit");
                        Console.ReadLine();
                    }
                }
            }
        }
    }

     Producer下的SmsSender.cs

    using System;
    using System.Collections.Generic;
    using System.Text;
    
    namespace RabbitMQ.Producer.Producer
    {
        public class Sms
        {
            public Sms(string name,string mobile,string content)
            {
                 Name =name;
                 Moblie=mobile;
                 Content=content;
            }
    
            public string Name { get; set; }
            public string Moblie { get; set; }
            public string Content { get; set; }
        }
    }

     Producer下的Program.cs

    using RabbitMQ.Producer.Producer;
    using System;
    
    namespace RabbitMQ.Producer
    {
        class Program
        {
            static void Main(string[] args)
            {
                SmsSender.SendMessage();
            }
        }
    }

    3.RabbitMQ.Consumer01和RabbitMQ.Consumer02(项目代码一致)

    Consumer下SmsReceive.cs

    using System;
    using System.Text;
    using System.Threading;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using RabbitMQ.Common;
    
    
    
    namespace RabbitMQ.Consumer01.Consumer
    {
        public class SmsReceive
        {
            public static void ReceiveMessage()
            {
                var connection = RabbitUtils.GetConnection().CreateConnection();
                var channel = connection.CreateModel();
                channel.QueueDeclare(queue: RabbitConstant.QUEUE_SMS,
                                        durable: true,
                                        exclusive: false,
                                        autoDelete: false,
                                        arguments: null);
                //如果不写BasicQos,则自动MQ会将所有请求平均发送给消费者
                //BasicQos,MQ不再对消费者发送多个请求,而是消费者处理完一个消息后(确认后),在队列中获取一个新的
                //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                Console.WriteLine("[*] Waiting for messages.");
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (sender, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    //Thread.Sleep(30);
                    Console.WriteLine($"SmsSender-发送短信成功:{message}");
                    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                };
                channel.BasicConsume(queue: RabbitConstant.QUEUE_SMS,
                                     autoAck: false,
                                     consumer: consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.ReadLine();
            }
        }
    }

    Program.cs

    using RabbitMQ.Consumer01.Consumer;
    using System;
    
    namespace RabbitMQ.Consumer1
    {
        class Program
        {
            static void Main(string[] args)
            {
                SmsReceive.ReceiveMessage();
            }
        }
    }

    4.运行

    设置多项目运行 右键解决方案--属性--多个启动项目

     

     

  • 相关阅读:
    CycleGAN的原理及Pytorch实现
    Pix2Pix的原理及Pytorch实现
    DCGAN的原理及Pytorch实现
    Simple GAN的原理及Pytorch实现
    瑞数无限debugger完美处理
    Python selenium 设置 火狐 谷歌 无头模式
    TypeError: Cannot read property 'userAgent' of undefined at Timeout.task [as _onTimeout] (D:cnipa ode_modulesjsdomlibjsdomrowserWindow.js:516:19)
    fiddler 增加请求响应时间
    前端js对象转formData
    js 访问 URL 链接
  • 原文地址:https://www.cnblogs.com/hobelee/p/15732940.html
Copyright © 2011-2022 走看看