zoukankan      html  css  js  c++  java
  • Java网络编程--echo服务器

    客户端使用Java的阻塞IO

    服务端使用Java的非阻塞NIO

    package com.nio.echo;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.InetSocketAddress;
    import java.net.Socket;
    import java.util.Scanner;
    
    /**
     * @author 作者 E-mail:
     * @version 创建时间:2015-10-29 下午02:49:47 类说明
     */
    public class EchoClient
    {
        public static final String REMOT_IP = "127.0.0.1";
    
        public static final int REMOTE_PORT = 8080;
    
        public void connectServer() throws IOException
        {
            Socket socket = new Socket();
    
            socket.connect(new InetSocketAddress(REMOT_IP, REMOTE_PORT));
    
            if (socket.isConnected())
            {
                System.out.println("connect remote address success");
            }
    
            // 启动线程监听server端消息
            new Thread(new client2server(socket)).start();
            Scanner scanner = new Scanner(System.in);
    
            OutputStream output = socket.getOutputStream();
            while (true)
            {
                String str = scanner.nextLine();
    
                if (str.equals("quit"))
                {
                    socket.close();
                    break;
                }
                output.write(str.getBytes("UTF-8"));
    
            }
    
        }
    
        public static void main(String[] args) throws IOException
        {
            new EchoClient().connectServer();
        }
    }
    
    class client2server implements Runnable
    {
        private Socket socket = null;
    
        public client2server(Socket socket)
        {
            this.socket = socket;
        }
    
        @Override
        public void run()
        {
            InputStream inputStream;
            try
            {
                inputStream = socket.getInputStream();
                byte[] bytes = new byte[1024];
                while (true)
                {
                    int num = inputStream.read(bytes);
                    if (num != -1)
                    {
                        System.out.print(num + " ");
                    }
                    else
                    {
                        System.out.println("server is shutup");
                        break;
                    }
    
                    String str = new String(bytes, 0, num, "UTF-8");
                    System.out.println("get data: " + str);
    
                }
            }
            catch(IOException e)
            {
                e.printStackTrace();
            }
    
        }
    }
    
    package com.nio.echo;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @author 作者 E-mail:
     * @version 创建时间:2015-10-29 下午02:49:12 类说明
     */
    public class NIOEchoServer
    {
        private static ServerSocketChannel ssc = null;
    
        private static Selector selector = null;
    
        private static final int PORT = 8080;
    
        public static void startServer() throws IOException
        {
            ssc = ServerSocketChannel.open();
            selector = Selector.open();
            ssc.configureBlocking(false);
    
            // nio 对socket 和serverSocket进行了怎样封装
            final ServerSocket serverSocket = ssc.socket();
    
            serverSocket.bind(new InetSocketAddress(PORT));
            serverSocket.setReuseAddress(true);
    
            final AcceptHandler acceptHandler = new AcceptHandler();
            ssc.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
            while (true)
            {
                int n = selector.select();
                if (n == 0)
                    continue;
    
                final Set<SelectionKey> readyKeys = selector.selectedKeys();
                final Iterator<SelectionKey> it = readyKeys.iterator();
                while (it.hasNext())
                {
                    final SelectionKey key = it.next();
                    final Handle handler = (Handle) key.attachment();
                    handler.doHandle(key);
                    it.remove();
                }
            }
        }
    
        public static void main(String[] args) throws IOException
        {
            NIOEchoServer.startServer();
        }
    }
    
    interface Handle
    {
        void doHandle(SelectionKey key) throws IOException;
    }
    
    class AcceptHandler implements Handle
    {
    
        @Override
        public void doHandle(SelectionKey key) throws IOException
        {
    
            final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            final SocketChannel sc = ssc.accept();
            final IOHandler handler = new IOHandler(key.selector(), sc);
            System.out.println("server: connect success");
        }
    
    }
    
    class IOHandler implements Handle
    {
        private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    
        private OutputBuffer outputBuffer = new OutputBuffer();
    
        private SocketChannel socketChannel = null;
    
        // private Selector selector = null;
    
        private SelectionKey key = null;
    
        public IOHandler(Selector selector, SocketChannel sc) throws IOException
        {
            // this.selector = selector;
            this.socketChannel = sc;
            socketChannel.configureBlocking(false);
            key = socketChannel.register(selector, SelectionKey.OP_READ, this);
        }
    
        /**
         * 增加输出缓存
         * 
         * @param writeData
         *            要写出的数据
         * @throws IOException
         * @return 返回处理的字节数
         */
        private int addWriteBuffer(ByteBuffer bytebuffer, int num) throws IOException
        {
            int prevPositon = bytebuffer.position();
            outputBuffer.size += num;
    
            outputBuffer.writeBuffer.put(bytebuffer).flip();
            int nowPosition = bytebuffer.position();
    
            this.interestOps(0, SelectionKey.OP_WRITE);
    
            return nowPosition - prevPositon;
        }
    
        /**
         * 增加删除相应事件
         * 
         * @param remove
         * @param add
         */
        private void interestOps(int remove, int add)
        {
            int cur = key.interestOps();
            int ops = (cur & ~remove) | add;
            if (cur != ops)
            {
                key.interestOps(ops);
                key.selector().wakeup();
            }
        }
    
        /**
         * ByteBuffer 转换 String
         * 
         * @param buffer
         * @return
         */
        public static String getString(ByteBuffer buffer)
        {
            Charset charset = null;
            CharsetDecoder decoder = null;
            CharBuffer charBuffer = null;
            try
            {
                charset = Charset.forName("UTF-8");
                decoder = charset.newDecoder();
                // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
                charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
                return charBuffer.toString();
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
                return "";
            }
        }
    
        @Override
        public void doHandle(SelectionKey key) throws IOException
        {
            if (key.isReadable())
            {
                System.out.print("server:  meet read event ,before read position = " + readBuffer.position());
    
                int num = socketChannel.read(readBuffer);
    
                // 关闭
                if (num == -1)
                {
                    System.out.println("close the channel ");
                    key.channel();
                    key.channel().close();
                    return;
                }
    
                // 将position置为0
                readBuffer.flip();
    
                System.out.print(" reveive data " + getString(readBuffer));
    
                int dealsize = addWriteBuffer(readBuffer, num);
    
                System.out.println(" write to writeBuffer size = " + dealsize + " nowPostion = " + readBuffer.position());
    
                // 将处理过的数据清除
                readBuffer.compact();
            }
            else if (key.isWritable())
            {
                System.out.print("meet write event");
    
                long num = socketChannel.write(outputBuffer.writeBuffer);
                outputBuffer.size -= num;
    
                System.out.print("deal size = " + num + "left buffer size = " + outputBuffer.size);
                if (outputBuffer.size == 0)
                {
                    System.out.println(" deal over,cancel write event");
                    interestOps(SelectionKey.OP_WRITE, 0);
                }
                // 清除已经处理过的数据
                outputBuffer.writeBuffer.compact();
    
            }
        }
    }
    
    class OutputBuffer
    {
        public int size;
    
        public final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    
    }
    

    ByteBuffer没有提供有用数据的相关方法,只能自己写一个OutputBuffer来辅助处理

    之前OutputBuffer只是封装了一个ByteBuffer以及一个size变量用于标示可以数据量

    下面对OutputBuffer进行了重构,将size变量的修改以及数据的写入和写出操作都封装到方法中,其中output(SocketChannel socketChannel)

    方法利用回调的思想,将socketChannel对象传入,在OutputBuffer当中实现数据的write输出

    package com.nio.echo;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.Iterator;
    import java.util.Set;
    
    /**
     * @author 作者 E-mail:
     * @version 创建时间:2015-10-29 下午02:49:12 类说明
     */
    public class NIOEchoServer
    {
        private static ServerSocketChannel ssc = null;
    
        private static Selector selector = null;
    
        private static final int PORT = 8080;
    
        public static void startServer() throws IOException
        {
            ssc = ServerSocketChannel.open();
            selector = Selector.open();
            ssc.configureBlocking(false);
    
            // nio 对socket 和serverSocket进行了怎样封装
            final ServerSocket serverSocket = ssc.socket();
    
            serverSocket.bind(new InetSocketAddress(PORT));
            serverSocket.setReuseAddress(true);
    
            final AcceptHandler acceptHandler = new AcceptHandler();
            ssc.register(selector, SelectionKey.OP_ACCEPT, acceptHandler);
            while (true)
            {
                int n = selector.select();
                if (n == 0)
                    continue;
    
                final Set<SelectionKey> readyKeys = selector.selectedKeys();
                final Iterator<SelectionKey> it = readyKeys.iterator();
                while (it.hasNext())
                {
                    final SelectionKey key = it.next();
                    final Handle handler = (Handle) key.attachment();
                    handler.doHandle(key);
                    it.remove();
                }
            }
        }
    
        public static void main(String[] args) throws IOException
        {
            NIOEchoServer.startServer();
        }
    }
    
    interface Handle
    {
        void doHandle(SelectionKey key) throws IOException;
    }
    
    class AcceptHandler implements Handle
    {
    
        @Override
        public void doHandle(SelectionKey key) throws IOException
        {
    
            final ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
            final SocketChannel sc = ssc.accept();
            final IOHandler handler = new IOHandler(key.selector(), sc);
            System.out.println("server: connect success");
        }
    
    }
    
    class IOHandler implements Handle
    {
        private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    
        private OutputBuffer outputBuffer = new OutputBuffer();
    
        private SocketChannel socketChannel = null;
    
        // private Selector selector = null;
    
        private SelectionKey key = null;
    
        public IOHandler(Selector selector, SocketChannel sc) throws IOException
        {
            // this.selector = selector;
            this.socketChannel = sc;
            socketChannel.configureBlocking(false);
            key = socketChannel.register(selector, SelectionKey.OP_READ, this);
        }
    
        /**
         * 增加输出缓存
         * 
         * @param writeData
         *            要写出的数据
         * @throws IOException
         * @return 返回处理的字节数
         */
        private int addWriteBuffer(ByteBuffer bytebuffer, int num) throws IOException
        {
            int prevPositon = bytebuffer.position();
    
            outputBuffer.put(bytebuffer, num);
    
            int nowPosition = bytebuffer.position();
    
            this.interestOps(0, SelectionKey.OP_WRITE);
    
            return nowPosition - prevPositon;
        }
    
        /**
         * 增加删除相应事件
         * 
         * @param remove
         * @param add
         */
        private void interestOps(int remove, int add)
        {
            int cur = key.interestOps();
            int ops = (cur & ~remove) | add;
            if (cur != ops)
            {
                key.interestOps(ops);
                key.selector().wakeup();
            }
        }
    
        /**
         * ByteBuffer 转换 String
         * 
         * @param buffer
         * @return
         */
        public static String getString(ByteBuffer buffer)
        {
            Charset charset = null;
            CharsetDecoder decoder = null;
            CharBuffer charBuffer = null;
            try
            {
                charset = Charset.forName("UTF-8");
                decoder = charset.newDecoder();
                // charBuffer = decoder.decode(buffer);//用这个的话,只能输出来一次结果,第二次显示为空
                charBuffer = decoder.decode(buffer.asReadOnlyBuffer());
                return charBuffer.toString();
            }
            catch(Exception ex)
            {
                ex.printStackTrace();
                return "";
            }
        }
    
        @Override
        public void doHandle(SelectionKey key) throws IOException
        {
            if (key.isReadable())
            {
                System.out.print("server:  meet read event ,before read position = " + readBuffer.position());
    
                int num = socketChannel.read(readBuffer);
    
                // 关闭
                if (num == -1)
                {
                    System.out.println("close the channel ");
                    key.channel();
                    key.channel().close();
                    return;
                }
    
                // 将position置为0
                readBuffer.flip();
    
                System.out.println(" reveive data " + getString(readBuffer));
    
                int dealsize = addWriteBuffer(readBuffer, num);
    
                // 将处理过的数据清除
                readBuffer.compact();
            }
            else if (key.isWritable())
            {
                System.out.print("meet write event");
    
                // 写数据
                outputBuffer.output(socketChannel);
    
                if (outputBuffer.size() == 0)
                {
                    System.out.println(" deal over,cancel write event");
                    interestOps(SelectionKey.OP_WRITE, 0);
                }
    
            }
        }
    }
    
    class OutputBuffer
    {
        private int size;
    
        private final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    
        public void output(SocketChannel socketChannel) throws IOException
        {
            int num = socketChannel.write(writeBuffer);
            writeBuffer.compact();
            size -= num;
        }
    
        public void put(ByteBuffer b, int num)
        {
            writeBuffer.put(b).flip();
            this.size += num;
        }
    
        public int size()
        {
            return this.size;
        }
    }
    

      

    事实上在NIO网络编程中,写出数据的操作需要加入缓存才能保证效率,目的是为了写操作发生的时候不影响业务继续send消息,首先将send消息发送过来的数据缓存到A中,在写事件发生的时候将A中数据写出(此时仅短暂锁住A,将A中引用拿出,重新赋值新引用给A),这样写事件的处理过程和业务消息的send就可以高并发的进行。

  • 相关阅读:
    浅入ABP(1):搭建基础结构的 ABP 解决方案
    浅入ABP(2):添加基础集成服务
    GDB 调试 .NET 程序实录
    浅入 ABP 系列(6):数据库配置
    浅入 ABP 系列(4):事件总线
    浅入 ABP系列(3):增加日志组件、依赖注入服务
    模拟IIC总线多设备挂载(12864OLED屏幕+GY30光照传感器)
    RabbitMQ与Kafka选型对比
    .Net在Windows上使用Jenkins做CI/CD的那些事
    Tomcat乱码问题
  • 原文地址:https://www.cnblogs.com/wuxinliulei/p/4923148.html
Copyright © 2011-2022 走看看