zoukankan      html  css  js  c++  java
  • NIO之socket交互。

    参考: https://www.cnblogs.com/yanghuahui/p/3686054.html

    https://www.jianshu.com/p/c26a25feb77e

    参考上面的代码,写了份demo。雷同率高达98.888888888888~%

    本来想加个服务端断掉,客户端尝试重连的功能。发现难度有点大。如有解决方案,希望可以告知,感激不尽。

    测试代码

    IChat.java

    package com.boot.demo.test.io.socket.chat;
    
    /**
     * @author braska
     * @date 2020/3/23
     **/
    public interface IChat {
        void run() throws Exception;
    }
    

     

    AbstractChat.java

    package com.boot.demo.test.io.socket.chat;
    
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    
    /**
     * @author braska
     * @date 2020/3/23
     **/
    public abstract class AbstractChat implements IChat {
        protected final String USER_CONTENT_SPLIT = "#@#";
        protected final String USER_EXISTS_ERROR = "system message: user exist, please change a name";
        protected final String WELCOME = "welcome %s to chat room! Online numbers: %s";
    
        protected Charset charset = Charset.forName("UTF-8");
    
        protected int port;
        protected Selector selector;
        protected SelectionKey currentKey;
    
        @Override
        public void run() throws Exception {
            while (this.selector.select() > 0) {
                for (Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator(); iter.hasNext(); ) {
                    currentKey = iter.next();
                    iter.remove();
                    keyHandler(currentKey);
                }
            }
        }
    
        protected abstract void keyHandler(SelectionKey key) throws Exception;
    
        protected SocketChannel getChannel() {
            return (SocketChannel) currentKey.channel();
        }
    
        protected String content() throws Exception {
            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            StringBuilder builder = new StringBuilder(256);
            while (getChannel().read(byteBuffer) > 0) {
                byteBuffer.flip();
                builder.append(charset.decode(byteBuffer));
                byteBuffer.clear();
            }
            return builder.toString();
        }
    }
    

      

    ClientServer.java

    package com.boot.demo.test.io.socket.chat;
    
    import com.google.common.base.Strings;
    import com.google.common.collect.Maps;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.*;
    import java.util.Map;
    
    /**
     * @author braska
     * @date 2020/3/23
     **/
    public class ChatServer extends AbstractChat {
        private int port;
        private final Map<String, SocketChannel> users;
    
        public ChatServer(int port) throws Exception {
            this.users = Maps.newConcurrentMap();
            this.port = port;
            ServerSocketChannel channel = ServerSocketChannel.open();
            channel.socket().bind(new InetSocketAddress(this.port));
            channel.configureBlocking(false);
            this.selector = Selector.open();
            channel.register(selector, SelectionKey.OP_ACCEPT);
        }
    
        @Override
        public void keyHandler(SelectionKey key) {
            try {
                if (key.isAcceptable()) {    // 客户端连入
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    sc.register(this.selector, SelectionKey.OP_READ);
                    key.interestOps(SelectionKey.OP_ACCEPT);
                    System.out.println("Server is listening from client :" + sc.getRemoteAddress());
                    sc.write(charset.encode("Please input your name:"));
                } else if (key.isReadable()) {  // 接收客户端消息
                    SocketChannel channel = getChannel();
                    key.interestOps(SelectionKey.OP_READ);
                    String content = content();
                    if (!Strings.isNullOrEmpty(content)) {
                        String[] nameAndContent = content.split(USER_CONTENT_SPLIT);
                        if (nameAndContent != null && nameAndContent.length == 1) {
                            if (users.containsKey(nameAndContent[0]) && users.get(nameAndContent[0]).isConnected()) {
                                channel.write(charset.encode(USER_EXISTS_ERROR));
                            } else {
                                users.put(nameAndContent[0], channel);
                                dispatch(String.format(WELCOME, nameAndContent[0], countUsers()), null);
                            }
                        } else {
                            content = nameAndContent[0] + " said: " +
                                    content.replace(String.format("%s%s", nameAndContent[0], USER_CONTENT_SPLIT), "");
                            if (users.containsKey(nameAndContent[0])) {
                                dispatch(content, channel);
                            }
                        }
                    }
                }
            } catch (Exception e) {
                if (key != null) {
                    key.cancel();
                    if (key.channel() != null) {
                        try {
                            for (Map.Entry<String, SocketChannel> entry : users.entrySet()) {
                                if (entry.getValue().equals(key.channel())) {
                                    dispatch(entry.getKey() + " has gone.", key.channel());
                                    break;
                                }
                            }
                            key.channel().close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    
        private long countUsers() {
            return this.selector.keys().stream().filter(key -> key.channel() instanceof SocketChannel).count();
        }
    
        private void dispatch(String content, SelectableChannel channel) {
            selector.keys().stream()
                    .filter(key -> key.channel() instanceof SocketChannel)
                    .filter(key -> key.channel() != channel)
                    .forEach(key -> {
                        SocketChannel c = (SocketChannel) key.channel();
                        try {
                            c.write(charset.encode(content));
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    });
        }
    
        public static void main(String[] args) throws Exception {
            new ChatServer(3000).run();
        }
    }
    

      

    ChatClient.java

    package com.boot.demo.test.io.socket.chat;
    
    import org.apache.logging.log4j.util.Strings;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    import java.util.concurrent.Semaphore;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author braska
     * @date 2020/3/23
     **/
    public class ChatClient extends AbstractChat {
        private String username;
        private Integer retry = 20;
        private int interval = 1000;
        private AtomicInteger count = new AtomicInteger(0);
        private Semaphore semaphore = new Semaphore(1);
    
        public ChatClient(int port) throws Exception {
            this.port = port;
            connect();
        }
    
        public void setRetry(Integer retry) {
            this.retry = retry;
        }
    
        public void setInteval(int inteval) {
            this.interval = inteval;
        }
    
        private void connect() throws Exception {
            this.selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);   
            channel.connect(new InetSocketAddress(this.port));
            channel.register(selector, SelectionKey.OP_CONNECT);
    
            if (semaphore.tryAcquire() && count.get() != 0 && count.get() <= retry) {
                System.out.println("try to connect chat server " + count.get() + "time");
                connect();
            }
        }
    
        @Override
        public void keyHandler(SelectionKey key) {
            try {
                SocketChannel channel = getChannel();
                if (key.isValid() && key.isConnectable()) {
                    if (channel.finishConnect()) {
                        count.set(0);
                        username = "";
                    }
                    channel.register(this.selector, SelectionKey.OP_READ);
                    new Thread(() -> {
                        Scanner scanner = new Scanner(System.in);
                        try {
                            while (scanner.hasNextLine()) {
                                String content = scanner.nextLine();
                                if ("".equals(content)) continue;
                                if (!Strings.isNotBlank(username)) {
                                    username = content;
                                    content += USER_CONTENT_SPLIT;
                                } else {
                                    content = username + USER_CONTENT_SPLIT + content;
                                }
                                try {
                                    ChatClient.this.getChannel().write(charset.encode(content));
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        } finally {
                            scanner.close();
                        }
                    }).start();
                } else if (key.isReadable()) {
                    key.interestOps(SelectionKey.OP_READ);
                    String content = content();
                    if (content.equals(USER_EXISTS_ERROR)) {
                        username = "";
                    }
                    System.out.println(content);
                }
            } catch (Exception e) {
                if (key != null) {
                    key.cancel();
                    if (key.channel() != null) {
                        try {
                            key.channel().close();
                            // todo 调用连接方法,目前有问题注释掉。
    /*                        semaphore.release();
                            count.incrementAndGet();
                            connect();
                            TimeUnit.MILLISECONDS.sleep(interval);*/
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            new ChatClient(3000).run();
        }
    }
    

      

    重连方法也不是不可用,就是有大bug:服务端启动以后,控制台日志会打印出两条客户端连进来的日志。在线人数就翻倍了。

    因为有问题,所以注释掉了。

    查看别人的客户端重连代码,发现根本不可用。可能是我应用的场景不对。

    @Test
        public void test_finishConnect_connect4() throws Exception {
            SocketChannel socketChannel = SocketChannel.open();
            long begin = 0;
            long end = 0;
            try {
                TimeUnit.SECONDS.sleep(1);
                socketChannel.configureBlocking(false);
                boolean result = socketChannel.connect(new InetSocketAddress(3000));
                if(!result){
                    while (!socketChannel.finishConnect()){
                        System.out.println("一直在尝试连接");
                        TimeUnit.SECONDS.sleep(1);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    

      

  • 相关阅读:
    RFM模型
    mysql日期函数(时间函数)
    数据库探索
    anaconhda安装步骤
    mysql安装和环境配置
    mysql时间条件查询
    mysql自连接
    mysql查询注意事项(查询优化)
    mysql常见的保留字和反引号使用
    多表联合查询注意事项(索引)
  • 原文地址:https://www.cnblogs.com/braska/p/12559439.html
Copyright © 2011-2022 走看看