之前对Java nio的了解局限于简答的socket数据传输,对tcp沾包、拆包有一些简单的了解,但是没有深入的研究。后来
简单的了解了一些tcp知识,例如:tcp滑动窗口。
最近粗略的看了一下zookeeper的源码,了解了zookeeper的底层实现,比如zk节点保存结构是使用的DataTree,临时节点使用的是Map<String sessionId,Set
zk中实现了基于java nio和netty两种socket编程,本文根据zk Java nio实现socket服务端编程。测试时,客户端使用的是telnet进来连接。
java nio中比较核心的几个类:
java.nio.channels.ServerSocketChannel
java.nio.channels.SocketChannel
java.nio.channels.SelectionKey
java.nio.channels.Selector
以下是核心代码,相关引用代码请clone zookeeper代码进行查看学习
package server;
import common.ExpiryQueue;
import common.WorkerService;
import common.ZooKeeperThread;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
/**
* Created by hezhengkui on 2019/7/15.
*/
@Slf4j
public class NioServerFactory {
protected final Set<NIOServerCnxn> cnxns = Collections.newSetFromMap(new ConcurrentHashMap<NIOServerCnxn, Boolean>());
protected int maxClientCnxns = 60;
private volatile boolean stopped = true;
private AcceptThread acceptThread;
private ConnectionExpirerThread expirerThread;
private final ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>> ipMap = new ConcurrentHashMap<InetAddress, Set<NIOServerCnxn>>();
private ExpiryQueue<NIOServerCnxn> cnxnExpiryQueue;
private final Set<SelectorThread> selectorThreads = new HashSet<SelectorThread>();
protected WorkerService workerPool;
final ConcurrentHashMap<Long, NIOServerCnxn> sessionMap = new ConcurrentHashMap<Long, NIOServerCnxn>();
private int numSelectorThreads;
private int numWorkerThreads;
private long workerShutdownTimeoutMS;
public static final String ZOOKEEPER_NIO_NUM_SELECTOR_THREADS = "zookeeper.nio.numSelectorThreads";
public static final String ZOOKEEPER_NIO_NUM_WORKER_THREADS = "zookeeper.nio.numWorkerThreads";
/**
* Default worker pool shutdown timeout in ms: 5000 (5s)
*/
public static final String ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT = "zookeeper.nio.shutdownTimeout";
static {
try {
Selector.open().close();
} catch (IOException ie) {
log.error("Selector failed to open", ie);
}
}
private abstract class AbstractSelectThread extends ZooKeeperThread {
protected final Selector selector;
public AbstractSelectThread(String name) throws IOException {
super(name);
// Allows the JVM to shutdown even if this thread is still running.
setDaemon(true);
this.selector = Selector.open();
}
public void wakeupSelector() {
selector.wakeup();
}
/**
* Close the selector. This should be called when the thread is about to
* exit and no operation is going to be performed on the Selector or
* SelectionKey
*/
protected void closeSelector() {
try {
selector.close();
} catch (IOException e) {
log.warn("ignored exception during selector close "
+ e.getMessage());
}
}
protected void cleanupSelectionKey(SelectionKey key) {
if (key != null) {
try {
key.cancel();
} catch (Exception ex) {
if (log.isDebugEnabled()) {
log.debug("ignoring exception during selectionkey cancel", ex);
}
}
}
}
protected void fastCloseSock(SocketChannel sc) {
if (sc != null) {
try {
// Hard close immediately, discarding buffers
sc.socket().setSoLinger(true, 0);
} catch (SocketException e) {
log.warn("Unable to set socket linger to 0, socket close"
+ " may stall in CLOSE_WAIT", e);
}
closeSock(sc);
}
}
}
public static void closeSock(SocketChannel sock) {
if (sock.isOpen() == false) {
return;
}
try {
/*
* The following sequence of code is stupid! You would think that
* only sock.close() is needed, but alas, it doesn't work that way.
* If you just do sock.close() there are cases where the socket
* doesn't actually close...
*/
sock.socket().shutdownOutput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
if (log.isDebugEnabled()) {
log.debug("ignoring exception during output shutdown", e);
}
}
try {
sock.socket().shutdownInput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
if (log.isDebugEnabled()) {
log.debug("ignoring exception during input shutdown", e);
}
}
try {
sock.socket().close();
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug("ignoring exception during socket close", e);
}
}
try {
sock.close();
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug("ignoring exception during socketchannel close", e);
}
}
}
private class AcceptThread extends AbstractSelectThread {
private final ServerSocketChannel acceptSocket;
private final SelectionKey acceptKey;
private final Collection<SelectorThread> selectorThreads;
private Iterator<SelectorThread> selectorIterator;
private volatile boolean reconfiguring = false;
public AcceptThread(ServerSocketChannel ss, InetSocketAddress addr,
Set<SelectorThread> selectorThreads) throws IOException {
super("NIOServerCxnFactory.AcceptThread:" + addr);
this.acceptSocket = ss;
this.acceptKey = acceptSocket.register(selector, SelectionKey.OP_ACCEPT);
this.selectorThreads = Collections.unmodifiableList(new ArrayList<SelectorThread>(selectorThreads));
selectorIterator = this.selectorThreads.iterator();
}
public void run() {
try {
while (!stopped && !acceptSocket.socket().isClosed()) {
try {
select();
} catch (RuntimeException e) {
log.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
log.warn("Ignoring unexpected exception", e);
}
}
} finally {
closeSelector();
// This will wake up the selector threads, and tell the
// worker thread pool to begin shutdown.
if (!reconfiguring) {
NioServerFactory.this.stop();
}
log.info("accept thread exitted run method");
}
}
public void setReconfiguring() {
reconfiguring = true;
}
private void select() {
try {
int select = selector.select();
Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
if (key.isAcceptable()) {
if (!doAccept()) {
// If unable to pull a new connection off the accept
// queue, pause accepting to give us time to free
// up file descriptors and so the accept thread
// doesn't spin in a tight loop.
pauseAccept(10);
}
} else {
log.warn("Unexpected ops in accept select "
+ key.readyOps());
}
}
} catch (IOException e) {
log.warn("Ignoring IOException while selecting", e);
}
}
/**
* Mask off the listen socket interest ops and use select() to sleep
* so that other threads can wake us up by calling wakeup() on the
* selector.
*/
private void pauseAccept(long millisecs) {
acceptKey.interestOps(0);
try {
selector.select(millisecs);
} catch (IOException e) {
// ignore
} finally {
acceptKey.interestOps(SelectionKey.OP_ACCEPT);
}
}
/**
* Accept new socket connections. Enforces maximum number of connections
* per client IP address. Round-robin assigns to selector thread for
* handling. Returns whether pulled a connection off the accept queue
* or not. If encounters an error attempts to fast close the socket.
*
* @return whether was able to accept a connection or not
*/
private boolean doAccept() {
boolean accepted = false;
SocketChannel sc = null;
try {
sc = acceptSocket.accept();
accepted = true;
InetAddress ia = sc.socket().getInetAddress();
int cnxncount = getClientCnxnCount(ia);
if (maxClientCnxns > 0 && cnxncount >= maxClientCnxns) {
throw new IOException("Too many connections from " + ia
+ " - max is " + maxClientCnxns);
}
log.debug("Accepted socket connection from "
+ sc.socket().getRemoteSocketAddress());
sc.configureBlocking(false);
// Round-robin assign this connection to a selector thread
if (!selectorIterator.hasNext()) {
selectorIterator = selectorThreads.iterator();
}
SelectorThread selectorThread = selectorIterator.next();
//交给selector 线程执行
if (!selectorThread.addAcceptedConnection(sc)) {
throw new IOException(
"Unable to add connection to selector queue"
+ (stopped ? " (shutdown in progress)" : ""));
}
} catch (IOException e) {
e.printStackTrace();
// accept, maxClientCnxns, configureBlocking
fastCloseSock(sc);
}
return accepted;
}
}
ServerSocketChannel ss;
public void stop() {
stopped = true;
// Stop queuing connection attempts
try {
ss.close();
} catch (IOException e) {
log.warn("Error closing listen socket", e);
}
if (acceptThread != null) {
if (acceptThread.isAlive()) {
acceptThread.wakeupSelector();
} else {
acceptThread.closeSelector();
}
}
if (expirerThread != null) {
expirerThread.interrupt();
}
for (SelectorThread thread : selectorThreads) {
if (thread.isAlive()) {
thread.wakeupSelector();
} else {
thread.closeSelector();
}
}
if (workerPool != null) {
workerPool.stop();
}
}
class SelectorThread extends AbstractSelectThread {
private final int id;
private final Queue<SocketChannel> acceptedQueue;
private final Queue<SelectionKey> updateQueue;
public SelectorThread(int id) throws IOException {
super("NIOServerCxnFactory.SelectorThread-" + id);
this.id = id;
acceptedQueue = new LinkedBlockingQueue<SocketChannel>();
updateQueue = new LinkedBlockingQueue<SelectionKey>();
}
/**
* Place new accepted connection onto a queue for adding. Do this
* so only the selector thread modifies what keys are registered
* with the selector.
*/
public boolean addAcceptedConnection(SocketChannel accepted) {
if (stopped || !acceptedQueue.offer(accepted)) {
return false;
}
wakeupSelector();
return true;
}
/**
* Place interest op update requests onto a queue so that only the
* selector thread modifies interest ops, because interest ops
* reads/sets are potentially blocking operations if other select
* operations are happening.
*/
public boolean addInterestOpsUpdateRequest(SelectionKey sk) {
if (stopped || !updateQueue.offer(sk)) {
return false;
}
wakeupSelector();
return true;
}
/**
* The main loop for the thread selects() on the connections and
* dispatches ready I/O work requests, then registers all pending
* newly accepted connections and updates any interest ops on the
* queue.
*/
public void run() {
try {
while (!stopped) {
try {
//accept事件第一次select()方法不会有任何事件触发,通过seletor.wakeUp()唤起,
select();
//这个方法会触发registor(SelectorKey.OP_READ)事件
processAcceptedConnections();
processInterestOpsUpdateRequests();
} catch (RuntimeException e) {
log.warn("Ignoring unexpected runtime exception", e);
} catch (Exception e) {
log.warn("Ignoring unexpected exception", e);
}
}
// Close connections still pending on the selector. Any others
// with in-flight work, let drain out of the work queue.
for (SelectionKey key : selector.keys()) {
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
cnxn.close(NIOServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
}
cleanupSelectionKey(key);
}
SocketChannel accepted;
while ((accepted = acceptedQueue.poll()) != null) {
fastCloseSock(accepted);
}
updateQueue.clear();
} finally {
closeSelector();
// This will wake up the accept thread and the other selector
// threads, and tell the worker thread pool to begin shutdown.
NioServerFactory.this.stop();
log.info("selector thread exitted run method");
}
}
private void select() {
try {
//selector.wakeUp()方法会唤起select()方法执行
//registor()方法同样能唤起select()方法执行
int select = selector.select();
Set<SelectionKey> selected = selector.selectedKeys();
ArrayList<SelectionKey> selectedList = new ArrayList<SelectionKey>(selected);
Collections.shuffle(selectedList);
Iterator<SelectionKey> selectedKeys = selectedList.iterator();
while (!stopped && selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
selected.remove(key);
if (!key.isValid()) {
cleanupSelectionKey(key);
co 大专栏 java-nio-serverntinue;
}
if (key.isReadable() || key.isWritable()) {
handleIO(key);
} else {
log.warn("Unexpected ops in select " + key.readyOps());
}
}
} catch (IOException e) {
log.warn("Ignoring IOException while selecting", e);
}
}
/**
* Schedule I/O for processing on the connection associated with
* the given SelectionKey. If a worker thread pool is not being used,
* I/O is run directly by this thread.
*/
private void handleIO(SelectionKey key) {
IOWorkRequest workRequest = new IOWorkRequest(this, key);
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
// Stop selecting this key while processing on its
// connection
cnxn.disableSelectable();
key.interestOps(0);
touchCnxn(cnxn);
workerPool.schedule(workRequest);
}
/**
* Iterate over the queue of accepted connections that have been
* assigned to this thread but not yet placed on the selector.
*/
private void processAcceptedConnections() {
SocketChannel accepted;
while (!stopped && (accepted = acceptedQueue.poll()) != null) {
SelectionKey key = null;
try {
key = accepted.register(selector, SelectionKey.OP_READ);
NIOServerCnxn cnxn = createConnection(accepted, key, this);
key.attach(cnxn);
addCnxn(cnxn);
} catch (IOException e) {
e.printStackTrace();
// register, createConnection
cleanupSelectionKey(key);
fastCloseSock(accepted);
}
}
}
/**
* Iterate over the queue of connections ready to resume selection,
* and restore their interest ops selection mask.
*/
private void processInterestOpsUpdateRequests() {
SelectionKey key;
while (!stopped && (key = updateQueue.poll()) != null) {
if (!key.isValid()) {
cleanupSelectionKey(key);
}
NIOServerCnxn cnxn = (NIOServerCnxn) key.attachment();
if (cnxn.isSelectable()) {
key.interestOps(cnxn.getInterestOps());
}
}
}
}
private void addCnxn(NIOServerCnxn cnxn) throws IOException {
InetAddress addr = cnxn.getSocketAddress();
if (addr == null) {
throw new IOException("Socket of " + cnxn + " has been closed");
}
Set<NIOServerCnxn> set = ipMap.get(addr);
if (set == null) {
// in general we will see 1 connection from each
// host, setting the initial cap to 2 allows us
// to minimize mem usage in the common case
// of 1 entry -- we need to set the initial cap
// to 2 to avoid rehash when the first entry is added
// Construct a ConcurrentHashSet using a ConcurrentHashMap
set = Collections.newSetFromMap(
new ConcurrentHashMap<NIOServerCnxn, Boolean>(2));
// Put the new set in the map, but only if another thread
// hasn't beaten us to it
Set<NIOServerCnxn> existingSet = ipMap.putIfAbsent(addr, set);
if (existingSet != null) {
set = existingSet;
}
}
set.add(cnxn);
cnxns.add(cnxn);
touchCnxn(cnxn);
}
public void touchCnxn(NIOServerCnxn cnxn) {
cnxnExpiryQueue.update(cnxn, cnxn.getSessionTimeout());
}
public boolean removeCnxn(NIOServerCnxn cnxn) {
// If the connection is not in the master list it's already been closed
if (!cnxns.remove(cnxn)) {
return false;
}
cnxnExpiryQueue.remove(cnxn);
removeCnxnFromSessionMap(cnxn);
InetAddress addr = cnxn.getSocketAddress();
if (addr != null) {
Set<NIOServerCnxn> set = ipMap.get(addr);
if (set != null) {
set.remove(cnxn);
// Note that we make no effort here to remove empty mappings
// from ipMap.
}
}
return true;
}
protected NIOServerCnxn createConnection(SocketChannel sock,
SelectionKey sk, SelectorThread selectorThread) throws IOException {
return new NIOServerCnxn(sock, sk, this, selectorThread);
}
public void removeCnxnFromSessionMap(NIOServerCnxn cnxn) {
long sessionId = cnxn.getSessionId();
if (sessionId != 0) {
sessionMap.remove(sessionId);
}
}
private class ConnectionExpirerThread extends ZooKeeperThread {
ConnectionExpirerThread() {
super("ConnnectionExpirer");
}
public void run() {
try {
while (!stopped) {
long waitTime = cnxnExpiryQueue.getWaitTime();
if (waitTime > 0) {
Thread.sleep(waitTime);
continue;
}
for (NIOServerCnxn conn : cnxnExpiryQueue.poll()) {
// ServerMetrics.getMetrics().SESSIONLESS_CONNECTIONS_EXPIRED.add(1);
conn.close(NIOServerCnxn.DisconnectReason.CONNECTION_EXPIRED);
}
}
} catch (InterruptedException e) {
log.info("ConnnectionExpirerThread interrupted");
}
}
}
private int getClientCnxnCount(InetAddress cl) {
Set<NIOServerCnxn> s = ipMap.get(cl);
if (s == null) return 0;
return s.size();
}
private class IOWorkRequest extends WorkerService.WorkRequest {
private final SelectorThread selectorThread;
private final SelectionKey key;
private final NIOServerCnxn cnxn;
IOWorkRequest(SelectorThread selectorThread, SelectionKey key) {
this.selectorThread = selectorThread;
this.key = key;
this.cnxn = (NIOServerCnxn) key.attachment();
}
public void doWork() throws InterruptedException {
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
if (key.isReadable() || key.isWritable()) {
cnxn.doIO(key);
// Check if we shutdown or doIO() closed this connection
if (stopped) {
cnxn.close(NIOServerCnxn.DisconnectReason.SERVER_SHUTDOWN);
return;
}
if (!key.isValid()) {
selectorThread.cleanupSelectionKey(key);
return;
}
touchCnxn(cnxn);
}
// Mark this connection as once again ready for selection
cnxn.enableSelectable();
// Push an update request on the queue to resume selecting
// on the current set of interest ops, which may have changed
// as a result of the I/O operations we just performed.
if (!selectorThread.addInterestOpsUpdateRequest(key)) {
cnxn.close(NIOServerCnxn.DisconnectReason.CONNECTION_MODE_CHANGED);
}
}
@Override
public void cleanup() {
cnxn.close(NIOServerCnxn.DisconnectReason.CLEAN_UP);
}
}
public void configure(InetSocketAddress addr, int maxcc) throws IOException {
maxClientCnxns = maxcc;
cnxnExpiryQueue =
new ExpiryQueue<NIOServerCnxn>(10000);
expirerThread = new ConnectionExpirerThread();
int numCores = Runtime.getRuntime().availableProcessors();
// 32 cores sweet spot seems to be 4 selector threads
numSelectorThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_SELECTOR_THREADS,
Math.max((int) Math.sqrt((float) numCores / 2), 1));
if (numSelectorThreads < 1) {
throw new IOException("numSelectorThreads must be at least 1");
}
numWorkerThreads = Integer.getInteger(
ZOOKEEPER_NIO_NUM_WORKER_THREADS, 2 * numCores);
workerShutdownTimeoutMS = Long.getLong(
ZOOKEEPER_NIO_SHUTDOWN_TIMEOUT, 5000);
for (int i = 0; i < numSelectorThreads; ++i) {
selectorThreads.add(new SelectorThread(i));
}
this.ss = ServerSocketChannel.open();
ss.socket().setReuseAddress(true);
log.info("binding to port " + addr);
ss.socket().bind(addr);
ss.configureBlocking(false);
acceptThread = new AcceptThread(ss, addr, selectorThreads);
}
public void start() {
stopped = false;
if (workerPool == null) {
workerPool = new WorkerService(
"NIOWorker", numWorkerThreads, false);
}
for (SelectorThread thread : selectorThreads) {
if (thread.getState() == Thread.State.NEW) {
thread.start();
}
}
// ensure thread is started once and only once
if (acceptThread.getState() == Thread.State.NEW) {
acceptThread.start();
}
if (expirerThread.getState() == Thread.State.NEW) {
expirerThread.start();
}
}
public static void main(String[] args) throws IOException {
NioServerFactory factory = new NioServerFactory();
InetSocketAddress address = new InetSocketAddress(9999);
factory.configure(address, 3);
factory.start();
}
}
package server;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* Created by hezhengkui on 2019/7/15.
*/
@Slf4j
public class NIOServerCnxn {
private final SelectionKey sk;
private final SocketChannel sock;
private final NioServerFactory.SelectorThread selectorThread;
private final NioServerFactory factory;
private final AtomicBoolean selectable = new AtomicBoolean(true);
private volatile boolean stale = false;
private long sessionId;
protected DisconnectReason disconnectReason = DisconnectReason.UNKNOWN;
private int sessionTimeout = 100000000;
private final AtomicBoolean throttled = new AtomicBoolean(false);
private final Queue<ByteBuffer> outgoingBuffers =
new LinkedBlockingQueue<ByteBuffer>();
public NIOServerCnxn(SocketChannel sock, SelectionKey sk, NioServerFactory factory,
NioServerFactory.SelectorThread selectorThread) throws IOException {
this.sock = sock;
this.sk = sk;
this.factory = factory;
this.selectorThread = selectorThread;
sock.socket().setTcpNoDelay(true);
sock.socket().setSoLinger(false, -1);
InetAddress addr = ((InetSocketAddress) sock.socket().getRemoteSocketAddress()).getAddress();
}
public void enableSelectable() {
selectable.set(true);
}
public boolean isSelectable() {
return sk.isValid() && selectable.get();
}
public void close(DisconnectReason reason) {
disconnectReason = reason;
close();
}
public InetAddress getSocketAddress() {
if (sock.isOpen() == false) {
return null;
}
return sock.socket().getInetAddress();
}
public int getInterestOps() {
if (!isSelectable()) {
return 0;
}
int interestOps = 0;
if (getReadInterest()) {
interestOps |= SelectionKey.OP_READ;
}
if (getWriteInterest()) {
interestOps |= SelectionKey.OP_WRITE;
}
return interestOps;
}
private boolean getWriteInterest() {
return !outgoingBuffers.isEmpty();
}
private boolean getReadInterest() {
return !throttled.get();
}
public void setStale() {
stale = true;
}
public long getSessionId() {
return sessionId;
}
private void close() {
setStale();
if (!factory.removeCnxn(this)) {
return;
}
if (sk != null) {
try {
// need to cancel this selection key from the selector
sk.cancel();
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("ignoring exception during selectionkey cancel", e);
}
}
}
closeSock();
}
public void disableSelectable() {
selectable.set(false);
}
public int getSessionTimeout() {
return sessionTimeout;
}
private void closeSock() {
if (sock.isOpen() == false) {
return;
}
log.debug("Closed socket connection for client "
+ sock.socket().getRemoteSocketAddress()
+ (sessionId != 0 ?
" which had sessionid 0x" + Long.toHexString(sessionId) :
" (no session established for client)"));
closeSock(sock);
}
public static void closeSock(SocketChannel sock) {
if (sock.isOpen() == false) {
return;
}
try {
/*
* The following sequence of code is stupid! You would think that
* only sock.close() is needed, but alas, it doesn't work that way.
* If you just do sock.close() there are cases where the socket
* doesn't actually close...
*/
sock.socket().shutdownOutput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
if (log.isDebugEnabled()) {
log.debug("ignoring exception during output shutdown", e);
}
}
try {
sock.socket().shutdownInput();
} catch (IOException e) {
// This is a relatively common exception that we can't avoid
if (log.isDebugEnabled()) {
log.debug("ignoring exception during input shutdown", e);
}
}
try {
sock.socket().close();
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug("ignoring exception during socket close", e);
}
}
try {
sock.close();
} catch (IOException e) {
if (log.isDebugEnabled()) {
log.debug("ignoring exception during socketchannel close", e);
}
}
}
protected boolean isSocketOpen() {
return sock.isOpen();
}
/**
* 具体的read、write事件在这里处理,进行数据包的拆分
* @param k
* @throws InterruptedException
*/
void doIO(SelectionKey k) throws InterruptedException {
try {
if (isSocketOpen() == false) {
log.warn("trying to do i/o on a null socket for session:0x"
+ Long.toHexString(sessionId));
return;
}
if (k.isReadable()) {//只有客户端发送数据,才会触发这里方法的执行
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
int rc = sock.read(byteBuffer);
byteBuffer.flip();
log.debug("------r--{}, {}, {}", k.interestOps(), rc, new String(byteBuffer.array()));
}
if (k.isWritable()) {
log.debug("------w--", k.interestOps());
}
} catch (IOException e) {
e.printStackTrace();
} catch (CancelledKeyException e) {
e.printStackTrace();
log.warn("CancelledKeyException causing close of session 0x" + Long.toHexString(sessionId));
if (log.isDebugEnabled()) {
log.debug("CancelledKeyException stack trace", e);
}
close(DisconnectReason.CANCELLED_KEY_EXCEPTION);
}
}
public enum DisconnectReason {
UNKNOWN("unknown"),
SERVER_SHUTDOWN("server_shutdown"),
CONNECTION_EXPIRED("connection_expired"),
CANCELLED_KEY_EXCEPTION("cancelled_key_exception"),
CLEAN_UP("clean_up"),
CONNECTION_MODE_CHANGED("connection_mode_changed");
String disconnectReason;
DisconnectReason(String reason) {
this.disconnectReason = reason;
}
public String toDisconnectReasonString() {
return disconnectReason;
}
}
}