zoukankan      html  css  js  c++  java
  • 【Netty整理03-NIO】Java 实现 NIO demo

    jdk提供的NIO使用:

    概览:https://blog.csdn.net/the_fool_/article/details/83000648

    博主抄写了网上的demo,略作修改与调整,原文链接:

    Demo01:抄写地址忘记了。。。

    1、服务端代码:

    package com.test3;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;
    
    /**
     * NIO服务端代码
     * @author http://
     * @author ZX
     * 监听客户端连接,接收、发送消息
     *
     */
    public class AServer {
        public static  void  main(String[]args)throws Exception{
            System.out.println("=================");
            //创建选择器
            Selector selector = Selector.open();
            //创建打开服务端的监听
            ServerSocketChannel sChannel = ServerSocketChannel.open();
            //绑定本地地址
            sChannel.socket().bind(new InetSocketAddress(9999));
            //设置非阻塞模式
            sChannel.configureBlocking(false);
            //将通道绑定到选择器,非阻塞通道才能注册到选择器,第二个参数好像是方式或者操作吧
            sChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            TcpProtocal protocol = new TcpProtocal();
            //循环监听等待
            while (true){
                //
                if(selector.select(3000)==0){
                    System.out.println("继续等待");
                    continue;
                }
                //间听到的可操作集合
                Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
                while (keyIter.hasNext()){
                    SelectionKey key = keyIter.next();
                    //这是干嘛的?获取下下个?一点用没有啊???
                    /*  SelectionKey key1;
                    if(keyIter.hasNext()){
                        key1 = keyIter.next();
                    }*/
                    try {
                        //如果有客户端连接请求
                        if(key.isAcceptable()){
                            protocol.handleAccept(key);
                        }
                        //如果有数据发送
                        if(key.isReadable()){
                            protocol.handleRead(key);
                            protocol.handleWrite(key);
                        }
                        //是否有效,是否可发送给客户端
                        if(key.isValid()&&key.isWritable()){
                            //protocol.handleWriteMsg(key,"我服务端在这里说点事情");
                        }
                    }catch (IOException e){
                        keyIter.remove();
                        e.printStackTrace();
                        continue;
                    }
                    //删除处理过的键
                    keyIter.remove();
    
    
    
                }
    
            }
    
        }
    }
    

    2、协议:

    package com.test3;
    
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    /**
     * 定义一个协议传输信息
     */
    public class TcpProtocal {
        private int bufferSize=1024;
        /** 接受一个SocketChannel处理*/
        public void handleAccept(SelectionKey key) throws Exception{
            //返回创建此键的通道,接收客户端建立连接的请求,并返回socketChannel对象
            SocketChannel clientChannel=((ServerSocketChannel)key.channel()).accept();
            //设置非阻塞
            clientChannel.configureBlocking(false);
            //注册到selector
            clientChannel.register(key.selector(),SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
    
        }
        /** 从一个一个SocketChannel读取信息*/
        public void handleRead(SelectionKey key) throws Exception{
            //获得与客户端通信的通道
            SocketChannel clientChannel=(SocketChannel)key.channel();
            //得到并清空缓冲区并清空缓冲区
            ByteBuffer buffer= (ByteBuffer)key.attachment();
            buffer.clear();
            //读取信息获得的字节数
            int byteRead = clientChannel.read(buffer);
            if(byteRead==-1){
                clientChannel.close();
            }else{
                //将缓冲区准备为数据传状态
                buffer.flip();
                //将字节转换为UTF-16
                String receivedStr = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                System.out.println("接收到来自"+clientChannel.socket().getRemoteSocketAddress()+"信息"+receivedStr);
                key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                /*String sendStr ="已收到信息";
                buffer=ByteBuffer.wrap(sendStr.getBytes("UTF-8"));
                clientChannel.write(buffer);
                //设置为下依稀读取写入做准备
                key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);*/
            }
    
    
        }
        /** 向一个一个SocketChannel写入信息*/
        public void handleWrite(SelectionKey key) throws Exception{
            handleWriteMsg(key,null);
    
        }
    
        /** 向一个一个SocketChannel写入信息*/
        public void handleWriteMsg(SelectionKey key,String msg) throws Exception{
            if(msg==null||"".equals(msg)){
                msg="服务器主动说:已收到建立请求消息";
            }
            //获得与客户端通信的通道
            SocketChannel clientChannel=(SocketChannel)key.channel();
            //得到并清空缓冲区并清空缓冲区
            ByteBuffer buffer= (ByteBuffer)key.attachment();
            buffer.clear();
            String sendStr =msg;
            buffer=ByteBuffer.wrap(sendStr.getBytes("UTF-8"));
            clientChannel.write(buffer);
            //设置为下依稀读取写入做准备
            key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
    
        }
    }
    

    3、客户端处理类:

    package com.test3;
    
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    public class TcpClientReadThread implements Runnable{
        private Selector selector;
    
        public TcpClientReadThread(Selector selector){
            this.selector=selector;
            new Thread(this).start();
        }
    
        public void run() {
            try {
                //select()方法只能使用一次,用过之后就会删除,每个连接到服务器的选择器都是独立的
                while (selector.select()>0){
                    //遍历所有可以IO操作做的Channel对应的selectionKey
                    for(SelectionKey sk:selector.selectedKeys()){
                        //如果数据可读
                        if(sk.isReadable()){
                            //使用NIO读取Channel中可读数据
                            //获取通道信息
                            SocketChannel sc =  (SocketChannel)sk.channel();
                            //创建缓冲区
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            //读取数据到缓冲区
                            sc.read(buffer);
                            //吊用此方法为读取写入做准备
                            buffer.flip();
    
                            String receiveStr = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                            System.out.println("收到服务器"+sc.socket().getRemoteSocketAddress()+"信息"+receiveStr);
                            //为下一次读取做准备
                            sk.interestOps(SelectionKey.OP_READ);
                            //删除正在处理的selectionKey
                            selector.selectedKeys().remove(sk);
    
    
                        }
    
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    4、客户端代码:

    package com.test3;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    public class AClient {
    
        private Selector selector;
        private SocketChannel socketChannel;
    
        private String hostIp;
        private  int hostPort;
    
        public AClient(String hostIp,int hostPort)throws Exception{
            this.hostIp=hostIp;
            this.hostPort=hostPort;
            init();
    
        }
        public void init()throws Exception{
            socketChannel=SocketChannel.open(new InetSocketAddress(hostIp,hostPort));
            socketChannel.configureBlocking(false);
            //打开并注册选择器信道、
            selector= Selector.open();
            socketChannel.register(selector,SelectionKey.OP_READ);
            //启动读取线程
            new TcpClientReadThread(selector);
    
    
        }
    
        /**
         *发送字符串到服务器
         */
        public void sendMsg(String msgg)throws Exception{
            ByteBuffer writeBuffer = ByteBuffer.wrap(msgg.getBytes("UTF-8"));
            socketChannel.write(writeBuffer);
        }
        static AClient aClient;
        static  boolean mFlag=true;
        public static void main(String[]args)throws Exception{
            aClient=new AClient("127.0.0.1",9999);
    
    
            new Thread(){
                @Override
                public void run() {
                    try {
                        aClient.sendMsg("客户端======");
                        while (mFlag){
                            Scanner sc = new Scanner(System.in);
                            String next = sc.next();
                            aClient.sendMsg(next);
                        }
                    }catch (Exception e){
                        mFlag=false;
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
    

    =================================================================================================

    DEMO02:这个看起来更清楚一些,重新补充如下demo:

    摘抄地址:

    作者:anxpp
    来源:CSDN
    原文:https://blog.csdn.net/anxpp/article/details/51512200?utm_source=copy

    Server:

    package javanio;
    
    import javax.script.ScriptEngine;
    import javax.script.ScriptEngineManager;
    import javax.script.ScriptException;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    /**NIO 步骤*/
    
    /**            打开ServerSocketChannel,监听客户端连接
                绑定监听端口,设置连接为非阻塞模式
                创建Reactor线程,创建多路复用器并启动线程
                将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
                Selector轮询准备就绪的key
                Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
                设置客户端链路为非阻塞模式
                将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
                异步读取客户端消息到缓冲区
                对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
                将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
            ---------------------
            作者:anxpp
            来源:CSDN
            原文:https://blog.csdn.net/anxpp/article/details/51512200?utm_source=copy
            版权声明:本文为博主原创文章,转载请附上博文链接!*/
    public class Server {
        private static int DEFAULT_PORT = 8081;
        private static ServerHandle serverHandle;
        public static void start(){
            start(DEFAULT_PORT);
        }
        public static synchronized void start(int port){
            if(serverHandle!=null){
                serverHandle.stop();
            }
    
            serverHandle = new ServerHandle(port);
            new Thread(serverHandle,"Server").start();
        }
        public static void main(String[] args){
            start();
        }
    
    
    }
    
    class ServerHandle implements Runnable{
        private Selector selector;
        private ServerSocketChannel serverChannel;
        private volatile boolean started;
    
        /**
         * 构造方法
         * @param port 指定要监听的端口号
         */
        public ServerHandle(int port) {
            try{
                //创建选择器
                selector = Selector.open();
                //打开ServerSocketChannel,监听客户端连接
                serverChannel = ServerSocketChannel.open();
                //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
                serverChannel.configureBlocking(false);
                //绑定端口 backlog设为1024
                serverChannel.socket().bind(new InetSocketAddress(port),1024);
                //监听客户端连接请求
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                //标记服务器已开启
                started = true;
                System.out.println("SERVER START,PORT:" + port);
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        public void stop(){
            started = false;
        }
    
    
        @Override
        public void run() {
            //循环遍历selector
            while(started){
                try{
                    //无论是否有读写事件发生,selector每隔1s被唤醒一次
                    selector.select(1000);
                    //阻塞,只有当至少一个注册的事件发生的时候才会继续.
    //				selector.select();
                    //返回已此通道已准备就绪的键集,已选择始终是键集的一个子集
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    //处理所有的
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try{
                            handleInput(key);
                        }catch(Exception e){
                            if(key != null){
                                key.cancel();
                                if(key.channel() != null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }catch(Throwable t){
                    t.printStackTrace();
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if(selector != null)
                try{
                    selector.close();
                }catch (Exception e) {
                    e.printStackTrace();
                }
    
    
        }
    
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                //处理新接入的请求消息
                if(key.isAcceptable()){
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    //通过ServerSocketChannel的accept创建SocketChannel实例
                    //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
                    SocketChannel sc = ssc.accept();
                    //设置为非阻塞的
                    sc.configureBlocking(false);
                    //注册为读
                    sc.register(selector, SelectionKey.OP_READ);
                }
                //读消息
                if(key.isReadable()){
                    SocketChannel sc = (SocketChannel) key.channel();
                    //创建ByteBuffer,并开辟一个1M的缓冲区
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //读取请求码流,返回读取到的字节数
                    int readBytes = sc.read(buffer);
                    //读取到字节,对字节进行编解码
                    if(readBytes>0){
                        //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                        buffer.flip();
                        //根据缓冲区可读字节数创建字节数组
                        byte[] bytes = new byte[buffer.remaining()];
                        //将缓冲区可读字节数组复制到新建的数组中
                        buffer.get(bytes);
                        String expression = new String(bytes,"UTF-8");
                        System.out.println("SERVER GET MSSG:" + expression);
                        //处理数据
                        String result = null;
                        try{
                            result = Calculator.cal(expression).toString();
                        }catch(Exception e){
                            result = "CAL ERR:" + e.getMessage();
                        }
                        //发送应答消息
                        doWrite(sc,result);
                    }
                    //没有读取到字节 忽略
    //				else if(readBytes==0);
                    //链路已经关闭,释放资源
                    else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
            }
        }
        //异步发送应答消息
        private void doWrite(SocketChannel channel,String response) throws IOException{
            //将消息编码为字节数组
            byte[] bytes = response.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //发送缓冲区的字节数组
            channel.write(writeBuffer);
            //****此处不含处理“写半包”的代码
        }
    
    
    
        void test(){
    
        }
    
    }
     class Calculator {
        private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
        public static Object cal(String expression) throws ScriptException {
            return jse.eval(expression);
        }
    }
    
    

    Client:

    package javanio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Client {
    
        private static String DEFAULT_HOST = "127.0.0.1";
        private static int DEFAULT_PORT = 8081;
        private static ClientHandle clientHandle;
        public static void start(){
            start(DEFAULT_HOST,DEFAULT_PORT);
        }
        public static synchronized void start(String ip,int port){
            if(clientHandle!=null)
                clientHandle.stop();
            clientHandle = new ClientHandle(ip,port);
            new Thread(clientHandle,"Server").start();
        }
        //向服务器发送消息
        public static boolean sendMsg(String msg) throws Exception{
            if(msg.equals("q")) return false;
            clientHandle.sendMsg(msg);
            return true;
        }
        public static void main(String[] args){
            start();
        }
    
    
    }
    class ClientHandle implements Runnable{
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean started;
    
        public ClientHandle(String ip,int port) {
            this.host = ip;
            this.port = port;
            try{
                //创建选择器
                selector = Selector.open();
                //打开监听通道
                socketChannel = SocketChannel.open();
                //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
                socketChannel.configureBlocking(false);//开启非阻塞模式
                started = true;
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        public void stop(){
            started = false;
        }
        @Override
        public void run() {
            try{
                doConnect();
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
            //循环遍历selector
            while(started){
                try{
                    //无论是否有读写事件发生,selector每隔1s被唤醒一次
                    selector.select(1000);
                    //阻塞,只有当至少一个注册的事件发生的时候才会继续.
    //				selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try{
                            handleInput(key);
                        }catch(Exception e){
                            if(key != null){
                                key.cancel();
                                if(key.channel() != null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }catch(Exception e){
                    e.printStackTrace();
                    System.exit(1);
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if(selector != null)
                try{
                    selector.close();
                }catch (Exception e) {
                    e.printStackTrace();
                }
        }
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                SocketChannel sc = (SocketChannel) key.channel();
                if(key.isConnectable()){
                    if(sc.finishConnect());
                    else System.exit(1);
                }
                //读消息
                if(key.isReadable()){
                    //创建ByteBuffer,并开辟一个1M的缓冲区
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //读取请求码流,返回读取到的字节数
                    int readBytes = sc.read(buffer);
                    //读取到字节,对字节进行编解码
                    if(readBytes>0){
                        //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                        buffer.flip();
                        //根据缓冲区可读字节数创建字节数组
                        byte[] bytes = new byte[buffer.remaining()];
                        //将缓冲区可读字节数组复制到新建的数组中
                        buffer.get(bytes);
                        String result = new String(bytes,"UTF-8");
                        System.out.println("CLIENT GET MSG:" + result);
                    }
                    //没有读取到字节 忽略
    //				else if(readBytes==0);
                    //链路已经关闭,释放资源
                    else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
            }
        }
        //异步发送消息
        private void doWrite(SocketChannel channel,String request) throws IOException{
            //将消息编码为字节数组
            byte[] bytes = request.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //发送缓冲区的字节数组
            channel.write(writeBuffer);
            //****此处不含处理“写半包”的代码
        }
        private void doConnect() throws IOException{
            if(socketChannel.connect(new InetSocketAddress(host,port)));
            else socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
        public void sendMsg(String msg) throws Exception{
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel, msg);
        }
    
    
    }

    Test:

    package javanio;
    
    import java.util.Scanner;
    
    public class Test {
        public static void main(String[] args) throws Exception{
            //运行服务器
            Server.start();
            //避免客户端先于服务器启动前执行代码
            Thread.sleep(100);
            //运行客户端
            Client.start();
            while(Client.sendMsg(new Scanner(System.in).nextLine()));
        }
    
    
    }
    

    demo3:https://blog.csdn.net/Howinfun/article/details/81283721

  • 相关阅读:
    chrome浏览器中安装以及使用Elasticsearch head 插件
    windows10 升级并安装配置 jmeter5.3
    linux下部署Elasticsearch6.8.1版本的集群
    【Rollo的Python之路】Python 爬虫系统学习 (八) logging模块的使用
    【Rollo的Python之路】Python 爬虫系统学习 (七) Scrapy初识
    【Rollo的Python之路】Python 爬虫系统学习 (六) Selenium 模拟登录
    【Rollo的Python之路】Python 爬虫系统学习 (五) Selenium
    【Rollo的Python之路】Python 爬虫系统学习 (四) XPath学习
    【Rollo的Python之路】Python 爬虫系统学习 (三)
    【Rollo的Python之路】Python sys argv[] 函数用法笔记
  • 原文地址:https://www.cnblogs.com/the-fool/p/11054120.html
Copyright © 2011-2022 走看看