zoukankan      html  css  js  c++  java
  • Worker Thread模式

    工人线程(Worker Thread)会逐个取回工作并进行处理;当所有工作全部完成后,工人线程会等待新的工作到来。

    示例中,5个工人线程(WorkerThread)从传送带(Channel)取数据,3个传送线程(TransportThread)将数据放入传送带。

    /**
     * Bounded hand-off point between producer threads and a fixed pool of
     * {@link WorkerThread}s.
     *
     * <p>Requests are kept in a fixed-size circular array. {@link #put(Request)} blocks
     * while the array is full and {@link #take()} blocks while it is empty. Both methods
     * are {@code synchronized} on this instance and use {@code wait}/{@code notifyAll}
     * on it to coordinate.
     */
    public class Channel {

        /** Number of slots in the circular request array. */
        private final static int MAX_REQUEST = 100;

        /** Circular buffer holding the pending requests. */
        private final Request[] requestQueue;

        /** Index of the next slot to read from. */
        private int head;

        /** Index of the next slot to write to. */
        private int tail;

        /** Number of requests currently stored. */
        private int count;

        /** Workers created by the constructor; started by {@link #startWorker()}. */
        private final WorkerThread[] workerPool;

        /**
         * Creates an empty channel together with its (not yet started) workers.
         *
         * @param workers number of worker threads to create
         */
        public Channel(int workers) {
            this.requestQueue = new Request[MAX_REQUEST];
            this.head = 0;
            this.tail = 0;
            this.count = 0;
            this.workerPool = new WorkerThread[workers];
            this.init();
        }

        /** Fills the pool with worker threads bound to this channel. */
        private void init() {
            for (int i = 0; i < workerPool.length; i++) {
                // NOTE(review): the opening bracket in the name is never closed; the value
                // is kept unchanged because it shows up in the program's output.
                workerPool[i] = new WorkerThread("【工人:" + i, this);
            }
        }

        /**
         * Starts every worker thread in the pool.
         */
        public void startWorker() {
            for (WorkerThread worker : workerPool) {
                worker.start();
            }
        }

        /**
         * Appends a request, blocking while the buffer is full.
         *
         * <p>The wait is not abandoned on interruption (the signature has no checked
         * exception); instead the interrupt is remembered and the calling thread's
         * interrupt status is restored before returning, so the caller can react to it.
         *
         * @param request the request to enqueue; must not be {@code null}
         * @throws NullPointerException if {@code request} is {@code null}
         */
        public synchronized void put(Request request) {
            if (request == null) {
                // Fail in the producer instead of crashing a worker later in take().execute().
                throw new NullPointerException("request must not be null");
            }

            boolean interrupted = false;
            try {
                while (count >= requestQueue.length) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for free space, but do not lose the interrupt.
                        interrupted = true;
                    }
                }

                this.requestQueue[tail] = request;
                this.tail = (tail + 1) % requestQueue.length;
                this.count++;
                this.notifyAll();
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * Removes and returns the oldest request, blocking while the buffer is empty.
         *
         * <p>As in {@link #put(Request)}, an interrupt received while waiting does not
         * abort the wait; the interrupt status is restored before returning.
         *
         * @return the request at the head of the buffer
         */
        public synchronized Request take() {
            boolean interrupted = false;
            try {
                while (count <= 0) {
                    try {
                        this.wait();
                    } catch (InterruptedException e) {
                        // Keep waiting for a request, but do not lose the interrupt.
                        interrupted = true;
                    }
                }

                Request request = this.requestQueue[head];
                // Drop the buffer's reference so a consumed request can be collected.
                this.requestQueue[head] = null;
                this.head = (this.head + 1) % this.requestQueue.length;
                this.count--;
                this.notifyAll();
                return request;
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }
    /**
     * Immutable unit of work: records who created it and its sequence number.
     */
    public class Request {

        /** Name supplied by the creator of this request. */
        private final String name;

        /** Sequence number supplied by the creator of this request. */
        private final int number;

        /**
         * @param name   name to associate with the request
         * @param number sequence number of the request
         */
        public Request(final String name, final int number) {
            this.number = number;
            this.name = name;
        }

        /**
         * Prints one line to standard output naming the current thread and this request.
         */
        public void execute() {
            final String executor = Thread.currentThread().getName();
            final StringBuilder line = new StringBuilder();
            line.append(executor).append(" executed ").append(this);
            System.out.println(line.toString());
        }

        /**
         * @return the textual form {@code " Request=> No.<number> Name.<name>"}
         */
        @Override
        public String toString() {
            final StringBuilder text = new StringBuilder(" Request=> No.");
            text.append(number);
            text.append(" Name.");
            text.append(name);
            return text.toString();
        }
    }
    /**
     * Producer thread: repeatedly creates numbered {@link Request}s carrying this
     * thread's name and puts them into a {@link Channel}, sleeping a random time
     * (under one second) between submissions.
     *
     * <p>The thread runs until it is interrupted.
     */
    public class TransportThread extends Thread {
        /** Channel that receives the requests produced by this thread. */
        private final Channel channel;

        /** Source of the random pause between submissions; shared by all instances. */
        private static final Random random = new Random(System.currentTimeMillis());

        /**
         * @param name    thread name, also used as the name of every request it creates
         * @param channel channel to put requests into
         */
        public TransportThread(String name, Channel channel) {
            super(name);
            this.channel = channel;
        }

        /**
         * Produces requests numbered from 0 upwards until this thread is interrupted.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; !isInterrupted(); i++) {
                    Request request = new Request(getName(), i);
                    // Put the request into the channel (it records who created it and its number).
                    this.channel.put(request);
                    Thread.sleep(random.nextInt(1_000));
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: restore the flag and let the thread end.
                // Any other exception is no longer swallowed and reaches the thread's
                // uncaught-exception handler.
                Thread.currentThread().interrupt();
            }
        }
    }
    /**
     * Consumer thread: repeatedly takes a {@link Request} from a {@link Channel},
     * executes it, and then sleeps a random time (under one second).
     *
     * <p>The thread runs until it is interrupted.
     */
    public class WorkerThread extends Thread {

        /** Channel this worker takes requests from. */
        private final Channel channel;

        /** Source of the random pause after each request; shared by all instances. */
        private static final Random random = new Random(System.currentTimeMillis());

        /**
         * @param name    thread name
         * @param channel channel to take requests from
         */
        public WorkerThread(String name, Channel channel) {
            super(name);
            this.channel = channel;
        }

        /**
         * Processes requests until this thread is interrupted.
         */
        @Override
        public void run() {
            try {
                while (!isInterrupted()) {
                    // Take the next request and run it.
                    channel.take().execute();
                    Thread.sleep(random.nextInt(1_000));
                }
            } catch (InterruptedException e) {
                // Interruption is the stop signal: restore the flag and leave the loop
                // instead of logging and spinning forever.
                Thread.currentThread().interrupt();
            }
        }
    }
    /**
     * Demo entry point: one channel with 5 workers fed by 3 producer threads.
     */
    public class Test {
        /**
         * Starts the workers first, then one producer per name, in the listed order.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            final Channel channel = new Channel(5);
            channel.startWorker();

            final String[] producers = {"Mark", "Jack", "Irish"};
            for (String producer : producers) {
                TransportThread transport = new TransportThread(producer, channel);
                transport.start();
            }
        }
    }
  • 相关阅读:
    监控LVS
    技巧:结合Zabbix与SNMP监控嵌入式设备
    Vmware Exsi使用简要说明
    (转)Linux LVM逻辑卷配置过程详解(创建、扩展、缩减、删除、卸载、快照创建)
    Linux系统下减少LV(逻辑卷)容量
    Linux系统下增加LV(逻辑卷)容量 、Linux系统下减少LV(逻辑卷)容量
    yarn命令删除job
    mr自定义排序和分类
    mr利用shuffle阶段来实现数据去重的功能
    hadoop如何使用第三方依赖jar包(转载)
  • 原文地址:https://www.cnblogs.com/moris5013/p/10999189.html
Copyright © 2011-2022 走看看