zoukankan      html  css  js  c++  java
  • 【原创实现】C 多线程入门Demo CAS Block 2种模式实现

    分Cas和Block模式实现了demo, 供入门学习使用,代码全部是远程实现。

    直接上代码:

    /*
     ============================================================================
     Name        : Producer.c
     Author      : qionghui.fang
     Version     : 1.0
     Date        : 2019年6月11日 下午2:32:30
     Copyright   : Your copyright notice
     Description : Main
     ============================================================================
     */
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <pthread.h>
    #include <semaphore.h>
    #include <unistd.h>
    
    #include "header.h"
    
    
    int main(void) {
    
        printf("Start...
    ");
    
        // 在阻塞模式下初始化信号量
        if(TC > 3){
            pthread_mutex_init(&mutex_add,NULL);
            pthread_mutex_init(&mutex_take,NULL);
        }
    
        printf("Producer1 Thread init ...
    ");
        pthread_t producer1;
        ThreadParam p1Params = {"Producer1", &mutex_add};
        pthread_create(&producer1, NULL, (void *)addBean, &p1Params);
    
        printf("Producer2 Thread init ...
    ");
        pthread_t producer2;
        ThreadParam p2Params = { "Producer2", &mutex_add};
        pthread_create(&producer2, NULL, (void *)addBean, &p2Params);
    
    
        printf("Consumer1 Thread init ...
    ");
        pthread_t consumer1;
        ThreadParam c1Params = { "Consumer1", &mutex_take};
        pthread_create(&consumer1, NULL, (void *)takeBean, &c1Params);
    
        printf("Consumer2 Thread init ...
    ");
        pthread_t consumer2;
        ThreadParam c2Params = { "Consumer2", &mutex_take};
        pthread_create(&consumer2, NULL, (void *)takeBean, &c2Params);
    
        printf("Consumer3 Thread init ...
    ");
        pthread_t consumer3;
        ThreadParam c3Params = { "Consumer3", &mutex_take};
        pthread_create(&consumer3, NULL, (void *)takeBean, &c3Params);
    
    
        pthread_join(producer1, NULL);
        printf("
    
    here 1");
        pthread_join(producer2, NULL);
        printf("
    here2");
        pthread_join(consumer1, NULL);
        pthread_join(consumer2, NULL);
        pthread_join(consumer3, NULL);
        printf("
    
    here3");
    
        printf("
    End.");
    
        return EXIT_SUCCESS;
    }
    /*
     * header.h
     *
     *  Created on: 2019年6月11日
     *      Author: fangdahui
     */
    
    #include <pthread.h>
    #include <semaphore.h>
    
    #ifndef HEADER_H_
    #define HEADER_H_
    
    
    typedef int boolean;
    #define TRUE 1
    #define FALSE 0
    
    // 队列数量 BeanCount
    #define BC 10
    // thread count: <=3 为CAS模式 大于3为阻塞模式
    #define TC 3
    
    typedef struct bean{
        int no;
        char* msg;
    }Bean_t;
    
    typedef struct threadParam{
        char* threadName;
        pthread_mutex_t* mutex;
    }ThreadParam;
    
    
    
    
    //  队列
    Bean_t beanList[100];
    volatile int beanListSize;
    
    
    
    
    // 原子操作 cmpxchg
    boolean cmpxchg(volatile int* dest, int compareValue, int newValue);
    
    
    
    
    // 加入元素
    void *addBean(ThreadParam* params);
    // 互斥信号
    pthread_mutex_t mutex_add;
    
    
    // 获取元素
    void *takeBean(ThreadParam* params);
    // 互斥信号
    pthread_mutex_t mutex_take;
    
    
    
    #endif /* HEADER_H_ */
    /*
     ============================================================================
     Name        : Atom.c
     Author      : qionghui.fang
     Version     : 1.0
     Date        : 2019年6月12日 上午8:53:35 
     Copyright   : Your copyright notice
     Description : CAS原子实现
     ============================================================================
     */
    
    
    
    #include "header.h"
    
    // Adding a lock prefix to an instruction on MP machine
    #define LOCK_IF_MP(mp) "cmp $0, " #mp "; je 1f; lock; 1: "
    
    boolean cmpxchg(volatile int* dest, int compareValue, int newValue){
    
        // 暂存旧值
        int oldValue = *dest;
    
        // 默认多核
        boolean mp = TRUE;
    
         // 调用 cmpxchgl 指令
          __asm__ volatile (LOCK_IF_MP(%4) "cmpxchgl %1,(%3)"
                            : "=a" (newValue)
                            : "r" (newValue), "a" (compareValue), "r" (dest), "r" (mp)
                            : "cc", "memory");
    
          // 调用结束后 若地址目标值发生变更,则说明替换成功
          if(*dest != oldValue){
              return TRUE;
          }
    
          // 其它为失败
          return FALSE;
    }
    /*
     ============================================================================
     Name        : Producer.c
     Author      : qionghui.fang
     Version     : 1.0
     Date        : 2019年6月11日 下午2:32:30 
     Copyright   : Your copyright notice
     Description : 生产者
     ============================================================================
     */
    
    #include "header.h"
    #include <stdio.h>
    #include <unistd.h>
    #include <semaphore.h>
    
    void *addBean(ThreadParam * params){
    
        while(TRUE){
    
            // 生产1次 休眠1秒
            sleep(1);
    
            // 走CAS模式
            if(TC <= 3){
                int oldSize = beanListSize;
                int newSize = oldSize + 1;
    
                if(newSize >= BC){
                    printf("
    [CasMode]%s: 队列已达最大长度,本次不加入,当前长度为:%d", params->threadName, beanListSize);
                    sleep(1);
                    //continue;
                    break;
                }
    
                if(cmpxchg(&beanListSize, oldSize, newSize)){
                    // 队列长度+1
                    beanListSize = newSize;
                    Bean_t bean = {beanListSize, "beanListSize"};
                    beanList[beanListSize] = bean;
    
                    printf("
    [CasMode]%s:  CAS成功,当前长度为:%d", params->threadName, beanListSize);
                }
                else{
                    printf("
    [CasMode]%s: CAS失败,进行下一轮尝试,当前长度为:%d", params->threadName, beanListSize);
                }
            }
    
            else{
                // 加锁
                pthread_mutex_lock(params->mutex);
    
                beanListSize ++;
    
                if(beanListSize >= 10){
                    printf("
    [BlockMode]%s: 队列已达最大长度,本次不加入,当前长度为:%d", params->threadName, beanListSize);
                    sleep(1);
                    //continue;
                    //释放锁
                    pthread_mutex_unlock(params->mutex);
                    break;
                }
    
                Bean_t bean = {beanListSize, "beanListSize"};
                beanList[beanListSize] = bean;
    
                printf("
    [BlockMode]%s: 加锁成功,添加元素,当前长度为:%d", params->threadName, beanListSize);
    
                //释放锁
                pthread_mutex_unlock(params->mutex);
            }
        }
    
        printf("
    %s: 线程终止,当前长度为:%d
    ", params->threadName, beanListSize);
        return 0;
    }
    /*
     ============================================================================
     Name        : Consumer.c
     Author      : qionghui.fang
     Version     : 1.0
     Date        : 2019年6月21日 上午9:56:30 
     Copyright   : Your copyright notice
     Description : 消费者
     ============================================================================
     */
    
    #include "header.h"
    #include <stdio.h>
    #include <unistd.h>
    #include <semaphore.h>
    
    
    void *takeBean(ThreadParam* params){
    
        sleep(5);
    
        while(TRUE){
            // 消费一次 休眠2秒
            sleep(2);
    
            // 走CAS模式
            if(TC <= 3){
    
                int oldSize = beanListSize;
                int newSize = oldSize - 1;
    
                if(newSize < 1){
                    printf("
    [CasMode]%s: 消息已消费完,本次不能消费,当前长度为:%d", params->threadName, beanListSize);
                    //continue;
                    break;
                }
    
                if(cmpxchg(&beanListSize, oldSize, newSize)){
                    // 队列长度-1
                    beanListSize = newSize;
                    printf("
    [CasMode]__%s:  CAS成功,当前长度为:%d",params->threadName, beanListSize);
    
                    int i;
                    for(i=1; i<BC;i++){
                        Bean_t *bean = &beanList[i];
                        if(bean->no != 0){
                            bean->no = 0;
                            bean->msg = NULL;
                            break;
                        }
                    }
                }
                else{
                    printf("
    [CasMode]%s:CAS失败,进行下一轮尝试,当前长度为:%d", params->threadName, beanListSize);
                }
            }
            else{
                // 加锁
                pthread_mutex_lock(params->mutex);
    
                beanListSize --;
    
                if(beanListSize < 1){
                    printf("
    [BlockMode]__%s: 消息已消费完,本次不能消费,当前长度为:%d", params->threadName, beanListSize);
                    sleep(1);
                    //continue;
                    //释放锁
                    pthread_mutex_unlock(params->mutex);
                    break;
                }
    
                int i;
                for(i=1; i<100;i++){
                    Bean_t *bean = &beanList[i];
                    if(bean->no != 0){
                        bean->no = 0;
                        bean->msg = NULL;
                        break;
                    }
                }
    
                printf("
    [BlockMode]__%s: 加锁成功, 消费元素,当前长度为:%d", params->threadName, beanListSize);
    
                //释放锁
                pthread_mutex_unlock(params->mutex);
            }
        }
    
        printf("
    %s: 线程终止,当前长度为:%d
    ", params->threadName, beanListSize);
        return 0;
    }

    运行结果:

    CAS模式:

    阻塞Block模式(头文件里TC改为3以上,如5):

    逐行写的代码,收获不小。

    这个适用于初学者,还是要在本地跑一跑。

    祝好运。

  • 相关阅读:
    AdvStringGrid使用小结
    svn提示out of date的解决方法
    delphi之socket通讯
    Delphi的Socket编程步骤
    C++ Socket编程步骤
    centos7安装docker
    centos7安装指南
    UltraISO制作U盘启动盘
    浅谈linux 文件的三个时间
    自动配置zabbix-agent
  • 原文地址:https://www.cnblogs.com/do-your-best/p/11065866.html
Copyright © 2011-2022 走看看