ThreadPool
unit ThreadPool;
interface
uses Classes,Windows,SysUtils,common,BaseThread;
type
// TAddEvent = procedure(msg:string);
TBaseTask = class //基础任务
protected
Fid:byte;
FSendType : byte; //发送信息类型:0:原ASCII 1 D131或者D101之类的需要有字符替换
FNumber :string;
FBuffer:Tbuffer;
FMsg :string;
FLog:Tlog;
FProcessTaskThread:TThread;
FSendTaskThread:TBaseThread;
function GetData:string;
Function GetCount : integer;
public
property id:Byte read Fid;
property Number:string read FNumber;
property SendType :byte read FSendtype write FSendType;
property ProcessTaskThread : TThread read FProcessTaskThread write FProcessTaskThread;
property SendTaskThread : TBaseThread read FSendTaskThread write FSendTaskThread;
property Count : integer read GetCount;
procedure Run;virtual;
procedure Add(Msg:string);virtual;
Constructor Create(Sid:Byte;Number:string;Log_switch:integer);virtual; //对应POOL里面数据头的ID
destructor Destroy; override;
end;
TBaseTaskThread = class(TThread) //任务线程
protected
FName :string;
Flog: Tlog;
FList: TList;
function GetCount:integer;
function GetTask:TBaseTask;
procedure Execute; override;
procedure Terminatedevent(sender:Tobject);
public
property Count:integer read Getcount;
property Name :string read FName;
procedure Add(Item: Pointer);
Constructor Create(Name:string;suspendflag:boolean;logSwitch:integer);
end;
TBasePoolThread = class(TThread) //分派线程
protected
FName :string;
FLog :Tlog;
FThreadCount : integer;
FBuffer: TBuffer;
FThreadList : Tlist;
FTaskList : Tlist;
FOnThreadProcessEvent :TOnThreadProcessEvent;
FOnThreadBeforeEvent :TOnThreadBeforeEvent;
function SearchTask(Msg:string):TBaseTask;
function SearchThread:TBaseTaskThread;
procedure Execute; override;
procedure Terminatedevent(sender:Tobject);
public
property ChildThreadCount :integer read FThreadCount;
procedure Add(Msg:string); //传入 ID+MSG ,id是byte类型,一位0-255
procedure AddTask(Task:TBaseTask);
procedure CreateChildThread;
property Name :string read FName;
property OnThreadProcessEvent:TOnThreadProcessEvent read FOnThreadProcessEvent write FOnThreadProcessEvent;
property OnThreadBeforeEvent :TOnThreadBeforeEvent read FOnThreadBeforeEvent write FOnThreadBeforeEvent;
Constructor Create(Name:string;ThreadCount:integer;suspendflag:boolean;logSwitch:integer);
end;
implementation
{ TBaseTask }
procedure TBaseTask.Add(Msg: string);
begin
FBuffer.Add(Msg);
end;
constructor TBaseTask.Create(Sid: Byte; Number: string;Log_switch:integer);
begin
Fid := Sid;
FSendType := 1;
FNumber := number;
FLog := Tlog.Create(Number,Log_switch);
FBuffer := TBuffer.Create;
end;
destructor TBaseTask.Destroy;
begin
FBuffer.Free;
Flog.Free;
inherited;
end;
function TBaseTask.GetCount: integer;
begin
result := Fbuffer.Count;
end;
function TBaseTask.GetData: string;
begin
Result := Fbuffer.Getdata;
FBuffer.DeleteData;
end;
procedure TBaseTask.Run;
begin
if FBuffer.Count > 0 then begin
fmsg:= GetData;
{ToDosomething(msg);}
end;
end;
{ TBaseTaskThread }
procedure TBaseTaskThread.Add(Item: Pointer);
begin
Flist.Add(item);
TBaseTask(item).ProcessTaskThread := self;
self.Resume;
end;
constructor TBaseTaskThread.Create(Name: string; suspendflag: boolean;
logSwitch: integer);
begin
Fname := name;
Flist := TList.Create;
Flog := Tlog.Create(name,logswitch);
onterminate := terminatedevent;
FreeOnTerminate:=true;
Flog.Log(name+'线程对象创建',log_all);
inherited create(suspendflag); //建立后先挂起
end;
procedure TBaseTaskThread.Execute;
//var temp :string;
begin
while True do begin
if Terminated then break;
if Flist.Count > 0 then begin
try
GetTask.Run;
except
on e: Exception do
FLog.Log('异常:'+e.Message,log_fail);
end;
end else
suspend;
end;
end;
function TBaseTaskThread.GetCount: integer;
begin
result:= Flist.Count;
end;
function TBaseTaskThread.GetTask: TBaseTask;
begin
result := Flist.items[0];
Flist.Delete(0);
end;
procedure TBaseTaskThread.Terminatedevent(sender: Tobject);
begin
Flist.Free;
Flog.log(FName+'线程释放',log_all);
Flog.Free;
end;
{ TBasePoolThread }
procedure TBasePoolThread.Add(Msg: string);
begin
Fbuffer.Add(msg);
resume;
end;
procedure TBasePoolThread.AddTask(Task: TBaseTask);
begin
FTaskList.Add(task);
end;
constructor TBasePoolThread.Create(Name: string; ThreadCount: integer;
suspendflag: boolean; logSwitch: integer);
begin
Fname := name;
FBuffer := TBuffer.Create;
FThreadCount := ThreadCount;
FThreadList := Tlist.Create;
Flog := Tlog.Create(name,logswitch);
CreateChildThread;
onterminate := terminatedevent;
FreeOnTerminate:=true;
Flog.Log(name+'线程对象创建',log_all);
inherited create(suspendflag); //建立后先挂起
end;
procedure TBasePoolThread.CreateChildThread;
var tempThread : TBaseTaskThread;
i:integer;
begin
for i:=0 to FThreadCount -1 do begin
tempThread := TBaseTaskThread.Create(Fname+inttostr(i), true,log_all);
FThreadList.Add(tempThread);
end;
end;
procedure TBasePoolThread.Execute;
var temp :string;
// temptask : TBaseTask;
// tempThread : TBasePoolThread;
begin
while True do begin
if Terminated then break;
if Fbuffer.Count > 0 then begin
temp := Fbuffer.Getdata; Fbuffer.deletedata;
if Assigned(FOnThreadBeforeEvent) then temp:=FOnThreadBeforeEvent(Flog,temp);
SearchTask(temp);
flog.Log('处理数据:'+temp,log_all);
if Assigned(FOnThreadProcessEvent) then FOnThreadProcessEvent(Flog,temp);
end else
suspend;
end;
end;
function TBasePoolThread.SearchTask(Msg: string): TBaseTask;
var fid : byte;
i:integer;
begin
fid := byte(msg[1]);
result := nil;
for i:=0 to FTasklist.Count-1 do begin
if TBaseTask(FTasklist.Items[i]).Fid = fid then begin
if (TBaseTask(FTasklist.Items[i]).Count > 0) and
assigned( TBaseTask(FTasklist.Items[i]).ProcessTaskThread) then begin
TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1));
TBaseTaskThread(TBaseTask(FTasklist.Items[i]).ProcessTaskThread).Add(FTasklist.Items[i]) //只有原任务有线程,且有未完成的任务
end else begin
TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1));
SearchThread.Add(FTasklist.Items[i]);
end;
break;
end;
end;
end;
function TBasePoolThread.SearchThread: TBaseTaskThread;
var i,pole:integer;
begin
result := nil;
pole := 9999;
for i:=0 to FThreadlist.Count -1 do begin
if TBaseTaskThread(FThreadList.Items[i]).Count = 0 then begin
result := FThreadList.Items[i];
break;
end;
if pole < TBaseTaskThread(FThreadList.Items[i]).Count then
result := FThreadList.Items[i];
end;
end;
procedure TBasePoolThread.Terminatedevent(sender: Tobject);
var i:integer;
begin
for i:= 0 to FThreadList.Count-1 do begin
TBaseTaskThread(FThreadList.items[i]).Terminate;
TBaseTaskThread(FThreadList.items[i]).Resume;
end;
FThreadList.Free;
fbuffer.Free;
Flog.log(FName+'线程释放',log_all);
Flog.Free;
end;
end.
unit ThreadPool;
interface
uses Classes,Windows,SysUtils,common,BaseThread;
type
// TAddEvent = procedure(msg:string);
TBaseTask = class //基础任务
protected
Fid:byte;
FSendType : byte; //发送信息类型:0:原ASCII 1 D131或者D101之类的需要有字符替换
FNumber :string;
FBuffer:Tbuffer;
FMsg :string;
FLog:Tlog;
FProcessTaskThread:TThread;
FSendTaskThread:TBaseThread;
function GetData:string;
Function GetCount : integer;
public
property id:Byte read Fid;
property Number:string read FNumber;
property SendType :byte read FSendtype write FSendType;
property ProcessTaskThread : TThread read FProcessTaskThread write FProcessTaskThread;
property SendTaskThread : TBaseThread read FSendTaskThread write FSendTaskThread;
property Count : integer read GetCount;
procedure Run;virtual;
procedure Add(Msg:string);virtual;
Constructor Create(Sid:Byte;Number:string;Log_switch:integer);virtual; //对应POOL里面数据头的ID
destructor Destroy; override;
end;
TBaseTaskThread = class(TThread) //任务线程
protected
FName :string;
Flog: Tlog;
FList: TList;
function GetCount:integer;
function GetTask:TBaseTask;
procedure Execute; override;
procedure Terminatedevent(sender:Tobject);
public
property Count:integer read Getcount;
property Name :string read FName;
procedure Add(Item: Pointer);
Constructor Create(Name:string;suspendflag:boolean;logSwitch:integer);
end;
TBasePoolThread = class(TThread) //分派线程
protected
FName :string;
FLog :Tlog;
FThreadCount : integer;
FBuffer: TBuffer;
FThreadList : Tlist;
FTaskList : Tlist;
FOnThreadProcessEvent :TOnThreadProcessEvent;
FOnThreadBeforeEvent :TOnThreadBeforeEvent;
function SearchTask(Msg:string):TBaseTask;
function SearchThread:TBaseTaskThread;
procedure Execute; override;
procedure Terminatedevent(sender:Tobject);
public
property ChildThreadCount :integer read FThreadCount;
procedure Add(Msg:string); //传入 ID+MSG ,id是byte类型,一位0-255
procedure AddTask(Task:TBaseTask);
procedure CreateChildThread;
property Name :string read FName;
property OnThreadProcessEvent:TOnThreadProcessEvent read FOnThreadProcessEvent write FOnThreadProcessEvent;
property OnThreadBeforeEvent :TOnThreadBeforeEvent read FOnThreadBeforeEvent write FOnThreadBeforeEvent;
Constructor Create(Name:string;ThreadCount:integer;suspendflag:boolean;logSwitch:integer);
end;
implementation
{ TBaseTask }
procedure TBaseTask.Add(Msg: string);
begin
FBuffer.Add(Msg);
end;
constructor TBaseTask.Create(Sid: Byte; Number: string;Log_switch:integer);
begin
Fid := Sid;
FSendType := 1;
FNumber := number;
FLog := Tlog.Create(Number,Log_switch);
FBuffer := TBuffer.Create;
end;
destructor TBaseTask.Destroy;
begin
FBuffer.Free;
Flog.Free;
inherited;
end;
function TBaseTask.GetCount: integer;
begin
result := Fbuffer.Count;
end;
function TBaseTask.GetData: string;
begin
Result := Fbuffer.Getdata;
FBuffer.DeleteData;
end;
procedure TBaseTask.Run;
begin
if FBuffer.Count > 0 then begin
fmsg:= GetData;
{ToDosomething(msg);}
end;
end;
{ TBaseTaskThread }
procedure TBaseTaskThread.Add(Item: Pointer);
begin
Flist.Add(item);
TBaseTask(item).ProcessTaskThread := self;
self.Resume;
end;
constructor TBaseTaskThread.Create(Name: string; suspendflag: boolean;
logSwitch: integer);
begin
Fname := name;
Flist := TList.Create;
Flog := Tlog.Create(name,logswitch);
onterminate := terminatedevent;
FreeOnTerminate:=true;
Flog.Log(name+'线程对象创建',log_all);
inherited create(suspendflag); //建立后先挂起
end;
procedure TBaseTaskThread.Execute;
//var temp :string;
begin
while True do begin
if Terminated then break;
if Flist.Count > 0 then begin
try
GetTask.Run;
except
on e: Exception do
FLog.Log('异常:'+e.Message,log_fail);
end;
end else
suspend;
end;
end;
function TBaseTaskThread.GetCount: integer;
begin
result:= Flist.Count;
end;
function TBaseTaskThread.GetTask: TBaseTask;
begin
result := Flist.items[0];
Flist.Delete(0);
end;
procedure TBaseTaskThread.Terminatedevent(sender: Tobject);
begin
Flist.Free;
Flog.log(FName+'线程释放',log_all);
Flog.Free;
end;
{ TBasePoolThread }
procedure TBasePoolThread.Add(Msg: string);
begin
Fbuffer.Add(msg);
resume;
end;
procedure TBasePoolThread.AddTask(Task: TBaseTask);
begin
FTaskList.Add(task);
end;
constructor TBasePoolThread.Create(Name: string; ThreadCount: integer;
suspendflag: boolean; logSwitch: integer);
begin
Fname := name;
FBuffer := TBuffer.Create;
FThreadCount := ThreadCount;
FThreadList := Tlist.Create;
Flog := Tlog.Create(name,logswitch);
CreateChildThread;
onterminate := terminatedevent;
FreeOnTerminate:=true;
Flog.Log(name+'线程对象创建',log_all);
inherited create(suspendflag); //建立后先挂起
end;
procedure TBasePoolThread.CreateChildThread;
var tempThread : TBaseTaskThread;
i:integer;
begin
for i:=0 to FThreadCount -1 do begin
tempThread := TBaseTaskThread.Create(Fname+inttostr(i), true,log_all);
FThreadList.Add(tempThread);
end;
end;
procedure TBasePoolThread.Execute;
var temp :string;
// temptask : TBaseTask;
// tempThread : TBasePoolThread;
begin
while True do begin
if Terminated then break;
if Fbuffer.Count > 0 then begin
temp := Fbuffer.Getdata; Fbuffer.deletedata;
if Assigned(FOnThreadBeforeEvent) then temp:=FOnThreadBeforeEvent(Flog,temp);
SearchTask(temp);
flog.Log('处理数据:'+temp,log_all);
if Assigned(FOnThreadProcessEvent) then FOnThreadProcessEvent(Flog,temp);
end else
suspend;
end;
end;
function TBasePoolThread.SearchTask(Msg: string): TBaseTask;
var fid : byte;
i:integer;
begin
fid := byte(msg[1]);
result := nil;
for i:=0 to FTasklist.Count-1 do begin
if TBaseTask(FTasklist.Items[i]).Fid = fid then begin
if (TBaseTask(FTasklist.Items[i]).Count > 0) and
assigned( TBaseTask(FTasklist.Items[i]).ProcessTaskThread) then begin
TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1));
TBaseTaskThread(TBaseTask(FTasklist.Items[i]).ProcessTaskThread).Add(FTasklist.Items[i]) //只有原任务有线程,且有未完成的任务
end else begin
TBaseTask(FTasklist.Items[i]).Add(copy(msg,2,length(msg)-1));
SearchThread.Add(FTasklist.Items[i]);
end;
break;
end;
end;
end;
function TBasePoolThread.SearchThread: TBaseTaskThread;
var i,pole:integer;
begin
result := nil;
pole := 9999;
for i:=0 to FThreadlist.Count -1 do begin
if TBaseTaskThread(FThreadList.Items[i]).Count = 0 then begin
result := FThreadList.Items[i];
break;
end;
if pole < TBaseTaskThread(FThreadList.Items[i]).Count then
result := FThreadList.Items[i];
end;
end;
procedure TBasePoolThread.Terminatedevent(sender: Tobject);
var i:integer;
begin
for i:= 0 to FThreadList.Count-1 do begin
TBaseTaskThread(FThreadList.items[i]).Terminate;
TBaseTaskThread(FThreadList.items[i]).Resume;
end;
FThreadList.Free;
fbuffer.Free;
Flog.log(FName+'线程释放',log_all);
Flog.Free;
end;
end.