转载: http://blog.sina.com.cn/s/blog_48d4cf2d0100mx4n.html
死锁是由于不同线程按照不同顺序进行加锁而造成的。如:
线程A:对lock a加锁 => 对lock b加锁 => dosth => 释放lock b => 释放lock a
线程B:对lock b加锁 => 对lock a加锁 => dosth => 释放lock a => 释放lock b
这样两条线程,就可能发生死锁问题。要避免发生死锁,应该使用同一个顺序进行加锁。
这点在对象单向调用的情况下是很容易达成的。对象单向调用的意思是如果对象a的函数调用了对象b的函数,则对象b中的函数不会去调用对象a的函数(注意:a和b也可能同属于一个类)。
举个例子吧,假设聊天室(Room)对象room,聊天者(Chatter)对象chatter,假设Chatter和Room的定义如下:
class InnerChatter
{
public:
void sendMsg(const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
socket->send(msg);
}
private:
boost::mutex mtx;
TcpSocket socket;
};
typedef boost::shared_ptr< InnerChatter> Chatter;
class InnerRoom
{
public:
void sendMsg(const string& user, const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
if (chatters.find(user) != chatters.end())
{
chatters[user]-> sendMsg(user);
}
}
private:
boost::mutex mtx;
map<string, Chatter> chatters;
};
目前代码中只存在Room调用Chatter的情况,不存在Chatter调用Room,Room调用Room,Chatter调用Chatter这三种情况。所以总是先获得room锁,再获得chatter锁,不会发生死锁。
如果为Chatter加上发送历史和以下这个方法之后呢?
vector<string> history;
void sendMsgToChatter(Chatter dst, const string& msg)
{
boost::mutex::scoped_lock lock(mtx); // 加锁当前对象
history.push_back(msg);
dsg>sendMsg(msg); // 注意:次函数调用会加锁dst对象
}
乍看起来似乎没问题,但如果线程A执行chatterA.sendMsgToChatter(chatterB, “sth”)时,线程B正好执行chatterB.sendMsgToChatter(chatterA, “sth”),就会发生本文一开头举例的死锁问题。
如果在Chatter中加入函数:
void sendMsgToAll(Room room, const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
history.push_back(msg);
room->sendMsgToAll(msg);
}
在Room中加入函数:
void sendMsgToAll(const string& msg)
{
boost::mutex::scoped_lock lock(mtx);
for (map<string, Chatter>::iterator it = chatters.begin(); it != chatters.end(); ++it)
{
it->second->sendMsg(msg);
}
}
显然死锁问题更严重了,也更令人抓狂了。也许有人要问,为什么要这么做,不能就保持Room单向调用Chatter吗?大部分时候答案是肯定的,也建议大部分模块尤其是周边模块如基础设施模块使用明确清晰的单向调用关系,这样可以减少对死锁的忧虑,少白一些头发。
但有时候保证单向调用的代价太高:试想一下,如果被调用者b是一个容器类,调用者a定义了一些对元素的汇总操作如求和,为了避免回调(回调打破了单向调用约束),那就只有对b加锁,复制所有元素,解锁,遍历求和。复制所有元素比较耗计算资源,有可能成为性能瓶颈。
另外还有设计方面的考虑。还举Room和Chatter的例子,如果避免Chatter调用Room和Chatter,则Chatter很难实现啥高级功能,这样所有代码都将堆砌在Room,Room将成为一个超级类,带来维护上的难度。此外还有设计上的不妥:因为几乎全部面向对象的设计模式都可以理解成某种方式的回调,禁止回调也就禁掉了设计模式,可能带来不便。
当对象间的相互调用无法避免时,如果只使用传统的mutex,保证相同顺序加锁需要十分小心,万一编程时失误,测试时又没发现(这是很可能的,死锁很不容易测试出来),如果条件允许还可以手忙脚乱地火线gdb,若无法调试定位,则服务器可能要成为重启帝了,对产品的形象十分有害。
我想出的解决方案是既然mutex要保证相同顺序加锁,就直接让mutex和一个优先级挂钩,使用线程专有存储(TSS)保存当前线程优先级最低的锁,当对新的mutex加锁时,如果mutex的优先级< 当前优先级(为什么=不可以,参考上文说的sendMsgToChatter函数),才允许加锁,否则记录当前函数栈信息,抛出异常(要仔细设计以免破坏内部数据结构)。代码如下:
boost::thread_specific_ptr<global::stack<int>> locks_hold_by_current_thread;
class xrecursive_mutex
{
public:
xrecursive_mutex(int pri_level_)
: recursion_count(0)
, pri_level(pri_level_){}
~xrecursive_mutex(){}
class scoped_lock
{
public:
scoped_lock(xrecursive_mutex& mtx_)
: mtx(mtx_)
{
mtx.lock();
}
~scoped_lock()
{
mtx.unlock();
}
private:
xrecursive_mutex& mtx;
};
private:
int recursion_count;
int pri_level;
boost::recursive_mutex mutex;
int get_recursion_count()
{
return recursion_count;
}
void lock()
{
mutex.lock();
++ recursion_count;
if (recursion_count == 1)
{
if (locks_hold_by_current_thread.get() == NULL)
{
locks_hold_by_current_thread.reset(new std::stack<int>());
}
if (!locks_hold_by_current_thread->empty() &&
locks_hold_by_current_thread->top()>= pri_level)
{ // wrong order, lock failed
-- recursion_count;
mutex.unlock();
XASSERT(false);//记录栈信息,抛异常
}
locks_hold_by_current_thread->push(pri_level);
}
}
void unlock()
{
bool bad_usage_flag = false;
if (recursion_count == 1 &&locks_hold_by_current_thread.get() != NULL)
{
if (!locks_hold_by_current_thread->empty()
&& (locks_hold_by_current_thread->top() == pri_level))
{
locks_hold_by_current_thread->pop();
}
else
{
bad_usage_flag = true;
}
}
-- recursion_count;
mutex.unlock();
XASSERT(!bad_usage_flag);// // 记录栈信息,抛异常
}
};
使用:
xrecursive_mutex mtx1(1);
xrecursive_mutex mtx2(2);
xrecursive_mutex mtx3(3);
xrecursive_mutex mtx3_2(3);
{
xrecursive_mutex::scoped_lock lock1(mtx1); // pass, 当前线程锁优先级1
xrecursive_mutex::scoped_lock lock2(mtx3); // pass, 当前线程锁优先级3
ASSERT_ANY_THROW(xrecursive_mutex::scoped_lock lock2_2(mtx3_2)); // 捕获异常,因为优先级3 <= 当前线程锁优先级
xrecursive_mutex::scoped_lock lock3(mtx3); // pass, 可重入锁
xrecursive_mutex::scoped_lock lock4(mtx1); // pass, 可重入锁
ASSERT_ANY_THROW(xrecursive_mutex::scoped_lock lock5(mtx2)); // 捕获异常,因为优先级2<= 当前线程锁优先级3
}