zoukankan      html  css  js  c++  java
  • QT 线程池 + TCP 小试(一)线程池的简单实现

    *免分资源链接点击打开链接http://download.csdn.net/detail/goldenhawking/4492378

        很久以前做过ACE + MFC/QT 的中轻量级线程池应用,大概就是利用线程池执行客户机上的运算需求,将结果返回。ACE是跨平台重量级的通信中间件,与常见的应用程序框架需要精心契合,才能不出问题。最近想到既然QT框架本身就已经具有各类功能,何不玩一玩呢吐舌头,那就开搞!这个实验的代码可以从我的资源内下载。

        第一步打算实现的模式,我们需要一个设置为CPU核心数的线程池,这个线程池可以异步接受N个数据生产者传入的数据,均衡的分配处理任务,处理后的数据返回给某1个或者几个消费者。有两种均衡方法。一种是生产者粒度的均衡。同一个生产者的各批数据FIFO顺序不被打破,这需要判断,当处理线程队列中还有该生产者的数据时,不改变当前处理线程。第二种是数据粒度的并行,某个生产者传来的数据被分配到不同的线程,不保证后到的数据后被处理(也可能先到的处理的慢,后到的快)。

        这种异步队列机制如果在MFC、WinAPI中,需要手工使用 Mutex 同步队列,更可恶的是分配的数据对象的生存期非常微妙,一不小心就会出红叉叉。QT首先为我们提供了信号和槽的机制,且该机制原生支持跨线程。假设我们在16核心服务器上,则使用 15个 QThread对象管理15组工作线程(留一个给主界面)。但是,如果仔细看了QT的文档,就会发现QThread的信号事件循环默认是在创建者中(很多时候就是主线程!),所以,要想让槽在子线程运行,一般是派生一个QObject的类,并把对象MoveToThread到某个QThread管理的线程上去。这样,信号和槽就是全异步FIFO了。其次,QT提供了引用计数的QByteArray封装,这个东西在参数传递的时候,速度很快,很少出现memcpy,生存期也特别容易控制。虽然C++11里有 shared_ptr<T>,但是那个东西还是需要在一开始new 一个int8型的存储区,很讨厌。

    说了这么多,上关键代码。

     先是线程池的封装qghthreadengine.h

    [cpp] view plain copy
     
    1. #ifndef QGHTHREADENGINE_H  
    2. #define QGHTHREADENGINE_H  
    3.   
    4. #include <QObject>  
    5. #include <QThread>  
    6. #include <QVector>  
    7. #include <QList>  
    8. #include <QMap>  
    9. #include <QMutex>  
    10. #include "qghthreadtaskitem.h"  
    11. #include "qghthreadobject.h"  
    12.   
    13. //线程池引擎,帮助用户进行动态平衡  
    14. class QGHThreadEngine : public QObject  
    15. {  
    16.     Q_OBJECT  
    17. public:  
    18.     QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads = 2,bool bFIFOKeep = true);  
    19.     ~QGHThreadEngine();  
    20. protected:  
    21.     QVector<QThread *> m_ThreadPool;  
    22.     QVector<QGHThreadObject *> m_ThreadObjs;  
    23.     QGHThreadTaskItem * m_pThreadTaskItem;  
    24.     int m_nThreads;  
    25.     bool m_bFIFOKeep;  
    26. private:  
    27.     //各个m_ThreadPoolm_ThreadObjs的任务数  
    28.     QMap<QObject *,qint32> m_map_Tasks;         
    29.     //m_bFIFOKeep == true 时,下面两个成员将保证非空闲的单个 data_source 将始终在单一线程处理  
    30.     //各个data_source 目前的处理线程  
    31.     QMap<QObject *,QObject *> m_map_busy_source_task;   
    32.     //各个data_source 目前的排队数目  
    33.     QMap<QObject *,int> m_map_busy_source_counter;          
    34. public:  
    35.     void SetThreadTaskItem(QGHThreadTaskItem * pTaskItem);  
    36.     QList<qint32> CurrentLoad()  
    37.     {  
    38.         return m_map_Tasks.values();  
    39.     }  
    40. public slots:  
    41.     void append_new(QObject * data_source, const QByteArray & data);  
    42.     //捕获QGHThreadObject::sig_process_finished, 以便管理data_source的 FIFO 顺序  
    43.     void on_sig_process_finished(QObject * data_source);  
    44. signals:  
    45.     //************************************  
    46.     // Method:    do_task  
    47.     // FullName:  QGHThreadEngine::do_task  
    48.     // Access:    public   
    49.     // Returns:   void  
    50.     // Qualifier:  
    51.     // Parameter: QObject *     任务来源 (相同任务源的任务,在队列非空时会被安排到同一个线程处理,以确保对相同源的FIFO)  
    52.     // Parameter: QByteArray    任务体   
    53.     // Parameter: QObject *     处理任务的线程对象(QGHThreadObject)  
    54.     //************************************  
    55.     void do_task(QObject *, const QByteArray &,QObject *);  
    56. };  
    57.   
    58. #endif // QGHTHREADENGINE_H  



    
    

     实现qghthreadengine.cpp:

    [cpp] view plain copy
     
    1. #include "qghthreadengine.h"  
    2. #include <assert.h>  
    3. QGHThreadEngine::QGHThreadEngine(QObject *parent,QGHThreadTaskItem * pTaskItem,int nThreads,bool bFIFOKeep)  
    4.     : QObject(parent),  
    5.     m_nThreads(nThreads),  
    6.     m_pThreadTaskItem(pTaskItem),  
    7.     m_bFIFOKeep(bFIFOKeep)  
    8. {  
    9.     assert(nThreads>0 && nThreads<512 && pTaskItem!=NULL);  
    10.     //创建固定数目的线程  
    11.     for (int i=0;i<nThreads;i++)  
    12.     {  
    13.         QThread * pNewThread = new QThread(this);  
    14.         QGHThreadObject * pNewObject = new QGHThreadObject(0,pTaskItem);  
    15.         //记录下来  
    16.         m_ThreadPool.push_back(pNewThread);  
    17.         m_ThreadObjs.push_back(pNewObject);  
    18.         m_map_Tasks[pNewObject] = 0;  
    19.         pNewThread->start();  
    20.         //把QGHThreadObject的信号、曹处理搬移到子线程内  
    21.         pNewObject->moveToThread(pNewThread);  
    22.         //连接处理完成消息  
    23.         connect(pNewObject,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));  
    24.         //连接处理新任务消息  
    25.         connect(this,SIGNAL(do_task(QObject *, const QByteArray &,QObject *)),pNewObject,SLOT(process(QObject *, const QByteArray &,QObject *)));  
    26.   
    27.     }  
    28. }  
    29.   
    30. QGHThreadEngine::~QGHThreadEngine()  
    31. {  
    32.     foreach(QGHThreadObject * obj,m_ThreadObjs)  
    33.     {  
    34.         disconnect(obj,SIGNAL(sig_process_finished(QObject *)),this,SLOT(on_sig_process_finished(QObject *)));  
    35.         obj->deleteLater();  
    36.     }  
    37.     foreach(QThread * th ,m_ThreadPool)  
    38.     {  
    39.         disconnect(this,SIGNAL(do_task(QObject *, QByteArray,QObject *)),th,SLOT(process(QObject *, QByteArray,QObject *)));  
    40.         th->exit(0);  
    41.         th->wait();  
    42.     }  
    43. }  
    44.   
    45. //负载均衡添加任务,生产者的信号要挂接到这个槽上  
    46. void QGHThreadEngine::append_new(QObject * data_source, const QByteArray &  data)  
    47. {  
    48.     QObject * pMinObj = 0;  
    49.     //对一批来自同一数据源的数据,使用同样的数据源处理,以免发生多线程扰乱FIFO对单个data_source的完整性  
    50.     if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end()&& m_bFIFOKeep==true)  
    51.     {  
    52.         m_map_busy_source_counter[data_source]++;  
    53.         pMinObj = m_map_busy_source_task[data_source];  
    54.     }  
    55.     else  
    56.     {  
    57.         qint32 nMinCost = 0x7fffffff;  
    58.         //寻找现在最空闲的一个线程  
    59.         for (QMap<QObject *,qint32>::iterator p = m_map_Tasks.begin();p!=m_map_Tasks.end();p++)  
    60.         {  
    61.             if (p.value()< nMinCost)  
    62.             {  
    63.                 nMinCost = p.value();  
    64.                 pMinObj = p.key();  
    65.             }  
    66.         }  
    67.         if (pMinObj)  
    68.         {  
    69.             m_map_busy_source_counter[data_source] = 1;  
    70.             m_map_busy_source_task[data_source] = pMinObj;  
    71.         }  
    72.     }  
    73.     if (pMinObj)  
    74.     {  
    75.         m_map_Tasks[pMinObj]++;  
    76.         emit do_task(data_source,data,pMinObj);  
    77.     }  
    78. }  
    79. void QGHThreadEngine::on_sig_process_finished(QObject * data_source)  
    80. {  
    81.     if (m_map_Tasks.find(sender())!=m_map_Tasks.end())  
    82.     {  
    83.         m_map_Tasks[sender()]--;  
    84.     }  
    85.     if (m_map_busy_source_counter.find(data_source)!=m_map_busy_source_counter.end())  
    86.     {  
    87.         m_map_busy_source_counter[data_source]--;  
    88.         if (m_map_busy_source_counter[data_source]<=0)  
    89.         {  
    90.             m_map_busy_source_counter.remove(data_source);  
    91.             m_map_busy_source_task.remove(data_source);  
    92.         }  
    93.     }  
    94. }     

    用于绑定的 qghthreadobject.h

    [cpp] view plain copy
     
    1. #ifndef QGHTHREADOBJECT_H  
    2. #define QGHTHREADOBJECT_H  
    3. #include <QObject>  
    4. #include "qghthreadtaskitem.h"  
    5. //用于在子线程内具体承担事件循环的类,用户无需重载  
    6. class QGHThreadObject:public QObject  
    7. {  
    8.     Q_OBJECT  
    9.   
    10. public:  
    11.     QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem);  
    12.     ~QGHThreadObject();  
    13. public:  
    14.     void SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem);  
    15. public slots:  
    16.     //************************************  
    17.     // Method:    process  
    18.     // FullName:  QGHThreadObject::process  
    19.     // Access:    public   
    20.     // Returns:   void  
    21.     // Qualifier:  
    22.     // Parameter: QObject *     任务来源 (相同任务源的任务,在队列非空时会被安排到同一个线程处理,以确保对相同源的FIFO)  
    23.     // Parameter: QByteArray    任务体   
    24.     // Parameter: QObject *     处理任务的线程对象(QGHThreadObject)  
    25.     //************************************  
    26.     void process(QObject * data_source, const QByteArray &data,QObject * target);  
    27. private:  
    28.     QGHThreadTaskItem * m_pThreadTaskItem;  
    29. signals:  
    30.     //信号,表示一次处理已经完成。QGHThreadEngine捕获该信号,管理data_source的 FIFO 顺序  
    31.     void sig_process_finished(QObject * data_source);  
    32. };  
    33. #endif  


    相应实现qghthreadobject.cpp

    [cpp] view plain copy
     
    1. #include "qghthreadobject.h"  
    2. #include <assert.h>  
    3.   
    4. QGHThreadObject::QGHThreadObject(QObject *parent,QGHThreadTaskItem * pThreadTaskItem)  
    5.     : QObject(parent),  
    6.     m_pThreadTaskItem(pThreadTaskItem)  
    7. {  
    8.     assert(pThreadTaskItem!=NULL);  
    9.   
    10. }  
    11.   
    12. QGHThreadObject::~QGHThreadObject()  
    13. {  
    14. }  
    15. void QGHThreadObject::process(QObject * data_source, const QByteArray &data,QObject * target)  
    16. {  
    17.     if (target==this)  
    18.     {  
    19.         m_pThreadTaskItem->run(data_source,data);  
    20.         emit sig_process_finished(data_source);  
    21.     }  
    22. }  
    23.   
    24. void QGHThreadObject::SetThreadTaskItem(QGHThreadTaskItem * pThreadTaskItem)  
    25. {  
    26.     assert(pThreadTaskItem!=NULL);  
    27.     m_pThreadTaskItem = pThreadTaskItem;  
    28. }  


    最后,是供用户重载的实际处理方法的纯虚基类qghthreadtaskitem.h

    [cpp] view plain copy
     
    1. #ifndef QGHTHREADTASKITEM_H  
    2. #define QGHTHREADTASKITEM_H  
    3. #include <QObject>  
    4. //用户重载该类,实现自定义方法的线程池调用  
    5. class QGHThreadTaskItem:public QObject  
    6. {  
    7.     Q_OBJECT  
    8.   
    9. public:  
    10.     QGHThreadTaskItem(QObject *parent);  
    11.     ~QGHThreadTaskItem();  
    12. public:  
    13.     virtual void run(QObject * task_source, const QByteArray & data_array) = 0;  
    14.   
    15. };  
    16. #endif  


    下次,继续写如何实现一个TCP链路,让这个线程池活起来

    http://blog.csdn.net/goldenhawking/article/details/7854413

  • 相关阅读:
    extjs 表单显示控制
    windows net user
    ORACLE截取时间
    oracle to_timestamp
    oracle to_date
    ext numberfield小数模式
    ext 仅文字field
    extjs 占位字段
    [转]CPU的位数与操作系统的位数的区别
    32位的Win7系统下安装64位的Sql Sever?
  • 原文地址:https://www.cnblogs.com/findumars/p/5634519.html
Copyright © 2011-2022 走看看