最近读skynet c语言部分的源码,发现有好多锁的使用和gcc提供的一些原子操作。看到这些东西,对于我这个newbee来说实在有些hold不住。但为了了解并进一步掌握,还是决定好好分析一下。不足之处望指正。
自旋锁(spinlock) 和 互斥锁(mutex) 对比
自旋锁:得到锁之前是在一个循环中空转,直到得到锁为止,那么就有三种可能 1:很短时间就得到锁,由于是空转,没有sleep,也就没有由系统到用户态的消耗,2:很长时间才得到锁,虽然没有状态的切换,但是由于忙等时间过长
导致性能下降,3:一直空转,消耗cpu时间。
互斥锁 : 企图获得锁,若是得不到锁则阻塞,放弃cpu,没有忙等的出现,当锁可得时,发生状态切换,由内核切换到用户态,虽然没有忙等但是状态切换的代价仍然很大。
由此可知:对自旋锁和互斥锁的选择是要根据得到锁的耗时来的,若果当得到锁后,需要执行大量的操作,一般选用互斥锁,若得到锁后,进行很少量的操作,一般选择自旋锁,因为执行的操作短,那么忙等的开销总体还是小于内核态
和用户态切换带来的开销的。
最近在使用skynet,这里贴出来cloud实现自旋锁的代码,方便大家查阅:
#ifndef SKYNET_SPINLOCK_H
#define SKYNET_SPINLOCK_H
#define SPIN_INIT(q) spinlock_init(&(q)->lock);
#define SPIN_LOCK(q) spinlock_lock(&(q)->lock);
#define SPIN_UNLOCK(q) spinlock_unlock(&(q)->lock);
#define SPIN_DESTROY(q) spinlock_destroy(&(q)->lock);
#ifndef USE_PTHREAD_LOCK
struct spinlock {
int lock;
};
static inline void
spinlock_init(struct spinlock *lock) {
lock->lock = 0;
}
static inline void
spinlock_lock(struct spinlock *lock) {
while (__sync_lock_test_and_set(&lock->lock,1)) {}
}
static inline int
spinlock_trylock(struct spinlock *lock) {
return __sync_lock_test_and_set(&lock->lock,1) == 0;
}
static inline void
spinlock_unlock(struct spinlock *lock) {
__sync_lock_release(&lock->lock);
}
static inline void
spinlock_destroy(struct spinlock *lock) {
(void) lock;
}
#else
#include <pthread.h>
// we use mutex instead of spinlock for some reason
// you can also replace to pthread_spinlock
struct spinlock {
pthread_mutex_t lock;
};
static inline void
spinlock_init(struct spinlock *lock) {
pthread_mutex_init(&lock->lock, NULL);
}
static inline void
spinlock_lock(struct spinlock *lock) {
pthread_mutex_lock(&lock->lock);
}
static inline int
spinlock_trylock(struct spinlock *lock) {
return pthread_mutex_trylock(&lock->lock) == 0;
}
static inline void
spinlock_unlock(struct spinlock *lock) {
pthread_mutex_unlock(&lock->lock);
}
static inline void
spinlock_destroy(struct spinlock *lock) {
pthread_mutex_destroy(&lock->lock);
}
#endif
#endif
linux自带了pthread_spinlock。
cloud的第一种实现中,用到了gcc自带的原子操作函数实现了spinlock,这里提供一些gcc自带的原子操作的资料:https://gcc.gnu.org/onlinedocs/gcc-4.7.2/gcc/_005f_005fatomic-Builtins.html#_005f_005fatomic-Builtins
互斥锁和条件锁
pthread_mutex_t 和 pthread_cond_t
在阅读skynet.start.c中发现了这两种锁的使用,代码过长,不全部列出,这里只列出简化过得。
源码在这里:https://github.com/cloudwu/skynet/blob/master/skynet-src/skynet_start.c
简化后代码:
//创建线程时调用的代码
static void
create_thread(pthread_t *thread, void *(*start_routine) (void *), void *arg) {
if (pthread_create(thread,NULL, start_routine, arg)) {
fprintf(stderr, "Create thread failed");
exit(1);
}
}
//事件发生时调用
static void
wakeup(struct monitor *m, int busy) {
if (m->sleep >= m->count - busy) {
// signal sleep worker, "spurious wakeup" is harmless
pthread_cond_signal(&m->cond);
}
}
//创建socket线程监听链接,并唤醒阻塞的thread_worker线程
static void *
thread_socket(void *p) {
struct monitor * m = p;
skynet_initthread(THREAD_SOCKET);
for (;;) {
int r = skynet_socket_poll();
if (r==0)
break;
if (r<0) {
CHECK_ABORT
continue;
}
wakeup(m,0);
}
return NULL;
}
static void *
thread_worker(void *p) {
struct worker_parm *wp = p;
int id = wp->id;
int weight = wp->weight;
struct monitor *m = wp->m;
struct skynet_monitor *sm = m->m[id];
skynet_initthread(THREAD_WORKER);
struct message_queue * q = NULL; //上边几句选择性忽略
while (!m->quit) {
q = skynet_context_message_dispatch(sm, q, weight);
if (q == NULL) {
if (pthread_mutex_lock(&m->mutex) == 0) { //创建工作线程时,每个线程运行到这里,获得mutex
++ m->sleep; //当没有事件要处理时,阻塞贤臣数加1
// "spurious wakeup" is harmless,
// because skynet_context_message_dispatch() can be call at any time.
if (!m->quit)
pthread_cond_wait(&m->cond, &m->mutex); //在这里真正的阻塞在cond上,调用这个函数时,线程阻塞在cond上,并暂时放弃mutex的使用权,让其他线程可 以获取到,当其他线程获得到mutex,继续阻塞在cond上,直到socket线程监听到链接,调用pthread_cond_signal来唤醒工作线程,唤醒的工作线程,重新获得mutex.
-- m->sleep;
if (pthread_mutex_unlock(&m->mutex)) {
fprintf(stderr, "unlock mutex error");
exit(1);
}
}
}
}
return NULL;
}
创建thread_worker线程的代码: create_thread(&pid[i+3], thread_worker, &wp[i]);
以上代码大致工作过程:
1.create_thread(&pid[i+3], thread_worker, &wp[i]); 创建工作线程
2.运行到thread_worker函数中pthread_cond_wait(&m->cond, &m->mutex)时,线程阻塞在cond上,待等待的条件出现,方可被唤醒.
3.所有创建的工作线程,都阻塞在了cond上,等待条件出现,而条件出现是在用户连接服务端后,被epoll或kqueue监听到后,唤醒工作线程
int r = skynet_socket_poll();
if (r==0)
break;
if (r<0) {
CHECK_ABORT
continue;
}
wakeup(m,0); //唤醒工作线程
一般来说 mutex和cond是配合使用的,稍后解释原因。
未完,待续,欢迎纠正错误。