zoukankan      html  css  js  c++  java
  • Reactor模型-多线程程版

    1.概述

    Reactor单线程版本的设计中,I/O任务乃至业务逻辑都由Reactor线程来完成,这无疑增加了Reactor线程的负担,高负载情况下必然会出现性能瓶颈。此外,对于多处理器的服务器来说,单个Reactor线程也发挥不了多CPU的最大功效。下面我们对之前单线程版的Reactor进行改进。

    改进方向
    1. 接受客户端连接请求的不在是单个线程-Acceptor,而是一个NIO线程池。
    2. I/O处理也不再是单个线程处理,而是交给一个I/O线程池进行处理。

    其实改进方向很明确:就是针对可能的系统瓶颈,由单线程改进为多线程处理。这样的方案带来的好处显而易见,增加可靠性的同时也发挥多线程的优势,在高负载的情况下能够从容应对。

    Key Word

    Java NIO 事件驱动 主从Reactor模型


    2.code未动,test先行

    首先定义服务端用于处理请求的Handler,通过实现ChannelHandler接口完成。

    public class SimpleServerChannelHandler implements ChannelHandler {
        
        private static Logger LOG = LoggerFactory.getLogger(SimpleServerChannelHandler.class);
        
        //记录接受消息的次数
        public volatile int receiveSize;
        
        //记录抛出的异常
        public volatile Throwable t;
        
        @Override
        public void channelActive(NioChannel channel) {
            if(LOG.isDebugEnabled()){
                LOG.debug("ChannelActive");
            }
        }
    
        @Override
        public void channelRead(NioChannel channel, Object msg) throws Exception {
            
            ByteBuffer bb = (ByteBuffer)msg;
    
            byte[] con = new byte[bb.remaining()];
            bb.get(con);
    
            String str = new String(con,0,con.length);
    
            String resp = "";
            switch(str){
            case "request1":resp = "response1";break;
            case "request2":resp = "response2";break;
            case "request3":resp = "response3";break;
            default :resp = "Hello Client";
            }
    
            ByteBuffer buf = ByteBuffer.allocate(resp.getBytes().length);
            buf.put(resp.getBytes());
            
            receiveSize++;
            
            channel.sendBuffer(buf);
        }
    
        @Override
        public void exceptionCaught(NioChannel channel, Throwable t)
                throws Exception {
            this.t = t;
            channel.close();
        }
    
    }
    

    Junit测试用例,setUp用于启动Server端和Client端。

    public class ReactorTest extends BaseTest{
        private static final Logger LOG = LoggerFactory.getLogger(ReactorTest.class);
    
        private static String HOST = "localhost";
    
        private static int PORT = 8888;
    
        private static Client client;
        private static Server server;
    
        static SimpleServerChannelHandler h;
    
        @BeforeClass
        public static void setUp() throws Exception {
            startServer();
            startClient();
        }
        private static void startServer() throws Exception{
            server = new Server();
            ReactorPool mainReactor = new ReactorPool();
            ReactorPool subReactor = new ReactorPool();
    
            h = new SimpleServerChannelHandler();
            server.reactor(mainReactor, subReactor)
            .handler(h)
            .bind(new InetSocketAddress(HOST,PORT));
        }
        private static void startClient() throws SocketException{
            client = new Client();
            client.socket().setTcpNoDelay(true);
            client.connect(
                    new InetSocketAddress(HOST,PORT));
        }
        @Test
        public void test() {
            LOG.info("Sucessful configuration");
        }
    
        @Test
        public void testBaseFunction(){
            LOG.debug("testBaseFunction()");
    
            String msg ="Hello Reactor";
            ByteBuffer resp = client.syncSend(ByteBuffer.wrap(msg.getBytes()));
            byte[] res = new byte[resp.remaining()];
            resp.get(res);
    
            Assert.assertEquals("Hello Client", new String(res,0,res.length));
        }
    
        @Test
        public void testMultiSend(){
    
            int sendSize = 1024;
    
            for(int i = 0; i < sendSize; i++){
                ByteBuffer bb = ByteBuffer.wrap("Hello Reactor".getBytes());
                ByteBuffer resp = client.syncSend(bb);
                byte[] res = new byte[resp.remaining()];
                resp.get(res);
    
                Assert.assertEquals("Hello Client", new String(res,0,res.length));
            }
            Assert.assertEquals(sendSize, h.receiveSize);
        }
        @Test
        public void testTooLongReceivedByteSizeEexception(){
            LOG.debug("testTooLongReceivedByteSizeEexception()");
    
            int threshold = 1024;
            byte[] dest = new byte[threshold + 1];
            Random r = new Random();
            r.nextBytes(dest);
            client.syncSend(ByteBuffer.wrap(dest));
            
            Assert.assertEquals(IllegalArgumentException.class, h.t.getClass());
            
            Assert.assertEquals("Illegal data length, len:" + (threshold+1), h.t.getMessage());
        }
        @AfterClass
        public static void tearDown() throws Exception {
            server.close();
            client.close();
        }
    }
    

    一共进行三项基本测试:

    testBaseFunction

    实现了基本发送接收消息的功能。

    testMultiSend

    重复发送消息,并且记录消息收发的次数。

    testTooLongReceivedByteSizeEexception

    测试server端在接收到异常码流的情况下,是否抛出异常。

    3.设计及实现

    3.1 Reactor和ReactorPool

    Reactor作用就是不断进行轮询并检查是否有已经就绪的事件,如果有,那么就将事件分发给对应的Handler进行处理。这个角色其实就是NIO编程中的多路复用器java.nio.channels.Selector。因此,Reactor聚合一个Selector类型成员变量。轮询的过程如下:

    public class Reactor extends Thread{
    
    //...
    
        private Selector selector;
    
        private volatile boolean isShutdown;
    
        Reactor(){
            try {
                selector = Selector.open();
            } catch (IOException e) {
                throw new RuntimeException("failed to open a new selector", e);
            }
        }
        
    @Override
        public void run() {
            for(;;){
                try {
                    getSelector().select(wakenUp);
                    Set<SelectionKey> keys;
                    synchronized(this){
                        keys = getSelector().selectedKeys();
                    }
                    Iterator<SelectionKey> it = keys.iterator();
                    while(it.hasNext()){
                        SelectionKey key = it.next();
                        processSelectedKey(key);
                        it.remove();
                    }
                    if(isShutdown()){
                        break;
                    }
                } catch (Throwable e) {
                    LOG.warn("Unexpected exception in the selector loop.", e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) { }
                }
            }
        }
    }
    

    processSelectedKey(key)中进行的就是根据就绪事件key.readyOps()进行相应操作:

        private void processSelectedKey(SelectionKey key){
            try {       
                NioChannel nioChannel = (NioChannel)key.attachment();
    
                if (!nioChannel.isOpen()) {
                    LOG.warn("trying to do i/o on a null socket");
                    return;
                }
    
                int readyOps = key.readyOps();
                if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                    nioChannel.sink().doRead();
                }
                if((readyOps & SelectionKey.OP_WRITE) != 0){
                    nioChannel.sink().doSend();
                }
                if((readyOps & SelectionKey.OP_CONNECT) != 0){
                    //remove OP_CONNECT
                    key.interestOps((key.interestOps() & ~SelectionKey.OP_CONNECT));
                }
            }catch (Throwable t) {
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Throwable stack trace", t);
                }
                closeSocket();
            }
        }
    
    

    这里的NioChannel是抽象类,是对NIO编程中的Channel语义的抽象(后面会有分析)。

    此外,Reactor肯定要提供一个注册接口啦。。。

        public SelectionKey register(final NioChannel sc, final int interestOps, Object attachment){
            if(sc == null){
                throw new NullPointerException("SelectableChannel");
            }
            if(interestOps == 0){
                throw new IllegalArgumentException("interestOps must be non-zero.");
            }
            SelectionKey key;
            try {
                key = sc.channel().register(getSelector(), interestOps, attachment);
            } catch (ClosedChannelException e) {
                throw new RuntimeException("failed to register a channel", e);
            }
            return key;
        }
    

    ReactorPool是一个Reactor的线程池,这里就通过简单的数组形式进行模拟:

    public class ReactorPool {
    
        private static final Logger LOG = LoggerFactory.getLogger(ReactorPool.class);
    
        private Reactor[] reactors;
    
        private AtomicInteger index = new AtomicInteger();
        
        //线程数默认为CPU数*2
        private final int DEFAULT_THREADS = Runtime.getRuntime().availableProcessors() * 2;
    
        public ReactorPool (){
            this(0);
        }
        public ReactorPool(int nThreads){
            if(nThreads < 0){
                throw new IllegalArgumentException("nThreads must be nonnegative number");
            }
            if(nThreads == 0){
                nThreads = DEFAULT_THREADS;
            }
            reactors = new Reactor[nThreads];
            for(int i = 0; i < nThreads; i++){
                boolean succeed = false;
                try{
                    reactors[i] = new Reactor();
                    succeed = true;
                }catch(Exception e){
                    throw new IllegalStateException("failed to create a Reactor", e);
                }finally{
                    if (!succeed) {
                        for (int j = 0; j < i; j ++) {
                            reactors[j].close();
                        }
                    }
                }
            }
        }
    
        public Reactor next(){
            return reactors[index.incrementAndGet() % reactors.length];
        }
    
        public void close(){
            for(int i = 0; i < reactors.length; i++){
                reactors[i].setShutdown(true);
                reactors[i].close();
            }
        }
    }
    

    3.2 NioChannel和NioChannelSink

    在进行Java原生Nio编程的过程中,会涉及到两种类型的Channel:

    • java.nio.channels.SocketChannel
    • java.nio.channels.ServerSocketChannel

    其分别作为客户端和服务端调用接口。为了统一其公共行为,这里抽象出一个抽象类NioChannel,其成员组成如下:

    • 聚合一个SelectableChannel类型(SocketChannel和ServerSocketChannel的公共父类)的成员变量。
    • 持有一个所属Reactor对象的引用
    • 聚合一个NioChannelSink类型成员变量。

    NioChannelSink是将NioChannel的底层读写功能独立出来。一方面使NioChannel避免集成过多功能而显得臃肿,另一方面分离出底层传输协议,为以后底层传输协议的切换做准备。(TCP vs UDP,NIO、OIO、AIO)从这种意义上说,NioChannel取名为Channel貌似更合理。

    public abstract class NioChannel {
    
        protected Reactor reactor;
    
        protected SelectableChannel sc;
    
        protected SelectionKey selectionKey;
    
        private NioChannelSink sink;
    
        protected volatile ChannelHandler handler;
        
        public NioChannel(SelectableChannel sc, int interestOps){
            this.sc = sc;
            try {
                sc.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
            }
            sink = nioChannelSink();
        }
        
        protected void fireChannelRead(ByteBuffer bb){
            try {
                handler.channelRead(this, bb);
            } catch (Exception e) {
                fireExceptionCaught(e);
            }
        }
        protected void fireExceptionCaught(Throwable t){
            try {
                handler.exceptionCaught(this, t);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //。。。
        
        public abstract NioChannelSink nioChannelSink();
    
        public interface NioChannelSink{
    
            void doRead();
    
            void doSend();
    
            void sendBuffer(ByteBuffer bb);
            
            void close();
        }
    }
    

    再来分析下NioChannel需要提供哪些功能:

    首先,NIO编程中SocketChannel或ServerSocketChannel需要注册到多路复用器Selector中。那么这里就抽象成了NioChannel和Reactor的交互。

    public void register(Reactor reactor, int interestOps){
        this.reactor = reactor;
        try {
            selectionKey = sc.register(reactor().getSelector(), interestOps, this);
        } catch (ClosedChannelException e) {
            e.printStackTrace();
        }
    }
    

    这里将NioChannel对象作为附件,在Reactor中心轮询到ready事件后,会根据事件的类型(OP_ACCEPT OP_READ等),从SelectionKey中取出绑定的附件NioChannel

    NioChannel nioChannel = (NioChannel)key.attachment();
    

    然后根据进行key.readyOps()做相应操作。这在Reactor中已经做过分析。

    其次,作为Channel肯定要提供绑定bind和连接connect的功能了:

    public abstract void bind(InetSocketAddress remoteAddress) throws Exception;
        
    public abstract void connect(InetSocketAddress remoteAddress) throws Exception;
    

    这里用抽象方法是要将实现交由子类来完成。

    最后,是用户通过NioChannel发送的消息的函数:

    public void sendBuffer(ByteBuffer bb){
        sink().sendBuffer(bb);
    }
    
    protected final void enableWrite(){
        int i = selectionKey.interestOps();
        if((i & SelectionKey.OP_WRITE) == 0){
            selectionKey.interestOps(i | SelectionKey.OP_WRITE);
        }
    }
    protected final void disableWrite(){
        int i = selectionKey.interestOps();
        if((i & SelectionKey.OP_WRITE) == 1){
            selectionKey.interestOps(i & (~SelectionKey.OP_WRITE));         
        }
    }
    

    3.3 NioServerSocketChannel和NioSocketChannel

    NioServerSocketChannel和NioSocketChannel是抽象类NioChannel的一个子类,NioServerSocketChannel和java.nio.channels.ServerSocketChannel的语义是一致的,供服务端使用,绑定指定端口,监听客户端发起的连接请求,并交由相应Handler处理。而NioSocketChannel和java.nio.channels.NioSocketChannel语义一致,作为通信的一个通道。

    public class NioServerSocketChannel extends NioChannel{
    
        private static final Logger LOG = LoggerFactory.getLogger(NioServerSocketChannel.class);
        
        public NioServerSocketChannel(){
            super(newSocket());
        }
        
        public static ServerSocketChannel newSocket(){
            ServerSocketChannel socketChannel = null;
            try {
                socketChannel = ServerSocketChannel.open();
            } catch (IOException e) {
                LOG.error("Unexpected exception occur when open ServerSocketChannel");
            }
            return socketChannel;
        }
        
        @Override
        public NioChannelSink nioChannelSink() {
            return new NioServerSocketChannelSink();
        }
        
        class NioServerSocketChannelSink implements NioChannelSink{
            //。。。
        }
            @Override
        public void bind(InetSocketAddress remoteAddress) throws Exception {
            ServerSocketChannel ssc = (ServerSocketChannel)sc;
            ssc.bind(remoteAddress);
        }
        @Override
        public void connect(InetSocketAddress remoteAddress) throws Exception {
            throw new UnsupportedOperationException();
        }
    }
    

    这里获取ServerSocketChannel实例的方式是通过ServerSocketChannel.open(),其实也可以通过反射来获取,这样就能将ServerSocketChannel和SocketChannel的实例化逻辑进行统一,我们只需要在实例化Channel的时候将ServerSocketChannel.class 或 SocketChannel.class当作参数传入即可。

    NioSocketChannel的实现如下:

    public class NioSocketChannel extends NioChannel{
    
        private static final Logger LOG = LoggerFactory.getLogger(NioSocketChannel.class);
    
        public NioSocketChannel() throws IOException{
            super( newSocket());
        }
        public NioSocketChannel(SocketChannel sc) throws IOException{
            super(sc);
        }
        public static SocketChannel newSocket(){
            SocketChannel socketChannel = null;
            try {
                socketChannel = SocketChannel.open();
            } catch (IOException e) {
            }
            return socketChannel;
        }
    
        @Override
        public NioChannelSink nioChannelSink() {
            return new NioSocketChannelSink();
        }
        
        class NioSocketChannelSink implements NioChannelSink{
            //。。。
        }
        
        @Override
        public void bind(InetSocketAddress remoteAddress) throws Exception {
            throw new UnsupportedOperationException();
        }
        @Override
        public void connect(InetSocketAddress remoteAddress) throws Exception {
            SocketChannel socketChannel = (SocketChannel)sc;
            socketChannel.connect(remoteAddress);
        }
    }
    

    3.4 NioServerSocketChannelSink和NioSocketChannelSink

    通过上面分析可知,NioChannel的只向上提供了操作接口,而具体的底层读写等功能全部代理给了NioChannelSink完成。接下来分析下NioChannelSink的两个子类NioServerSocketChannelSink和NioSocketChannelSink。

    首先再看下NioChannelSink的接口:

        public interface NioChannelSink{
    
            void doRead();
    
            void doSend();
    
            void sendBuffer(ByteBuffer bb);
            
            void close();
        }
    

    对于NioChannelSink的两个实现类来说,每个方法所对应的语义如下:

    doRead()

    • NioServerSocketChannelSink:通过accept()接受客户端的请求。
    • NioSocketChannelSink:读取NioChannel中的数据

    doSend()

    • NioServerSocketChannelSink:不支持。
    • NioSocketChannelSink:将缓冲区中数据写入NioChannel

    sendBuffer()

    • NioServerSocketChannelSink:不支持。
    • NioSocketChannelSink:发送数据,其实就是将待发送数据加入缓冲队列中。

    close()

    • NioServerSocketChannelSink:关闭Channel。
    • NioSocketChannelSink:同上。

    当然了,作为网络编程中的Channel所提供的功能原比这里要多且复杂,作为学习Demo,这里只实现了最常用的几个功能。

    下面看下NioServerSocketChannelSink的实现:

    public class NioServerSocketChannel extends NioChannel{
    
        //。。。
        
        class NioServerSocketChannelSink implements NioChannelSink{
    
            public void doRead() {
                try {
                    ServerSocketChannel ssc = (ServerSocketChannel)sc;
                    handler.channelRead(NioServerSocketChannel.this,
                            new NioSocketChannel(ssc.accept()));
                    if(LOG.isDebugEnabled()){
                        LOG.debug("Dispatch the SocketChannel to SubReactorPool");
                    }
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
    
            public void doSend(){
                throw new UnsupportedOperationException();
            }
    
            @Override
            public void sendBuffer(ByteBuffer bb) {
                throw new UnsupportedOperationException();
            }
    
            @Override
            public void close() {
                try {
                    if(sc != null){
                        sc.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }// end NioChannelSink
        
        //。。。
    }
    

    下面是NioSocketChannelSink实现:

    public class NioSocketChannel extends NioChannel{
        
        //。。。
        
        class NioSocketChannelSink implements NioChannelSink{
            
            private static final int MAX_LEN = 1024;
            
            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
    
            ByteBuffer inputBuffer = lenBuffer;
    
            ByteBuffer outputDirectBuffer = ByteBuffer.allocateDirect(1024 * 64);
    
            LinkedBlockingQueue<ByteBuffer> outputQueue = new LinkedBlockingQueue<ByteBuffer>();
    
            public void close(){
                //clear buffer
                outputDirectBuffer = null;
    
                try {
                    if(sc != null){
                        sc.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
            public void doRead() {
                
                SocketChannel socketChannel = (SocketChannel)sc;
    
                int byteSize;
                try {
                    byteSize = socketChannel.read(inputBuffer);
    
                    if(byteSize < 0){
                        LOG.error("Unable to read additional data");
                        throw new RuntimeException("Unable to read additional data");
                    }
                    if(!inputBuffer.hasRemaining()){
    
                        if(inputBuffer == lenBuffer){
                            //read length
                            lenBuffer.flip();
                            int len = lenBuffer.getInt();
                            if(len < 0 || len > MAX_LEN){
                                throw new IllegalArgumentException("Illegal data length, len:" + len);
                            }
                            //prepare for receiving data
                            inputBuffer = ByteBuffer.allocate(len);
                            inputBuffer.clear();
                        }else{
                            //read data
                            if(inputBuffer.hasRemaining()){
                                socketChannel.read(inputBuffer);
                            }
                            if(!inputBuffer.hasRemaining()){
                                inputBuffer.flip();
                                
                                fireChannelRead(inputBuffer);
                                
                                //clear lenBuffer and waiting for next reading operation 
                                lenBuffer.clear();
                                inputBuffer = lenBuffer;
                            }
                        }
                    }
                } catch (Throwable t) {
                    if(LOG.isDebugEnabled()){
                        LOG.debug("Exception :" + t);
                    }
                    fireExceptionCaught(t);
                }
            }
    
            public void doSend(){
                /**
                 * write data to channel:
                 * step 1: write the length of data(occupy 4 byte)
                 * step 2: data content
                 */
                try {
                    if(outputQueue.size() > 0){
                        ByteBuffer directBuffer = outputDirectBuffer;
                        directBuffer.clear();
                        for(ByteBuffer buf : outputQueue){
                            buf.flip();
    
                            if(buf.remaining() > directBuffer.remaining()){
                                //prevent BufferOverflowException
                                buf = (ByteBuffer) buf.slice().limit(directBuffer.remaining());
                            }
                            //transfers the bytes remaining in buf into  directBuffer
                            int p = buf.position();
                            directBuffer.put(buf);
                            //reset position
                            buf.position(p);
    
                            if(!directBuffer.hasRemaining()){
                                break;
                            }
                        }
                        directBuffer.flip();
                        int sendSize = ((SocketChannel)sc).write(directBuffer);
    
                        while(!outputQueue.isEmpty()){
                            ByteBuffer buf = outputQueue.peek();
                            int left = buf.remaining() - sendSize;
                            if(left > 0){
                                buf.position(buf.position() + sendSize);
                                break;
                            }
                            sendSize -= buf.remaining();
                            outputQueue.remove();
                        }
                    }
    
                    synchronized(reactor){
                        if(outputQueue.size() == 0){
                            //disable write
                            disableWrite();
                        }else{
                            //enable write
                            enableWrite();
                        }
                    }
                } catch (Throwable t) {
                    fireExceptionCaught(t);
                }
            }
            private ByteBuffer wrapWithHead(ByteBuffer bb){
                bb.flip();
                lenBuffer.clear();
                int len = bb.remaining();
                lenBuffer.putInt(len);
                ByteBuffer resp = ByteBuffer.allocate(len+4);
    
                lenBuffer.flip();
                resp.put(lenBuffer);
                resp.put(bb);
    
                return resp;
            }
            public void sendBuffer(ByteBuffer bb){
                try{
                    synchronized(this){
                        //wrap ByteBuffer with length header
                        ByteBuffer wrapped = wrapWithHead(bb);
    
                        outputQueue.add(wrapped);
    
                        enableWrite();
                    }
                }catch(Exception e){
                    LOG.error("Unexcepted Exception: ", e);
                }
            }
        }// end NioSocketChannelSink
        
        //。。。
    }
    

    NioSocketChannelSink中的读写功能在Reactor单线程版本里已经分析过,这里就不再赘述。

    3.5 ChannelHandler

    ChannelHandler是Reactor框架提供给用户进行自定义的接口。接口提供了常用的接口:

    public interface ChannelHandler {
        
        void channelActive(NioChannel channel);
        
        void channelRead(NioChannel channel, Object msg) throws Exception;
        
        void exceptionCaught(NioChannel channel, Throwable t) throws Exception;
    }
    

    4. 总结

    4.1 软件设计中的一些注意点

    时刻紧绷一根弦:资源是有限的

    比如在网络编程中,每建立一个Socket连接都会消耗一定资源,当回话结束后一定要关闭。此外,必须考虑非正常流程时的情况。比如发生异常,可能执行不到关闭资源的操作。 如ReactorPool的实例化过程:

        public ReactorPool(int nThreads){
            //。。
            reactors = new Reactor[nThreads];
            for(int i = 0; i < nThreads; i++){
                boolean succeed = false;
                try{
                    reactors[i] = new Reactor();
                    succeed = true;
                }catch(Exception e){
                    throw new IllegalStateException("failed to create a Reactor", e);
                }finally{
                    if (!succeed) {
                        for (int j = 0; j < i; j ++) {
                            reactors[j].close();
                        }
                    }
                }
            }
        }
    

    当实例化过程中发送异常时,记得要及时回收已占用资源。

    又比如在通信一端接受字节流的时候需要注意对异常码流的处理,避免码流过大而耗尽内存,导致OOM。

    并发操作分析

    • 这个类是线程安全的吗?
    • 这个方法是在哪个线程中执行的?
    • 是否是热点区域?
    • 是否存在并发修改的可能?
    • 并发修改是否可见?

    在单线程版的Reactor模型中,所有的逻辑都由Reactor单个线程执行,不存在多线程并发操作的情况,那么在我们添加了线程池workerPool后,情况又会怎么样呢?

    一般我们在分析并发性问题,通常的做法是先找到可能被多个线程共同访问的类,再分析下这个类是否是线程安全的。如何判断某个类是否是线程安全的?

    1. 该类是否是有状态的,无状态的类一定是线程安全的。
    2. 如果有状态,是否可变。如果一个类状态不可变,那么肯定也是线程安全的。

    所谓的状态暂可以简单理解为是否有成员变量,不管是静态成员变量还是普通成员变量。

    关于"单一职责"

    单一职责原则是面向对象软件设计的基本原则之一,难点在于接口的职责如何划分,而职责的划分又需要具体问题具体考虑。拿本次这个小Demo来说,NioChannel的职责是作为数据传输通道,而通道中数据传输方式可能有很多种,那么这里就抽象出一个NioChannelSink接口负责具体传输方式的实现。

    职责粒度的划分需要根据需求好好把控。过粗不利于扩展,过细不利于实现。



    作者:TopGun_Viper
    链接:https://www.jianshu.com/p/847600114337
    來源:简书
    简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。
  • 相关阅读:
    freertos 启动任务调度器后卡在svc 0,汇编停在了0x0800014A E7FE B 0x0800014A
    cadence报错:Class must be one of IC, IO, DISCRETE, MECHANICAL, PLATING_BAR or DRIVER_CELL.
    DDR内存256M16、512M8含义
    常用摄像头像素
    cadence报错because the library part is newer than the part in the design cache.Select the part in the cache and choose Design-Update Cache,and then place the part again.
    ESP-Example ble-ancs解析
    ping 请求找不到主机 www.baidu.com
    linux驱动ioctl报[-Werror=incompatible-pointer-types]错
    常用排序算法对比
    修改gitlab服务器网段后修改git配置的方法
  • 原文地址:https://www.cnblogs.com/wuer888/p/10144412.html
Copyright © 2011-2022 走看看