zoukankan      html  css  js  c++  java
  • 線程池

    // Written by 詠南工作室(陳新光) 2009-06-28 11:13:44

    //線程池

    unit Ut_CustomThread;

    interface
    uses
      Classes, SysUtils, SyncObjs, Windows;
    Type
      //
    基本线程类

      TBaseThread = class;
      //
    线程错误处理类
      EThreadException = class(Exception);
      //
    线程等待错误处理类
      EThreadTerminateAndWaitFor = class(EThreadException);
      //
    线程停止模式
      TThreadStopMode = (smTerminate, smSuspend);
      //
    例外线程事件函数事件
      TExceptionThreadEvent = procedure(AThread: TBaseThread; AException: Exception) of object;
      //
    线程事件通知函数事件
      TNotifyThreadEvent = procedure(AThread: TBaseThread) of object;
      //
    同步线程事件
      TSynchronizeThreadEvent = procedure(AThread: TBaseThread; AData: Pointer) of object;
      //
    自定义线程类
      TCustomThread = class(TThread)
      public
        //
    同步线程方法
        procedure Synchronize(Method: TThreadMethod); overload;
        //
    同步方法事件
        procedure Synchronize(Method: TMethod); overload;
        //
    返回值
        property  ReturnValue;
        //
    结束线程
        property  Terminated;
      End;
      //
    基本线程类
      TBaseThread = class(TCustomThread)
      protected
        //
    数据对象  可以是任何一对象
        FData: TObject;
        //
    临界区 用来线程数据保护同步
        FLock: TCriticalSection;
        //
    线程停止模式
        FStopMode: TThreadStopMode;
        //
    是否停止
        FStopped: Boolean;
        //
    线程例外字符串
        FTerminatingException: string;
        //
    线程停止意外类
        FTerminatingExceptionClass: TClass;
        //
    意外事件
        FOnException: TExceptionThreadEvent;
        //
    通知线程停止事件
        FOnStopped: TNotifyThreadEvent;
        //
        //
    处理例外时间
        procedure DoException (AException: Exception); virtual;
        //
    处理停止事件
        procedure DoStopped; virtual;
        //
    具体执行
        procedure Execute; override;
        //
    当前线程是否停止
        function  GetStopped: Boolean;
        //
    抽象运行
        procedure Run; virtual; abstract;
      public
        //
    运行后
        procedure AfterRun; virtual; //3* Not abstract - otherwise it is required
        //
    执行后
        procedure AfterExecute; virtual;//5 Not abstract - otherwise it is required
        //
    执行前
        procedure BeforeExecute; virtual;//1 Not abstract - otherwise it is required
        //
    运行前
        procedure BeforeRun; virtual; //2* Not abstract - otherwise it is required
        //
    释放
        procedure Cleanup; virtual;//4*
        //
    创建
        constructor Create(ACreateSuspended: Boolean = True); virtual;
        //
    释放
        destructor Destroy; override;
        //
    开始
        procedure Start; virtual;
        //
    停止
        procedure Stop; virtual;

        // Here to make virtual
        procedure Terminate; virtual;
        //
    等待推出线程
        procedure TerminateAndWaitFor; virtual;
        //
    当前对象
        property Data: TObject read FData write FData;
        //
    停止模式
        property StopMode: TThreadStopMode read FStopMode write FStopMode;
        //
    当前是否停止
        property Stopped: Boolean read GetStopped;
        // in future versions (D6+) we must move to TThread.FatalException
        property TerminatingException: string read FTerminatingException;
        property TerminatingExceptionClass: TClass read FTerminatingExceptionClass;
        // events
        property OnException: TExceptionThreadEvent read FOnException write FOnException;
        property OnStopped: TNotifyThreadEvent read FOnStopped write FOnStopped;
      End;//TBaseThread

      // Class reference for TBaseThread descendants; TThreadMgr.ThreadClass holds
      // one of these and CreateNewThread instantiates it.
      TBaseThreadClass = class of TBaseThread;



      //
    线程管理类
      TThreadMgr = class(TComponent)
      protected
        //
    当前线程列表
        FActiveThreads: TThreadList;
        //
    线程类
        FThreadClass: TBaseThreadClass;
        //
    线程优先级
        FThreadPriority: TThreadPriority;
      public
        //
    初始化
        constructor Create(AOwner: TComponent); override;
        //
    创建一个新的线程
        function CreateNewThread: TBaseThread; virtual;
        //
    释放
        destructor Destroy; override;
        //
    获得一个线程
        function GetThread: TBaseThread; virtual; abstract;
        //
    返回一个线程
        procedure ReleaseThread(AThread: TBaseThread); virtual; abstract;
        //
    终止线程
        procedure TerminateThreads; virtual;
        //
        property ActiveThreads: TThreadList read FActiveThreads;
        property ThreadClass: TBaseThreadClass read FThreadClass write FThreadClass;
        property ThreadPriority: TThreadPriority read FThreadPriority
         write FThreadPriority default tpNormal;
      end;

      // Base class for thread manager errors
      EThreadMgrError = class(Exception);
      // Raised by TThreadMgr.CreateNewThread when ThreadClass is nil
      EThreadClassNotSpecified = class(EThreadMgrError);


     //
    线程池的管理
     type
      TThreadMgrPool = class(TThreadMgr)
      protected
        //
    当前池的大小
        FPoolSize: Integer;
        //
    当前的池
        FThreadPool: TThreadList;
        //
    停止一个线程
        procedure ThreadStopped(AThread: TBaseThread);
      public
        constructor Create(AOwner: TComponent); override;
        //
    释放线程池
        destructor Destroy; override;
        //
    获得一个线程
        function GetThread: TBaseThread; override;
        //  //
    返回一个线程
        procedure ReleaseThread(AThread: TBaseThread); override;
        //
    停止所有线程
        procedure TerminateThreads; override;
      published
        property PoolSize: Integer read FPoolSize write FPoolSize default 0;
      end;


    implementation

    { TCustomThread }
    // Assigns APriority to AThread.Priority.
    // APolicy is accepted but not used by this implementation.
    procedure SetThreadPriority(AThread: TThread; const APriority: TThreadPriority; const APolicy: Integer = -MaxInt);
    begin
      AThread.Priority := APriority;
    end;
    // Returns True when AThread is the thread executing this call, determined
    // by comparing AThread.ThreadID with GetCurrentThreadID.
    function IsCurrentThread(AThread: TThread): boolean;
    begin
      result := AThread.ThreadID = GetCurrentThreadID;
    end;


    // Public overload that forwards Method unchanged to the inherited Synchronize.
    procedure TCustomThread.Synchronize(Method: TThreadMethod);
    begin
      inherited Synchronize(Method);
    end;

    // Overload taking a TMethod record: the record is cast to TThreadMethod and
    // passed to the inherited Synchronize.
    procedure TCustomThread.Synchronize(Method: TMethod);
    begin
      inherited Synchronize(TThreadMethod(Method));
    end;

    { TBaseThread }

    // Hook called from the finally section of Execute, after the main loop ends.
    // Empty by default; descendants may override.
    procedure TBaseThread.AfterExecute;
    begin

    end;

    // Hook called by Execute in a finally section after the inner Run loop.
    // Empty by default; descendants may override.
    procedure TBaseThread.AfterRun;
    begin

    end;

    // Hook called once at the start of Execute, before the main loop.
    // Empty by default; descendants may override.
    procedure TBaseThread.BeforeExecute;
    begin

    end;

    // Hook called by Execute immediately before the inner Run loop.
    // Empty by default; descendants may override.
    procedure TBaseThread.BeforeRun;
    begin

    end;

    // Frees the attached data object and sets FData to nil. Called by Execute
    // after each BeforeRun/Run/AfterRun cycle and again by Destroy.
    procedure TBaseThread.Cleanup;
    begin
      FreeAndNil(FData);
    end;

    // Creates the thread object.
    //   ACreateSuspended - passed to the inherited constructor; also used as the
    //                      initial value of the stopped flag.
    constructor TBaseThread.Create(ACreateSuspended: Boolean);
    begin
      // Before inherited - inherited creates the actual thread and if not suspended
      // will start before we initialize
      FStopped := ACreateSuspended;
      FLock := TCriticalSection.Create;
      try
        inherited Create(ACreateSuspended);
      except
        // The inherited constructor failed: release the lock created above
        // and let the exception propagate.
        FreeAndNil(FLock);
        raise;
      end;
    end;

    // Destroys the thread object. The inherited destructor runs first; only
    // afterwards are the data object and the lock released. GetStopped returns
    // True once FLock is nil.
    // NOTE(review): whether the inherited destructor resumes a thread that
    // suspended itself inside Execute depends on the RTL's TThread.Destroy -
    // confirm this cannot block in WaitFor for pooled (suspended) threads.
    destructor TBaseThread.Destroy;
    begin
      FreeOnTerminate := FALSE; //prevent destroy between Terminate & WaitFor
      inherited Destroy; //Terminate&WaitFor
      Cleanup;
      FreeAndNil(FLock);
    end;

    // Dispatches AException to the OnException handler, if one is assigned.
    procedure TBaseThread.DoException(AException: Exception);
    begin
      if not Assigned(FOnException) then
        Exit;
      FOnException(Self, AException);
    end;

    // Notifies the OnStopped handler, if one is assigned, passing this thread.
    procedure TBaseThread.DoStopped;
    begin
      if not Assigned(OnStopped) then
        Exit;
      OnStopped(Self);
    end;

    // Thread body. Life cycle:
    //   BeforeExecute
    //   repeat until Terminated:
    //     if stopped: DoStopped, then suspend until resumed
    //     BeforeRun; Run (repeated while not Stopped); AfterRun; Cleanup
    //   AfterExecute
    // Any exception escaping this sequence is recorded in
    // TerminatingExceptionClass / TerminatingException, reported through
    // DoException, and the thread is then terminated.
    procedure TBaseThread.Execute;
    begin
      try
        try
          BeforeExecute;
          while not Terminated do begin
            if Stopped then begin
              DoStopped;
              // It is possible that either in the DoStopped or from another thread,
              // the thread is restarted, in which case we dont want to restop it.
              if Stopped then begin // DONE: if terminated?
                // Do not suspend a thread that has already been asked to terminate
                if Terminated then begin
                  Break;
                end;
                Suspended := True; // Thread manager will revive us
                // Execution continues here after being resumed; the thread may
                // have been resumed only in order to terminate
                if Terminated then begin
                  Break;
                end;
              end;
            end;

            try
              BeforeRun;
              try
                // Call Run repeatedly until the thread is stopped
                while not Stopped do begin
                  Run;
                end;
              finally
                AfterRun;
              end;//try..finally (AfterRun)
            finally
              // Runs after every cycle, even when BeforeRun/Run/AfterRun raised
              Cleanup;
            end;

          end;//while NOT Terminated
        finally
          AfterExecute;
        end;
      except
        on E: Exception do begin
          // Keep the exception's class and message for later inspection
          FTerminatingExceptionClass := E.ClassType;
          FTerminatingException := E.Message;
          DoException(E);
          Terminate;
        end;
      end;//try..except
    end;

    // Reports whether the thread counts as stopped: terminated, flagged as
    // stopped, or suspended. Evaluated under FLock; when the lock no longer
    // exists (the object is being destroyed) the answer is always True.
    function TBaseThread.GetStopped: Boolean;
    begin
      if not Assigned(FLock) then
      begin
        Result := TRUE; // user called Destroy
        Exit;
      end;

      FLock.Enter;
      try
        // Suspended may be true if checking stopped from another thread
        Result := Terminated or FStopped or Suspended;
      finally
        FLock.Leave;
      end;
    end;

    // Starts the thread if it is currently stopped: clears the stopped flag and
    // resumes the thread. Does nothing when the thread is already running.
    procedure TBaseThread.Start;
    begin
      FLock.Enter;
      try
        if not Stopped then
          Exit;
        // Resuming is also done for smTerminate, because Start can be used to
        // initially start a thread that was created suspended
        FStopped := False;
        Suspended := False;
      finally
        FLock.Leave;
      end;
    end;

    // Stops a running thread. In smTerminate mode the thread is terminated; in
    // smSuspend mode only the stopped flag is set and Execute suspends the
    // thread itself. Does nothing when the thread is already stopped.
    procedure TBaseThread.Stop;
    begin
      FLock.Enter;
      try
        if Stopped then
          Exit;
        // smSuspend: DO NOT suspend here. Suspend is immediate. See Execute
        // for the implementation.
        if FStopMode = smTerminate then
          Terminate;
        FStopped := True;
      finally
        FLock.Leave;
      end;
    end;

    // Marks the thread as stopped, then calls the inherited Terminate.
    procedure TBaseThread.Terminate;
    begin
      FStopped := True;
      inherited Terminate;
    end;

    // Terminates the thread and blocks until it has finished.
    // Raises EThreadTerminateAndWaitFor when FreeOnTerminate is set.
    procedure TBaseThread.TerminateAndWaitFor;
    begin
      if FreeOnTerminate then begin
        // The message must be a quoted string literal; it was left unquoted
        raise EThreadTerminateAndWaitFor.Create('thread terminate and waitfor');
      end;
      Terminate;
      // Resume a suspended thread before waiting on it
      if Suspended then begin
        Resume;
      end;
      WaitFor;
    end;

    { TThreadMgr }

    // Creates the manager: normal priority for new threads and an empty list
    // of active threads.
    constructor TThreadMgr.Create(AOwner: TComponent);
    begin
      inherited Create(AOwner);
      FThreadPriority := tpNormal;
      FActiveThreads := TThreadList.Create;
    end;

    // Creates a new thread of class ThreadClass (using the constructor's default
    // argument) and applies ThreadPriority to it.
    // Raises EThreadClassNotSpecified when ThreadClass has not been set.
    function TThreadMgr.CreateNewThread: TBaseThread;
    begin
      if ThreadClass = nil then begin
        // Straight quotes are required for a string literal; also give the
        // error a message instead of an empty string.
        raise EThreadClassNotSpecified.Create('Thread class not specified.');
      end;
      Result := TBaseThreadClass(ThreadClass).Create;
      SetThreadPriority(Result, ThreadPriority);
    end;

    // Frees the active-thread list object (the threads in it are not touched
    // here), then runs the inherited destructor.
    destructor TThreadMgr.Destroy;
    begin
      FreeAndNil(FActiveThreads);
      inherited Destroy;
    end;

    // Does nothing in the base class; descendants override it to terminate
    // the threads they manage.
    procedure TThreadMgr.TerminateThreads;
    begin

    end;

    { TThreadMgrPool }

    // Creates the pool manager and its (initially empty) list of pooled threads.
    constructor TThreadMgrPool.Create(AOwner: TComponent);
    begin
      inherited Create(AOwner);
      FThreadPool := TThreadList.Create;
    end;

    // Resets the pool size, frees every thread still held in the pool (in list
    // order, with the pool locked), then frees the pool list itself.
    destructor TThreadMgrPool.Destroy;
    var
      LIndex: Integer;
      LPooled: TList;
    begin
      PoolSize := 0;
      LPooled := FThreadPool.LockList;
      try
        for LIndex := 0 to Pred(LPooled.Count) do
          TBaseThread(LPooled[LIndex]).Free;
      finally
        FThreadPool.UnlockList;
      end;
      FreeAndNil(FThreadPool);
      inherited Destroy;
    end;


    // Hands out a thread: the first pooled thread when the pool is not empty,
    // otherwise a newly created one with StopMode set to smSuspend. The returned
    // thread is added to ActiveThreads.
    // NOTE(review): a pooled thread is returned regardless of the current
    // ThreadClass, so its class may differ from ThreadClass - confirm callers
    // do not rely on the concrete class.
    function TThreadMgrPool.GetThread: TBaseThread;
    var
      LThreadPool: TList;
    begin
      // Lock the pool of idle threads
      LThreadPool := FThreadPool.LockList;
      try
        // Is there a pooled thread available?
        if LThreadPool.Count > 0 then
        begin
          // Yes: take the first one and remove it from the pool
          Result := TBaseThread(LThreadPool[0]);
          LThreadPool.Delete(0);
        end else begin
          // No: create a new thread and set its stop mode
          Result := CreateNewThread;
          Result.StopMode := smSuspend;
        end;
      finally
        FThreadPool.UnlockList;
      end;
      // Record the thread in the active list
      ActiveThreads.Add(Result);
    end;

    // Takes AThread back from a caller. The thread is removed from ActiveThreads
    // and then either disposed of (pool full, or thread already terminated) or
    // asked to stop so that it can be pooled for reuse.
    procedure TThreadMgrPool.ReleaseThread(AThread: TBaseThread);
    var
      LThreadPool: TList;
    begin
      // Remove the thread from the list of threads in use
      ActiveThreads.Remove(AThread);
      LThreadPool := FThreadPool.LockList;
      try
        // Dispose of the thread when the pool already holds PoolSize threads
        // or the thread has been terminated.
        // PoolSize = 0 means that we will keep all active threads in the thread pool
        if ((PoolSize > 0) and (LThreadPool.Count >= PoolSize)) or AThread.Terminated then begin
          if IsCurrentThread(AThread) then begin
            // Called from AThread itself: do not wait on it; let it free
            // itself once it terminates
            AThread.FreeOnTerminate := True;
            AThread.Terminate;
          end else begin
            if not AThread.Stopped then
            begin
              AThread.TerminateAndWaitFor;
            end;
            AThread.Free;
          end;
        end else begin
          // Otherwise stop the thread; its OnStopped handler (ThreadStopped)
          // adds it to the pool. A thread that is already suspended is freed.
          if not AThread.Suspended then begin
            AThread.OnStopped := ThreadStopped;
            AThread.Stop;
          end
          else begin
            AThread.Free;
          end;
        end;
      finally
        FThreadPool.UnlockList;
      end;
    end;

    // Empties the pool: each pooled thread is set to free itself on termination,
    // terminated, started again so it can run to completion, and removed from
    // the pool list.
    procedure TThreadMgrPool.TerminateThreads;
    var
      LPool: TList;
      LThread: TBaseThread;
    begin
      inherited TerminateThreads;

      LPool := FThreadPool.LockList;
      try
        while LPool.Count > 0 do
        begin
          LThread := TBaseThread(LPool.Items[0]);
          LThread.FreeOnTerminate := true;
          LThread.Terminate;
          LThread.Start;
          LPool.Delete(0);
        end;
      finally
        FThreadPool.UnlockList;
      end;
    end;

    // OnStopped handler installed by ReleaseThread: adds the stopped thread to
    // the pool.
    procedure TThreadMgrPool.ThreadStopped(AThread: TBaseThread);
    begin
      FThreadPool.Add(AThread);
    end;

    end.

     

    //調用演示

    unit Unit1;

    interface

    uses
      Windows, Messages, SysUtils, Variants, Classes, Graphics, Controls, Forms,
      Dialogs, StdCtrls
      ,Ut_CustomThread;    // uses thread pool unit

    type
      // Demo form with a single button; the handlers below exercise the
      // thread pool.
      TForm1 = class(TForm)
        Button1: TButton;
        procedure Button1Click(Sender: TObject);
        procedure FormCreate(Sender: TObject);
      private
        { Private declarations }
      public
        { Public declarations }
      end;

      // First demo worker; its Run draws a counter at (100, 100) on Form1.
      Tthread1 = class(TBaseThread)
      protected
        procedure Run; override;
      end;

      // Second demo worker; its Run draws a counter at (100, 200) on Form1.
      Tthread2 = class(TBaseThread)
      protected
        procedure Run; override;
      end;

    var
      Form1: TForm1;
      Fthread1, Fthread2: TBaseThread;
      FthreadMgr: TThreadMgrPool;         

    implementation

    {$R *.dfm}

    // Demo: obtains two threads from the pool manager, sets their stop mode,
    // calls Run on the first one and then releases both threads.
    // NOTE(review): Run is invoked directly, so it executes on the thread
    // handling the click rather than on Fthread1 (Start is never called), and
    // Fthread2 is released without doing any work - confirm this is intended.
    // NOTE(review): the Tthread1 cast is unchecked; GetThread may return a
    // pooled thread that was created with a different ThreadClass.
    procedure TForm1.Button1Click(Sender: TObject);
    begin
      // get one thread of each class from the thread pool

      FthreadMgr.ThreadClass := Tthread1;
      Fthread1 := FthreadMgr.GetThread;
      FthreadMgr.ThreadClass := Tthread2;
      Fthread2 := FthreadMgr.GetThread;
      // stop mode: the threads suspend themselves when stopped

      Fthread1.StopMode := smSuspend;
      Fthread2.StopMode := smSuspend;
      // run the first thread's work (synchronous call, see notes above)

      Tthread1(Fthread1).Run;
      // return both threads to the thread pool

      FthreadMgr.ReleaseThread(Fthread1);
      FthreadMgr.ReleaseThread(Fthread2);
    end;

    { Tthread1 }

    // Draws the numbers 1..500000 at position (100, 100) on Form1's canvas.
    // The canvas is shared with the rest of the application, so every drawing
    // call is wrapped in Lock/Unlock.
    procedure Tthread1.Run;
    var
      i: Integer;
    begin
      // Run is abstract in TBaseThread, so there is no inherited
      // implementation to call.
      for i := 1 to 500000 do
      begin
        // Lock per iteration so other users of the canvas are not blocked for
        // the whole loop.
        Form1.Canvas.Lock;
        try
          Form1.Canvas.TextOut(100, 100, IntToStr(i));
        finally
          Form1.Canvas.Unlock;
        end;
      end;
    end;

    { Tthread2 }

    // Draws the numbers 1..500000 at position (100, 200) on Form1's canvas.
    // The canvas is shared with the rest of the application, so every drawing
    // call is wrapped in Lock/Unlock.
    procedure Tthread2.Run;
    var
      i: Integer;
    begin
      // Run is abstract in TBaseThread, so there is no inherited
      // implementation to call.
      for i := 1 to 500000 do
      begin
        // Lock per iteration so other users of the canvas are not blocked for
        // the whole loop.
        Form1.Canvas.Lock;
        try
          Form1.Canvas.TextOut(100, 200, IntToStr(i));
        finally
          Form1.Canvas.Unlock;
        end;
      end;
    end;

    // Creates the thread pool manager, owned by the form, and sets its pool size.
    procedure TForm1.FormCreate(Sender: TObject);
    begin
      // create thread pool manager

      FthreadMgr := TThreadMgrPool.Create(Self);
      // PoolSize caps how many idle threads ReleaseThread keeps in the pool;
      // it does not limit how many threads GetThread may create
      FthreadMgr.PoolSize := 2;
    end;

    end.

  • 相关阅读:
    python函数嵌套的实用技术
    windows10 装linux子系统
    彻底测试全部拷贝list相关操作的区别python
    c语言学习
    Pickling
    Filenames and paths
    Format operator
    Reading and writing
    Persistence
    Automation testing tool comparison
  • 原文地址:https://www.cnblogs.com/hnxxcxg/p/2940770.html
Copyright © 2011-2022 走看看