zoukankan      html  css  js  c++  java
  • Java多线程设计模式之线程池模式

    前序:

      Thread-Per-Message Pattern,是一种对于每个命令或请求,都分配一个线程,由这个线程执行工作。它将“委托消息的一端”和“执行消息的一端”用两个不同的线程来实现。该线程模式主要包括三个部分:

      1,Request参与者(委托人),也就是消息发送端或者命令请求端

      2,Host参与者,接受消息的请求,负责为每个消息分配一个工作线程。

      3,Worker参与者,具体执行Request参与者的任务的线程,由Host参与者来启动。

      由于常规调用一个方法后,必须等待该方法完全执行完毕后才能继续执行下一步操作,而利用线程后,就不必等待具体任务执行完毕,就可以马上返回继续执行下一步操作。

      背景:

      由于在Thread-Per-Message Pattern中对于每一个请求都会生成启动一个线程,而线程的启动是很花费时间的工作,所以鉴于此,提出了Worker Thread,重复利用已经启动的线程。

      线程池:

      Worker Thread,也称为工人线程或背景线程,不过一般都称为线程池。该模式主要在于,事先启动一定数目的工作线程。当没有请求工作的时候,所有的工人线程都会等待新的请求过来,一旦有工作到达,就马上从线程池中唤醒某个线程来执行任务,执行完毕后继续在线程池中等待任务池的工作请求的到达。

      任务池:主要是存储接受请求的集合,利用它可以缓冲接受到的请求,可以设置大小来表示同时能够接受最大请求数目。这个任务池主要是供线程池来访问。

      线程池:这个是工作线程所在的集合,可以通过设置它的大小来提供并发处理的工作量。对于线程池的大小,可以事先生成一定数目的线程,根据实际情况来动态增加或者减少线程数目。线程池的大小不是越大越好,线程的切换也会耗时的。

      存放池的数据结构,可以用数组也可以利用集合,在集合类中一般使用Vector,这个是线程安全的。

      Worker Thread的所有参与者:

      1,Client参与者,发送Request的参与者

      2,Channel参与者,负责缓存Request的请求,初始化启动线程,分配工作线程

      3,Worker参与者,具体执行Request的工作线程

      4,Request参与者

      注意:将在Worker线程内部等待任务池非空的方式称为正向等待。

      将在Channel线程提供Worker线程来判断任务池非空的方式称为反向等待。

      线程池实例1:

      利用同步方法来实现,使用数组来作为任务池的存放数据结构。在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用反向等待来判断任务池的非空状态。

      Channel参与者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    package whut.threadpool;
    //用到了生产者与消费者模式
    //生成线程池,接受客户端线程的请求,找到一个工作线程分配该客户端请求
    public class Channel {
        private static final int MAX_REQUEST = 100;// 并发数目,就是同时可以接受多少个客户端请求
        //利用数组来存放请求,每次从数组末尾添加请求,从开头移除请求来处理
        private final Request[] requestQueue;// 存储接受客户线程的数目
        private int tail;//下一次存放Request的位置
        private int head;//下一次获取Request的位置
        private int count;// 当前request数量
        private final WorkerThread[] threadPool;// 存储线程池中的工作线程
        // 运用数组来存储
        public Channel(int threads) {
            this.requestQueue = new Request[MAX_REQUEST];
            this.head = 0;
            this.head = 0;
            this.count = 0;
            threadPool = new WorkerThread[threads];
            // 启动工作线程
            for (int i = 0; i < threadPool.length; i++) {
                threadPool[i] = new WorkerThread("Worker-" + i, this);
            }
        }
        public void startWorkers() {
            for (int i = 0; i < threadPool.length; i++) {
                threadPool[i].start();
            }
        }
        // 接受客户端请求线程
        public synchronized void putRequest(Request request) {
            // 当Request的数量大于或等于同时接受的数目时候,要等待
            while (count >= requestQueue.length)
                try {
                    wait();
                catch (InterruptedException e) {
                }
            requestQueue[tail] = request;
            tail = (tail + 1) % requestQueue.length;
            count++;
            notifyAll();
        }
        // 处理客户端请求线程
        public synchronized Request takeRequest() {
            while (count <= 0)
                try {
                    wait();
                catch (InterruptedException e) {
                }
            Request request = requestQueue[head];
            head = (head + 1) % requestQueue.length;
            count--;
            notifyAll();
            return request;
        }
    }

      客户端请求线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    package whut.threadpool;
    import java.util.Random;
    //向Channel发送Request请求的
    public class ClientThread extends Thread{
        private final Channel channel;
        private static final Random random=new Random();
                                                                   
        public ClientThread(String name,Channel channel)
        {
            super(name);
            this.channel=channel;
        }
        public void run()
        {
            try{
                for(int i=0;true;i++)
                {
                    Request request=new Request(getName(),i);
                    channel.putRequest(request);
                    Thread.sleep(random.nextInt(1000));
                }
            }catch(InterruptedException e)
            {
            }
        }
    }

      工作线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    package whut.threadpool;
    //具体工作线程
    public class WorkerThread extends Thread{
                                                          
        private final Channel channel;
        public WorkerThread(String name,Channel channel)
        {
          super(name);
          this.channel=channel;
        }
                                                          
        public void run()
        {
            while(true)
            {
                Request request=channel.takeRequest();
                request.execute();
            }
        }
    }

      线程池实例2:

      工作线程:

      利用同步块来处理,利用Vector来存储客户端请求。在Channel有缓存请求方法和处理请求方法,利用生成者与消费者模式来处理存储请求,利用正向等待来判断任务池的非空状态。

      这种实例,可以借鉴到网络ServerSocket处理用户请求的模式中,有很好的扩展性与实用性。

      利用Vector来存储,依旧是每次集合的最后一个位置添加请求,从开始位置移除请求来处理。

      Channel参与者:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    package whut.threadpool2;
    import java.util.Vector;
    /*
     * 这个主要的作用如下
     * 0,缓冲客户请求线程(利用生产者与消费者模式)
     * 1,存储客户端请求的线程
     * 2,初始化启动一定数量的线程
     * 3,主动来唤醒处于任务池中wait set的一些线程来执行任务
     */
    public class Channel {
        public final static int THREAD_COUNT=4;
        public static void main(String[] args) {
          //定义两个集合,一个是存放客户端请求的,利用Vector,
          //一个是存储线程的,就是线程池中的线程数目
                                 
          //Vector是线程安全的,它实现了Collection和List
          //Vector 类可以实现可增长的对象数组。与数组一样,
          //它包含可以使用整数索引进行访问的组件。但Vector 的大小可以根据需要增大或缩小,
          //以适应创建 Vector 后进行添加或移除项的操作。
          //Collection中主要包括了list相关的集合以及set相关的集合,Queue相关的集合
          //注意:Map不是Collection的子类,都是java.util.*下的同级包
          Vector pool=new Vector();
          //工作线程,初始分配一定限额的数目
          WorkerThread[] workers=new WorkerThread[THREAD_COUNT];
                              
          //初始化启动工作线程
          for(int i=0;i<workers.length;i++)
          {
              workers[i]=new WorkerThread(pool);
              workers[i].start();
          }
                               
          //接受新的任务,并且将其存储在Vector中
          Object task=new Object();//模拟的任务实体类
          //此处省略具体工作
          //在网络编程中,这里就是利用ServerSocket来利用ServerSocket.accept接受一个Socket从而唤醒线程
                               
          //当有具体的请求达到
          synchronized(pool)
          {
              pool.add(pool.size(), task);
              pool.notifyAll();//通知所有在pool wait set中等待的线程,唤醒一个线程进行处理
          }
          //注意上面这步骤添加任务池请求,以及通知线程,都可以放在工作线程内部实现
          //只需要定义该方法为static,在方法体用同步块,且共享的线程池也是static即可
                               
          //下面这步,可以有可以没有根据实际情况
          //取消等待的线程
          for(int i=0;i<workers.length;i++)
          {
              workers[i].interrupt();
          }
        }
    }

        工作线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    package whut.threadpool2;
    import java.util.List;
    public class WorkerThread extends Thread {
        private List pool;//任务请求池
        private static int fileCompressed=0;//所有实例共享的
                         
        public WorkerThread(List pool)
        {
              this.pool=pool; 
        }
                         
        //利用静态synchronized来作为整个synchronized类方法,仅能同时一个操作该类的这个方法
        private static synchronized void incrementFilesCompressed()
        {
            fileCompressed++;
        }
                         
        public void run()
        {
            //保证无限循环等待中
            while(true)
            {
                //共享互斥来访问pool变量
                synchronized(pool)
                {
                    //利用多线程设计模式中的
                    //Guarded Suspension Pattern,警戒条件为pool不为空,否则无限的等待中
                    while(pool.isEmpty())
                    {
                        try{
                            pool.wait();//进入pool的wait set中等待着,释放了pool的锁
                        }catch(InterruptedException e)
                        {
                        }
                    }
                    //当线程被唤醒,需要重新获取pool的锁,
                    //再次继续执行synchronized代码块中其余的工作
                    //当不为空的时候,继续再判断是否为空,如果不为空,则跳出循环
                    //必须先从任务池中移除一个任务来执行,统一用从末尾添加,从开始处移除
                                     
                    pool.remove(0);//获取任务池中的任务,并且要进行转换
                }
                //下面是线程所要处理的具体工作
            }
        }
    }

  • 相关阅读:
    hdu 4002 Find the maximum
    hdu 2837 坑题。
    hdu 3123
    zoj Treasure Hunt IV
    hdu 2053 Switch Game 水题一枚,鉴定完毕
    poj 1430 Binary Stirling Numbers
    hdu 3037 Saving Beans
    hdu 3944 dp?
    南阳oj 求N!的二进制表示最低位的1的位置(从右向左数)。
    fzu 2171 防守阵地 II
  • 原文地址:https://www.cnblogs.com/daichangya/p/12959282.html
Copyright © 2011-2022 走看看