zoukankan      html  css  js  c++  java
  • 3、nio中的selector使用

    通过编写一个客户端和服务器端的例子来熟悉selector的使用

    服务端逻辑:

    1. 绑定一个端口号
    2. channel注册到selector中
    3. 用死循环来监听如果有时间发生,遍历selectionKey set
    4. 判断发生的事件类型,前面会注册accept事件,如果发生accept事件,那么注册读事件,同时清除selectionKey set 中的当前元素。、
    5. 接收事件时,将channel保存下来。
    6. 发生读事件时,说明有信息,发过来了,那么将消息,转发给所有的客户端。然后清除自身的事件。

     1 import java.io.IOException;
     2 import java.net.InetSocketAddress;
     3 import java.net.ServerSocket;
     4 import java.nio.ByteBuffer;
     5 import java.nio.channels.SelectionKey;
     6 import java.nio.channels.Selector;
     7 import java.nio.channels.ServerSocketChannel;
     8 import java.nio.channels.SocketChannel;
     9 import java.nio.charset.Charset;
    10 import java.util.*;
    11 
    12 public class NioServer {
    13 
    14     private static HashMap<String, SocketChannel> clientMap = new HashMap<String, SocketChannel>();
    15 
    16     public static void main(String[] args) throws IOException {
    17         ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
    18         serverSocketChannel.configureBlocking(false);
    19 
    20         ServerSocket serverSocket = serverSocketChannel.socket();
    21         serverSocket.bind(new InetSocketAddress(8899));
    22 
    23         Selector selector = Selector.open();
    24 
    25         serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    26 
    27         while(true) {
    28             int number = selector.select();
    29 //            System.out.println("number:" + number);
    30             Set<SelectionKey> selectionKeySet = selector.selectedKeys();
    31 
    32             Iterator<SelectionKey> iterable = selectionKeySet.iterator();
    33 
    34             if(number > 0 ) {
    35                 while(iterable.hasNext()) {
    36                     SelectionKey selectionKey = iterable.next();
    37 
    38                     if(selectionKey.isAcceptable()) {//如果是可接收连接的
    39                         ServerSocketChannel ssc = (ServerSocketChannel) selectionKey.channel();
    40                         SocketChannel socketChannel = ssc.accept();
    41                         socketChannel.configureBlocking(false);
    42 
    43                         socketChannel.register(selector, SelectionKey.OP_READ);//注册读事件
    44 
    45                         clientMap.put(UUID.randomUUID() + "", socketChannel);//保存下channel
    46 
    47                         iterable.remove();
    48                     } else if(selectionKey.isReadable()){//可读的
    49                         SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    50                         ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    51 
    52                         int readCount = socketChannel.read(byteBuffer);
    53 
    54                         //这里本该用while
    55                         if(readCount > 0 ) {//读取到数据,就写回到其他客户端
    56                             byteBuffer.flip();
    57 
    58                             Charset charset = Charset.forName("UTF-8");
    59                             String receiveStr = new String(charset.decode(byteBuffer).array());
    60 
    61                             System.out.println(socketChannel + " receive msg :" + receiveStr);
    62 
    63                             String sendKey = "";
    64 
    65                             for(Map.Entry<String, SocketChannel> entry : clientMap.entrySet()) {//第一遍遍历找到发送者
    66                                 if(socketChannel == entry.getValue()) {
    67                                     sendKey = entry.getKey();
    68                                     break;
    69                                 }
    70                             }
    71 
    72                             for (Map.Entry<String, SocketChannel> entry: clientMap.entrySet()  ) {//给每个保存的连接,都发送消息
    73                                 ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    74                                 writeBuffer.put((sendKey + ":" +  receiveStr).getBytes());
    75 
    76                                 writeBuffer.flip();
    77                                 entry.getValue().write(writeBuffer);
    78                             }
    79                         }
    80                         iterable.remove();//这个 删除很关键  每次循环完selectionKeySet ,一定要清楚事件,不然肯定会影响下一次的事件触发,或者直接不触发下次的事件
    81                     }
    82                 }
    83             }
    84 
    85         }
    86     }
    87 }

    客户端逻辑

    1. 建立socketChannel 连接到对应的端口
    2. 新建selector对象,然后把socketChannel注册到selector上
    3. 建立死循环 ,监听是否有事件发生,若有,则遍历seletionKey set ,
    4. 判断发生的事件是什么,
    5. 如果是连接事件 ,做对应的连接处理,注册读事件

    判断是否在等待连接 ,在进程中
    if (channel.isConnectionPending()) { 
    channel.finishConnect(); 完成连接,这里是阻塞的

    6. 如果发生了读事件,读取数据

     1 import java.io.BufferedReader;
     2 import java.io.InputStream;
     3 import java.io.InputStreamReader;
     4 import java.net.InetSocketAddress;
     5 import java.nio.ByteBuffer;
     6 import java.nio.channels.SelectionKey;
     7 import java.nio.channels.Selector;
     8 import java.nio.channels.SocketChannel;
     9 import java.time.LocalDateTime;
    10 import java.util.Iterator;
    11 import java.util.Set;
    12 import java.util.concurrent.Executor;
    13 import java.util.concurrent.ExecutorService;
    14 import java.util.concurrent.Executors;
    15 import java.util.concurrent.ThreadFactory;
    16 
    17 public class NioClient {
    18 
    19     public static void main(String[] args) {
    20         try{
    21             SocketChannel socketChannel = SocketChannel.open();
    22             socketChannel.configureBlocking(false);
    23             socketChannel.connect(new InetSocketAddress(8899));//服务端就是bind  然后accept  serverSocketChannel
    24 
    25             Selector selector = Selector.open();
    26 
    27             socketChannel.register(selector, SelectionKey.OP_CONNECT);//注册连接事件
    28 
    29             while(true) {
    30                 int number = selector.select();
    31 
    32                 if(number > 0) {
    33                     Set<SelectionKey> selectionKeySet =  selector.selectedKeys();
    34 
    35                     Iterator<SelectionKey> iterable = selectionKeySet.iterator();
    36                     while(iterable.hasNext()) {//有事件发生
    37                         SelectionKey selectionKey = iterable.next();
    38 
    39                         SocketChannel client = (SocketChannel) selectionKey.channel();
    40                         if(selectionKey.isConnectable()) {//判断 selectionkey 状态  可连接的
    41                             if(client.isConnectionPending()) {//是否在准备连接的进程中
    42                                 client.finishConnect();//这里会阻塞,如果连接未建立,抛异常 ,
    43 
    44                                 ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    45 
    46                                 byteBuffer.put((LocalDateTime.now() + ",连接成功").getBytes());
    47                                 byteBuffer.flip();
    48                                 client.write(byteBuffer);
    49 
    50                                 ExecutorService executorService = Executors.newSingleThreadExecutor(Executors.defaultThreadFactory());
    51 
    52                                 executorService.submit(() -> {//起一个新的线程,去接收控制台的输入 ,不影响其他线程
    53                                     while(true) {
    54                                         try{
    55                                             byteBuffer.clear();
    56                                             InputStreamReader inputStreamReader = new InputStreamReader(System.in);
    57                                             BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
    58 
    59                                             byteBuffer.put(bufferedReader.readLine().getBytes());
    60                                             byteBuffer.flip();
    61                                             client.write(byteBuffer);
    62 
    63                                         }catch (Exception e) {
    64                                             e.printStackTrace();
    65                                         }
    66                                     }
    67                                 });
    68                             }
    69 
    70                             iterable.remove();//这个事件清楚,很关键
    71                             client.register(selector, SelectionKey.OP_READ);//注册读事件
    72                         } else if(selectionKey.isReadable()){//可读取
    73                             SocketChannel socketChannel1 = (SocketChannel) selectionKey.channel();
    74                             ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    75 
    76                             int readCount = socketChannel.read(readBuffer);
    77                             if(readCount > 0) {
    78                                 String receiveMsg = new String(readBuffer.array());
    79                                 System.out.println("receiveMsg : " + receiveMsg);
    80                             }
    81 
    82                             iterable.remove();
    83                         }
    84 
    85                     }
    86                 }
    87             }
    88 
    89 
    90 
    91 
    92         }catch (Exception e ) {
    93             e.printStackTrace();
    94         }
    95 
    96 
    97     }
    98 }
  • 相关阅读:
    Squid-Squid 多层Cache 如何设置实现墙内直连,墙外域名走国外Proxy
    利用win2008虚拟化hyper-v 和squid反向代理,自己做个IDC
    再次分享 pyspider 爬虫框架
    刘宇:我如何5分钟拿到李书福的投资?
    刘宇:2014年投资感悟
    刘宇(正和磁系资本创始人)_百度百科
    python编写的自动获取代理IP列表的爬虫-chinaboywg-ChinaUnix博客
    采集爬虫中,解决网站限制IP的问题?
    Web 应用性能和压力测试工具 Gor
    dnspod-sr内网轻量级DNS首选方案
  • 原文地址:https://www.cnblogs.com/amibandoufu/p/11441560.html
Copyright © 2011-2022 走看看