zoukankan      html  css  js  c++  java
  • [编织消息框架][分层模型设计]会话与节点

     1 public class QNode implements IRecycle {
     2     /**
     3      * session会话,记录通信层属性
     4      **/
     5     private QSession session;
     6     /**
     7      * message cb 维护消息回调
     8      **/
     9     private QCallbackManager callbackManager;
    10     /**
    11      * netty channel
    12      **/
    13     private Channel channel;
    14 
    15     private InetSocketAddress address;
    16 }
    QNode
    1 public class QSession implements IRecycle {
    2 
    3     public static enum SESSION_KEY {
    4     ALIAS, OP_TIME, ID,
    5     }
    6 
    7     private long id;
    8     private Map<String, Object> values = new HashMap<>();
    9 }
    QSession
      1 /***
      2  * 响应异步消息 针对业务上的逻辑处理 {@link QNode#send(Object, IQCallback)}
      3  * 
      4  * @author solq
      5  */
      6 public class QCallbackManager implements IRecycle {
      7     private final static Logger LOGGER = LoggerFactory.getLogger(QCallbackManager.class);
      8 
      9     private final static ScheduledExecutorService pool = PoolUtil.createScheduledPool(QMConfig.getInstance().POOL_CLEAR_MESSAGE_CORE, "message clear");
     10 
     11     private Map<Long, IQCallback<?>> messageRecoreds = Collections.synchronizedMap(new HashMap<>());
     12 
     13     @SuppressWarnings({ "unchecked", "rawtypes" })
     14     private void buildTask(long sn, IQCallback cb) {
     15     messageRecoreds.put(sn, cb);
     16     Future<?> future = pool.schedule(new Runnable() {
     17         private long _sn = sn;
     18 
     19         @Override
     20         public void run() {
     21         IQCallback<?> _cb = messageRecoreds.remove(_sn);
     22         if (_cb == null) {
     23             return;
     24         }
     25         try {
     26             _cb.onReceiveError(QCode.MESSAGE_ERROR_TIMEOUT);
     27         } finally {
     28             _cb.recycle();
     29         }
     30 
     31         }
     32     }, QMConfig.getInstance().NETTY_MESSAGE_CALLBACK_CLEAR_INTERVAL, TimeUnit.MILLISECONDS);
     33     cb.setFuture(future);
     34     }
     35 
     36     public <T> IQCallback<T> doSend(QPacket sendPacket, IQCallback<T> cb) {
     37     if (cb == null) {
     38         return null;
     39     }
     40     sendPacket.setStatus(QPacket.MASK_RESPONSE);
     41     cb.setSendPacket(sendPacket);
     42     buildTask(sendPacket.getSn(), cb);
     43     return cb;
     44     }
     45 
     46     public void doSendError(QPacket sendPacket, short code) {
     47     final long key = sendPacket.getSn();
     48     IQCallback<?> cb = messageRecoreds.remove(key);
     49     if (cb == null) {
     50         if (LOGGER.isWarnEnabled()) {
     51         LOGGER.warn("发送失败 未找到回调 :" + key);
     52         }
     53         return;
     54     }
     55     try {
     56         cb.onSendError(code);
     57     } finally {
     58         cb.recycle();
     59     }
     60     }
     61 
     62     public void doReceiveSucceed(QPacket rePacket) {
     63     final long key = rePacket.getSn();
     64     IQCallback<?> cb = messageRecoreds.remove(key);
     65     if (cb == null) {
     66         if (LOGGER.isWarnEnabled()) {
     67         LOGGER.warn("响应成功 未找到回调 :" + key);
     68         }
     69         return;
     70     }
     71     try {
     72         short code = rePacket.toCode();
     73         cb.setCode(code);
     74         cb.onSucceed(code);
     75     } finally {
     76         cb.recycle();
     77     }
     78     }
     79 
     80     public void doReceiveError(QPacket rePacket) {
     81     final long key = rePacket.getSn();
     82     IQCallback<?> cb = messageRecoreds.remove(key);
     83     if (cb == null) {
     84         if (LOGGER.isWarnEnabled()) {
     85         LOGGER.warn("响应失败 未找到回调 :" + key);
     86         }
     87         return;
     88     }
     89     try {
     90         short code = rePacket.toCode();
     91         cb.setCode(code);
     92         cb.onReceiveError(code);
     93     } finally {
     94         cb.recycle();
     95     }
     96     }
     97 
     98     public int getMessageRecoredSize() {
     99     return messageRecoreds.size();
    100     }
    101 
    102     @Override
    103     public void recycle() {
    104     // 释放所有消息
    105     messageRecoreds.forEach((sn, cb) -> {
    106         cb.recycle();
    107     });
    108     messageRecoreds.clear();
    109     }
    QCallbackManager
     1 public abstract class IQCallback<T> implements IRecycle, QResult<T> {
     2     //响应成功回调
     3     abstract public void onSucceed(short code);
     4 
     5     // 默认什么也不用做
     6     public void onSendError(short code) {
     7     if (LOGGER.isWarnEnabled()) {
     8         LOGGER.warn("onSendError : {)", code);
     9     }
    10     this.code = code;
    11     }
    12 
    13     // 默认什么也不用做
    14     public void onReceiveError(short code) {
    15     if (LOGGER.isWarnEnabled()) {
    16         LOGGER.warn("onReceiveError : {)", code);
    17     }
    18     this.code = code;
    19     }
    20 }

    IQCallback 有三种响应消息处理

    1.onSucceed 响应返回成功

    2.onSendError 发送时失败

    3.onReceiveError 响应返回失败

    有的业务非常复杂,如果响应失败了可以根据返回码深度处理

  • 相关阅读:
    MFC中控制Tips的显示
    VC++6.0/MFC 自定义edit 限制输入内容 响应复制粘贴全选剪切的功能
    VC++6.0/MFC中如何限制Edit控件输入 例子,只能输入0和1
    Windows安装配置php+memcached的方法
    mrg_myIsam分表引擎用法
    用PHP做服务器接口客户端用http协议POST访问安全性一般怎么做
    PHP慢脚本日志和Mysql的慢查询日志(转)
    nginx 配置优化的几个参数
    static详解
    Redis命令总结
  • 原文地址:https://www.cnblogs.com/solq111/p/6600869.html
Copyright © 2011-2022 走看看