zoukankan      html  css  js  c++  java
  • Mina之polling

    polling下面包含了实现了基于轮询策略的select调用或其他类型的I/O轮询系统调用的基类。先看抽象类AbstractPollingIoAcceptor,成员如下:

    public abstract class AbstractPollingIoAcceptor<T extends AbstractIoSession, H> extends AbstractIoAcceptor {
    private final IoProcessor<T> processor;
    private final boolean createdProcessor;
    private final Object lock = new Object();
    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<AcceptorOperationFuture>();
    private final Map<SocketAddress, H> boundHandles = Collections.synchronizedMap(new HashMap<SocketAddress, H>());
    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
    private volatile boolean selectable;
    private Acceptor acceptor;
    }

    registerQueue是注册队列,cancelQueue是取消注册的队列。boundHandles保存了地址到服务器socket的映射表。先看bind的处理过程:

        protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
    // 创建一个BIND REQUEST作为Future Operation。
    AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
    // 添加到注册队列中
    registerQueue.add(request);
    // 创建Acceptor实例,然后启动它
    startupAcceptor();
    // select不会被阻塞,只需要把bind request添加到注册队列就好了。
    wakeup();
    // 等待知道request完成
    request.awaitUninterruptibly();
    if (request.getException() != null) {
    throw request.getException();
    }
    //更新本地绑定地址
    Set<SocketAddress> newLocalAddresses = new HashSet<SocketAddress>();

    for (H handle:boundHandles.values()) {
    newLocalAddresses.add(localAddress(handle));
    }
    return newLocalAddresses;
    }

    真正负责接受客户端请求的工作是Acceptor线程完成的,下面是其中的run方法:

            public void run() {
    int nHandles = 0;
    while (selectable) {
    try {
    // 看一下有没有keys可以被处理了
    int selected = select();
    // 注册服务器,这样做的目的是讲Selector的状态至于OP_ACCEPT,并绑定到
    // 监听的端口上,表明可以接受来自客户端的链接请求了。
    nHandles += registerHandles();

    if (selected > 0) {
    // 处理OP_ACCEPT状态的服务器socket句柄集
    processHandles(selectedHandles());
    }
    // 检查客户端取消的请求
    nHandles -= unregisterHandles();

    // 如果现在nHandles为0,那么就可以退出循环了。
    if (nHandles == 0) {
    synchronized (lock) {
    if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
    acceptor = null;
    break;
    }
    }
    }
    } catch (Throwable e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e1) {
    ExceptionMonitor.getInstance().exceptionCaught(e1);
    }
    }
    }
    // 释放资源。
    if (selectable && isDisposing()) {
    selectable = false;
    try {
    if (createdProcessor) {
    processor.dispose();
    }
    } finally {
    try {
    synchronized (disposalLock) {
    if (isDisposing()) {
    destroy();
    }
    }
    } catch (Exception e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    } finally {
    disposalFuture.setDone();
    }
    }
    }
    }

    下面通过registerHandles注册服务器socket句柄:

        private int registerHandles() {
    for (;;) {
    // acceptor中注册队列包括services列表
    AcceptorOperationFuture future = registerQueue.poll();

    if (future == null) {
    return 0;
    }
    // 我们建立临时的map保存
    Map<SocketAddress, H> newHandles = new HashMap<SocketAddress, H>();
    List<SocketAddress> localAddresses = future.getLocalAddresses();

    try {
    // 处理所有的地址
    for (SocketAddress a : localAddresses) {
    H handle = open(a);// 打开指定地址,返回服务器socket句柄
    newHandles.put(localAddress(handle), handle);// 加入地址-socket映射表中
    }
    boundHandles.putAll(newHandles);// 更新本地绑定地址集
    future.setDone();// 完成注册过程
    return newHandles.size();
    } catch (Exception e) {
    future.setException(e);
    } finally {
    // 如果失败的话回滚
    if (future.getException() != null) {
    for (H handle : newHandles.values()) {
    try {
    close(handle);
    } catch (Exception e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    }
    }
    wakeup();
    }
    }
    }
    }

    processHandles方法用来处理一个会话,只有准备好的才会被在这里处理:

            // 处理来自客户端的请求
    private void processHandles(Iterator<H> handles) throws Exception {
    while (handles.hasNext()) {
    H handle = handles.next();
    handles.remove();
    T session = accept(processor, handle);// 真正接受来自客户端的请求
    if (session == null) {
    break;
    }
    initSession(session, null, null);
    session.getProcessor().add(session);// 加入
    }
    }

    AbstractPollingIoConnector用于实现客户端连接的轮询策略,底层的socket不断地被检测,并当有任何一个socket需要被处理时就会被唤醒去处理:

    public abstract class AbstractPollingIoConnector<T extends AbstractIoSession, H> extends AbstractIoConnector {
    private final Object lock = new Object();
    private final Queue<ConnectionRequest> connectQueue = new ConcurrentLinkedQueue<ConnectionRequest>();// 链接队列
    private final Queue<ConnectionRequest> cancelQueue = new ConcurrentLinkedQueue<ConnectionRequest>(); // 取消队列
    private final IoProcessor<T> processor;
    private final boolean createdProcessor;
    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
    private volatile boolean selectable;
    private Connector connector;
    }

    处理连接的方法,真正的操作也是在线程中进行的:

        protected final ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    H handle = null;
    boolean success = false;
    try {
    handle = newHandle(localAddress);
    if (connect(handle, remoteAddress)) {// 如果已链接服务器成功
    ConnectFuture future = new DefaultConnectFuture();
    T session = newSession(processor, handle);// 创建新会话
    initSession(session, future, sessionInitializer);
    session.getProcessor().add(session);// 将剩下的处理交给IoProcessor
    success = true;
    return future;
    }
    success = true;
    } catch (Exception e) {
    return DefaultConnectFuture.newFailedFuture(e);
    } finally {
    if (!success && handle != null) {
    try {
    close(handle);
    } catch (Exception e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    }
    }
    }
    ConnectionRequest request = new ConnectionRequest(handle, sessionInitializer);
    connectQueue.add(request);// 加入请求队列
    startupWorker();// 开启工作线程处理连接请求
    wakeup();// 中断select操作
    return request;
    }

    真正的处理的线程:

            public void run() {
    int nHandles = 0;
    while (selectable) {
    try {
    int timeout = (int)Math.min(getConnectTimeoutMillis(), 1000L);// 等待超时时间
    int selected = select(timeout);// 在超时时间内看时候有可以的连接
    nHandles += registerNew();// 取出连接请求,加入连接轮询池
    if (selected > 0) {
    nHandles -= processConnections(selectedHandles());// 处理连接请求
    }
    processTimedOutSessions(allHandles());// 处理超时连接请求
    nHandles -= cancelKeys();// 取消了的会话
    if (nHandles == 0) {
    synchronized (lock) {
    if (connectQueue.isEmpty()) {
    connector = null;
    break;
    }
    }
    }
    } catch (Throwable e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e1) {
    ExceptionMonitor.getInstance().exceptionCaught(e1);
    }
    }
    }

    if (selectable && isDisposing()) {
    selectable = false;
    try {
    if (createdProcessor) {
    processor.dispose();
    }
    } finally {
    try {
    synchronized (disposalLock) {
    if (isDisposing()) {
    destroy();
    }
    }
    } catch (Exception e) {
    ExceptionMonitor.getInstance().exceptionCaught(e);
    } finally {
    disposalFuture.setDone();
    }
    }
    }
    }
    }

    其实这里的代码就差不多都很清楚了(和前面的已经很相似,不在赘述)。

  • 相关阅读:
    ActiveMQ, Qpid, HornetQ and RabbitMQ in Comparison
    AMQP与QPID简介
    设置JVM内存溢出时快照转存HeapDump到文件
    How to find configuration file MySQL uses?
    linux命令行模式下实现代理上网
    CAS分析——Core
    单点登录加验证码例子
    统一建模语言(UML) 版本 2.0
    UML 2中结构图的介绍
    如何更改 RSA 的语言设置
  • 原文地址:https://www.cnblogs.com/ggzwtj/p/2213474.html
Copyright © 2011-2022 走看看