zoukankan      html  css  js  c++  java
  • 第一个项目总结

    1项目架构

    基本框架5个服务器:

    2游戏服线程

    main-server and logic server配置

    main-server

    ExecutorService bossExecutor   = Executors.newCachedThreadPool();
            ExecutorService workerExecutor = Executors.newCachedThreadPool();
    
            ChannelFactory channelFactory     = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
            ServerBootstrap bootstrap         = new ServerBootstrap(channelFactory);
    
            ExecutionHandler executionHandler = new ExecutionHandler(
                    new OrderedMemoryAwareThreadPoolExecutor(32, 0, 0, 15, TimeUnit.MINUTES, new ThreadFactory() {
                        private AtomicInteger count = new AtomicInteger(0);
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread t = new Thread(r, "server-biz-pool-" + count.getAndAdd(1));
                            return t;
                        }
                    }));//定义名字
    
            bootstrap.setPipelineFactory(new GameServerPipelineFactory(listenercon, executionHandler));//GameServerPipelineFactory 加载 规定的Handler.并且会将ExecutionHandler 加在lengthFieldDecoder之后
    
            // 禁用Nagle 算法,适用于交互性强的客户端
            bootstrap.setOption("child.tcpNoDelay", true);
            // 设置keepalive
            bootstrap.setOption("child.keepAlive", listenerConfiguration.isKeepalive());
            bootstrap.bind(new InetSocketAddress(listenerConfiguration.getIp(), listenerConfiguration.getPort()));

    logic-server 两种 type connection[对于mainserve来说],每种 挂了8个.共有16个连接

    push[负责消息推送]

    logic[负责业务处理]

    pipelineFactory配置了相应的filter and Handler:

    项目的Handler有这几个:

    lengthFieldDecoder
    ExecutionHandler[执行 ChannelHandler 链的整个过程是同步的,如果业务逻辑的耗时较长,会将导致Work线程长时间被占用得不到释放,从而影响了整个服务器的并发处理能力。

    所 以,为了提高并发数,一般通过ExecutionHandler线程池来异步处理ChannelHandler链(worker线程在经过 ExecutionHandler后就结束了,它会被ChannelFactoryworker线程池所回收)。]

    jsonDecoder
    requestHandler[客户端传过来的action是string,回filter,like is login,通过规定的bean name和method 利用invoke执行,request num使用atomInteger record.]

    lengthFieldPrepender
    jsonEncoder

    对于ExecutionHandler需要的线程池模型,Netty提供了两种可选:

    1) MemoryAwareThreadPoolExecutor 通过对线程池内存的使用控制,可控制Executor中待处理任务的上限(超过上限时,后续进来的任务将被阻塞),并可控制单个Channel待处理任务的上限,防止内存溢出错误;

    2) OrderedMemoryAwareThreadPoolExecutor 是 MemoryAwareThreadPoolExecutor 的子类。除了MemoryAwareThreadPoolExecutor 的功能之外,它还可以保证同一Channel中处理的事件流的顺序性,这主要是控制事件在异步处理模式下可能出现的错误的事件顺序,但它并不保证同一 Channel中的事件都在一个线程中执行(通常也没必要)。

    例如:

    Thread X: --- Channel A (Event A1) --.   .-- Channel B (Event B2) --- Channel B (Event B3) --->

                                           /

                                           X

                                          / 

    Thread Y: --- Channel B (Event B1) --'   '-- Channel A (Event A2) --- Channel A (Event A3) --->

    上图表达的意思有几个:

    1)对整个线程池而言,处理同一个Channel的事件,必须是按照顺序来处理的。例如,必须先处理完Channel A (Event A1) ,再处理Channel A (Event A2)Channel A (Event A3)

    2)同一个Channel的多个事件,会分布到线程池的多个线程中去处理。

    3)不同Channel的事件可以同时处理(分担到多个线程),互不影响。  

    OrderedMemoryAwareThreadPoolExecutor 的这种事件处理有序性是有意义的,因为通常情况下,请求发送端希望服务器能够按照顺序处理自己的请求,特别是需要多次握手的应用层协议。例如:XMPP协议。

    (ExecutionHandler参考:http://www.cnblogs.com/TeaMax/archive/2013/04/03/2997874.html)

    //项目中使用的
    ExecutionHandler exeHandler = new ExecutionHandler(new MemoryAwareThreadPoolExecutor(8, 0, 0));

    3项目消息队列

    推送队列

    原理:将需要的推送信息,按照优先级存进redis相应的队列中,然后按照优先级取出来.

      //异步消息处理
      //主要线程
      private void commandProcess() {
    
        try {
    
          while (true) {
            final Command command = commandDao.get(Priority.HIGH, Priority.NORMAL, Priority.LOWER, Priority.MESSAGE);//阻塞
    
            if (command != null) {
    
              final CommandHandler commandHandler = CommandHandlerFactory.getInstance().getHandler(command.getCommand());
    
              Runnable task = new Runnable() {
    
                @Override
                public void run() {
                  commandHandler.handle(command);//将处理好的消息通过上面的push connection传输
                }
              };
    
              GameTaskProcessor.ins().general(task);//线程池exec
    
            } else {
              break;
            }
          }
    
        } catch (Exception e) {
          LOG.error(e.getMessage(), e);
    
          try {
            Thread.sleep(1000 * 5);
          } catch (InterruptedException ex) {
            LOG.error(ex.getMessage(), ex);
          }
    
        }
    
      }
    
    
    //缓存Handler.也可以使用spring getbeanbyname代替.
    public class CommandHandlerFactory {
    
        private static final Logger LOG = LoggerFactory.getLogger(CommandHandlerFactory.class);
    
        private Map<Integer, CommandHandler> commandHandlerMap = new HashMap<Integer, CommandHandler>();
    
        private static CommandHandlerFactory instance = null;
    
        private CommandHandlerFactory() {
    
        }
    
        public static CommandHandlerFactory getInstance() {
    
            if (instance == null) {
                instance = new CommandHandlerFactory();
            }
            return instance;
        }
    
        public void register(int command, CommandHandler handler) {//spring 实例化相应的Handler时,init method会调用
            LOG.info("command handler register.command[" + command + "]");
            commandHandlerMap.put(command, handler);
        }
    
        public CommandHandler getHandler(int command) {
    
            int key = command / 10000;
    
            if (commandHandlerMap.containsKey(key)) {
                return commandHandlerMap.get(key);
            } else {
                LOG.warn("命令处理器不存在.command[" + command + "], key[" + key + "]");
            }
    
            return null;
        }
    }
    
    //put/get command
    public class CommandDaoRedisImpl implements CommandDao {
    
        private static final int TIME_OUT = 2;
    
        private String getKey(int priority) {
            return RedisKey.getCommandKey(priority);
        }
    
        @Override
        public boolean add(Command command) {
            String key = getKey(command.getPriority());
    
            JedisUtils.pushMsg(key, Json.toJson(command));
    
            return true;
        }
    
        @Override
        public Command get(Integer... proritys) {
    
            String[] keys = new String[proritys.length];
            for (int i = 0; i < proritys.length; i++) {
                String key = getKey(proritys[i]);
                keys[i] = key;
            }
    
            String json = JedisUtils.blockPopMsg(TIME_OUT, keys);//阻塞方法
            if (json != null) {
                return Json.toObject(json, Command.class);
            }
            return null;
        }
    }

    最后利用push connection 随机一个connection,交给netty处理.

    logic队列

    利用netty channel link+java reflection.

    给笨笨的自己提个醒>_<~
  • 相关阅读:
    线程的简单介绍
    队列Joinablequeue的使用,以及生产者消费者模型的认识
    使用子线程来完成链接循环和通信循环
    使用socketserver实现简单的下载和上传
    类的绑定方法学习
    类的组合-多态-封装等特性的简单学习
    爬虫之亚马逊爬取
    JavaScript 面试中常见算法问题详解
    JavaScript 的 this 指向问题深度解析
    深入理解 JavaScript 中的函数
  • 原文地址:https://www.cnblogs.com/ephuizi/p/4348461.html
Copyright © 2011-2022 走看看