zoukankan      html  css  js  c++  java
  • 网络通信简单实例BIO,NIO,AIO

    这里,我将做一个简单的通信程序,分别使用三种原始的通信工具:BIO,NIO,AIO。

    功能就是一个服务器,一个客户端。服务器就是处理请求,返回响应。而客户端就是连接服务器,发送请求,接收响应。

    第一步:建立通信接口 

    import java.util.Date;
    
    public class Test {    
        public static void main(String[] args) {
            new Test().test();
        }
        
        private static final long time = new Date().getTime();
        private boolean run = true;
        
        public void test(){
            final Server server = new BioServer(){
                @Override
                public String service(String request) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    if(request != null)
                        log("收到请求内容:" + request);
                    else
                        log("收到一个空的请求");
                    return "这是一个响应信息!";
                }
            };
            
            final Client client = new BioClient(){
                @Override
                public String running(String response) {                
                    if(response != null) {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        log("收到响应内容:" + response);
                    }
                    return "这是一个请求信息!";
                }
            };
            
            // 模拟请求与响应
            new Thread(){
                public void run(){
                    String request = null;
                    String response = null;
                    while(run) {
                        request = client.running(response);
                        if(run)
                            response = server.service(request);
                    }
                }
            }.start();
            
            // 3秒后停止运行
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            run = false;
        }
        
        // ==================================================
        // base
        // ==================================================
        
        public abstract class Log {
            public String getName(){
                return toString();
            }
            
            public void log(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents);
            }
        }
        
        public abstract class Server extends Log {
            public String getName(){
                return "服务器";
            }
            
            /**
             * 处理请求,并返回需要发送的响应信息
             */
            public abstract String service(String request);
        }
        
        public abstract class Client extends Log {
            public String getName(){
                return "客户端";
            }
            
            /**
             * 接收响应,并返回需要发送的请求信息
             */
            public abstract String running(String response);
        }
        
        // ==================================================
        // BIO
        // ==================================================
        
        public abstract class BioServer extends Server {
            
        }
        
        public abstract class BioClient extends Client {
            
        }
        
        // ==================================================
        // NIO
        // ==================================================
        
        public abstract class NioServer extends Server {
            
        }
        
        public abstract class NioClient extends Client {
            
        }
        
        // ==================================================
        // AIO
        // ==================================================
        
        public abstract class AioServer extends Server {
            
        }
        
        public abstract class AioClient extends Client {
            
        }
    }

    Server抽象类,定义了一个处理请求,并返回响应的方法service(...)。Client抽象类,定义了一个发送请求,并接收响应的方法running(...)。

    这两个方法实现,在应用层。这里的应用层,指的是实际应用的场景,方法test()就是一个应用的场所,这里做了一个简单的应用。

    分别为Server和Clinet建立BIO,NIO和AIO的继承类,后续就是如果实现他们。

    运行第一步建立的结果如下:

    服务器 -> 505:
    服务器 -> 收到请求内容:这是一个请求信息!
    客户端 -> 1005:
    客户端 -> 收到响应内容:这是一个响应信息!
    服务器 -> 1508:
    服务器 -> 收到请求内容:这是一个请求信息!
    客户端 -> 2010:
    客户端 -> 收到响应内容:这是一个响应信息!
    服务器 -> 2511:
    服务器 -> 收到请求内容:这是一个请求信息!
    客户端 -> 3013:
    客户端 -> 收到响应内容:这是一个响应信息!

    每次输出分两行,一行是时间,从0开始的时间,接着一行就是内容。

    第二步:建立App运行场景 

    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Test {    
        public static void main(String[] args) {
            new Test().test();
        }
        
        private static final long time = new Date().getTime();
        
        public void test(){
            final Server server = new BioServer(){
                @Override
                public String service(String request) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    if(request != null)
                        log("收到请求内容:" + request);
                    else
                        log("收到一个空的请求");
                    return "这是一个响应信息!";
                }
            };
            
            final Client client = new BioClient(){
                @Override
                public String running(String response) {                
                    if(response != null) {
                        try {
                            Thread.sleep(500);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        log("收到响应内容:" + response);
                    }
                    return "这是一个请求信息!";
                }
            };
            
            // 模拟App
            server.start();
            client.start();
            
            // 3秒后停止运行
            try {
                Thread.sleep(3000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            client.stop();
            server.stop();
            
            client.waitStop();
            server.waitStop();
        }
        
        // ==================================================
        // base
        // ==================================================
        
        public abstract class Log {
            public String getName(){
                return toString();
            }
            
            public final void log(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents);
            }
        }
        
        public abstract class App extends Log {
            protected boolean run = false;
            private final AtomicBoolean running = new AtomicBoolean(false);
            
            protected abstract void run();
            
            public final void start(){
                if(run || running.get())
                    return ;
                
                run = true;
                new Thread(){
                    public void run() {
                        try {
                            log("启动");
                            running.compareAndSet(false, true);
                            App.this.run();
                        } finally {
                            running.compareAndSet(true, false);
                            log("已停止");
                        }
                    }
                }.start();
            }
            
            public final void stop() {
                if(!run)
                    return ;
    
                log("停止开始!");
                run = false;
            }
            
            public final void waitStop() {
                if(run)
                    return ;
                
                while(running.get()){
                    log("等待停止中……");
                    
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        public abstract class Server extends App {
            public String getName(){
                return "服务器";
            }
            
            /**
             * 处理请求,并返回需要发送的响应信息
             */
            public abstract String service(String request);
        }
        
        public abstract class Client extends App {
            public String getName(){
                return "客户端";
            }
            
            /**
             * 接收响应,并返回需要发送的请求信息
             */
            public abstract String running(String response);
        }
        
        // ==================================================
        // BIO
        // ==================================================
        
        public abstract class BioServer extends Server {
            private List<Service> services = new ArrayList<Service>();
            private AtomicInteger nextServiceNameNum = new AtomicInteger(1);
            
            protected void run(){
                try {
                    while(run){
                        log("接收客户端连接中……");
        
                        // 注:这里不使用线程池,使用笨重的方法。
                        log("接收到一个客户端连接");
                        Service service = new Service();
                        services.add(service);
                        service.start();
                        
                        // 只接收一个客户端
                        while(run){
                            try {
                                Thread.sleep(200);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                } finally {
                    for(Service service : services)
                        service.stop();
                    for(Service service : services)
                        service.waitStop();
                    services.clear();
                }
            }
            
            private class Service extends App {
                private final String name;
                
                public Service() {
                    name = BioServer.this.getName() + "/" + "服务" + nextServiceNameNum.getAndIncrement();
                }
                
                public final String getName(){
                    return name;
                }
    
                @Override
                public void run() {
                    String request = null;
                    String response = null;
                    while(run) {
                        // TODO 接收请求
                        request = "未实现接收请求";
                        // 应用处理
                        response = service(request);
                        // TODO 发送响应
                    }
                }
            }
        }
        
        public abstract class BioClient extends Client {
            private boolean connected = false;
            
            public void run(){
                try {
                    while(connect() && running()){
                        // 关闭连接后,将重新连接
                        closeConnect();
                    }
                } finally {
                    // 退出运行时,关闭连接
                    closeConnect();
                }
            }
            
            /**
             * 连接,并返回是否继续运行。
             */
            private boolean connect(){
                log("连接服务器中……");
                while(run){
                    // TODO 连接服务器
                    log("连接服务器成功!");
                    connected = true;
                    return true;
                }
                log("连接服务器中止!");
                return run;
            }
            
            /**
             * 关闭连接
             */
            private void closeConnect(){
                if(!connected)
                    return ;
                
                connected = false;
            }
            
            /**
             * 运行,并返回是否重新连接。
             */
            private boolean running(){            
                String request = null;
                String response = null;
                while(run) {
                    // TODO 应用运行
                    request = running(response);
                    // 发送请求
                    // TODO 接收响应
                    response = "未实现接收响应";
                }
                return run;
            }
        }
        
        // ==================================================
        // NIO
        // ==================================================
        
        public abstract class NioServer extends Server {
            protected void run(){
            }
        }
        
        public abstract class NioClient extends Client {
            public void run(){
            }
        }
        
        // ==================================================
        // AIO
        // ==================================================
        
        public abstract class AioServer extends Server {
            protected void run(){
            }
        }
        
        public abstract class AioClient extends Client {
            public void run(){
            }
        }
    }

    服务器与客户端,实际是两个不同的App,因此Server与Client都继承于App类。

    App主要负责启动与停止两个操作,运行结果如下:

    服务器 -> 0:
    服务器 -> 启动
    客户端 -> 0:
    客户端 -> 启动
    服务器 -> 0:
    服务器 -> 接收客户端连接中……
    服务器 -> 0:
    服务器 -> 接收到一个客户端连接
    客户端 -> 0:
    客户端 -> 连接服务器中……
    客户端 -> 0:
    客户端 -> 连接服务器成功!
    服务器/服务1 -> 0:
    服务器/服务1 -> 启动
    服务器 -> 500:
    服务器 -> 收到请求内容:未实现接收请求
    客户端 -> 500:
    客户端 -> 收到响应内容:未实现接收响应
    客户端 -> 1000:
    客户端 -> 收到响应内容:未实现接收响应
    服务器 -> 1000:
    服务器 -> 收到请求内容:未实现接收请求
    服务器 -> 1501:
    服务器 -> 收到请求内容:未实现接收请求
    客户端 -> 1501:
    客户端 -> 收到响应内容:未实现接收响应
    客户端 -> 2001:
    客户端 -> 收到响应内容:未实现接收响应
    服务器 -> 2001:
    服务器 -> 收到请求内容:未实现接收请求
    客户端 -> 2502:
    客户端 -> 收到响应内容:未实现接收响应
    服务器 -> 2502:
    服务器 -> 收到请求内容:未实现接收请求
    服务器 -> 3002:
    服务器 -> 收到请求内容:未实现接收请求
    客户端 -> 3002:
    客户端 -> 停止开始!
    客户端 -> 3002:
    客户端 -> 收到响应内容:未实现接收响应
    服务器 -> 3002:
    服务器 -> 停止开始!
    服务器 -> 3002:
    服务器 -> 等待停止中……
    客户端 -> 3002:
    客户端 -> 已停止
    服务器/服务1 -> 3012:
    服务器/服务1 -> 停止开始!
    服务器/服务1 -> 3012:
    服务器/服务1 -> 等待停止中……
    服务器 -> 3202:
    服务器 -> 等待停止中……
    服务器/服务1 -> 3213:
    服务器/服务1 -> 等待停止中……
    服务器 -> 3402:
    服务器 -> 等待停止中……
    服务器/服务1 -> 3413:
    服务器/服务1 -> 等待停止中……
    服务器 -> 3502:
    服务器 -> 收到请求内容:未实现接收请求
    服务器/服务1 -> 3502:
    服务器/服务1 -> 已停止
    服务器 -> 3606:
    服务器 -> 等待停止中……
    服务器 -> 3616:
    服务器 -> 已停止

      

    第三步:实现BIO

    个人感觉BIO是最好写的,但同时它占用线程是比较多。

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketTimeoutException;
    import java.nio.ByteBuffer;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Test {    
        public static void main(String[] args) throws InterruptedException {
            new Test().test();
        }
        
        private static final long time = new Date().getTime();
        
        public void test() throws InterruptedException{
            final Server server = new BioServer(){
                @Override
                public String service(String request) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    if(request == null) // system error?
                        return "???";
                    if(request.contains("你好"))
                        return "你好!你是?";
                    if(request.contains("我是"))
                        return "哈哈!我记得了!";
                    return "不想跟你说话";
                }
            };
            
            final BioClient client1 = new BioClient(){
                public String getName(){
                    return "客户端A";
                }
                
                @Override
                public String running(String response) {
                    if(response == null)
                        return "你好!";
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(response.contains("你是"))
                        return "我是客户端A!你不得了吗?";
                    if(response.contains("我记得"))
                        return "记得就好!上次说好要请我吃饭的,什么时候……";
                    return "不想跟你说话!";
                }
            };
            
            final BioClient client2 = new BioClient(){
                public String getName(){
                    return "客户端B";
                }
                
                @Override
                public String running(String response) {
                    if(response == null)
                        return "你好!";
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(response.contains("你是"))
                        return "我是客户端B!你不得了吗?";
                    if(response.contains("我记得"))
                        return "出门忘带钱包了,能不能先错我点?";
                    return "不想跟你说话!";
                }
            };
            
            // 模拟App
            server.start();
            client1.start();
            client2.start();
    
            // 3秒后停止运行
            Thread.sleep(3000);
            client1.stop();
            client2.stop();
            server.stop();
    
            client1.waitStop();
            client2.waitStop();
            server.waitStop();
        }
        
        // ==================================================
        // base
        // ==================================================
        
        public abstract class Log {
            public String getName(){
                return toString();
            }
            
            public final void log(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents);
            }
            
            public final void logError(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents + "!!!!!!!!!!!!!!!!!!!!!!!!!!");
            }
        }
        
        public interface App {
            public abstract void start();
            public abstract void stop();
            public abstract void waitStop();
        }
        
        public abstract class AbstractApp extends Log implements App {
            protected boolean run = false;
            private final AtomicBoolean running = new AtomicBoolean(false);
            
            protected abstract void run();
            
            public final void start(){
                if(run || running.get())
                    return ;
                
                run = true;
                new Thread(){
                    public void run() {
                        try {
                            log("启动");
                            running.compareAndSet(false, true);
                            AbstractApp.this.run();
                        } finally {
                            running.compareAndSet(true, false);
                            log("已停止");
                        }
                    }
                }.start();
            }
            
            protected void stopImpl(){
                
            }
            
            public final void stop() {
                if(!run)
                    return ;
    
                if(running.get())
                    log("停止开始!");
                run = false;
                stopImpl();
            }
            
            public final void waitStop() {
                if(run)
                    return ;
                
                while(running.get()){
                    log("等待停止中……");
                    
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        public abstract class Server extends AbstractApp {
            public String getName(){
                return "服务器";
            }
            
            /**
             * 处理请求,并返回需要发送的响应信息
             */
            public abstract String service(String request);
        }
        
        public interface Client extends App {
            /**
             * 接收响应,并返回需要发送的请求信息
             */
            public abstract String running(String response);
        }
        
        public class Session {
            protected static final int RECEIVE_MAX_LEN = 100;
            private ByteBuffer receiveContents = ByteBuffer.allocate(RECEIVE_MAX_LEN);
            private boolean receiveOutMaxLen = false;
            
            protected final String onReceive(ByteBuffer buffer){
                while(buffer.hasRemaining()){
                    byte b = buffer.get();
                    if(b == 0){
                        if(!receiveOutMaxLen){
                            receiveContents.flip();
                            String result = new String(receiveContents.array(), 0, receiveContents.limit(), Charset.defaultCharset());
                            receiveContents.clear();
                            return result;
                        } else {
                            return "error.outLen";
                        }
                    } else if(receiveOutMaxLen) {
                        // 处理接收数据的长度超出限制的情况,这里处理方式为一直接收,无时间限制。
                        // if(!run)
                        //    return null;
                    } else if(receiveContents.hasRemaining()) {
                        receiveContents.put(b);
                    } else {
                        receiveOutMaxLen = true;
                        receiveContents.clear();
                    }
                }
                
                return null;
            }
        }
        
        // ==================================================
        // BIO
        // ==================================================
        
        public abstract class BioSocketApp extends AbstractApp {        
            public BioSocketApp(){
                this(null);
            }
            
            public BioSocketApp(Socket socket) {
                this.socket = socket;
            }
    
            protected Socket socket = null;
            private ByteBuffer buffer = ByteBuffer.allocate(Session.RECEIVE_MAX_LEN);
            private Session session = new Session();
            
            protected String receive(String target){
                log("正在接收" + target + "……");
                // 解析 <-> 读取
                while(run){
                    // 解析
                    buffer.flip();
                    try {
                        String contents = session.onReceive(buffer);
                        if(contents != null)
                            return contents;
                    } finally {
                        buffer.compact();
                    }
    
                    while(run) {
                        int count = -1;
                        while(run){
                            try {
                                count = socket.getInputStream().read(buffer.array());
                                break;
                            } catch (IOException e) {
                                logError(e.getMessage());
                                // e.printStackTrace();
                            }
                        }
                        
                        if(count > 0 ){
                            buffer.position(buffer.position() + count);
                            break ;
                        } if(count == -1){
                            return "disconnect";
                        }
                    }
                }
                return null;
            }
            
            protected final boolean send(String target, String response){
                log("发送" + target + ":" + response + "!");
                ByteBuffer buffer = ByteBuffer.wrap((response + (char)0).getBytes());
    
                log("正在发送" + target + "……");
                try {
                    socket.getOutputStream().write(buffer.array(), buffer.position(), buffer.limit());
                } catch (IOException e) {
                    log(e.getMessage());
                    // e.printStackTrace();
                    return run;
                }
                
                log("发送" + target + "成功!");
                return run;
            }
        }
        
        public abstract class BioServer extends Server {
            private List<Service> services = new ArrayList<Service>();
            private AtomicInteger nextServiceNameNum = new AtomicInteger(1);
            
            protected void run(){
                ServerSocket acceptor = null;
                try {
                    acceptor = new ServerSocket();
                    acceptor.bind(new InetSocketAddress("localhost", 8080));
                    acceptor.setSoTimeout(1000);
                    
                    while(run){
                        log("接收客户端连接中……");
                        Socket socket = null;
                        try {
                            socket = acceptor.accept();
                        } catch (SocketTimeoutException e) {
                            continue ;
                        } catch (IOException e) {
                            logError(e.getMessage());
                            // e.printStackTrace();
                            socket.close();
                            continue;
                        }
                        
                        if(!run)
                            break;
        
                        // 注:这里不使用线程池,使用笨重的方法。
                        log("接收到一个客户端连接");
                        Service service = new Service(socket);
                        services.add(service);
                        service.start();
                    }
                } catch (IOException e) {
                    logError(e.getMessage());
                    // e.printStackTrace();
                } finally {
                    if(acceptor == null)
                        return ;
                    
                    for(Service service : services)
                        service.stop();
                    for(Service service : services)
                        service.waitStop();
                    services.clear();
                    
                    try {
                        acceptor.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            
            private class Service extends BioSocketApp {
                private final String name;
                
                public Service(Socket socket) {
                    super(socket);
                    name = BioServer.this.getName() + "/" + "服务" + nextServiceNameNum.getAndIncrement();
                }
                
                public final String getName(){
                    return name;
                }
    
                @Override
                public void run() {
                    try {
                        socket.setSoTimeout(1000);
                        
                        while(run) {
                            String request = receive("客户端请求");
                            if(request != null) {
                                if(request.equals("disconnect"))
                                    break;
                                
                                if(request.startsWith("error")) {
                                    if(send("响应", "请求格式不正确!"))
                                        continue;
                                    break;
                                }
                                
                                log("接收到客户端请求:" + request);
                                String response = service(request);
                                if(response != null && !send("响应", response))
                                    break;
                            }
                        }
                    } catch (IOException e) {
                        e.printStackTrace();
                    } finally {                    
                        try {
                            socket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        log("与客户端断开连接!");
                    }
                }
            }
        }
        
        public abstract class BioClient extends BioSocketApp implements Client {
            public BioClient() {
            }
    
            private boolean connected = false;
            
            public String getName(){
                return "Bio客户端";
            }
            
            public void run(){
                try {
                    // 连接 -> 运行
                    while(connect() && running()){
                        // 关闭连接后,将重新连接
                        closeConnect();
                    }
                } finally {
                    // 退出运行时,关闭连接
                    closeConnect();
                }
            }
            
            /**
             * 连接,并返回是否继续运行。
             */
            private boolean connect(){
                if(socket != null)
                    throw new RuntimeException("socket not closed!");
    
                log("正在连接服务器中……");
                InetSocketAddress address = new InetSocketAddress("localhost", 8080);
                while(run){
                    try {
                        socket = new Socket();
                        socket.connect(address);
                        socket.setSoTimeout(1000);
                        connected = true;
                        log("连接服务器成功!");
                        return true;
                    } catch (IOException e) {
                        logError(e.getMessage());
                        // e.printStackTrace();
                        closeConnect();
                    }
                    
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                return false;
            }
            
            /**
             * 关闭连接
             */
            private void closeConnect(){
                if(socket != null){
                    if(connected) {
                        connected = false;
                        log("与服务器断开连接!");
                    }
                    
                    try {
                        socket.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    socket = null;
                }
            }
            
            /**
             * 运行,并返回是否重新连接。
             */
            private boolean running(){
                String response = null;
                // 循环请求
                while(run){
                    // 处理响应,获取请求
                    String request = running(response);
                    // 退出运行
                    if(request == null)
                        break ;
                    // 清空响应
                    response = null;
                    // 发送请求
                    if(send("请求", request)){
                        // 接收响应
                        response = receive("服务器响应");
                        if(response != null){
                            if(response.equals("disconnect"))
                                break;
                            if(!response.startsWith("error"))
                                log("接收到服务器响应:" + response);
                            else
                                log("接收服务器响应时,发生错误:" + response);
                        } else {
                            response = "无回应!";
                        }
                    }
                }
                return run;
            }
        }
        
        // ==================================================
        // NIO
        // ==================================================
        
        // 省略……

      // ================================================== // AIO // ================================================== // 省略…… }

    运行结果:

    服务器 -> 0:
    服务器 -> 启动
    客户端A -> 0:
    客户端A -> 启动
    客户端B -> 0:
    客户端B -> 启动
    客户端B -> 0:
    客户端B -> 正在连接服务器中……
    客户端A -> 0:
    客户端A -> 正在连接服务器中……
    服务器 -> 11:
    服务器 -> 接收客户端连接中……
    客户端B -> 11:
    客户端B -> 连接服务器成功!
    客户端B -> 11:
    客户端B -> 发送请求:你好!!
    客户端B -> 11:
    客户端B -> 正在发送请求……
    服务器 -> 11:
    服务器 -> 接收到一个客户端连接
    客户端A -> 11:
    客户端A -> 连接服务器成功!
    客户端A -> 11:
    客户端A -> 发送请求:你好!!
    客户端A -> 11:
    客户端A -> 正在发送请求……
    客户端A -> 11:
    客户端A -> 发送请求成功!
    客户端B -> 11:
    客户端B -> 发送请求成功!
    客户端A -> 11:
    客户端A -> 正在接收服务器响应……
    客户端B -> 11:
    客户端B -> 正在接收服务器响应……
    服务器 -> 11:
    服务器 -> 接收客户端连接中……
    服务器/服务1 -> 11:
    服务器/服务1 -> 启动
    服务器 -> 11:
    服务器 -> 接收到一个客户端连接
    服务器/服务1 -> 11:
    服务器/服务1 -> 正在接收客户端请求……
    服务器 -> 11:
    服务器 -> 接收客户端连接中……
    服务器/服务1 -> 11:
    服务器/服务1 -> 接收到客户端请求:你好!
    服务器/服务2 -> 11:
    服务器/服务2 -> 启动
    服务器/服务2 -> 11:
    服务器/服务2 -> 正在接收客户端请求……
    服务器/服务2 -> 11:
    服务器/服务2 -> 接收到客户端请求:你好!
    服务器/服务1 -> 517:
    服务器/服务1 -> 发送响应:你好!你是?!
    服务器/服务2 -> 517:
    服务器/服务2 -> 发送响应:你好!你是?!
    服务器/服务1 -> 517:
    服务器/服务1 -> 正在发送响应……
    服务器/服务2 -> 517:
    服务器/服务2 -> 正在发送响应……
    服务器/服务2 -> 517:
    服务器/服务2 -> 发送响应成功!
    服务器/服务2 -> 517:
    服务器/服务2 -> 正在接收客户端请求……
    服务器/服务1 -> 517:
    服务器/服务1 -> 发送响应成功!
    客户端B -> 517:
    客户端B -> 接收到服务器响应:你好!你是?
    客户端A -> 517:
    客户端A -> 接收到服务器响应:你好!你是?
    服务器/服务1 -> 517:
    服务器/服务1 -> 正在接收客户端请求……
    客户端A -> 1020:
    客户端A -> 发送请求:我是客户端A!你不得了吗?!
    服务器 -> 1020:
    服务器 -> 接收客户端连接中……
    客户端A -> 1020:
    客户端A -> 正在发送请求……
    客户端A -> 1020:
    客户端A -> 发送请求成功!
    客户端A -> 1020:
    客户端A -> 正在接收服务器响应……
    服务器/服务1 -> 1020:
    服务器/服务1 -> 接收到客户端请求:我是客户端A!你不得了吗?
    客户端B -> 1222:
    客户端B -> 发送请求:我是客户端B!你不得了吗?!
    客户端B -> 1222:
    客户端B -> 正在发送请求……
    客户端B -> 1222:
    客户端B -> 发送请求成功!
    客户端B -> 1222:
    客户端B -> 正在接收服务器响应……
    服务器/服务2 -> 1222:
    服务器/服务2 -> 接收到客户端请求:我是客户端B!你不得了吗?
    服务器/服务1 -> 1522:
    服务器/服务1 -> 发送响应:哈哈!我记得了!!
    服务器/服务1 -> 1522:
    服务器/服务1 -> 正在发送响应……
    服务器/服务1 -> 1522:
    服务器/服务1 -> 发送响应成功!
    服务器/服务1 -> 1522:
    服务器/服务1 -> 正在接收客户端请求……
    客户端A -> 1522:
    客户端A -> 接收到服务器响应:哈哈!我记得了!
    服务器/服务2 -> 1725:
    服务器/服务2 -> 发送响应:哈哈!我记得了!!
    服务器/服务2 -> 1725:
    服务器/服务2 -> 正在发送响应……
    服务器/服务2 -> 1725:
    服务器/服务2 -> 发送响应成功!
    服务器/服务2 -> 1725:
    服务器/服务2 -> 正在接收客户端请求……
    客户端B -> 1725:
    客户端B -> 接收到服务器响应:哈哈!我记得了!
    服务器 -> 2020:
    服务器 -> 接收客户端连接中……
    客户端A -> 2022:
    客户端A -> 发送请求:记得就好!上次说好要请我吃饭的,什么时候……!
    客户端A -> 2022:
    客户端A -> 正在发送请求……
    客户端A -> 2022:
    客户端A -> 发送请求成功!
    客户端A -> 2022:
    客户端A -> 正在接收服务器响应……
    服务器/服务1 -> 2022:
    服务器/服务1 -> 接收到客户端请求:记得就好!上次说好要请我吃饭的,什么时候……
    客户端B -> 2427:
    客户端B -> 发送请求:出门忘带钱包了,能不能先错我点?!
    客户端B -> 2427:
    客户端B -> 正在发送请求……
    客户端B -> 2427:
    客户端B -> 发送请求成功!
    客户端B -> 2427:
    客户端B -> 正在接收服务器响应……
    服务器/服务2 -> 2427:
    服务器/服务2 -> 接收到客户端请求:出门忘带钱包了,能不能先错我点?
    服务器/服务1 -> 2527:
    服务器/服务1 -> 发送响应:不想跟你说话!
    服务器/服务1 -> 2527:
    服务器/服务1 -> 正在发送响应……
    服务器/服务1 -> 2527:
    服务器/服务1 -> 发送响应成功!
    服务器/服务1 -> 2527:
    服务器/服务1 -> 正在接收客户端请求……
    客户端A -> 2527:
    客户端A -> 接收到服务器响应:不想跟你说话
    服务器/服务2 -> 2927:
    服务器/服务2 -> 发送响应:不想跟你说话!
    服务器/服务2 -> 2927:
    服务器/服务2 -> 正在发送响应……
    服务器/服务2 -> 2927:
    服务器/服务2 -> 发送响应成功!
    客户端B -> 2927:
    客户端B -> 接收到服务器响应:不想跟你说话
    服务器/服务2 -> 2927:
    服务器/服务2 -> 正在接收客户端请求……
    客户端A -> 3007:
    客户端A -> 停止开始!
    客户端B -> 3007:
    客户端B -> 停止开始!
    服务器 -> 3007:
    服务器 -> 停止开始!
    客户端A -> 3007:
    客户端A -> 等待停止中……
    客户端A -> 3027:
    客户端A -> 发送请求:不想跟你说话!!
    客户端A -> 3027:
    客户端A -> 正在发送请求……
    服务器/服务1 -> 3027:
    服务器/服务1 -> 停止开始!
    服务器/服务2 -> 3027:
    服务器/服务2 -> 停止开始!
    客户端A -> 3027:
    客户端A -> 发送请求成功!
    服务器/服务1 -> 3027:
    服务器/服务1 -> 等待停止中……
    客户端A -> 3027:
    客户端A -> 与服务器断开连接!
    服务器/服务1 -> 3027:
    服务器/服务1 -> 与客户端断开连接!
    服务器/服务1 -> 3027:
    服务器/服务1 -> 已停止
    客户端A -> 3027:
    客户端A -> 已停止
    客户端B -> 3208:
    客户端B -> 等待停止中……
    服务器/服务2 -> 3228:
    服务器/服务2 -> 等待停止中……
    客户端B -> 3408:
    客户端B -> 等待停止中……
    服务器/服务2 -> 3428:
    服务器/服务2 -> 等待停止中……
    客户端B -> 3608:
    客户端B -> 等待停止中……
    服务器/服务2 -> 3628:
    服务器/服务2 -> 等待停止中……
    客户端B -> 3628:
    客户端B -> 发送请求:不想跟你说话!!
    客户端B -> 3628:
    客户端B -> 正在发送请求……
    客户端B -> 3628:
    客户端B -> 发送请求成功!
    客户端B -> 3628:
    客户端B -> 与服务器断开连接!
    客户端B -> 3628:
    客户端B -> 已停止
    服务器/服务2 -> 3628:
    服务器/服务2 -> 与客户端断开连接!
    服务器/服务2 -> 3628:
    服务器/服务2 -> 已停止
    服务器 -> 3808:
    服务器 -> 等待停止中……
    服务器 -> 3828:
    服务器 -> 已停止

      

    第四步:实现NIO 

    NIO如果不设置成阻塞,其实跟BIO一样。触发selector事件,客户端除了需要注册,连接,还要在触发事件时使用方法finishConnect()。另外,Selector的select()方法会阻塞,阻塞的时候,是不允许注册的,反则注册方法也会被阻塞,即死锁,解决的方法就是使用wakeup()方法,唤醒Selector线程,让Selector进入一个同步阻塞,再执行异步注册。

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketTimeoutException;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Test {    
        public static void main(String[] args) throws InterruptedException {
            new Test().test();
        }
        
        private static final long time = new Date().getTime();
        
        public void test() throws InterruptedException{
            final Server server = new NioServer(){
                @Override
                public String service(String request) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    if(request == null) // system error?
                        return "???";
                    if(request.contains("你好"))
                        return "你好!你是?";
                    if(request.contains("我是"))
                        return "哈哈!我记得了!";
                    return "不想跟你说话";
                }
            };
            
            final Client client1 = new NioClient(){
                public String getName(){
                    return "客户端A";
                }
                
                @Override
                public String running(String response) {
                    if(response == null)
                        return "你好!";
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(response.contains("你是"))
                        return "我是客户端A!你不记得了吗?";
                    if(response.contains("我记得"))
                        return "记得就好!上次说好要请我吃饭的,什么时候……";
                    return "不想跟你说话!";
                }
            };
            
            final Client client2 = new NioClient(){
                public String getName(){
                    return "客户端B";
                }
                
                @Override
                public String running(String response) {
                    if(response == null)
                        return "你好!";
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(response.contains("你是"))
                        return "我是客户端B!你不记得了吗?";
                    if(response.contains("我记得"))
                        return "出门忘带钱包了,能不能先错我点?";
                    return "不想跟你说话!";
                }
            };
            
            // 模拟App
            server.start();
            client1.start();
            //client2.start();
    
            // 2秒后停止运行
            Thread.sleep(2000);
            client1.stop();
            // 3秒后停止运行
            Thread.sleep(3000);
            client2.stop();
            server.stop();
    
            client1.waitStop();
            client2.waitStop();
            server.waitStop();
        }
        
        // ==================================================
        // base
        // ==================================================
        
        public abstract class Log {
            public String getName(){
                return toString();
            }
            
            public final void log(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents);
            }
            
            public final void logError(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents + "!!!!!!!!!!!!!!!!!!!!!!!!!!");
            }
        }
        
        public interface App {
            public abstract void start();
            public abstract void stop();
            public abstract void waitStop();
        }
        
        public abstract class AbstractApp extends Log implements App {
            protected boolean run = false;
            private final AtomicBoolean running = new AtomicBoolean(false);
            
            protected abstract void run();
            
            public final void start(){
                if(run || running.get())
                    return ;
                
                run = true;
                new Thread(){
                    public void run() {
                        try {
                            log("启动");
                            running.compareAndSet(false, true);
                            AbstractApp.this.run();
                        } finally {
                            running.compareAndSet(true, false);
                            log("已停止");
                        }
                    }
                }.start();
            }
            
            protected void stopImpl(){
                
            }
            
            public final void stop() {
                if(!run)
                    return ;
    
                if(running.get())
                    log("停止开始!");
                run = false;
                stopImpl();
            }
            
            public final void waitStop() {
                if(run)
                    return ;
                
                while(running.get()){
                    log("等待停止中……");
                    
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        public abstract class Server extends AbstractApp {
            public String getName(){
                return "服务器";
            }
            
            /**
             * 处理请求,并返回需要发送的响应信息
             */
            public abstract String service(String request);
        }
        
        public interface Client extends App {
            /**
             * 接收响应,并返回需要发送的请求信息
             */
            public abstract String running(String response);
        }
        
        public class Session {
            protected static final int RECEIVE_MAX_LEN = 100;
            private ByteBuffer receiveContents = ByteBuffer.allocate(RECEIVE_MAX_LEN);
            private boolean receiveOutMaxLen = false;
            
            protected final String onReceive(ByteBuffer buffer){
                while(buffer.hasRemaining()){
                    byte b = buffer.get();
                    if(b == 0){
                        if(!receiveOutMaxLen){
                            receiveContents.flip();
                            String result = new String(receiveContents.array(), 0, receiveContents.limit(), Charset.defaultCharset());
                            receiveContents.clear();
                            return result;
                        } else {
                            return "error.outLen";
                        }
                    } else if(receiveOutMaxLen) {
                        // 处理接收数据的长度超出限制的情况,这里处理方式为一直接收,无时间限制。
                        // if(!run)
                        //    return null;
                    } else if(receiveContents.hasRemaining()) {
                        receiveContents.put(b);
                    } else {
                        receiveOutMaxLen = true;
                        receiveContents.clear();
                    }
                }
                
                return null;
            }
        }
        
        // ==================================================
        // BIO
        // ==================================================
        
      // 省略

      // ================================================== // NIO // ================================================== public class NioSession extends Session { private String contents = null; private int id; public NioSession() { } public NioSession(int id) { this.id = id; } public int getId(){ return id; } public String getContents(){ return contents; } public void setContents(String contents) { this.contents = contents; } } public abstract class NioSocketApp extends AbstractApp { private ByteBuffer buffer = ByteBuffer.allocate(NioSession.RECEIVE_MAX_LEN); protected final String receive(String target, SelectionKey key){ SocketChannel socket = (SocketChannel) key.channel(); NioSession session = (NioSession) key.attachment(); int count = -1; try { count = socket.read(buffer); } catch (IOException e) { logError(e.getMessage()); // e.printStackTrace(); } if(count == -1){ return "disconnect"; } if(count > 0){ buffer.flip(); try { String contents = session.onReceive(buffer); if(contents != null) { session.setContents(contents); return contents; } } finally { buffer.compact(); } } return null; } protected final boolean send(String target, SelectionKey key){ SocketChannel socket = (SocketChannel) key.channel(); NioSession session = (NioSession) key.attachment(); String response = session.getContents(); log("发送" + target + "" + response + "!"); ByteBuffer buffer = ByteBuffer.wrap((response + (char)0).getBytes()); log("正在发送" + target + "……"); try { socket.write(buffer); } catch (IOException e) { log(e.getMessage()); return run; } log("发送" + target + "成功!"); return run; } protected void closeSocket(SelectionKey key){ try { key.channel().close(); } catch (IOException e) { e.printStackTrace(); } log("与客户端断开连接!"); } } public abstract class NioServer extends Server { private Selector acceptorSelector = null; private AtomicInteger nextServiceNameNum = new AtomicInteger(1); protected void run(){ ServerSocketChannel acceptor = null; Service service = null; try { try { acceptor = ServerSocketChannel.open(); acceptor.bind(new InetSocketAddress("localhost", 8080)); acceptor.configureBlocking(false); service = new Service(); service.init(); service.start(); acceptorSelector = Selector.open(); acceptor.register(acceptorSelector, SelectionKey.OP_ACCEPT); } catch (IOException e1) { e1.printStackTrace(); log("启动失败!"); return ; } while(run){ log("正在接收客户端连接中……"); try { if(acceptorSelector.select(1000) > 0){ Iterator<SelectionKey> keys = acceptorSelector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); if(key.isAcceptable()){ SocketChannel socket = acceptor.accept(); // 这里有可能获取的socket为空 // if(socket != null){ log("接收到一个客户端连接!"); service.register(socket); // } } keys.remove(); } } } catch (IOException e) { logError(e.getMessage()); e.printStackTrace(); } } } finally { if(acceptor != null){ if(acceptorSelector != null) { try { acceptorSelector.close(); } catch (IOException e1) { e1.printStackTrace(); } } acceptorSelector = null; if(service != null){ service.stop(); service.waitStop(); } try { acceptor.close(); } catch (IOException e) { e.printStackTrace(); } } log("已停止!"); } } protected void stopImpl() { Selector selector = this.acceptorSelector; if(selector != null){ selector.wakeup(); } } private class Service extends NioSocketApp{ private Selector selector; private Object blockLock = null; private int serviceId = 0; public String getName(){ return NioServer.this.getName() + "/" + "服务" + (serviceId > 0 ? serviceId : ""); } public void init() throws IOException{ if(selector == null) selector = Selector.open(); } @Override protected void run() { try { Object blockLock = null; while(run){ try { if(selector.select() > 0){ Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); keys.remove(); NioSession session = (NioSession) key.attachment(); serviceId = session.getId(); if(key.isReadable()){ String request = receive("客户端请求", key); if(request == null) continue ; // 不会发生!NIO的读取数据都是就绪,也就是说总会读取到数据。 if(request.equals("disconnect")){ closeSocket(key); continue ; } if(request.startsWith("error")) { session.setContents("请求格式不正确!"); } else { String response = service(request); if(response == null){ closeSocket(key); continue; } session.setContents(response); } key.interestOps(SelectionKey.OP_WRITE); } if(key.isWritable()){ if(!send("响应", key)){ closeSocket(key); continue; } key.interestOps(SelectionKey.OP_READ); } serviceId = 0; } } } catch (IOException e) { e.printStackTrace(); } if(this.blockLock != null){ blockLock = this.blockLock; if(blockLock != null){ synchronized(blockLock){ // 不操作,这里同步,为了让register方法有机会同步块。 } } } } } finally { if(selector != null){ for(SelectionKey key : selector.keys()){ NioSession session = (NioSession) key.attachment(); serviceId = session.getId(); closeSocket(key); serviceId = 0; } try { selector.close(); } catch (IOException e) { e.printStackTrace(); } selector = null; } } } public void register(SocketChannel socket) { try { blockLock = socket.blockingLock(); synchronized(blockLock){ socket.configureBlocking(false); // 这里要唤醒,因为register和selector是互斥锁方法。 selector.wakeup(); SelectionKey key = socket.register(selector, SelectionKey.OP_READ); key.attach(new NioSession(nextServiceNameNum.getAndIncrement())); blockLock = null; } } catch (IOException e) { e.printStackTrace(); try { socket.close(); } catch (IOException e1) { e1.printStackTrace(); } } } protected void stopImpl() { Selector selector = this.selector; if(selector != null){ selector.wakeup(); } } } } public abstract class NioClient extends NioSocketApp implements Client { public NioClient() { } private Selector selector = null; private SocketChannel socket = null; private boolean connected = false; public String getName(){ return "Nio客户端"; } public void run(){ try { try { selector = Selector.open(); } catch (IOException e) { e.printStackTrace(); return ; } // 连接 -> 运行 while(connect() && running()){ // 关闭连接后,将重新连接 closeConnect(); } } finally { // 退出运行时,关闭连接 closeConnect(); if(selector != null){ try { selector.close(); } catch (IOException e) { e.printStackTrace(); } } } } private boolean connect(){ if(socket != null) throw new RuntimeException("connect is not closed"); log("正在连接服务器中……"); while(run){ try { socket = SocketChannel.open(); socket.configureBlocking(false); socket.connect(new InetSocketAddress("localhost", 8080)); socket.register(selector, SelectionKey.OP_CONNECT); return run; } catch (IOException e) { e.printStackTrace(); closeConnect(); } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } return false; } private void closeConnect(){ if(socket != null){ if(connected) { connected = false; log("与服务器断开连接!"); } try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } private boolean running(){ while(run){ try { if(selector.select() > 0){ Iterator<SelectionKey> keys = selector.selectedKeys().iterator(); while(keys.hasNext()){ SelectionKey key = keys.next(); keys.remove(); if(key.isConnectable()){ socket.finishConnect(); key.attach(new NioSession()); key.interestOps(SelectionKey.OP_WRITE); log("连接服务器成功!"); } NioSession session = (NioSession) key.attachment(); if(key.isWritable()){ String request = running(session.getContents()); session.setContents(request); if(!send("响应", key)){ closeSocket(key); continue; } key.interestOps(SelectionKey.OP_READ); } if(key.isReadable()){ String response = receive("服务器响应", key); if(response == null) continue ; // 不会发生!NIO的读取数据都是就绪,也就是说总会读取到数据。 if(response.equals("disconnect")){ closeSocket(key); continue ; } if(response.startsWith("error")) { log("接收服务器响应时,发生错误:" + response); } else log("接收到服务器响应:" + response); key.interestOps(SelectionKey.OP_WRITE); } } } } catch (IOException e) { e.printStackTrace(); } } return true; } protected void stopImpl() { Selector selector = this.selector; if(selector != null){ selector.wakeup(); } } } // ================================================== // AIO // ==================================================   // 省略…… }

      

    第五步:实现AIO

    AIO本来可以很简单的实现的,但因为考滤用最少的线程实线,就变得复杂了点。虽然它没有BIO那么简单,但它灵活性高,不阻塞,跟NIO相比的话,AIO各方面都比较方便一点。

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.net.SocketTimeoutException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousServerSocketChannel;
    import java.nio.channels.AsynchronousSocketChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Test {    
        public static void main(String[] args) throws InterruptedException {
            new Test().test();
        }
        
        private static final long time = new Date().getTime();
        
        public void test() throws InterruptedException{
            final Server server = new AioServer(){
                @Override
                public String service(String request) {
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    if(request == null) // system error?
                        return "???";
                    if(request.contains("你好"))
                        return "你好!你是?";
                    if(request.contains("我是"))
                        return "哈哈!我记得了!";
                    return "不想跟你说话";
                }
            };
            
            final Client client1 = new AioClient(){
                public String getName(){
                    return "客户端A";
                }
                
                @Override
                public String running(String response) {
                    if(response == null)
                        return "你好!";
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(response.contains("你是"))
                        return "============我是客户端A!你不记得了吗?===========";
                    if(response.contains("我记得"))
                        return "记得就好!上次说好要请我吃饭的,什么时候……";
                    return "不想跟你说话!";
                }
            };
            
            final Client client2 = new AioClient(){
                public String getName(){
                    return "客户端B";
                }
                
                @Override
                public String running(String response) {
                    if(response == null)
                        return "你好!";
                    try {
                        Thread.sleep(700);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    if(response.contains("你是"))
                        return "==========我是客户端B!你不记得了吗?==========";
                    if(response.contains("我记得"))
                        return "出门忘带钱包了,能不能先错我点?";
                    return "不想跟你说话!";
                }
            };
            
            // 模拟App
            server.start();
            client1.start();
            client2.start();
    
            // 2秒后停止运行
            Thread.sleep(2000);
            client1.stop();
            // 3秒后停止运行
            Thread.sleep(3000);
            client2.stop();
            server.stop();
    
            client1.waitStop();
            client2.waitStop();
            server.waitStop();
        }
        
        // ==================================================
        // base
        // ==================================================
        
        public abstract class Log {
            public String getName(){
                return toString();
            }
            
            public final void log(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents);
            }
            
            public final void logError(String contents){
                System.out.println(getName() + " -> " + (new Date().getTime() - time) + ":
    " + getName() + " -> " + contents + "!!!!!!!!!!!!!!!!!!!!!!!!!!");
            }
        }
        
        public interface App {
            public abstract void start();
            public abstract void stop();
            public abstract void waitStop();
        }
        
        public abstract class AbstractApp extends Log implements App {
            protected boolean run = false;
            private final AtomicBoolean running = new AtomicBoolean(false);
            
            protected abstract void run();
            
            public final void start(){
                if(run || running.get())
                    return ;
                
                run = true;
                new Thread(){
                    public void run() {
                        try {
                            log("启动");
                            running.compareAndSet(false, true);
                            AbstractApp.this.run();
                        } finally {
                            running.compareAndSet(true, false);
                            log("已停止");
                        }
                    }
                }.start();
            }
            
            protected void stopImpl(){
                
            }
            
            public final void stop() {
                if(!run)
                    return ;
    
                if(running.get())
                    log("停止开始!");
                run = false;
                stopImpl();
            }
            
            public final void waitStop() {
                if(run)
                    return ;
                
                while(running.get()){
                    log("等待停止中……");
                    
                    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
        
        public abstract class Server extends AbstractApp {
            public String getName(){
                return "服务器";
            }
            
            /**
             * 处理请求,并返回需要发送的响应信息
             */
            public abstract String service(String request);
        }
        
        public interface Client extends App {
            /**
             * 接收响应,并返回需要发送的请求信息
             */
            public abstract String running(String response);
        }
        
        public class Session {
            protected static final int RECEIVE_MAX_LEN = 100;
            private ByteBuffer receiveContents = ByteBuffer.allocate(RECEIVE_MAX_LEN);
            private boolean receiveOutMaxLen = false;
            
            protected final String onReceive(ByteBuffer buffer){
                while(buffer.hasRemaining()){
                    byte b = buffer.get();
                    if(b == 0){
                        if(!receiveOutMaxLen){
                            receiveContents.flip();
                            String result = new String(receiveContents.array(), 0, receiveContents.limit(), Charset.defaultCharset());
                            receiveContents.clear();
                            return result;
                        } else {
                            return "error.outLen";
                        }
                    } else if(receiveOutMaxLen) {
                        // 处理接收数据的长度超出限制的情况,这里处理方式为一直接收,无时间限制。
                        // if(!run)
                        //    return null;
                    } else if(receiveContents.hasRemaining()) {
                        receiveContents.put(b);
                    } else {
                        receiveOutMaxLen = true;
                        receiveContents.clear();
                    }
                }
                
                return null;
            }
        }
        
        // ==================================================
        // BIO
        // ==================================================
        
        // 省略……

      // ================================================== // NIO // ================================================== // 省略……

      // ================================================== // AIO // ================================================== public class AioSession extends Session { private int id; private ByteBuffer buffer = ByteBuffer.allocate(AioSession.RECEIVE_MAX_LEN); private AsynchronousSocketChannel socket; private Future<Integer> future; private boolean read = true; public AioSession(AsynchronousSocketChannel socket) { this.socket = socket; } public AioSession(int id, AsynchronousSocketChannel socket) { this.id = id; this.socket = socket; } public int getId(){ return id; } public ByteBuffer getBuffer(){ return buffer; } public AsynchronousSocketChannel getSocket(){ return socket; } public Future<Integer> getFuture(){ return future; } public void setFuture(Future<Integer> future, boolean read){ this.future = future; this.read = read; } public boolean isRead() { return read; } } public abstract class AioSocketApp extends AbstractApp { protected final String receive(String target, AioSession session){ AsynchronousSocketChannel socket = session.getSocket(); ByteBuffer buffer = session.getBuffer(); Future<Integer> future = session.getFuture(); int count = -1; if(future != null){ try { count = future.get(); } catch (InterruptedException | ExecutionException e) { logError(e.getMessage()); // e.printStackTrace(); } if(count == -1){ return "disconnect"; } } if(future == null || count > 0){ if(count > 0){ buffer.flip(); try { String contents = session.onReceive(buffer); if(contents != null) return contents; } finally { buffer.compact(); } } future = socket.read(buffer); session.setFuture(future, true); } return null; } protected final boolean send(String target, String contents, AioSession session){ AsynchronousSocketChannel socket = session.getSocket(); log("发送" + target + ":" + contents + "!"); ByteBuffer buffer = ByteBuffer.wrap((contents + (char)0).getBytes()); log("正在发送" + target + "……"); session.setFuture(socket.write(buffer), false); return run; } } public abstract class AioServer extends Server { private AtomicInteger nextServiceNameNum = new AtomicInteger(1); protected void run(){ Service service = new Service(); AsynchronousServerSocketChannel acceptor = null; try { acceptor = AsynchronousServerSocketChannel.open(); acceptor.bind(new InetSocketAddress("localhost", 8080)); service = new Service(); service.start(); while(run){ Future<AsynchronousSocketChannel> future = null; try { while(run){ log("正在接收客户端连接……"); if(future == null) future = acceptor.accept(); if(future.isDone()){ log("接收到一个客户端的连接!"); AsynchronousSocketChannel socket = null; try { socket = future.get(); service.add(socket); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); socket.close(); } future = null; continue ; } if(future.isCancelled()){ log("接收客户端连接已中止!"); future = null; continue ; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } } finally { if(future != null && !future.cancel(false)){ while(!(future.isDone() || future.isCancelled())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } } } } catch (IOException e){ e.printStackTrace(); } finally { if(acceptor != null){ if(service != null){ service.stop(); service.waitStop(); service.clear(); } try { acceptor.close(); } catch (IOException e) { e.printStackTrace(); } } } } private class Service extends AioSocketApp { private BlockingQueue<AsynchronousSocketChannel> sockets = new ArrayBlockingQueue<AsynchronousSocketChannel>(100); private Set<AioSession> sessions = new HashSet<AioSession>(); private int serviceId = 0; public String getName(){ return AioServer.this.getName() + "/" + "服务" + (serviceId > 0 ? serviceId : ""); } @Override protected void run() { AsynchronousSocketChannel socket = null; while(run){ while(run && !sockets.isEmpty()) { try { socket = sockets.poll(1, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); continue; } if(socket == null) continue; AioSession session = new AioSession(nextServiceNameNum.getAndIncrement(), socket); sessions.add(session); receive(session); } if(sessions.size() > 0){ Iterator<AioSession> sessions = new ArrayList<AioSession>(this.sessions).iterator(); while(run && sessions.hasNext()){ AioSession session = sessions.next(); serviceId = session.getId(); if(session.isRead()) receive(session); else send(session); serviceId = 0; } } }; } private void receive(AioSession session){ receive(session, false); } private void receive(AioSession session, boolean reset){ if(reset) session.setFuture(null, true); Future<Integer> future = session.getFuture(); if(future == null || future.isDone()){ String request = receive("客户端请求", session); if(request == null) return ; if(request.equals("disconnect")){ closeSocket(session); return ; } String response = null; if(request.startsWith("error")) { response = "请求格式不正确!"; } else { log("接收到客户端请求:" + request); response = service(request); if(response == null){ closeSocket(session); return ; } } if(!send("响应", response, session)) closeSocket(session); } else if(future.isCancelled()){ closeSocket(session); log("接收客户端请求已中止!"); } } private void send(AioSession session){ Future<Integer> future = session.getFuture(); if(future.isDone()){ log("发送响应成功!"); // 重置future receive(session, true); } else if(future.isCancelled()){ closeSocket(session); log("发送响应已中止!"); } } protected void closeSocket(AioSession session) { Future<Integer> future = session.getFuture(); if(future != null && !future.cancel(false)){ while(!(future.isDone() || future.isCancelled())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } session.setFuture(null, true); } try { session.getSocket().close(); } catch (IOException e) { e.printStackTrace(); } sessions.remove(session); log("与客户端断开连接!"); } public void add(AsynchronousSocketChannel socket) throws InterruptedException { sockets.put(socket); } public void clear(){ while(!sockets.isEmpty()){ AsynchronousSocketChannel socket = null; try { socket = sockets.poll(1, TimeUnit.MINUTES); } catch (InterruptedException e) { e.printStackTrace(); } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } } } if(sessions.size() > 0){ Iterator<AioSession> sessions = new ArrayList<AioSession>(this.sessions).iterator(); while(sessions.hasNext()){ AioSession session = sessions.next(); closeSocket(session); } } } } } public abstract class AioClient extends AioSocketApp implements Client { public AioClient() { } public String getName(){ return "Aio客户端"; } AsynchronousSocketChannel socket = null; AioSession session = null; public void run(){ try { while(connect() && running()){ closeConnect(); } } finally { closeConnect(); } } private boolean connect(){ while(run){ Future<Void> future = null; try { try { socket = AsynchronousSocketChannel.open(); } catch (IOException e) { e.printStackTrace(); closeConnect(); return false; } future = socket.connect(new InetSocketAddress("localhost", 8080)); while(run){ if(future.isDone()){ session = new AioSession(socket); log("连接服务器成功!"); return true; } if(future.isCancelled()){ log("连接服务器终止!"); } } } finally { if(future != null && !future.cancel(false)){ while(!(future.isCancelled() || future.isDone())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } try { Thread.sleep(1000); } catch (InterruptedException e1) { e1.printStackTrace(); } } } return false; } private void closeConnect(){ if(session != null){ Future<Integer> future = session.getFuture(); if(future != null){ while(!(future.isCancelled() || future.isDone())){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } } session = null; } if(socket != null){ try { socket.close(); } catch (IOException e) { e.printStackTrace(); } socket = null; } } private boolean running(){ String response = null; String request = running(response); // 循环请求 while(run){ // 退出运行 if(request == null) break ; // 清空响应 response = null; // 发送请求 if(send("请求", request, session)){ Future<Integer> future = session.getFuture(); while(run){ if(future.isDone()){ log("发送请求成功!"); break; } if(future.isCancelled()){ log("发送请求已中止!"); return run; } try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } } // 接收响应 future = null; while(run){ if(future == null || future.isDone()){ response = receive("服务器响应", session); if(response == null) { future = session.getFuture(); continue ; } if(response.equals("disconnect")) return run; if(response.startsWith("error")) { request = "请求格式不正确!"; } else { log("接收到服务器响应:" + response); request = running(response); } break ; } else if(future.isCancelled()) { log("接收客户端响应已中止!"); return true; } } } } return run; } } }
  • 相关阅读:
    C#异步调用的好处和方法
    asp.net运行原理
    基于jQuery的AJAX和JSON的实例
    Slq Server创建索引
    SQL查询一个表中类别字段中Max()最大值对应的记录
    优化SQL Server的内存占用之执行缓存
    ASP.NET缓存
    .NET Framework 4.5新特性
    ADO.NET 连接数据库字符串(Oracle、SqlServer、Access、ODBC)
    sql server基础
  • 原文地址:https://www.cnblogs.com/hvicen/p/6208046.html
Copyright © 2011-2022 走看看