zoukankan      html  css  js  c++  java
  • 简单多线程拷贝单文件v2.1

    相对于《简单多线程拷贝单文件v2》中,将文件下载的任务分块也作为线程,可以动态添加文件块任务。

    即是生产者与消费者模型。

    用了本Blog的《消息队列的实现》中的实现。

    用了《struct 初始化》中提到的参考代码方式。

    作为基类的msg_block_t。

    typedef struct ares_msg_block
    {
    pthread_t pid;
    enum msg_object_type msg_type;
    void *data;///<*用于扩展的私有数据指针
    }msg_block_t;

    继承的msg_block_t的msg_thread_block_t

    typedef struct msg_thread_block
    {
    msg_block_t parent; ///<*parent
    size_t start_position; ///<*文件的写入起始位置
    size_t block_size; ///<*本次线程执行写入的大小
    }msg_thread_block_t;

    消息邮箱的定义,其实现参考《消息队列的实现》。

    typedef struct ares_msg_box
    {
    pthread_cond_t not_full;
    pthread_cond_t not_empty;
    pthread_mutex_t mutex;
    //消息的循环队列
    size_t first;
    size_t last;
    size_t size;
    size_t nready; ///多少个消息,判断满与空
    msg_block_t * msg_array[0]; ///柔性数组
    }msg_box_t;

    相对于v2,添加一个新的关于任务的结构 thread_task_block_t ,其next指针表明,可能有多个任务,做扩展用,每个任务自带一个邮箱的指针。

    typedef struct thread_task_block
    {
    int infd;
    int outfd;
    size_t file_size;
    size_t thread_cnt;
    msg_box_t *mbox;
    int finished;
    struct thread_task_block *next;
    }thread_task_block_t;

    有三个函数

    int thread_block_copy(msg_thread_block_t *block,char *buf,size_t len);
    void *thread_task_split(void *arg);
    void *thread_copy_fn(void *arg);

    thread_block_copy 是《简单多线程拷贝单文件v2》中的thread_copy_fn改写的,在此不贴代码了。

    但注意其声明中,添加了一个len的参数。

    因为write的时候,写入的最后一个参数为sizeof(buf)——此时应该修改为len——当数组作为函数的参数传入函数内后,

    此时sizeof(buf) = 4 (IA32-x86切记)。

    thread_task_split 是作为生产者,将下载的单文件任务分解成为单个线程下载的线程块。

    代码流程不难理解,new一个消息,post到邮箱中,然后sleep下。


    thread_copy_fn 是消费者,将调用thread_block_copy。

    从邮箱中fetch到邮箱中,然后调用thread_block_copy 执行拷贝线程任务。

    对于终止条件的判断,做了双重判断,应该可以有改进的余地——可能需要信号量的支持。

    while(!(tblock->finished && msgbox_empty(tblock->mbox)))

    生产者线程

    thread_task_split
    void *thread_task_split(void *arg)
    {
    thread_task_block_t *tblock = (thread_task_block_t *)arg;
    size_t block_size = tblock->file_size /tblock->thread_cnt;
    size_t task_size;
    int i = 0;
    printf("At split thread \n");
    thread_task_block_printf(tblock);
    for(; i <= tblock->thread_cnt;++i)
    {
    msg_thread_block_t *mblock =\
    (msg_thread_block_t *)msgblock_new(MSG_Object_Thread,pthread_self());
    if(i == tblock->thread_cnt)
    task_size = tblock->file_size % tblock->thread_cnt;
    else
    task_size = block_size;
    msg_thread_block_init(mblock,tblock,i * block_size,task_size);
    msg_thread_block_printf(mblock);
    msgbox_post(tblock->mbox,&mblock->parent);
    msgbox_printf(tblock->mbox);
    sleep(1);
    }
    tblock->finished = 1;
    printf("#####Post Thread exit %ld#####\n",pthread_self());
    pthread_exit(NULL);
    }

    消费者线程

    thread_copy_fn
    void *thread_copy_fn(void *arg)
    {
    thread_task_block_t *tblock = (thread_task_block_t *)arg;
    //msg_box_t *mbox = tblock->mbox;
    char buf[THREADS_BUFF_SIZE];
    int ret;
    struct msg_thread_block *block;
    thread_task_block_printf(tblock);
    //while((!(tblock->finished))||(!msgbox_empty(mbox)))
    while(!(tblock->finished && msgbox_empty(tblock->mbox)))
    {
    //printf("###at copy $$$$$$$$$$$\n");
    msgbox_printf(tblock->mbox);
    block = (msg_thread_block_t *)msgbox_fetch(tblock->mbox);
    msg_thread_block_printf(block);
    ret = thread_block_copy(block,buf,sizeof(buf));
    msgblock_free(&block->parent);
    sleep(1);
    //printf("End of copy ########\n");
    }//end while
    printf("#####Copy Thread exit %ld#####\n",pthread_self());
    pthread_exit(NULL);
    }

    main测试函数

    main
    nt main(int argc,char *argv[])
    {
    if(argc < 3)
    {
    usage();
    return -1;
    }
    /*msg_box_t mbox;

    msgbox_init(&mbox,MBOX_SIZE);
    */
    msg_box_t *mbox = msgbox_new(MBOX_SIZE);
    //获取任务队列
    thread_task_block_t tblock;
    thread_task_block_init(&tblock,mbox,argv[1],argv[2]);
    thread_task_block_printf(&tblock);
    /**
    消息分块线程
    */
    pthread_t task_split;
    pthread_create(&task_split,NULL,thread_task_split,&tblock);

    size_t thread_size = get_task_threadcnt(&tblock);
    pthread_t ptid[thread_size];
    ///创建线程
    printf("#######At main#########\n");
    thread_task_block_printf(&tblock);
    int i ;
    for(i = 0 ; i < thread_size; ++i)
    {
    pthread_create(&ptid[i],NULL,thread_copy_fn,&tblock);
    }
    ///线程Join
    for(i = 0 ; i < thread_size; ++i)
    {
    pthread_join(ptid[i],NULL);
    }
    pthread_join(task_split,NULL);
    thread_task_block_destroy(&tblock);
    //msgbox_destroy(&mbox);
    msgbox_free(mbox);
    return 0;
    }


    由于在msg_box中用到了柔性数组,所以必须new生成msg_box,这也是写《柔性数组的结构如何只能堆上生成》的原因——因为出过错误,上过当。

    实现的问题:

    由于fetch实现是一直等待邮箱有消息,所以出现了当生产者线程结束,只能有一个消费者线程接受,其它线程都在等待fetch中,只能强制终止程序,有什么方案能够通知其也结束呢——这几天一直考虑中。

    输入a.out list.c list.c.kb 得到如下

    成功拷贝。



  • 相关阅读:
    hdu1124 Factorial (求解一个数的阶乘中出现多少个0)
    SQL on Linux: Erro Unable to read instance id from /var/opt/mssql/.system/instance_id
    Error during WebSocket handshake: Unexpected response code: 200 问题处理
    CUP计算资源争抢通过IIS启用处理器关联解决
    ABP在MultipleDbContext也就是多库的场景下发布后异常“Could not find content root folder”问题处理
    ABP运行Login failed for user 'IIS APPPOOL XXXXX Reason: Could not find a login matching the name provided问题解决
    vs2017cpu占用过高解决方案
    docker查看挂载目录Volume
    windows 10安装docker一直挂起在Installing Components and Removing Files
    ABP vue+asp.net core yarn serve报 Cannot find module 'typescript/package.json错误
  • 原文地址:https://www.cnblogs.com/westfly/p/2414324.html
Copyright © 2011-2022 走看看