zoukankan      html  css  js  c++  java
  • netty实现消息转发服务

    1、结构图

      

    2、消息服务器

    消息服务器(SNS)由Http Netty Server(HNS)WebSocket Netty Server(WNS)组成。HNS采用Netty Http+XML协议栈开发实现,WNS采用Netty WebSocket+JSON实现。

    HNS只接收预定义的HttpXmlRequest类型的数据,这由编解码器控制,编解码器是继承了MessageToMessageDecoder<T>MessageToMessageEncoder<T>这两个编解码基础类、并用于解析处理预定义HttpXmlRequest数据的类。HNS根据接收结果向客户端发送预定义的HttpXmlResponse类型数据。

    HNS可以通过HttpXmlClient创建与业务服务器的链接,并通过HttpXmlClientHandler转发业务请求。HttpXmlClientHandler继承自SimpleChannelInboundHandler,通过它可以实现HNS与业务服务器的异步通信。

    目前,WNS主要用于与Web客户端端进行websocket通信,WNS通过全局变量Global.WSCG维护通道信息,通过Global.appUsers维护客户端连接。WNS定义了一个消息基类BaseMsg,该类描述了客户端发起请求所需要的数据信息。同样,WNS也定义了一个AppUser类用于存储客户端信息,必须说明的一点是,同一个AppUser可能存在多个通道,因此,在AppUser中定义了一个ChannelId数组,该数组维护了当前用户的所有通道ID。客户端发起连接请求时,必要的数据包括appiduserIdcmdappid是一个业务服务器的唯一标识。 

    3、业务服务器

    业务服务器是各个应用端建立的与SNS交互的Http Netty Server,换句话说,每一个应用都需要启动一个HNS用于与SNS交互。同样的,业务服务器也是通过HttpXmlClientSNSHNS发起连接请求,不再赘述。

    4、HttpXmlServer

    package com.sns.protocol.http.xml.server;
    
    import java.net.InetSocketAddress;
    
    import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest;
    import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequestDecoder;
    import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponseEncoder;
    import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.AdaptiveRecvByteBufAllocator;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    
    public class HttpXmlServer implements Runnable {
    
    	private EventLoopGroup bossGroup = null;
    	private EventLoopGroup workerGroup = null;
    	private SimpleChannelInboundHandler<HttpXmlRequest> handler = null;
    
    	private int port = 9999;
    
    	@SuppressWarnings("unused")
    	private HttpXmlServer() {
    	}
    
    	public HttpXmlServer(int _port, SimpleChannelInboundHandler<HttpXmlRequest> _handler) {
    		this.port = _port;
    		this.handler = _handler;
    	}
    
    	@Override
    	public void run() {
    		// 处理网络连接---接受请求
    		bossGroup = new NioEventLoopGroup();
    		// 进行socketChannel的网络读写
    		workerGroup = new NioEventLoopGroup();
    		try {
    			ServerBootstrap b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)
    					// boss线程接收参数设置,BACKLOG用于构造服务端套接字ServerSocket对象,
    					// 标识当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度。
    					// 如果未设置或所设置的值小于1,Java将使用默认值50。
    					.option(ChannelOption.SO_BACKLOG, 1024)
    					// 发送缓冲器
    					.option(ChannelOption.SO_SNDBUF, 1024)
    					// 接收缓冲器
    					.option(ChannelOption.SO_RCVBUF, 1024)
    					// 接收缓冲分配器
    					.option(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(256, 2048, 65536))
    					// work线程参数设置
    					.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(256, 2048, 65536))
    					.childHandler(new ChannelInitializer<SocketChannel>() {
    						@Override
    						protected void initChannel(SocketChannel ch) throws Exception {
    							ch.pipeline().addLast("http-decoder", new HttpRequestDecoder());
    							ch.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65536));
    							ch.pipeline().addLast("xml-decoder",
    									new HttpXmlRequestDecoder(HttpRequestMessage.class, true));
    							ch.pipeline().addLast("http-encoder", new HttpResponseEncoder());
    							ch.pipeline().addLast("xml-encoder", new HttpXmlResponseEncoder());
    							ch.pipeline().addLast("xmlServerHandler", handler);
    						}
    					});
    			ChannelFuture future = b.bind(new InetSocketAddress(port)).sync();
    			System.out.println("HTTP netty server started. the port is " + port);
    			future.channel().closeFuture().sync();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    
    	public void shutdown() {
    		if (bossGroup != null)
    			bossGroup.shutdownGracefully();
    		if (workerGroup != null)
    			workerGroup.shutdownGracefully();
    	}
    }
    

    5、HttpXmlServerHandler

    package com.sns.protocol.http.xml.server;
    
    import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
    import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
    import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    import com.zehin.sns.protocol.http.xml.codec.HttpXmlRequest;
    import com.zehin.sns.protocol.http.xml.codec.HttpXmlResponse;
    import com.zehin.sns.protocol.http.xml.pojo.HttpRequestMessage;
    import com.zehin.sns.protocol.http.xml.pojo.HttpResponseMessage;
    
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandler.Sharable;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultFullHttpResponse;
    import io.netty.handler.codec.http.FullHttpResponse;
    import io.netty.handler.codec.http.HttpRequest;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.util.CharsetUtil;
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    @Sharable
    public final class HttpXmlServerHandler extends SimpleChannelInboundHandler<HttpXmlRequest> {
    
    	@Override
    	public void messageReceived(final ChannelHandlerContext ctx, HttpXmlRequest xmlRequest) throws Exception {
    		HttpRequest request = xmlRequest.getRequest();
    		HttpRequestMessage reqMessage = (HttpRequestMessage) xmlRequest.getBody();
    		System.out.println("Http server receive request : " + reqMessage);
    		HttpResponseMessage resMessage = dobusiness(reqMessage);
    		ChannelFuture future = ctx.writeAndFlush(new HttpXmlResponse(null, resMessage));
    		if (!isKeepAlive(request)) {
    			future.addListener(new GenericFutureListener<Future<? super Void>>() {
    				public void operationComplete(Future future) throws Exception {
    					ctx.close();
    				}
    			});
    		}
    	}
    
    	private HttpResponseMessage dobusiness(HttpRequestMessage req) {
    		HttpResponseMessage resMessage = new HttpResponseMessage();
    		if (req.getCmd() == 0) {
    			resMessage.setResult(true);
    		} else {
    			// other verify code here...
    		}
    		return resMessage;
    	}
    
    	@Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    		cause.printStackTrace();
    		if (ctx.channel().isActive()) {
    			sendError(ctx, INTERNAL_SERVER_ERROR);
    		}
    	}
    
    	private static void sendError(ChannelHandlerContext ctx, HttpResponseStatus status) {
    		FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status,
    				Unpooled.copiedBuffer("失败: " + status.toString() + "
    ", CharsetUtil.UTF_8));
    		response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
    		ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    	}
    }
    

    6、备注

      主要参考《Netty权威指南》而写了个简单的消息转发。

  • 相关阅读:
    设计模式之适配器模式温故知新(九)
    设计模式之策略模式总结(八)
    设计模式之观察者模式, 个人感觉相当的重要(七)
    设计模式之抽象工厂模式读后(六)
    设计模式之工厂模式详细读后感TT!(五)
    设计模式之简单工厂模式, 加速(四)
    设计模式之代理模式笔记(三)
    设计模式之单例模式读后思考(二)
    为什么要用设计模式?先看看6大原则(一)
    Codeforces_835
  • 原文地址:https://www.cnblogs.com/sdnu/p/5946498.html
Copyright © 2011-2022 走看看