zoukankan      html  css  js  c++  java
  • Delphi实现线程池组件(完整源码)

    //******************************************************************************
    //线程池
    //******************************************************************************
    //作者:Cai
    //日期:2011-3-10
    //******************************************************************************
    unit ThreadPoolClass;

    interface
    uses
    Windows, Classes, SyncObjectClass;

    type
    TThreadPool = class;


    // Per-task completion callback. The worker thread invokes it with the
    // finished TTaskObject as Sender.
    TOnTerminateTask = procedure (Sender: TObject) of object;
    // Base class for a unit of work executed by a TThreadPool worker thread.
    // Descendants supply the work by overriding Execute.
    TTaskObject = class(TObject)
    private
    // Worker thread currently bound to this task; nil while no worker holds it.
    FOwner: TThread;
    FOnTerminateTask: TOnTerminateTask;
    // Thread ID of the worker that picked the task up (written by the worker
    // just before it calls Execute).
    FThreadID: Cardinal;
    // Sequence number assigned by the pool when the task is queued.
    FTaskID : Cardinal;
    procedure SetOnTerminateTask(const Value: TOnTerminateTask);
    protected
    // The task's work; called by a worker thread.
    procedure Execute();virtual; abstract;
    // Polls until the owning worker reports the task as terminated, the task
    // loses its owner, or iTimeOut milliseconds have elapsed.
    procedure WaitFor(iTimeOut: Cardinal);virtual;
    // Sets the owning worker's task status to "terminating" (no-op without an owner).
    procedure Terminate;
    public
    constructor Create();virtual;
    // Calls Terminate and then WaitFor before the object is destroyed.
    destructor Destroy();override;
    // Runs AMethod through TThread.Synchronize, passing the owning worker thread.
    procedure Synchronize(AMethod: TThreadMethod);
    property ThreadID:Cardinal read FThreadID;
    property TaskID:Cardinal read FTaskID;
    property OnTerminateTask: TOnTerminateTask read FOnTerminateTask write SetOnTerminateTask;
    end;

    // Thread priority expressed as an integer; the pool casts it to
    // TThreadPriority when it creates a worker thread.
    TThreadPolicyInt = 0..6;

    // Pool-level completion callback. A worker invokes it after a task has
    // finished and been unbound; Sender is the pool, TaskObject the finished task.
    TOnTerminatingTask = procedure(Sender: TObject; TaskObject: TTaskObject) of object;

    // A pool of worker threads that execute queued TTaskObject instances.
    // The pool does not own the tasks it is given; it never frees them.
    TThreadPool = class(TObject)
    private
    // Guards FTaskObjectList (and the thread-creation path in AddTaskToList).
    FCriticalSectionLocker: TCriticalSectionLocker;
    // Worker threads (TThreadItem instances) created so far.
    FThreadList: TList;
    // Tasks queued but not yet picked up by a worker.
    FTaskObjectList: TList;
    // Upper bound on the number of worker threads the pool will create.
    FThreadMaxNum: Integer;
    FOnTerminatingTask: TOnTerminatingTask;
    // Priority applied to worker threads at creation time.
    FThreadPriority: TThreadPolicyInt;
    FNextTaskID: Cardinal;// ID given to the next queued task; doubles as a count of tasks handled so far
    procedure SetThreadMaxNum(const Value: Integer);
    procedure SetOnTerminatingTask(const Value: TOnTerminatingTask);
    procedure SetThreadPriority(const Value: TThreadPolicyInt);
    protected
    // Number of worker threads that are currently suspended.
    function GetIdelThreadNum(): Integer;
    // Resumes suspended worker threads.
    function WakeUpThreads(iNum:Integer): Integer;
    // Returns the task at the head of the queue (nil when the queue is empty);
    // removes it from the queue when bPop is True.
    procedure GetTaskFromList(var TaskObject: TTaskObject; bPop:Boolean=True);
    // Queues a task, assigns its TaskID and wakes or creates a worker for it.
    procedure AddTaskToList(TaskObject: TTaskObject);
    procedure DeleteTaskFromList(TaskObject: TTaskObject);
    // Empties the task queue, terminating and waiting for tasks that have an owner.
    procedure ClearTaskList();
    // Waits for each worker's current task and frees every worker thread.
    procedure ClearThreadList();
    // ClearTaskList followed by ClearThreadList.
    procedure ClearList();
    public
    constructor Create();virtual;
    destructor Destroy();override;
    // Public wrapper around AddTaskToList.
    procedure AddTask(TaskObject: TTaskObject);
    // Public wrapper around DeleteTaskFromList.
    procedure KillTask(TaskObject: TTaskObject);
    // Clears tasks and threads and resets the task ID counter to 0.
    procedure Clear();
    // Waits, by polling IsThreadDone, for the pool to go idle; passing
    // DWORD(-1) as iTimeOut means no time limit.
    procedure WaitFor(iTimeOut:Cardinal);virtual;
    // True when the task queue is empty and every worker is suspended or terminated.
    function IsThreadDone():Boolean;
    property ThreadMaxNum: Integer read FThreadMaxNum write SetThreadMaxNum;
    property ThreadPriority: TThreadPolicyInt read FThreadPriority write SetThreadPriority;
    property OnTerminatingTask: TOnTerminatingTask read FOnTerminatingTask write SetOnTerminatingTask;
    end;

    implementation

    type
    // State of the task a worker thread is handling. tsDestroying is set by
    // TThreadItem.Destroy so that Execute does not suspend the thread on exit.
    TTaskStatus = (tsRunning, {tsSuspend, tsWillTerminate, }tsTerminating, tsTerminated, tsDestroying);

    // Worker thread of a TThreadPool. It repeatedly pulls a task from the
    // owning pool, runs it, and suspends itself when no task is available.
    TThreadItem = class(TThread)
    private
    // Guards writes to FTaskStatus made through SetTaskStatus.
    FCriticalSectionLocker: TCriticalSectionLocker;
    // Pool this worker takes its tasks from.
    FOwner: TThreadPool;
    FTaskStatus: TTaskStatus;
    // FNextTaskStatus: TTaskStatus;
    // Task currently bound to this worker; nil between tasks.
    FCurTaskObject: TTaskObject;
    procedure SetOwner(const Value: TThreadPool);
    protected
    procedure Execute();override;
    // Updates FTaskStatus under FCriticalSectionLocker.
    procedure SetTaskStatus(TaskStatus: TTaskStatus);
    public
    // Creates the thread in the suspended state with status tsTerminated.
    constructor Create();overload; virtual;
    destructor Destroy();override;
    property Owner: TThreadPool read FOwner write SetOwner;
    end;

    { TThreadPool }

    // Builds an empty pool: no worker threads, no queued tasks, a limit of
    // three worker threads and priority level 4.
    constructor TThreadPool.Create;
    begin
      // Plain defaults first; they do not depend on the helper objects below.
      FThreadPriority := 4;
      FThreadMaxNum := 3;

      // Helper objects, in the same creation order as before.
      FCriticalSectionLocker := TCriticalSectionLocker.Create;
      FThreadList := TList.Create;
      FTaskObjectList := TList.Create;
    end;

    // Stops all work, releases the worker threads and frees the pool's
    // helper objects. Queued tasks are not freed; the pool does not own them.
    destructor TThreadPool.Destroy;
    begin
      // A destructor also runs when the constructor raised part-way through,
      // in which case the lists may still be nil. Only drain them when both exist.
      if Assigned(FTaskObjectList) and Assigned(FThreadList) then
        ClearList();

      // Free is nil-safe, unlike a direct call to Destroy.
      FThreadList.Free;
      FThreadList := nil;
      FTaskObjectList.Free;
      FTaskObjectList := nil;
      FCriticalSectionLocker.Free;
      FCriticalSectionLocker := nil;
      inherited;
    end;

    // Public entry point for cancelling a task: hands TaskObject straight to
    // DeleteTaskFromList.
    procedure TThreadPool.KillTask(TaskObject: TTaskObject);
    begin
      Self.DeleteTaskFromList(TaskObject);
    end;

    // Property setter for ThreadMaxNum. The value is stored as given; it is
    // consulted the next time a task is queued.
    procedure TThreadPool.SetThreadMaxNum(const Value: Integer);
    begin
      Self.FThreadMaxNum := Value;
    end;

    // Public entry point for queueing a task: hands TaskObject straight to
    // AddTaskToList.
    procedure TThreadPool.AddTask(TaskObject: TTaskObject);
    begin
      Self.AddTaskToList(TaskObject);
    end;

    // Queues TaskObject, stamps it with the next task ID and makes sure a
    // worker is available to run it.
    //   TaskObject - task to queue; nil and already-queued tasks are ignored.
    // If no suspended worker could be woken and the pool is below
    // FThreadMaxNum, a new worker thread is created and started.
    procedure TThreadPool.AddTaskToList(TaskObject: TTaskObject);
    var
      NewThread: TThreadItem;
    begin
      // A nil task used to be appended to the queue and then fault on the
      // FTaskID write, leaving a nil entry behind. Reject it up front.
      if not Assigned(TaskObject) then Exit;
      if not FCriticalSectionLocker.EnterLocker() then Exit;
      try
        if FTaskObjectList.IndexOf(TaskObject) >= 0 then Exit;
        FTaskObjectList.Add(TaskObject);
        TaskObject.FTaskID := FNextTaskID;
        Inc(FNextTaskID);

        // Check whether there are enough threads.
        if WakeUpThreads(1) = 0 then // no thread was woken
          if FThreadList.Count < FThreadMaxNum then
          begin
            NewThread := TThreadItem.Create();
            NewThread.Priority := TThreadPriority(FThreadPriority);
            // Bind the worker to the pool before it becomes visible in the
            // list; it is still suspended at this point.
            NewThread.FOwner := Self;
            FThreadList.Add(NewThread);
            WakeUpThreads(1);
          end;
      finally
        FCriticalSectionLocker.LeaveLocker();
      end;
    end;

    // Hands out the task at the head of the queue.
    //   TaskObject - receives the head task, or nil when the queue is empty
    //                or the lock could not be entered.
    //   bPop       - when True the task is also removed from the queue.
    procedure TThreadPool.GetTaskFromList(var TaskObject: TTaskObject; bPop:Boolean=True);
    begin
      TaskObject := nil;
      if FCriticalSectionLocker.EnterLocker() then
        try
          if FTaskObjectList.Count > 0 then
          begin
            TaskObject := TTaskObject(FTaskObjectList.First);
            if bPop then
              FTaskObjectList.Delete(0);
          end;
        finally
          FCriticalSectionLocker.LeaveLocker();
        end;
    end;

    // Removes TaskObject from the pool.
    //   TaskObject - task to remove; nil is ignored.
    // A task that is still queued is simply taken out of the queue. A task
    // that a worker is currently running is asked to terminate and waited for.
    //
    // Fixes over the previous version:
    //   * the nil test was inverted, so the routine returned immediately for
    //     every real task;
    //   * a queued task (which has no owner yet) returned before it was
    //     removed from the queue.
    procedure TThreadPool.DeleteTaskFromList(TaskObject: TTaskObject);
    var
      iIndex: Integer;
      Worker: TThread;
    begin
      if not Assigned(TaskObject) then Exit;
      if not FCriticalSectionLocker.EnterLocker() then Exit;
      try
        iIndex := FTaskObjectList.IndexOf(Pointer(TaskObject));
        if iIndex >= 0 then
          FTaskObjectList.Delete(iIndex);
        Worker := TaskObject.FOwner;
      finally
        FCriticalSectionLocker.LeaveLocker();
      end;

      // The wait below is unbounded, so it is done after the pool lock has
      // been released; otherwise every worker asking for a task would block.
      if Worker = nil then Exit;
      // A worker calls this for its own finished task; it must never wait
      // for itself.
      if Worker.ThreadID = GetCurrentThreadId then Exit;
      if TThreadItem(Worker).FTaskStatus <> tsTerminated then
      begin
        TaskObject.Terminate();
        TaskObject.WaitFor(DWORD(-1));
      end;
    end;

    // Property setter for OnTerminatingTask; stores the handler as given.
    procedure TThreadPool.SetOnTerminatingTask(const Value: TOnTerminatingTask);
    begin
      Self.FOnTerminatingTask := Value;
    end;

    // Counts the worker threads that are currently suspended.
    // Returns 0 when the pool has no threads.
    function TThreadPool.GetIdelThreadNum: Integer;
    var
      Idx, IdleCount: Integer;
    begin
      IdleCount := 0;
      // An empty list makes the loop body run zero times.
      for Idx := FThreadList.Count - 1 downto 0 do
        if TThread(FThreadList[Idx]).Suspended then
          IdleCount := IdleCount + 1;
      Result := IdleCount;
    end;

    // Resumes up to iNum suspended worker threads.
    //   iNum   - maximum number of threads to wake; values <= 0 wake none.
    //   Result - number of threads actually resumed.
    //
    // The previous version never incremented Result and ignored iNum, so it
    // always reported 0 and woke every suspended thread. Callers test the
    // result against 0 to decide whether a new thread must be created.
    function TThreadPool.WakeUpThreads(iNum: Integer): Integer;
    var
      I: Integer;
      Worker: TThread;
    begin
      Result := 0;
      for I := 0 to FThreadList.Count - 1 do
      begin
        if Result >= iNum then Break;
        Worker := TThread(FThreadList.Items[I]);
        if Worker.Suspended then
        begin
          Worker.Resume;
          Inc(Result);
        end;
      end;
    end;

    // Empties the pool: first the task queue, then the worker threads.
    procedure TThreadPool.ClearList;
    begin
      Self.ClearTaskList();
      Self.ClearThreadList();
    end;

    // Empties the task queue from the back. Entries that have an owner
    // thread are asked to terminate and waited for before being removed.
    // Task objects themselves are never freed here.
    // NOTE(review): the locking below is commented out, so the queue is
    // walked without FCriticalSectionLocker held - confirm this is intended.
    procedure TThreadPool.ClearTaskList;
    var
    I: Integer;
    begin
    //if not FCriticalSectionLocker.EnterLocker() then Exit;
    //try
    if FTaskObjectList.Count>0 then
    for I:=FTaskObjectList.Count-1 downto 0 do
    begin
    if TTaskObject(FTaskObjectList.Items[I])<>nil then
    if (TTaskObject(FTaskObjectList.Items[I]).FOwner<>nil) then
    begin
    // Ask the task to stop, then wait with no time limit.
    TTaskObject(FTaskObjectList.Items[I]).Terminate();
    TTaskObject(FTaskObjectList.Items[I]).WaitFor(DWORD(-1));
    // The slot is re-read after the wait before the owner link is cleared.
    if (TTaskObject(FTaskObjectList.Items[I])<>nil) and
    Assigned(TTaskObject(FTaskObjectList.Items[I])) then
    TTaskObject(FTaskObjectList.Items[I]).FOwner := nil;
    end;
    // Must not free the task here: it is not a resource created by TThreadPool.
    //TTaskObject(FTaskObjectList.Items[I]).Destroy;
    FTaskObjectList.Delete(I);
    end;
    //finally
    //FCriticalSectionLocker.LeaveLocker();
    //end;
    end;

    // Releases every worker thread, walking the list from the back.
    // A worker that still has an unfinished task bound to it has that task
    // terminated and waited for first; the worker is then freed and its
    // slot removed from the list.
    procedure TThreadPool.ClearThreadList;
    var
      Idx: Integer;
      Worker: TThreadItem;
    begin
      for Idx := FThreadList.Count - 1 downto 0 do
      begin
        Worker := TThreadItem(FThreadList.Items[Idx]);
        if Assigned(Worker) then
        begin
          if (Worker.FCurTaskObject <> nil) and
             (Worker.FTaskStatus <> tsTerminated) then
          begin
            Worker.FCurTaskObject.Terminate;
            Worker.FCurTaskObject.WaitFor(DWORD(-1));
            // The worker may have dropped the task while we waited, so the
            // field is read again before the owner link is cleared.
            if Assigned(Worker.FCurTaskObject) then
              Worker.FCurTaskObject.FOwner := nil;
          end;
          Worker.Free;
        end;
        FThreadList.Delete(Idx);
      end;
    end;

    // Blocks the caller until the pool is idle or the timeout expires.
    //   iTimeOut - maximum wait in milliseconds; DWORD(-1) waits without a
    //              limit, 0 returns immediately.
    // Idle is decided by IsThreadDone, polled every 10 ms.
    //
    // The timeout comparison used to be inverted (>=): a finite timeout
    // returned without waiting at all, and a timeout of 0 waited forever.
    procedure TThreadPool.WaitFor(iTimeOut: Cardinal);
    var
      iFirst: Cardinal;
    begin
      iFirst := GetTickCount();
      // Keep polling while there is no limit, or while the elapsed time is
      // still below the limit.
      while (iTimeOut = DWORD(-1)) or
            ((GetTickCount() - iFirst) < iTimeOut) do
      begin
        if IsThreadDone() then Break;
        Sleep(10);
      end;
    end;

    // Reports whether the pool has finished all its work.
    // Returns True only when the task queue is empty and every worker thread
    // is either suspended or terminated. Returns False when the lock cannot
    // be entered.
    function TThreadPool.IsThreadDone: Boolean;
    var
      Idx: Integer;
      Worker: TThreadItem;
    begin
      Result := False;
      if not FCriticalSectionLocker.EnterLocker() then Exit;
      try
        // Queued tasks mean the pool is certainly not done; answer False at once.
        if (FTaskObjectList = nil) or (FTaskObjectList.Count <> 0) then Exit;

        for Idx := 0 to FThreadList.Count - 1 do
        begin
          Worker := TThreadItem(FThreadList.Items[Idx]);
          // A worker that is neither suspended nor terminated is still busy.
          if (not Worker.Suspended) and (not Worker.Terminated) then Exit;
        end;
        Result := True;
      finally
        FCriticalSectionLocker.LeaveLocker();
      end;
    end;

    // Property setter for ThreadPriority. Only the stored value changes; it
    // is applied to worker threads when they are created.
    procedure TThreadPool.SetThreadPriority(const Value: TThreadPolicyInt);
    begin
      Self.FThreadPriority := Value;
    end;

    // Resets the pool: drops all queued tasks and worker threads, then
    // restarts task numbering at 0.
    procedure TThreadPool.Clear();
    begin
      Self.ClearList();
      Self.FNextTaskID := 0;
    end;

    { TThreadItem }

    // Creates a worker thread in the suspended state. The status starts as
    // tsTerminated, i.e. no task is in progress.
    constructor TThreadItem.Create();
    begin
      FCriticalSectionLocker := TCriticalSectionLocker.Create;
      // TThread's own constructor, asked to create the thread suspended.
      inherited Create(True);
      FTaskStatus := tsTerminated;
    end;

    // Shuts the worker thread down and releases its status lock.
    //
    // Order matters: the status is flagged first so Execute does not
    // re-suspend the thread on its way out. The inherited TThread destructor
    // then signals termination and waits for the thread to return. Only
    // after that is the lock freed. Previously the lock was destroyed first,
    // so the still-running thread could call SetTaskStatus on a freed object.
    destructor TThreadItem.Destroy;
    begin
      FTaskStatus := tsDestroying;
      inherited;
      // Fields remain valid here; the instance memory is released only after
      // the outermost destructor returns.
      FCriticalSectionLocker.Free;
      FCriticalSectionLocker := nil;
    end;

    // Worker thread body: repeatedly takes a task from the owning pool, runs
    // it, fires the completion callbacks and unbinds it. When the queue is
    // empty the thread suspends itself until the pool resumes it.
    // NOTE(review): TaskObject.Execute is not wrapped in try/finally, so an
    // exception raised by a task would skip the status update and unbinding
    // below - confirm tasks are expected never to raise.
    procedure TThreadItem.Execute;
    var
    TaskObject: TTaskObject;
    begin
    inherited;
    while not Self.Terminated do
    begin
    // Request a task from the pool.
    if FOwner=nil then Break;
    FOwner.GetTaskFromList(TaskObject);
    // No task: suspend and wait for the pool to wake this thread.
    if TaskObject=nil then
    begin
    Self.Suspend;
    Continue;// guarantees a fresh task request after being woken
    end;
    // Bind the task to the current thread.
    TaskObject.FOwner := Self;
    TaskObject.FThreadID := Self.ThreadID;
    FCurTaskObject := TaskObject;
    Self.SetTaskStatus(tsRunning);
    // Run the task.
    TaskObject.Execute();
    Self.SetTaskStatus(tsTerminating);
    Self.SetTaskStatus(tsTerminated);
    // Per-task callback; runs on this worker thread, before the task is unbound.
    if Assigned(TaskObject.FOnTerminateTask) then TaskObject.FOnTerminateTask(TaskObject);
    // Release the current binding.
    // NOTE(review): TaskObject is still dereferenced after the callback
    // above - presumably handlers must not free the task; verify with callers.
    FOwner.DeleteTaskFromList(TaskObject);
    TaskObject.FOwner := nil;
    FCurTaskObject := nil;
    // Pool-level callback; runs after the task has been unbound.
    if Assigned(FOwner.FOnTerminatingTask) then FOwner.FOnTerminatingTask(FOwner, TaskObject);
    end;
    // Unless the thread is being destroyed, suspend it so the thread resource is kept.
    if FTaskStatus<>tsDestroying then
    Self.Suspended := True;
    end;

    // Property setter for Owner: records the pool this worker takes tasks from.
    procedure TThreadItem.SetOwner(const Value: TThreadPool);
    begin
      Self.FOwner := Value;
    end;

    // Stores a new task status while holding the thread's status lock.
    //   TaskStatus - the status to record.
    // Does nothing when called on a nil instance, when the lock object does
    // not exist, or when the lock cannot be entered.
    procedure TThreadItem.SetTaskStatus(TaskStatus: TTaskStatus);
    begin
      if not Assigned(Self) then Exit;
      if not Assigned(FCriticalSectionLocker) then Exit;

      if FCriticalSectionLocker.EnterLocker() then
        try
          FTaskStatus := TaskStatus;
        finally
          FCriticalSectionLocker.LeaveLocker;
        end;
    end;

    { TTaskObject }

    // Default constructor. The task has no state of its own to set up; it is
    // virtual so descendants can add theirs.
    constructor TTaskObject.Create;
    begin
      inherited Create;
    end;

    // Before the object goes away, asks the owning worker (if any) to stop
    // and waits, without a time limit, until the task is no longer running.
    destructor TTaskObject.Destroy;
    begin
      Self.Terminate;
      // INFINITE is the same value as DWORD(-1).
      Self.WaitFor(INFINITE);
      inherited Destroy;
    end;

    // Property setter for OnTerminateTask; stores the handler as given.
    procedure TTaskObject.SetOnTerminateTask(const Value: TOnTerminateTask);
    begin
      Self.FOnTerminateTask := Value;
    end;

    // Lets task code run AMethod via TThread.Synchronize, passing the worker
    // thread that currently owns the task.
    //   AMethod - parameterless method to run.
    procedure TTaskObject.Synchronize(AMethod: TThreadMethod);
    var
      Worker: TThread;
    begin
      Worker := FOwner;
      TThread.Synchronize(Worker, AMethod);
    end;

    // Flags the task as terminating on the worker thread that owns it.
    // Does nothing when the task has no owner or is not currently running.
    //
    // The status used to be overwritten unconditionally. That downgraded an
    // already finished task (tsTerminated) to tsTerminating, so a following
    // WaitFor kept polling. It could also erase tsDestroying, which the
    // worker relies on to avoid suspending itself while being destroyed.
    procedure TTaskObject.Terminate;
    var
      Worker: TThreadItem;
    begin
      Worker := TThreadItem(FOwner);
      if Worker = nil then Exit;
      if Worker.FTaskStatus = tsRunning then
        Worker.SetTaskStatus(tsTerminating);
    end;

    // Polls, every 5 ms, until the owning worker reports the task as
    // terminated, the task has no owner any more, or the timeout expires.
    //   iTimeOut - maximum wait in milliseconds.
    // Returns at once when called on a nil instance or an unowned task.
    procedure TTaskObject.WaitFor(iTimeOut: Cardinal);
    var
      StartTick: Cardinal;
    begin
      StartTick := GetTickCount();
      if (Self = nil) or (FOwner = nil) then Exit;
      try
        repeat
          // Each exit condition is re-checked on every pass.
          if (Self = nil) or (FOwner = nil) then Break;
          if TThreadItem(FOwner).FTaskStatus = tsTerminated then Break;
          if (GetTickCount() - StartTick) >= iTimeOut then Break;
          Sleep(5);
        until False;
      except
        // Any exception raised while polling is deliberately swallowed;
        // the wait simply ends.
      end;
    end;

    end.

  • 相关阅读:
    HDU 5528 Count a * b 欧拉函数
    HDU 5534 Partial Tree 完全背包
    HDU 5536 Chip Factory Trie
    HDU 5510 Bazinga KMP
    HDU 4821 String 字符串哈希
    HDU 4814 Golden Radio Base 模拟
    LA 6538 Dinner Coming Soon DP
    HDU 4781 Assignment For Princess 构造
    LA 7056 Colorful Toy Polya定理
    LA 6540 Fibonacci Tree
  • 原文地址:https://www.cnblogs.com/caibirdy1985/p/4232977.html
Copyright © 2011-2022 走看看