zoukankan      html  css  js  c++  java
  • scala实现Netty通信

    在学习spark源码的时候看到spark在1.6之后底层的通信框架变成了akka和netty两种方式,默认的是用netty根据源码的思路用scala写了一个Demo级别的netty通信

    package com.spark.netty
    import io.netty.bootstrap.ServerBootstrap
    import io.netty.channel.ChannelInitializer
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.socket.SocketChannel
    import io.netty.channel.socket.nio.NioServerSocketChannel
    import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder}
    
    /**
      * Created by root on 2016/11/18.
      */
    class NettyServer {
      def bind(host: String, port: Int): Unit = {
        //配置服务端线程池组
        //用于服务器接收客户端连接
        val bossGroup = new NioEventLoopGroup()
        //用户进行SocketChannel的网络读写
        val workerGroup = new NioEventLoopGroup()
    
        try {
          //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度
          val bootstrap = new ServerBootstrap()
          //将两个NIO线程组作为参数传入到ServerBootstrap
          bootstrap.group(bossGroup, workerGroup)
            //创建NioServerSocketChannel
            .channel(classOf[NioServerSocketChannel])
            //绑定I/O事件处理类
            .childHandler(new ChannelInitializer[SocketChannel] {
            override def initChannel(ch: SocketChannel): Unit = {
              ch.pipeline().addLast(
                //            new ObjectEncoder,
                //            new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
                new ServerHandler
              )
            }
          })
          //绑定端口,调用sync方法等待绑定操作完成
          val channelFuture = bootstrap.bind(host, port).sync()
          //等待服务关闭
          channelFuture.channel().closeFuture().sync()
        } finally {
          //优雅的退出,释放线程池资源
          bossGroup.shutdownGracefully()
          workerGroup.shutdownGracefully()
        }
      }
    }
    
    object NettyServer {
      def main(args: Array[String]) {
        val host = args(0)
        val port = args(1).toInt
        val server = new NettyServer
        server.bind(host, port)
      }
    }
    package com.spark.netty
    
    import io.netty.bootstrap.Bootstrap
    import io.netty.channel.ChannelInitializer
    import io.netty.channel.nio.NioEventLoopGroup
    import io.netty.channel.socket.SocketChannel
    import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel}
    import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder}
    
    /**
      * Created by root on 2016/11/18.
      */
    
    class NettyClient {
      def connect(host: String, port: Int): Unit = {
        //创建客户端NIO线程组
        val eventGroup = new NioEventLoopGroup
        //创建客户端辅助启动类
        val bootstrap = new Bootstrap
        try {
          //将NIO线程组传入到Bootstrap
          bootstrap.group(eventGroup)
            //创建NioSocketChannel
            .channel(classOf[NioSocketChannel])
            //绑定I/O事件处理类
            .handler(new ChannelInitializer[SocketChannel] {
            override def initChannel(ch: SocketChannel): Unit = {
              ch.pipeline().addLast(
    //            new ObjectEncoder,
    //            new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)),
                new ClientHandler
              )
            }
          })
          //发起异步连接操作
          val channelFuture = bootstrap.connect(host, port).sync()
          //等待服务关闭
          channelFuture.channel().closeFuture().sync()
        } finally {
          //优雅的退出,释放线程池资源
          eventGroup.shutdownGracefully()
        }
      }
    }
    
    object NettyClient {
      def main(args: Array[String]) {
        val host = args(0)
        val port = args(1).toInt
        val client = new NettyClient
        client.connect(host, port)
      }
    }
    package com.spark.netty
    
    import io.netty.buffer.{Unpooled, ByteBuf}
    import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter}
    
    /**
      * Created by root on 2016/11/18.
      */
    class ServerHandler extends ChannelInboundHandlerAdapter {
      /**
        * 有客户端建立连接后调用
        */
      override def channelActive(ctx: ChannelHandlerContext): Unit = {
        println("channelActive invoked")
      }
    
      /**
        * 接受客户端发送来的消息
        */
      override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
        println("channelRead invoked")
        val byteBuf = msg.asInstanceOf[ByteBuf]
        val bytes = new Array[Byte](byteBuf.readableBytes())
        byteBuf.readBytes(bytes)
        val message = new String(bytes, "UTF-8")
        println(message)
        val back = "good boy!"
        val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8"))
        println(msg)
        ctx.write(resp)
      }
    
      /**
        * 将消息对列中的数据写入到SocketChanne并发送给对方
        */
      override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
        println("channekReadComplete invoked")
        ctx.flush()
      }
    
    
    }
    package com.spark.netty
    
    import io.netty.buffer.{ByteBuf, Unpooled}
    import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter}
    
    /**
      * Created by root on 2016/11/18.
      */
    class ClientHandler extends ChannelInboundHandlerAdapter {
      override def channelActive(ctx: ChannelHandlerContext): Unit = {
        println("channelActive")
        val content = "hello server"
        ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8")))
        //发送case class 不在发送字符串了,封装一个字符串
        //    ctx.writeAndFlush(RegisterMsg("hello server"))
      }
    
      override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = {
        println("channelRead")
        val byteBuf = msg.asInstanceOf[ByteBuf]
        val bytes = new Array[Byte](byteBuf.readableBytes())
        byteBuf.readBytes(bytes)
        val message = new String(bytes, "UTF-8")
        println(message)
      }
    
      override def channelReadComplete(ctx: ChannelHandlerContext): Unit = {
        println("channeReadComplete")
        ctx.flush()
      }
    //发送异常时关闭
      override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = {
        println("exceptionCaught")
        ctx.close()
      }
    
    }
    package com.spark.netty
    
    /**
      * Created by root on 2016/11/18.
      */
    case class RegisterMsg(content: String) extends Serializable

    先启动NettyServer,然后在启动NettyClient.打印结果

  • 相关阅读:
    Y2K Accounting Bug(POJ 2586)
    Power of Cryptography(POJ 2109 math )
    codeforces C. Valera and Tubes
    codeforces C. Devu and Partitioning of the Array
    codeforces C. Ryouko's Memory Note
    codeforces C. k-Tree
    codeforces C. Prime Swaps
    codeforces C. Xor-tree
    codeforces B. Prison Transfer
    codeforces C. Sereja and Swaps
  • 原文地址:https://www.cnblogs.com/itboys/p/6077640.html
Copyright © 2011-2022 走看看