zoukankan      html  css  js  c++  java
  • Marshal Code Into Another Thread(STAThread)

    出处:http://www.codeproject.com/Articles/32113/Understanding-SynchronizationContext-Part-II

    SynchronizationContext 的 MSDN 解释:

    SynchronizationContext 类是一个基类,可提供不带同步的自由线程上下文。此类实现的同步模型的目的是使公共语言运行时内部的异步/同步操作能够针对不同的异步模型采取正确的行为。此模型还简化了托管应用程序为在不同的同步环境下正常工作而必须遵循的一些要求。同步模型的提供程序可以扩展此类并为这些方法提供自己的实现。 

    我们所熟悉的一个 SynchronizationContext 是 System.Windows.Forms.WindowsFormsSynchronizationContext,它提供了将操作“封送”到UI线程的能力;而文章中演示了如何通过自己实现的 SynchronizationContext 将方法“封送”(Marshal)到STA线程上执行,并自己实现了STA线程模型。这对了解多线程同步、SynchronizationContext 和STA线程模型很有帮助,值得学习。

    BlockingQueue
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace SynchronizationContextDemo
    {
        /// <summary>
        /// Consumer-side view of a queue of <typeparamref name="T"/> items.
        /// </summary>
        /// <typeparam name="T">Type of the items taken from the queue.</typeparam>
        internal interface IQueueReader<T> : IDisposable
        {
            /// <summary>
            /// Takes the next item from the queue.
            /// </summary>
            /// <returns>The item that was taken from the queue.</returns>
            T Dequeue();

            /// <summary>
            /// Tells the queue that the reader is going away, so a pending
            /// <see cref="Dequeue"/> call does not have to keep waiting.
            /// </summary>
            void ReleaseReader();
        }
    
        /// <summary>
        /// Producer-side view of a queue of <typeparamref name="T"/> items.
        /// </summary>
        /// <typeparam name="T">Type of the items added to the queue.</typeparam>
        internal interface IQueueWriter<T> : IDisposable
        {
            /// <summary>
            /// Adds an item to the queue.
            /// </summary>
            /// <param name="data">The item to add.</param>
            void Enqueue(T data);
        }
    
        /// <summary>
        /// A queue whose <see cref="Dequeue"/> blocks until either an item is
        /// available or the reader has been released via <see cref="ReleaseReader"/>.
        /// </summary>
        /// <typeparam name="T">Type of the queued items.</typeparam>
        internal class BlockingQueue<T> : IQueueReader<T>, IQueueWriter<T>, IDisposable
        {
            // Index of each handle inside mWaitHandles; WaitAny reports which one fired.
            private const int ItemAvailableIndex = 0;

            // Backing store for the data; also used as the lock object guarding it.
            private readonly Queue<T> mQueue = new Queue<T>();
            // Counts the items in the queue: starts at zero (empty queue) and is
            // released once per Enqueue. Set to null once the queue is disposed.
            private Semaphore mSemaphore = new Semaphore(0, int.MaxValue);
            // Signaled when the reader thread is exiting. Set to null once disposed.
            private ManualResetEvent mKillThread = new ManualResetEvent(false);
            // Handles that unblock a Dequeue: an item is available, or the reader is exiting.
            private readonly WaitHandle[] mWaitHandles;

            public BlockingQueue()
            {
                mWaitHandles = new WaitHandle[2] { mSemaphore, mKillThread };
            }

            /// <summary>
            /// Appends an item and wakes one waiting <see cref="Dequeue"/> call.
            /// </summary>
            /// <param name="data">The item to add.</param>
            /// <exception cref="ObjectDisposedException">The queue has been disposed.</exception>
            public void Enqueue(T data)
            {
                // Fail before touching the queue instead of enqueuing and then
                // dereferencing a null semaphore.
                Semaphore semaphore = mSemaphore;
                if (semaphore == null)
                {
                    throw new ObjectDisposedException(GetType().Name);
                }

                lock (mQueue)
                {
                    mQueue.Enqueue(data);
                }

                // One more item is available to readers.
                semaphore.Release();
            }

            /// <summary>
            /// Blocks until an item is available or the reader has been released.
            /// </summary>
            /// <returns>
            /// The next item, or <c>default(T)</c> when the wait ended because
            /// <see cref="ReleaseReader"/> was called.
            /// </returns>
            public T Dequeue()
            {
                int signaled = WaitHandle.WaitAny(mWaitHandles);

                // Only take an item when the semaphore itself was acquired, so the
                // semaphore count and the queue length stay in step.
                if (signaled != ItemAvailableIndex)
                {
                    return default(T);
                }

                lock (mQueue)
                {
                    if (mQueue.Count > 0)
                    {
                        return mQueue.Dequeue();
                    }
                }
                return default(T);
            }

            /// <summary>
            /// Signals the kill event so a blocked <see cref="Dequeue"/> returns.
            /// Does nothing once the queue has been disposed.
            /// </summary>
            public void ReleaseReader()
            {
                ManualResetEvent killEvent = mKillThread;
                if (killEvent != null)
                {
                    killEvent.Set();
                }
            }

            /// <summary>
            /// Closes both wait handles and discards any remaining items.
            /// Safe to call more than once.
            /// </summary>
            void IDisposable.Dispose()
            {
                if (mSemaphore != null)
                {
                    mSemaphore.Close();
                    mSemaphore = null;
                    lock (mQueue)
                    {
                        mQueue.Clear();
                    }
                }

                // The kill event owns a kernel handle as well and must be closed too.
                if (mKillThread != null)
                {
                    mKillThread.Close();
                    mKillThread = null;
                }
            }
        }
    }
    SendOrPostCallbackItem
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace SynchronizationContextDemo
    {
        /// <summary>
        /// Selects how a queued callback item is executed: via its Post path or its Send path.
        /// </summary>
        internal enum ExecutionType
        {
            /// <summary>Execute through the item's Post path.</summary>
            Post = 0,

            /// <summary>Execute through the item's Send path.</summary>
            Send = 1
        }
    
        /// <summary>
        /// A callback together with its state and execution mode, queued so that
        /// it can be run on the STA worker thread.
        /// </summary>
        internal class SendOrPostCallbackItem
        {
            private readonly object mState;
            private readonly ExecutionType mExeType;
            private readonly SendOrPostCallback mMethod;
            // Signaled when a Send-style execution has finished (successfully or not).
            private readonly ManualResetEvent mAsyncWaitHandle = new ManualResetEvent(false);
            // Exception captured by Send(), or null when none was thrown.
            private Exception mException;

            /// <summary>
            /// Creates a work item.
            /// </summary>
            /// <param name="callback">The delegate to invoke.</param>
            /// <param name="state">The argument passed to <paramref name="callback"/>.</param>
            /// <param name="type">Whether the item runs through Send or Post.</param>
            /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
            internal SendOrPostCallbackItem(SendOrPostCallback callback,
               object state, ExecutionType type)
            {
                // Reject a null delegate here, on the thread that created the item,
                // rather than failing later when the item is executed.
                if (callback == null)
                {
                    throw new ArgumentNullException("callback");
                }

                mMethod = callback;
                mState = state;
                mExeType = type;
            }

            /// <summary>The exception captured by <see cref="Send"/>, or null.</summary>
            internal Exception Exception
            {
                get { return mException; }
            }

            /// <summary>True when <see cref="Send"/> captured an exception.</summary>
            internal bool ExecutedWithException
            {
                get { return mException != null; }
            }

            /// <summary>
            /// Runs the callback according to the item's execution type.
            /// This code must run on the STA thread.
            /// </summary>
            internal void Execute()
            {
                if (mExeType == ExecutionType.Send)
                {
                    Send();
                }
                else
                {
                    Post();
                }
            }

            /// <summary>
            /// Invokes the callback, records any exception it throws, and always
            /// signals <see cref="ExecutionCompleteWaitHandle"/> so that a thread
            /// waiting on it is unblocked.
            /// </summary>
            internal void Send()
            {
                try
                {
                    mMethod(mState);
                }
                catch (Exception e)
                {
                    // Stored rather than rethrown so the waiting caller can surface it.
                    mException = e;
                }
                finally
                {
                    mAsyncWaitHandle.Set();
                }
            }

            /// <summary>
            /// Invokes the callback without catching anything: an exception thrown
            /// by the callback propagates to whoever called <see cref="Execute"/>.
            /// </summary>
            internal void Post()
            {
                mMethod(mState);
            }

            /// <summary>
            /// Handle that becomes signaled once <see cref="Send"/> has finished.
            /// </summary>
            internal WaitHandle ExecutionCompleteWaitHandle
            {
                get { return mAsyncWaitHandle; }
            }
        }
    }
    StaThread
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace SynchronizationContextDemo
    {
        /// <summary>
        /// Owns a dedicated STA thread that pulls work items from a queue reader
        /// and executes them one at a time until it is stopped.
        /// </summary>
        internal class StaThread
        {
            private Thread mStaThread;
            private IQueueReader<SendOrPostCallbackItem> mQueueConsumer;

            // Set by Stop(); polled by the worker loop before each dequeue.
            private ManualResetEvent mStopEvent = new ManualResetEvent(false);

            /// <summary>
            /// Creates (but does not start) the worker thread.
            /// </summary>
            /// <param name="reader">Source of the work items to execute.</param>
            internal StaThread(IQueueReader<SendOrPostCallbackItem> reader)
            {
                mQueueConsumer = reader;

                Thread worker = new Thread(Run);
                worker.Name = "STA Worker Thread";
                worker.SetApartmentState(ApartmentState.STA);
                mStaThread = worker;
            }

            /// <summary>Starts the worker thread.</summary>
            internal void Start()
            {
                mStaThread.Start();
            }

            /// <summary>Blocks the caller until the worker thread has finished.</summary>
            internal void Join()
            {
                mStaThread.Join();
            }

            /// <summary>
            /// Worker loop: until the stop event is signaled, dequeue the next
            /// item and execute it if one was returned.
            /// </summary>
            private void Run()
            {
                // WaitOne(0) is a non-blocking poll of the stop event.
                while (!mStopEvent.WaitOne(0))
                {
                    SendOrPostCallbackItem nextItem = mQueueConsumer.Dequeue();
                    if (nextItem == null)
                    {
                        continue;
                    }

                    nextItem.Execute();
                }
            }

            /// <summary>
            /// Requests the worker to stop, unblocks its pending dequeue, waits
            /// for it to exit, and then disposes the queue reader.
            /// </summary>
            internal void Stop()
            {
                // Order matters: raise the stop flag first so the loop exits once
                // the reader has been released from its blocking dequeue.
                mStopEvent.Set();
                mQueueConsumer.ReleaseReader();
                mStaThread.Join();
                mQueueConsumer.Dispose();
            }
        }
    }
    StaSynchronizationContext
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    
    namespace SynchronizationContextDemo
    {
        /// <summary>
        /// A <see cref="SynchronizationContext"/> that marshals callbacks onto a
        /// single dedicated STA worker thread which it creates and starts itself.
        /// </summary>
        public class StaSynchronizationContext : SynchronizationContext, IDisposable
        {
            private readonly BlockingQueue<SendOrPostCallbackItem> mQueue;
            private readonly StaThread mStaThread;

            /// <summary>
            /// Creates the work queue and starts the STA worker thread that drains it.
            /// </summary>
            public StaSynchronizationContext()
                : base()
            {
                mQueue = new BlockingQueue<SendOrPostCallbackItem>();
                mStaThread = new StaThread(mQueue);
                mStaThread.Start();
            }

            /// <summary>
            /// Queues <paramref name="d"/> for execution on the STA thread and
            /// blocks the caller until it has run.
            /// </summary>
            /// <param name="d">The callback to run.</param>
            /// <param name="state">The argument passed to the callback.</param>
            /// <remarks>
            /// An exception thrown by the callback is rethrown on the calling
            /// thread, not on the STA thread.
            /// </remarks>
            public override void Send(SendOrPostCallback d, object state)
            {
                // create an item for execution
                SendOrPostCallbackItem item = new SendOrPostCallbackItem(d, state,
                                                  ExecutionType.Send);
                // queue the item
                mQueue.Enqueue(item);

                // wait for the item execution to end
                WaitHandle completed = item.ExecutionCompleteWaitHandle;
                completed.WaitOne();

                // The item is finished and nothing else waits on it, so release
                // the event's OS handle now instead of leaking one per Send call.
                // Deliberately not in a finally block: if the wait is interrupted
                // the STA thread may still need to signal this handle.
                completed.Close();

                // if there was an exception, throw it on the caller thread, not the
                // sta thread.
                if (item.ExecutedWithException)
                    throw item.Exception;
            }

            /// <summary>
            /// Queues <paramref name="d"/> for execution on the STA thread and
            /// returns without waiting for it to run.
            /// </summary>
            /// <param name="d">The callback to run.</param>
            /// <param name="state">The argument passed to the callback.</param>
            /// <remarks>
            /// This is risky because an unhandled exception will terminate the
            /// STA thread. Use with caution.
            /// </remarks>
            public override void Post(SendOrPostCallback d, object state)
            {
                SendOrPostCallbackItem item = new SendOrPostCallbackItem(d, state,
                                                  ExecutionType.Post);
                mQueue.Enqueue(item);
            }

            /// <summary>
            /// Stops the STA worker thread.
            /// </summary>
            public void Dispose()
            {
                mStaThread.Stop();
            }

            /// <summary>
            /// Returns this same instance, so every "copy" shares the one STA thread.
            /// </summary>
            public override SynchronizationContext CreateCopy()
            {
                return this;
            }
        }
    }
    Main
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Diagnostics;
    
    namespace SynchronizationContextDemo
    {
        /// <summary>
        /// Argument object handed to the callback that runs on the STA thread.
        /// </summary>
        public class Params
        {
            /// <summary>Result text written by the callback.</summary>
            public string Output { get; set; }

            /// <summary>Index of the call within the issuing thread's loop.</summary>
            public int CallCounter { get; set; }

            /// <summary>Managed thread id of the thread that issued the call.</summary>
            public int OriginalThread { get; set; }
        }
    
        /// <summary>
        /// Demo driver: many thread-pool threads send work to one STA thread
        /// through a shared <see cref="StaSynchronizationContext"/>.
        /// </summary>
        class Program
        {
            // Number of callbacks executed so far; only touched inside RunOnStaThread.
            private static int mCount;
            private static StaSynchronizationContext mStaSyncContext;

            /// <summary>
            /// Creates the context, queues 100 thread-pool work items, then waits
            /// for a line of console input before disposing the context.
            /// </summary>
            static void Main(string[] args)
            {
                mStaSyncContext = new StaSynchronizationContext();

                int pending = 100;
                while (pending > 0)
                {
                    ThreadPool.QueueUserWorkItem(NonStaThread);
                    pending--;
                }

                Console.WriteLine("Processing");
                Console.WriteLine("Press any key to dispose SyncContext");
                Console.ReadLine();
                mStaSyncContext.Dispose();
            }

            /// <summary>
            /// Thread-pool work item: sends ten callbacks to the STA context and
            /// asserts after each one that the callback filled in the output.
            /// </summary>
            private static void NonStaThread(object state)
            {
                int callerThreadId = Thread.CurrentThread.ManagedThreadId;

                for (int call = 0; call < 10; call++)
                {
                    var request = new Params();
                    request.OriginalThread = callerThreadId;
                    request.CallCounter = call;

                    mStaSyncContext.Send(RunOnStaThread, request);
                    Debug.Assert(request.Output == "Processed", "Unexpected behavior by STA thread");
                }
            }

            /// <summary>
            /// Callback passed to Send: bumps and prints the counter, traces the
            /// thread ids involved, and marks the request as processed.
            /// </summary>
            private static void RunOnStaThread(object state)
            {
                Console.WriteLine(++mCount);

                int executingThreadId = Thread.CurrentThread.ManagedThreadId;
                var request = (Params)state;

                string message = "STA id " + executingThreadId + " original thread " +
                                 request.OriginalThread + " call count " + request.CallCounter;
                Trace.WriteLine(message);

                request.Output = "Processed";
            }
        }
    }
    作者:zhanjindong
    出处:http://www.cnblogs.com/zhanjindong
    个人博客:http://zhanjindong.com
    关于:一个程序员而已
    说明:目前的技术水平有限,博客定位于学习心得和总结。
  • 相关阅读:
    装饰者模式
    代理模式
    享元模式
    模板模式
    命令模式
    建造者模式
    单例模式
    观察者模式
    迭代器模式
    访问者模式
  • 原文地址:https://www.cnblogs.com/zhanjindong/p/2978446.html
Copyright © 2011-2022 走看看