一、基础的架子
假设Executor是服务端,Driver是客户端
代码:
package test import java.io.InputStream import java.net.{ServerSocket, Socket} object Executor { def main(args: Array[String]): Unit = { //启动服务器,接收数据 val server = new ServerSocket(9999) println("服务器启动,等待接收数据") //等待客户端连接 val client: Socket = server.accept() val in: InputStream = client.getInputStream val i: Int = in.read() println("接收到客户端发送的数据"+i) in.close() client.close() server.close() } }
package test import java.io.OutputStream import java.net.Socket object Driver { def main(args: Array[String]): Unit = { //连接服务器 val client = new Socket("localhost", 9999) val out: OutputStream = client.getOutputStream out.write(2) out.flush() out.close() client.close() } }
运行结果:
二、模拟客户端向服务端发送计算任务
代码:
package test //客户端不能直接发送对象,需要用混入的方式来进行序列化 class Task extends Serializable { val datas=List(1,2,3,4) //val logic=(num:Int)=>{num*2} val logic:Int=>Int=_*2 //计算 def compute()={ datas.map(logic) } }
package test import java.io.{InputStream, ObjectInput, ObjectInputStream} import java.net.{ServerSocket, Socket} object Executor { def main(args: Array[String]): Unit = { //启动服务器,接收数据 val server = new ServerSocket(9999) println("服务器启动,等待接收数据") //等待客户端连接 val client: Socket = server.accept() val in: InputStream = client.getInputStream val objIn=new ObjectInputStream(in) val task: Task = objIn.readObject().asInstanceOf[Task] val ints: List[Int] = task.compute() println("计算节点计算结果为"+ints) objIn.close() client.close() server.close() } }
package test import java.io.{ObjectOutputStream, OutputStream} import java.net.Socket object Driver { def main(args: Array[String]): Unit = { //连接服务器 val client = new Socket("localhost", 9999) val out: OutputStream = client.getOutputStream val objOut = new ObjectOutputStream(out) val task = new Task() objOut.writeObject(task) objOut.flush() objOut.close() client.close() println("客户端计算任务发送完毕!") } }
运行截图:
三、模拟两台服务器同时进行计算
代码:
package test //客户端不能直接发送对象,需要用混入的方式来进行序列化 class Task extends Serializable { val datas = List(1, 2, 3, 4) //val logic=(num:Int)=>{num*2} val logic: Int => Int = _ * 2 }
package test class SubTask extends Serializable { var datas:List[Int]=_ var logic:(Int)=>Int=_ //计算 def compute()={ datas.map(logic) } }
package test import java.io.{InputStream, ObjectInput, ObjectInputStream} import java.net.{ServerSocket, Socket} object Executor1 { def main(args: Array[String]): Unit = { //启动服务器,接收数据 val server = new ServerSocket(9999) println("服务器启动,等待接收数据") //等待客户端连接 val client: Socket = server.accept() val in: InputStream = client.getInputStream val objIn=new ObjectInputStream(in) val task: SubTask = objIn.readObject().asInstanceOf[SubTask] val ints: List[Int] = task.compute() println("计算节点[9999]计算结果为"+ints) objIn.close() client.close() server.close() } }
package test import java.io.{InputStream, ObjectInputStream} import java.net.{ServerSocket, Socket} object Executor2 { def main(args: Array[String]): Unit = { //启动服务器,接收数据 val server = new ServerSocket(8888) println("服务器启动,等待接收数据") //等待客户端连接 val client: Socket = server.accept() val in: InputStream = client.getInputStream val objIn=new ObjectInputStream(in) val task: SubTask = objIn.readObject().asInstanceOf[SubTask] val ints: List[Int] = task.compute() println("计算节点[8888]计算结果为"+ints) objIn.close() client.close() server.close() } }
package test import java.io.{ObjectOutputStream, OutputStream} import java.net.Socket object Driver { def main(args: Array[String]): Unit = { //连接服务器 val client1 = new Socket("localhost", 9999) val client2 = new Socket("localhost", 8888) val task = new Task() val out1: OutputStream = client1.getOutputStream val objOut1 = new ObjectOutputStream(out1) val subTask1 = new SubTask() subTask1.logic=task.logic subTask1.datas=task.datas.take(2) objOut1.writeObject(subTask1) objOut1.flush() objOut1.close() client1.close() val out2: OutputStream = client2.getOutputStream val objOut2 = new ObjectOutputStream(out2) val subTask2 = new SubTask() subTask2.logic=task.logic subTask2.datas=task.datas.takeRight(2) objOut2.writeObject(subTask2) objOut2.flush() objOut2.close() client2.close() println("客户端计算任务发送完毕!") } }
运行截图: