zoukankan      html  css  js  c++  java
  • 分布式协议学习笔记(三) Raft 选举自编写代码练习

    由于时间安排上的原因,这次的代码写的稍微有些简略,只能算是自己对RAFT协议的一个巩固。

    事先定义2个节点,使用读取配置文件来获取IP和端口以及节点ID

    网络使用boost同步流程 一个线程收 一个线程发送

    1 收的线程根据接收的数据 判断是心跳包还是选举请求还是选举请求回复  来更新自己的时间逻辑编号term 更新是否投票isVote 和最新term中哪些节点投了自己的选举票map<int,int> // nodeid, term

    2 发送的线程每隔200ms轮询一次,根据节点当前状态减少等待时间(等待时间根据节点状态调节为1000ms心跳间隔或者1500-5000ms的随机选举超时)

    先运行看看效果

    我们需要两个节点 所以需要将exe和配置文件放入不同的文件夹 如图

    启动程序

    1 初始两个节点都是follower状态 等待leader发送心跳 

    2 由于目前无leader 所以两个节点其中之一在随机的超时时间后,发起选举投票

     

    3 之后节点1 成为leader,以1秒(1000ms)的间隔发送心跳包 进入正常状态

    4 在状态3 的情况下 关闭follower状态的节点2 对情况并无影响。leader节点1 会持续尝试连接follower节点2

    5 节点2 再次连接 由于leader节点1 发送的心跳term为1  大于新启动节点2的初始化term 0 。所以节点2 会马上成为follower ,接受leader节点1的心跳

    6 在状态3 的情况下,如果关闭的是leader节点1,节点2 在一段时间未接收到心跳后,就会广播选举请求,请求自己成为leader,但是由于没有节点与节点2的投票一致,也没有其他的节点选举投票,节点2将持续尝试选举自己成为leader

     

    7 节点1上线后,同意节点2的选举请求,节点2接收超过半数以上的投票,成为leader。开始以1秒间隔发送心跳包。

    代码如下

    基础结构体:

     /// Role of a node in the election state machine.
     ///
     /// Values start at 1 and increase by one in declaration order.
     /// (A leading `const` on an enum definition qualifies nothing and is
     /// rejected by conforming compilers, so it is not used here.)
     enum STATUS {
         LEADER_STATUS = 1,
         FOLLOWER_STATUS,
         CANDIDATE_STATUS,
         PRE_VOTE_STAUS,  // identifier spelling kept as-is so existing users still compile
     };
     7 
     /// Kind of message carried by a netInfo packet.
     ///
     /// Values start at 0 and increase by one in declaration order.
     /// (A leading `const` on an enum definition qualifies nothing and is
     /// rejected by conforming compilers, so it is not used here.)
     enum INFOTYPE {
         DEFAULT_TYPE = 0,
         HEART_BREAT_TYPE,       // heartbeat; identifier spelling kept as-is
         VOTE_LEADER_TYPE,       // vote request
         VOTE_LEADER_RESP_TYPE,  // reply to a vote request
     };
    15 
    16 typedef struct netInfo {
    17     int fromID;
    18     int toID;
    19     INFOTYPE infotype;
    20     int    term;
    21     int voteId;    //选举ID infotype为votetype才有效
    22 }NetInfo;
    23 
    24 typedef struct locaInfo {
    25     int    id;
    26     int leaderID;
    27     STATUS status;
    28     int term;
    29     int isVote;
    30     int IsRecvHeartbeat;
    31     int electionTimeout;
    32     std::map<int, int> voteRecord;// id term有此记录表示该term收到该id投取自己一票
    33 }LocalInfo;
    34 
    35 typedef struct localInfoWithLock {
    36     LocalInfo    locInfo;
    37     std::mutex m;
    38 }LocalInfoWithLock;
    基本数据结构
      1 #include "RaftManager.h"
      2 #include "NetInfoHandler.h"
      3 #include "StatusHandler.h"
      4 #include <random>
      5 #include <functional>
      6 
      7 using namespace std;
      8 
      9 std::shared_ptr<RaftManager> RaftManager::p = nullptr;
     10 
     11 bool RaftManager::Init() {
     12     //可以使用json 读取配置
     13     ReadConfig cfg("nodeCfg");
     14     map<string, string> kv = cfg.Do();
     15 
     16     if (kv.find("ip") == kv.end() || kv.find("portStart") == kv.end() || kv.find("nodeID") == kv.end()) {
     17         assert(0);
     18         return false;
     19     }
     20     ip = kv["ip"];  portStart = stoi(kv["portStart"]);  nodeID = stoi(kv["nodeID"]);
     21     heartbeatTime = 5000;
     22     if (kv.find("heartbeatTime") != kv.end())
     23         heartbeatTime = stoi(kv["heartbeatTime"]);
     24     
     25     locInfolock.locInfo.id = nodeID; locInfolock.locInfo.leaderID = 0;
     26     locInfolock.locInfo.IsRecvHeartbeat = 0; locInfolock.locInfo.isVote = 0;
     27     locInfolock.locInfo.electionTimeout = 4000;
     28     locInfolock.locInfo.status = FOLLOWER_STATUS;
     29     locInfolock.locInfo.voteRecord.clear();
     30     
     31     std::random_device rd;
     32     std::default_random_engine engine(rd());
     33     std::uniform_int_distribution<> dis(5001, 9000);
     34     dice = std::bind(dis, engine);
     35     
     36     return true;
     37 }
     38 
     39 void RaftManager::SendFunc(int sendId) {
     40     std::shared_ptr<tcp::socket> s = std::make_shared<tcp::socket>((io_service));
     41     tcp::resolver resolver(io_service);
     42     while (1) {
     43         int port = portStart+ sendId;
     44         
     45         try {
     46             boost::asio::connect(*s, resolver.resolve({ "127.0.0.1", to_string(port) }));
     47         }
     48         catch (std::exception& e) {
     49             //std::cerr << e.what() << std::endl;
     50             continue;
     51             std::this_thread::sleep_for(std::chrono::milliseconds(2000));
     52         }
     53         //============================================================
     54         netInfo netinfo;
     55         while (1) {
     56             q.Take(netinfo);
     57             boost::system::error_code ignored_error;
     58             boost::asio::write(*s, boost::asio::buffer(&netinfo, sizeof(netinfo)), ignored_error);
     59             if (ignored_error) {
     60                 std::cerr << boost::system::system_error(ignored_error).what() << std::endl;
     61                 break;
     62             }
     63             
     64             std::cout << "
    ==========================================================>" << std::endl;
     65             std::cout << "Send netinfo" << std::endl;
     66             std::cout << "netinf.fromID = " << netinfo.fromID << std::endl;
     67             std::cout << "netinf.toID = " << netinfo.toID << std::endl;
     68             std::cout << "netinf.infotype = " << netinfo.infotype << std::endl;
     69             std::cout << "netinf.term = " << netinfo.term << std::endl;
     70             std::cout << "netinf.voteId = " << netinfo.voteId << std::endl << std::endl;
     71             std::cout << "<==========================================================" << std::endl;
     72         }
     73     }
     74     
     75 }
     76 
     77 
     78 void RaftManager::LoopCheck(LocalInfoWithLock& locInfolock) {
     79     int looptime = 200;
     80     StatusHandler handler;
     81     while (1) {
     82         handler.DiapatchByStatus(locInfolock,q);
     83         std::this_thread::sleep_for(std::chrono::milliseconds(looptime));
     84     }
     85 
     86     return;
     87 }
     88 
     89 void RaftManager::RecvNetInfo(tcp::socket sock) {
     90     BYTE data[1024] = { 0 };
     91     boost::system::error_code error;
     92     NetInfo netinf;
     93 
     94     for (;;) {
     95         size_t length = sock.read_some(boost::asio::buffer(&netinf, sizeof(netinf)), error);
     96         if (error == boost::asio::error::eof)
     97             return; // Connection closed cleanly by peer.
     98         else if (error) {
     99             std::cerr << boost::system::system_error(error).what() << std::endl;// Some other error.
    100             return;
    101         }
    102         if (length != sizeof(netinf)) {
    103             std::cerr << __FUNCTION__ << " recv wrong lenth:" << length << std::endl;// Some other error.
    104             continue;
    105         }
    106         
    107         std::cout << "
    ==========================================================>" << std::endl;
    108         std::cout << "recv netinfo" << std::endl;
    109         std::cout << "netinf.fromID = " << netinf.fromID << std::endl;
    110         std::cout << "netinf.toID = " << netinf.toID << std::endl;
    111         std::cout << "netinf.infotype = " << netinf.infotype << std::endl;
    112         std::cout << "netinf.term = " << netinf.term << std::endl;
    113         std::cout << "netinf.voteId = " << netinf.voteId << std::endl << std::endl;
    114         std::cout << "<==========================================================" << std::endl;
    115         
    116         NetInfoHandler handler;
    117         handler.DispatchByinfoType(netinf,q, locInfolock);
    118     }
    119 
    120 }
    121 
    122 bool RaftManager::Go() {
    123     if (ip == "" || portStart == 0 || nodeID == 0)
    124         return false;
    125     try {
    126         for (int i = 1; i <= NODE_COUNT; i++) {
    127             if (i != nodeID) {
    128                 std::thread tsend = std::thread(&RaftManager::SendFunc, shared_from_this(),i);
    129                 tsend.detach();
    130             }
    131         }
    132 
    133         std::thread tloop = std::thread(&RaftManager::LoopCheck, shared_from_this(), std::ref(locInfolock));
    134         tloop.detach();
    135 
    136         int port = portStart + nodeID;
    137         tcp::acceptor a(io_service, tcp::endpoint(tcp::v4(), port));
    138     
    139         for (;;)
    140         {
    141             tcp::socket sock(io_service);
    142             a.accept(sock);
    143             std::cout << "accept
    ";
    144             std::thread(&RaftManager::RecvNetInfo, shared_from_this(), std::move(sock)).detach();
    145         }
    146 
    147     }
    148     catch (std::exception& e) {
    149         std::cerr << __FUNCTION__ << " : " << e.what() << std::endl;
    150         return false;
    151     }
    152 
    153     return true;
    154 }
    读取配置文件和开启网络连接的代码
      1 #include "StatusHandler.h"
      2 #include "RaftManager.h"
      3 #include <iostream>
      4 
      5 
      6 void StatusHandler::DiapatchByStatus(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
      7     LocalInfo localInfo;
      8     //加锁获取当前状态 决定是否进行发送操作
      9     {
     10         //加锁获取本地当前状态
     11         std::lock_guard<std::mutex> lck(locInfolock.m);
     12         localInfo = locInfolock.locInfo;
     13     }
     14     
     15     switch (localInfo.status) {
     16     case LEADER_STATUS:
     17         HandleLeaderSend(locInfolock,q);
     18         break;
     19     case FOLLOWER_STATUS:
     20         HandleFollowerSend(locInfolock,q);
     21         break;
     22     case CANDIDATE_STATUS:
     23         HandleCandidateSend(locInfolock,q);
     24         break;
     25     default:
     26         std::cerr << "Unknown status!" << std::endl;
     27     }
     28 }
     29 
     30 void StatusHandler::HandleLeaderSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
     31     bool isSendheartbeat = false;
     32     int nodeid = 0;
     33     int term = 0;
     34 
     35     {
     36         std::lock_guard<std::mutex> lck(locInfolock.m);
     37         if (locInfolock.locInfo.electionTimeout > 0) {
     38             locInfolock.locInfo.electionTimeout -= 200;
     39         }
     40         //超过时间限制
     41         if (locInfolock.locInfo.electionTimeout <= 0 && locInfolock.locInfo.status == LEADER_STATUS) {
     42             isSendheartbeat = true;
     43             nodeid = locInfolock.locInfo.id;
     44             term = locInfolock.locInfo.term;
     45             locInfolock.locInfo.electionTimeout = 1000;
     46         }
     47     }
     48     if (isSendheartbeat) {
     49         for (int i = 1; i <= NODE_COUNT; i++) {
     50             if (i != nodeid) {
     51                 netInfo netinfo{ nodeid ,i,HEART_BREAT_TYPE ,term,0 };
     52                 q.Put(netinfo);
     53             }
     54         }
     55     }
     56 }
     57 
     58 
     59 void StatusHandler::HandleFollowerSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
     60     bool isSendVoteNetInfo = false;
     61     int nodeid = 0;
     62     int term = 0;
     63     //加锁获取本地当前状态
     64     {
     65         //std::cout << "Enter " << __FUNCTION__ << std::endl;
     66         std::lock_guard<std::mutex> lck(locInfolock.m);
     67         if (locInfolock.locInfo.electionTimeout > 0) {
     68             locInfolock.locInfo.electionTimeout -= 200;
     69         }
     70         //超过时间限制
     71         if (locInfolock.locInfo.electionTimeout <= 0) {
     72             std::cout << "electionTimeout .change to CANDIDATE_STATUS" << std::endl;
     73             if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
     74                 //心跳超时  切换到选举模式
     75                 locInfolock.locInfo.term++;
     76                 locInfolock.locInfo.status = CANDIDATE_STATUS;
     77                 locInfolock.locInfo.voteRecord.clear();
     78                 locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] =
     79                     locInfolock.locInfo.term;
     80                 isSendVoteNetInfo = true;
     81                 term = locInfolock.locInfo.term;
     82                 nodeid = locInfolock.locInfo.id;
     83                 locInfolock.locInfo.electionTimeout = dice();
     84             }
     85             else {
     86                 locInfolock.locInfo.IsRecvHeartbeat = 0;
     87             }
     88         }
     89         else if ( (locInfolock.locInfo.electionTimeout > 0) && 
     90                 (locInfolock.locInfo.IsRecvHeartbeat == 1) && 
     91                 (locInfolock.locInfo.status == FOLLOWER_STATUS) )
     92         {
     93             std::cout << "Check hearbeat OK!!! Clear electionTimeout" << std::endl;
     94             locInfolock.locInfo.IsRecvHeartbeat = 0;
     95             locInfolock.locInfo.electionTimeout = dice();
     96         }
     97     }
     98 
     99     if (isSendVoteNetInfo) {
    100         for (int i = 1; i <= NODE_COUNT; i++) {
    101             if (i != nodeid) {
    102                 netInfo netinfo{ nodeid ,i,VOTE_LEADER_TYPE ,term,nodeid };
    103                 q.Put(netinfo);
    104             }
    105         }
    106     }
    107     
    108 }
    109 
    110 void StatusHandler::HandleCandidateSend(LocalInfoWithLock& locInfolock, SyncQueue<netInfo>& q) {
    111     bool isSendVoteNetInfo = false;
    112     int nodeid = 0;
    113     int term = 0;
    114     {
    115         std::lock_guard<std::mutex> lck(locInfolock.m);
    116         if (locInfolock.locInfo.electionTimeout > 0) {
    117             locInfolock.locInfo.electionTimeout -= 200;
    118         }
    119         //超过时间限制
    120         if (locInfolock.locInfo.electionTimeout <= 0) {
    121             std::cout << "electionTimeout .CANDIDATE_STATUS too" << std::endl;
    122             if (locInfolock.locInfo.IsRecvHeartbeat == 0) {
    123                 //心跳超时  切换到选举模式
    124                 locInfolock.locInfo.term++;
    125                 locInfolock.locInfo.status = CANDIDATE_STATUS;
    126                 locInfolock.locInfo.voteRecord.clear();
    127                 locInfolock.locInfo.voteRecord[locInfolock.locInfo.id] =
    128                     locInfolock.locInfo.term;
    129             }
    130             isSendVoteNetInfo = true;
    131             term = locInfolock.locInfo.term;
    132             nodeid = locInfolock.locInfo.id;
    133             locInfolock.locInfo.electionTimeout = dice();
    134         }
    135     }
    136 
    137     if (isSendVoteNetInfo) {
    138         for (int i = 1; i <= NODE_COUNT; i++) {
    139             if (i != nodeid) {
    140                 netInfo netinfo{ nodeid ,i,VOTE_LEADER_TYPE ,term,nodeid };
    141                 q.Put(netinfo);
    142             }
    143         }
    144     }
    145 }
    每间隔200毫秒就进行状态检测切换,和超时发送回复代码
      1 #include "NetInfoHandler.h"
      2 #include "RaftManager.h"
      3 
      4 void NetInfoHandler::DispatchByinfoType(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
      5     {
      6         std::lock_guard<std::mutex> lck(locInfolock.m);
      7         if (netinf.term < locInfolock.locInfo.term)
      8             return;
      9         if (netinf.term > locInfolock.locInfo.term) {
     10             locInfolock.locInfo.term = netinf.term;
     11             locInfolock.locInfo.status = FOLLOWER_STATUS;
     12             locInfolock.locInfo.isVote = 0;
     13             locInfolock.locInfo.IsRecvHeartbeat = 0;
     14             locInfolock.locInfo.electionTimeout = dice();
     15             locInfolock.locInfo.voteRecord.clear();
     16         }
     17     }
     18     switch (netinf.infotype) {
     19     case HEART_BREAT_TYPE:
     20         HandleHeartBeatTypeRecv(netinf,q, locInfolock);
     21         break;
     22     case VOTE_LEADER_TYPE:
     23         HandleVoteTypeRecv(netinf,q, locInfolock);
     24         break;
     25     case VOTE_LEADER_RESP_TYPE:
     26         HandleVoteRespTypeRecv(netinf,q, locInfolock);
     27         break;
     28     default:
     29         std::cerr << "Recv Unknown info type." << std::endl;
     30     }
     31 }
     32 
     33 void NetInfoHandler::HandleVoteRespTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q,LocalInfoWithLock& locInfolock) {
     34     
     35     {
     36         std::lock_guard<std::mutex> lck(locInfolock.m);
     37         if (netinf.term < locInfolock.locInfo.term)
     38             return;
     39         if (netinf.term > locInfolock.locInfo.term) {
     40             locInfolock.locInfo.term = netinf.term;
     41             locInfolock.locInfo.status = FOLLOWER_STATUS;
     42             locInfolock.locInfo.isVote = 0;
     43             locInfolock.locInfo.IsRecvHeartbeat = 0;
     44             locInfolock.locInfo.voteRecord.clear();
     45         }
     46         if (netinf.infotype == VOTE_LEADER_RESP_TYPE && netinf.toID == locInfolock.locInfo.id && netinf.voteId == locInfolock.locInfo.id) {
     47             //更新本地map记录
     48             locInfolock.locInfo.voteRecord[netinf.fromID] = netinf.term;
     49         }
     50         int count = 0;
     51         std::map<int, int>::iterator it = locInfolock.locInfo.voteRecord.begin();
     52             //查看本term的投票是否达半数以上
     53         while (it != locInfolock.locInfo.voteRecord.end()) {
     54             if (it->second == locInfolock.locInfo.term)
     55                 count++;
     56                 it++;
     57         }
     58         if (count > NODE_COUNT / 2) {
     59             //达到半数以上 转化为leader模式 否则继续选举
     60             locInfolock.locInfo.leaderID = locInfolock.locInfo.id;
     61             locInfolock.locInfo.IsRecvHeartbeat = 0;
     62             locInfolock.locInfo.status = LEADER_STATUS;
     63             locInfolock.locInfo.electionTimeout = 1000;
     64             std::cout << "I am the leader term = " << 
     65                 locInfolock.locInfo.term << std::endl;
     66         }
     67     }
     68 
     69 }
     70 
     71 void NetInfoHandler::HandleVoteTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
     72 
     73     NetInfo respNetInfo;
     74     bool isSend = false;
     75     {
     76         std::lock_guard<std::mutex> lck(locInfolock.m);
     77         if (netinf.term < locInfolock.locInfo.term)
     78             return;
     79         if (netinf.term > locInfolock.locInfo.term) {
     80             locInfolock.locInfo.term = netinf.term;
     81             locInfolock.locInfo.status = FOLLOWER_STATUS;
     82             locInfolock.locInfo.isVote = 0;
     83             locInfolock.locInfo.IsRecvHeartbeat = 0;
     84             locInfolock.locInfo.voteRecord.clear();
     85         }
     86         if (locInfolock.locInfo.isVote == 0 && locInfolock.locInfo.status == FOLLOWER_STATUS) {
     87             respNetInfo.fromID = locInfolock.locInfo.id;
     88             respNetInfo.toID = netinf.fromID;
     89             respNetInfo.term = netinf.term;
     90             respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
     91             respNetInfo.voteId = netinf.voteId;
     92             locInfolock.locInfo.isVote = 1;
     93             isSend = true;
     94         }
     95         else if(locInfolock.locInfo.status == FOLLOWER_STATUS){
     96             respNetInfo.fromID = locInfolock.locInfo.id;
     97             respNetInfo.toID = netinf.fromID;
     98             respNetInfo.term = netinf.term;
     99             respNetInfo.infotype = VOTE_LEADER_RESP_TYPE;
    100             respNetInfo.voteId = 0;
    101             isSend = true;
    102         }
    103     }
    104     if(isSend == true)
    105         q.Put(respNetInfo);
    106 }
    107 
    108 
    109 void NetInfoHandler::HandleHeartBeatTypeRecv(const NetInfo& netinf, SyncQueue<netInfo>& q, LocalInfoWithLock& locInfolock) {
    110 
    111     {
    112         std::lock_guard<std::mutex> lck(locInfolock.m);
    113         if (netinf.term < locInfolock.locInfo.term)
    114             return;
    115         if (netinf.term > locInfolock.locInfo.term) {
    116             locInfolock.locInfo.term = netinf.term;
    117             locInfolock.locInfo.status = FOLLOWER_STATUS;
    118             locInfolock.locInfo.isVote = 0;
    119             locInfolock.locInfo.IsRecvHeartbeat = 0;
    120             locInfolock.locInfo.voteRecord.clear();
    121         }
    122 
    123         locInfolock.locInfo.IsRecvHeartbeat = 1;    
    124     }
    125 }
    收到信息,进行处理以及发送告知自己状态改变的代码

     

    作 者: itdef
    欢迎转帖 请保持文本完整并注明出处
    技术博客 http://www.cnblogs.com/itdef/
    B站算法视频题解
    https://space.bilibili.com/18508846
    qq 151435887
    gitee https://gitee.com/def/
    欢迎c c++ 算法爱好者 windows驱动爱好者 服务器程序员沟通交流
    如果觉得不错,欢迎点赞,你的鼓励就是我的动力
    阿里打赏 微信打赏
  • 相关阅读:
    Advanced Configuration Tricks
    Reviewing the Blog Module
    Editing and Deleting Data
    Making Use of Forms and Fieldsets
    Understanding the Router
    SQL Abstraction and Object Hydration
    Preparing for Different Databases
    Java学习理解路线图
    Openstack学习历程_1_视频
    CentOS安装Nginx负载
  • 原文地址:https://www.cnblogs.com/itdef/p/9698386.html
Copyright © 2011-2022 走看看