zoukankan      html  css  js  c++  java
  • akka并发编程练习

    Scala的akka同时向多个节点发送信息(广播)

    思路:每个客户端向服务端发送登陆信息后,服务端记录发送者,存入列表中,客户端向服务端发送的其他信息向所有已登录的客户端发送

    这个程序实现了消息的简单广播


    首先实现服务端

    package com.bit.room
    
    import akka.actor.{Actor, ActorRef, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    import scala.collection.mutable
    
    class TalkServer extends Actor { //伴生类
    
      val talkerSet = new mutable.HashSet[ActorRef]()
    
      override def receive: Receive = {
        case "launch" => println("Server on") //接受launch 打印启动
    
        case "exit" => {  //接受到exit,将发送者从聊天室Set移除
          talkerSet.remove(sender())
          if (talkerSet.nonEmpty) {
            println(sender().toString() + "off")
            talkerSet.foreach(Refs => Refs ! ServerMessage(sender().toString() + "off"))
          }
        }
    
        case "join" => {
          talkerSet.add(sender()) //接受到join,将发送者加入聊天室
          println(sender().toString() + "on")
          talkerSet.foreach(Refs => Refs ! ServerMessage(sender().toString() + "on"))
        }
    
        case ClientMessage(msg) => { //接受到其他信息,将其转发给聊天室的其他人
          println(msg)
          talkerSet.foreach(Refs => Refs ! ServerMessage(msg))
        }
      }
    }
    
    object TalkServer { //伴生对象
      def main(args: Array[String]): Unit = {
        val host = "127.0.0.1" //ip
        val port = 9000 //端口
        val config = ConfigFactory.parseString( //配置项
          s"""
             |akka.actor.provider="akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname = $host
             |akka.remote.netty.tcp.port=$port
             """.stripMargin)
        val actorSystem = ActorSystem("Server", config)
        val serverActorRef = actorSystem.actorOf(Props[TalkServer], "talk_server")
        serverActorRef ! "launch"
      }
    }
    

    实现客户端

    package com.bit.room
    
    import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
    import com.typesafe.config.ConfigFactory
    
    import scala.io.StdIn
    
    class TalkClient(host: String, port: Int) extends Actor {
      var serverActorRef: ActorSelection = _ //服务器代理对象
    
      override def preStart(): Unit = { //客户端启动,获取服务的的信息
        serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/talk_server")
      }
    
    
      override def receive: Receive = {
        case "start" => println("client on") //启动
        case "join" => serverActorRef ! "join"
        case "exit" => serverActorRef ! "exit"
        case msg: String => { //向服务端发信息
          serverActorRef ! ClientMessage(msg)
        }
        case ServerMessage(msg) => println(msg) //接收到服务端点的信息,打印
    
      }
    }
    
    object TalkClient {
      def main(args: Array[String]): Unit = {
        val host = "127.0.0.1"
        val port = 9001
    
        val serverHost = "127.0.0.1"
        val serverPort = 9000
    
        val config = ConfigFactory.parseString(
          s"""
             |akka.actor.provider="akka.remote.RemoteActorRefProvider"
             |akka.remote.netty.tcp.hostname=$host
             |akka.remote.netty.tcp.port=$port
             """.stripMargin)
    
        val clientSystem = ActorSystem("Client", config)
    
        val clientActorRef = clientSystem.actorOf(Props(new TalkClient(serverHost, serverPort.toInt)), "talk_client")
        clientActorRef ! "start"
    
        var flag = true
        while (true) {
          val question = StdIn.readLine()
          if (question.equals("exit")) { //输入exit后,就打印到控制台
            clientActorRef ! question
            flag = false
          }
          if (question.equals("join")) { //输入join后,就发给服务端
            flag = true
          }
          if (flag) {
            clientActorRef ! question
          } else {
            println(question)
          }
    
        }
    
      }
    }
    
    

    由于是在一台机器上进行的实验,启动多个客户端时需要更改端口号

  • 相关阅读:
    tensorflow 2.0 学习 (十) 拟合与过拟合问题
    tensorflow 2.0 学习 (九) tensorboard可视化功能认识
    tensorflow 2.0 学习 (八) keras模块的认识
    tensorflow 2.0 学习 (七) 反向传播代码逐步实现
    tensorflow 2.0 学习 (六) Himmelblua函数求极值
    tensorflow 2.0 学习 (五)MPG全连接网络训练与测试
    arp协议简单介绍
    Pthread spinlock自旋锁
    线程和进程状态
    内核态(内核空间)和用户态(用户空间)的区别和联系·
  • 原文地址:https://www.cnblogs.com/DismalSnail/p/11392489.html
Copyright © 2011-2022 走看看