zoukankan      html  css  js  c++  java
  • 基于CAS实现无锁结构

    杨乾成

    一、题目要求

      基于CAS(Compare and Swap)实现一个无锁结构,可考虑queue,stack,hashmap,freelist等。

    • 能够支持多个线程同时访问该结构
    • 不能有任何锁操作,且操作是线程安全的
    • 对上述的内存单元进行管理,至少malloc与free一次。

    二、数据结构

      看到题目有一说一,不知道怎么下手,那就google一下先。稍微了解了一下CAS,原准备使用STL模板的队列,后来发现实现题目要求似乎得再Queue的插入和删除函数里面具体实现,于是决定自己实现吧(省事失败)。

      先简要介绍一下CAS算法,我先po一下wiki百科上的代码介绍:

    bool compare_and_swap (int *accum, int *dest, int newval)
    {
      if ( *accum == *dest ) {
          *dest = newval;
          return true;
      }
      return false;
    }

      这段代码的思想大概就是看看*dest里面的值和*accum里面的值是不是一样,如果一样,则将新数据写入。个人觉得用数据库的“脏数据”概念来形容很合适:即不允许往存有脏数据的缓冲区写入数值。在数据库中,常用的方法是两段锁,但是这里不让加锁。突然想起来大二上学期修JAVA里面讲到的同步信号量,即每当对数据结构进行操作时,就对=改变信号量的值,比如只有信号量值为0的时候才允许更改变量,修改时使用者将信号量置一,但是仔细一想,这似乎也是锁的思想。

      遇事不决,再看看,害,现成的CAS都在这里,我直接用不就完事了。回归正题,CAS思想总结如下:线程操作钱保存当前版本号,也就是记录当前数据的状态的标志;当完成操作时,比较自己持有的版本号与当前版本号是否相符,相符则允许写入,否则获取的新的版本号再来一轮。就好像是大家都听说身边有个好看的姑娘处于“单身版本”,各自回去准备准备,第一个到的获得了优先权,后来的只能记住另一个“单身”的姑娘,再回家准备展开竞争,直到自己完成任务。

    三、代码实现

    #include<stdio.h>
    #include<pthread.h>
    #include <unistd.h>
    #include <assert.h>
    #include<stdlib.h>
    #define THREAD_NUMBER 4
    //队节点
    typedef struct QNode
    {
        int data;
        struct QNode *next;
    }QNode, *QueuePtr;
    //链队结构
    typedef struct LinkQueue
    {
        QueuePtr front;
        QueuePtr rear;
    }LinkQueue;
    
    int is_Empty(LinkQueue *q);
    void *pushThread(void *arg);
    void *popThread(void *arg);
    void init_Queue(LinkQueue *q);void cas_push(LinkQueue *q, int e);
    void cas_push(LinkQueue *q, int e);
    int cas_pop(LinkQueue *q, int *e);
    void show(LinkQueue *q);
    
    int main()
    {
        LinkQueue que;
        init_Queue(&que);
        int i;
    
        //创建四个进程
        pthread_t threadArr[THREAD_NUMBER];    //线程ID数组
        for (i = 0; i < THREAD_NUMBER; ++i)
        {
            pthread_create(&threadArr[i], NULL, pushThread, (void *)&que);    //设置线程入口
        }
        //等待线程执行结束
        for (i = 0; i < THREAD_NUMBER; ++i)
        {
            pthread_join(threadArr[i], NULL);
        }
        show(&que);
    
        //创建四个进线程然后执行出队操作
        for (i = 0; i < THREAD_NUMBER; ++i)
        {
            pthread_create(&threadArr[i], NULL, popThread, (void *)&que);
        }
        for (i = 0; i < THREAD_NUMBER; ++i)
        {
            pthread_join(threadArr[i], NULL);
        }
        exit(EXIT_SUCCESS);
     }
    
    void *pushThread(void *arg)
    {
        printf("进程%ld开始创建队列:
    ",pthread_self());
        LinkQueue *quePtr =(LinkQueue *)arg;
        int i;
        for(i= 0;i<5;++i)
        {       
            cas_push(quePtr, i);    
        }
        printf("
    ");
        printf("完成队列创建操作!
    ");
        pthread_exit(NULL);     //结束当前进程
    }
    
    void *popThread(void *arg)
    {
        printf("%ld开始出队操作:
    ",pthread_self());
        LinkQueue *quePtr=(LinkQueue *)arg;
        int temp;
        int res;
        while(1)
        {
            res = cas_pop(quePtr,&temp);
            if(!res)
            {
                break;    
            }
            printf("PID:%ld	pop%d
    ",pthread_self(),temp);
        }
        
    }
    
    //初始化队列
    void init_Queue(LinkQueue *q)
    {
        q->front=q->rear=(QNode *)malloc(sizeof(QNode));
        q->front->next=NULL;
    }
    
    //判断队列是否为空
    int is_Empty(LinkQueue *q)
    {
        if(q->front->next ==NULL)
        {
            return 1;
        }
        return 0;
    }
    
    
    void cas_push(LinkQueue *q, int e) 
    {
         QueuePtr newNode = (QueuePtr)malloc(sizeof(QNode));
         newNode->data = e;
         newNode->next = NULL;
         QueuePtr tmp; 
         //不断尝试将新节点进队
         do
         {
             // sleep(1);
              tmp = q->rear;
         }while (!__sync_bool_compare_and_swap((long *)(&(tmp->next)), NULL, (long)newNode));    //判断此时队尾是否被改变
            //就是在这里控制对队列的原子化操作
         sleep(1);
         q->rear = newNode;
    } 
        
    int cas_pop(LinkQueue *q, int *e)
    {
         QueuePtr tmp;
         do
         {
              if (is_Empty(q))
              {
                   return(0);     //队列已空,结束
              } 
              tmp = q->front->next; 
         } while (!__sync_bool_compare_and_swap((long *)(&(q->front->next)), (long)tmp, (long)tmp->next));
         sleep(1);     //通过休眠一秒,模拟线程操作时间,防止过快操作导致结果不明显
         *e = tmp->data;
         free(tmp); 
         return 1;
    }
    
    void show(LinkQueue *q) 
    {
         printf("Current queue:
    "); 
         QueuePtr tmp = q->front->next; 
         while (tmp) 
         {
              printf("%d ", tmp->data);
              tmp = tmp->next; 
         } 
         printf("
    ");
    }

    上面就是这次的代码实现。

    四、运行结果

    使用指令:

    gcc CAS_Structure.c -o result.out -lpthread

    将源码进行编译,此处要注意后面一定要加上-lpthread,不然会提示“pthread.h”头文件报错问题。

    我创建了四个进程,竞争往队尾插入数据。

      然后竞争删除完队列的数据,直到队列为空。

      从截图可以看出来,四个进程竞争着抢占处理器,直到完成创建和删除节点的任务。这里面要注意的是,因为实际处理就是插入节点,刚开始做的时候我没有插入sleep语句,所以最后的结果看起来像是顺序完成,所以要模拟实验,一定要注意加上sleep语句模拟进程处理的时间,防止第二个进程还没创建完,第一个进程早已完成了所有事务。

  • 相关阅读:
    Java基础面试题18--单例设计模式
    Error:The method setInputPaths(JobConf, String) in the type FileInputFormat is not
    java基础面试题17--类的加载、对象的加载流程
    隐私策略-en
    隐私策略-ch
    Java 11 相关
    kali BugFix
    bugFix
    xcode 快捷键
    QT 静态编译 windows&mac 版本
  • 原文地址:https://www.cnblogs.com/blogMorningStar/p/12041033.html
Copyright © 2011-2022 走看看