用Scala
的akka同时向多个节点发送信息(广播)
思路:每个客户端向服务端发送登陆信息后,服务端记录发送者,存入列表中,客户端向服务端发送的其他信息向所有已登录的客户端发送
这个程序实现了消息的简单广播
首先实现服务端
package com.bit.room
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.collection.mutable
class TalkServer extends Actor { //伴生类
val talkerSet = new mutable.HashSet[ActorRef]()
override def receive: Receive = {
case "launch" => println("Server on") //接受launch 打印启动
case "exit" => { //接受到exit,将发送者从聊天室Set移除
talkerSet.remove(sender())
if (talkerSet.nonEmpty) {
println(sender().toString() + "off")
talkerSet.foreach(Refs => Refs ! ServerMessage(sender().toString() + "off"))
}
}
case "join" => {
talkerSet.add(sender()) //接受到join,将发送者加入聊天室
println(sender().toString() + "on")
talkerSet.foreach(Refs => Refs ! ServerMessage(sender().toString() + "on"))
}
case ClientMessage(msg) => { //接受到其他信息,将其转发给聊天室的其他人
println(msg)
talkerSet.foreach(Refs => Refs ! ServerMessage(msg))
}
}
}
object TalkServer { //伴生对象
def main(args: Array[String]): Unit = {
val host = "127.0.0.1" //ip
val port = 9000 //端口
val config = ConfigFactory.parseString( //配置项
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname = $host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
val actorSystem = ActorSystem("Server", config)
val serverActorRef = actorSystem.actorOf(Props[TalkServer], "talk_server")
serverActorRef ! "launch"
}
}
实现客户端
package com.bit.room
import akka.actor.{Actor, ActorSelection, ActorSystem, Props}
import com.typesafe.config.ConfigFactory
import scala.io.StdIn
class TalkClient(host: String, port: Int) extends Actor {
var serverActorRef: ActorSelection = _ //服务器代理对象
override def preStart(): Unit = { //客户端启动,获取服务的的信息
serverActorRef = context.actorSelection(s"akka.tcp://Server@${host}:${port}/user/talk_server")
}
override def receive: Receive = {
case "start" => println("client on") //启动
case "join" => serverActorRef ! "join"
case "exit" => serverActorRef ! "exit"
case msg: String => { //向服务端发信息
serverActorRef ! ClientMessage(msg)
}
case ServerMessage(msg) => println(msg) //接收到服务端点的信息,打印
}
}
object TalkClient {
def main(args: Array[String]): Unit = {
val host = "127.0.0.1"
val port = 9001
val serverHost = "127.0.0.1"
val serverPort = 9000
val config = ConfigFactory.parseString(
s"""
|akka.actor.provider="akka.remote.RemoteActorRefProvider"
|akka.remote.netty.tcp.hostname=$host
|akka.remote.netty.tcp.port=$port
""".stripMargin)
val clientSystem = ActorSystem("Client", config)
val clientActorRef = clientSystem.actorOf(Props(new TalkClient(serverHost, serverPort.toInt)), "talk_client")
clientActorRef ! "start"
var flag = true
while (true) {
val question = StdIn.readLine()
if (question.equals("exit")) { //输入exit后,就打印到控制台
clientActorRef ! question
flag = false
}
if (question.equals("join")) { //输入join后,就发给服务端
flag = true
}
if (flag) {
clientActorRef ! question
} else {
println(question)
}
}
}
}
由于是在一台机器上进行的实验,启动多个客户端时需要更改端口号