zoukankan      html  css  js  c++  java
  • Worker Thread模式

    工人线程(Worker Thread)会逐个取回工作并进行处理;当所有工作全部完成后,工人线程会等待新的工作到来。

    示例:5个工人线程从传送带(Channel)取出数据,3个传送线程将数据放入传送带。

    /**
     * Bounded hand-off point between producer threads and a fixed pool of
     * {@link WorkerThread}s.
     *
     * <p>Requests are stored in a fixed-size circular array. {@link #put(Request)}
     * blocks while the array is full and {@link #take()} blocks while it is empty;
     * both methods are {@code synchronized} on this instance and use
     * {@code wait()}/{@code notifyAll()} to coordinate.
     */
    public class Channel {

        /** Capacity of the circular request buffer. */
        private static final int MAX_REQUEST = 100;

        /** Circular buffer holding the pending requests. */
        private final Request[] requestQueue;

        /** Index of the next request to be taken. */
        private int head;

        /** Index of the slot the next request will be stored in. */
        private int tail;

        /** Number of requests currently stored in the buffer. */
        private int count;

        /** Worker threads created by this channel; started by {@link #startWorker()}. */
        private final WorkerThread[] workerPool;

        /**
         * Creates an empty channel and its (not yet started) worker threads.
         *
         * @param workers number of worker threads to create
         */
        public Channel(int workers) {
            this.requestQueue = new Request[MAX_REQUEST];
            this.head = 0;
            this.tail = 0;
            this.count = 0;
            this.workerPool = new WorkerThread[workers];
            this.init();
        }

        /** Instantiates one worker thread per pool slot; the threads are not started here. */
        private void init() {
            for (int i = 0; i < workerPool.length; i++) {
                // NOTE(review): the name has an opening "【" but no closing "】" — looks like
                // a typo, but the runtime string is kept unchanged here; confirm before fixing.
                workerPool[i] = new WorkerThread("【工人:" + i, this);
            }
        }

        /**
         * Starts every worker thread in the pool.
         */
        public void startWorker() {
            // Plain loop instead of Arrays.asList(...).forEach so the class needs no imports.
            for (WorkerThread worker : workerPool) {
                worker.start();
            }
        }

        /**
         * Appends a request to the buffer, blocking while the buffer is full.
         *
         * <p>If the calling thread is interrupted while waiting, the wait is resumed
         * and the thread's interrupt status is restored before this method returns,
         * so callers can still observe the interruption.
         *
         * @param request the request to enqueue; must not be {@code null}
         * @throws NullPointerException if {@code request} is {@code null}
         */
        public synchronized void put(Request request) {
            if (request == null) {
                // Fail fast in the producer rather than crashing a worker in take().execute().
                throw new NullPointerException("request");
            }

            boolean interrupted = false;
            while (count >= requestQueue.length) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // Remember the interrupt; re-asserting it now would make wait() throw
                    // again immediately and turn this loop into a busy spin.
                    interrupted = true;
                }
            }

            this.requestQueue[tail] = request;
            this.tail = (tail + 1) % requestQueue.length;
            this.count++;
            this.notifyAll();

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        /**
         * Removes and returns the oldest request, blocking while the buffer is empty.
         *
         * <p>If the calling thread is interrupted while waiting, the wait is resumed
         * and the thread's interrupt status is restored before this method returns.
         *
         * @return the oldest request in the buffer
         */
        public synchronized Request take() {
            boolean interrupted = false;
            while (count <= 0) {
                try {
                    this.wait();
                } catch (InterruptedException e) {
                    // See put(): defer restoring the flag until the wait loop is done.
                    interrupted = true;
                }
            }

            Request request = this.requestQueue[head];
            // Drop the reference so a consumed request is not kept reachable by the buffer.
            this.requestQueue[head] = null;
            this.head = (this.head + 1) % this.requestQueue.length;
            this.count--;
            this.notifyAll();

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
            return request;
        }
    }
    /**
     * Immutable unit of work: records who created it and its sequence number.
     */
    public class Request {

        /** Name of the creator of this request. */
        private final String name;

        /** Sequence number assigned by the creator. */
        private final int number;

        /**
         * @param name   name of the creator
         * @param number sequence number of this request
         */
        public Request(final String name, final int number) {
            this.number = number;
            this.name = name;
        }

        /**
         * Prints a line to standard output naming the current thread and this request.
         */
        public void execute() {
            final String executor = Thread.currentThread().getName();
            System.out.println(executor + " executed " + this);
        }

        /**
         * @return a description containing the sequence number and the creator name
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder(" Request=> No.");
            text.append(number);
            text.append(" Name.");
            text.append(name);
            return text.toString();
        }
    }
    /**
     * Producer thread: repeatedly creates numbered {@link Request}s tagged with this
     * thread's name and puts them into the shared {@link Channel}, sleeping a random
     * time (under one second) between requests.
     */
    public class TransportThread extends Thread {
        /** Channel the generated requests are put into. */
        private final Channel channel;

        /** Source of the random pause between two requests; shared by all instances. */
        private static final Random random = new Random(System.currentTimeMillis());

        /**
         * @param name    thread name, also used as the creator name of each request
         * @param channel channel to put requests into
         */
        public TransportThread(String name, Channel channel) {
            super(name);
            this.channel = channel;
        }

        /**
         * Produces requests until the thread is interrupted while sleeping.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; true; i++) {
                    Request request = new Request(getName(), i);
                    // Put the request into the channel (it records who put it and its number).
                    this.channel.put(request);
                    Thread.sleep(random.nextInt(1_000));
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: keep the flag set for whoever inspects
                // this thread and leave the loop. Other exceptions are no longer swallowed,
                // so unexpected failures reach the uncaught-exception handler.
                Thread.currentThread().interrupt();
            }
        }
    }
    /**
     * Consumer thread: repeatedly takes a {@link Request} from the shared
     * {@link Channel}, executes it, then sleeps a random time (under one second).
     */
    public class WorkerThread extends Thread {

        /** Channel the requests are taken from. */
        private final Channel channel;

        /** Source of the random pause between two requests; shared by all instances. */
        private static final Random random = new Random(System.currentTimeMillis());

        /**
         * @param name    thread name
         * @param channel channel to take requests from
         */
        public WorkerThread(String name, Channel channel) {
            super(name);
            this.channel = channel;
        }

        /**
         * Processes requests until this thread's interrupt status is set.
         */
        @Override
        public void run() {
            while (!Thread.currentThread().isInterrupted()) {
                // Take a request from the channel and run it.
                channel.take().execute();
                try {
                    Thread.sleep(random.nextInt(1_000));
                } catch (InterruptedException e) {
                    // Restore the flag (sleep cleared it) so the loop condition sees the
                    // interruption and the worker stops instead of running forever.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    /**
     * Demo entry point: one channel with five workers fed by three producer threads.
     */
    public class Test {
        public static void main(String[] args) {
            // Workers are started first so they are already waiting when requests arrive.
            final Channel belt = new Channel(5);
            belt.startWorker();

            // Start one producer per name, in the listed order.
            final String[] producerNames = {"Mark", "Jack", "Irish"};
            for (final String producerName : producerNames) {
                final TransportThread producer = new TransportThread(producerName, belt);
                producer.start();
            }
        }
    }
  • 相关阅读:
    【分布式】缓存穿透、缓存雪崩,缓存击穿解决方案
    mongodb常用查询语法
    依据记录总数和每页大小取页数(转)
    SpringBoot普通类中如何获取其他bean例如Service、Dao(转)
    RabbitMQ三种Exchange模式(fanout,direct,topic)的性能比较(转)
    java中job运行时间
    如何查看某个端口被谁占用
    Push to origin/master was rejected (Git提交错误)(转)
    curl网站开发指南
    2012 不宜进入的三个技术点(中)
  • 原文地址:https://www.cnblogs.com/moris5013/p/10999189.html
Copyright © 2011-2022 走看看