前几天一直没想明白,搜索我的记忆海,愣是没找到这种三个线程相交互的!
1、问题描述
目前有三个class
WriteDB他主要负责把网页的信息写到数据中(间隔5分钟),每次更新之后都会他会相应的把DataOperation中的变量isDataUpdate_设置为true;
Search他负责处理对用户执行查询信息的操作,当有用户执行Search,他会相应的把DataOperation中的变量isUserSearch_设置为true;
DataOperation这是一个中间的类,负责做类WriteDB和类Search的同步操作,会一直检查这两个变量isDataUpdate_和isUserSearch_,当他们都为false的时候,把最新在内存中的数据库写入到对应的文件中,以便能够查询最新的信息,如果其中一个为true,他都等待。
2、迷糊的那段时间
也就是前几天,一直在想,怎么能实现上面的东西!一直被WriteDB这个class和isUserSearch_(bool类型)所困扰!
还写出来类似第一版的DataOperation代码:
class DataOperation { public: DataOperation() { running_flag = true; isDataUpdate_ = false; isUserSearch_ = false; } void Stop() { running_flag = false; { boost::recursive_mutex::scoped_lock lock(mtx); cond.notify_one(); } } void EmitSignalThread() { while (running_flag) { if (isSearchEnd_) { break; } { boost::recursive_mutex::scoped_lock lock(mtx); if (IsReplaceDB()) { // 需要替换数据库 for (int i = 0; i <= 999; i++) { cout << "正在替换数据库" << i << endl; } } cond.notify_all(); } boost::this_thread::sleep(boost::posix_time::millisec(1000)); } } // 当后台有数据要更新,并且目前没有用户进行search的时候replace // 返回true表明正在进行,false表示还需要等待 bool IsReplaceDB() { return (isDataUpdate_ && isUserSearch_); } // 这个会阻塞 void WaitReplaceDB() { boost::recursive_mutex::scoped_lock lock(mtx); cout << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl; cond.wait(mtx); boost::this_thread::sleep(boost::posix_time::millisec(100)); } void SetIsDataUpdate(bool isDataUpdate = true) { isDataUpdate_ = isDataUpdate; } void SetIsUserSearch(bool isUserSearch = true) { isUserSearch_ = isUserSearch; } void SetIsSearchEnd(bool isSearchEnd = false) { isSearchEnd_ = isSearchEnd; } bool GetIsSearchEnd() { return isSearchEnd_; } private: bool running_flag; // 数据助手是否需要存活 bool isDataUpdate_; // 数据是否有更新 bool isUserSearch_; // 当前是否有用户正在查询 bool isSearchEnd_; // 程序是否已经退出 boost::recursive_mutex mtx; boost::condition_variable_any cond; };
3、困扰的原因
我一直在想,以前的生产者和消费者是怎么做的?有没有想过还带有条件变量的?
4、能不能更简单
比如我就想过,只在DataOperation类中检测这两个bool变量,如果满足则更新DB,不满足Sleep;最终发现也有漏洞,毕竟这么做了Search中也要对应检测DB当前是否在更新,更新就Sleep----举一个极端的例子,但是是很可能发生:当前DataOperation发现没有用户在执行查询,那么DataOperation去设置标记(表面将要去更新数据库),当在设置标记的时候,用户更好输入了查询,发现数据库没在更新,那么就会造成同时出现DataOperation正在更新数据库、用户正在查询的情况!那么此时系统就会崩溃了。
5、所以说
条件变量、互斥量是确实有存在的意义的!
6、再来看一下整体的设计
从作用来看WriteDB只是自己做自己的,并不和多线程的同步有关,他仅仅是每次都设置isDataUpdate_为true就行;剩下的同步仅在于Search和Dataoperation中,bool isUserSearch_;和bool isSearchEnd_;都没有实际的存在价值;只需要把isUserSearch_设置为条件变量就行。这样前面的两个bool变量的功能就被取代了(同时提一下,原来的条件变量,在这里才真正的用起来)
7、最终的测试代码
#include "iostream" #include "boost/thread.hpp" #include "boost/bind.hpp" using namespace std; #include "windows.h" #include "fstream" namespace MyNamespace { ofstream cout("log.txt"); } //#define WRITEFILE_NAMESPACE #ifdef WRITEFILE_NAMESPACE ofstream out("log.txt"); #endif // 0 class DataOperation { public: DataOperation() { running_flag_ = true; isDataUpdate_ = false; } void Stop() { running_flag_ = false; } void EmitSignalThread() { while (running_flag_) { boost::recursive_mutex::scoped_lock lock(mtx); bool flag = GetDataUpdate(); // 如果有更新 那些就替换数据库 if (GetDataUpdate()) { // 需要替换数据库 for (int i = 0; i <= 999; i++) { if (i == 1) { #ifdef WRITEFILE_NAMESPACE out << "正在替换数据库" << i << endl; #else cout << "正在替换数据库" << i << endl; #endif // WRITEFILE_NAMESPACE } if (i == 999) { #ifdef WRITEFILE_NAMESPACE out << "替换完毕" << i << endl; #else cout << "替换完毕" << i << endl; #endif // WRITEFILE_NAMESPACE } } isDataUpdate_ = false; } isUserSearch_.notify_all(); // 等待10秒钟,所以结束的时候可能会在这里阻塞 // boost::this_thread::sleep(boost::posix_time::millisec(10000)); } } // 这个会阻塞 void WaitReplaceDB() { boost::recursive_mutex::scoped_lock lock(mtx); #ifdef WRITEFILE_NAMESPACE out << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl; #else cout << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl; #endif // WRITEFILE_NAMESPACE isUserSearch_.wait(mtx); boost::this_thread::sleep(boost::posix_time::millisec(100)); } void SetDataUpdate(bool isDataUpdate = true) { isDataUpdate_ = isDataUpdate; } bool GetDataUpdate() { return isDataUpdate_; } bool GetRunningFlag() { return running_flag_; } private: bool running_flag_; // 数据助手是否需要存活 bool isDataUpdate_; boost::recursive_mutex mtx; boost::condition_variable_any isUserSearch_; // 当前是否有用户正在查询 }; class WriteDB { public: void Start(DataOperation& dataoperation) { while (true) { // 在这里 这个线程进行数据库的更新可能需要大概好几分钟 for (int i = 0; i <= 9999; i++) { if (i == 1) { #ifdef WRITEFILE_NAMESPACE out << "WriteDB wait..." << i << endl; #else cout << "WriteDB wait..." << i << endl; #endif // WRITEFILE_NAMESPACE } if (i == 9999) { #ifdef WRITEFILE_NAMESPACE out << "WriteDB wait..." << i << endl; #else cout << "WriteDB wait..." << i << endl; #endif // WRITEFILE_NAMESPACE } } dataoperation.SetDataUpdate(); // 查询一下,如果程序退出了,那么线程也退出 if (!dataoperation.GetRunningFlag()) { #ifdef WRITEFILE_NAMESPACE out << "bye-bye" << endl; #else cout << "bye-bye" << endl; #endif // WRITEFILE_NAMESPACE break; } Sleep(10000); } } }; class Search { public: void Start(DataOperation& dataoperation) { string user_enter; while (true) { getline(cin, user_enter); if (user_enter == "q") { dataoperation.Stop(); break; } #ifdef WRITEFILE_NAMESPACE out << "当前用户输入的是:" << user_enter << endl; #else cout << "当前用户输入的是:" << user_enter << endl; #endif // WRITEFILE_NAMESPACE // 如果正在更新数据库,那么wait dataoperation.WaitReplaceDB(); // out << "如果程序没有动静,那么表明此刻正在更新数据库...等待几秒即可!" << endl; // 如果没在更新数据库,直接进行查询 for (int i = 0; i <= 99999; i++) { // cout << "正在为您查询数据..." << i << endl; ////////////////////////////////////////////////////////////////////////// if (i == 1) { #ifdef WRITEFILE_NAMESPACE out << "正在为您查询数据..." << i << endl; #else cout << "正在为您查询数据..." << i << endl; #endif // WRITEFILE_NAMESPACE } if (i == 99999) { #ifdef WRITEFILE_NAMESPACE out << "查询完毕..." << i << endl; #else cout << "查询完毕..." << i << endl; #endif // WRITEFILE_NAMESPACE } ////////////////////////////////////////////////////////////////////////// } // 向等待的线程发出信号,让他去更新数据库 Sleep(100); } } }; int main() { boost::thread_group grp; WriteDB write_db; Search search; DataOperation data_operation; grp.create_thread(boost::bind(&WriteDB::Start, &write_db, boost::ref(data_operation))); grp.create_thread(boost::bind(&Search::Start, &search, boost::ref(data_operation))); grp.create_thread(boost::bind(&DataOperation::EmitSignalThread, boost::ref(data_operation))); grp.join_all(); return 0; }