zoukankan      html  css  js  c++  java
  • 基于无锁队列和c++11的高性能线程池

    基于无锁队列和c++11的高性能线程池
    线程使用c++11库
    和线程池之间的消息通讯使用一个简单的无锁消息队列
    适用于linux平台,gcc 4.6以上
     
    标签: <无>
     

    代码片段(6)[全屏查看所有代码]

    1. [代码]lckfree.h     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    // lckfree.h
    // Implementation of lock free queue using CAS operations
    // for simple multi-threading use cases like:
    // 1. multiple worker to process incoming messages
    // 2. async processing using a thread pool
    // 3. simple tcp server deal with async requests
    // Author: typhoon_1986@163.com
    // Refrence: http://coolshell.cn/articles/8239.html
     
    #ifndef __LCKFREE_H__
    #define __LCKFREE_H__
     
    #include <string>
    using namespace std;
     
    namespace bfd {
     
    struct LinkNode {
      string data;
      LinkNode* next;
    };
    typedef struct LinkNode LinkNode;
     
    class LckFreeQueue {
     public:
      LckFreeQueue();
      ~LckFreeQueue();
     
      int push(const string &msg);
      string pop();  // non-block pop method
    //  string bpop(); // block pop method
      bool empty();
     private:
      LinkNode * head_;
      LinkNode * tail_;
      bool empty_;
      unsigned int length_;
    };
     
    } // namespace bfd
    #endif

    2. [代码]lckfree.cpp     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    #include <lckfree.h>
     
    namespace bfd {
     
    LckFreeQueue::LckFreeQueue(): head_(NULL), tail_(NULL), empty_(true), length_(0) {
      head_ = new LinkNode;
      head_->next = NULL;
      tail_ = head_;
    }
     
    LckFreeQueue::~LckFreeQueue() {
      LinkNode *p = head_;
      if (p) {
        LinkNode *q = p->next;
        delete p;
        p = q;
      }
    }
     
    int LckFreeQueue::push(const string &msg) {
      LinkNode * q = new LinkNode;
      q->data = msg;
      q->next = NULL;
     
      LinkNode * p = tail_;
      LinkNode * oldp = p;
      do {
        while (p->next != NULL)
            p = p->next;
      } while( __sync_bool_compare_and_swap(&(p->next), NULL, q) != true); //如果没有把结点链在尾上,再试
     
      __sync_bool_compare_and_swap(&tail_, oldp, q); //置尾结点
      return 0;
    }
     
    string LckFreeQueue::pop() {
      LinkNode * p;
      do{
        p = head_;
        if (p->next == NULL){
          return "";
        }
      } while( __sync_bool_compare_and_swap(&head_, p, p->next) != true );
      return p->next->data;
    }
     
    bool LckFreeQueue::empty() {
      return empty_;
    }
     
    }

    3. [代码]workthreadpool.h     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    // workthreadpool.h
    // 一个用于将消息分发给多个进程,并使用多个进程处理的库,工作进程并不返回数据
    #ifndef __WORK_THREAD_POOL__
    #define __WORK_THREAD_POOL__
     
    #include <stdio.h>
    #include <thread>
    #include <queue>
    #include <string>
    #include <vector>
    #include "lckfree.h"
     
    using namespace std;
    namespace bfd {
     
    class WorkThreadPool {
     public:
      WorkThreadPool(int size);
      virtual ~WorkThreadPool();
     
      // 需要子类继承并实现的函数,每个线程实际执行的内容
      virtual void Init() {};
      virtual void Finish() {};
      virtual void Handle(const string &msg)=0;
     
      // 将消息放入处理队列, 消息只支持string类型
      int SendMessage(const string &msg);
     
      int Start();
      int Stop();
     
     private:
      void Worker();
     
      int size_;
      LckFreeQueue msg_queue_; // 线程池的协作基于这个无锁队列
      vector<thread> thread_pool_;
    };
    } // namespace
    #endif

    4. [代码]workthreadpool.cpp     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    #include "workthreadpool.h"
    #include <sstream>
    #include <unistd.h>
     
    namespace bfd {
    WorkThreadPool::WorkThreadPool(int size) {
      if (size <= 0) { // 最小也需要有1个线程
        size_ = 1;
      } else {
        size_ = size;
      }
    }
     
    WorkThreadPool::~WorkThreadPool() {
     
    }
     
    int WorkThreadPool::SendMessage(const string &msg) {
      msg_queue_.push(msg);
      return 0;
    }
     
    void WorkThreadPool::Worker() {
      unsigned int msg_count = 0;
      while (1) {
        string msg = msg_queue_.pop();
        if (msg.empty()) {
          printf("no msg got, sleep for 0.1 sec ");
          usleep(100000); // 0.1 sec
          continue;
        }
     
        if (msg == "__exit__") {
          stringstream ss;
          ss << "exit worker: " << std::this_thread::get_id() << ", processed: " << msg_count << "..";
          printf("%s ", ss.str().c_str());
          return;
        }
        Handle(msg);
        msg_count++;
        if (msg_count % 1000 == 0) {
          printf("every 1000 msg count ");
        }
      }
    }
     
    int WorkThreadPool::Start() {
      for (int i=0; i < size_; i++) {
        thread_pool_.push_back( thread(&WorkThreadPool::Worker, this) );
      }
      return 0;
    }
     
    int WorkThreadPool::Stop() {
      for (int i=0; i < size_; i++) {
        SendMessage("__exit__");
      }
      for (int i=0; i < size_; i++) {
        thread_pool_[i].join();
      }
      return 0;
    }
     
    }

    5. [代码]main.cpp     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    #include "workthreadpool.h"
    #include <sstream>
    #include <math.h>
     
    class MyThreadPool : public bfd::WorkThreadPool {
     public:
      MyThreadPool(int size) : bfd::WorkThreadPool(size) {
      }
      void Handle(const string &msg) {
        stringstream ss;
        ss << "worker (" << std::this_thread::get_id() << ") got msg: " << msg;
        printf("%s ", ss.str().c_str());
        for (int i=0; i<=999999; i++) {
          double result = sqrt(sqrt(i) / 93.234);
        }
      }
    };
     
    int main() {
      printf("start running .... ");
      MyThreadPool pool(5);
      pool.Start();
      for (int i=0; i<100; i++) {
        pool.SendMessage("msg info ----------");
      }
      pool.Stop();
     
      return 0;
    }

    6. [代码]Makefile     

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    LIB_SRC_FILES = src/workthreadpool.cpp src/lckfree.cpp
    TEST_SRC_FILES = src/main.cpp
    INCLUDE_DIR = src
    STD_FLAG = -std=c++0x
     
    all: main.o libs
        g++ $(STD_FLAG) -o test_workthreadpool main.o libworkthreadpool.so -lpthread
         
    main.o: $(TEST_SRC_FILES)
        g++ $(STD_FLAG) -c $(TEST_SRC_FILES) -I$(INCLUDE_DIR)
     
    libs: $(LIB_SRC_FILES)
        g++ $(STD_FLAG) -o libworkthreadpool.so -fPIC -O2 -shared -Wl,--no-as-needed -Isrc $(LIB_SRC_FILES) -lpthread
     
    .PHONY : clean
    clean :
        rm -f test_workthreadpool main.o libworkthreadpool.so
  • 相关阅读:
    敏捷宣言遵循的原则
    Python学习笔记(11):更多内容
    VBScript之Eval函数与Execute语句(Array.ForEach的实现)
    QTP自动化测试之VBScript对象
    ASP.NET服务器端数据查询控件
    Oracle 日期及GUID
    wp7查询公交路线
    wp7搜索引擎
    在windowsPhone中怎么样存储数据
    客户端PLSQL Developer连接远程数据库Oracle
  • 原文地址:https://www.cnblogs.com/lidabo/p/9767068.html
Copyright © 2011-2022 走看看