参考: https://www.cnblogs.com/yanghuahui/p/3686054.html
https://www.jianshu.com/p/c26a25feb77e
参考上面的代码,写了份demo。雷同率高达98.888888888888~%
本来想加个服务端断掉,客户端尝试重连的功能。发现难度有点大。如有解决方案,希望可以告知,感激不尽。
测试代码
IChat.java
package com.boot.demo.test.io.socket.chat;
/**
* @author braska
* @date 2020/3/23
**/
public interface IChat {
void run() throws Exception;
}
AbstractChat.java
package com.boot.demo.test.io.socket.chat;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* @author braska
* @date 2020/3/23
**/
public abstract class AbstractChat implements IChat {
protected final String USER_CONTENT_SPLIT = "#@#";
protected final String USER_EXISTS_ERROR = "system message: user exist, please change a name";
protected final String WELCOME = "welcome %s to chat room! Online numbers: %s";
protected Charset charset = Charset.forName("UTF-8");
protected int port;
protected Selector selector;
protected SelectionKey currentKey;
@Override
public void run() throws Exception {
while (this.selector.select() > 0) {
for (Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator(); iter.hasNext(); ) {
currentKey = iter.next();
iter.remove();
keyHandler(currentKey);
}
}
}
protected abstract void keyHandler(SelectionKey key) throws Exception;
protected SocketChannel getChannel() {
return (SocketChannel) currentKey.channel();
}
protected String content() throws Exception {
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
StringBuilder builder = new StringBuilder(256);
while (getChannel().read(byteBuffer) > 0) {
byteBuffer.flip();
builder.append(charset.decode(byteBuffer));
byteBuffer.clear();
}
return builder.toString();
}
}
ClientServer.java
package com.boot.demo.test.io.socket.chat;
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.*;
import java.util.Map;
/**
* @author braska
* @date 2020/3/23
**/
public class ChatServer extends AbstractChat {
private int port;
private final Map<String, SocketChannel> users;
public ChatServer(int port) throws Exception {
this.users = Maps.newConcurrentMap();
this.port = port;
ServerSocketChannel channel = ServerSocketChannel.open();
channel.socket().bind(new InetSocketAddress(this.port));
channel.configureBlocking(false);
this.selector = Selector.open();
channel.register(selector, SelectionKey.OP_ACCEPT);
}
@Override
public void keyHandler(SelectionKey key) {
try {
if (key.isAcceptable()) { // 客户端连入
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
sc.register(this.selector, SelectionKey.OP_READ);
key.interestOps(SelectionKey.OP_ACCEPT);
System.out.println("Server is listening from client :" + sc.getRemoteAddress());
sc.write(charset.encode("Please input your name:"));
} else if (key.isReadable()) { // 接收客户端消息
SocketChannel channel = getChannel();
key.interestOps(SelectionKey.OP_READ);
String content = content();
if (!Strings.isNullOrEmpty(content)) {
String[] nameAndContent = content.split(USER_CONTENT_SPLIT);
if (nameAndContent != null && nameAndContent.length == 1) {
if (users.containsKey(nameAndContent[0]) && users.get(nameAndContent[0]).isConnected()) {
channel.write(charset.encode(USER_EXISTS_ERROR));
} else {
users.put(nameAndContent[0], channel);
dispatch(String.format(WELCOME, nameAndContent[0], countUsers()), null);
}
} else {
content = nameAndContent[0] + " said: " +
content.replace(String.format("%s%s", nameAndContent[0], USER_CONTENT_SPLIT), "");
if (users.containsKey(nameAndContent[0])) {
dispatch(content, channel);
}
}
}
}
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
try {
for (Map.Entry<String, SocketChannel> entry : users.entrySet()) {
if (entry.getValue().equals(key.channel())) {
dispatch(entry.getKey() + " has gone.", key.channel());
break;
}
}
key.channel().close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
private long countUsers() {
return this.selector.keys().stream().filter(key -> key.channel() instanceof SocketChannel).count();
}
private void dispatch(String content, SelectableChannel channel) {
selector.keys().stream()
.filter(key -> key.channel() instanceof SocketChannel)
.filter(key -> key.channel() != channel)
.forEach(key -> {
SocketChannel c = (SocketChannel) key.channel();
try {
c.write(charset.encode(content));
} catch (IOException e) {
e.printStackTrace();
}
});
}
public static void main(String[] args) throws Exception {
new ChatServer(3000).run();
}
}
ChatClient.java
package com.boot.demo.test.io.socket.chat;
import org.apache.logging.log4j.util.Strings;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Scanner;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
/**
* @author braska
* @date 2020/3/23
**/
public class ChatClient extends AbstractChat {
private String username;
private Integer retry = 20;
private int interval = 1000;
private AtomicInteger count = new AtomicInteger(0);
private Semaphore semaphore = new Semaphore(1);
public ChatClient(int port) throws Exception {
this.port = port;
connect();
}
public void setRetry(Integer retry) {
this.retry = retry;
}
public void setInteval(int inteval) {
this.interval = inteval;
}
private void connect() throws Exception {
this.selector = Selector.open();
SocketChannel channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(this.port));
channel.register(selector, SelectionKey.OP_CONNECT);
if (semaphore.tryAcquire() && count.get() != 0 && count.get() <= retry) {
System.out.println("try to connect chat server " + count.get() + "time");
connect();
}
}
@Override
public void keyHandler(SelectionKey key) {
try {
SocketChannel channel = getChannel();
if (key.isValid() && key.isConnectable()) {
if (channel.finishConnect()) {
count.set(0);
username = "";
}
channel.register(this.selector, SelectionKey.OP_READ);
new Thread(() -> {
Scanner scanner = new Scanner(System.in);
try {
while (scanner.hasNextLine()) {
String content = scanner.nextLine();
if ("".equals(content)) continue;
if (!Strings.isNotBlank(username)) {
username = content;
content += USER_CONTENT_SPLIT;
} else {
content = username + USER_CONTENT_SPLIT + content;
}
try {
ChatClient.this.getChannel().write(charset.encode(content));
} catch (IOException e) {
e.printStackTrace();
}
}
} finally {
scanner.close();
}
}).start();
} else if (key.isReadable()) {
key.interestOps(SelectionKey.OP_READ);
String content = content();
if (content.equals(USER_EXISTS_ERROR)) {
username = "";
}
System.out.println(content);
}
} catch (Exception e) {
if (key != null) {
key.cancel();
if (key.channel() != null) {
try {
key.channel().close();
// todo 调用连接方法,目前有问题注释掉。
/* semaphore.release();
count.incrementAndGet();
connect();
TimeUnit.MILLISECONDS.sleep(interval);*/
} catch (Exception e1) {
e1.printStackTrace();
}
}
}
}
}
public static void main(String[] args) throws Exception {
new ChatClient(3000).run();
}
}
重连方法也不是不可用,就是有大bug:服务端启动以后,控制台日志会打印出两条客户端连进来的日志。在线人数就翻倍了。
因为有问题,所以注释掉了。
查看别人的客户端重连代码,发现根本不可用。可能是我应用的场景不对。
@Test
public void test_finishConnect_connect4() throws Exception {
SocketChannel socketChannel = SocketChannel.open();
long begin = 0;
long end = 0;
try {
TimeUnit.SECONDS.sleep(1);
socketChannel.configureBlocking(false);
boolean result = socketChannel.connect(new InetSocketAddress(3000));
if(!result){
while (!socketChannel.finishConnect()){
System.out.println("一直在尝试连接");
TimeUnit.SECONDS.sleep(1);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}