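Raft feels like an upgraded, streamlined version of Paxos. It was designed from the start with understandability as a goal, and after reading the material I already had a rough outline of it in my head.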
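With such detailed material, and even animated demonstrations, already available, there is not much left to add. This post just records the points I learned, as a reminder for later review.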
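In a distributed system, consistency means that the nodes of a cluster agree on their state. In practice, process crashes, network failures, hardware faults, power outages and similar problems make this agreement hard to guarantee, which is why consensus protocols such as Paxos and Raft are needed.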
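Paxos, proposed by Leslie Lamport in 1990, is a message-passing consensus algorithm with strong fault tolerance. It has two notable drawbacks, however. First, Paxos is hard to understand. Second, it does not provide a good foundation for building real systems: there have been many attempts to engineer Paxos into practical systems, but they changed the algorithm substantially and their implementations differ considerably from one another.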
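Raft is a consensus algorithm for managing a replicated log. Its designers made understandability one of their explicit goals, which makes Raft easier to build real systems on and greatly reduces the engineering effort.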
1 Leader election
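Raft uses one leader and several followers, the commonly mentioned Leader-Follower model. Each node is always in one of three states: Leader, Follower, or Candidate.
The leader handles client requests and replicates their results to the followers in the form of log entries.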
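The three states and the events that move a node between them can be pictured as a small state machine. Below is a minimal C++ sketch of that idea only; the names `Role`, `Event` and `NextRole` are hypothetical (they do not come from the demo code in this post), and log replication is left out.

```cpp
#include <cassert>

// Hypothetical names for illustration: the three Raft roles.
enum class Role { Follower, Candidate, Leader };

enum class Event {
    ElectionTimeout,  // election timer fired without a heartbeat from a leader
    WonMajority,      // candidate received votes from more than half of the cluster
    SawHigherTerm     // a message arrived carrying a term larger than our own
};

// Returns the role a node moves to when `ev` happens while it is in role `r`.
Role NextRole(Role r, Event ev) {
    switch (ev) {
    case Event::SawHigherTerm:
        return Role::Follower;  // every role steps down on a newer term
    case Event::ElectionTimeout:
        // A follower starts an election; a candidate whose election timed out
        // starts another one. Leaders send heartbeats instead of waiting for
        // them, so the election timer does not apply to them.
        return r == Role::Leader ? Role::Leader : Role::Candidate;
    case Event::WonMajority:
        return r == Role::Candidate ? Role::Leader : r;
    }
    return r;  // not reached; keeps compilers from warning
}
```

Only a candidate can become leader, and any node that sees a newer term falls back to follower; these two rules are what the rest of this section elaborates.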
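Two timers drive leader election in Raft.
The first is the heartbeat: the leader periodically sends heartbeat messages to the followers.
The second is the election timeout: how long a follower waits without hearing from a leader before it becomes a candidate.
The election timeout is usually a random value between 150 ms and 300 ms; the randomization makes it unlikely that several nodes become candidates at the same time.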
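A minimal sketch of how a node might draw its randomized election timeout with C++11 `<random>`. The 150-300 ms range is the one quoted above; `kHeartbeatMs = 50` is an assumed value, included only to express the constraint that heartbeats must arrive well within the shortest possible election timeout. The function and constant names are hypothetical.

```cpp
#include <cassert>
#include <random>

// Range from the text: election timeout uniformly random in [150, 300] ms.
constexpr int kElectionMinMs = 150;
constexpr int kElectionMaxMs = 300;
// Assumed heartbeat interval; it only has to be well below kElectionMinMs.
constexpr int kHeartbeatMs = 50;

static_assert(kHeartbeatMs < kElectionMinMs,
              "followers must hear a heartbeat before any election timeout can fire");

// Draws a fresh election timeout. Calling it again every time the timer is
// reset keeps different nodes choosing different deadlines.
int RandomElectionTimeoutMs(std::mt19937& rng) {
    std::uniform_int_distribution<int> dist(kElectionMinMs, kElectionMaxMs);
    return dist(rng);
}
```

Each node would seed its own generator (for example from `std::random_device`); that independent seeding is what spreads the deadlines apart.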
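If a node's election timeout expires before it receives a heartbeat from a leader, it concludes that there is no leader and becomes a candidate: it increments its term, votes for itself, and sends vote requests to the other nodes.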
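The candidate transition can be sketched as follows. `NodeState`, `RequestVote` and `StartElection` are hypothetical names for illustration, and the sketch assumes node IDs are non-negative so that `-1` can mean "has not voted in this term".

```cpp
#include <cassert>

// Hypothetical minimal node state for illustration.
struct NodeState {
    int id;
    int currentTerm = 0;
    int votedFor = -1;        // -1: no vote cast in currentTerm
    bool isCandidate = false;
};

// What a candidate sends to every other node.
struct RequestVote {
    int term;
    int candidateId;
};

// Called when the election timeout fires without a heartbeat: enter a new
// term, become a candidate, vote for ourselves, and build the request that
// is then broadcast to all peers.
RequestVote StartElection(NodeState& n) {
    n.currentTerm += 1;
    n.isCandidate = true;
    n.votedFor = n.id;
    return RequestVote{n.currentTerm, n.id};
}
```

In the demo code later in this post, the same step is the branch of `HandleFollowerSend` that increments `term`, switches to `CANDIDATE_STATUS` and records the node's own vote in `voteRecord`.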
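A node that receives a vote request grants its vote if the term in the request is equal to or greater than the term it has recorded and it has not yet voted in that term; when it grants the vote it also resets its own election timeout. (Full Raft additionally requires the candidate's log to be at least as up-to-date as the voter's; this post covers only the election.)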
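A minimal sketch of the vote-granting rule, assuming a hypothetical `Voter` struct. Like the text, it checks only the term and whether a vote was already cast in that term; the log up-to-dateness check of full Raft is deliberately omitted. Following the Raft paper, it lets a node repeat its vote for the same candidate, so a retried request is harmless.

```cpp
#include <cassert>

// Hypothetical voter state for illustration.
struct Voter {
    int currentTerm = 0;
    int votedFor = -1;               // -1: no vote cast in currentTerm
    bool resetElectionTimer = false; // set when a vote is granted
};

// Decides whether to vote for `candidateId`, which is asking in `term`.
bool HandleRequestVote(Voter& v, int term, int candidateId) {
    if (term < v.currentTerm)
        return false;                // request from an older term: reject
    if (term > v.currentTerm) {
        v.currentTerm = term;        // newer term: adopt it and forget the old vote
        v.votedFor = -1;
    }
    if (v.votedFor == -1 || v.votedFor == candidateId) {
        v.votedFor = candidateId;    // at most one candidate per term
        v.resetElectionTimer = true; // granting a vote restarts our own timer
        return true;
    }
    return false;                    // already voted for someone else this term
}
```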
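A candidate that receives votes from more than half of the nodes becomes leader and starts sending heartbeats to the followers to maintain its authority.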
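Counting votes can be sketched like this. The `nodeId -> term` map has the same shape as `voteRecord` in the demo's `LocalInfo`; the names `VoteRecord`, `CountVotesForTerm` and `HasMajority` are hypothetical.

```cpp
#include <cassert>
#include <map>

// nodeId -> term in which that node voted for us. A vote in a newer term
// overwrites the old entry, so each node is counted at most once per term.
using VoteRecord = std::map<int, int>;

// Counts only the votes cast in `term`; leftovers from older elections are ignored.
int CountVotesForTerm(const VoteRecord& votes, int term) {
    int count = 0;
    for (const auto& entry : votes)
        if (entry.second == term)
            ++count;
    return count;
}

// Strictly more than half of the whole cluster, the candidate's own vote included.
// Integer division makes this correct for both odd and even cluster sizes.
bool HasMajority(int votes, int clusterSize) {
    return votes > clusterSize / 2;
}
```

In a five-node cluster a candidate needs 3 votes (its own plus two others); in a four-node cluster 2 votes are not enough. Because any two majorities of the same cluster share at least one node, and each node votes at most once per term, at most one candidate can win a given term.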
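Now consider several nodes running for leader at once. Suppose nodes B and D start elections at the same time and each collects one vote from the other nodes, so neither reaches a majority (a split vote). Both then wait out a new randomized election timeout (again 150 ms to 300 ms) before trying again, and the randomization makes it unlikely that they collide a second time.
Eventually, after one or more rounds of voting (repeated split votes are unlikely), one node receives votes from more than half of the nodes and becomes leader.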
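To see why randomization resolves split votes, here is a toy simulation that is not part of Raft itself. It assumes all nodes reset their timers at the same instant, and it treats the round as a split vote whenever the second-earliest timeout fires within `raceWindowMs` of the earliest one (roughly, before the first candidate's vote requests could arrive). `SimulateElection` and `raceWindowMs` are hypothetical names.

```cpp
#include <algorithm>
#include <cassert>
#include <random>
#include <utility>
#include <vector>

// Toy model: every round, each node draws a timeout in [150, 300] ms. The node
// that fires first wins unless another node fires within raceWindowMs of it,
// in which case the round is a split vote and a new round starts.
// Returns {number of rounds, id of the winning node}.
std::pair<int, int> SimulateElection(int nodes, int raceWindowMs, std::mt19937& rng) {
    assert(nodes >= 1 && raceWindowMs >= 0 && raceWindowMs < 150);
    std::uniform_int_distribution<int> dist(150, 300);
    for (int round = 1;; ++round) {
        std::vector<std::pair<int, int>> fire;  // (timeout in ms, node id)
        for (int id = 0; id < nodes; ++id)
            fire.emplace_back(dist(rng), id);
        std::sort(fire.begin(), fire.end());    // earliest timeout first
        bool split = nodes > 1 && fire[1].first - fire[0].first < raceWindowMs;
        if (!split)
            return {round, fire[0].second};
    }
}
```

With a window that is small compared with the 150 ms spread of timeouts (say 10 ms for five nodes), most runs end in the first round; widening the window or narrowing the timeout range makes repeated rounds more frequent.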
```cpp
// ReadConfig.h
#pragma once
#include <cassert>
#include <fstream>
#include <iostream>
#include <map>
#include <string>
#include <vector>
using namespace std;
/*
 * Author: itdef
 * Reposting is welcome; please keep the text intact and credit the source.
 * Tech blog: http://www.cnblogs.com/itdef/
 * Tech discussion QQ group: 432336863
 * C/C++, Windows driver and server developers are welcome to get in touch.
 * Some older code is stored at:
 * http://www.oschina.net/code/list_by_user?id=614253
 */
const string FILE_NAME = "config.txt";

// Reads "key=value" pairs from a text file.
// Lines starting with '#' or "//" are comments.
class ReadConfig {
public:
    ReadConfig(string filename = "") {
        if (filename.empty()) {
            file_name = FILE_NAME;
        }
        else {
            file_name = filename;
        }
    }
    ~ReadConfig() {}
    map<string, string> Do() {
        tar_path.clear();
        ifstream fin;
        fin.open(file_name);
        if (false == fin.is_open()) {
            std::cerr << "open file failed!!" << std::endl;
            return tar_path;
        }
        string s;
        while (getline(fin, s))
        {
            // skip blank lines and comment lines
            if (s.empty() || '#' == s[0] || ('/' == s[0] && s.size() > 1 && '/' == s[1]))
                continue;
            size_t pos = s.find_first_of("=");
            // skip lines without '=' or with nothing after it
            if (pos == std::string::npos || pos + 1 >= s.size())
                continue;
            string targetName = s.substr(0, pos);
            string path = s.substr(pos + 1);
            std::cout << targetName << " = " << path << std::endl;
            if (path[0] != ' ')   // ignore values that start with a space
                tar_path[targetName] = path;
        }
        fin.close();
        return tar_path;
    }
private:
    map<string, string> tar_path;
    string file_name;
};
```
```cpp
// CommonStruct.h
#pragma once
#include <string>
#include <mutex>
#include <map>

// Node state (the pre-vote state has no real handler yet)
enum STATUS {
    LEADER_STATUS = 1,
    FOLLOWER_STATUS,
    CANDIDATE_STATUS,
    PRE_VOTE_STAUS,
};

// Type of a message exchanged between nodes
enum INFOTYPE {
    DEFAULT_TYPE = 0,
    HEART_BREAT_TYPE,       // heartbeat from the leader
    VOTE_LEADER_TYPE,       // vote request from a candidate
    VOTE_LEADER_RESP_TYPE,  // reply to a vote request
};

// Fixed-size message, sent over TCP as raw bytes
typedef struct netInfo {
    int fromID;
    int toID;
    INFOTYPE infotype;
    int term;
    int voteId;   // ID being voted for; only meaningful for vote messages
} NetInfo;

// Local state of this node
typedef struct locaInfo {
    int id;
    int leaderID;
    STATUS status;
    int term;
    int isVote;            // 1 if this node has already voted in the current term
    int IsRecvHeartbeat;   // 1 if a heartbeat has been received
    std::map<int, int> voteRecord; // id -> term: an entry means that node voted for us in that term
} LocalInfo;

typedef struct localInfoWithLock {
    LocalInfo locInfo;
    std::mutex m;
} LocalInfoWithLock;
```
```cpp
// RaftManager.h
#pragma once
#include "CommonStruct.h"
#include "ReadConfig.h"
#include <functional>
#include <iostream>
#include <memory>
#include <boost/asio.hpp>

using boost::asio::ip::tcp;
using namespace std;

class RaftManager : public enable_shared_from_this<RaftManager> {
public:
    // Singleton accessor; not thread-safe, so call it before starting any threads
    static std::shared_ptr<RaftManager> GetInstance() {
        if (p == nullptr)
            p.reset(new RaftManager());   // the constructor is private, so std::make_shared cannot be used
        return p;
    }
    ~RaftManager() {
        std::cout << "enter ~RaftManager() ";
    }
    bool Init();
    bool Go();

private:
    boost::asio::io_service io_service;   // older name of io_context (Boost 1.66+)
    std::string ip; int portStart = 0;
    int nodeID = 0;
    int electionTimeout;
    int heartbeatTime;
    LocalInfoWithLock locInfolock;

    //=============== send side: act according to the current status
    void DiapatchByStatus(int id, int& timeoutLimit);
    void HandleLeaderSend(int id, int& timeoutLimit);
    void HandleCandidateSend(int id, int& timeoutLimit);
    void HandleFollowerSend(int id, int& timeoutLimit);
    void HandlePreVoteSend(int id, int& timeoutLimit);

    //=============== receive side: act according to the message type
    void DiapatchByInfoType(const NetInfo& netinf);
    void HandleHeartbeatTypeRecv(const NetInfo& netinf);
    void HandleVoteTypeRecv(const NetInfo& netinf);
    void HandleVoteRespTypeRecv(const NetInfo& netinf);

    std::function<int()> dice;   // returns a random election timeout in ms

    bool LoopCheck(int id, std::shared_ptr<tcp::socket> s);
    void Session(tcp::socket sock);
    void SendFunc(int id);

    RaftManager() {}
    RaftManager(const RaftManager&) = delete;
    RaftManager& operator=(const RaftManager&) = delete;
    static std::shared_ptr<RaftManager> p;
};
```
```cpp
// RaftManager.cpp
#include "RaftManager.h"
#include <chrono>
#include <functional>
#include <random>
#include <thread>

std::shared_ptr<RaftManager> RaftManager::p = nullptr;

// Demo assumption: node IDs are 1..NODE_COUNT and node i listens on portStart + i
static const int NODE_COUNT = 5;

bool RaftManager::Init() {
    // The configuration could also be read from JSON
    ReadConfig cfg("nodeCfg");
    map<string, string> kv = cfg.Do();

    if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
        assert(0);
        return false;
    }
    ip = kv["ip"]; portStart = stoi(kv["portStart"]); nodeID = stoi(kv["nodeID"]);
    electionTimeout = 4000;
    // Heartbeat interval; must stay well below the minimum election timeout (2001 ms)
    heartbeatTime = 1000;
    if (kv.find("heartbeatTime") != kv.end())
        heartbeatTime = stoi(kv["heartbeatTime"]);

    locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0;
    locInfolock.locInfo.term = 0;
    locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0;
    locInfolock.locInfo.status = FOLLOWER_STATUS;
    locInfolock.locInfo.voteRecord.clear();

    // dice() returns a random election timeout in [2001, 5000] ms
    std::random_device rd;
    std::default_random_engine engine(rd());
    std::uniform_int_distribution<> dis(2001, 5000);
    dice = std::bind(dis, engine);

    return true;
}

//=================== send side: called every 200 ms per peer; timeoutLimit is that peer's countdown in ms
void RaftManager::HandleLeaderSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // todo: send a heartbeat (HEART_BREAT_TYPE) to node id
        timeoutLimit = heartbeatTime;
    }
}

void RaftManager::HandleCandidateSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // todo: send a vote request (VOTE_LEADER_TYPE) to node id
        timeoutLimit = dice();
    }
}

void RaftManager::HandlePreVoteSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // todo: pre-vote is not implemented yet
        timeoutLimit = dice();
    }
}

void RaftManager::HandleFollowerSend(int id, int& timeoutLimit) {
    if (timeoutLimit > 0) {
        timeoutLimit -= 200;
    }
    if (timeoutLimit <= 0) {
        // Check and update the local state under one lock so that two sender
        // threads cannot both start an election for the same timeout
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (locInfolock.locInfo.status == FOLLOWER_STATUS && locInfolock.locInfo.IsRecvHeartbeat == 0) {
            // Heartbeat timed out: become a candidate in a new term and vote for ourselves
            locInfolock.locInfo.term++;
            locInfolock.locInfo.status = CANDIDATE_STATUS;
            locInfolock.locInfo.isVote = 1;
            locInfolock.locInfo.voteRecord.clear();
            locInfolock.locInfo.voteRecord[nodeID] = locInfolock.locInfo.term;
        }
        timeoutLimit = dice();
    }
}

//=================== receive side
void RaftManager::HandleHeartbeatTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    // A heartbeat in the current term means the sender is the leader
    locInfolock.locInfo.leaderID = netinf.fromID;
    locInfolock.locInfo.status = FOLLOWER_STATUS;
    locInfolock.locInfo.IsRecvHeartbeat = 1;
}

void RaftManager::HandleVoteTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    int voteid = netinf.fromID;   // the candidate asking for our vote
    if (locInfolock.locInfo.isVote == 0) {
        // todo: reply granting the vote to voteid
        locInfolock.locInfo.isVote = 1; // remember that we already voted in this term
    }
    else {
        // todo: reply refusing the vote
    }
}

void RaftManager::HandleVoteRespTypeRecv(const NetInfo& netinf) {
    std::lock_guard<std::mutex> lck(locInfolock.m);
    if (locInfolock.locInfo.status != CANDIDATE_STATUS)
        return;   // only a candidate counts votes
    if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == nodeID) {
        // update the local vote record
        locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
    }
    int count = 0;
    std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin();
    // count the votes received in the current term
    while (it != locInfolock.locInfo.voteRecord.end()) {
        if (it->second == locInfolock.locInfo.term)
            count++;
        it++;
    }
    if (count > NODE_COUNT / 2) {
        // more than half: become leader; otherwise the election continues
        locInfolock.locInfo.leaderID = nodeID;
        locInfolock.locInfo.IsRecvHeartbeat = 0;
        locInfolock.locInfo.status = LEADER_STATUS;
    }
}

//=================== send loop: dispatch by the node's current status
void RaftManager::DiapatchByStatus(int id, int& timeoutLimit) {
    LocalInfo localInfo;
    {
        // take a snapshot of the local state under the lock
        std::lock_guard<std::mutex> lck(locInfolock.m);
        localInfo = locInfolock.locInfo;
    }
    switch (localInfo.status) {
    case LEADER_STATUS:
        HandleLeaderSend(id, timeoutLimit);
        break;
    case FOLLOWER_STATUS:
        HandleFollowerSend(id, timeoutLimit);
        break;
    case CANDIDATE_STATUS:
        HandleCandidateSend(id, timeoutLimit);
        break;
    case PRE_VOTE_STAUS:
        HandlePreVoteSend(id, timeoutLimit);
        break;
    default:
        std::cerr << "unknown status!!" << std::endl;
    }
}

//=================== receive: dispatch by message type
void RaftManager::DiapatchByInfoType(const NetInfo& netinf) {
    {
        std::lock_guard<std::mutex> lck(locInfolock.m);
        if (netinf.term < locInfolock.locInfo.term)
            return;   // message from an older term: ignore it
        if (netinf.term > locInfolock.locInfo.term) {
            // newer term: adopt it and fall back to follower
            locInfolock.locInfo.term = netinf.term;
            locInfolock.locInfo.status = FOLLOWER_STATUS;
            locInfolock.locInfo.isVote = 0;
            locInfolock.locInfo.IsRecvHeartbeat = 0;
            locInfolock.locInfo.voteRecord.clear();
        }
    }
    switch (netinf.infotype) {
    case HEART_BREAT_TYPE:
        HandleHeartbeatTypeRecv(netinf);
        break;
    case VOTE_LEADER_TYPE:
        HandleVoteTypeRecv(netinf);
        break;
    case VOTE_LEADER_RESP_TYPE:
        HandleVoteRespTypeRecv(netinf);
        break;
    default:
        std::cerr << "Recv Unknown info type." << std::endl;
    }
}

bool RaftManager::LoopCheck(int id, std::shared_ptr<tcp::socket> s) {
    int looptime = 200;   // scan interval in ms
    int timeoutlimit = dice();
    while (1) {
        DiapatchByStatus(id, timeoutlimit);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }
    return false;
}

void RaftManager::SendFunc(int i) {
    // todo: actually send messages over the socket
    // Timing: scan every 200 ms, heartbeat every heartbeatTime ms, election timeout random in 2001-5000 ms
    string port = std::to_string(portStart + i);   // node i listens on portStart + i
    int looptime = 4000;
    while (1) {
        std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>(io_service);
        tcp::resolver resolver(io_service);
        try {
            // demo: all nodes run on the local machine
            boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", port }));
        }
        catch (const std::exception&) {
            // keep retrying until the peer is up, without busy-spinning
            std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
            continue;
        }
        LoopCheck(i, s);
        std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
    }
}

void RaftManager::Session(tcp::socket sock) {
    boost::system::error_code error;
    NetInfo netinf;
    while (1) {
        // read exactly one fixed-size NetInfo (read_some may return fewer bytes)
        size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
        if (error == boost::asio::error::eof)
            return; // Connection closed cleanly by peer.
        else if (error) {
            std::cerr << boost::system::system_error(error).what() << std::endl; // Some other error.
            return;
        }
        if (length != sizeof(netinf)) {
            std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
            return;
        }
        DiapatchByInfoType(netinf);
    }
}

bool RaftManager::Go() {
    // Set up networking. Broadcast could be used to discover and notify the other nodes;
    // this demo assumes 5 nodes with IDs 1..5 listening on ports 9921..9925
    if (ip == "" || portStart == 0 || nodeID == 0)
        return false;
    try {
        // Start one sender thread per other node (4 threads for 5 nodes)
        for (int i = 1; i <= NODE_COUNT; i++) {
            if (i == nodeID)
                continue;
            std::thread t = std::thread(&RaftManager::SendFunc, shared_from_this(), i);
            t.detach();
        }

        // This thread accepts connections; each connection gets its own receive session
        int port = portStart + nodeID;
        tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
        for (;;)
        {
            tcp::socket sock(io_service);
            a.accept(sock);
            std::thread(&RaftManager::Session, shared_from_this(), std::move(sock)).detach();
        }
    }
    catch (const std::exception& e) {
        std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
        return false;
    }

    return true;
}
```
```cpp
// QueueTemplate.cpp : this file contains the "main" function; program execution begins and ends there.
// A bounded, blocking producer/consumer queue built on std::mutex and std::condition_variable.

// #include "pch.h"   // Visual Studio precompiled header; only needed if the project uses one
#include <condition_variable>
#include <iostream>
#include <list>
#include <mutex>
#include <thread>
using namespace std;

template<typename T>
class SyncQueue
{
public:
    SyncQueue(size_t maxSize) :m_maxSize(maxSize), m_needStop(false)
    {
    }

    void Put(const T& x)
    {
        Add(x);
    }

    void Put(T&& x)
    {
        Add(std::forward<T>(x));
    }

    // Takes every queued element at once
    void Take(std::list<T>& list)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        list = std::move(m_queue);
        m_queue.clear();          // a moved-from list is valid but not guaranteed to be empty
        m_notFull.notify_all();   // several slots may have been freed
    }

    // Takes one element
    void Take(T& t)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notEmpty.wait(locker, [this] {return m_needStop || NotEmpty(); });

        if (m_needStop)
            return;
        t = std::move(m_queue.front());
        m_queue.pop_front();
        m_notFull.notify_one();
    }

    void Stop()
    {
        {
            std::lock_guard<std::mutex> locker(m_mutex);
            m_needStop = true;
        }
        m_notFull.notify_all();
        m_notEmpty.notify_all();
    }

    bool Empty()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.empty();
    }

    bool Full()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size() == m_maxSize;
    }

    size_t Size()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }

    size_t Count()
    {
        std::lock_guard<std::mutex> locker(m_mutex);
        return m_queue.size();
    }
private:
    bool NotFull() const
    {
        bool full = m_queue.size() >= m_maxSize;
        if (full)
            cout << "full, waiting, thread id: " << this_thread::get_id() << endl;
        return !full;
    }

    bool NotEmpty() const
    {
        bool empty = m_queue.empty();
        if (empty)
            cout << "empty, waiting, thread id: " << this_thread::get_id() << endl;
        return !empty;
    }

    template<typename F>
    void Add(F&& x)
    {
        std::unique_lock<std::mutex> locker(m_mutex);
        m_notFull.wait(locker, [this] {return m_needStop || NotFull(); });
        if (m_needStop)
            return;

        m_queue.push_back(std::forward<F>(x));
        m_notEmpty.notify_one();
    }

private:
    std::list<T> m_queue;               // buffer
    std::mutex m_mutex;                 // mutex shared with the condition variables
    std::condition_variable m_notEmpty; // signaled when the queue is not empty
    std::condition_variable m_notFull;  // signaled when the queue is not full
    size_t m_maxSize;                   // maximum size of the queue

    bool m_needStop;                    // stop flag
};

int main()
{
    std::cout << "Hello World!\n";

    SyncQueue<int> q(1);
    q.Put(1);

    int a = 0;
    q.Take(a);

    q.Put(2);
    q.Take(a);

    q.Stop();
}
```
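I tried building a simplified Raft leader-election demo myself.
The demo defines 2 to 5 nodes; each node reads its IP address, port and node ID from a configuration file.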
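The demo's `ReadConfig` reads `key=value` lines and skips `#` and `//` comment lines, and `Init()` looks up the keys `ip`, `portStart`, `nodeID` and (optionally) `heartbeatTime` in a file named `nodeCfg`. The sketch below applies the same rules to any `std::istream`, so the format can be tried without a file; `ParseConfig` is a hypothetical helper, not part of the demo.

```cpp
#include <cassert>
#include <istream>
#include <map>
#include <sstream>   // std::istringstream lets callers parse an in-memory string
#include <string>

// Parses "key=value" lines, skipping blank lines and lines that start with
// '#' or "//", following the same rules as ReadConfig::Do().
std::map<std::string, std::string> ParseConfig(std::istream& in) {
    std::map<std::string, std::string> kv;
    std::string line;
    while (std::getline(in, line)) {
        if (line.empty() || line[0] == '#' || line.compare(0, 2, "//") == 0)
            continue;
        std::size_t pos = line.find('=');
        if (pos == std::string::npos || pos + 1 >= line.size())
            continue;                    // no '=' or nothing after it
        std::string value = line.substr(pos + 1);
        if (value[0] != ' ')             // values starting with a space are ignored
            kv[line.substr(0, pos)] = value;
    }
    return kv;
}
```

As an assumed example matching the ports 9921-9925 mentioned in `Go()`, node 1's `nodeCfg` could contain `ip=127.0.0.1`, `portStart=9920` and `nodeID=1`, and that node would then listen on port 9921.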
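Networking uses Boost.Asio's synchronous API.
One thread receives and four threads send (one per other node in a five-node cluster).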
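1 The receiving thread checks whether each incoming message is a heartbeat, a vote request, or a reply to a vote request, and updates accordingly: its logical clock (the term), whether it has already voted in that term (isVote), and which nodes voted for it in the latest term, kept in a map<int,int> of nodeid -> term.
2 Each sending thread polls every 200 ms and decreases its remaining wait time according to the node's current state (the wait is reset either to the 1000 ms heartbeat interval or to a random election timeout between 1500 ms and 5000 ms).
Based on the current state, it then decides whether to send a heartbeat, a vote request, or a vote reply.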
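A minimal sketch of the per-peer polling step in item 2, assuming the 200 ms step, 1000 ms heartbeat and 1500-5000 ms election range quoted above. `SendTimer`, `Action` and `Tick` are hypothetical names, and actually sending the message is left to the caller. As a design choice in this sketch, vote replies are not driven by the timer at all: they would be sent directly when a vote request arrives.

```cpp
#include <cassert>
#include <random>

enum class Role { Follower, Candidate, Leader };
enum class Action { None, SendHeartbeat, StartElection };

// Hypothetical per-peer countdown, advanced by stepMs on every poll.
// When it reaches zero the sender acts according to the role and re-arms the
// countdown: the heartbeat interval for a leader, a fresh random election
// timeout otherwise.
struct SendTimer {
    int remainingMs;
    int heartbeatMs;                              // e.g. 1000 ms
    std::uniform_int_distribution<int> election;  // e.g. [1500, 5000] ms

    Action Tick(Role role, int stepMs, std::mt19937& rng, bool heardFromLeader) {
        remainingMs -= stepMs;
        if (remainingMs > 0)
            return Action::None;
        if (role == Role::Leader) {
            remainingMs = heartbeatMs;
            return Action::SendHeartbeat;
        }
        remainingMs = election(rng);
        // A follower that heard from the leader during this period stays a
        // follower; otherwise (or if it is still a candidate) it starts an election.
        if (role == Role::Follower && heardFromLeader)
            return Action::None;
        return Action::StartElection;
    }
};
```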
To be continued.
References:
《etcd技术内幕》 (book on etcd internals)