zoukankan      html  css  js  c++  java
  • java分布式开发TCP/IP NIO无阻塞 Socket((基于消息方式实现系统间的通信) )

    转自:https://www.iteye.com/blog/mars914-1238353

    在java中可以基于java.nio.channels中的Channel和Selector的相关类来实现TCP/IP+NIO方式的系统间通信。

    用于系统间通信依靠SocketChannel和ServerSocketChannel,SocketChannel用于建立连接,监听事件及操作读写,ServerSocketChannel用于监听端口及监听连接事件,可通过Selector来获取是否有要处理的事件。

    服务端java代码:

    Java代码  收藏代码
    1. package com.java.distributed.message.tcpip;  
    2.   
    3. import java.io.IOException;  
    4. import java.net.InetSocketAddress;  
    5. import java.net.ServerSocket;  
    6. import java.nio.ByteBuffer;  
    7. import java.nio.channels.SelectionKey;  
    8. import java.nio.channels.Selector;  
    9. import java.nio.channels.ServerSocketChannel;  
    10. import java.nio.channels.SocketChannel;  
    11. import java.nio.charset.Charset;  
    12.   
    13. public class NIOServer {  
    14.   
    15.     /** 
    16.      * @param args 
    17.      * @throws IOException  
    18.      */  
    19.     public static void main(String[] args) throws IOException {  
    20.         int port =7889;  
    21.         //打开选择器  
    22.         Selector selector=Selector.open();  
    23.         //打开服务器套接字通道  
    24.         ServerSocketChannel ssc=ServerSocketChannel.open();  
    25.         //检索与此通道关联的服务器套接字  
    26.         ServerSocket serverSocket=ssc.socket();  
    27.         //将 ServerSocket 绑定到特定地址(IP 地址和端口号)  
    28.         serverSocket.bind(new InetSocketAddress(port));  
    29.         System.out.println("server listen on port:"+port);  
    30.           
    31.         //调整通道的阻塞模式  
    32.         ssc.configureBlocking(false);  
    33.         //向给定的选择器注册此通道,返回一个选择键。SelectionKey.OP_ACCEPT--用于套接字接受操作的操作集位     
    34.         ssc.register(selector, SelectionKey.OP_ACCEPT);  
    35.           
    36.         while(true){  
    37.             //timeout:为正,则在等待某个通道准备就绪时最多阻塞 timeout 毫秒;如果为零,则无限期地阻塞;必须为非负数  
    38.             int nKeys=selector.select(1000);  
    39.             if(nKeys>0){  
    40.                   
    41.                 for(SelectionKey key:selector.selectedKeys()){  
    42.                     /*测试此键的通道是否已准备好接受新的套接字连接-- 
    43.                      * 如果此键的通道不支持套接字接受操作,则此方法始终返回 false 
    44.                      * */  
    45.                     if(key.isAcceptable()){  
    46.                         ServerSocketChannel server=(ServerSocketChannel) key.channel();  
    47.                         SocketChannel sc=server.accept();  
    48.                           
    49.                         if(sc==null){  
    50.                             continue;  
    51.                         }  
    52.                         sc.configureBlocking(false);  
    53.                         sc.register(selector, SelectionKey.OP_READ);  
    54.                     }else if(key.isReadable()){  
    55.                         //分配一个新的字节缓冲区  
    56.                         ByteBuffer buffer=ByteBuffer.allocate(1024);  
    57.                         SocketChannel sc=(SocketChannel) key.channel();  
    58.                         int readBytes=0;  
    59.                         String message=null;  
    60.                         try{  
    61.                             int ret;  
    62.                             try{  
    63.                                 while((ret=sc.read(buffer))>0){  
    64.                                     readBytes +=ret;  
    65.                                 }  
    66.                                   
    67.                             }catch(Exception e ){  
    68.                                 readBytes=0;  
    69.                                 //ignore  
    70.                             }finally{  
    71.                                 //反转此缓冲区。首先对当前位置设置限制,然后将该位置设置为零  
    72.                                 buffer.flip();  
    73.                             }  
    74.                               
    75.                             if(readBytes>0){  
    76.                                 message=Charset.forName("UTF-8").decode(buffer).toString();  
    77.                                 buffer=null;  
    78.                             }  
    79.                         }finally{  
    80.                             if(buffer!=null)  
    81.                                 buffer.clear();  
    82.                         }  
    83.                           
    84.                         if(readBytes>0){  
    85.                             System.out.println("message from client:"+message);  
    86.                             if("quit".equalsIgnoreCase(message.trim())){  
    87.                                 sc.close();  
    88.                                 selector.close();  
    89.                                 System.out.println("Server has been shutdown!");  
    90.                                 System.exit(0);  
    91.                             }  
    92.                             String outMessage="server response:"+message;  
    93.                             sc.write(Charset.forName("UTF-8").encode(outMessage));  
    94.                         }  
    95.                           
    96.                     }  
    97.                 }  
    98.                 selector.selectedKeys().clear();  
    99.             }  
    100.           
    101.         }  
    102.     }  
    103. }  

    客户端java代码:

    Java代码  收藏代码
    1. package com.java.distributed.message.tcpip;  
    2.   
    3. import java.io.BufferedReader;  
    4. import java.io.IOException;  
    5. import java.io.InputStreamReader;  
    6. import java.net.InetSocketAddress;  
    7. import java.net.SocketAddress;  
    8. import java.nio.ByteBuffer;  
    9. import java.nio.channels.SelectionKey;  
    10. import java.nio.channels.Selector;  
    11. import java.nio.channels.SocketChannel;  
    12. import java.nio.charset.Charset;  
    13.   
    14.   
    15. public class NIOClient {  
    16.   
    17.     /** 
    18.      * @param args 
    19.      * @throws IOException  
    20.      */  
    21.     public static void main(String[] args) throws IOException {  
    22.         int port =7889;  
    23.         SocketChannel channel=SocketChannel.open();  
    24.         channel.configureBlocking(false);  
    25.           
    26.         SocketAddress target=new InetSocketAddress("127.0.0.1",port);  
    27.         channel.connect(target);  
    28.         Selector selector=Selector.open();  
    29.         //用于套接字连接操作的操作集位  
    30.         channel.register(selector, SelectionKey.OP_CONNECT);  
    31.         BufferedReader systemIn=new BufferedReader(new InputStreamReader(System.in));  
    32.           
    33.         while(true){  
    34.             if(channel.isConnected()){  
    35.                 String command=systemIn.readLine();  
    36.                 channel.write(Charset.forName("UTF-8").encode(command));  
    37.                   
    38.                 if(command==null||"quit".equalsIgnoreCase(command.trim())){  
    39.                     systemIn.close();  
    40.                     channel.close();  
    41.                     selector.close();  
    42.                     System.out.println("Client quit !");  
    43.                     System.exit(0);  
    44.                 }  
    45.             }  
    46.             int nKeys=selector.select(1000);  
    47.             if(nKeys>0){  
    48.                 for(SelectionKey key:selector.selectedKeys()){  
    49.                     if(key.isConnectable()){  
    50.                         SocketChannel sc=(SocketChannel) key.channel();  
    51.                         sc.configureBlocking(false);  
    52.                         sc.register(selector, SelectionKey.OP_READ);  
    53.                         sc.finishConnect();  
    54.                     }else if(key.isReadable()){  
    55.                         ByteBuffer buffer=ByteBuffer.allocate(1024);  
    56.                         SocketChannel sc=(SocketChannel) key.channel();  
    57.                         int readBytes=0;  
    58.                         try{  
    59.                             int ret=0;  
    60.                             try{  
    61.                                 while((ret=sc.read(buffer))>0){  
    62.                                     readBytes+=ret;  
    63.                                 }  
    64.                             }finally{  
    65.                                 buffer.flip();  
    66.                             }  
    67.                             if (readBytes > 0) {     
    68.                                 System.out.println(Charset.forName("UTF-8")     
    69.                                         .decode(buffer).toString());     
    70.                                 buffer = null;     
    71.                             }     
    72.   
    73.                         }finally {     
    74.                             if (buffer != null) {     
    75.                                 buffer.clear();     
    76.                             }  
    77.                         }  
    78.                     }  
    79.                 }  
    80.                     selector.selectedKeys().clear();     
    81.             }  
    82.         }  
    83.     }  
    84.   
    85. }  
  • 相关阅读:
    Hadoop下面WordCount运行详解
    ubuntu下hadoop环境配置
    ubuntu下的jdk安装
    ASP.NET MVC4中用 BundleCollection使用问题手记
    Lab6: Paxos
    java命令行操作
    Mesos 入门教程
    Docker background
    找实习的日子
    九度 1557:和谐答案 (LIS 变形)
  • 原文地址:https://www.cnblogs.com/sharpest/p/12702762.html
Copyright © 2011-2022 走看看