zoukankan      html  css  js  c++  java
  • C# 多线程小试牛刀

    2019年6月28日更新

    采用预先生成随机数 + bitarray 来判断重复 + 数组分段插入隔离进行插入 1000w的数据的不重复随机数插入可以压缩到 3 s 内。

    前言

    昨天在上班时浏览博问,发现了一个问题,虽然自己在 C# 多线程上没有怎么尝试过,看了几遍 CLR 中关于 线程的概念和讲解(后面三章)。也想拿来实践实践。问题定义是这样的:

    对于多线程不是很懂,面试的时候遇到一个多线程的题,不会做,分享出来,懂的大佬指点一下,谢谢

    建一个winform窗体,在窗体中放上一个开始按钮,一个停止按钮,一个文本框,在窗体中声明一个List类型的属性,点击开始按钮后开启10个线程,所有线程同时不间断的给List集合中添加1-10000之间的随机数,要求添加List集合中的数字不能重复,并且实时在文本框中显示集合的长度,当集合List的长度等于1000时自动停止所有线程,如果中途点击停止按钮也停止所有线程,点击开始又继续执行。

    我其实没有完全实现了这位博问中提问的同学的需求,具体问题的来源可查看该地址 问题来源

    开始尝试

    刚拿到这个需求的时候,映入我脑海里的是 Task, Threadpool,Concurrent,和 Lock 等概念,接下来就是组装和编码的过程了,首先理一理头绪,

    • 生成随机数
    • 插入到 List 中,且不能重复
    • 开启多个线程同时插入。

    首先是生成 随机数,使用 System.Random 类来生成伪随机数(这个其实性能和效率贼低,后面再叙述)

    private int GenerateInt32Num()
    {
        var num = random.Next(0, TOTAL_NUM);
        return num;
    }
    

    然后是插入到 List<Int32> 中的代码,判断是否 已经达到了 我们需要的 List 长度,如果已满足,则退出程序。

    private void AddToList(int num)
    {
        if (numList.Count == ENDNUM)
        {
            return;
        }
    
        numList.Add(num);
    }
    
    

    如果是个 单线程的,按照上面那样 while(true) 然后一直插入即可,可这个是个 多线程,那么需要如何处理呢?

    我思考了一下,想到了之前在 CLR 中学到的 可以用 CancellationTokenSource 中的 Cancel 来通知 Task 来取消操作。所以现在的逻辑是,用线程池来实现多线程。然后传入 CancellationTokenSource.Token 来取消任务。

    最后用 Task.WhanAny() 来获取到第一个到达此 Task 的 ID。

    首先是建立 Task[] 的数组

    internal void DoTheCompeteSecond()
    {
        Task[] tasks = new Task[10];
    
        for (int i = 0; i < 10; ++i)
        {
            int num = i;
            tasks[i] = Task.Factory.StartNew(() => AddNumToList(num, cts), cts.Token);
        }
    
        Task.WaitAny(tasks);
    }
    

    然后 AddNumToList 方法是这样定义的,

    private void AddNumToList(object state, CancellationTokenSource cts)
    {-
        Console.WriteLine("This is the {0} thread,Current ThreadId={1}",
                          state,
                          Thread.CurrentThread.ManagedThreadId);
    
        while (!cts.Token.IsCancellationRequested)
        {
            if (GetTheListCount() == ENDNUM)
            {
                cts.Cancel();
                Console.WriteLine("Current Thread Id={0},Current Count={1}",
                                  Thread.CurrentThread.ManagedThreadId,
                                  GetTheListCount());
    
                break;
            }
            var insertNum = GenerateInt32Num();
            if (numList.Contains(insertNum))
            {
                insertNum = GenerateInt32Num();
            }
    
            AddToList(insertNum);
        }
    }
    

    看起来是没有什么问题的,运行了一下。得到了如下结果,

    这应该是昨晚运行时得到的数据,当时也没有多想,就贴了上去,回答了那位提问同学的问题。但是心里有一个疑惑,为什么会同时由 两个 Thread 同时达到了该目标呢?

    发现问题

    今天早上到公司时,我又打开了这个 代码,发现确实有点不对劲,于是就和我边上 做 Go 语言开发的同学,问了问他,哪里出现了问题,他和我说:“你加了读写锁了吗?” 你这里有数据脏读写。心里面有了点眉目。

    按照他说的,修改了一下 AddToList 里面的逻辑,这时候,确实解决了上面的问题,

    private void AddToList(int num)
    {
        rwls.EnterReadLock();
        if (numList.Count == ENDNUM)
            return;
        rwls.ExitReadLock();
    
        rwls.EnterWriteLock();
        numList.Add(num);
        rwls.ExitWriteLock();
    }
    

    得到的结果如下:

    完整的代码如下所示:

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace CSharpFundamental
    {
        class MultipleThreadCompete
        {
            List<int> numList = new List<int>();
            Random random = new Random();
            CancellationTokenSource cts = new CancellationTokenSource();
            private const int ENDNUM = 1000000;
    
            ReaderWriterLockSlim rwls = new ReaderWriterLockSlim();
    
            internal void DoTheCompeteSecond()
            {
                Stopwatch sw = new Stopwatch();
                sw.Start();
                Task[] tasks = new Task[100];
    
                for (int i = 0; i < 100; ++i)
                {
                    int num = i;
                    tasks[i] = Task.Run(() => AddNumToList(num, cts), cts.Token);
                }
    
                Task.WaitAny(tasks);
    
                Console.WriteLine("ExecuteTime={0}", sw.ElapsedMilliseconds / 1000);
            }
    
            private int GetTheListCount()
            {
                return numList.Count;
            }
    
            private void AddToList(int num)
            {
                rwls.EnterReadLock();
                if (numList.Count == ENDNUM)
                    return;
                rwls.ExitReadLock();
    
                rwls.EnterWriteLock();
                numList.Add(num);
                rwls.ExitWriteLock();
            }
    
            private void AddNumToList(object state, CancellationTokenSource cts)
            {
                Console.WriteLine("This is the {0} thread,Current ThreadId={1}",
                    state,
                    Thread.CurrentThread.ManagedThreadId);
    
                while (!cts.Token.IsCancellationRequested)
                {
                    try
                    {
                        rwls.EnterReadLock();
                        if (numList.Count == ENDNUM)
                        {
                            cts.Cancel();
                            Console.WriteLine("Current Thread Id={0},Current Count={1}",
                                Thread.CurrentThread.ManagedThreadId,
                                GetTheListCount());
                            break;
                        }
                    }
                    finally
                    {
                        rwls.ExitReadLock();
                    }
    
                    var insertNum = GenerateInt32Num();
                    if (numList.Contains(insertNum))
                    {
                        insertNum = GenerateInt32Num();
                    }
    
                    AddToList(insertNum);
                }
            }
    
            private int GenerateInt32Num()
            {
                return random.Next(1, ENDNUM);
            }
        }
    }
    

    这时候,那位 Go 语言的同学和我说,我们试试 1000w 的数据插入,看看需要多少时间?于是我让他用 Go 语言实现了一下上面的逻辑,1000w数据用了 三分钟,我让他看看总共生成了多少随机数,他查看了一下生成了 1亿4千多万的数据。

    最开始我用上面的代码来测,发现我插入 1000w 的数据,CPU 到100% 而且花了挺长时间,程序根本没反应,查看了一下我判断重复的语句numList.Contains()

    底层实现的代码为:

    [__DynamicallyInvokable]
        public bool Contains(T item)
        {
            if ((object) item == null)
            {
                for (int index = 0; index < this._size; ++index)
                {
                    if ((object) this._items[index] == null)
                        return true;
                }
                return false;
            }
            EqualityComparer<T> equalityComparer = EqualityComparer<T>.Default;
            for (int index = 0; index < this._size; ++index)
            {
                if (equalityComparer.Equals(this._items[index], item))
                    return true;
            }
            return false;
        }
    

    可想而知,如果数据量很大的话,这个循环不就 及其缓慢吗?

    我于是请教了那位 GO 的同学,判断重复的逻辑用什么来实现的,他和我说了一个位图 bitmap 的概念,

    我用其重写了一下判断重复的逻辑,代码如下:

    int[] bitmap = new int[MAX_SIZE];
    
    var index = num % TOTAL_NUM;
    bitMap[index] = 1;
    
    return bitMap[num] == 1;
    

    在添加到 List 的时候,顺便插入到 bitmap 中,判断重复只需要根据当前元素的位置是否 等于 1 即可,

    我修改代码后,跑了一下 1000w 的数据用来 3000+ ms。

    这时候,引起了他的极度怀疑,一向以高性能并发 著称的 Go 速度竟然这么慢吗?他一度怀疑我的逻辑有问题。

    下午结束了一个阶段的工作后,我又拾起了我上午写的代码,果不其然,发现了逻辑错误:

    如下:

    var insertNum = GenerateInt32Num();
    if (numList.Contains(insertNum))
    {
        insertNum = GenerateInt32Num();
    }
    

    生成随机数这里,这里有个大问题,就是其实只判断了一次,导致速度那么快,正确的写法应该是

    while (ContainsNum(currentNum))
    {
        currentNum = GenerateInt32Num();
    }
    
    private int GenerateInt32Num()
    {
        var num = random.Next(0, TOTAL_NUM);
        //Console.WriteLine(num);
    
        return num;
    }
    

    最后的代码如下:

    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace CSharpFundamental
    {
        class MultipleThreadCompete
        {
            List<int> numList = new List<int>();
            Random random = new Random();
            CancellationTokenSource cts = new CancellationTokenSource();
            private const int TOTAL_NUM = 1000000;
            private const int CURRENT_THREAD_COUNT = 35;
    
            ReaderWriterLockSlim rwls = new ReaderWriterLockSlim();
    
            int[] bitMap = new int[TOTAL_NUM];
    
            internal void DoTheCompete()
            {
                //ThreadPool.SetMinThreads(CURRENT_THREAD_COUNT, CURRENT_THREAD_COUNT);
                Stopwatch sw = new Stopwatch();
                sw.Start();
                Task[] tasks = new Task[CURRENT_THREAD_COUNT];
    
                for (int i = 0; i < CURRENT_THREAD_COUNT; ++i)
                {
                    int num = i;
                    tasks[i] = Task.Run(() => ExecuteTheTask(num, cts), cts.Token);
                }
    
                Task.WaitAny(tasks);
    
                Console.WriteLine("ExecuteTime={0}", sw.ElapsedMilliseconds);
            }
    
            private int GetTheListCount()
            {
                return numList.Count;
            }
    
            private void AddToList(int num)
            {
                if (numList.Count == TOTAL_NUM)
                    return;
                numList.Add(num);
    
                var index = num % TOTAL_NUM;
                bitMap[index] = 1;
            }
    
            private void ExecuteTheTask(object state, CancellationTokenSource cts)
            {
                Console.WriteLine("This is the {0} thread,Current ThreadId={1}",
                    state,
                    Thread.CurrentThread.ManagedThreadId);
    
                while (!cts.Token.IsCancellationRequested)
                {
                    try
                    {
                        rwls.EnterReadLock();
                        if (numList.Count == TOTAL_NUM)
                        {
                            cts.Cancel();
                            Console.WriteLine("Current Thread Id={0},Current Count={1}",
                                Thread.CurrentThread.ManagedThreadId,
                                GetTheListCount());
                            break;
                        }
                    }
                    finally
                    {
                        rwls.ExitReadLock();
                    }
    
                    var currentNum = GenerateInt32Num();
    
                    while (ContainsNum(currentNum))
                    {
                        currentNum = GenerateInt32Num();
                    }
    
                    rwls.EnterWriteLock();
                    AddToList(currentNum);
                    rwls.ExitWriteLock();
                }
            }
    
            private int GenerateInt32Num()
            {
                var num = random.Next(0, TOTAL_NUM);
                //Console.WriteLine(num);
    
                return num;
            }
    
            private bool ContainsNum(int num)
            {
                rwls.EnterReadLock();
                var contains = bitMap[num] == 1;
                rwls.ExitReadLock();
    
                return contains;
            }
        }
    }
    

    结果如下:

    但是这个代码执行 1000w的数据需要好久。 这个问题继续研究。

    源码地址:https://github.com/doublnt/dotnetcore/tree/master/CSharpFundamental

    欢迎大佬指点,还望不吝赐教。

  • 相关阅读:
    Linux Centos7配置mysql8.0数据库
    Linux Centos7配置ftp服务器
    线程池工具ThreadPoolExecutor
    Layui 实现input 输入和选择
    canvas验证码实现
    弹性布局flex 介绍
    java EE 新手入门了解
    java web工程web.xml介绍
    js 数组常用的一些方法
    详解为什么需要重写hashcode 和 equals 方法
  • 原文地址:https://www.cnblogs.com/xiyin/p/10915068.html
Copyright © 2011-2022 走看看