  • Distributed Protocol Study Notes (3): A Hand-Written Raft Election Exercise

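    Because of time constraints, the code this time is somewhat rough. Treat it as my own exercise for consolidating what I learned about the Raft protocol.

    The implementation defines 2 nodes and reads each node's IP, port, and node ID from a config file.

    Networking uses Boost's synchronous calls: one thread receives and one thread sends.

    1 The receive thread inspects the incoming data to decide whether it is a heartbeat, a vote request, or a vote reply. It then updates the node's logical clock (term), whether it has already voted (isVote), and the record of which nodes voted for it in the latest term: map<int,int> // nodeid, term

    2 The send side polls every 200 ms and decrements the remaining wait time according to the node's current state. The wait time depends on the state: a 1000 ms heartbeat interval for the leader, or a random election timeout of 1500-5000 ms otherwise (the listing below actually draws the random timeout from 5001-9000 ms).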
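    The 200 ms countdown described in step 2 can be sketched in isolation. This is a minimal illustration, not the author's code: the names Role, Timer, tick, and nextWaitMs are hypothetical, and the 1500-5000 ms range is taken from the prose above rather than from the listing.

```cpp
#include <cassert>
#include <random>

// Hypothetical names for illustration; they do not appear in the original code.
enum class Role { Follower, Candidate, Leader };

struct Timer {
    Role role;
    int remainingMs;  // time left until the next heartbeat or election
};

constexpr int kPollMs = 200;        // polling period from the post
constexpr int kHeartbeatMs = 1000;  // leader heartbeat interval from the post

// Called once per poll. Returns true when the wait has expired on this tick.
inline bool tick(Timer& t) {
    if (t.remainingMs > 0) t.remainingMs -= kPollMs;
    return t.remainingMs <= 0;
}

// Picks the next wait: a fixed interval for the leader, a random election
// timeout in [lo, hi] for followers and candidates.
template <class Rng>
int nextWaitMs(Role role, Rng& rng, int lo = 1500, int hi = 5000) {
    assert(lo <= hi);
    if (role == Role::Leader) return kHeartbeatMs;
    std::uniform_int_distribution<int> dist(lo, hi);
    return dist(rng);
}
```

    A leader would call tick every 200 ms, send heartbeats when it returns true, and then reload remainingMs from nextWaitMs. Randomizing the follower timeout is what makes it unlikely that both nodes start an election at the same moment.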

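    Now let's run it and see what happens.

    We need two nodes, so the exe and the config file have to be copied into two separate folders (the original post shows a screenshot here).

    Start the programs.

    1 Both nodes start in the follower state and wait for a leader to send heartbeats.

    2 Since there is no leader yet, one of the two nodes starts an election once its random timeout expires.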

     

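    3 Node 1 then becomes leader and sends heartbeats at 1-second (1000 ms) intervals. The cluster is now in its normal state.

    4 From state 3, shutting down follower node 2 has no effect on the leader. Leader node 1 keeps trying to reconnect to follower node 2.

    5 Node 2 reconnects. The heartbeat sent by leader node 1 carries term 1, which is greater than the initial term 0 of the freshly started node 2, so node 2 immediately becomes a follower and accepts node 1's heartbeats.

    6 From state 3, if the node that is shut down is leader node 1 instead, node 2 stops receiving heartbeats. After a while it broadcasts a vote request asking to become leader. With only two nodes a majority needs both votes, and no other node is online to vote, so node 2 keeps retrying the election.

     

    7 Once node 1 comes back online it grants node 2's vote request. Node 2 now has votes from more than half of the nodes and becomes leader, sending heartbeats at 1-second intervals.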
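    Steps 6 and 7 hinge on the majority rule: a candidate wins only when the votes for its current term exceed half of the cluster size. The following is a minimal sketch of that check, assuming the same id-to-term vote map the post describes; the function names votesInTerm and hasMajority are hypothetical.

```cpp
#include <cassert>
#include <map>

// voteRecord maps voter id -> term in which that voter granted us its vote.
// Counts only the votes that belong to the given term; older entries are stale.
inline int votesInTerm(const std::map<int, int>& voteRecord, int term) {
    int count = 0;
    for (const auto& entry : voteRecord) {
        if (entry.second == term) ++count;
    }
    return count;
}

// True when strictly more than half of nodeCount nodes voted for us in `term`.
inline bool hasMajority(const std::map<int, int>& voteRecord, int term, int nodeCount) {
    assert(nodeCount > 0);
    return votesInTerm(voteRecord, term) > nodeCount / 2;
}
```

    With two nodes, nodeCount / 2 is 1, so a candidate holding only its own vote never wins. That is why node 2 in step 6 stays a candidate until node 1 returns and adds the second vote.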

    The code follows.

    Basic structs:

    #include <map>
    #include <mutex>

    // Role a node currently plays in the cluster.
    enum STATUS {
        LEADER_STATUS = 1,
        FOLLOWER_STATUS,
        CANDIDATE_STATUS,
        PRE_VOTE_STAUS,
    };

    // Kind of message carried by a NetInfo packet.
    enum INFOTYPE {
        DEFAULT_TYPE = 0,
        HEART_BREAT_TYPE,        // heartbeat from the leader
        VOTE_LEADER_TYPE,        // vote request from a candidate
        VOTE_LEADER_RESP_TYPE,   // reply to a vote request

    };

    typedef struct netInfo {
        int fromID;
        int toID;
        INFOTYPE infotype;
        int term;
        int voteId;    // ID being voted for; only meaningful when infotype is a vote type
    } NetInfo;

    typedef struct locaInfo {
        int id;
        int leaderID;
        STATUS status;
        int term;
        int isVote;
        int IsRecvHeartbeat;
        int electionTimeout;
        std::map<int, int> voteRecord; // id -> term: an entry means node `id` voted for us in that term
    } LocalInfo;

    typedef struct localInfoWithLock {
        LocalInfo locInfo;
        std::mutex m;
    } LocalInfoWithLock;
    Basic data structures
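    The networking code in this post writes the NetInfo struct to the socket as raw bytes, which is fine between identical builds on one machine. If the nodes ever ran on different compilers or architectures, an explicit byte layout would be safer. The sketch below shows one possible encoding, assuming five 32-bit little-endian fields; WireInfo, WireBuf, encode, and decode are hypothetical names and not part of the original code.

```cpp
#include <array>
#include <cassert>
#include <cstddef>
#include <cstdint>

// Hypothetical fixed-width mirror of NetInfo (five 32-bit fields).
struct WireInfo {
    std::int32_t fromID, toID, infotype, term, voteId;
};

constexpr std::size_t kWireSize = 5 * 4;
using WireBuf = std::array<std::uint8_t, kWireSize>;

// Writes v as 4 bytes, least significant byte first.
inline void putLE32(std::uint8_t* p, std::int32_t v) {
    const std::uint32_t u = static_cast<std::uint32_t>(v);
    for (int i = 0; i < 4; ++i) p[i] = static_cast<std::uint8_t>(u >> (8 * i));
}

// Reads 4 little-endian bytes back into a signed 32-bit value.
inline std::int32_t getLE32(const std::uint8_t* p) {
    std::uint32_t u = 0;
    for (int i = 0; i < 4; ++i) u |= static_cast<std::uint32_t>(p[i]) << (8 * i);
    return static_cast<std::int32_t>(u);
}

inline WireBuf encode(const WireInfo& m) {
    assert(m.infotype >= 0 && m.infotype <= 3);  // debug check: known message types only
    WireBuf b{};
    const std::int32_t fields[5] = {m.fromID, m.toID, m.infotype, m.term, m.voteId};
    for (std::size_t i = 0; i < 5; ++i) putLE32(b.data() + 4 * i, fields[i]);
    return b;
}

inline WireInfo decode(const WireBuf& b) {
    WireInfo m;
    m.fromID   = getLE32(b.data());
    m.toID     = getLE32(b.data() + 4);
    m.infotype = getLE32(b.data() + 8);
    m.term     = getLE32(b.data() + 12);
    m.voteId   = getLE32(b.data() + 16);
    return m;
}
```

    The sender would write the 20 bytes returned by encode, and the receiver would read exactly 20 bytes and pass them to decode. The trade-off is a little extra code in exchange for a layout that does not depend on struct padding or host byte order.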
    #include "RaftManager.h"
    #include "NetInfoHandler.h"
    #include "StatusHandler.h"
    #include <cassert>
    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <random>
    #include <thread>

    using namespace std;

    std::shared_ptr<RaftManager> RaftManager::p = nullptr;

    bool RaftManager::Init() {
        // The config could also be read from JSON.
        ReadConfig cfg("nodeCfg");
        map<string, string> kv = cfg.Do();

        if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
            assert(0);
            return false;
        }
        ip = kv["ip"];  portStart = stoi(kv["portStart"]);  nodeID = stoi(kv["nodeID"]);
        heartbeatTime = 5000;
        if (kv.find("heartbeatTime") != kv.end())
            heartbeatTime = stoi(kv["heartbeatTime"]);

        // Every node starts as a follower in term 0 with no known leader.
        locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0;
        locInfolock.locInfo.term = 0;
        locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0;
        locInfolock.locInfo.electionTimeout = 4000;
        locInfolock.locInfo.status = FOLLOWER_STATUS;
        locInfolock.locInfo.voteRecord.clear();

        // dice() returns a random election timeout in milliseconds.
        std::random_device rd;
        std::default_random_engine engine(rd());
        std::uniform_int_distribution<> dis(5001, 9000);
        dice = std::bind(dis, engine);

        return true;
    }

    // Connects to the peer with ID sendId and forwards queued messages to it.
    void RaftManager::SendFunc(int sendId) {
        std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service));
        tcp::resolver resolver(io_service);
        while (1) {
            int port = portStart + sendId;

            try {
                boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", to_string(port) }));
            }
            catch (const std::exception&) {
                // Peer is not up yet: wait before retrying instead of spinning.
                std::this_thread::sleep_for(std::chrono::milliseconds(2000));
                continue;
            }
            //============================================================
            netInfo netinfo;
            while (1) {
                q.Take(netinfo);
                boost::system::error_code write_error;
                boost::asio::write(*s, boost::asio::buffer(&netinfo, sizeof(netinfo)), write_error);
                if (write_error) {
                    std::cerr << boost::system::system_error(write_error).what() << std::endl;
                    break;  // drop back to the outer loop and reconnect
                }

                std::cout << "\n==========================================================>" << std::endl;
                std::cout << "Send netinfo" << std::endl;
                std::cout << "netinf.fromID = " << netinfo.fromID << std::endl;
                std::cout << "netinf.toID = " << netinfo.toID << std::endl;
                std::cout << "netinf.infotype = " << netinfo.infotype << std::endl;
                std::cout << "netinf.term = " << netinfo.term << std::endl;
                std::cout << "netinf.voteId = " << netinfo.voteId << std::endl << std::endl;
                std::cout << "<==========================================================" << std::endl;
            }
        }

    }


    // Polls the node state every 200 ms and lets StatusHandler act on it.
    void RaftManager::LoopCheck(LocalInfoWithLock& locInfolock) {
        int looptime = 200;
        StatusHandler handler;
        while (1) {
            handler.DiapatchByStatus(locInfolock, q);
            std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
        }

        return;
    }

    // Reads NetInfo messages from one accepted connection until it closes.
    void RaftManager::RecvNetInfo(tcp::socket sock) {
        boost::system::error_code error;
        NetInfo netinf;

        for (;;) {
            // boost::asio::read blocks until the whole struct has arrived;
            // read_some may return after only part of a message.
            size_t length = boost::asio::read(sock, boost::asio::buffer(&netinf, sizeof(netinf)), error);
            if (error == boost::asio::error::eof)
                return; // Connection closed cleanly by peer.
            else if (error) {
                std::cerr << boost::system::system_error(error).what() << std::endl; // Some other error.
                return;
            }
            if (length != sizeof(netinf)) {
                std::cerr << __FUNCTION__ << " recv wrong length:" << length << std::endl;
                continue;
            }

            std::cout << "\n==========================================================>" << std::endl;
            std::cout << "recv netinfo" << std::endl;
            std::cout << "netinf.fromID = " << netinf.fromID << std::endl;
            std::cout << "netinf.toID = " << netinf.toID << std::endl;
            std::cout << "netinf.infotype = " << netinf.infotype << std::endl;
            std::cout << "netinf.term = " << netinf.term << std::endl;
            std::cout << "netinf.voteId = " << netinf.voteId << std::endl << std::endl;
            std::cout << "<==========================================================" << std::endl;

            NetInfoHandler handler;
            handler.DispatchByinfoType(netinf, q, locInfolock);
        }

    }

    bool RaftManager::Go() {
        if (ip.empty() || portStart == 0 || nodeID == 0)
            return false;
        try {
            // One send thread per peer.
            for (int i = 1; i <= NODE_COUNT; i++) {
                if (i != nodeID) {
                    std::thread tsend = std::thread(&RaftManager::SendFunc, shared_from_this(), i);
                    tsend.detach();
                }
            }

            std::thread tloop = std::thread(&RaftManager::LoopCheck, shared_from_this(), std::ref(locInfolock));
            tloop.detach();

            // Accept incoming connections; each one gets its own receive thread.
            int port = portStart + nodeID;
            tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));

            for (;;)
            {
                tcp::socket sock(io_service);
                a.accept(sock);
                std::cout << "accept\n";
                std::thread(&RaftManager::RecvNetInfo, shared_from_this(), std::move(sock)).detach();
            }

        }
        catch (std::exception& e) {
            std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
            return false;
        }

        return true;
    }
    Code that reads the config file and sets up the network connections
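    One detail worth isolating from the receive path: TCP is a byte stream, so a single read call may return only part of a fixed-size message. The sketch below shows the usual "loop until the whole message has arrived" pattern in a transport-independent way. It is an illustration under assumptions: ReadSome and readExact are hypothetical names, and the callback stands in for whatever partial-read call the socket library provides.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>

// Stand-in for a partial read: fills up to `max` bytes and returns how many
// were written, or 0 when the peer has closed the connection.
using ReadSome = std::function<std::size_t(char*, std::size_t)>;

// Keeps reading until exactly `want` bytes are in `out`.
// Returns false if the stream ends before the message is complete.
inline bool readExact(const ReadSome& readSome, char* out, std::size_t want) {
    assert(out != nullptr || want == 0);
    std::size_t got = 0;
    while (got < want) {
        const std::size_t n = readSome(out + got, want - got);
        if (n == 0) return false;  // connection closed mid-message
        got += n;
    }
    return true;
}
```

    A receiver would call readExact with want set to the message size and only then hand the buffer to the message handler, so a message split across several TCP segments is never processed half-filled.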
    #include "StatusHandler.h"
    #include "RaftManager.h"
    #include <iostream>
    #include <mutex>


    void StatusHandler::DiapatchByStatus(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
        LocalInfo localInfo;
        // Take a snapshot of the local state under the lock, then decide what to send.
        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            localInfo = locInfolock.locInfo;
        }

        switch (localInfo.status) {
        case LEADER_STATUS:
            HandleLeaderSend(locInfolock, q);
            break;
        case FOLLOWER_STATUS:
            HandleFollowerSend(locInfolock, q);
            break;
        case CANDIDATE_STATUS:
            HandleCandidateSend(locInfolock, q);
            break;
        default:
            std::cerr << "Unknown status!" << std::endl;
        }
    }

    // Leader: count down the heartbeat interval and queue a heartbeat for every peer.
    void StatusHandler::HandleLeaderSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
        bool isSendheartbeat = false;
        int nodeid = 0;
        int term = 0;

        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (locInfolock.locInfo.electionTimeout > 0) {
                locInfolock.locInfo.electionTimeout -= 200;
            }
            // Interval elapsed: send heartbeats and restart the 1000 ms countdown.
            if (locInfolock.locInfo.electionTimeout <= 0 && locInfolock.locInfo.status == LEADER_STATUS) {
                isSendheartbeat = true;
                nodeid = locInfolock.locInfo.id;
                term = locInfolock.locInfo.term;
                locInfolock.locInfo.electionTimeout = 1000;
            }
        }
        if (isSendheartbeat) {
            for (int i = 1; i <= NODE_COUNT; i++) {
                if (i != nodeid) {
                    netInfo netinfo{ nodeid, i, HEART_BREAT_TYPE, term, 0 };
                    q.Put(netinfo);
                }
            }
        }
    }


    // Follower: count down the election timeout; become a candidate if no heartbeat arrived.
    void StatusHandler::HandleFollowerSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
        bool isSendVoteNetInfo = false;
        int nodeid = 0;
        int term = 0;
        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (locInfolock.locInfo.electionTimeout > 0) {
                locInfolock.locInfo.electionTimeout -= 200;
            }
            // Timeout elapsed.
            if (locInfolock.locInfo.electionTimeout <= 0) {
                if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
                    // No heartbeat within the timeout: start an election for a new term.
                    std::cout << "electionTimeout .change to CANDIDATE_STATUS" << std::endl;
                    locInfolock.locInfo.term++;
                    locInfolock.locInfo.status = CANDIDATE_STATUS;
                    locInfolock.locInfo.voteRecord.clear();
                    // Vote for ourselves.
                    locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] =
                        locInfolock.locInfo.term;
                    isSendVoteNetInfo = true;
                    term = locInfolock.locInfo.term;
                    nodeid = locInfolock.locInfo.id;
                    locInfolock.locInfo.electionTimeout = dice();
                }
                else {
                    // A heartbeat arrived just in time: stay a follower and restart the timer.
                    locInfolock.locInfo.IsRecvHeartbeat = 0;
                    locInfolock.locInfo.electionTimeout = dice();
                }
            }
            else if ((locInfolock.locInfo.electionTimeout > 0) &&
                     (locInfolock.locInfo.IsRecvHeartbeat == 1) &&
                     (locInfolock.locInfo.status == FOLLOWER_STATUS))
            {
                std::cout << "Check heartbeat OK!!! Clear electionTimeout" << std::endl;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.electionTimeout = dice();
            }
        }

        if (isSendVoteNetInfo) {
            for (int i = 1; i <= NODE_COUNT; i++) {
                if (i != nodeid) {
                    netInfo netinfo{ nodeid, i, VOTE_LEADER_TYPE, term, nodeid };
                    q.Put(netinfo);
                }
            }
        }

    }

    // Candidate: when the election times out without a winner, retry with a new term.
    void StatusHandler::HandleCandidateSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
        bool isSendVoteNetInfo = false;
        int nodeid = 0;
        int term = 0;
        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (locInfolock.locInfo.electionTimeout > 0) {
                locInfolock.locInfo.electionTimeout -= 200;
            }
            // Timeout elapsed.
            if (locInfolock.locInfo.electionTimeout <= 0) {
                std::cout << "electionTimeout .CANDIDATE_STATUS too" << std::endl;
                if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
                    // Still no heartbeat: move to the next term and vote for ourselves again.
                    locInfolock.locInfo.term++;
                    locInfolock.locInfo.status = CANDIDATE_STATUS;
                    locInfolock.locInfo.voteRecord.clear();
                    locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] =
                        locInfolock.locInfo.term;
                }
                isSendVoteNetInfo = true;
                term = locInfolock.locInfo.term;
                nodeid = locInfolock.locInfo.id;
                locInfolock.locInfo.electionTimeout = dice();
            }
        }

        if (isSendVoteNetInfo) {
            for (int i = 1; i <= NODE_COUNT; i++) {
                if (i != nodeid) {
                    netInfo netinfo{ nodeid, i, VOTE_LEADER_TYPE, term, nodeid };
                    q.Put(netinfo);
                }
            }
        }
    }
    Code that checks and switches state every 200 ms and sends messages when a timeout expires
    #include "NetInfoHandler.h"
    #include "RaftManager.h"
    #include <iostream>
    #include <mutex>

    void NetInfoHandler::DispatchByinfoType(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            // Messages from an older term are stale and ignored.
            if (netinf.term < locInfolock.locInfo.term)
                return;
            // A newer term always wins: adopt it and fall back to follower.
            if (netinf.term > locInfolock.locInfo.term) {
                locInfolock.locInfo.term = netinf.term;
                locInfolock.locInfo.status = FOLLOWER_STATUS;
                locInfolock.locInfo.isVote = 0;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.electionTimeout = dice();
                locInfolock.locInfo.voteRecord.clear();
            }
        }
        switch (netinf.infotype) {
        case HEART_BREAT_TYPE:
            HandleHeartBeatTypeRecv(netinf, q, locInfolock);
            break;
        case VOTE_LEADER_TYPE:
            HandleVoteTypeRecv(netinf, q, locInfolock);
            break;
        case VOTE_LEADER_RESP_TYPE:
            HandleVoteRespTypeRecv(netinf, q, locInfolock);
            break;
        default:
            std::cerr << "Recv Unknown info type." << std::endl;
        }
    }

    // Vote reply: record the vote and become leader once a majority is reached.
    void NetInfoHandler::HandleVoteRespTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {

        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (netinf.term < locInfolock.locInfo.term)
                return;
            if (netinf.term > locInfolock.locInfo.term) {
                locInfolock.locInfo.term = netinf.term;
                locInfolock.locInfo.status = FOLLOWER_STATUS;
                locInfolock.locInfo.isVote = 0;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.voteRecord.clear();
            }
            if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == locInfolock.locInfo.id && netinf.voteId == locInfolock.locInfo.id) {
                // The sender voted for us: update the local vote map.
                locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
            }
            // Count the votes that belong to the current term.
            int count = 0;
            for (const auto& vote : locInfolock.locInfo.voteRecord) {
                if (vote.second == locInfolock.locInfo.term)
                    count++;
            }
            // Only a candidate can win; with more than half of the votes it
            // becomes leader, otherwise the election continues.
            if (locInfolock.locInfo.status == CANDIDATE_STATUS && count > NODE_COUNT / 2) {
                locInfolock.locInfo.leaderID = locInfolock.locInfo.id;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.status = LEADER_STATUS;
                locInfolock.locInfo.electionTimeout = 1000;
                std::cout << "I am the leader term = " <<
                    locInfolock.locInfo.term << std::endl;
            }
        }

    }

    // Vote request: a follower grants at most one vote per term.
    void NetInfoHandler::HandleVoteTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {

        NetInfo respNetInfo{};
        bool isSend = false;
        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (netinf.term < locInfolock.locInfo.term)
                return;
            if (netinf.term > locInfolock.locInfo.term) {
                locInfolock.locInfo.term = netinf.term;
                locInfolock.locInfo.status = FOLLOWER_STATUS;
                locInfolock.locInfo.isVote = 0;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.voteRecord.clear();
            }
            if (locInfolock.locInfo.isVote == 0 && locInfolock.locInfo.status == FOLLOWER_STATUS) {
                // Have not voted in this term yet: grant the vote.
                respNetInfo.fromID = locInfolock.locInfo.id;
                respNetInfo.toID = netinf.fromID;
                respNetInfo.term = netinf.term;
                respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
                respNetInfo.voteId = netinf.voteId;
                locInfolock.locInfo.isVote = 1;
                isSend = true;
            }
            else if (locInfolock.locInfo.status == FOLLOWER_STATUS) {
                // Already voted in this term: reply with voteId 0 to refuse.
                respNetInfo.fromID = locInfolock.locInfo.id;
                respNetInfo.toID = netinf.fromID;
                respNetInfo.term = netinf.term;
                respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
                respNetInfo.voteId = 0;
                isSend = true;
            }
        }
        if (isSend)
            q.Put(respNetInfo);
    }


    // Heartbeat: remember that the leader is alive so the election timer gets reset.
    void NetInfoHandler::HandleHeartBeatTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {

        {
            std::lock_guard<std::mutex> lck(locInfolock.m);
            if (netinf.term < locInfolock.locInfo.term)
                return;
            if (netinf.term > locInfolock.locInfo.term) {
                locInfolock.locInfo.term = netinf.term;
                locInfolock.locInfo.status = FOLLOWER_STATUS;
                locInfolock.locInfo.isVote = 0;
                locInfolock.locInfo.IsRecvHeartbeat = 0;
                locInfolock.locInfo.voteRecord.clear();
            }
            // A heartbeat for the current term means a leader exists, so a
            // candidate stops campaigning and follows it (standard Raft rule).
            if (locInfolock.locInfo.status != LEADER_STATUS) {
                locInfolock.locInfo.status = FOLLOWER_STATUS;
                locInfolock.locInfo.leaderID = netinf.fromID;
            }

            locInfolock.locInfo.IsRecvHeartbeat = 1;
        }
    }
    Code that processes received messages and sends replies announcing the node's own state change
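    The term handling that recurs in every receive handler boils down to three rules: ignore older terms, adopt newer terms and forget the previous vote, and grant at most one vote per term. The sketch below is a simplified, self-contained version of that logic. VoterState and handleVoteRequest are hypothetical names, and the sketch leaves out the follower-status check and the log comparison that full Raft performs.

```cpp
#include <cassert>

// Hypothetical minimal voter state: current term and whether we voted in it.
struct VoterState {
    int term = 0;
    bool voted = false;
};

// Returns true if the vote is granted to a candidate campaigning in candidateTerm.
inline bool handleVoteRequest(VoterState& s, int candidateTerm) {
    assert(candidateTerm >= 0);
    if (candidateTerm < s.term) return false;  // stale request from an older term
    if (candidateTerm > s.term) {              // newer term: adopt it, vote is free again
        s.term = candidateTerm;
        s.voted = false;
    }
    if (s.voted) return false;                 // already voted in this term
    s.voted = true;
    return true;
}
```

    For example, a voter that grants a vote in term 1 refuses a second request for term 1 but grants one for term 2. Limiting each node to one vote per term is what prevents two candidates from both collecting a majority in the same term.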

     

    Author: itdef
    Reposting is welcome; please keep the text complete and credit the source.
    Tech blog: http://www.cnblogs.com/itdef/
    Algorithm walkthrough videos on Bilibili:
    https://space.bilibili.com/18508846
    qq 151435887
    gitee https://gitee.com/def/
  • Original post: https://www.cnblogs.com/itdef/p/9698386.html