  • Thread Pool Engine, and Work-Stealing scheduling algorithm

    http://pages.videotron.com/aminer/threadpool.htm

    http://pages.videotron.com/aminer/zip/threadpool.zip  FPC Pascal v2.2.0+ / Delphi 5+

    http://pages.videotron.com/aminer/zip/pthreadpool_xe4.zip (for Delphi XE to XE4)

    http://pages.videotron.com/aminer/zip/pthreadpool.zip (for FreePascal and Lazarus and Delphi 7 to 2010)

    Thread Pool Engine.

    Please read the article that I wrote about my Threadpool engine; it is reproduced below under "Threadpool engine article".

    The following features have been added:

    • A lock-free ParallelQueue, for less contention and more efficiency; alternatively,
      it can use lockfree_mpmc, a version of flqueue that I have modified and improved.
    • A lock-free queue for each worker thread, with work-stealing, for more efficiency.
    • Worker threads enter a wait state when there is no job in the queue, for more efficiency.
    • You can distribute your jobs to the worker threads and call any method with the threadpool's execute() method.


    Look into defines.inc; there are many options:


    Lockfree_MPMC: use the lockfree_MPMC queue
    RingBuffer: use the lock-free RingBuffer
    SINGLE_PRODUCER: for a single producer thread
    MUTIPLE_PRODUCER: for multiple producer threads
    CPU32: for 32-bit architectures
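
    As a rough sketch, assuming defines.inc is an ordinary include file made of {$DEFINE} lines (the file shipped in the zip may be laid out differently), a configuration could look like the fragment below. The option names are the ones listed above. Whether the queue options and the producer options are either/or choices is an assumption made here for illustration, so check the real file before relying on it.

```pascal
// defines.inc (illustrative sketch, not the shipped file)

// Queue implementation: assumed here to be an either/or choice.
{$DEFINE Lockfree_MPMC}
{.$DEFINE RingBuffer}        // the leading dot turns the directive into a plain comment

// Producer model: assumed here to be an either/or choice.
{.$DEFINE SINGLE_PRODUCER}
{$DEFINE MUTIPLE_PRODUCER}   // spelled exactly as in the option list above

// Target architecture.
{$DEFINE CPU32}
```

    Disabling an option with {.$DEFINE ...} rather than deleting the line keeps the available choices visible in the file.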

    Required FPC switches: -O3 -Sd -dFPC -dWin32 -dFreePascal

    -Sd selects Delphi mode.

    Required Delphi switches: -DMSWINDOWS -$H+

    For Delphi 5,6,7 use -DDelphi

    For Delphi 2005,2006,2007,2009,2010+ use the switch -DDELPHI2005+


    Please look at the examples test.pas, testpool.pas and test_thread.pas.

    Note: testpool.pas is a parallel program that multiplies a matrix by a vector using SSE+; it requires Delphi 5+. test.pas and test_thread.pas work with both FreePascal and Delphi.

    Threadpool with priorities

    Thread Pool Engine.

    The following features have been added:

    - You can give the following priorities to jobs:

    LOW_PRIORITY
    NORMAL_PRIORITY
    HIGH_PRIORITY

    - Uses a FIFO queue that satisfies many requirements:

    it is FIFO fair, it keeps cache-coherence traffic low, and its pop() is energy efficient:

    when there are no items in the queue it does not spin-wait, but waits on a portable manual event object.

    - Worker threads enter a wait state when there is no job in the queue, for more efficiency.

    - You can distribute your jobs to the worker threads and call any method with the threadpool's execute() method.

    - Uses O(1) complexity on enqueue and a constant worst case of 3 steps (O(3)) on dequeue.
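
    One way to read the enqueue/dequeue costs above is sketched below. It assumes one FIFO queue per priority level, so that enqueue indexes the queue directly and dequeue checks at most three queues. This is an assumption for illustration, not the PThreadPool implementation: the code is single-threaded, the queues are plain arrays rather than lock-free structures, and TFifo, Enqueue and Dequeue are hypothetical names. The strict highest-first order is also assumed; the text does not say how the pool arbitrates between priorities.

```pascal
program PrioritySketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

const
  QueueCap = 8;

type
  // Names mirror the constants in the text; this enum is a local stand-in.
  TPriority = (LOW_PRIORITY, NORMAL_PRIORITY, HIGH_PRIORITY);

  // Plain array-backed FIFO (no overflow check, not thread-safe).
  TFifo = record
    Items: array[0..QueueCap - 1] of Integer;
    Head, Tail: Integer;   // Head = next item to pop, Tail = next free slot
  end;

var
  Queues: array[TPriority] of TFifo;   // globals start zeroed, so all queues are empty

// O(1): the priority selects the queue directly.
procedure Enqueue(Job: Integer; P: TPriority);
begin
  Queues[P].Items[Queues[P].Tail] := Job;
  Inc(Queues[P].Tail);
end;

// At most three queue checks, from highest to lowest priority.
function Dequeue(out Job: Integer): Boolean;
var
  P: TPriority;
begin
  Result := False;
  for P := HIGH_PRIORITY downto LOW_PRIORITY do
    if Queues[P].Head < Queues[P].Tail then
    begin
      Job := Queues[P].Items[Queues[P].Head];
      Inc(Queues[P].Head);
      Result := True;
      Exit;
    end;
end;

var
  Job: Integer;
begin
  Enqueue(1, LOW_PRIORITY);
  Enqueue(2, HIGH_PRIORITY);
  Enqueue(3, NORMAL_PRIORITY);
  Enqueue(4, HIGH_PRIORITY);
  // Jobs come out as 2, 4, 3, 1: by priority, FIFO within a priority.
  while Dequeue(Job) do
    WriteLn('job ', Job);
end.
```

    With strict ordering like this, low-priority jobs only run when the higher queues are empty, so a steady stream of high-priority work can starve them.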

    Look into defines.inc; there are many options:

    CPU32: for 32-bit architectures
    CPU64: for 64-bit architectures

    Please read the article that I wrote about my Threadpool engine; it is reproduced below under "Threadpool engine article".

    Look at the test1.pas demo inside the zip file.

    Required FPC switches: -O3 -Sd -dFPC -dWin32 -dFreePascal

    -Sd selects Delphi mode.

    Required Delphi switches: -DMSWINDOWS -$H+

    For Delphi 5,6,7 use -DDelphi

    For Delphi 2005,2006,2007,2009,2010+ use the switch -DDELPHI2005+

    Use {$DEFINE CPU32} and {$DEFINE Win32} for 32-bit systems.

    Use {$DEFINE CPU64} and {$DEFINE Win64} for 64-bit systems.

    Note: testpool.pas is a parallel program that multiplies a matrix by a vector using SSE+; it requires Delphi 5+.

    test.pas and test_thread.pas work with both FreePascal and Delphi.

    Threadpool engine article 

    On a multicore system, your goal is to spread the work efficiently among many cores so that it executes simultaneously.

    Ideally, the performance gain is directly related to how many cores you have.

    So, for work that parallelizes well, a quad-core system should be able to get it done up to 4 times faster than a single-core system.

    A 16-core platform should be up to 4 times faster than a quad-core system, and up to 16 times faster than a single core.

    That's where my Threadpool is useful: it spreads the work efficiently among many cores.

    Threadpool (and Threadpool with priority) consists of lock-free, thread-safe local FIFO queues of work items,

    so when you call ThreadPool.execute(), your work item is queued in the local lock-free queues.

    The worker threads pick the items out in First In First Out (FIFO) order and execute them.


    The following features have been added to Threadpool:

    • A lock-free ParallelQueue, for less contention and more efficiency; alternatively,
      it can use lockfree_mpmc, a version of flqueue that I have modified and improved.

    See lock-free ParallelQueue: http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm

    • It uses a lock-free queue for each worker thread, with work-stealing, for more efficiency.
    • The worker threads enter a wait state when there is no job in the lock-free queues, for more efficiency.
    • You can distribute your jobs to the worker threads and call any method with the threadpool's execute() method.
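
    The wait state can be illustrated with a minimal sketch. It assumes a manual-reset TEvent from SyncObjs is what signals new work; the text elsewhere mentions "a portable manual event object", but the mechanism actually used inside the pool may differ. TConsumer, ItemReady and Item are hypothetical names. The consumer thread blocks in WaitFor until the main thread publishes an item and sets the event, so it does not spin while it waits.

```pascal
program EventWaitSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

uses
  {$IFDEF UNIX}cthreads,{$ENDIF}   // FPC on Unix needs a thread manager
  Classes, SyncObjs;

type
  // Hypothetical consumer thread: sleeps until an item is signalled.
  TConsumer = class(TThread)
  public
    Received: Integer;
  protected
    procedure Execute; override;
  end;

var
  ItemReady: TEvent;   // manual-reset event, initially not signalled
  Item: Integer;

procedure TConsumer.Execute;
begin
  // Blocks in the OS instead of spinning; gives up after 5 seconds.
  if ItemReady.WaitFor(5000) = wrSignaled then
    Received := Item;
end;

var
  C: TConsumer;
begin
  ItemReady := TEvent.Create(nil, True, False, '');
  C := TConsumer.Create(False);   // start the thread immediately
  Item := 42;                     // publish the item first...
  ItemReady.SetEvent;             // ...then wake the waiting consumer
  C.WaitFor;                      // wait for the thread to finish
  WriteLn('consumer received ', C.Received);
  C.Free;
  ItemReady.Free;
end.
```

    Because the event is manual-reset, it stays signalled once set, so the consumer sees it even if SetEvent runs before the thread reaches WaitFor. A real pool would also need to reset the event when its queue becomes empty again.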

    Work-Stealing scheduling algorithm:

    Work-Stealing scheduling concepts:

    • Every worker thread has its own WSQ (Work-Stealing Queue).
    • Any new task that belongs to a worker thread is added to that thread's own WSQ.
    • When a worker thread looks for a new task, it proceeds as follows:
      • First it looks for a task in its local WSQ.
      • Then it tries to steal a task from the other workers.
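
    The lookup order above can be sketched as follows. This is a single-threaded illustration only: plain arrays stand in for the lock-free queues, and TSimpleQueue, Push, Pop and NextJob are hypothetical names, not part of the ThreadPool unit. The order in which victims are tried (round-robin, starting from the next worker) is also an assumption, since the text does not say how a victim is chosen.

```pascal
program StealSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

const
  WorkerCount = 3;
  QueueCap    = 8;

type
  // Plain array-backed FIFO standing in for one worker's queue
  // (no overflow check, not thread-safe).
  TSimpleQueue = record
    Items: array[0..QueueCap - 1] of Integer;
    Head, Tail: Integer;   // Head = next item to pop, Tail = next free slot
  end;

var
  Queues: array[0..WorkerCount - 1] of TSimpleQueue;   // globals start zeroed

procedure Push(var Q: TSimpleQueue; Job: Integer);
begin
  Q.Items[Q.Tail] := Job;
  Inc(Q.Tail);
end;

function Pop(var Q: TSimpleQueue; out Job: Integer): Boolean;
begin
  Result := Q.Head < Q.Tail;
  if Result then
  begin
    Job := Q.Items[Q.Head];
    Inc(Q.Head);
  end;
end;

// Local queue first; if it is empty, try each other worker's queue in turn.
function NextJob(WorkerId: Integer; out Job: Integer): Boolean;
var
  i, Victim: Integer;
begin
  Result := Pop(Queues[WorkerId], Job);
  if Result then Exit;
  for i := 1 to WorkerCount - 1 do
  begin
    Victim := (WorkerId + i) mod WorkerCount;
    Result := Pop(Queues[Victim], Job);
    if Result then Exit;
  end;
end;

var
  Job: Integer;
begin
  Push(Queues[0], 10);
  Push(Queues[2], 30);
  Push(Queues[2], 31);
  // Worker 0 has local work, so it takes its own job (10).
  if NextJob(0, Job) then WriteLn('worker 0 runs job ', Job);
  // Worker 1 has nothing locally, so it steals job 30 from worker 2.
  if NextJob(1, Job) then WriteLn('worker 1 runs job ', Job);
end.
```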

    But what is the Work-Stealing Queue? As I said, every worker thread has its own Work-Stealing Queue.

    The WSQ is a data structure designed to be efficient. WSQ data structure concepts:

    • It is a lock-free FIFO queue.
    • It allows lock-free pushes and pops from the private end.
    • It allows lock-free pushes and pops from the public end.
    • Where:
      • Private end: private to the owning worker thread.
      • Public end: public to any other thread,
        so any other thread can use the lock-free FIFO to steal a task as a kind of load balancing.

    The Work-Stealing scheduling algorithm offers several advantages over an ordinary scheduling algorithm:

    1. Efficiency:
      • Using local queues minimizes contention.
    2. Load balancing:
      • Every thread can steal work from the other threads, so Work-Stealing implicitly provides load balancing.

    My Threadpool provides load balancing and also minimizes contention.

    Threadpool is very easy to use. Let's now look at an example in Object Pascal:

    program test;

    uses
    {$IFDEF Delphi}
      cmem,
    {$ENDIF}
      ThreadPool, sysutils, syncobjs;

    {$I defines.inc}

    type
      TMyThread = class( TThreadPoolThread )
        // procedure ProcessRequest(obj: Pointer); override;
        procedure MyProc1( obj : Pointer );
        procedure MyProc2( obj : Pointer );
      end;

    var
      myobj : TMyThread;
      TP : TThreadPool;
      obj : Pointer;
      cs : TCriticalSection;  // serializes the writeln calls made by the worker threads

    procedure TMyThread.MyProc1( obj : Pointer );
    begin
      cs.enter;
      writeln( 'This is MyProc1 with parameter: ', integer( obj ) );
      cs.leave;
    end;

    procedure TMyThread.MyProc2( obj : Pointer );
    begin
      cs.enter;
      writeln( 'This is MyProc2 with parameter: ', integer( obj ) );
      cs.leave;
    end;

    begin
      myobj := TMyThread.create;
      cs := TCriticalSection.create;

      // 4 worker threads and 2^20 items for each queue.
      TP := TThreadPool.create( 4, TMyThread, 20 );

      // Queue MyProc1 with the parameter 1.
      obj := Pointer( 1 );
      TP.execute( myobj.MyProc1, Pointer( obj ) );

      // Queue MyProc2 with the parameter 2.
      obj := Pointer( 2 );
      TP.execute( myobj.MyProc2, Pointer( obj ) );

      readln;  // wait for Enter so the queued jobs have time to run

      TP.Terminate;
      TP.Free;
    end.

    Let us look at the uses clause first:

    uses
    {$IFDEF Delphi}
      cmem,
    {$ENDIF}
      ThreadPool, sysutils, syncobjs;

    cmem is required for Delphi in order to use the TBB memory manager (from Intel);

    this lets memory allocation in Delphi scale linearly with the number of threads.

    Note: FPC doesn't need cmem, because its memory manager scales linearly with threads.

    ThreadPool: our threadpool unit.

    syncobjs: contains the synchronization primitives such as critical sections and events.

    After that we have the following lines:

    type
      TMyThread = class (TThreadPoolThread)
        //procedure ProcessRequest(obj: Pointer); override;
        procedure MyProc1(obj: Pointer);
        procedure MyProc2(obj: Pointer);
      end;

    We declare a class TMyThread that inherits from TThreadPoolThread,

    and we declare our two methods MyProc1 and MyProc2 that we want to be executed by our threadpool's worker threads.

    Each method takes a single parameter, obj, of type Pointer.

    In the main body we create a TMyThread object like this:

    myobj := TMyThread.create;

    After that we create a TThreadPool object with 4 worker threads

    and lock-free FIFO queues holding 2^20 items each, like this:

    TP := TThreadPool.Create(4, TMyThread, 20); // 4 worker threads and 2^20 items for each queue.
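
    The queue size is given as an exponent (20 means 2^20 items). The text does not say why, but a common reason for power-of-two sizes in ring-buffer queues is that a slot index can be computed with a bit mask instead of a division. The sketch below shows that arithmetic only; CapacityFor and SlotFor are hypothetical helpers, and whether the pool's queues index their slots this way is an assumption.

```pascal
program RingIndexSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

// Capacity of a queue created with the given exponent, e.g. 20 -> 2^20.
function CapacityFor(Exponent: Integer): Integer;
begin
  Result := 1 shl Exponent;
end;

// Maps an ever-growing counter to a slot; valid only for power-of-two capacities.
function SlotFor(Counter, Capacity: Integer): Integer;
begin
  Result := Counter and (Capacity - 1);
end;

begin
  WriteLn(CapacityFor(20));              // 1048576
  WriteLn(SlotFor(5, CapacityFor(3)));   // 5
  WriteLn(SlotFor(8, CapacityFor(3)));   // 0: the counter wraps around
end.
```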

    After that we distribute to our worker threads the methods to be executed.

    We do it by calling the Threadpool's execute() method,

    passing it myobj.myproc1 and myobj.myproc2 with their parameters:

    TP.execute(myobj.myproc1,pointer(obj));

    TP.execute(myobj.myproc2,pointer(obj));
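
    The examples pass a small integer disguised as a pointer. Since the parameter is an untyped Pointer, it can also carry the address of a record. The sketch below shows that pattern without the pool. The signature of execute() is not shown in the text, so TJobProc is only a guess at the kind of type it accepts: an Object Pascal method pointer (procedure ... of object). RunNow, TWorker and TJobData are hypothetical names, and RunNow calls the method directly instead of queueing it.

```pascal
program MethodPointerSketch;
{$IFDEF FPC}{$MODE DELPHI}{$ENDIF}

type
  // Hypothetical name: a pointer to a method taking one untyped pointer.
  TJobProc = procedure(obj: Pointer) of object;

  PJobData = ^TJobData;
  TJobData = record
    Id: Integer;
    Name: string;
  end;

  TWorker = class
    Total: Integer;
    procedure Handle(obj: Pointer);
  end;

procedure TWorker.Handle(obj: Pointer);
var
  Data: PJobData;
begin
  Data := PJobData(obj);   // cast back to the type the caller passed
  Total := Total + Data^.Id;
  WriteLn('job ', Data^.Id, ': ', Data^.Name);
end;

// Stands in for a pool's execute(): here it just calls the method directly.
procedure RunNow(Proc: TJobProc; obj: Pointer);
begin
  Proc(obj);
end;

var
  W: TWorker;
  D: TJobData;
begin
  W := TWorker.Create;
  D.Id := 7;
  D.Name := 'resize';
  RunNow(W.Handle, @D);
  W.Free;
end.
```

    With a real pool the job runs later on another thread, so the record must stay alive and unchanged until the job has finished; a local variable that goes out of scope first would leave the worker with a dangling pointer.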

    As you see, Threadpool (and Threadpool with priority) is very easy to use.

    Let's now look at an example of a Threadpool with priority:

    program test;

    uses
    {$IFDEF Delphi}
      cmem,
    {$ENDIF}
      PThreadPool, sysutils, syncobjs;

    {$I defines.inc}

    type
      TMyThread = class( TPThreadPoolThread )
        // procedure ProcessRequest(obj: Pointer); override;
        procedure MyProc1( obj : Pointer );
        procedure MyProc2( obj : Pointer );
      end;

    var
      myobj : TMyThread;
      TP : TPThreadPool;
      obj : Pointer;
      cs : TCriticalSection;  // serializes the writeln calls made by the worker threads

    procedure TMyThread.MyProc1( obj : Pointer );
    begin
      cs.enter;
      writeln( 'This is MyProc1 with parameter: ', integer( obj ) );
      cs.leave;
    end;

    procedure TMyThread.MyProc2( obj : Pointer );
    begin
      cs.enter;
      writeln( 'This is MyProc2 with parameter: ', integer( obj ) );
      cs.leave;
    end;

    begin
      myobj := TMyThread.create;
      cs := TCriticalSection.create;

      // 4 worker threads and 2^20 items for each queue.
      TP := TPThreadPool.create( 4, TMyThread, 20 );

      // Queue MyProc1 with the parameter 1 at normal priority.
      obj := Pointer( 1 );
      TP.execute( myobj.MyProc1, Pointer( obj ), NORMAL_PRIORITY );

      // Queue MyProc2 with the parameter 2 at normal priority.
      obj := Pointer( 2 );
      TP.execute( myobj.MyProc2, Pointer( obj ), NORMAL_PRIORITY );

      readln;  // wait for Enter so the queued jobs have time to run

      TP.Terminate;
      TP.Free;
    end.

    As you have noticed, this is almost the same as the plain threadpool.

    You use:

    PThreadPool (P for priority) rather than ThreadPool

    TPThreadPoolThread rather than TThreadPoolThread

    TPThreadPool.Create rather than TThreadPool.Create

    and, as you have noticed in TP.execute(myobj.myproc1,pointer(obj),NORMAL_PRIORITY), we pass a priority with each job.

    You can give the following priorities to jobs:

    LOW_PRIORITY
    NORMAL_PRIORITY
    HIGH_PRIORITY

    That's all.

    You can download threadpool (and threadpool with priority) from:

    http://pages.videotron.com/aminer/

    Sincerely,
    Amine Moulay Ramdane.

    lock-free ParallelQueue

    http://pages.videotron.com/aminer/parallelqueue/parallelqueue.htm

    I have tested 3 lock-free FIFO queue algorithms against my own FIFO queue, ParallelQueue,

    which uses a hash-based method. The tests used 4 threads under contention, and the results follow in the graphs below:

    Lock-free flqueue at: http://www.emadar.com/fpc/lockfree.htm

    Lock-free RingBuffer at: http://www.odsrv.com/RingBuffer/RingBuffer.htm

    GpLockfreequeue at: http://17slon.com/gp/gp/gplockfreequeue.htm

    and my ParallelQueue at: http://pages.videotron.com/aminer/

     

  • Original article: https://www.cnblogs.com/shangdawei/p/4019094.html