zoukankan      html  css  js  c++  java
  • 队列处理器 WorkQueue<T>

    代码
    using System;
    using System.Collections.Generic;
    using System.Text;
    using System.Threading;

    public delegate void AnonymousHandler();

    public class WorkQueue : WorkQueue<AnonymousHandler> {
    public WorkQueue() : this(16, -1) { }
    public WorkQueue(int thread)
    :
    this(thread, -1) {
    }
    public WorkQueue(int thread, int capacity) {
    base.Thread = thread;
    base.Capacity = capacity;
    base.Process += delegate(AnonymousHandler ah) {
    ah();
    };
    }
    }

    public class WorkQueue<T> : IDisposable {
    public delegate void WorkQueueProcessHandler(T item);
    public event WorkQueueProcessHandler Process;

    private int _thread = 16;
    private int _capacity = -1;
    private int _work_index = 0;
    private Dictionary<int, WorkInfo> _works = new Dictionary<int, WorkInfo>();
    private object _works_lock = new object();
    private Queue<T> _queue = new Queue<T>();
    private object _queue_lock = new object();

    public WorkQueue() : this(16, -1) { }
    public WorkQueue(int thread)
    :
    this(thread, -1) {
    }
    public WorkQueue(int thread, int capacity) {
    _thread
    = thread;
    _capacity
    = capacity;
    }

    public void Enqueue(T item) {
    lock (_queue_lock) {
    if (_capacity > 0 && _queue.Count >= _capacity) return;
    _queue.Enqueue(item);
    }
    lock (_works_lock) {
    foreach (WorkInfo w in _works.Values) {
    if (w.IsWaiting) {
    w.Set();
    return;
    }
    }
    }
    if (_works.Count < _thread) {
    if (_queue.Count > 0) {
    int index = 0;
    lock (_works_lock) {
    index
    = _work_index++;
    _works.Add(index,
    new WorkInfo());
    }
    new Thread(delegate() {
    WorkInfo work
    = _works[index];
    while (true) {
    List
    <T> de = new List<T>();
    if (_queue.Count > 0) {
    lock (_queue_lock) {
    if (_queue.Count > 0) {
    de.Add(_queue.Dequeue());
    }
    }
    }

    if (de.Count > 0) {
    try {
    this.OnProcess(de[0]);
    }
    catch {
    }
    }

    if (_queue.Count == 0) {
    work.WaitOne(TimeSpan.FromSeconds(
    20));

    if (_queue.Count == 0) {
    break;
    }
    }
    }
    lock (_works_lock) {
    _works.Remove(index);
    }
    work.Dispose();
    }).Start();
    }
    }
    }

    protected virtual void OnProcess(T item) {
    if (Process != null) {
    Process(item);
    }
    }

    #region IDisposable 成员

    public void Dispose() {
    lock (_queue_lock) {
    _queue.Clear();
    }
    lock (_works_lock) {
    foreach (WorkInfo w in _works.Values) {
    w.Dispose();
    }
    }
    }

    #endregion

    public int Thread {
    get { return _thread; }
    set {
    if (_thread != value) {
    _thread
    = value;
    }
    }
    }
    public int Capacity {
    get { return _capacity; }
    set {
    if (_capacity != value) {
    _capacity
    = value;
    }
    }
    }

    public int UsedThread {
    get { return _works.Count; }
    }
    public int Queue {
    get { return _queue.Count; }
    }

    public string Statistics {
    get {
    string value = string.Format(@"线程:{0}/{1}
    队列:{2}

    ", _works.Count, _thread, _queue.Count);
    int[] keys = new int[_works.Count];
    try {
    _works.Keys.CopyTo(keys,
    0);
    }
    catch {
    lock (_works_lock) {
    keys
    = new int[_works.Count];
    _works.Keys.CopyTo(keys,
    0);
    }
    }
    foreach (int k in keys) {
    WorkInfo w
    = null;
    if (_works.TryGetValue(k, out w)) {
    value
    += string.Format(@"线程{0}:{1}
    ", k, w.IsWaiting);
    }
    }
    return value;
    }
    }

    class WorkInfo : IDisposable {
    private ManualResetEvent _reset = new ManualResetEvent(false);
    private bool _isWaiting = false;

    public void WaitOne(TimeSpan timeout) {
    try {
    _reset.Reset();
    _isWaiting
    = true;
    _reset.WaitOne(timeout,
    false);
    }
    catch { }
    }
    public void Set() {
    try {
    _isWaiting
    = false;
    _reset.Set();
    }
    catch { }
    }

    public bool IsWaiting {
    get { return _isWaiting; }
    }

    #region IDisposable 成员

    public void Dispose() {
    this.Set();
    _reset.Close();
    }

    #endregion
    }
    }

    调用方法:

    代码
    //方法1
    WorkQueue<xxx> writeWQ = new WorkQueue<xxx>(5, 1000); //有重载,最多开辟5个线程同时处理,队列大小设为1000,如果超出则不处理

    for (int a = 0; a < 1000000; a++) {

    //此方法按常理会立即返回,因为执行交给 WorkQueue 处理了
    writeWQ.Enqueue(delegate(xxx msg) {
    //处理
    });
    }

    //方法2,执行一个匿名委托
    WorkQueue writeWQ = new WorkQueue(5, 1000); //有重载,最多开辟5个线程同时处理,队列大小设为1000,如果超出则不处理

    for (int a = 0; a < 1000000; a++) {

    //此方法按常理会立即返回,因为执行交给 WorkQueue 处理了
    writeWQ.Enqueue(delegate() {
    //处理
    });
    }

  • 相关阅读:
    C# 中的委托和事件
    POJ题目分类
    A*算法
    Poj 1077 eight(BFS+全序列Hash解八数码问题)
    Poj 2304 Combination Lock(模拟顺、逆时钟开组合锁)
    Poj 2247 Humble Numbers(求只能被2,3,5, 7 整除的数)
    Poj 2328 Guessing Game(猜数字游戏)
    Poj 2403 Hay Points(Map)
    Poj 1338 Ugly Numbers(数学推导)
    Poj 1504 Adding Reversed Numbers(用字符串反转数字)
  • 原文地址:https://www.cnblogs.com/baobao2010/p/1795610.html
Copyright © 2011-2022 走看看