参考: https://www.cnblogs.com/yanghuahui/p/3686054.html
https://www.jianshu.com/p/c26a25feb77e
参考上面的代码,写了份demo。雷同率高达98.888888888888~%
本来想加个服务端断掉,客户端尝试重连的功能。发现难度有点大。如有解决方案,希望可以告知,感激不尽。
测试代码
IChat.java
package com.boot.demo.test.io.socket.chat; /** * @author braska * @date 2020/3/23 **/ public interface IChat { void run() throws Exception; }
AbstractChat.java
package com.boot.demo.test.io.socket.chat; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; /** * @author braska * @date 2020/3/23 **/ public abstract class AbstractChat implements IChat { protected final String USER_CONTENT_SPLIT = "#@#"; protected final String USER_EXISTS_ERROR = "system message: user exist, please change a name"; protected final String WELCOME = "welcome %s to chat room! Online numbers: %s"; protected Charset charset = Charset.forName("UTF-8"); protected int port; protected Selector selector; protected SelectionKey currentKey; @Override public void run() throws Exception { while (this.selector.select() > 0) { for (Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator(); iter.hasNext(); ) { currentKey = iter.next(); iter.remove(); keyHandler(currentKey); } } } protected abstract void keyHandler(SelectionKey key) throws Exception; protected SocketChannel getChannel() { return (SocketChannel) currentKey.channel(); } protected String content() throws Exception { ByteBuffer byteBuffer = ByteBuffer.allocate(1024); StringBuilder builder = new StringBuilder(256); while (getChannel().read(byteBuffer) > 0) { byteBuffer.flip(); builder.append(charset.decode(byteBuffer)); byteBuffer.clear(); } return builder.toString(); } }
ClientServer.java
package com.boot.demo.test.io.socket.chat; import com.google.common.base.Strings; import com.google.common.collect.Maps; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.*; import java.util.Map; /** * @author braska * @date 2020/3/23 **/ public class ChatServer extends AbstractChat { private int port; private final Map<String, SocketChannel> users; public ChatServer(int port) throws Exception { this.users = Maps.newConcurrentMap(); this.port = port; ServerSocketChannel channel = ServerSocketChannel.open(); channel.socket().bind(new InetSocketAddress(this.port)); channel.configureBlocking(false); this.selector = Selector.open(); channel.register(selector, SelectionKey.OP_ACCEPT); } @Override public void keyHandler(SelectionKey key) { try { if (key.isAcceptable()) { // 客户端连入 ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(this.selector, SelectionKey.OP_READ); key.interestOps(SelectionKey.OP_ACCEPT); System.out.println("Server is listening from client :" + sc.getRemoteAddress()); sc.write(charset.encode("Please input your name:")); } else if (key.isReadable()) { // 接收客户端消息 SocketChannel channel = getChannel(); key.interestOps(SelectionKey.OP_READ); String content = content(); if (!Strings.isNullOrEmpty(content)) { String[] nameAndContent = content.split(USER_CONTENT_SPLIT); if (nameAndContent != null && nameAndContent.length == 1) { if (users.containsKey(nameAndContent[0]) && users.get(nameAndContent[0]).isConnected()) { channel.write(charset.encode(USER_EXISTS_ERROR)); } else { users.put(nameAndContent[0], channel); dispatch(String.format(WELCOME, nameAndContent[0], countUsers()), null); } } else { content = nameAndContent[0] + " said: " + content.replace(String.format("%s%s", nameAndContent[0], USER_CONTENT_SPLIT), ""); if (users.containsKey(nameAndContent[0])) { dispatch(content, channel); } } } } } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { try { for (Map.Entry<String, SocketChannel> entry : users.entrySet()) { if (entry.getValue().equals(key.channel())) { dispatch(entry.getKey() + " has gone.", key.channel()); break; } } key.channel().close(); } catch (IOException e1) { e1.printStackTrace(); } } } } } private long countUsers() { return this.selector.keys().stream().filter(key -> key.channel() instanceof SocketChannel).count(); } private void dispatch(String content, SelectableChannel channel) { selector.keys().stream() .filter(key -> key.channel() instanceof SocketChannel) .filter(key -> key.channel() != channel) .forEach(key -> { SocketChannel c = (SocketChannel) key.channel(); try { c.write(charset.encode(content)); } catch (IOException e) { e.printStackTrace(); } }); } public static void main(String[] args) throws Exception { new ChatServer(3000).run(); } }
ChatClient.java
package com.boot.demo.test.io.socket.chat; import org.apache.logging.log4j.util.Strings; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Scanner; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; /** * @author braska * @date 2020/3/23 **/ public class ChatClient extends AbstractChat { private String username; private Integer retry = 20; private int interval = 1000; private AtomicInteger count = new AtomicInteger(0); private Semaphore semaphore = new Semaphore(1); public ChatClient(int port) throws Exception { this.port = port; connect(); } public void setRetry(Integer retry) { this.retry = retry; } public void setInteval(int inteval) { this.interval = inteval; } private void connect() throws Exception { this.selector = Selector.open(); SocketChannel channel = SocketChannel.open(); channel.configureBlocking(false); channel.connect(new InetSocketAddress(this.port)); channel.register(selector, SelectionKey.OP_CONNECT); if (semaphore.tryAcquire() && count.get() != 0 && count.get() <= retry) { System.out.println("try to connect chat server " + count.get() + "time"); connect(); } } @Override public void keyHandler(SelectionKey key) { try { SocketChannel channel = getChannel(); if (key.isValid() && key.isConnectable()) { if (channel.finishConnect()) { count.set(0); username = ""; } channel.register(this.selector, SelectionKey.OP_READ); new Thread(() -> { Scanner scanner = new Scanner(System.in); try { while (scanner.hasNextLine()) { String content = scanner.nextLine(); if ("".equals(content)) continue; if (!Strings.isNotBlank(username)) { username = content; content += USER_CONTENT_SPLIT; } else { content = username + USER_CONTENT_SPLIT + content; } try { ChatClient.this.getChannel().write(charset.encode(content)); } catch (IOException e) { e.printStackTrace(); } } } finally { scanner.close(); } }).start(); } else if (key.isReadable()) { key.interestOps(SelectionKey.OP_READ); String content = content(); if (content.equals(USER_EXISTS_ERROR)) { username = ""; } System.out.println(content); } } catch (Exception e) { if (key != null) { key.cancel(); if (key.channel() != null) { try { key.channel().close(); // todo 调用连接方法,目前有问题注释掉。 /* semaphore.release(); count.incrementAndGet(); connect(); TimeUnit.MILLISECONDS.sleep(interval);*/ } catch (Exception e1) { e1.printStackTrace(); } } } } } public static void main(String[] args) throws Exception { new ChatClient(3000).run(); } }
重连方法也不是不可用,就是有大bug:服务端启动以后,控制台日志会打印出两条客户端连进来的日志。在线人数就翻倍了。
因为有问题,所以注释掉了。
查看别人的客户端重连代码,发现根本不可用。可能是我应用的场景不对。
@Test public void test_finishConnect_connect4() throws Exception { SocketChannel socketChannel = SocketChannel.open(); long begin = 0; long end = 0; try { TimeUnit.SECONDS.sleep(1); socketChannel.configureBlocking(false); boolean result = socketChannel.connect(new InetSocketAddress(3000)); if(!result){ while (!socketChannel.finishConnect()){ System.out.println("一直在尝试连接"); TimeUnit.SECONDS.sleep(1); } } } catch (Exception e) { e.printStackTrace(); } }