zoukankan      html  css  js  c++  java
  • 简单的线程消息队列实现

    1. 线程使用场景
    (1)流水线方式。根据业务特点,将一个流程的处理分割成多个线程,形成流水线的处理方式。产生的结果:延长单一流程的处理时间,提高系统整体的吞吐能力。
    (2)线程池方式。针对处理时间比较长且没有内蕴状态的线程,使用线程池方式分流消息,加快对线程消息的处理,避免其成为系统瓶颈。
    线程使用的关键是线程消息队列、线程锁、智能指针的使用。其中以线程消息队列最为重要。

    2. 线程消息队列描述
    所谓线程消息队列,就是一个普通的循环队列加上“多生产者-单(多)消费者的存/取操作”。流水线方式中的线程是单消费者,线程池方式中的线程是多消费者。
    为了后文更好的描述问题,作如下说明:
    (1)假定循环队列queue中, 入队操作put_queue, 出队操作get_queue。
    (2)生产者消费者:生产者线程生产消息,放在一个空缓冲区中,供消费者线程消费,生产者生产消息(put_queue),如果缓冲区满,则被阻塞,消费者消费消息(get_queue),如果缓冲区空,则被阻塞。线程消息队列就是生产者消费者问题中的缓冲区,而它的生产者是不限定的,任何线程都可以作为生产者向其中进行put_queue操作,消费线程则可能是一个,也可能是多个。因此对循环队列的任何操作都要加锁,以保证线程安全。

    3. 线程相关的操作
    (1)pthread_t类型的创建、属性创建设置等。
    这类具体可以: man pthread_creat; man pthread_attr_init; man pthread_detach; man pthread_join等查看
    (2)pthread_mutex_t类型的操作。
    这类具体可以: man pthread_mutex_init可以看到所有相关的操作。
    (3)pthread_cond_t类型的操作。man pthread_cond_init。pthread_cond_t的wait和signal操作一定要和pthread_mutex_t的lock、unlock配合使用。类似于此:

    4. linux的线程库
    2.6之后的内核的默认使用的是redhat公司的NPTL(原生posix线程库),以前内核使用的是LinuxThreads库,两者的简单介绍可以看http://www.ibm.com/developerworks/cn/linux/l-threading.html。不过对于应用者,分析两者的区别和优劣也没什么大意义。这里特别提下NPTL的futex机制。借助该机制,pthread_mutex的性能大大提高,只要不进入竞争态,进程就不会陷入内核态。这点可以自己写示例程序,通过strace -c 跟踪进程的系统调用,另外还可以证实总是进入内核态的操作有pthread_cond_signal和sem_post。

    5. 通过上面的分析,我们可以有如下结论:
    (1)减少pthread_cond_signal和sem_post的调用,只在有必要的时候调用;
    (2)尽量避免pthread_mutex进入竞争态。增大消息队列的大小,可以有效减少竞态条件的出现。

    6. 实用的线程消息队列实现(msg_queue.h)

    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <pthread.h>
    
    pthread_mutex_t mux;
    pthread_cond_t cond_get, cond_put;
    
    struct msg_queue {
        void** buffer; // 缓冲数据, .buffer = msg
        int size; // 队列大小,使用的时候给出稍大的size,可以减少进入内核态的操作
        int lget; // 取队列数据的偏移量
        int lput; // 放队列数据的偏移量
        int nData; // 队列中数据的个数,用来判断队列满/空
        int nFullThread; // 由于队列满而阻塞在put_queue的线程个数
        int nEmptyThread; // 由于队列空而阻塞在get_queue的线程个数
    };
    
    void* get_queue(struct msg_queue *q){
        void* data = NULL;
        pthread_mutex_lock(&mux);
        while(q->lget == q->lput && 0 == q->nData){
            // 此处循环判断的原因是:假设2个消费者线程在get_queue阻塞,然后两者都被激活,
            // 而其中一个线程运行比较块,快速消耗了2个数据,另一个线程醒来的时候已
            // 经没有新数据可以消耗了。这种情况是有可能的:比如,其它生产者线程快速
            // 调用put_queue两次,如果有2个线程在get_queue处阻塞,就会被同时激活,
            // 而完全有可能,其中一个被激活的线程获取到了cpu,快速处理了2个消息。
    
            // 对于循环队列,如果lget与lput相等,那么只有两种情况,
            // 1:nData不为0,队列满
            // 2:nData为0,队列空
            q->nEmptyThread++;
            pthread_cond_wait(&cond_get, &mux);
            q->nEmptyThread--;
        }
    #ifdef DEBUG
        printf("get data! lget:%d", q->lget);
    #endif
        data = (q->buffer)[q->lget++];
        if(q->lget == q->size){
            // queue用作循环队列
            q->lget = 0;
        }
        q->nData--;
    #ifdef DEBUG
        printf(" nData:%d\n", q->nData);
    #endif
        if(q->nFullThread){
            // 仅在必要时才调用pthread_cond_signal, 尽量少陷入内核态
            pthread_cond_signal(&cond_put);
        }
        pthread_mutex_unlock(&mux);
        return data;
    }
    
    void put_queue(struct msg_queue *q, void* data){
        pthread_mutex_lock(&mux);
        while(q->lget == q->lput && q->nData){
            q->nFullThread++;
            pthread_cond_wait(&cond_put, &mux);
            q->nFullThread--;
        }
    #ifdef DEBUG
        printf("put data! lput:%d", q->lput);
    #endif
        (q->buffer)[q->lput++] = data;
        if(q->lput == q->size){
            q->lput = 0;
        }
        q->nData++;
    #ifdef DEBUG
        printf(" nData:%d\n", q->nData);
    #endif
        if(q->nEmptyThread){
            pthread_cond_signal(&cond_get);
        }
        pthread_mutex_unlock(&mux);
    }

    7. demo程序(msg_queue.c)

    #include "msg_queue.h"
    struct msg_queue queue = {NULL, 10, 0, 0, 0, 0, 0};
    
    void * produce(void * arg)
    {
        pthread_detach(pthread_self());
        int i=0;
        while(1){
            put_queue(&queue, (void*)i++);
        }
    }
    
    void *consume(void *arg)
    {
        int data;
        while(1){
            data = (int)(get_queue(&queue));
        }
    }
    
    int main()
    {   
        pthread_t pid;
        int i=0;
    
        pthread_mutex_init(&mux, 0);
        pthread_cond_init(&cond_get, 0);
        pthread_cond_init(&cond_put, 0);
    
        queue.buffer = malloc(queue.size * sizeof(void*));
        if(queue.buffer == NULL){
            printf("malloc failed!\n");
            exit(-1);
        }
    
        pthread_create(&pid, 0, produce, 0);
        pthread_create(&pid, 0, produce, 0);
        pthread_create(&pid, 0, produce, 0);
        pthread_create(&pid, 0, consume, 0);
        pthread_create(&pid, 0, consume, 0);
        pthread_create(&pid, 0, consume, 0);
    
        sleep(60);
    
        free(queue.buffer);
        pthread_mutex_destroy(&mux);
        pthread_cond_destroy(&cond_get);
        pthread_cond_destroy(&cond_put);
    }

    Reference: http://www.cppblog.com/CppExplore/archive/2008/01/15/41175.html

  • 相关阅读:
    asp.net的decimal保留两位小数
    由于管理员设置的策略,该磁盘处于脱机状态-Win 2008 R2
    论大公司的通病和缺点
    sql server删除数据后空间无变化处理方法
    sql server压缩数据库和日志文件
    SQL千万级数据设计和优化
    SQL Server索引怎么用
    在电脑上测试手机网站
    asp.net实现GZip压缩和GZip解压
    WebService教程和分析
  • 原文地址:https://www.cnblogs.com/lijingcheng/p/4454876.html
Copyright © 2011-2022 走看看