zoukankan      html  css  js  c++  java
  • 秒杀多线程第十篇 生产者消费者问题 (续)

    使用java 和semaphore实现的 ,多个生产者和多个消费者的问题。

    1.使用Semaphore,Semaphore的大小设定为BUFFER_LENGTH。也就是同时最多有这么多线程来操作缓冲区。2个semaphore, empty和exist。

    默认开始缓冲区为空

    1)StoreEmpty 在开始时,所有的都可用。

    2)StoreHas 在开始时都是锁定的,也就是没有空余的可以acquire,直到生产者放入数据以后,就可以。

    2.生产者逻辑:

    1)等待缓冲区有空间

    2)同步放入数据到缓存区

    3)通知缓冲区存在数据

    4)所有数据都已生产,通知其他生产线程停止。

    3.消费者逻辑

    1)等待缓冲区有数据

    2)同步取出数据

    3)通知缓冲区有空间

    4)所有数据已消费,通知其他消费线程停止

    main class:

    package com.multithread.prosumer;
    
    import java.util.LinkedList;
    import java.util.Queue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Executor;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;
    
    import com.multithread.main.ExampleInterface;
    
    public class ProsumerExample extends ExampleInterface {
    
        static final int BUFFER_LENGTH = 10;
        static final int CUSTOMER_SIZE = 4;
        static final int PRODUCTOR_SIZE = 3;
        public static final int TOTAL_PRODUCTORS = 200;
        public Queue<Integer> g_produtor = new LinkedList<Integer>();
        public volatile int mProductor = 0;
        public Object objlock = new Object();
        Semaphore StoreEmpty  = new Semaphore(BUFFER_LENGTH);//等待缓冲区数据
        Semaphore StoreHas  = new Semaphore(BUFFER_LENGTH);  
        
        public CountDownLatch mLatchDown = new CountDownLatch(PRODUCTOR_SIZE+CUSTOMER_SIZE);
        public CountDownLatch mLatchStart = new CountDownLatch(PRODUCTOR_SIZE+CUSTOMER_SIZE);
        
        public boolean bStopCustomFlag = false;
        public boolean bStopProductorFlag = false;
        
        @Override
         public void startDemo() {
            try {
                g_produtor.clear();        
                bStopCustomFlag = false;
                initEmptySingal();
                initExistSingal();
                
                Executor mEcecutor = Executors.newFixedThreadPool(PRODUCTOR_SIZE+CUSTOMER_SIZE);
                
                for(int i=1;i<=PRODUCTOR_SIZE;i++)
                {
                    mEcecutor.execute(new ProducerThread(this,"生产者"+i));
                }
                
                for(int j=1;j<=CUSTOMER_SIZE;j++)
                {
                    char c =(char)(j+'A'-1);
                    mEcecutor.execute(new CustomerThread(this,"消费者"+c));
                }
                
                mLatchStart.await();
                System.out.println("所有操作线程已启动...");
                mLatchDown.await();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            catch(Exception e)
            {
                e.printStackTrace();            
            }
            System.out.println("所有线程操作结束");
        }
        
        /*
         * if true ,go back, if false, wait here
         * */
        public void waitEmpty(String name)
        {
            try {
    //            System.out.println("[waitEmpty]"+name+"等待缓冲区,有空余地方:"+StoreEmpty.availablePermits());
                StoreEmpty.acquire();
    //            System.out.println("[waitEmpty]"+name+"等待缓冲区,有空余地方结束 剩余空间:"+StoreEmpty.availablePermits());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        public void singalEmpty(String name)
        {
            StoreEmpty.release();
    //        System.out.println("[singalEmpty]"+name+"缓冲区释放空余地方,剩余空间:"+StoreEmpty.availablePermits());
        }
        
        public void waitExist(String name){
            try {
    //            System.out.println("[waitExist]"+name+"等待缓冲区,数据存放空间:"+StoreHas.availablePermits());
                StoreHas.acquire();
    //            System.out.println("[waitExist]"+name+"缓冲区有数据放入,缓冲区数据个数:"+StoreHas.availablePermits());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        public void singalExist(String name){
            StoreHas.release();
    //        System.out.println("[singalExist]"+name+"将数据放入缓冲区:"+StoreHas.availablePermits());
        }
        
        public void initEmptySingal()
        {
            //init,all is empty;
    //        try {
    //            StoreEmpty.acquire(BUFFER_LENGTH-1);
    //        } catch (InterruptedException e) {
    //            // TODO Auto-generated catch block
    //            e.printStackTrace();
    //        }
            
        }
        
        public void initExistSingal()
        {
            //init,nothing is exist
            try {
    //            System.out.println("释放所有缓冲区数据,消费线程全部等待:"+StoreHas.availablePermits());
                StoreHas.acquire(StoreHas.availablePermits());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        
        public void releaseExistSingal(){
    //        System.out.println("[releaseExistSingle]等待缓冲区有数据放入:释放所有"+StoreHas.availablePermits());
            StoreHas.release(BUFFER_LENGTH);
        }
        
        public void releaseEmptySingal(){
            StoreEmpty.release(BUFFER_LENGTH);
        }
    }
    package com.multithread.prosumer;
    
    public class ProducerThread extends Thread {
    
        ProsumerExample mProsumer = null;
        String name = null;
        boolean flag = true;
        public ProducerThread(ProsumerExample pe,String name)
        {
            mProsumer = pe;
            this.name = name;
        }
        
        @Override
        public void run() {
            System.out.println(name+"操作開始");
            mProsumer.mLatchStart.countDown();
    //        for(int i=0;i<=END_PRODUCE_NUMBER;i++)
    //        {
    //            try {
    //                //等待缓冲区为空
    //                mProsumer.waitEmpty(name);
    //                //互斥的访问缓冲区 
    //                synchronized (mProsumer.objlock) {
    //                    int index = mProsumer.g_produtor.size();
    //                    mProsumer.g_produtor.offer(i);
    //                    System.out.println(name+"将数据"+i+"放入缓冲区位置:"+(index+1));
    //                }
    //                
    //                //通知缓冲区有新数据了
    //                mProsumer.singalExist(name);
    //                
    //            } catch (Exception e) {
    //                // TODO Auto-generated catch block
    //                e.printStackTrace();
    //                break;
    //            }
    //            finally{
    //            }            
    //        }
            
            while(flag)
            {
                //等待缓冲区为空
                mProsumer.waitEmpty(name);
                
                //互斥的访问缓冲区 
                synchronized (mProsumer.objlock){
                    if(mProsumer.mProductor<ProsumerExample.TOTAL_PRODUCTORS)
                    {
                        int index = mProsumer.g_produtor.size();
                        mProsumer.g_produtor.offer(mProsumer.mProductor);
                        System.out.println(name+"将数据"+mProsumer.mProductor+"放入缓冲区位置:"+(index+1));
                        ++mProsumer.mProductor;
                        if(mProsumer.mProductor>=ProsumerExample.TOTAL_PRODUCTORS)
                        {
                            flag = false;
                            mProsumer.releaseEmptySingal();
                        }                    
                    }
                    else
                    {
                        flag = false;//结束操作
                        break;//不用通知,应为没有产生新数据
                    }
                }
                
                
                //通知缓冲区有新数据了
                mProsumer.singalExist(name);
            }
            System.out.println(name+"操作结束");
            mProsumer.mLatchDown.countDown();
            
        }
        
        
        
    }
    public class CustomerThread extends Thread {
        volatile boolean flag = true;
        ProsumerExample mProsumer = null;
        int mProductor = 0;
        String name = null;
    
        public CustomerThread(ProsumerExample pe, String name) {
            mProsumer = pe;
            this.name = name;
        }
    
        @Override
        public void run() {
            System.out.println("---" + name + "操作開始");
            mProsumer.mLatchStart.countDown();
            while (flag) {
                try {
                    // 等待缓冲池有数据
                    System.out.println("---" + name + "等待缓冲区数据");
                    mProsumer.waitExist(name);
    
                    // 互斥的访问缓冲区
                    synchronized (mProsumer.objlock) {
                        if (mProsumer.g_produtor.size() > 0) {
                            mProductor = mProsumer.g_produtor.poll();
                            System.out.println("---" + name + "将数据" + mProductor
                                    + "取出缓冲区");
                            if (mProductor == (ProsumerExample.TOTAL_PRODUCTORS-1)) {
                                flag = false;
                                mProsumer.bStopCustomFlag = true;
                                // 释放其他消费线程
                                mProsumer.releaseExistSingal();
                            }
                        } else {
                            System.out.println("---" + name + "缓冲区已空");
                            // 其他消费者线程已停止,缓冲区已为空,此线程也要停止。
                            if (mProsumer.bStopCustomFlag) {
                                flag = false;
                                break;//没有产生新的空间
                            }
                        }
                    }
    
                    // 通知缓存区有空间
                    mProsumer.singalEmpty(name);
    
                    // doing other things
                    Thread.sleep((long) (Math.random() * 100));
    
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    
            }
            System.out.println("---" + name + "操作结束");
            mProsumer.mLatchDown.countDown();
        }
    
    }
  • 相关阅读:
    利用相关的Aware接口
    java 值传递和引用传递。
    权限控制框架Spring Security 和Shiro 的总结
    优秀代码养成
    Servlet 基础知识
    leetcode 501. Find Mode in Binary Search Tree
    leetcode 530. Minimum Absolute Difference in BST
    leetcode 543. Diameter of Binary Tree
    leetcode 551. Student Attendance Record I
    leetcode 563. Binary Tree Tilt
  • 原文地址:https://www.cnblogs.com/deman/p/4091365.html
Copyright © 2011-2022 走看看