在学习spark源码的时候看到spark在1.6之后底层的通信框架变成了akka和netty两种方式,默认的是用netty根据源码的思路用scala写了一个Demo级别的netty通信
package com.spark.netty import io.netty.bootstrap.ServerBootstrap import io.netty.channel.ChannelInitializer import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.codec.serialization.{ClassResolvers, ClassResolver, ObjectDecoder, ObjectEncoder} /** * Created by root on 2016/11/18. */ class NettyServer { def bind(host: String, port: Int): Unit = { //配置服务端线程池组 //用于服务器接收客户端连接 val bossGroup = new NioEventLoopGroup() //用户进行SocketChannel的网络读写 val workerGroup = new NioEventLoopGroup() try { //是Netty用户启动NIO服务端的辅助启动类,降低服务端的开发复杂度 val bootstrap = new ServerBootstrap() //将两个NIO线程组作为参数传入到ServerBootstrap bootstrap.group(bossGroup, workerGroup) //创建NioServerSocketChannel .channel(classOf[NioServerSocketChannel]) //绑定I/O事件处理类 .childHandler(new ChannelInitializer[SocketChannel] { override def initChannel(ch: SocketChannel): Unit = { ch.pipeline().addLast( // new ObjectEncoder, // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)), new ServerHandler ) } }) //绑定端口,调用sync方法等待绑定操作完成 val channelFuture = bootstrap.bind(host, port).sync() //等待服务关闭 channelFuture.channel().closeFuture().sync() } finally { //优雅的退出,释放线程池资源 bossGroup.shutdownGracefully() workerGroup.shutdownGracefully() } } } object NettyServer { def main(args: Array[String]) { val host = args(0) val port = args(1).toInt val server = new NettyServer server.bind(host, port) } }
package com.spark.netty import io.netty.bootstrap.Bootstrap import io.netty.channel.ChannelInitializer import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.{NioSocketChannel, NioServerSocketChannel} import io.netty.handler.codec.serialization.{ClassResolvers, ObjectDecoder, ObjectEncoder} /** * Created by root on 2016/11/18. */ class NettyClient { def connect(host: String, port: Int): Unit = { //创建客户端NIO线程组 val eventGroup = new NioEventLoopGroup //创建客户端辅助启动类 val bootstrap = new Bootstrap try { //将NIO线程组传入到Bootstrap bootstrap.group(eventGroup) //创建NioSocketChannel .channel(classOf[NioSocketChannel]) //绑定I/O事件处理类 .handler(new ChannelInitializer[SocketChannel] { override def initChannel(ch: SocketChannel): Unit = { ch.pipeline().addLast( // new ObjectEncoder, // new ObjectDecoder(ClassResolvers.cacheDisabled(getClass.getClassLoader)), new ClientHandler ) } }) //发起异步连接操作 val channelFuture = bootstrap.connect(host, port).sync() //等待服务关闭 channelFuture.channel().closeFuture().sync() } finally { //优雅的退出,释放线程池资源 eventGroup.shutdownGracefully() } } } object NettyClient { def main(args: Array[String]) { val host = args(0) val port = args(1).toInt val client = new NettyClient client.connect(host, port) } }
package com.spark.netty import io.netty.buffer.{Unpooled, ByteBuf} import io.netty.channel.{ChannelHandlerContext, ChannelInboundHandlerAdapter} /** * Created by root on 2016/11/18. */ class ServerHandler extends ChannelInboundHandlerAdapter { /** * 有客户端建立连接后调用 */ override def channelActive(ctx: ChannelHandlerContext): Unit = { println("channelActive invoked") } /** * 接受客户端发送来的消息 */ override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { println("channelRead invoked") val byteBuf = msg.asInstanceOf[ByteBuf] val bytes = new Array[Byte](byteBuf.readableBytes()) byteBuf.readBytes(bytes) val message = new String(bytes, "UTF-8") println(message) val back = "good boy!" val resp = Unpooled.copiedBuffer(back.getBytes("UTF-8")) println(msg) ctx.write(resp) } /** * 将消息对列中的数据写入到SocketChanne并发送给对方 */ override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { println("channekReadComplete invoked") ctx.flush() } }
package com.spark.netty import io.netty.buffer.{ByteBuf, Unpooled} import io.netty.channel.{ChannelInboundHandlerAdapter, ChannelHandlerContext, ChannelHandlerAdapter} /** * Created by root on 2016/11/18. */ class ClientHandler extends ChannelInboundHandlerAdapter { override def channelActive(ctx: ChannelHandlerContext): Unit = { println("channelActive") val content = "hello server" ctx.writeAndFlush(Unpooled.copiedBuffer(content.getBytes("UTF-8"))) //发送case class 不在发送字符串了,封装一个字符串 // ctx.writeAndFlush(RegisterMsg("hello server")) } override def channelRead(ctx: ChannelHandlerContext, msg: scala.Any): Unit = { println("channelRead") val byteBuf = msg.asInstanceOf[ByteBuf] val bytes = new Array[Byte](byteBuf.readableBytes()) byteBuf.readBytes(bytes) val message = new String(bytes, "UTF-8") println(message) } override def channelReadComplete(ctx: ChannelHandlerContext): Unit = { println("channeReadComplete") ctx.flush() } //发送异常时关闭 override def exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable): Unit = { println("exceptionCaught") ctx.close() } }
package com.spark.netty /** * Created by root on 2016/11/18. */ case class RegisterMsg(content: String) extends Serializable
先启动NettyServer,然后在启动NettyClient.打印结果