zoukankan      html  css  js  c++  java
  • 高并发场景之RabbitMQ篇

    上次我们介绍了在单机、集群下高并发场景可以选择的一些方案,传送门:高并发场景之一般解决方案

    但是也发现了一些问题,比如集群下使用ConcurrentQueue或加锁都不能解决问题,后来采用Redis队列也不能完全解决问题,

    因为使用Redis要自己实现分布式锁

    这次我们来了解一下一个专门处理队列的组件:RabbitMQ,这个东西天生支持分布式队列。

    下面我们来用RabbitMQ来实现上一篇的场景

    一、新建RabbitMQ.Receive

    private static ConnectionFactory factory = new ConnectionFactory 
    { HostName = "192.168.1.109", UserName = "ljr", Password = "root", VirtualHost = "/" };
    复制代码
     1         static void Main(string[] args)
     2         {
     3             using (var connection = factory.CreateConnection())
     4             {
     5                 using (var channel = connection.CreateModel())
     6                 {
     7                     var consumer = new EventingBasicConsumer();
     8                     consumer.Received += (model, ea) =>
     9                     {
    10                         var body = ea.Body;
    11                         var message = Encoding.UTF8.GetString(body);
    12                         Console.WriteLine(" [x] Received {0}", message);
    13 
    14                         var total = DbHelper.ExecuteScalar("Select Total from ConCurrency where Id = 1", null).ToString();
    15                         var value = int.Parse(total) + 1;
    16 
    17                         DbHelper.ExecuteNonQuery(string.Format("Update ConCurrency Set Total = {0} where Id = 1", value.ToString()), null);
    18                     };
    19 
    20                     channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
    21                     channel.BasicConsume(queue: "queueName", noAck: true, consumer: consumer);
    22 
    23                     Console.WriteLine(" Press [enter] to exit.");
    24                     Console.ReadLine();
    25                 }
    26             }
    27         }
    复制代码

    二、新建RabbitMQ.Send  

    复制代码
     1         static void Main(string[] args)
     2         {
     3             for (int i = 1; i <= 500; i++)
     4             {
     5                 Task.Run(async () =>
     6                 {
     7                     await Produce();
     8                 });
     9 
    10                 Console.WriteLine(i);
    11             }
    12 
    13             Console.ReadKey();
    14         }
    15 
    16         public static Task Produce()
    17         {
    18             return Task.Factory.StartNew(() =>
    19             {
    20                 using (var connection = factory.CreateConnection())
    21                 {
    22                     using (var channel = connection.CreateModel())
    23                     {
    24                         var body = Encoding.UTF8.GetBytes(Guid.NewGuid().ToString());
    25                         channel.QueueDeclare(queue: "queueName", durable: false, exclusive: false, autoDelete: false, arguments: null);
    26                         channel.BasicPublish(exchange: "", routingKey: "queueName", basicProperties: null, body: body);
    27                     }
    28                 }
    29             });
    30         }
    复制代码

    这里是模拟500个用户请求,正常的话最后Total就等于500

    我们来说试试看,运行程序

    2.1、打开接收端

    2.2 运行客户端

    2.3、可以看到2边几乎是实时的,再去看看数据库

    三、我们在集群里执行 

    最后数据是1000

     完全没有冲突,好了,就是这样 、。

  • 相关阅读:
    智能指针
    C++学习之对类中的成员函数的定义和声明最后添加一个const作用
    动态链接,静态链接库
    Java 位运算
    Java 工具类
    Java 枚举
    Java 内部类
    Java 异常机制
    Java hashCode 和 equals
    Java 字节流和字符流
  • 原文地址:https://www.cnblogs.com/xumaojun/p/8521618.html
Copyright © 2011-2022 走看看