zoukankan      html  css  js  c++  java
  • 服务注册、发现、心跳

    使用 Akka 实现服务注册、服务发现与心跳检测。

    消息类:

    package org.example

    /**
     * Registration request that a worker sends to the master.
     *
     * @param id  identifier chosen by the worker
     * @param cpu CPU figure reported by the worker (presumably a core count -- confirm)
     * @param ram RAM figure reported by the worker (presumably megabytes -- confirm)
     */
    case class RegisterWorkerInfo(id: String, cpu: Int, ram: Int)

    /**
     * Registry entry describing one worker.
     *
     * @param id  worker identifier
     * @param cpu CPU figure reported at registration
     * @param ram RAM figure reported at registration
     */
    case class WorkerInfo(id: String, cpu: Int, ram: Int) {

      /**
       * Epoch-millisecond timestamp, initialised when the entry is constructed.
       * It is declared in the class body, so it takes no part in the generated
       * `equals`, `hashCode`, `toString` or `copy`.
       */
      var lastheartbeat: Long = System.currentTimeMillis()
    }

    /** Acknowledgement the master sends back after handling a [[RegisterWorkerInfo]]. */
    case object RegisteredWorkerInfo

    /** Timer tick a worker sends to itself; each tick triggers one heartbeat to the master. */
    case object SendHeartBeat

    /** Tells the master to start its periodic check for timed-out workers. */
    case object StartTimeOutWorker

    /** Timer tick the master sends to itself; each tick evicts workers whose heartbeat is stale. */
    case object RemoveTimeOutWorker

    /**
     * Heartbeat message from a worker to the master.
     *
     * @param id identifier of the worker sending the heartbeat
     */
    case class HeartBeanPackage(id: String)

    服务端 master:
    package org.example

    import java.util.concurrent.TimeUnit

    import akka.actor.{Actor, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    import scala.collection.mutable
    import scala.concurrent.duration.FiniteDuration

    /**
     * Master-side actor that keeps a registry of workers.
     *
     * Handled messages:
     *  - `"start"`: prints a banner and asks itself to start the timeout sweep.
     *  - [[RegisterWorkerInfo]]: adds the worker if its id is new, then replies
     *    with [[RegisteredWorkerInfo]] (also for ids that are already known).
     *  - [[HeartBeanPackage]]: refreshes the worker's last-heartbeat timestamp.
     *  - [[StartTimeOutWorker]]: schedules [[RemoveTimeOutWorker]] to itself every
     *    9 seconds, starting immediately.
     *  - [[RemoveTimeOutWorker]]: evicts workers whose last heartbeat is more than
     *    6000 ms old.
     */
    class MasterActor extends Actor {

      /** Registered workers keyed by worker id. */
      val register = mutable.Map[String, WorkerInfo]()

      /** Handle of the periodic sweep; `None` until [[StartTimeOutWorker]] is handled. */
      private var timeoutSweep: Option[akka.actor.Cancellable] = None

      /**
       * Cancels the periodic sweep so the scheduler stops sending ticks to an
       * actor that is no longer running.
       */
      override def postStop(): Unit = {
        timeoutSweep.foreach(_.cancel())
        timeoutSweep = None
        super.postStop()
      }

      override def receive: Receive = {
        case "start" =>
          println("master start !!!")
          self ! StartTimeOutWorker

        case RegisterWorkerInfo(id, cpu, ram) =>
          // Keep the existing entry (and its heartbeat timestamp) when the id is already known.
          register.getOrElseUpdate(id, WorkerInfo(id, cpu, ram))
          println("workers: " + register)
          sender() ! RegisteredWorkerInfo

        case HeartBeanPackage(id) =>
          // A heartbeat can arrive for a worker that was never registered or was
          // already evicted. Looking it up with register(id) would throw and restart
          // this actor, losing the whole registry, so unknown ids are only reported.
          register.get(id) match {
            case Some(workerInfo) =>
              workerInfo.lastheartbeat = System.currentTimeMillis()
              println("master 更新了 " + id + "心跳时间")
            case None =>
              println("master ignored heartbeat from unregistered worker: " + id)
          }

        case StartTimeOutWorker =>
          // Start the sweep only once; a repeated request must not stack a second timer.
          if (timeoutSweep.isEmpty) {
            import context.dispatcher
            timeoutSweep = Some(
              context.system.scheduler.schedule(
                FiniteDuration(0, TimeUnit.SECONDS),
                FiniteDuration(9, TimeUnit.SECONDS),
                self,
                RemoveTimeOutWorker))
          }

        case RemoveTimeOutWorker =>
          // Read the clock once so every worker is judged against the same instant.
          val now = System.currentTimeMillis()
          // Collect the ids first, then remove, so the map is not mutated while iterated.
          val expiredIds = register.collect {
            case (workerId, info) if now - info.lastheartbeat > 6000 => workerId
          }.toList
          register --= expiredIds
          println("在线 worker数量: " + register.size)
      }
    }

    /** Command-line entry point for the master node. */
    object MasterActor {

      /**
       * Creates the `master_systerm` actor system with remoting on localhost:8080,
       * creates the `master_actor1` actor and sends it `"start"`.
       *
       * @param args command-line arguments; not read
       */
      def main(args: Array[String]): Unit = {
        // The closing delimiter stays at its original column: the whitespace in
        // front of it has no margin bar, so stripMargin leaves it in the string.
        val remotingSettings =
          """
            |akka.actor.provider="akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname=localhost
            |akka.remote.netty.tcp.port=8080
    """.stripMargin
        val system = ActorSystem("master_systerm", ConfigFactory.parseString(remotingSettings))
        val master = system.actorOf(Props[MasterActor], "master_actor1")
        master ! "start"
      }
    }

    客户端 slave:
    package org.example

    import java.util.UUID
    import java.util.concurrent.TimeUnit

    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory

    import scala.concurrent.duration.{FiniteDuration, TimeUnit}

    /**
     * Worker-side actor that registers with the master and then sends heartbeats.
     *
     * Handled messages:
     *  - `"start"`: sends a [[RegisterWorkerInfo]] for this worker to the master.
     *  - [[RegisteredWorkerInfo]]: starts a timer that sends [[SendHeartBeat]] to
     *    this actor every 3 seconds, starting immediately.
     *  - [[SendHeartBeat]]: sends a [[HeartBeanPackage]] with this worker's id to
     *    the master.
     */
    class SlaveActor extends Actor {

      /** Selection pointing at the master actor; assigned in `preStart`. */
      var masterProxy: ActorSelection = _

      /** Random identifier generated once per actor instance. */
      val id: String = UUID.randomUUID().toString

      /** Handle of the heartbeat timer; `None` until registration is acknowledged. */
      private var heartbeatTimer: Option[akka.actor.Cancellable] = None

      /** Resolves the master's remote path before any message is processed. */
      override def preStart(): Unit = {
        val masterUrl = "akka.tcp://master_systerm@localhost:8080/user/master_actor1"
        this.masterProxy = context.actorSelection(masterUrl)
      }

      /**
       * Cancels the heartbeat timer so the scheduler stops sending ticks to an
       * actor that is no longer running.
       */
      override def postStop(): Unit = {
        heartbeatTimer.foreach(_.cancel())
        heartbeatTimer = None
        super.postStop()
      }

      override def receive: Receive = {
        case "start" =>
          println("客户端运行了")
          this.masterProxy ! RegisterWorkerInfo(id, 16, 16 * 1024)

        case RegisteredWorkerInfo =>
          println("workerId: " + id + " 注册成功")
          // The master acknowledges every registration request, so this message can
          // arrive more than once; start the heartbeat timer only the first time.
          if (heartbeatTimer.isEmpty) {
            import context.dispatcher
            heartbeatTimer = Some(
              context.system.scheduler.schedule(
                FiniteDuration(0, TimeUnit.SECONDS),
                FiniteDuration(3, TimeUnit.SECONDS),
                self,
                SendHeartBeat))
          }

        case SendHeartBeat =>
          println("workerId: " + id + " 给服务器发送心跳")
          masterProxy ! HeartBeanPackage(id)
      }
    }

    /** Command-line entry point for a worker node. */
    object SlaveActor {

      /**
       * Creates the `slaver_systerm` actor system with remoting on localhost:8090,
       * creates the `slaver_actor1` actor and sends it `"start"`.
       *
       * @param args command-line arguments; not read
       */
      def main(args: Array[String]): Unit = {
        // The closing delimiter stays at its original column: the whitespace in
        // front of it has no margin bar, so stripMargin leaves it in the string.
        val remotingSettings =
          """
            |akka.actor.provider="akka.remote.RemoteActorRefProvider"
            |akka.remote.netty.tcp.hostname=localhost
            |akka.remote.netty.tcp.port=8090
    """.stripMargin
        val system = ActorSystem("slaver_systerm", ConfigFactory.parseString(remotingSettings))
        val worker = system.actorOf(Props[SlaveActor], "slaver_actor1")
        worker ! "start"
      }
    }
  • 相关阅读:
    QuartusII13.0使用教程详解(一个完整的工程建立)
    基于Vivado调用ROM IP core设计DDS
    FPGA学习之路——一路走来
    基于basys2用verilog设计多功能数字钟(重写)
    基于basys2驱动LCDQC12864B的verilog设计图片显示
    PWM(脉宽调制)——LED特效呼吸灯设计
    Isim你不得不知道的技巧(整理)
    ISE、vivado、QuartusII调用notepad++、UE汇总(整理)
    java环境配置为1.7jdk为什么cmd java -version查看版本是1.8
    TCP/IP三次握手和HTTP过程
  • 原文地址:https://www.cnblogs.com/wudeyun/p/12927612.html
Copyright © 2011-2022 走看看