zoukankan      html  css  js  c++  java
  • 高并发场景之RabbitMQ篇

    上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案

    但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题,

    因为使用Redis要自己实现分布式锁

    这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。

    下面我们来用RabbitMQ来实现上一篇的场景

    一、新建RabbitMQ.Receive

    private static ConnectionFactory factory = new ConnectionFactory 
    { HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" };
    复制代码
     1         static void Main(string[] args)
     2         {
     3             using (var connection = factory.CreateConnection())
     4             {
     5                 using (var channel = connection.CreateModel())
     6                 {
     7                     var consumer = new EventingBasicConsumer();
     8                     consumer.Received += (model, ea) =>
     9                     {
    10                         var body = ea.Body;
    11                         var message = Encoding.UTF8.GetString(body);
    12                         Console.WriteLine(" [x] Received {0}", message);
    13 
    14                         var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
    15                         var value = int.Parse(total) + 1;
    16 
    17                         DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
    18                     };
    19 
    20                     channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
    21                     channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer);
    22 
    23                     Console.WriteLine(" Press [enter] to exit.");
    24                     Console.ReadLine();
    25                 }
    26             }
    27         }
    复制代码

    二、新建RabbitMQ.Send  

    复制代码
     1         static void Main(string[] args)
     2         {
     3             for (int i = 1; i <= 500; i++)
     4             {
     5                 Task.Run(async () =>
     6                 {
     7                     await Produce();
     8                 });
     9 
    10                 Console.WriteLine(i);
    11             }
    12 
    13             Console.ReadKey();
    14         }
    15 
    16         public static Task Produce()
    17         {
    18             return Task.Factory.StartNew(() =>
    19             {
    20                 using (var connection = factory.CreateConnection())
    21                 {
    22                     using (var channel = connection.CreateModel())
    23                     {
    24                         var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
    25                         channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
    26                         channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body);
    27                     }
    28                 }
    29             });
    30         }
    复制代码

    这里是模拟500个用户请求,正常的话最后Total就等于500

    我们来说试试看,运行程序

    2.1、打开接收端

    2.2 运行客户端

    2.3、可以看到2边几乎是实时的,再去看看数据库

    三、我们在集群里执行 

    最后数据是1000

     完全没有冲突,好了,就是这样 、。

  • 相关阅读:
    BZOJ 3744 Gty的妹子序列
    BZOJ 3872 Ant colony
    BZOJ 1087 互不侵犯
    BZOJ 1070 修车
    BZOJ 2654 tree
    BZOJ 3243 向量内积
    1003 NOIP 模拟赛Day2 城市建设
    CF865D Buy Low Sell High
    CF444A DZY Loves Physics
    Luogu 4310 绝世好题
  • 原文地址:https://www.cnblogs.com/xumaojun/p/8521618.html
Copyright © 2011-2022 走看看