zoukankan      html  css  js  c++  java
  • 【MINA】用mina做业务服之间的通信,实现业务负载均衡思路

    学习mina目的还是搭建通信架构,学完mina我们了解了如何实现客户端和服务端,也就是一个正常channel我们是知道怎么建立的

    但是问题是,我们应用环境通信分为两种

    1.前后端通信


    其实这个比较好实现,提供一个mina server端,供前端语言通过socket建连接就行,这个通信就算是ok了,编解码等通信解析的细节这里不讲了

    以前的游戏服务端架构业务多用短连接,聊天用长连接,聊天的部分其实就是上面表述的情况

    现在是长连接的天下,聊天依旧是长连接,业务也做成长连接,实现了真正意义上的长连接游戏架构,这其实就表述了一种当下典型架构,

    就是后端提供两个开放的通信端口【即两个mina server】,供前端的socket连接,一个负责聊天,登录,注册,另一个负责其他业务,这样就实现了协议通信的负载均衡

    2.后端的业务服通信【这是本文的重点】


    那么后端的业务就不需要负载均衡吗?比如job,异步更新db,活动副本等

    当然也是需要的,怎么做那,先拿1中的做个解释

                             mainserevr[聊天,登录,注册]---nodeserver[其他业务]

    这两个mina sever端已经建立起来了,但是两个server之间还不能通信,我们有两个选择,要么在mainserevr上起个mina client去连nodeserver,要么在nodeserver

    上起个mina client去连mainserevr,思路肯定是这样的,一旦这个通道建立了,其实互为server和client的,会有一个iosession被通道持有,只要有这个iosession,

    就可以主动write,当然对于通道的另一端可以response,也可以通过取得iosession来主动写

    实现方式,我们在nodeserevr上提供一个mainserverClient这样一个spring的bean去连接mainserver,这样在nodeserver上就可以向mainserevr发消息了

    3.带着这个思路设计一下


    我把游戏中的业务分为

         public static final String SERVER_TYPE_NODE_STR = "nodeserver";// game node
    	public static final String SERVER_TYPE_MAIN_STR = "mainserver";// 主server
    	public static final String SERVER_TYPE_JOB_STR = "jobserver";// job server
    	public static final String SERVER_TYPE_ASYNCDB_STR = "asyncdbserver";// 异步DB
    	public static final String SERVER_TYPE_ACTIVE_STR = "activityserver";// 活动
    	public static final String SERVER_TYPE_OTHER_STR = "other";// 其他
    	public static final String SERVER_TYPE_GM_STR = "GM";//管理端
    

    每次启动一种server时,首先启动一次mina serevr,然后启动多个mina client去连接其他的mina server,

    比如启动nodeserevr 服务端,然后启动多个client分别连接mainserevr,jobserevr等的服务端,这样我就可以

    在nodeserver上给其他业务serevr发请求了,具体启动哪些client看需要

    搞一个启动server类型的方法

    public static ClassPathXmlApplicationContext start(String serverTypeStr) {
            try {
                            //关闭连接池的钩子线程
                ProxoolFacade.disableShutdownHook();
                            //spring 的核心配置文件
                String xmlFile = "applicationContext.xml";
    
                ....
                log.info("启动 {} server................", serverTypeName);
    
                // 设置到系统环境变量
                System.setProperty(NodeSessionMgr.SERVER_TYPE_KEY, serverType + "");
                System.setProperty(NodeSessionMgr.SERVER_TYPE_NAME_KEY,
                        serverTypeName);
    
                // final ClassPathXmlApplicationContext parent = new
                // ClassPathXmlApplicationContext(
                // xmlFile);
                String fileName = null;
    
                  //这是把spring的住配置文件拆分了一部分内容出来,目前是只加载本server需要的bean
                if (serverType == NodeSessionMgr.SERVER_TYPE_NODE) {
                    fileName = "wolf/app_nodeserver.xml";
                } else {
                    fileName = "wolf/app_server.xml";
                }
    
                //手动启动spring
                final ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
                        new String[] { xmlFile, fileName });
    
                if (context != null) {
                    ServiceLocator.getInstance().setApplicationContext(context);
                }
    
                // 启动socket server
                final WolfServer server = (WolfServer) ServiceLocator
                        .getSpringBean("wolf_server");
                server.setServerType(serverType);
                            //这个调用就是我们熟悉的启动mina server端
                server.start();
    
                //这个动用做两件事,选区需要的serevr类型建立mina client连接
                startClient(server);
    
                            //钩子线程用来监听应用停止,为了做停止时的后续处理
                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    public void run() {
                        _shutdown();
                    }
                }, "shutdownHookThread"));
    
                  //为了支持web,springMVC,内置一个web server
                if (NodeSessionMgr.SERVER_TYPE_MAIN_STR
                        .equalsIgnoreCase(serverTypeStr)) {
                    JettyServer jettyServer = (JettyServer) ServiceLocator
                            .getSpringBean("jettyServer");
                    jettyServer.start();
                }
    
                log.info("start {} end................", serverTypeName);
                return context;
    
            } catch (Exception e) {
                e.printStackTrace();
                shutdown();
            } finally {
    
            }
            return null;
        }

    在看下startClient(server);

    private static void startClient(WolfServer server) {
            // asyncdbServer只会被连接,不会主动连接其他server
                    // 这部分目的是过滤那些不需要主动连比人的serevr,比武我这里的异步db,和活动服
            if (server.getServerType() == NodeSessionMgr.SERVER_TYPE_ASYNCDB
                    || server.getServerType() == NodeSessionMgr.SERVER_TYPE_ACTIVE) {
                return;
            }
    
            // 发送game Server ip port到mainserver
            Map<String, Object> params = new HashMap<String, Object>();
            params.put("nodeServerIp", server.getIp());
            params.put("nodeServerPort", server.getPort());
            params.put("serverType", server.getServerType());
    
            //我需要mainserevr的client,就弄个bean在本服
            final IWolfClientService mainServerClient = (IWolfClientService) ServiceLocator
                    .getSpringBean("mainServerClient");
    
            //这个位置其实就是mina的client连server端
            mainServerClient.init();
            Object localAddress = mainServerClient.registerNode(params);
    
                    
             //同上,需要jobserevr的client
            final IWolfClientService jobServerClient = (IWolfClientService) ServiceLocator
                    .getSpringBean("jobServerClient");
            if (jobServerClient != null) {
                jobServerClient.init();
                Map<String, Object> params1 = new HashMap<String, Object>();
                params1.putAll(params);
                jobServerClient.registerNode(params1);
            }
            // }
    
            .....
    
        }

    再看下WolfClientService.init()

    public void init() {
            if (start)
                return;
            if (wolfClient == null) {
                log.error("wolf client is null");
                return;
            }
             //mina 的client 连接 mina server
            wolfClient.start();
            if (wolfClient.isConnected())
                start = true;
        }

    再看下wolfclient.start()

    /**
         * 连接一个服务器,并指定处理接收到的消息的处理方法
         * 
         */
        public void start() {
            // this.context.put("resultMgr", this.resultMgr);
    
            logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                    .getString("WolfClient_9"), processorNum);
            logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                    .getString("WolfClient_0"), corePoolSize);
            logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                    .getString("WolfClient_4"), maxPoolSize);
    
            if (this.serverIp == null || this.serverIp.equals("")) {
                logger.error(clientName + "没有配置serverIp,不启动.........");
                return;
            }
            String threadPrefix = clientName + "[" + this.serverIp + ":"
                    + this.serverPort + "]";
            // exector = Executors.newCachedThreadPool(new
            // NamingThreadFactory(threadPrefix));
            processor = new SimpleIoProcessorPool<NioSession>(NioProcessor.class,
                    processorNum);
    
            // connector = new NioSocketConnector((Executor) exector, processor);
            connector = new NioSocketConnector(processor);
    
            // connector.getSessionConfig().setReuseAddress(true);
            DefaultIoFilterChainBuilder chain = connector.getFilterChain();
    
            if (useLogFilter == 2) {
                chain.addLast("logging", new LoggingFilter());
            }
            // codec filter要放在ExecutorFilter前,因为读写同一个socket connection的socket
            // buf不能并发(事实上主要是读,写操作mina已经封装成一个write Queue)
            chain.addLast("codec", new ProtocolCodecFilter(codecFactory)); // 设置编码过滤器
    
            // 添加心跳过滤器,客户端只接受服务端的心跳请求,不发送心跳请求
            // connector.getSessionConfig().setReaderIdleTime(readIdleTimeOut);
            // 这里的KeepAliveFilter必须在codec之后,因为KeepAliveMessageFactoryImpl返回的是Object,如果KeepAliveMessageFactoryImpl返回的是IOBuffer,则可以在codec之前
            // KeepAliveFilter到底在ExecutorFilter之前好还是之后好,我也不确定
            KeepAliveFilter filter = new KeepAliveFilter(
                    new KeepAliveMessageFactoryImpl(keepAliveRequestInterval <= 0),
                    IdleStatus.READER_IDLE, new RequestTimeoutCloseHandler(),
                    keepAliveRequestInterval <= 0 ? 600 : keepAliveRequestInterval,
                    30);
            chain.addLast("ping", filter);
    
            // 添加执行线程池
            executor = new UnorderedThreadPoolExecutor(corePoolSize, maxPoolSize,
                    keepAliveTime, TimeUnit.SECONDS, new NamingThreadFactory(
                            threadPrefix));
    
            // 这里是预先启动corePoolSize个处理线程
            executor.prestartAllCoreThreads();
    
            chain.addLast("exec", new ExecutorFilter(executor,
                    IoEventType.EXCEPTION_CAUGHT, IoEventType.MESSAGE_RECEIVED,
                    IoEventType.SESSION_CLOSED, IoEventType.SESSION_IDLE,
                    IoEventType.SESSION_OPENED));
    
            if (useWriteThreadPool) {
                executorWrite = new UnorderedThreadPoolExecutor(corePoolSize,
                        maxPoolSize, keepAliveTime, TimeUnit.SECONDS,
                        new NamingThreadFactory(threadPrefix + "write"));
                executorWrite.prestartAllCoreThreads();
                chain.addLast("execWrite", new ExecutorFilter(executorWrite,
                        IoEventType.WRITE, IoEventType.MESSAGE_SENT));
    
            }
            // ,logger.isDebugEnabled() ? new
            // LoggingIoEventQueueHandler("execWrite") : nulls
    
            // 配置handler的 logger,在codec之后,打印的是decode前或者encode后的消息的log
            // 可以配置在ExecutorFilter之后:是为了在工作线程中打印log,不是在NioProcessor中打印
            if (useLogFilter == 1) {
                chain.addLast("logging", new LoggingFilter());
            }
    
            connector.setHandler(handler);
    
            connector.getSessionConfig().setReuseAddress(true);
            connector.getSessionConfig().setTcpNoDelay(tcpNoDelay);
            logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                    .getString("WolfClient_1")
                    + serverIp + ":" + serverPort);
            ConnectFuture cf = null;
    
            long start = System.currentTimeMillis();
            while (true) {
                            //这地很关键,是个无线循环,每10秒连接一次,直到可以和服务端建立连接,否则一支循环下去
                cf = connector.connect(serverAddress);// 建立连接
                cf.awaitUninterruptibly(10000L);
                if (!cf.isConnected()) {
                    if ((System.currentTimeMillis() - start) > timeout) {
                        throw new RuntimeException(
                                com.youxigu.dynasty2.i18n.MarkupMessages
                                        .getString("WolfClient_5")
                                        + serverIp + ":" + serverPort);
                    }
                    if (cf.getException() != null) {
                        logger.error(com.youxigu.dynasty2.i18n.MarkupMessages
                                .getString("WolfClient_6"), serverIp + ":"
                                + serverPort, cf.getException().getMessage());
                    }
                    try {
                        Thread.sleep(10000);
                    } catch (Exception e) {
                    }
    
                    continue;
                }
    
                            //这就是终极目标了,我们的目的就是在serevr的客户端的bean里,可以拿到这个iosession
                this.setSession(cf.getSession());
    
                logger.info(com.youxigu.dynasty2.i18n.MarkupMessages
                        .getString("WolfClient_10")
                        + serverIp + ":" + serverPort);
                shutDown = false;
                if (handler instanceof WolfMessageChain) {
                    WolfMessageChain wmc = WolfMessageChain.class.cast(handler);
                    wmc.init(context);
                }
    
                break;
            }
    
        }

    这样后端的业务通信网就可以轻松的建立起来,之后想怎么通信就看你的了  

  • 相关阅读:
    行内元素知识点
    WPF可视化控件打印
    C#模拟网站用户登录
    不同版本的浏览器代理编码
    WPF弹出对话确认框
    MSDN中HttpWebRequest/HttpWebResponse用法
    C#Http编程
    WPF ICommand 用法
    详述.NET里class和struct的异同
    WPF页面切换及弹窗
  • 原文地址:https://www.cnblogs.com/dagangzi/p/4729555.html
Copyright © 2011-2022 走看看