Simple TCP Server
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.function.Consumer;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class SimpleTcpServer {
/** CPU 线程数 */
protected static final int CPU_COUNT = Runtime.getRuntime().availableProcessors();
/** 吞吐量计算间隔 (秒) */
protected volatile int throughput = 3;
/** 监听端口 */
protected volatile int port = Short.MAX_VALUE;
/** 关闭标识 */
protected volatile boolean closed = false;
/** {@link java.net.SocketAddress} endpoint */
protected volatile SocketAddress endpoint;
/** {@link org.apache.mina.core.service.IoAcceptor} acceptor */
protected volatile IoAcceptor acceptor;
/**
* Create a tcp server.
*
* @param port 监听端口
*/
public TcpServer(int port) {
this.port = port;
this.endpoint = new InetSocketAddress(port);
}
/**
* Create a tcp server.
*
* @param port 监听端口
* @param throughput 吞吐量计算间隔 (秒)
*/
public TcpServer(int port, int throughput) {
this.throughput = throughput;
this.port = port;
this.endpoint = new InetSocketAddress(port);
}
/**
* Stop the tcp server.
*/
public void stop() {
this.closed = true;
Optional.ofNullable(this.acceptor).ifPresent(x -> { x.unbind(); x.dispose(); });
}
/**
* Start the tcp server.
*
* @param consumer
* @throws Exception
*/
public void start(Consumer<IoAcceptor> consumer) throws Exception {
this.closed = false;
this.acceptor = new NioSocketAcceptor(CPU_COUNT + 1);
this.acceptor.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 30);
this.acceptor.getSessionConfig().setThroughputCalculationInterval(this.throughput);
// this.acceptor.getFilterChain().addLast("codec", new ProtocolCodecFilter(ProtocolCodecFactory));
// this.acceptor.setHandler(IoHandler);
Optional.ofNullable(consumer).ifPresent(x -> x.accept(this.acceptor));
doBind();
}
/**
*
*/
protected void doBind() {
if (this.closed) {
return;
}
while (!this.closed) {
try {
this.acceptor.bind(this.endpoint);
return;
} catch (Exception ex) {
// ignore
ex.printStackTrace();
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore
}
}
}
}
Simple TCP Client
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import org.apache.mina.core.filterchain.IoFilterAdapter;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.future.IoFutureListener;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
public class SimpleTcpClient {
/** 连接地址 */
protected volatile String addr = "127.0.0.1";
/** 连接端口 */
protected volatile int port = Short.MAX_VALUE;
/** 关闭标识 */
protected volatile boolean closed = false;
/** {@link java.net.SocketAddress} endpoint */
protected volatile SocketAddress endpoint;
/** {@link org.apache.mina.transport.socket.nio.NioSocketConnector} connector */
protected volatile NioSocketConnector connector;
/** {@link org.apache.mina.core.session.IoSession} session */
protected volatile IoSession session;
/**
* Create a tcp client.
*
* @param addr IP 地址
* @param port 端口号
*/
public TcpClient(String addr, int port) {
this.addr = addr;
this.port = port;
this.endpoint = new InetSocketAddress(addr, port);
}
/**
* Close the tcp client.
*/
public void stop() {
this.closed = true;
Optional.ofNullable(this.session).ifPresent(x -> x.closeNow());
Optional.ofNullable(this.connector).ifPresent(x -> x.dispose());
}
/**
* Connect to tcp server.
*
* @param consumer
* @throws Exception
*/
public void start(Consumer<NioSocketConnector> consumer) throws Exception {
this.closed = false;
this.connector = new NioSocketConnector();
this.connector.setConnectTimeoutMillis(3000);
this.connector.getSessionConfig().setReuseAddress(true);
this.connector.getSessionConfig().setKeepAlive(true);
this.connector.getSessionConfig().setTcpNoDelay(true);
this.connector.getSessionConfig().setSoLinger(0);
this.connector.getSessionConfig().setIdleTime(IdleStatus.READER_IDLE, 30);
this.connector.getFilterChain().addFirst("reconnection", new IoFilterAdapter() {
@Override
public void sessionClosed(NextFilter nextFilter, IoSession ioSession) throws Exception {
CompletableFuture.runAsync(() -> doConnect());
}
});
// this.connector.getFilterChain().addLast("codec", new ProtocolCodecFilter(ProtocolCodecFactory));
// this.connector.setHandler(IoHandler);
Optional.ofNullable(consumer).ifPresent(x -> x.accept(this.connector));
doConnect();
}
/**
*
*/
protected void doConnect() {
if (this.closed) {
return;
}
this.connector.connect(this.endpoint).addListener(new IoFutureListener<ConnectFuture>() {
@Override
public void operationComplete(ConnectFuture future) {
if (!future.isConnected()) {
CompletableFuture.runAsync(() -> doConnect());
} else {
session = future.getSession();
}
}
});
}
/**
*
* @param data
*/
public void sent(Object data) {
Optional.ofNullable(this.session)
.filter(x -> x.isActive())
.ifPresent(x -> x.write(data));
}
}