zoukankan      html  css  js  c++  java
  • 【Netty整理03-NIO】Java 实现 NIO demo

    jdk提供的NIO使用:

    概览:https://blog.csdn.net/the_fool_/article/details/83000648

    博主抄写了网上的demo,略作修改与调整,原文链接:

    Demo01:抄写地址忘记了。。。

    1、服务端代码:

    package com.test3;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.util.Iterator;
    
    /**
     * NIO服务端代码
     * @author http://
     * @author ZX
     * 监听客户端连接,接收、发送消息
     *
     */
    public class AServer {
        public static  void  main(String[]args)throws Exception{
            System.out.println("=================");
            //创建选择器
            Selector selector = Selector.open();
            //创建打开服务端的监听
            ServerSocketChannel sChannel = ServerSocketChannel.open();
            //绑定本地地址
            sChannel.socket().bind(new InetSocketAddress(9999));
            //设置非阻塞模式
            sChannel.configureBlocking(false);
            //将通道绑定到选择器,非阻塞通道才能注册到选择器,第二个参数好像是方式或者操作吧
            sChannel.register(selector, SelectionKey.OP_ACCEPT);
    
            TcpProtocal protocol = new TcpProtocal();
            //循环监听等待
            while (true){
                //
                if(selector.select(3000)==0){
                    System.out.println("继续等待");
                    continue;
                }
                //间听到的可操作集合
                Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator();
                while (keyIter.hasNext()){
                    SelectionKey key = keyIter.next();
                    //这是干嘛的?获取下下个?一点用没有啊???
                    /*  SelectionKey key1;
                    if(keyIter.hasNext()){
                        key1 = keyIter.next();
                    }*/
                    try {
                        //如果有客户端连接请求
                        if(key.isAcceptable()){
                            protocol.handleAccept(key);
                        }
                        //如果有数据发送
                        if(key.isReadable()){
                            protocol.handleRead(key);
                            protocol.handleWrite(key);
                        }
                        //是否有效,是否可发送给客户端
                        if(key.isValid()&&key.isWritable()){
                            //protocol.handleWriteMsg(key,"我服务端在这里说点事情");
                        }
                    }catch (IOException e){
                        keyIter.remove();
                        e.printStackTrace();
                        continue;
                    }
                    //删除处理过的键
                    keyIter.remove();
    
    
    
                }
    
            }
    
        }
    }
    

    2、协议:

    package com.test3;
    
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    /**
     * 定义一个协议传输信息
     */
    public class TcpProtocal {
        private int bufferSize=1024;
        /** 接受一个SocketChannel处理*/
        public void handleAccept(SelectionKey key) throws Exception{
            //返回创建此键的通道,接收客户端建立连接的请求,并返回socketChannel对象
            SocketChannel clientChannel=((ServerSocketChannel)key.channel()).accept();
            //设置非阻塞
            clientChannel.configureBlocking(false);
            //注册到selector
            clientChannel.register(key.selector(),SelectionKey.OP_READ, ByteBuffer.allocate(bufferSize));
    
        }
        /** 从一个一个SocketChannel读取信息*/
        public void handleRead(SelectionKey key) throws Exception{
            //获得与客户端通信的通道
            SocketChannel clientChannel=(SocketChannel)key.channel();
            //得到并清空缓冲区并清空缓冲区
            ByteBuffer buffer= (ByteBuffer)key.attachment();
            buffer.clear();
            //读取信息获得的字节数
            int byteRead = clientChannel.read(buffer);
            if(byteRead==-1){
                clientChannel.close();
            }else{
                //将缓冲区准备为数据传状态
                buffer.flip();
                //将字节转换为UTF-16
                String receivedStr = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                System.out.println("接收到来自"+clientChannel.socket().getRemoteSocketAddress()+"信息"+receivedStr);
                key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
                /*String sendStr ="已收到信息";
                buffer=ByteBuffer.wrap(sendStr.getBytes("UTF-8"));
                clientChannel.write(buffer);
                //设置为下依稀读取写入做准备
                key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);*/
            }
    
    
        }
        /** 向一个一个SocketChannel写入信息*/
        public void handleWrite(SelectionKey key) throws Exception{
            handleWriteMsg(key,null);
    
        }
    
        /** 向一个一个SocketChannel写入信息*/
        public void handleWriteMsg(SelectionKey key,String msg) throws Exception{
            if(msg==null||"".equals(msg)){
                msg="服务器主动说:已收到建立请求消息";
            }
            //获得与客户端通信的通道
            SocketChannel clientChannel=(SocketChannel)key.channel();
            //得到并清空缓冲区并清空缓冲区
            ByteBuffer buffer= (ByteBuffer)key.attachment();
            buffer.clear();
            String sendStr =msg;
            buffer=ByteBuffer.wrap(sendStr.getBytes("UTF-8"));
            clientChannel.write(buffer);
            //设置为下依稀读取写入做准备
            key.interestOps(SelectionKey.OP_READ|SelectionKey.OP_WRITE);
    
        }
    }
    

    3、客户端处理类:

    package com.test3;
    
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    public class TcpClientReadThread implements Runnable{
        private Selector selector;
    
        public TcpClientReadThread(Selector selector){
            this.selector=selector;
            new Thread(this).start();
        }
    
        public void run() {
            try {
                //select()方法只能使用一次,用过之后就会删除,每个连接到服务器的选择器都是独立的
                while (selector.select()>0){
                    //遍历所有可以IO操作做的Channel对应的selectionKey
                    for(SelectionKey sk:selector.selectedKeys()){
                        //如果数据可读
                        if(sk.isReadable()){
                            //使用NIO读取Channel中可读数据
                            //获取通道信息
                            SocketChannel sc =  (SocketChannel)sk.channel();
                            //创建缓冲区
                            ByteBuffer buffer = ByteBuffer.allocate(1024);
                            //读取数据到缓冲区
                            sc.read(buffer);
                            //吊用此方法为读取写入做准备
                            buffer.flip();
    
                            String receiveStr = Charset.forName("UTF-8").newDecoder().decode(buffer).toString();
                            System.out.println("收到服务器"+sc.socket().getRemoteSocketAddress()+"信息"+receiveStr);
                            //为下一次读取做准备
                            sk.interestOps(SelectionKey.OP_READ);
                            //删除正在处理的selectionKey
                            selector.selectedKeys().remove(sk);
    
    
                        }
    
                    }
                }
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    

    4、客户端代码:

    package com.test3;
    
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Scanner;
    
    public class AClient {
    
        private Selector selector;
        private SocketChannel socketChannel;
    
        private String hostIp;
        private  int hostPort;
    
        public AClient(String hostIp,int hostPort)throws Exception{
            this.hostIp=hostIp;
            this.hostPort=hostPort;
            init();
    
        }
        public void init()throws Exception{
            socketChannel=SocketChannel.open(new InetSocketAddress(hostIp,hostPort));
            socketChannel.configureBlocking(false);
            //打开并注册选择器信道、
            selector= Selector.open();
            socketChannel.register(selector,SelectionKey.OP_READ);
            //启动读取线程
            new TcpClientReadThread(selector);
    
    
        }
    
        /**
         *发送字符串到服务器
         */
        public void sendMsg(String msgg)throws Exception{
            ByteBuffer writeBuffer = ByteBuffer.wrap(msgg.getBytes("UTF-8"));
            socketChannel.write(writeBuffer);
        }
        static AClient aClient;
        static  boolean mFlag=true;
        public static void main(String[]args)throws Exception{
            aClient=new AClient("127.0.0.1",9999);
    
    
            new Thread(){
                @Override
                public void run() {
                    try {
                        aClient.sendMsg("客户端======");
                        while (mFlag){
                            Scanner sc = new Scanner(System.in);
                            String next = sc.next();
                            aClient.sendMsg(next);
                        }
                    }catch (Exception e){
                        mFlag=false;
                        e.printStackTrace();
                    }
                }
            }.start();
        }
    }
    

    =================================================================================================

    DEMO02:这个看起来更清楚一些,重新补充如下demo:

    摘抄地址:

    作者:anxpp
    来源:CSDN
    原文:https://blog.csdn.net/anxpp/article/details/51512200?utm_source=copy

    Server:

    package javanio;
    
    import javax.script.ScriptEngine;
    import javax.script.ScriptEngineManager;
    import javax.script.ScriptException;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    /**NIO 步骤*/
    
    /**            打开ServerSocketChannel,监听客户端连接
                绑定监听端口,设置连接为非阻塞模式
                创建Reactor线程,创建多路复用器并启动线程
                将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
                Selector轮询准备就绪的key
                Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,简历物理链路
                设置客户端链路为非阻塞模式
                将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
                异步读取客户端消息到缓冲区
                对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
                将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
            ---------------------
            作者:anxpp
            来源:CSDN
            原文:https://blog.csdn.net/anxpp/article/details/51512200?utm_source=copy
            版权声明:本文为博主原创文章,转载请附上博文链接!*/
    public class Server {
        private static int DEFAULT_PORT = 8081;
        private static ServerHandle serverHandle;
        public static void start(){
            start(DEFAULT_PORT);
        }
        public static synchronized void start(int port){
            if(serverHandle!=null){
                serverHandle.stop();
            }
    
            serverHandle = new ServerHandle(port);
            new Thread(serverHandle,"Server").start();
        }
        public static void main(String[] args){
            start();
        }
    
    
    }
    
    class ServerHandle implements Runnable{
        private Selector selector;
        private ServerSocketChannel serverChannel;
        private volatile boolean started;
    
        /**
         * 构造方法
         * @param port 指定要监听的端口号
         */
        public ServerHandle(int port) {
            try{
                //创建选择器
                selector = Selector.open();
                //打开ServerSocketChannel,监听客户端连接
                serverChannel = ServerSocketChannel.open();
                //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
                serverChannel.configureBlocking(false);
                //绑定端口 backlog设为1024
                serverChannel.socket().bind(new InetSocketAddress(port),1024);
                //监听客户端连接请求
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
                //标记服务器已开启
                started = true;
                System.out.println("SERVER START,PORT:" + port);
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        public void stop(){
            started = false;
        }
    
    
        @Override
        public void run() {
            //循环遍历selector
            while(started){
                try{
                    //无论是否有读写事件发生,selector每隔1s被唤醒一次
                    selector.select(1000);
                    //阻塞,只有当至少一个注册的事件发生的时候才会继续.
    //				selector.select();
                    //返回已此通道已准备就绪的键集,已选择始终是键集的一个子集
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    //处理所有的
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try{
                            handleInput(key);
                        }catch(Exception e){
                            if(key != null){
                                key.cancel();
                                if(key.channel() != null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }catch(Throwable t){
                    t.printStackTrace();
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if(selector != null)
                try{
                    selector.close();
                }catch (Exception e) {
                    e.printStackTrace();
                }
    
    
        }
    
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                //处理新接入的请求消息
                if(key.isAcceptable()){
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    //通过ServerSocketChannel的accept创建SocketChannel实例
                    //完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
                    SocketChannel sc = ssc.accept();
                    //设置为非阻塞的
                    sc.configureBlocking(false);
                    //注册为读
                    sc.register(selector, SelectionKey.OP_READ);
                }
                //读消息
                if(key.isReadable()){
                    SocketChannel sc = (SocketChannel) key.channel();
                    //创建ByteBuffer,并开辟一个1M的缓冲区
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //读取请求码流,返回读取到的字节数
                    int readBytes = sc.read(buffer);
                    //读取到字节,对字节进行编解码
                    if(readBytes>0){
                        //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                        buffer.flip();
                        //根据缓冲区可读字节数创建字节数组
                        byte[] bytes = new byte[buffer.remaining()];
                        //将缓冲区可读字节数组复制到新建的数组中
                        buffer.get(bytes);
                        String expression = new String(bytes,"UTF-8");
                        System.out.println("SERVER GET MSSG:" + expression);
                        //处理数据
                        String result = null;
                        try{
                            result = Calculator.cal(expression).toString();
                        }catch(Exception e){
                            result = "CAL ERR:" + e.getMessage();
                        }
                        //发送应答消息
                        doWrite(sc,result);
                    }
                    //没有读取到字节 忽略
    //				else if(readBytes==0);
                    //链路已经关闭,释放资源
                    else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
            }
        }
        //异步发送应答消息
        private void doWrite(SocketChannel channel,String response) throws IOException{
            //将消息编码为字节数组
            byte[] bytes = response.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //发送缓冲区的字节数组
            channel.write(writeBuffer);
            //****此处不含处理“写半包”的代码
        }
    
    
    
        void test(){
    
        }
    
    }
     class Calculator {
        private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
        public static Object cal(String expression) throws ScriptException {
            return jse.eval(expression);
        }
    }
    
    

    Client:

    package javanio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class Client {
    
        private static String DEFAULT_HOST = "127.0.0.1";
        private static int DEFAULT_PORT = 8081;
        private static ClientHandle clientHandle;
        public static void start(){
            start(DEFAULT_HOST,DEFAULT_PORT);
        }
        public static synchronized void start(String ip,int port){
            if(clientHandle!=null)
                clientHandle.stop();
            clientHandle = new ClientHandle(ip,port);
            new Thread(clientHandle,"Server").start();
        }
        //向服务器发送消息
        public static boolean sendMsg(String msg) throws Exception{
            if(msg.equals("q")) return false;
            clientHandle.sendMsg(msg);
            return true;
        }
        public static void main(String[] args){
            start();
        }
    
    
    }
    class ClientHandle implements Runnable{
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean started;
    
        public ClientHandle(String ip,int port) {
            this.host = ip;
            this.port = port;
            try{
                //创建选择器
                selector = Selector.open();
                //打开监听通道
                socketChannel = SocketChannel.open();
                //如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
                socketChannel.configureBlocking(false);//开启非阻塞模式
                started = true;
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
        }
        public void stop(){
            started = false;
        }
        @Override
        public void run() {
            try{
                doConnect();
            }catch(IOException e){
                e.printStackTrace();
                System.exit(1);
            }
            //循环遍历selector
            while(started){
                try{
                    //无论是否有读写事件发生,selector每隔1s被唤醒一次
                    selector.select(1000);
                    //阻塞,只有当至少一个注册的事件发生的时候才会继续.
    //				selector.select();
                    Set<SelectionKey> keys = selector.selectedKeys();
                    Iterator<SelectionKey> it = keys.iterator();
                    SelectionKey key = null;
                    while(it.hasNext()){
                        key = it.next();
                        it.remove();
                        try{
                            handleInput(key);
                        }catch(Exception e){
                            if(key != null){
                                key.cancel();
                                if(key.channel() != null){
                                    key.channel().close();
                                }
                            }
                        }
                    }
                }catch(Exception e){
                    e.printStackTrace();
                    System.exit(1);
                }
            }
            //selector关闭后会自动释放里面管理的资源
            if(selector != null)
                try{
                    selector.close();
                }catch (Exception e) {
                    e.printStackTrace();
                }
        }
        private void handleInput(SelectionKey key) throws IOException{
            if(key.isValid()){
                SocketChannel sc = (SocketChannel) key.channel();
                if(key.isConnectable()){
                    if(sc.finishConnect());
                    else System.exit(1);
                }
                //读消息
                if(key.isReadable()){
                    //创建ByteBuffer,并开辟一个1M的缓冲区
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    //读取请求码流,返回读取到的字节数
                    int readBytes = sc.read(buffer);
                    //读取到字节,对字节进行编解码
                    if(readBytes>0){
                        //将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
                        buffer.flip();
                        //根据缓冲区可读字节数创建字节数组
                        byte[] bytes = new byte[buffer.remaining()];
                        //将缓冲区可读字节数组复制到新建的数组中
                        buffer.get(bytes);
                        String result = new String(bytes,"UTF-8");
                        System.out.println("CLIENT GET MSG:" + result);
                    }
                    //没有读取到字节 忽略
    //				else if(readBytes==0);
                    //链路已经关闭,释放资源
                    else if(readBytes<0){
                        key.cancel();
                        sc.close();
                    }
                }
            }
        }
        //异步发送消息
        private void doWrite(SocketChannel channel,String request) throws IOException{
            //将消息编码为字节数组
            byte[] bytes = request.getBytes();
            //根据数组容量创建ByteBuffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            //将字节数组复制到缓冲区
            writeBuffer.put(bytes);
            //flip操作
            writeBuffer.flip();
            //发送缓冲区的字节数组
            channel.write(writeBuffer);
            //****此处不含处理“写半包”的代码
        }
        private void doConnect() throws IOException{
            if(socketChannel.connect(new InetSocketAddress(host,port)));
            else socketChannel.register(selector, SelectionKey.OP_CONNECT);
        }
        public void sendMsg(String msg) throws Exception{
            socketChannel.register(selector, SelectionKey.OP_READ);
            doWrite(socketChannel, msg);
        }
    
    
    }

    Test:

    package javanio;
    
    import java.util.Scanner;
    
    public class Test {
        public static void main(String[] args) throws Exception{
            //运行服务器
            Server.start();
            //避免客户端先于服务器启动前执行代码
            Thread.sleep(100);
            //运行客户端
            Client.start();
            while(Client.sendMsg(new Scanner(System.in).nextLine()));
        }
    
    
    }
    

    demo3:https://blog.csdn.net/Howinfun/article/details/81283721

  • 相关阅读:
    电子辅助的个体化严密控制策略比常规方法更有效地帮助早期RA实现全面控制病情[EULAR2015_THU0122]
    超声和免疫学指标的特征能否反映RA临床缓解的表型?[EULAR2015_THU0121]
    依那西普减量维持过程中RA病人自报病情复发可能预示未来放射学进展[EULAR2015_SAT0147]
    RETRO研究: 持续缓解的RA患者的减量维持方案[EULAR2015_SAT0056]
    OPTIRRA研究: TNF拮抗剂维持期优化减量方案[EULAR2015_SAT0150]
    与时俱进的治疗策略不断提高RA无药缓解机会[EULAR2015_SAT0058]
    雷公藤多甙治疗类风湿关节炎遭质疑
    我的博客今天2岁203天了,我领取了先锋博主徽章
    MyEclipse中最常用的快捷键大全
    MyEclipse无法打开jsp文件(打开是空白的),但是可以打开java文件
  • 原文地址:https://www.cnblogs.com/the-fool/p/11054120.html
Copyright © 2011-2022 走看看