zoukankan      html  css  js  c++  java
  • RabbitMQ 教程(四)RabbitMQ并发处理

    前言:前面我们都讲解了一些基本的RabbitMQ配置及操作,现在我们来试下使用RabbitMQ处理一些简单的数据并发问题

    准备条件:先创建一个表students, 字段有id, count

    CREATE TABLE Students
    (
    id INT IDENTITY PRIMARY KEY NOT NULL,
    count INT NULL
    )

    我们准备通过每一次累加1,总和存储在count字段上

    一、普通程序的处理

    //创建数据库连接
            private static BaseDAL dal = new BaseDAL("conName");
            static void Main(string[] args)
            {
                //开启线程进行处理
                new Thread(Update).Start();
    
                Console.ReadLine();
            }
    
            private static void Update()
            {
                for (int i = 0; i <100; i++)
                {
                    //获取当前表中的count数值
                    object total = dal.ExecComdToObject("select count from Students where id=1", null);
                    int value = int.Parse(total.ToString()) + 1;//将数值累加1
    
                    //将累加后的数值更新到表Students的字段count
                    string sql = string.Format("update Students set count={0}", value);
                    dal.ExecComd(sql);
    
                    Console.WriteLine("写入成功=" + i);
                }
            }

    上面代码,我们就开启了1个线程进行读取写入,结果如下:

    数据库的字段count值为:

    发现结果没有问题,循环100次,结果是100

    二、普通程序的并发处理

    这次,我们开启2个线程进行并发读取,再同时写入。我们先把数据库的count置回0,再将代码修改一下。

    class Program
        {
            //创建数据库连接
            private static BaseDAL dal = new BaseDAL("conName");
            static void Main(string[] args)
            {
                //开启2线程进行处理
                new Thread(Update).Start();
                new Thread(Update).Start();
    
                Console.ReadLine();
            }
    
            private static void Update()
            {
                for (int i = 1; i <=100; i++)
                {
                    //获取当前表中的count数值
                    object total = dal.ExecComdToObject("select count from Students where id=1", null);
                    int value = int.Parse(total.ToString()) + 1;//将数值累加1
    
                    //将累加后的数值更新到表Students的字段count
                    string sql = string.Format("update Students set count={0}", value);
                    dal.ExecComd(sql);
    
                    Console.WriteLine("写入成功=" + i);
                }
            }
        }

    程序执行结果:

    一看这程序我们就知道出问题了。那么这时候,数据库的count字段值是多少呢?

    为什么会出现这个结果。是因为在并发的时候,前面线程执行的结果,会被后面的update进行了覆盖,所以值不会是200,也不会是刚好100.

    那么这时候,我们的RabbitMQ就开始派上用场了(我这里就不介绍lock以及queue的使用,主要讲解RabbitMQ)

    三、RabbitMQ并发处理

    (1)RabbitMQ.Server代码处理,2线程并发,输送200次请求

    static void Main(string[] args)
            {
                //开启2线程进行处理
                new Thread(SendMsg).Start();
                new Thread(SendMsg).Start();
    
                Console.ReadLine();
            }
    
            private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" };
            private static void SendMsg() 
            { 
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        for (int i = 1; i <= 100; i++)
                        {
                            string guid = Guid.NewGuid().ToString();
                            var body = Encoding.UTF8.GetBytes(guid);
                            channel.QueueDeclare("AllenLeeQueue", false, false, false, null);
                            channel.BasicPublish("", "AllenLeeQueue", null, body);
    
                            Console.WriteLine("[Set Msg To AllenLeeQueue] " + guid);
                        }
                    }
                }
            }

    (2)RabbitMQ.Client代码处理,对发起的200个请求进行接收

    //创建数据库连接
            private static BaseDAL dal = new BaseDAL("conName");
            private static ConnectionFactory factory = new ConnectionFactory() { HostName = "116.28.8.166", UserName = "admin", Password = "********", VirtualHost = "/" };
            static void Main(string[] args)
            {
                factory.AutomaticRecoveryEnabled = true;//设置端口后自动恢复连接属性即可
                using (var connection = factory.CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        var consumer = new EventingBasicConsumer(channel);
                        consumer.Received += (model, ea) =>
                        {
                            var body = ea.Body;
                            var message = Encoding.UTF8.GetString(body);
                            Console.WriteLine(" [Get Msg from AllenLeeQueue] {0}", message);
    
                            //获取当前表中的count数值
                            object total = dal.ExecComdToObject("select count from Students where id=1", null);
                            int value = int.Parse(total.ToString()) + 1;//将数值累加1
                            string sql = string.Format("update Students set count={0}", value);
                            dal.ExecComd(sql);
                        };
    
                        channel.BasicConsume(queue: "AllenLeeQueue", noAck: true, consumer: consumer);
                        Console.ReadLine();
                    }
                }
            } 

    (3)如果RabbitMQ如果处理得当,数据库的字段count值就为200,不多不少,我们来看下数据库字段值

    实践证明,RabbitMQ能很好的进行并发处理,达到了我们预期的效果。

    但是可能有朋友会觉得,这传的都是guid,如果是我们实际工作中,直接传的是并发业务数据,又该怎么处理呢?

    其实就是在传输的body中,传入业务数据,再在RabbitMQ.Client进行业务数据转化就可以了。

  • 相关阅读:
    第二章 图像的显示
    c++ 使用PI
    c++函数写的都对,还是说incompatible或者not found的解决办法
    我理解的直方图均衡化
    解决360WiFi有时候手机连接不上
    c# 16进制byte转成int
    VS2010 代码突然改变字体 解决办法
    荣耀手机恢复那些“不再提示”的设置
    mfc视类中错误:IntelliSense: declaration is incompatible with。。。解决方案
    [原] Android 自定义View步骤
  • 原文地址:https://www.cnblogs.com/AllenLee/p/7592172.html
Copyright © 2011-2022 走看看