Implementing a Read-Write Lock by Hand

Picking up where the previous article, Implementing a Spinlock by Hand (spinlock), left off, this article walks through implementing a read-write lock by hand.

Much of the time our threads do not need to modify the data structures they access; they only read a variable or a structure field. When several threads ask for the same data at once, efficiency argues for letting them all proceed in parallel rather than serializing them behind lock and unlock operations. A spinlock cannot tell read access apart from write access, so using one here is quite unwise; this is where a well-designed read-write lock comes into its own.


Read-write lock

The simplest read-write lock uses a spinlock to gate write access, plus a counter field that tracks the current number of readers.
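
All of the code in this article leans on primitives defined in the spinlock article: cpu_relax, barrier, cmpxchg, atomic_xadd, atomic_inc/atomic_dec/atomic_add, EBUSY, and the spinlock type itself. To keep this page self-contained, here is a minimal sketch of plausible definitions, assuming x86 and GCC's __sync builtins; these are my stand-ins, not necessarily the exact versions from the earlier article:

    #include <errno.h>    /* EBUSY */
    #include <stdint.h>   /* uintptr_t */

    /* Compiler-only fence and the x86 spin-wait hint */
    #define barrier()   asm volatile("" ::: "memory")
    #define cpu_relax() asm volatile("pause" ::: "memory")

    /* Type-generic atomics built on GCC __sync intrinsics */
    #define atomic_inc(P)     __sync_add_and_fetch((P), 1)
    #define atomic_dec(P)     __sync_sub_and_fetch((P), 1)
    #define atomic_add(P, V)  __sync_add_and_fetch((P), (V))
    #define atomic_xadd(P, V) __sync_fetch_and_add((P), (V))

    /* Compare-and-swap: returns the value previously stored in *P */
    #define cmpxchg(P, OLD, NEW) __sync_val_compare_and_swap((P), (OLD), (NEW))

    /* A simple test-and-set spinlock, standing in for the previous article's */
    typedef unsigned spinlock;

    static void spin_lock(spinlock *l)
    {
        while (__sync_lock_test_and_set(l, 1))  /* returns the old value */
            while (*l) cpu_relax();              /* spin on a plain read */
    }

    static void spin_unlock(spinlock *l)
    {
        __sync_lock_release(l);                  /* store 0 with release semantics */
    }

    static int spin_trylock(spinlock *l)
    {
        return __sync_lock_test_and_set(l, 1) ? EBUSY : 0;
    }

With those in hand, the lock itself looks like this: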

    typedef struct dumbrwlock dumbrwlock;
    struct dumbrwlock
    {
        spinlock lock;
        unsigned readers;
    };

    static void dumb_wrlock(dumbrwlock *l)
    {
        /* Get write lock */
        spin_lock(&l->lock);

        /* Wait for readers to finish */
        while (l->readers) cpu_relax();
    }

    static void dumb_wrunlock(dumbrwlock *l)
    {
        spin_unlock(&l->lock);
    }

    static int dumb_wrtrylock(dumbrwlock *l)
    {
        /* Want no readers */
        if (l->readers) return EBUSY;

        /* Try to get write lock */
        if (spin_trylock(&l->lock)) return EBUSY;

        if (l->readers)
        {
            /* Oops, a reader started */
            spin_unlock(&l->lock);
            return EBUSY;
        }

        /* Success! */
        return 0;
    }

    static void dumb_rdlock(dumbrwlock *l)
    {
        while (1)
        {
            /* Speculatively take read lock */
            atomic_inc(&l->readers);

            /* Success? */
            if (!l->lock) return;

            /* Failure - undo, and wait until we can try again */
            atomic_dec(&l->readers);
            while (l->lock) cpu_relax();
        }
    }

    static void dumb_rdunlock(dumbrwlock *l)
    {
        atomic_dec(&l->readers);
    }

    static int dumb_rdtrylock(dumbrwlock *l)
    {
        /* Speculatively take read lock */
        atomic_inc(&l->readers);

        /* Success? */
        if (!l->lock) return 0;

        /* Failure - undo */
        atomic_dec(&l->readers);

        return EBUSY;
    }

    static int dumb_rdupgradelock(dumbrwlock *l)
    {
        /* Try to convert into a write lock */
        if (spin_trylock(&l->lock)) return EBUSY;

        /* I'm no longer a reader */
        atomic_dec(&l->readers);

        /* Wait for all other readers to finish */
        while (l->readers) cpu_relax();

        return 0;
    }
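
To see how the pieces fit together, here is a small usage sketch. The pthread scaffolding, shared_value, and the thread counts are mine, added purely for illustration:

    #include <pthread.h>
    #include <stdio.h>

    static dumbrwlock rw = { 0, 0 };   /* unlocked, no readers */
    static int shared_value;

    static void *reader(void *arg)
    {
        (void)arg;
        dumb_rdlock(&rw);
        printf("read %d\n", shared_value);  /* many readers may be here at once */
        dumb_rdunlock(&rw);
        return NULL;
    }

    static void *writer(void *arg)
    {
        (void)arg;
        dumb_wrlock(&rw);
        shared_value++;                     /* exclusive: no readers or writers */
        dumb_wrunlock(&rw);
        return NULL;
    }

    int main(void)
    {
        pthread_t t[4];
        pthread_create(&t[0], NULL, writer, NULL);
        pthread_create(&t[1], NULL, reader, NULL);
        pthread_create(&t[2], NULL, reader, NULL);
        pthread_create(&t[3], NULL, writer, NULL);
        for (int i = 0; i < 4; i++) pthread_join(t[i], NULL);
        return 0;
    }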

If we swap the spinlock in the implementation above for a ticket lock, performance improves considerably.
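
The ticketlock type and its operations come from the previous article. For completeness, here is one plausible sketch of that interface in the same style (ticket_lock, ticket_unlock, ticket_trylock, ticket_lockable); treat the details as assumptions rather than the canonical code:

    typedef union ticketlock ticketlock;
    union ticketlock
    {
        unsigned u;
        struct
        {
            unsigned short ticket;   /* now serving */
            unsigned short users;    /* next ticket to hand out */
        } s;
    };

    static void ticket_lock(ticketlock *t)
    {
        unsigned short me = atomic_xadd(&t->s.users, 1);
        while (t->s.ticket != me) cpu_relax();
    }

    static void ticket_unlock(ticketlock *t)
    {
        barrier();
        t->s.ticket++;
    }

    static int ticket_trylock(ticketlock *t)
    {
        unsigned short me = t->s.users;
        unsigned short menew = me + 1;
        unsigned cmp = ((unsigned) me << 16) + me;
        unsigned cmpnew = ((unsigned) menew << 16) + me;

        if (cmpxchg(&t->u, cmp, cmpnew) == cmp) return 0;

        return EBUSY;
    }

    /* Is the lock currently free? */
    static int ticket_lockable(ticketlock *t)
    {
        ticketlock u = *t;
        barrier();
        return (u.s.ticket == u.s.users);
    }

On top of that interface, the ticket-based read-write lock becomes: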

    typedef struct dumbtrwlock dumbtrwlock;
    struct dumbtrwlock
    {
        ticketlock lock;
        unsigned readers;
    };

    static void dumbt_wrlock(dumbtrwlock *l)
    {
        /* Get lock */
        ticket_lock(&l->lock);

        /* Wait for readers to finish */
        while (l->readers) cpu_relax();
    }

    static void dumbt_wrunlock(dumbtrwlock *l)
    {
        ticket_unlock(&l->lock);
    }

    static int dumbt_wrtrylock(dumbtrwlock *l)
    {
        /* Want no readers */
        if (l->readers) return EBUSY;

        /* Try to get write lock */
        if (ticket_trylock(&l->lock)) return EBUSY;

        if (l->readers)
        {
            /* Oops, a reader started */
            ticket_unlock(&l->lock);
            return EBUSY;
        }

        /* Success! */
        return 0;
    }

    static void dumbt_rdlock(dumbtrwlock *l)
    {
        while (1)
        {
            /* Success? */
            if (ticket_lockable(&l->lock))
            {
                /* Speculatively take read lock */
                atomic_inc(&l->readers);

                /* Success? */
                if (ticket_lockable(&l->lock)) return;

                /* Failure - undo, and wait until we can try again */
                atomic_dec(&l->readers);
            }

            while (!ticket_lockable(&l->lock)) cpu_relax();
        }
    }

    static void dumbt_rdunlock(dumbtrwlock *l)
    {
        atomic_dec(&l->readers);
    }

    static int dumbt_rdtrylock(dumbtrwlock *l)
    {
        /* Speculatively take read lock */
        atomic_inc(&l->readers);

        /* Success? */
        if (ticket_lockable(&l->lock)) return 0;

        /* Failure - undo */
        atomic_dec(&l->readers);

        return EBUSY;
    }

    static int dumbt_rdupgradelock(dumbtrwlock *l)
    {
        /* Try to convert into a write lock */
        if (ticket_trylock(&l->lock)) return EBUSY;

        /* I'm no longer a reader */
        atomic_dec(&l->readers);

        /* Wait for all other readers to finish */
        while (l->readers) cpu_relax();

        return 0;
    }

To cut contention further and speed things up, we turn to a more elaborate algorithm: the one ReactOS uses to emulate Windows' slim read-write lock (SRW lock). It maintains a queue of waiters, guarded by a bit-lock, so each waiter can spin on its own isolated memory location, which improves scalability.
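
This section assumes two further primitives for the wait-chain bit-lock, plus the TRUE/FALSE constants. Here is a minimal, type-generic sketch using GCC builtins (my definitions; atomic_set_bit is only needed by a later section but belongs to the same family):

    #define FALSE 0
    #define TRUE  1

    /* Atomically set bit B of *P and return the bit's previous value (0 or 1) */
    #define atomic_bitsetandtest(P, B) \
        ((__sync_fetch_and_or((P), (__typeof__(*(P)))1 << (B)) >> (B)) & 1)

    /* Atomically clear bit B of *P */
    #define atomic_clear_bit(P, B) \
        ((void)__sync_fetch_and_and((P), ~((__typeof__(*(P)))1 << (B))))

    /* Atomically set bit B of *P, discarding the previous value */
    #define atomic_set_bit(P, B) \
        ((void)__sync_fetch_and_or((P), (__typeof__(*(P)))1 << (B)))

The SRW-style lock itself follows: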

    /* Have a wait block */
    #define SRWLOCK_WAIT 1

    /* Users are readers */
    #define SRWLOCK_SHARED 2

    /* Bit-lock for editing the wait block */
    #define SRWLOCK_LOCK 4
    #define SRWLOCK_LOCK_BIT 2

    /* Mask for the above bits */
    #define SRWLOCK_MASK 7

    /* Number of current users * 8 */
    #define SRWLOCK_USERS 8

    typedef struct srwlock srwlock;
    struct srwlock
    {
        uintptr_t p;
    };

    typedef struct srw_sw srw_sw;
    struct srw_sw
    {
        uintptr_t spin;
        srw_sw *next;
    };

    typedef struct srw_wb srw_wb;
    struct srw_wb
    {
        /* s_count is the number of shared acquirers * SRWLOCK_USERS. */
        uintptr_t s_count;

        /* Last points to the last wait block in the chain. The value
           is only valid when read from the first wait block. */
        srw_wb *last;

        /* Next points to the next wait block in the chain. */
        srw_wb *next;

        /* The wake chain is only valid for shared wait blocks */
        srw_sw *wake;
        srw_sw *last_shared;

        int ex;
    };

    /* Wait for control of wait block */
    static srw_wb *lock_wb(srwlock *l)
    {
        uintptr_t p;

        /* Spin on the wait block bit lock */
        while (atomic_bitsetandtest(&l->p, SRWLOCK_LOCK_BIT)) cpu_relax();

        p = l->p;
        barrier();

        if (!(p & SRWLOCK_WAIT))
        {
            /* Oops, looks like the wait block was removed. */
            atomic_clear_bit(&l->p, SRWLOCK_LOCK_BIT);
            return NULL;
        }

        return (srw_wb *)(p & ~SRWLOCK_MASK);
    }

    static void srwlock_init(srwlock *l)
    {
        l->p = 0;
    }

    static void srwlock_rdlock(srwlock *l)
    {
        srw_wb swblock;
        srw_sw sw;
        uintptr_t p;
        srw_wb *wb, *shared;

        while (1)
        {
            barrier();
            p = l->p;

            cpu_relax();

            if (!p)
            {
                /* This is a fast path, we can simply try to set the shared count to 1 */
                if (!cmpxchg(&l->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED)) return;

                continue;
            }

            /* Don't interfere with locking */
            if (p & SRWLOCK_LOCK) continue;

            if (p & SRWLOCK_SHARED)
            {
                if (!(p & SRWLOCK_WAIT))
                {
                    /* This is a fast path, just increment the number of current shared locks */
                    if (cmpxchg(&l->p, p, p + SRWLOCK_USERS) == p) return;
                }
                else
                {
                    /* There are other waiters already, lock the wait blocks and increment the shared count */
                    wb = lock_wb(l);
                    if (wb) break;
                }

                continue;
            }

            /* Initialize wait block */
            swblock.ex = FALSE;
            swblock.next = NULL;
            swblock.last = &swblock;
            swblock.wake = &sw;

            sw.next = NULL;
            sw.spin = 0;

            if (!(p & SRWLOCK_WAIT))
            {
                /*
                 * We need to set up the first wait block.
                 * Currently an exclusive lock is held, change the lock to contended mode.
                 */
                swblock.s_count = SRWLOCK_USERS;
                swblock.last_shared = &sw;

                if (cmpxchg(&l->p, p, (uintptr_t)&swblock | SRWLOCK_WAIT) == p)
                {
                    while (!sw.spin) cpu_relax();
                    return;
                }

                continue;
            }

            /* Handle the contended but not shared case */

            /*
             * There are other waiters already, lock the wait blocks and increment the shared count.
             * If the last block in the chain is an exclusive lock, add another block.
             */
            swblock.s_count = 0;

            wb = lock_wb(l);
            if (!wb) continue;

            shared = wb->last;
            if (shared->ex)
            {
                shared->next = &swblock;
                wb->last = &swblock;

                shared = &swblock;
            }
            else
            {
                shared->last_shared->next = &sw;
            }

            shared->s_count += SRWLOCK_USERS;
            shared->last_shared = &sw;

            /* Unlock */
            barrier();
            l->p &= ~SRWLOCK_LOCK;

            /* Wait to be woken */
            while (!sw.spin) cpu_relax();

            return;
        }

        /* The contended and shared case */
        sw.next = NULL;
        sw.spin = 0;

        if (wb->ex)
        {
            /*
             * We need to set up a new wait block.
             * Although we're currently in a shared lock and we're acquiring
             * a shared lock, there are exclusive locks queued in between.
             * We need to wait until those are released.
             */
            shared = wb->last;

            if (shared->ex)
            {
                swblock.ex = FALSE;
                swblock.s_count = SRWLOCK_USERS;
                swblock.next = NULL;
                swblock.last = &swblock;
                swblock.wake = &sw;
                swblock.last_shared = &sw;

                shared->next = &swblock;
                wb->last = &swblock;
            }
            else
            {
                shared->last_shared->next = &sw;
                shared->s_count += SRWLOCK_USERS;
                shared->last_shared = &sw;
            }
        }
        else
        {
            wb->last_shared->next = &sw;
            wb->s_count += SRWLOCK_USERS;
            wb->last_shared = &sw;
        }

        /* Unlock */
        barrier();
        l->p &= ~SRWLOCK_LOCK;

        /* Wait to be woken */
        while (!sw.spin) cpu_relax();
    }


    static void srwlock_rdunlock(srwlock *l)
    {
        uintptr_t p, np;
        srw_wb *wb;
        srw_wb *next;

        while (1)
        {
            barrier();
            p = l->p;

            cpu_relax();

            if (p & SRWLOCK_WAIT)
            {
                /*
                 * There's a wait block, so if this is the last shared release
                 * we need to wake a pending exclusive acquirer.
                 */
                wb = lock_wb(l);
                if (wb) break;

                continue;
            }

            /* Don't interfere with locking */
            if (p & SRWLOCK_LOCK) continue;

            /*
             * This is a fast path, we can simply decrement the shared
             * count and store the pointer
             */
            np = p - SRWLOCK_USERS;

            /* If we are the last reader, then the lock is unused */
            if (np == SRWLOCK_SHARED) np = 0;

            /* Try to release the lock */
            if (cmpxchg(&l->p, p, np) == p) return;
        }

        wb->s_count -= SRWLOCK_USERS;

        if (wb->s_count)
        {
            /* Unlock */
            barrier();
            l->p &= ~SRWLOCK_LOCK;
            return;
        }

        next = wb->next;
        if (next)
        {
            /*
             * There are more blocks chained; we need to update the pointers
             * in the next wait block and update the wait block pointer.
             */
            np = (uintptr_t)next | SRWLOCK_WAIT;

            next->last = wb->last;
        }
        else
        {
            /* Convert the lock to a simple exclusive lock. */
            np = SRWLOCK_USERS;
        }

        barrier();
        /* This also unlocks the wb lock bit */
        l->p = np;
        barrier();
        wb->wake = (void *) 1;
        barrier();

        /* We released the lock */
    }

    static int srwlock_rdtrylock(srwlock *s)
    {
        uintptr_t p = s->p;

        barrier();

        /* This is a fast path, we can simply try to set the shared count to 1 */
        if (!p && (cmpxchg(&s->p, 0, SRWLOCK_USERS | SRWLOCK_SHARED) == 0)) return 0;

        if ((p & (SRWLOCK_SHARED | SRWLOCK_WAIT)) == SRWLOCK_SHARED)
        {
            /* This is a fast path, just increment the number of current shared locks */
            if (cmpxchg(&s->p, p, p + SRWLOCK_USERS) == p) return 0;
        }

        return EBUSY;
    }


    static void srwlock_wrlock(srwlock *l)
    {
        srw_wb swblock;
        uintptr_t p, np;

        /* Fastpath - no other readers or writers */
        if (!l->p && (!cmpxchg(&l->p, 0, SRWLOCK_USERS))) return;

        /* Initialize wait block */
        swblock.ex = TRUE;
        swblock.next = NULL;
        swblock.last = &swblock;
        swblock.wake = NULL;

        while (1)
        {
            barrier();
            p = l->p;
            cpu_relax();

            if (p & SRWLOCK_WAIT)
            {
                srw_wb *wb = lock_wb(l);
                if (!wb) continue;

                /* Complete initialization of the block */
                swblock.s_count = 0;

                wb->last->next = &swblock;
                wb->last = &swblock;

                /* Unlock */
                barrier();
                l->p &= ~SRWLOCK_LOCK;

                /* Wait until our wait block becomes the first one in the chain */
                while (!swblock.wake) cpu_relax();

                return;
            }

            /* Fastpath - no other readers or writers */
            if (!p)
            {
                if (!cmpxchg(&l->p, 0, SRWLOCK_USERS)) return;
                continue;
            }

            /* Don't interfere with locking */
            if (p & SRWLOCK_LOCK) continue;

            /* There are no wait blocks so far, we need to add ourselves as the first wait block. */
            if (p & SRWLOCK_SHARED)
            {
                swblock.s_count = p & ~SRWLOCK_MASK;
                np = (uintptr_t)&swblock | SRWLOCK_SHARED | SRWLOCK_WAIT;
            }
            else
            {
                swblock.s_count = 0;
                np = (uintptr_t)&swblock | SRWLOCK_WAIT;
            }

            /* Try to make the change */
            if (cmpxchg(&l->p, p, np) == p) break;
        }

        /* Wait until our wait block becomes the first one in the chain */
        while (!swblock.wake) cpu_relax();
    }


    static void srwlock_wrunlock(srwlock *l)
    {
        uintptr_t p, np;
        srw_wb *wb;
        srw_wb *next;
        srw_sw *wake, *wake_next;

        while (1)
        {
            barrier();
            p = l->p;
            cpu_relax();

            if (p == SRWLOCK_USERS)
            {
                /*
                 * This is the fast path, we can simply clear the SRWLOCK_USERS bit.
                 * All other bits should be 0 now because this is a simple exclusive lock,
                 * and no one else is waiting.
                 */
                if (cmpxchg(&l->p, SRWLOCK_USERS, 0) == SRWLOCK_USERS) return;

                continue;
            }

            /* There's a wait block, we need to wake the next pending acquirer */
            wb = lock_wb(l);
            if (wb) break;
        }

        next = wb->next;
        if (next)
        {
            /*
             * There are more blocks chained; we need to update the pointers
             * in the next wait block and update the wait block pointer.
             */
            np = (uintptr_t)next | SRWLOCK_WAIT;
            if (!wb->ex)
            {
                /* Save the shared count */
                next->s_count = wb->s_count;

                np |= SRWLOCK_SHARED;
            }

            next->last = wb->last;
        }
        else
        {
            /* Convert the lock to a simple lock. */
            if (wb->ex)
            {
                np = SRWLOCK_USERS;
            }
            else
            {
                np = wb->s_count | SRWLOCK_SHARED;
            }
        }

        barrier();
        /* This also unlocks the lock bit */
        l->p = np;
        barrier();

        if (wb->ex)
        {
            barrier();
            /* Notify the next waiter */
            wb->wake = (void *) 1;
            barrier();
            return;
        }

        /* We now need to wake all the queued readers. */
        for (wake = wb->wake; wake; wake = wake_next)
        {
            barrier();
            wake_next = wake->next;
            barrier();
            wake->spin = 1;
            barrier();
        }
    }

    static int srwlock_wrtrylock(srwlock *s)
    {
        /* No other readers or writers? */
        if (!s->p && (cmpxchg(&s->p, 0, SRWLOCK_USERS) == 0)) return 0;

        return EBUSY;
    }
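
To make the encoding of the lock word concrete, here is a small hypothetical helper; srwlock_describe is my own debugging aid, not part of the algorithm, and simply decodes l->p under the bit layout defined above:

    #include <stdio.h>

    /* Hypothetical debug helper: print what the srwlock word currently means */
    static void srwlock_describe(const srwlock *l)
    {
        uintptr_t p = l->p;

        if (!p)
            printf("free\n");
        else if (p & SRWLOCK_WAIT)
            printf("contended: wait-block chain at %p%s%s\n",
                   (void *)(p & ~(uintptr_t)SRWLOCK_MASK),
                   (p & SRWLOCK_SHARED) ? ", readers hold it" : ", writer holds it",
                   (p & SRWLOCK_LOCK) ? ", chain bit-locked" : "");
        else if (p & SRWLOCK_SHARED)
            printf("%lu reader(s), no waiters\n",
                   (unsigned long)(p / SRWLOCK_USERS));
        else
            printf("one writer, no waiters\n");
    }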

Another possible implementation packs the reader count and some writer-state bits into a single lock word; the Linux kernel uses an algorithm of this kind. It is a reader-preferring read-write lock.

    #define RW_WAIT_BIT  0
    #define RW_WRITE_BIT 1
    #define RW_READ_BIT  2

    #define RW_WAIT  1
    #define RW_WRITE 2
    #define RW_READ  4

    typedef unsigned rwlock;

    static void wrlock(rwlock *l)
    {
        while (1)
        {
            unsigned state = *l;

            /* No readers or writers? */
            if (state < RW_WRITE)
            {
                /* Turn off RW_WAIT, and turn on RW_WRITE */
                if (cmpxchg(l, state, RW_WRITE) == state) return;

                /* Someone else got there... time to wait */
                state = *l;
            }

            /* Turn on writer wait bit */
            if (!(state & RW_WAIT)) atomic_set_bit(l, RW_WAIT_BIT);

            /* Wait until we can try to take the lock */
            while (*l > RW_WAIT) cpu_relax();
        }
    }

    static void wrunlock(rwlock *l)
    {
        atomic_add(l, -RW_WRITE);
    }

    static int wrtrylock(rwlock *l)
    {
        unsigned state = *l;

        if ((state < RW_WRITE) && (cmpxchg(l, state, state + RW_WRITE) == state)) return 0;

        return EBUSY;
    }

    static void rdlock(rwlock *l)
    {
        while (1)
        {
            /* A writer exists? */
            while (*l & (RW_WAIT | RW_WRITE)) cpu_relax();

            /* Try to get read lock */
            if (!(atomic_xadd(l, RW_READ) & (RW_WAIT | RW_WRITE))) return;

            /* Undo */
            atomic_add(l, -RW_READ);
        }
    }

    static void rdunlock(rwlock *l)
    {
        atomic_add(l, -RW_READ);
    }

    static int rdtrylock(rwlock *l)
    {
        /* Try to get read lock */
        unsigned state = atomic_xadd(l, RW_READ);

        if (!(state & (RW_WAIT | RW_WRITE))) return 0;

        /* Undo */
        atomic_add(l, -RW_READ);

        return EBUSY;
    }

    /* Get a read lock, even if a writer is waiting */
    static int rdforcelock(rwlock *l)
    {
        /* Try to get read lock */
        unsigned state = atomic_xadd(l, RW_READ);

        /* We succeed even if a writer is waiting */
        if (!(state & RW_WRITE)) return 0;

        /* Undo */
        atomic_add(l, -RW_READ);

        return EBUSY;
    }

    /* Try to upgrade from a read to a write lock atomically */
    static int rdtryupgradelock(rwlock *l)
    {
        /* Someone else is trying (and will succeed) to upgrade to a write lock? */
        if (atomic_bitsetandtest(l, RW_WRITE_BIT)) return EBUSY;

        /* Don't count myself any more */
        atomic_add(l, -RW_READ);

        /* Wait until there are no more readers */
        while (*l > (RW_WAIT | RW_WRITE)) cpu_relax();

        return 0;
    }
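
One usage note on the upgrade path: when rdtryupgradelock returns EBUSY, the caller still holds its read lock and must fall back explicitly. The following sketch shows the intended pattern; the update_if_stale scenario and its names are mine:

    /* Sketch: read first, upgrade only if a change is needed (names are mine) */
    static rwlock maplock = 0;

    static void update_if_stale(int *entry, int fresh)
    {
        rdlock(&maplock);

        if (*entry != fresh)
        {
            if (rdtryupgradelock(&maplock) == 0)
            {
                /* Now the exclusive holder: safe to write */
                *entry = fresh;
                wrunlock(&maplock);
                return;
            }

            /* Someone else is upgrading: drop the read lock and queue as a writer */
            rdunlock(&maplock);
            wrlock(&maplock);
            *entry = fresh;
            wrunlock(&maplock);
            return;
        }

        rdunlock(&maplock);
    }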

Yet another efficient implementation follows. The algorithm is due to John Mellor-Crummey and Michael Scott; see "Scalable Reader-Writer Synchronization for Shared-Memory Multiprocessors" for the details.

    typedef union rwticket rwticket;

    union rwticket
    {
        unsigned u;
        unsigned short us;
        __extension__ struct
        {
            unsigned char write;
            unsigned char read;
            unsigned char users;
        } s;
    };

    static void rwticket_wrlock(rwticket *l)
    {
        /* Take a ticket: atomically bump the users byte */
        unsigned me = atomic_xadd(&l->u, (1<<16));
        unsigned char val = me >> 16;

        /* Wait until the write counter reaches our ticket */
        while (val != l->s.write) cpu_relax();
    }

    static void rwticket_wrunlock(rwticket *l)
    {
        rwticket t = *l;

        barrier();

        /* Advance both counters so the next reader or writer may enter */
        t.s.write++;
        t.s.read++;

        *(unsigned short *) l = t.us;
    }

    static int rwticket_wrtrylock(rwticket *l)
    {
        unsigned me = l->s.users;
        unsigned char menew = me + 1;
        unsigned read = l->s.read << 8;
        unsigned cmp = (me << 16) + read + me;
        unsigned cmpnew = (menew << 16) + read + me;

        if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;

        return EBUSY;
    }

    static void rwticket_rdlock(rwticket *l)
    {
        /* Take a ticket */
        unsigned me = atomic_xadd(&l->u, (1<<16));
        unsigned char val = me >> 16;

        /* Wait for our turn, then let the next reader in immediately */
        while (val != l->s.read) cpu_relax();
        l->s.read++;
    }

    static void rwticket_rdunlock(rwticket *l)
    {
        /* Readers advance the write counter on release */
        atomic_inc(&l->s.write);
    }

    static int rwticket_rdtrylock(rwticket *l)
    {
        unsigned me = l->s.users;
        unsigned write = l->s.write;
        unsigned char menew = me + 1;
        unsigned cmp = (me << 16) + (me << 8) + write;
        unsigned cmpnew = ((unsigned) menew << 16) + (menew << 8) + write;

        if (cmpxchg(&l->u, cmp, cmpnew) == cmp) return 0;

        return EBUSY;
    }
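
As a quick single-threaded sanity check of this interface (the assert scaffolding is mine, and it reuses the primitives sketched earlier):

    #include <assert.h>
    #include <errno.h>

    int main(void)
    {
        rwticket l = { 0 };

        rwticket_rdlock(&l);
        assert(rwticket_rdtrylock(&l) == 0);      /* readers share the lock */
        assert(rwticket_wrtrylock(&l) == EBUSY);  /* writers are excluded */
        rwticket_rdunlock(&l);
        rwticket_rdunlock(&l);

        rwticket_wrlock(&l);
        assert(rwticket_rdtrylock(&l) == EBUSY);  /* readers are excluded */
        rwticket_wrunlock(&l);

        return 0;
    }

Note how rwticket_rdunlock advances the write counter: readers and writers share a single FIFO ticket sequence, which is what makes this lock fair.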


That wraps up our tour of read-write lock designs. Did it all make sense, dear reader? :)

(The end)



