zoukankan      html  css  js  c++  java
  • 线程之Semaphore生产消费模型

    private final int capacity = 10;
    private final Semaphore empty = new Semaphore(capacity); //仓库中空的槽的信号量
    private final Semaphore full = new Semaphore(0); //仓库中被占用的槽的信号量
    private final Semaphore mutex = new Semaphore(1); //互斥信号量
    private int insertIndex = 0; //仓库中当前可以放置物品的位置
    private int removeIndex = 0; //仓库中当前可以拿走物品的位置。

    信号量是多线程环境下使用的一种方式,可以用来保证两个或多个程序不能同时进入临界区,从而不能同时放一个共享资源,达到线程互斥的作用。
    此外,通过使用信号量,我们也可以实现线程同步:在生产者消费者问题中,信号量full和empty用来保证某种事件顺序发生或者不发生,
    即对于一个物品,生产的过程始终要在消费的过程之前。
    里面有三个信号量full,empty和mutex,其中full和empty用来做进程同步,mutex用来做进程互斥。
    每当要生产一个物品时,我们首先检查能否获取信号量empty的一个许可,如果不可以则阻塞线程,如果可以则继续获取items数组的访问权,该访问权由互斥信号量mutex来控制,当放置完物品后,会释放信号量empty的一个许可。
    同理,每当要消费一个物品时,我们首先检查能否获取信号量full的一个许可,如果不可以则阻塞线程,如果可以择继续获取items数组的访问权,当拿走物品后,会释放信号量full的一个许可。这样,我们就用信号量实现了生产者消费者问题。

    1.Semaphore的使用案例?

    需求
    要求:使用2个线程,分别代表:生产者、消费者。让他们并发的去生产、消费产品。生产的总数是不能超过N的。

    实现思路
    这里我们使用的是使用信号量去控制线程的生产消费,通过释放令牌的形式去控制生产者消费者的上限。使用互斥锁保证每次最多只有一个角色去修改共享变量。

    来看张图,一图胜千言。

        /**
    * 生产者消费者模型
    */
    public class ProducerConsumerProblem {
    //初始容量
    private static final int N = 10;

    /***
    * full 产品容量
    * empty 空余容量
    * mutex 读写锁
    */
    private static Semaphore full, empty, mutex;
    //记录当前的产品数量
    private static volatile int count = 0;

    static {
    /**
    * full 初始化0个产品
    * empty 初始化有N个空余位置放置产品
    * mutex 初始化每次最多只有一个线程可以读写
    * */
    full = new Semaphore(0);
    empty = new Semaphore(N);
    mutex = new Semaphore(1);
    }

    public static void main(String[] args) {
    new Thread(new Producer()).start();
    new Thread(new Consumer()).start();
    }

    //生产者类
    static class Producer implements Runnable {

    @Override
    public void run() {
    while (true) {
    try {
    empty.acquire();//等待空位
    mutex.acquire();//等待读写锁
    count++;
    out.println("生产者生产了一个,还剩:" + count);
    mutex.release();//释放读写锁
    full.release();//放置产品
          //随机休息一段时间,让生产者线程有机会抢占读写锁
    Thread.sleep(((int) Math.random()) % 10);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    //消费者类
    static class Consumer implements Runnable {

    @Override
    public void run() {
    while (true) {
    try {
    full.acquire();//等待产品
    mutex.acquire();//等待读写锁
    count--;
    out.println("消费者消费了一个,还剩:" + count);
    mutex.release();//释放读写锁
    empty.release();//释放空位
            //随机休息一段时间,让消费者线程有机会抢占读写锁
    Thread.sleep(((int) Math.random()) % 10);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }
    }

     1.java信号量解决生产消费问题

    信号量的基本原理
    简单点理解,信号量可看成一个整形量,表示可用资源的个数,使用资源时信号量-1,释放资源时信号量+1,以生产消费为例:
    1. 假设此时缓冲区已满,sem=0,表示当前空的缓冲区个数为0
    2. 生产者P1生产,sem=-1,陷入阻塞
    3. 生产者P2生产,sem=-2,陷入阻塞
    4. 消费者C1消费,sem=-1,唤醒P1
    5. 消费者C2消费,sem=0,唤醒P2

    归纳
    信号量的值可以反映有多少线程正在等待资源, 信号量<0表示有线程因无资源而陷入等待,如sem=-2,就表示有2个生产者陷入阻塞
    消费时sem++,执行后若sem<=0, 表示之前有线程阻塞,需要唤醒,如上例中唤醒P1,P2
    信号量个数与初值的选择:
    生产消费问题中,需要判断空和满 2个状态。 首先需要2个信号量, empty表示空缓冲区个数,full表示满缓冲区个数
    初值empty=缓冲区大小,   full=0,表示初始状态缓冲区全空,满的缓冲区个数为0。还需要信号量mutex=1保证互斥,初值为1为什么可保持互斥请看P,V操作。
    引入P,V操作
    P:表示申请资源
    V:表示释放资源
    P,V操作都是原子性的

    P(semaphore s) {
    s.value--;
    if (s.value < 0)
    当前线程阻塞
    }

    V(semaphore s) {
    s.value++;
    if (s.value <= 0)
    唤醒等待在该资源上的线程
    }

    P,V解决生产消费

     semaphore full = 0;
    semaphore empty = BUFF_SIZE;
    semaphore mutex = 1;
    Producer() {
    P(empty);//要生产,则耗费空资源,empty--
    P(mutex);//保证互斥
    生产
    .....
    V(mutex);
    V(full);//生产完成则产生1个满资源,full++
    }

    Consumer() {
    P(full);
    P(mutex);
    消费
    ...
    V(mutex);
    V(empty);
    }


     
    完整的java代码
    java.util.concurrent.Semaphore,java中已实现好了Semaphore这个类,可在构造方法中设定初值,require和release方法对应P,V操作

    public class SynStack {
    private char[] data = new char[6];
    int cnt=0;

    Semaphore empty = new Semaphore(data.length);//空信号量
    Semaphore full = new Semaphore(0);//满信号量
    Semaphore mutex = new Semaphore(1);//互斥信号量
        //生产
    public void push(char ch){
    try {
    empty.acquire();
    mutex.acquire();
    data[cnt]=ch;
    cnt++;
    System.out.println("生产线程,生产第"+cnt+"个产品,此产品是"+ch);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }finally{
    mutex.release();
    full.release();
    }
    }
    //消费
    public char pop(){
    char ch=' ';
    try {
    full.acquire();
    mutex.acquire();
    ch=data[cnt-1];
    System.out.println("消费线程,消费第"+cnt+"个产品,此产品是"+ch);
    --cnt;
    } catch (InterruptedException e) {
    e.printStackTrace();
    }finally{
    mutex.release();
    empty.release();
    }
    return ch;
    }
    }


    public class Producer implements Runnable{
    private SynStack ss =null;
    public Producer(SynStack ss){
    this.ss = ss;
    }
    @Override
    public void run() {
    char ch;
    for(int i=0;i<15;++i){
    ch =(char)('a'+i);
    ss.push(ch);
    /*try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }*/
    }
    }
    }

    public class Consumer implements Runnable{
    private SynStack ss = null;
    public Consumer(SynStack ss){
    this.ss = ss;
    }
    @Override
    public void run() {
    for(int i=0;i<15;++i){
    ss.pop();
    }
    }
    }

    public class Test {
    public static void main(String[] args) {
    SynStack ss = new SynStack();
    Producer producer= new Producer(ss);
    Consumer consumer =new Consumer(ss);
    Thread threadP = new Thread(producer);
    Thread threadC = new Thread(consumer);
    //Thread threadC2= new Thread(consumer);
    threadP.start();
    threadC.start();
    //threadC2.start();
    }
    }


     定用一个整数表示信号量,然后有一个增加操作和一个减少操作,于是有如下的

      class Semaphore {
    private volatile int permit;
    public Semaphore(int permit) {
    this.permit = permit;
    }

    public acquire() {
    if (permit <= 0) {
        //等待
    }
        //执行permit--操作
    }

    public release() {
      //执行permit++ 操作
    if(permit > 0){
        //唤醒
    }
    }
    }

    这里有两点需要说明:
    这里先用一个二进制信号量来等效互斥操作;
     由于信号量只能通过0值来进行阻塞和唤醒,所以这里必须使用两个信号量来模拟容器空和容器满两种状态

    public class Cache {
    private int cacheSize = 0;

    public Semaphore mutex;
    public Semaphore empty; //保证了容器空的时候(empty的信号量<=0), 消费者等待
    public Semaphore full; //保证了容器满的时候(full的信号量 <= 0),生产者等待

    public Cache(int size) {
    mutex = new Semaphore(1); //二进制信号量,表示互斥锁
    empty = new Semaphore(size);
    full = new Semaphore(0);
    }

    public int getCacheSize() throws InterruptedException {
    return cacheSize;
    }

    public void produce() throws InterruptedException {
    empty.acquire(); // 消耗一个空位
    mutex.acquire();
    cacheSize++;
    System.out.println("生产了一个产品, 当前产品数为" + cacheSize);
    mutex.release();
    full.release(); // 增加了一个产品

    }

    public void consume() throws InterruptedException {
    full.acquire(); // 消耗了一个产品
    mutex.acquire();
    cacheSize--;
    System.out.println("消费了一个产品, 当前产品数为" + cacheSize);
    mutex.release();
    empty.release(); // 增加了一个空位
    }
    }

    public class Consumer implements Runnable {
    private Cache cache;

    public Consumer(Cache cache) {
    this.cache = cache;
    }

    @Override
    public void run() {
    while (true) {
    try {
    cache.consume();
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    }
    }

    public class Producer implements Runnable {
    private Cache cache;
    public Producer(Cache cache) {
    this.cache = cache;
    }

    @Override
    public void run() {
    while (true) {
    try {
    cache.produce();
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    public class Main {
    public static void main(String[] args) {
    Cache cache = new Cache(10);

    Producer p = new Producer(cache);
    Consumer c = new Consumer(cache);
    int producerCount = 4, consumerCount = 4;
    for (int i = 0; i < producerCount; i++) {
    new Thread(p).start();
    }
    for (int i = 0; i < consumerCount; i++) {
    new Thread(c).start();
    }
    }
    }

    ------

    看过了信号量,我们来看看信号。信号(Signal)是一种处理异步事件的通讯方式,用于通知其他进程或者自己本身,来告知将有某种事件发生。在Java中,信号机制通过wait(),notify()和notifyAll()来实现。其中wait()使得当前调用wait()的线程挂起,并释放已经获得的wait()所在代码块的锁;notify()用于随即唤醒一个被wait()挂起的线程进入线程调度队列;notifyAll()用于唤醒所有被wait()挂起的线程进入线程调度队列。
    用Java信号实现的生产者和消费问题, 代码如下:

    public class TestSignal {
    //http://blog.csdn.net/sunset108/article/details/38819529
    static Monitor monitor = new Monitor();

    //生产者
    static class Producer implements Runnable {
    static int num = 1;
    @Override
    public void run() {
    while (true) {
    try {
    monitor.insert(num);
    System.out.println("生产物品" + num);
    num++;
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }

    }

    //消费者
    static class Consumer implements Runnable {
    @Override
    public void run() {
    while (true) {
    try {
    System.out.println("消费物品" + monitor.remove());
    Thread.sleep(500);
    } catch (InterruptedException e) {
    }

    }
    }

    }

    //管程,只能有一个线程占用
    static class Monitor {
    private final int capacity = 10;
    private int insertIndex = 0; //仓库中当前可以放置物品的位置
    private int removeIndex = 0; //仓库中当前可以拿走物品的位置
    private final Object[] items = new Object[capacity]; //仓库中的所有物品
    int count = 0; //仓库中的现有物品数

    //向仓库中放置物品
    public synchronized void insert(Object item) throws InterruptedException {
    //当仓库已满时,挂起生产线程
    if (count == capacity) {
    wait();
    }
    items[insertIndex++] = item;
    if (insertIndex == capacity) {
    insertIndex = 0;
    }
    count++;
    //当仓库由空变为不空时,唤起消费线程
    if (count == 1) {
    notify();
    }
    }

    //从仓库中拿走物品
    public synchronized Object remove() throws InterruptedException {
    //当仓库没有物品时,挂起消费线程
    if (count == 0) {
    wait();
    }
    Object item = items[removeIndex++];
    if (removeIndex == capacity) {
    removeIndex = 0;
    }
    count--;
    //当仓库由满变为不满时,唤起生产线程
    if (count == capacity - 1) {
    notify();
    }
    return item;
    }
    }

    public static void main(String[] args) {
    new Thread(new Producer()).start();
    new Thread(new Consumer()).start();
    }
    }

    可以看到,该例子使用一个Monitor类来实现仓库中放置和拿走物品的方法,该类相当于一个管程,只能保证同一时刻只能有一个线程使用该类。具体实现是采用静态类和synchronized方法,保证当insert()调用时拥有Monitor类的类锁,使得remove()无法获得Monitor类的类锁,同理,保证当remove()调用时拥有Monitor类的类锁,使得remove()无法被调用。里面实现的巧妙之处在于,当count为capacity时,我们会挂起生产进程,当count从capacity变为capacity - 1时,就会唤醒生产进程加入线程调度队列,同理,当count为0时,我们会挂起消费进程,当count从0变为1时,就会唤醒消费进程加入线程调度队列。

    这种方式的执行结果,与信号量的结果类似,不再列出来。

    看完了这两个例子,我们对信号量和信号有了一定的了解。但我们依然不清楚的是:它们的区别到底在哪里呢?

    如果我们单单使用信号,即使用wait和notify方法,很有可能会错过丢失某些信号通知。比如,如果我们不对count的访问添加限制,当count为0时,调度程序知道仓库里没有物品了,便准备挂起消费者线程并启动生产者线程,如果生产者线程的启动时间比消费者线程的挂起时间快得多,很可能会有这种情况:当生产者线程生产了一个物品,使count由0变为1,此时会向消费者线程发送一个唤醒信号,但此时消费者线程还没有完全挂起,因此它会忽略这个信号,这样一来,生产者线程会一直生产物品,直到count为capacity时挂起,而消费者线程在完全挂起之后不会再收到唤醒信号,因此也会一直挂起,这样整个系统就全部挂起,永远不会执行下去。当然,我们的实现方式是同一时刻只能有一个线程操作count,因此消费者线程的wait方法一定在生产者线程的notify方法之前执行,即当消费者线程完全挂起之后,生产者线程才能启动,于是不会出现错过丢失信号的问题。

    而Java中Semaphore的实现也是建立在信号的基础上的,但不同的是它会保存所有的信号,不会丢弃任何信号,这样就很好地避免了前面的问题,这也是信号量和信号的最大区别所在。 



  • 相关阅读:
    三步完成自适应网页设计
    EasyUI DataGrid 修改每页显示数量的最大值&&导出Grid到Excel
    EasyUI DataGrid 实用例子(2015-05-22)
    C# 如何将List拆分成多个子集合
    EasyUI Tabs绑定右键
    微信支付-扫码支付备忘
    微信支付:模板消息实现过程备忘
    4、http协议之二
    1、套按字及http基础知识之一
    3、Web server 之httpd2.2 配置说明
  • 原文地址:https://www.cnblogs.com/awkflf11/p/12637966.html
Copyright © 2011-2022 走看看