zoukankan      html  css  js  c++  java
  • 调度模式·WorkerChannelRequest


    以下代码在《Java多线程设计模式》(结城浩著,博硕文化译,中国铁道出版社,2005)第8章"Worler Thread --等到工作来,来了就工作"代码基础上改进而得。

    改动主要为:
    (1)提炼了IRequest接口
    (2)添加了终结工作线程的方法。

    参与者:
    Client--委托方。
    Channel--生产线
    Request--生产任务
    WorkerThread--工作线程

    委托方把Request放入Channel中。Worker从Channel中取出Request,进行加工。

    代码:

    Channel.java

    public class Channel
    {
        
    private static final int MAX_REQUEST = 100;

        
    private final IRequest[] requestQueue;

        
    private int tail; // 下一个putRequest的地方

        
    private int head; // 下一个takeRequest的地方

        
    private int count; // Request的数量
        
        
    private boolean stoped = true;

        
    private final WorkerThread[] threadPool;

        
    public Channel(int threads)
        {
            
    this.requestQueue = new Request[MAX_REQUEST];
            
    this.head = 0;
            
    this.tail = 0;
            
    this.count = 0;

            threadPool 
    = new WorkerThread[threads];
            
    for (int i = 0; i < threadPool.length; i++)
            {
                threadPool[i] 
    = new WorkerThread(this);
            }
        }

        
    public void startWorkers()
        {
            
    this.stoped = false;
            
    for (int i = 0; i < threadPool.length; i++)
            {
                threadPool[i].start();
            }
        }
        
        
    public synchronized int getCount()
        {
            
    return this.count;
        }
        
        
    public synchronized boolean isStoped()
        {
            
    return this.stoped;
        }
        
        
    public synchronized void stopWorkers()
        {
            
    this.stoped = true;
        }

        
    public synchronized void putRequest(IRequest request)
        {
            
    while (count >= requestQueue.length)
            {
                
    try
                {
                    wait();
                }
                
    catch (InterruptedException e)
                {
                }
            }
            requestQueue[tail] 
    = request;
            tail 
    = (tail + 1% requestQueue.length;
            count
    ++;
            notifyAll();
        }

        
    public synchronized IRequest takeRequest()
        {
            
    while (count <= 0)
            {
                
    try
                {
                    wait();
                }
                
    catch (InterruptedException e)
                {
                }
            }
            IRequest request 
    = requestQueue[head];
            head 
    = (head + 1% requestQueue.length;
            count
    --;
            notifyAll();
            
    return request;
        }
    }

    IRequest.java

    public interface IRequest
    {
        
    public abstract void execute() throws Exception;
    }

    WorkerThread.java

    public class WorkerThread extends Thread
    {
        
    private final Channel channel;

        
    public WorkerThread(Channel channel)
        {
            
    this.channel = channel;
        }

        
    public void run()
        {
            System.out.println(
    "[Thread]: WorkerThread " + this.getName()
                    
    + " start!");

            
    while (true)
            {
                
    if (channel.getCount() <= 0 && channel.isStoped())
                {
                    System.out.println(
    "[Thread]: WorkerThread " + this.getName()
                            
    + " stop!");
                    stop();
                }
                
    else
                {
                    IRequest request 
    = channel.takeRequest();
                    
    try
                    {
                        request.execute();
                    }
                    
    catch (Exception e)
                    {
                        System.out.println(e.getMessage());
                    }
                }
            }
        }
    }

    Client方调用方法:

    (1) 首先初始化Channel,设定工作线程数。
    int workerCount = ......;
    channel = new Channel(workerCount);
    (2)启动生产线
    channel.startWorkers();
    (3)放置Request
    channel.putRequest(aRequest);
    (4)下班了--停止生产线
    channel.stopWorkers();
    workers把目前生产线上的Request处理完后,自己Stop掉自己。
    版权所有,欢迎转载
  • 相关阅读:
    redis发布订阅
    redis学习笔记(面试题)
    redis安全 (error) NOAUTH Authentication required
    HDU3001 Travelling —— 状压DP(三进制)
    POJ3616 Milking Time —— DP
    POJ3186 Treats for the Cows —— DP
    HDU1074 Doing Homework —— 状压DP
    POJ1661 Help Jimmy —— DP
    HDU1260 Tickets —— DP
    HDU1176 免费馅饼 —— DP
  • 原文地址:https://www.cnblogs.com/xiaotie/p/267044.html
Copyright © 2011-2022 走看看