zoukankan      html  css  js  c++  java
  • reactor模式:单线程的reactor模式

    reactor模式称之为响应器模式,常用于nio的网络通信框架,其服务架构图如下

    不同于传统IO的串行调度方式,NIO把整个服务请求分为五个阶段

    read:接收到请求,读取数据

    decode:解码数据

    compute:业务逻辑处理

    encode:返回数据编码

    send:发送数据

    其中,以read和send阶段IO最为频繁

    代码实现

     1  // Reactor線程  
     2     package server;  
     3       
     4     import java.io.IOException;  
     5     import java.net.InetSocketAddress;  
     6     import java.nio.channels.SelectionKey;  
     7     import java.nio.channels.Selector;  
     8     import java.nio.channels.ServerSocketChannel;  
     9     import java.util.Iterator;  
    10     import java.util.Set;  
    11       
    12     public class TCPReactor implements Runnable {  
    13       
    14         private final ServerSocketChannel ssc;  
    15         private final Selector selector;  
    16       
    17         public TCPReactor(int port) throws IOException {  
    18             selector = Selector.open();  
    19             ssc = ServerSocketChannel.open();  
    20             InetSocketAddress addr = new InetSocketAddress(port);  
    21             ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口  
    22             ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞  
    23             SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key  
    24             sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象  
    25         }  
    26       
    27         @Override  
    28         public void run() {  
    29             while (!Thread.interrupted()) { // 在線程被中斷前持續運行  
    30                 System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  
    31                 try {  
    32                     if (selector.select() == 0) // 若沒有事件就緒則不往下執行  
    33                         continue;  
    34                 } catch (IOException e) {  
    35                     // TODO Auto-generated catch block  
    36                     e.printStackTrace();  
    37                 }  
    38                 Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合  
    39                 Iterator<SelectionKey> it = selectedKeys.iterator();  
    40                 while (it.hasNext()) {  
    41                     dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度  
    42                     it.remove();  
    43                 }  
    44             }  
    45         }  
    46       
    47         /* 
    48          * name: dispatch(SelectionKey key) 
    49          * description: 調度方法,根據事件綁定的對象開新線程 
    50          */  
    51         private void dispatch(SelectionKey key) {  
    52             Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程  
    53             if (r != null)  
    54                 r.run();  
    55         }  
    56       
    57     }  
     1 // 接受連線請求線程  
     2     package server;  
     3       
     4     import java.io.IOException;  
     5     import java.nio.channels.SelectionKey;  
     6     import java.nio.channels.Selector;  
     7     import java.nio.channels.ServerSocketChannel;  
     8     import java.nio.channels.SocketChannel;  
     9       
    10     public class Acceptor implements Runnable {  
    11       
    12         private final ServerSocketChannel ssc;  
    13         private final Selector selector;  
    14           
    15         public Acceptor(Selector selector, ServerSocketChannel ssc) {  
    16             this.ssc=ssc;  
    17             this.selector=selector;  
    18         }  
    19           
    20         @Override  
    21         public void run() {  
    22             try {  
    23                 SocketChannel sc= ssc.accept(); // 接受client連線請求  
    24                 System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  
    25                   
    26                 if(sc!=null) {  
    27                     sc.configureBlocking(false); // 設置為非阻塞  
    28                     SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key  
    29                     selector.wakeup(); // 使一個阻塞住的selector操作立即返回  
    30                     sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象  
    31                 }  
    32                   
    33             } catch (IOException e) {  
    34                 // TODO Auto-generated catch block  
    35                 e.printStackTrace();  
    36             }  
    37         }  
    38       
    39           
    40     }  
     1 // Handler線程  
     2     package server;  
     3       
     4     import java.io.IOException;  
     5     import java.nio.ByteBuffer;  
     6     import java.nio.channels.SelectionKey;  
     7     import java.nio.channels.SocketChannel;  
     8     import java.util.concurrent.LinkedBlockingQueue;  
     9     import java.util.concurrent.ThreadPoolExecutor;  
    10     import java.util.concurrent.TimeUnit;  
    11       
    12     public class TCPHandler implements Runnable {  
    13       
    14         private final SelectionKey sk;  
    15         private final SocketChannel sc;  
    16       
    17         int state;   
    18       
    19         public TCPHandler(SelectionKey sk, SocketChannel sc) {  
    20             this.sk = sk;  
    21             this.sc = sc;  
    22             state = 0; // 初始狀態設定為READING  
    23         }  
    24       
    25         @Override  
    26         public void run() {  
    27             try {  
    28                 if (state == 0)  
    29                     read(); // 讀取網絡數據  
    30                 else  
    31                     send(); // 發送網絡數據  
    32       
    33             } catch (IOException e) {  
    34                 System.out.println("[Warning!] A client has been closed.");  
    35                 closeChannel();  
    36             }  
    37         }  
    38           
    39         private void closeChannel() {  
    40             try {  
    41                 sk.cancel();  
    42                 sc.close();  
    43             } catch (IOException e1) {  
    44                 e1.printStackTrace();  
    45             }  
    46         }  
    47       
    48         private synchronized void read() throws IOException {  
    49             // non-blocking下不可用Readers,因為Readers不支援non-blocking  
    50             byte[] arr = new byte[1024];  
    51             ByteBuffer buf = ByteBuffer.wrap(arr);  
    52               
    53             int numBytes = sc.read(buf); // 讀取字符串  
    54             if(numBytes == -1)  
    55             {  
    56                 System.out.println("[Warning!] A client has been closed.");  
    57                 closeChannel();  
    58                 return;  
    59             }  
    60             String str = new String(arr); // 將讀取到的byte內容轉為字符串型態  
    61             if ((str != null) && !str.equals(" ")) {  
    62                 process(str); // 邏輯處理  
    63                 System.out.println(sc.socket().getRemoteSocketAddress().toString()  
    64                         + " > " + str);  
    65                 state = 1; // 改變狀態  
    66                 sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件  
    67                 sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
    68             }  
    69         }  
    70       
    71         private void send() throws IOException  {  
    72             // get message from message queue  
    73               
    74             String str = "Your message has sent to "  
    75                     + sc.socket().getLocalSocketAddress().toString() + "
    ";  
    76             ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()  
    77       
    78             while (buf.hasRemaining()) {  
    79                 sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容  
    80             }  
    81               
    82             state = 0; // 改變狀態  
    83             sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件  
    84             sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
    85         }  
    86           
    87         void process(String str) {  
    88             // do process(decode, logically process, encode)..  
    89             // ..  
    90         }  
    91     }  
     1 package server;  
     2       
     3     import java.io.IOException;  
     4       
     5     public class Main {  
     6       
     7           
     8         public static void main(String[] args) {  
     9             // TODO Auto-generated method stub  
    10             try {  
    11                 TCPReactor reactor = new TCPReactor(1333);  
    12                 reactor.run();  
    13             } catch (IOException e) {  
    14                 // TODO Auto-generated catch block  
    15                 e.printStackTrace();  
    16             }  
    17         }  
    18       
    19     }  

    客户端代码

     1 package main.pkg;  
     2       
     3     import java.io.BufferedReader;  
     4     import java.io.IOException;  
     5     import java.io.InputStreamReader;  
     6     import java.io.PrintWriter;  
     7     import java.net.Socket;  
     8     import java.net.UnknownHostException;  
     9       
    10     public class Client {  
    11       
    12         /** 
    13          * @param args 
    14          */  
    15         public static void main(String[] args) {  
    16             // TODO Auto-generated method stub  
    17             String hostname=args[0];  
    18             int port = Integer.parseInt(args[1]);  
    19             //String hostname="127.0.0.1";  
    20             //int port=1333;  
    21               
    22             System.out.println("Connecting to "+ hostname +":"+port);  
    23             try {  
    24                 Socket client = new Socket(hostname, port); // 連接至目的地  
    25                 System.out.println("Connected to "+ hostname);  
    26                   
    27                 PrintWriter out = new PrintWriter(client.getOutputStream());  
    28                 BufferedReader in = new BufferedReader(new InputStreamReader(client.getInputStream()));  
    29                 BufferedReader stdIn = new BufferedReader(new InputStreamReader(System.in));  
    30                 String input;  
    31                   
    32                 while((input=stdIn.readLine()) != null) { // 讀取輸入  
    33                     out.println(input); // 發送輸入的字符串  
    34                     out.flush(); // 強制將緩衝區內的數據輸出  
    35                     if(input.equals("exit"))  
    36                     {  
    37                         break;  
    38                     }  
    39                     System.out.println("server: "+in.readLine());  
    40                 }  
    41                 client.close();  
    42                 System.out.println("client stop.");  
    43             } catch (UnknownHostException e) {  
    44                 // TODO Auto-generated catch block  
    45                 System.err.println("Don't know about host: " + hostname);  
    46             } catch (IOException e) {  
    47                 // TODO Auto-generated catch block  
    48                 System.err.println("Couldn't get I/O for the socket connection");  
    49             }  
    50               
    51         }  
    52       
    53     }  
        

    代码解读:

    1.创建TCPReactor 类的实例,启动端口监听

    2.Acceptor 类只用于处理接受请求的时候,后续的读写跟其无任何关系

    3.TCPReactor.run( )一直在进行,后续selectionkey有变动,会监听到,一直执行dispatch方法

    最后提醒一点,从性能来说,单线程的reactor没过多的提升,因为IO和CPU的速度还是严重不匹配

    参考文章:

    https://blog.csdn.net/yehjordan/article/details/51012833

  • 相关阅读:
    HBase with MapReduce (MultiTable Read)
    HBase with MapReduce (SummaryToFile)
    HBase with MapReduce (Summary)
    HBase with MapReduce (Read and Write)
    HBase with MapReduce (Only Read)
    Hbase中的BloomFilter(布隆过滤器)
    HBase的快照技术
    How To Use Hbase Bulk Loading
    Cloudera-Manager修改集群的IP
    Java中的HashSet和TreeSet
  • 原文地址:https://www.cnblogs.com/billmiao/p/9872222.html
Copyright © 2011-2022 走看看