zoukankan      html  css  js  c++  java
  • reactor模式:多线程的reactor模式

    上文介绍了单线程的reactor模式(参见《reactor模式:单线程的reactor模式》)。

    单线程的reactor模式并没有解决IO和CPU处理速度不匹配的问题,所以多线程的reactor模式引入了线程池的概念:把耗时的IO操作交由线程池处理,处理完之后再同步到SelectionKey中。服务器架构图如下:

    上文(reactor模式:单线程的reactor模式)提到,以read和send阶段IO最为频繁,所以多线程的reactor版本里,把这2个阶段单独拎出来。

    下面看看代码实现

     1 // Reactor線程 (该类与单线程的处理基本无变动) 
     2     package server;  
     3       
     4     import java.io.IOException;  
     5     import java.net.InetSocketAddress;  
     6     import java.nio.channels.SelectionKey;  
     7     import java.nio.channels.Selector;  
     8     import java.nio.channels.ServerSocketChannel;  
     9     import java.util.Iterator;  
    10     import java.util.Set;  
    11       
    12     public class TCPReactor implements Runnable {  
    13       
    14         private final ServerSocketChannel ssc;  
    15         private final Selector selector;  
    16       
    17         public TCPReactor(int port) throws IOException {  
    18             selector = Selector.open();  
    19             ssc = ServerSocketChannel.open();  
    20             InetSocketAddress addr = new InetSocketAddress(port);  
    21             ssc.socket().bind(addr); // 在ServerSocketChannel綁定監聽端口  
    22             ssc.configureBlocking(false); // 設置ServerSocketChannel為非阻塞  
    23             SelectionKey sk = ssc.register(selector, SelectionKey.OP_ACCEPT); // ServerSocketChannel向selector註冊一個OP_ACCEPT事件,然後返回該通道的key  
    24             sk.attach(new Acceptor(selector, ssc)); // 給定key一個附加的Acceptor對象  
    25         }  
    26       
    27         @Override  
    28         public void run() {  
    29             while (!Thread.interrupted()) { // 在線程被中斷前持續運行  
    30                 System.out.println("Waiting for new event on port: " + ssc.socket().getLocalPort() + "...");  
    31                 try {  
    32                     if (selector.select() == 0) // 若沒有事件就緒則不往下執行  
    33                         continue;  
    34                 } catch (IOException e) {  
    35                     // TODO Auto-generated catch block  
    36                     e.printStackTrace();  
    37                 }  
    38                 Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 取得所有已就緒事件的key集合  
    39                 Iterator<SelectionKey> it = selectedKeys.iterator();  
    40                 while (it.hasNext()) {  
    41                     dispatch((SelectionKey) (it.next())); // 根據事件的key進行調度  
    42                     it.remove();  
    43                 }  
    44             }  
    45         }  
    46       
    47         /* 
    48          * name: dispatch(SelectionKey key) 
    49          * description: 調度方法,根據事件綁定的對象開新線程 
    50          */  
    51         private void dispatch(SelectionKey key) {  
    52             Runnable r = (Runnable) (key.attachment()); // 根據事件之key綁定的對象開新線程  
    53             if (r != null)  
    54                 r.run();  
    55         }  
    56       
    57     }  
     1  // 接受連線請求線程  
     2     package server;  
     3       
     4     import java.io.IOException;  
     5     import java.nio.channels.SelectionKey;  
     6     import java.nio.channels.Selector;  
     7     import java.nio.channels.ServerSocketChannel;  
     8     import java.nio.channels.SocketChannel;  
     9       
    10     public class Acceptor implements Runnable {  
    11       
    12         private final ServerSocketChannel ssc;  
    13         private final Selector selector;  
    14           
    15         public Acceptor(Selector selector, ServerSocketChannel ssc) {  
    16             this.ssc=ssc;  
    17             this.selector=selector;  
    18         }  
    19           
    20         @Override  
    21         public void run() {  
    22             try {  
    23                 SocketChannel sc= ssc.accept(); // 接受client連線請求  
    24                 System.out.println(sc.socket().getRemoteSocketAddress().toString() + " is connected.");  
    25                   
    26                 if(sc!=null) {  
    27                     sc.configureBlocking(false); // 設置為非阻塞  
    28                     SelectionKey sk = sc.register(selector, SelectionKey.OP_READ); // SocketChannel向selector註冊一個OP_READ事件,然後返回該通道的key  
    29                     selector.wakeup(); // 使一個阻塞住的selector操作立即返回  
    30                     sk.attach(new TCPHandler(sk, sc)); // 給定key一個附加的TCPHandler對象  
    31                 }  
    32                   
    33             } catch (IOException e) {  
    34                 // TODO Auto-generated catch block  
    35                 e.printStackTrace();  
    36             }  
    37         }  
    38       
    39           
    40     }  
     1     // Handler線程  
     2     package server;  
     3       
     4     import java.io.IOException;  
     5     import java.nio.channels.SelectionKey;  
     6     import java.nio.channels.SocketChannel;  
     7     import java.util.concurrent.LinkedBlockingQueue;  
     8     import java.util.concurrent.ThreadPoolExecutor;  
     9     import java.util.concurrent.TimeUnit;  
    10       
    11     public class TCPHandler implements Runnable {  
    12       
    13         private final SelectionKey sk;  
    14         private final SocketChannel sc;  
    15         private static final int THREAD_COUNTING = 10;  
    16         private static ThreadPoolExecutor pool = new ThreadPoolExecutor(  
    17                 THREAD_COUNTING, THREAD_COUNTING, 10, TimeUnit.SECONDS,  
    18                 new LinkedBlockingQueue<Runnable>()); // 線程池  
    19       
    20         HandlerState state; // 以狀態模式實現Handler  
    21       
    22         public TCPHandler(SelectionKey sk, SocketChannel sc) {  
    23             this.sk = sk;  
    24             this.sc = sc;  
    25             state = new ReadState(); // 初始狀態設定為READING  
    26             pool.setMaximumPoolSize(32); // 設置線程池最大線程數  
    27         }  
    28       
    29         @Override  
    30         public void run() {  
    31             try {  
    32                 state.handle(this, sk, sc, pool);  
    33                   
    34             } catch (IOException e) {  
    35                 System.out.println("[Warning!] A client has been closed.");  
    36                 closeChannel();  
    37             }  
    38         }  
    39           
    40         public void closeChannel() {  
    41             try {  
    42                 sk.cancel();  
    43                 sc.close();  
    44             } catch (IOException e1) {  
    45                 e1.printStackTrace();  
    46             }  
    47         }  
    48       
    49         public void setState(HandlerState state) {  
    50             this.state = state;  
    51         }  
    52     }  
    53 
    54  
     1     package server;  
     2       
     3     import java.io.IOException;  
     4     import java.nio.channels.SelectionKey;  
     5     import java.nio.channels.SocketChannel;  
     6     import java.util.concurrent.ThreadPoolExecutor;  
     7       
     8     public interface HandlerState {  
     9       
    10         public void changeState(TCPHandler h);  
    11       
    12         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,  
    13                 ThreadPoolExecutor pool) throws IOException ;  
    14     }  
     1     package server;  
     2       
     3     import java.io.IOException;  
     4     import java.nio.ByteBuffer;  
     5     import java.nio.channels.SelectionKey;  
     6     import java.nio.channels.SocketChannel;  
     7     import java.util.concurrent.ThreadPoolExecutor;  
     8       
     9     public class ReadState implements HandlerState{  
    10       
    11         private SelectionKey sk;  
    12           
    13         public ReadState() {  
    14         }  
    15           
    16         @Override  
    17         public void changeState(TCPHandler h) {  
    18             // TODO Auto-generated method stub  
    19             h.setState(new WorkState());  
    20         }  
    21       
    22         @Override  
    23         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,  
    24                 ThreadPoolExecutor pool) throws IOException { // read()  
    25             this.sk = sk;  
    26             // non-blocking下不可用Readers,因為Readers不支援non-blocking  
    27             byte[] arr = new byte[1024];  
    28             ByteBuffer buf = ByteBuffer.wrap(arr);  
    29               
    30             int numBytes = sc.read(buf); // 讀取字符串  
    31             if(numBytes == -1)  
    32             {  
    33                 System.out.println("[Warning!] A client has been closed.");  
    34                 h.closeChannel();  
    35                 return;  
    36             }  
    37             String str = new String(arr); // 將讀取到的byte內容轉為字符串型態  
    38             if ((str != null) && !str.equals(" ")) {  
    39                 h.setState(new WorkState()); // 改變狀態(READING->WORKING)  
    40                 pool.execute(new WorkerThread(h, str)); // do process in worker thread  
    41                 System.out.println(sc.socket().getRemoteSocketAddress().toString()  
    42                         + " > " + str);  
    43             }  
    44               
    45         }  
    46           
    47         /* 
    48          * 執行邏輯處理之函數 
    49          */  
    50         synchronized void process(TCPHandler h, String str) {  
    51             // do process(decode, logically process, encode)..  
    52             // ..  
    53             h.setState(new WriteState()); // 改變狀態(WORKING->SENDING)  
    54             this.sk.interestOps(SelectionKey.OP_WRITE); // 通過key改變通道註冊的事件  
    55             this.sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
    56         }  
    57       
    58         /* 
    59          * 工作者線程 
    60          */  
    61         class WorkerThread implements Runnable {  
    62       
    63             TCPHandler h;  
    64             String str;  
    65       
    66             public WorkerThread(TCPHandler h, String str) {  
    67                 this.h = h;  
    68                 this.str=str;  
    69             }  
    70       
    71             @Override  
    72             public void run() {  
    73                 process(h, str);  
    74             }  
    75       
    76         }  
    77     }  
     1  package server;  
     2       
     3     import java.io.IOException;  
     4     import java.nio.channels.SelectionKey;  
     5     import java.nio.channels.SocketChannel;  
     6     import java.util.concurrent.ThreadPoolExecutor;  
     7       
     8     public class WorkState implements HandlerState {  
     9       
    10         public WorkState() {  
    11         }  
    12           
    13         @Override  
    14         public void changeState(TCPHandler h) {  
    15             // TODO Auto-generated method stub  
    16             h.setState(new WriteState());  
    17         }  
    18       
    19         @Override  
    20         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,  
    21                 ThreadPoolExecutor pool) throws IOException {  
    22             // TODO Auto-generated method stub  
    23               
    24         }  
    25       
    26     }  
     1     package server;  
     2       
     3     import java.io.IOException;  
     4     import java.nio.ByteBuffer;  
     5     import java.nio.channels.SelectionKey;  
     6     import java.nio.channels.SocketChannel;  
     7     import java.util.concurrent.ThreadPoolExecutor;  
     8       
     9     public class WriteState implements HandlerState{  
    10       
    11         public WriteState() {  
    12         }  
    13           
    14         @Override  
    15         public void changeState(TCPHandler h) {  
    16             // TODO Auto-generated method stub  
    17             h.setState(new ReadState());  
    18         }  
    19       
    20         @Override  
    21         public void handle(TCPHandler h, SelectionKey sk, SocketChannel sc,  
    22                 ThreadPoolExecutor pool) throws IOException { // send()  
    23             // get message from message queue  
    24               
    25             String str = "Your message has sent to "  
    26                     + sc.socket().getLocalSocketAddress().toString() + "
    ";  
    27             ByteBuffer buf = ByteBuffer.wrap(str.getBytes()); // wrap自動把buf的position設為0,所以不需要再flip()  
    28       
    29             while (buf.hasRemaining()) {  
    30                 sc.write(buf); // 回傳給client回應字符串,發送buf的position位置 到limit位置為止之間的內容  
    31             }  
    32               
    33             h.setState(new ReadState()); // 改變狀態(SENDING->READING)  
    34             sk.interestOps(SelectionKey.OP_READ); // 通過key改變通道註冊的事件  
    35             sk.selector().wakeup(); // 使一個阻塞住的selector操作立即返回  
    36         }  
    37     }  
     1     package server;  
     2       
     3     import java.io.IOException;  
     4       
     5     public class Main {  
     6       
     7           
     8         public static void main(String[] args) {  
     9             // TODO Auto-generated method stub  
    10             try {  
    11                 TCPReactor reactor = new TCPReactor(1333);  
    12                 reactor.run();  
    13             } catch (IOException e) {  
    14                 // TODO Auto-generated catch block  
    15                 e.printStackTrace();  
    16             }  
    17         }  
    18       
    19     }  

    总的来说,多线程版本的reactor是为了解决单线程reactor版本的IO和CPU处理速度不匹配问题,从而达到高效处理的目的

    参考文章:

    https://blog.csdn.net/yehjordan/article/details/51017025

  • 相关阅读:
    CSS3 target伪类简介
    不用position,让div垂直居中
    css3 在线编辑工具 连兼容都写好了
    a标签伪类的顺序
    oncopy和onpaste
    【leetcode】1523. Count Odd Numbers in an Interval Range
    【leetcode】1518. Water Bottles
    【leetcode】1514. Path with Maximum Probability
    【leetcode】1513. Number of Substrings With Only 1s
    【leetcode】1512. Number of Good Pairs
  • 原文地址:https://www.cnblogs.com/billmiao/p/9872221.html
Copyright © 2011-2022 走看看