zoukankan      html  css  js  c++  java
  • 一个C#多线程的工作队列

        自己写了一个多线程的工作队列,能够实现对队列中对象的自动处理。多线程添加元素到队列中,队列根据绑定

    的事件进行自动处理,可以设置WorkSequential属性来实现对队列处理的单线程(严格顺序处理)或者多线程处理(循序出队,但是

    多线程处理,不保证对队列元素的处理顺利)的选择。

      源码:

    代码
    /***********多线程的工作队列***************
    * 此工作队列保证线程安全性
    *
    *
    *
    *
    * ******
    */
    namespace WorkQueue
    {
    using System.Collections.Generic;
    using System;
    using System.Threading;

    public delegate void UserWorkEventHandler<T>(object sender, WorkQueue<T>.EnqueueEventArgs e);
    public class WorkQueue<T>
    {
    private bool IsWorking; //表明处理线程是否正在工作
    private object lockIsWorking = new object();//对IsWorking的同步对象


    private Queue<T> queue; //实际的队列
    private object lockObj = new object(); //队列同步对象





    /// <summary>
    /// 绑定用户需要对队列中的item对象
    /// 施加的操作的事件
    /// </summary>
    public event UserWorkEventHandler<T> UserWork;

    public WorkQueue(int n)
    {
    queue
    = new Queue<T>(n);
    }

    public WorkQueue()
    {
    queue
    = new Queue<T>();
    }

    /// <summary>
    /// 谨慎使用此函数,
    /// 只保证此瞬间,队列值为空
    /// </summary>
    /// <returns></returns>
    public bool IsEmpty()
    {
    lock (lockObj)
    {
    return queue.Count == 0;
    }
    }

    private bool isOneThread;

    /// <summary>
    /// 队列处理是否需要单线程顺序执行
    /// ture表示单线程处理队列的T对象
    /// 默认为false,表明按照顺序出队,但是多线程处理item
    /// *****注意不要频繁改变此项****
    /// </summary>
    public bool WorkSequential
    {
    get
    {
    return isOneThread;
    }
    set
    {
    isOneThread
    = value;
    }

    }



    /// <summary>
    /// 向工作队列添加对象,
    /// 对象添加以后,如果已经绑定工作的事件
    /// 会触发事件处理程序,对item对象进行处理
    /// </summary>
    /// <param name="item">添加到队列的对象</param>
    public void EnqueueItem(T item)
    {
    lock (lockObj)
    {
    queue.Enqueue(item);
    }

    lock (lockIsWorking)
    {
    if (!IsWorking)
    {
    IsWorking
    = true;
    ThreadPool.QueueUserWorkItem(doUserWork);
    }
    }



    }


    /// <summary>
    /// 处理队列中对象的函数
    /// </summary>
    /// <param name="o"></param>
    private void doUserWork(object o)
    {
    try
    {
    T item;

    while (true)
    {
    lock (lockObj)
    {
    if (queue.Count > 0)
    {
    item
    = queue.Dequeue();
    }
    else
    {
    return;
    }
    }
    if (!item.Equals(default(T)))
    {

    if (isOneThread)
    {
    if (UserWork != null)
    {
    UserWork(
    this, new EnqueueEventArgs(item));
    }
    }
    else
    {
    ThreadPool.QueueUserWorkItem(obj
    =>
    {
    if (UserWork != null)
    {
    UserWork(
    this, new EnqueueEventArgs(obj));
    }
    }, item);
    }


    }

    }
    }
    finally
    {
    lock (lockIsWorking)
    {
    IsWorking
    = false;
    }

    }
    }

    /// <summary>
    /// UserWork事件的参数,包含item对象
    /// </summary>
    public class EnqueueEventArgs : EventArgs
    {
    public T Item { get; private set; }
    public EnqueueEventArgs(object item)
    {
    try
    {
    Item
    = (T)item;
    }
    catch (Exception)
    {

    throw new InvalidCastException("object to T 转换失败");
    }
    }
    }
    }
    }
      示例代码:
    代码
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.IO;
    using WorkQueue;
    namespace Program
    {
    class Program
    {
    private static List<string> list=new List<string>(1000);
    static StreamWriter sw = new StreamWriter(new FileStream("test.dat", FileMode.Create));
    static void Main(string[] args)
    {
    WorkQueue
    <int> workQueue=new WorkQueue<int>(1000);
    workQueue.UserWork
    += new UserWorkEventHandler<int>(workQueue_UserWork);
    // workQueue.WorkSequential = true;
    ThreadPool.QueueUserWorkItem(o =>
    {
    for (int i = 0; i < 1000; i++)
    {
    workQueue.EnqueueItem(i);

    }
    });
    Console.ReadLine();

    list.ForEach(str
    =>sw.WriteLine(str));
    Console.WriteLine(workQueue.IsEmpty());
    sw.Close();
    }

    static void workQueue_UserWork(object sender, WorkQueue<int>.EnqueueEventArgs e)
    {

    StringBuilder sb
    =new StringBuilder();
    sb.Append(e.Item).Append(
    "\t\t").Append(DateTime.Now.ToString("u")+"\t\t").Append(Thread.CurrentThread.ManagedThreadId);
    list.Add(sb.ToString());
    Thread.Sleep(
    15);
    }
    }
    }
  • 相关阅读:
    nodejs事件和事件循环详解
    keycloak集群化的思考
    Python解释器和IPython
    IndexedDB详解
    在onelogin中使用OpenId Connect Implicit Flow
    在onelogin中使用OpenId Connect Authentication Flow
    SAML和OAuth2这两种SSO协议的区别
    wildfly 21的配置文件和资源管理
    【老孟Flutter】2021 年 Flutter 官方路线图
    【老孟Flutter】为什么 build 方法放在 State 中而不是在 StatefulWidget 中
  • 原文地址:https://www.cnblogs.com/miniwiki/p/1774583.html
Copyright © 2011-2022 走看看