zoukankan      html  css  js  c++  java
  • SparkCore分布式计算模拟

    一、基础的架子

     假设Executor是服务端,Driver是客户端

    代码:

    package test
    
    import java.io.InputStream
    import java.net.{ServerSocket, Socket}
    
    object Executor {
    
      def main(args: Array[String]): Unit = {
        //启动服务器,接收数据
        val server = new ServerSocket(9999)
        println("服务器启动,等待接收数据")
    
        //等待客户端连接
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
    
        val i: Int = in.read()
        println("接收到客户端发送的数据"+i)
        in.close()
        client.close()
        server.close()
      }
    }
    package test
    
    import java.io.OutputStream
    import java.net.Socket
    
    object Driver {
    
      def main(args: Array[String]): Unit = {
        //连接服务器
        val client = new Socket("localhost", 9999)
    
        val out: OutputStream = client.getOutputStream
    
        out.write(2)
        out.flush()
        out.close()
    
        client.close()
      }
    }

    运行结果:

     

     二、模拟客户端向服务端发送计算任务

    代码:

    package test
    
    //客户端不能直接发送对象,需要用混入的方式来进行序列化
    class Task extends Serializable {
    
      val datas=List(1,2,3,4)
    
      //val logic=(num:Int)=>{num*2}
      val logic:Int=>Int=_*2
    
      //计算
      def compute()={
        datas.map(logic)
      }
    }
    package test
    
    import java.io.{InputStream, ObjectInput, ObjectInputStream}
    import java.net.{ServerSocket, Socket}
    
    object Executor {
    
      def main(args: Array[String]): Unit = {
        //启动服务器,接收数据
        val server = new ServerSocket(9999)
        println("服务器启动,等待接收数据")
    
        //等待客户端连接
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val objIn=new ObjectInputStream(in)
        val task: Task = objIn.readObject().asInstanceOf[Task]
        val ints: List[Int] = task.compute()
        println("计算节点计算结果为"+ints)
        objIn.close()
        client.close()
        server.close()
      }
    }
    package test
    
    import java.io.{ObjectOutputStream, OutputStream}
    import java.net.Socket
    
    object Driver {
    
      def main(args: Array[String]): Unit = {
        //连接服务器
        val client = new Socket("localhost", 9999)
    
        val out: OutputStream = client.getOutputStream
        val objOut = new ObjectOutputStream(out)
    
        val task = new Task()
        objOut.writeObject(task)
        objOut.flush()
        objOut.close()
        client.close()
        println("客户端计算任务发送完毕!")
      }
    }

    运行截图:

     

     三、模拟两台服务器同时进行计算

    代码:

    package test
    
    //客户端不能直接发送对象,需要用混入的方式来进行序列化
    class Task extends Serializable {
    
      val datas = List(1, 2, 3, 4)
    
      //val logic=(num:Int)=>{num*2}
      val logic: Int => Int = _ * 2
    
    }
    package test
    
    class SubTask extends Serializable {
    
      var datas:List[Int]=_
      var logic:(Int)=>Int=_
    
      //计算
      def compute()={
        datas.map(logic)
      }
    }
    package test
    
    import java.io.{InputStream, ObjectInput, ObjectInputStream}
    import java.net.{ServerSocket, Socket}
    
    object Executor1 {
    
      def main(args: Array[String]): Unit = {
        //启动服务器,接收数据
        val server = new ServerSocket(9999)
        println("服务器启动,等待接收数据")
    
        //等待客户端连接
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val objIn=new ObjectInputStream(in)
        val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
        val ints: List[Int] = task.compute()
        println("计算节点[9999]计算结果为"+ints)
        objIn.close()
        client.close()
        server.close()
      }
    }
    package test
    
    import java.io.{InputStream, ObjectInputStream}
    import java.net.{ServerSocket, Socket}
    
    object Executor2 {
    
      def main(args: Array[String]): Unit = {
        //启动服务器,接收数据
        val server = new ServerSocket(8888)
        println("服务器启动,等待接收数据")
    
        //等待客户端连接
        val client: Socket = server.accept()
        val in: InputStream = client.getInputStream
        val objIn=new ObjectInputStream(in)
        val task: SubTask = objIn.readObject().asInstanceOf[SubTask]
        val ints: List[Int] = task.compute()
        println("计算节点[8888]计算结果为"+ints)
        objIn.close()
        client.close()
        server.close()
      }
    }
    package test
    
    import java.io.{ObjectOutputStream, OutputStream}
    import java.net.Socket
    
    object Driver {
    
      def main(args: Array[String]): Unit = {
        //连接服务器
        val client1 = new Socket("localhost", 9999)
        val client2 = new Socket("localhost", 8888)
    
        val task = new Task()
    
        val out1: OutputStream = client1.getOutputStream
        val objOut1 = new ObjectOutputStream(out1)
    
        val subTask1 = new SubTask()
        subTask1.logic=task.logic
        subTask1.datas=task.datas.take(2)
    
        objOut1.writeObject(subTask1)
        objOut1.flush()
        objOut1.close()
        client1.close()
    
    
        val out2: OutputStream = client2.getOutputStream
        val objOut2 = new ObjectOutputStream(out2)
    
        val subTask2 = new SubTask()
        subTask2.logic=task.logic
        subTask2.datas=task.datas.takeRight(2)
    
        objOut2.writeObject(subTask2)
        objOut2.flush()
        objOut2.close()
        client2.close()
        println("客户端计算任务发送完毕!")
      }
    }

    运行截图:

     

     

  • 相关阅读:
    Python中利用xpath解析HTML
    常见聚类算法——K均值、凝聚层次聚类和DBSCAN比较
    格式化字符串format函数
    编程语言这个垂直方向
    CLR,GC 表示什么意思?
    ASP.Net MVC开发基础学习笔记:一、走向MVC模式
    NPOI 通过excel模板写入数据并导出
    SQL 注意事项
    解决微信公众号OAuth出现40029(invalid code,不合法的oauth_code)的错误
    iis 站点中文乱码 解决方案
  • 原文地址:https://www.cnblogs.com/dd110343/p/14303804.html
Copyright © 2011-2022 走看看