zoukankan      html  css  js  c++  java
  • 【Socket / Mina】Mina TCP Server & Client

    Simple TCP Server

    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.util.Optional;
    import java.util.function.Consumer;
    
    import org.apache.mina.core.service.IoAcceptor;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
    
    public class SimpleTcpServer {
    
        /** CPU 线程数 */
        protected static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
        /** 吞吐量计算间隔 (秒) */
        protected volatile int throughput = 3;
        /** 监听端口 */
        protected volatile int port = Short.MAX_VALUE;
        /** 关闭标识 */
        protected volatile boolean closed = false;
        /** {@link java.net.SocketAddress} endpoint */
        protected volatile SocketAddress endpoint;
        /** {@link org.apache.mina.core.service.IoAcceptor} acceptor */
        protected volatile IoAcceptor acceptor;
    
        /**
         * Create a tcp server.
         * 
         * @param port          监听端口
         */
        public TcpServer(int port) {
            this.port = port;
            this.endpoint = new InetSocketAddress(port);
        }
    
        /**
         * Create a tcp server.
         * 
         * @param port             监听端口
         * @param throughput       吞吐量计算间隔 (秒)
         */
        public TcpServer(int port, int throughput) {
            this.throughput = throughput;
            this.port = port;
            this.endpoint = new InetSocketAddress(port);
        }
    
        /**
         * Stop the tcp server.
         */
        public void stop() {
            this.closed = true;
            Optional.ofNullable(this.acceptor).ifPresent(x -> { x.unbind(); x.dispose(); });
        }
    
        /**
         * Start the tcp server.
         * 
         * @param consumer
         * @throws Exception
         */
        public void start(Consumer<IoAcceptor> consumer) throws Exception {
            this.closed = false;
            this.acceptor = new NioSocketAcceptor(CPU_COUNT + 1);
            this.acceptor.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 30);
            this.acceptor.getSessionConfig().setThroughputCalculationInterval(this.throughput);
            // this.acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(ProtocolCodecFactory));
            // this.acceptor.setHandler(IoHandler);
            Optional.ofNullable(consumer).ifPresent(x -> x.accept(this.acceptor));
            doBind();
        }
    
        /**
         * 
         */
        protected void doBind() {
            if (this.closed) {
                return;
            }
            while (!this.closed) {
                try {
                    this.acceptor.bind(this.endpoint);
                    return;
                } catch (Exception ex) {
                    // ignore
                    ex.printStackTrace();
                }
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    // ignore
                }
            }
        }
    
    }
    
    

    Simple TCP Client

    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.util.Optional;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Consumer;
    
    import org.apache.mina.core.filterchain.IoFilterAdapter;
    import org.apache.mina.core.future.ConnectFuture;
    import org.apache.mina.core.future.IoFutureListener;
    import org.apache.mina.core.session.IdleStatus;
    import org.apache.mina.core.session.IoSession;
    import org.apache.mina.transport.socket.nio.NioSocketConnector;
    
    public class SimpleTcpClient {
    
        /** 连接地址 */
        protected volatile String addr = "127.0.0.1";
        /** 连接端口 */
        protected volatile int port = Short.MAX_VALUE;
        /** 关闭标识 */
        protected volatile boolean closed = false;
        /** {@link java.net.SocketAddress} endpoint */
        protected volatile SocketAddress endpoint;
        /** {@link org.apache.mina.transport.socket.nio.NioSocketConnector} connector */
        protected volatile NioSocketConnector connector;
        /** {@link org.apache.mina.core.session.IoSession} session */
        protected volatile IoSession session;
    
        /**
         * Create a tcp client.
         * 
         * @param addr      IP 地址
         * @param port      端口号
         */
        public TcpClient(String addr, int port) {
            this.addr = addr;
            this.port = port;
            this.endpoint = new InetSocketAddress(addr, port);
        }
    
        /**
         * Close the tcp client.
         */
        public void stop() {
            this.closed = true;
            Optional.ofNullable(this.session).ifPresent(x -> x.closeNow());
            Optional.ofNullable(this.connector).ifPresent(x -> x.dispose());
        }
    
        /**
         * Connect to tcp server.
         * 
         * @param consumer
         * @throws Exception 
         */
        public void start(Consumer<NioSocketConnector> consumer) throws Exception {
            this.closed = false;
            this.connector = new NioSocketConnector();
            this.connector.setConnectTimeoutMillis(3000);
            this.connector.getSessionConfig().setReuseAddress(true);
            this.connector.getSessionConfig().setKeepAlive(true);
            this.connector.getSessionConfig().setTcpNoDelay(true);
            this.connector.getSessionConfig().setSoLinger(0);
            this.connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 30);
            this.connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
                    @Override
                    public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
                        CompletableFuture.runAsync(() -> doConnect());
                    }
                });
            // this.connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(ProtocolCodecFactory));
            // this.connector.setHandler(IoHandler);
            Optional.ofNullable(consumer).ifPresent(x -> x.accept(this.connector));
            doConnect();
        }
    
        /**
         * 
         */
        protected void doConnect() {
            if (this.closed) {
                return;
            }
            this.connector.connect(this.endpoint).addListener(new IoFutureListener<ConnectFuture>() {
                @Override
                public void operationComplete(ConnectFuture future) {
                    if (!future.isConnected()) {
                        CompletableFuture.runAsync(() -> doConnect());
                    } else {
                        session = future.getSession();
                    }
                }
            });
        }
    
        /**
         * 
         * @param data
         */
        public void sent(Object data) {
            Optional.ofNullable(this.session)
                    .filter(x -> x.isActive())
                    .ifPresent(x -> x.write(data));
        }
    
    }
    
    
  • 相关阅读:
    计算机英语
    NSQ学习记录
    Java学习记录-注解
    VS插件开发

    双链表
    顺序表
    顺序队列
    顺序栈

  • 原文地址:https://www.cnblogs.com/zhuzhongxing/p/14147096.html
Copyright © 2011-2022 走看看