zoukankan      html  css  js  c++  java
  • Two-Phase Termination Design Pattern

    分两个阶段结束线程

    第一阶段:结束线程
    第二阶段:清理释放资源

    简单版

    package com.dwz.concurrency2.chapter17;
    
    import java.util.Random;
    
    public class CounterIncrement extends Thread {
        
        private volatile boolean terminated = false;
        
        private int counter = 0;
        
        private Random random = new Random(System.currentTimeMillis());
        
        @Override
        public void run() {
            try {
                while (!terminated) {
                    System.out.println(Thread.currentThread().getName() + " " + counter++);
                    Thread.sleep(random.nextInt(1000));
                } 
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                this.clean();
            }
        }
        
        private void clean() {
            System.out.println("do some clean work for the second phase, current counter is " + counter);
        }
        
        public void close() {
            this.terminated = true;
            this.interrupt();
        }
    }

    测试

    package com.dwz.concurrency2.chapter17;
    
    public class CounterTest {
        public static void main(String[] args) throws InterruptedException {
            CounterIncrement counterIncrement = new CounterIncrement();
            counterIncrement.start();
            
            Thread.sleep(10_000L);
            counterIncrement.close();
        }
    }

    改进版

    AppServer服务器

    package com.dwz.concurrency2.chapter17;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class AppServer extends Thread {
        private final int port;
        
        private static final int DEFAULT_PORT = 12722;
        
        private volatile boolean start = true;
        
        private List<ClientHandler> clientHandlers = new ArrayList<>();
        
        private final ExecutorService executor = Executors.newFixedThreadPool(10);
        
        private ServerSocket server;
        
        public AppServer() {
            this(DEFAULT_PORT);
        }
        
        public AppServer(int port) {
            this.port = port;
        }
        
        @Override
        public void run() {
            try {
                this.server = new ServerSocket(port);
                while (start) {
                    Socket client = server.accept();
                    ClientHandler clientHandler = new ClientHandler(client);
                    executor.submit(clientHandler);
                    this.clientHandlers.add(clientHandler);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            } finally {
                this.dispose();
            }
        }
        
        private void dispose() {
            this.clientHandlers.stream().forEach(ClientHandler::stop);
            this.executor.shutdown();
        }
    
        public void shotdown() throws IOException {
            this.start = false;
            this.interrupt();
            this.server.close();
        }
    }

    ClientHandler控制器

    package com.dwz.concurrency2.chapter17;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.OutputStream;
    import java.io.PrintWriter;
    import java.net.Socket;
    
    public class ClientHandler implements Runnable {
        private final Socket socket;
        
        private volatile boolean running = true;
    
        public ClientHandler(Socket socket) {
            this.socket = socket;
        }
        
        @Override
        public void run() {
            //将需要关闭的资源放到try(){}的()中会主动帮我们释放资源,@since jdk1.7
            try (InputStream inputstream = socket.getInputStream();
                 OutputStream outputstream = socket.getOutputStream();
                 BufferedReader br = new BufferedReader(new InputStreamReader(inputstream));
                 PrintWriter printWriter = new PrintWriter(outputstream)) {
                while (running) {
                    String message = br.readLine();
                    if(null == message) {
                        break;
                    }
                    System.out.println("Come from client ->" + message);
                    printWriter.write("echo dang->" + message + "
    ");
                    printWriter.flush();
                }
            } catch (IOException e) {
                this.running = false;
            } finally {
                this.stop();
            }
        }
        
        public void stop() {
            if(!running) {
                return;
            }
            
            this.running = false;
            try {
                this.socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    测试步骤:

    1.启动AppServer 

    2.在cmd中输入指令 telnet localhost port连上服务器发送消息

    package com.dwz.concurrency2.chapter17;
    
    import java.io.IOException;
    
    public class AppServerClient {
        public static void main(String[] args) throws InterruptedException, IOException {
            AppServer appServer = new AppServer(13345);
            appServer.start();
            
            Thread.sleep(20_000L);
            appServer.shotdown();
        }
    }
  • 相关阅读:
    Spark算子(二)Action
    Spark中利用Scala进行数据清洗(代码)
    Spark核心概念
    Scala面向对象详解
    Scala控制语句
    Scala基础语法
    Scala简介、安装、函数、面向对象
    Hbase优化
    管理员必备的20个Linux系统监控工具
    iOS 关于webView的使用方法
  • 原文地址:https://www.cnblogs.com/zheaven/p/12167191.html
Copyright © 2011-2022 走看看