zoukankan      html  css  js  c++  java
  • 线程池处理异步任务队列

    线程池处理异步任务队列

    /// <author>cxg 2020-9-3</author>
    /// 线程池处理异步任务队列
    /// 支持d7以上版本,更低版本没有测试,支持跨OS
    
    unit tasks;
    
    interface
    
    uses
      {$IFDEF mswindows}
      Windows,
      {$ENDIF}
      {$IFDEF posix}
      posix.Unistd, posix.Semaphore,
      {$ENDIF}
      Contnrs, SyncObjs, Classes, SysUtils;
    
    type
      TCallBack = procedure(task: Pointer) of object;
    
    type
      TThreadConf = class
      private
        fCallBack: TCallBack;
        fThreadNum: Integer;
        fWorkers: array of TThread;
        fCS: TCriticalSection;
        procedure freeThreads;
        procedure newThreads;
      public
        constructor Create(const threadNum: Integer = 0);
        destructor Destroy; override;
        procedure startThreads;
        procedure stopThreads;
        procedure allotTask(task: Pointer);
        property onCallback: TCallBack read fCallBack write fCallBack;
      end;
    
    type
      TWorkThread = class(TThread)
      private
        fConf: TThreadConf;
        fQueue: TQueue;
        {$IFDEF mswindows}
        hsem: THandle; // 信号量
        {$ELSE}
        hsem: sem_t;
        {$ENDIF}
      public
        constructor Create(conf: TThreadConf);
        destructor Destroy; override;
        procedure Execute; override;
        procedure enqueue(task: Pointer);
      end;
    
    function cpuNum: Integer;
    
    implementation
    
    var
      gIndex: Integer = 0;
    
    function cpuNum: Integer;
    {$IFDEF MSWINDOWS}
    var
      si: SYSTEM_INFO;
    {$ENDIF}
    begin
      {$IFDEF MSWINDOWS}
      GetSystemInfo(si);
      Result := si.dwNumberOfProcessors;
      {$ELSE}
      Result := sysconf(_SC_NPROCESSORS_ONLN);
      {$ENDIF}
    end;
    
    { TThreadConf }
    
    procedure TThreadConf.allotTask(task: Pointer);
    var
      i: Integer;
    begin
      fCS.Enter;
      try
        i := gIndex mod fThreadNum;
        TWorkThread(fWorkers[i]).enqueue(task);
        inc(gIndex);
      finally
        fCS.Leave;
      end;
    end;
    
    constructor TThreadConf.Create(const threadNum: Integer = 0);
    begin
      fCS := TCriticalSection.Create;
      fThreadNum := threadNum;
      if fThreadNum = 0 then
        fThreadNum := cpuNum;
      SetLength(fWorkers, fThreadNum);
      newThreads;
    end;
    
    destructor TThreadConf.Destroy;
    begin
      freeThreads;
      FreeAndNil(fCS);
      inherited;
    end;
    
    procedure TThreadConf.freeThreads;
    var
      i: Integer;
    begin
      for i := 0 to fThreadNum - 1 do
      begin
        fWorkers[i].Terminate;
        fWorkers[i].WaitFor;
        FreeAndNil(fWorkers[i]);
      end;
    end;
    
    procedure TThreadConf.newThreads;
    var
      i: Integer;
    begin
      for i := 0 to fThreadNum - 1 do
        fWorkers[i] := TWorkThread.Create(Self);
    end;
    
    procedure TThreadConf.startThreads;
    var
      i: Integer;
    begin
      for i := 0 to fThreadNum - 1 do
        {$IFDEF unicode}
        fWorkers[i].Start;
        {$ELSE}
        fWorkers[i].Resume;
        {$ENDIF}
    end;
    
    procedure TThreadConf.stopThreads;
    var
      i: Integer;
    begin
      for i := 0 to fThreadNum - 1 do
        fWorkers[i].Suspend;
    end;
    
    { TWorkThread }
    
    constructor TWorkThread.Create(conf: TThreadConf);
    begin
      inherited Create(True);
      FreeOnTerminate := True;
      fConf := conf;
      fQueue := TQueue.Create;
      {$IFDEF mswindows}
      hsem := CreateSemaphore(nil, 0, 1, nil);
      {$ELSE}
      sem_init(hsem, 0, 0);
      {$ENDIF}
    end;
    
    destructor TWorkThread.Destroy;
    begin
      FreeAndNil(fQueue);
      {$IFDEF mswindows}
      CloseHandle(hsem);
      {$ELSE}
      sem_destroy(hsem);
      {$ENDIF}
      inherited;
    end;
    
    procedure TWorkThread.enqueue(task: Pointer);
    begin
      fQueue.Push(task);
      {$IFDEF mswindows}
      ReleaseSemaphore(hsem, 1, nil);
      {$ELSE}
      sem_post(hsem);
      {$ENDIF}
    end;
    
    procedure TWorkThread.Execute;
    var
      task: Pointer;
    
      procedure run;
      begin
        task := fQueue.Pop;
        if task <> nil then
          if Assigned(fConf.fCallBack) then
            fConf.fCallBack(task);
      end;
    
    begin
      while not Self.Terminated do
      begin
        {$IFDEF mswindows}
        if WaitForSingleObject(hsem, INFINITE) = WAIT_OBJECT_0 then
          run;
        {$ELSE}
        if sem_wait(hsem) > 0 then
          run;
        {$ENDIF}
      end;
    end;
    
    end.
    

      

  • 相关阅读:
    C++ primer plus读书笔记——第6章 分支语句和逻辑运算符
    C++ primer plus读书笔记——第7章 函数——C++的编程模块
    C++ primer plus读书笔记——第5章 循环和关系表达式
    C++ primer plus读书笔记——第4章 复合类型
    C++ primer plus读书笔记——第3章 处理数据
    C++ primer plus读书笔记——第2章 开始学习C++
    10款好用到爆的Vim插件,你知道几个?
    程序员最讨厌的100件事,瞬间笑喷了,哈哈~~
    20 个最常用的 Git 命令用法说明及示例
    史上最全的Nginx配置参数中文说明
  • 原文地址:https://www.cnblogs.com/hnxxcxg/p/13605686.html
Copyright © 2011-2022 走看看