zoukankan      html  css  js  c++  java
  • 一个取消多生产者单消费者的日志线程池服务

    package concurrent._ThreadPool.logService;
    
    import net.jcip.annotations.GuardedBy;
    import org.omg.PortableInterceptor.SYSTEM_EXCEPTION;
    
    import java.io.FileNotFoundException;
    import java.io.IOException;
    import java.io.PrintWriter;
    import java.io.Writer;
    import java.lang.reflect.Proxy;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.*;
    
    
    //一个日志的服务,多个生产者,一个消费者
    public class Demo {
        public static void main(String[] args) throws Exception {
            MyLogService myLogService = new MyLogService(8,5);
           myLogService.start();
    
    
            Thread.sleep(5000);
            System.out.println("请求停止线程池");
            myLogService.stop();
    
    
        }
    }
    
    class MyLogService{
    
        //线程池的数量
        private int serviceCount;
        //阻塞队列的数量
        private int blockingqueueCount;
        //日志服务线程池
        public static ExecutorService executorService;
        //日志服务需要的阻塞队列
        private BlockingQueue<String> bq = null;
        //用户取消而设置的标志位
        //GuardedBy表示要取得括号里面的这个锁才能访问这个属性
        @GuardedBy("this")
        private volatile boolean iscancel;
        //用于记录,put与take次数的差,如果取消标志位设置为真,次数差到0说明,put的操作都已经被消费了,这时候说明没有put操作被阻塞
        // 可以将cum线程终止,所有线程都成功停下来了
        @GuardedBy("this")
        private int reservations;
    
        //消费者;
        private FutureTask<Integer> cum = null;
    
        //线程池默认数量
        private final static int DEFAULT_SERVICE_COUNT = 8;
        private final static int DEFAULT_QUEUE_COUNT = 4;
    
        public MyLogService(int serviceCount , int blockingqueueCount){
            this.serviceCount = serviceCount;
            this.blockingqueueCount = serviceCount;
            this.iscancel = false;
            this.reservations = 0;
            executorService = Executors.newFixedThreadPool(serviceCount);
            bq = new ArrayBlockingQueue<String>(blockingqueueCount);
        }
        public MyLogService(){
           this(DEFAULT_SERVICE_COUNT,DEFAULT_QUEUE_COUNT);
        }
    
        //线程池开始工作
        public void start(){
            this.cum = new FutureTask<Integer>(this.CumLogfactory());
            //启动cum
            executorService.submit(cum);
            List<FutureTask<String>> list = new ArrayList<FutureTask<String>>();
            for(int i = 0 ;i < this.serviceCount ; i ++){
                FutureTask<String> futureTask = new FutureTask<String>(this.ProLogFactory());
                list.add(futureTask);
            }
            for(FutureTask<String> f:list){
                executorService.submit(f);
            }
            System.out.println("线程池启动工作");
        }
    
        //线程池停止工作
        public void stop() throws ExecutionException, InterruptedException {
            synchronized (this){
                iscancel = true;
            }
            Integer result = cum.get();
            if(result == 1){
                System.out.println("所有线程已经停止");
                executorService.shutdown();
            }
        }
        private ProLog ProLogFactory(){
           return new ProLog();
        }
    
        private CumLog CumLogfactory(){
            return new CumLog();
        }
    
        private class ProLog implements Callable<String>{
    
            private final String name ;
    
            //默认名字
            private final static String DEFAULT_NAME = "proLog";
    
            public ProLog(){
                this(DEFAULT_NAME);
            }
            public ProLog(String name){
                this.name = name;
            }
    
            @Override
            public String call() throws Exception {
                String thradname = Thread.currentThread().getName();
                proLog(thradname);
    
                return null;
    
            }
    
            private void proLog(String name) throws InterruptedException {
                String msg ;
                while(true){
                    synchronized (MyLogService.this){
                        if(iscancel){
                            System.out.println(name +"发现请求停止,抛出异常");
                            throw new IllegalStateException();
                        }
                        ++reservations;
                    }
                   msg = createLog();
                    //将日志写入阻塞队列中
                    bq.put(msg);
                    System.out.println(name + "写入到队列中" + msg + "__队列剩余空位_" + bq.remainingCapacity());
                }
    
            }
    
            private String createLog() throws InterruptedException {
                //创建一个日志
                long time = System.currentTimeMillis();
                String msg = Long.toString(time);
                //模拟等待一段时间
                Thread.sleep(new Random().nextInt(5000));
                return msg;
            }
        }
    
        private class CumLog implements  Callable<Integer>{
            private final String name ;
    
            //默认名字
            private static final String DEFAULT_NAME = "cumLog";
    
            public CumLog(){
                this(DEFAULT_NAME);
            }
            public CumLog(String name){
                this.name = name;
            }
    
            @Override
            public Integer call() throws Exception {
                System.out.println("启动一个日志cum");
    
                Integer msg = cumLog();
                return msg;
            }
    
            private Integer cumLog() throws InterruptedException, IOException {
                String msg = null;
                Integer isdone = 0;
                //获取日志
                while(true){
                    Thread.sleep(300);
    
                        synchronized (MyLogService.this){
                            if(iscancel && reservations==0){
                                isdone = 1;
                                break;
                            }
                        }
                        msg = bq.take();
                        synchronized (MyLogService.this){
                            --reservations;
                        }
                        System.out.println("消费了一个日志"+msg + "__队列剩余空位_" + bq.remainingCapacity());
    
    
                }
                return isdone;
            }
        }
    
    }

    结果:

    启动一个日志cum
    线程池启动工作
    pool-1-thread-7写入到队列中1543640100705__队列剩余空位_4
    消费了一个日志1543640100705__队列剩余空位_5
    pool-1-thread-5写入到队列中1543640100705__队列剩余空位_4
    消费了一个日志1543640100705__队列剩余空位_5
    pool-1-thread-5写入到队列中1543640103668__队列剩余空位_4
    消费了一个日志1543640103668__队列剩余空位_5
    pool-1-thread-8写入到队列中1543640100705__队列剩余空位_4
    pool-1-thread-6写入到队列中1543640100705__队列剩余空位_3
    pool-1-thread-2写入到队列中1543640100705__队列剩余空位_2
    消费了一个日志1543640100705__队列剩余空位_3
    消费了一个日志1543640100705__队列剩余空位_4
    pool-1-thread-4写入到队列中1543640100705__队列剩余空位_3
    pool-1-thread-6写入到队列中1543640105030__队列剩余空位_2
    pool-1-thread-3写入到队列中1543640100705__队列剩余空位_1
    消费了一个日志1543640100705__队列剩余空位_2
    请求停止线程池
    pool-1-thread-7写入到队列中1543640100867__队列剩余空位_1
    pool-1-thread-7发现请求停止,抛出异常
    pool-1-thread-7发现请求停止,抛出异常
    pool-1-thread-3写入到队列中1543640105582__队列剩余空位_0
    pool-1-thread-3发现请求停止,抛出异常
    消费了一个日志1543640100705__队列剩余空位_1
    消费了一个日志1543640105030__队列剩余空位_2
    消费了一个日志1543640100705__队列剩余空位_3
    pool-1-thread-8写入到队列中1543640104835__队列剩余空位_2
    pool-1-thread-8发现请求停止,抛出异常
    消费了一个日志1543640100867__队列剩余空位_3
    pool-1-thread-2写入到队列中1543640105042__队列剩余空位_2
    pool-1-thread-2发现请求停止,抛出异常
    消费了一个日志1543640105582__队列剩余空位_3
    消费了一个日志1543640104835__队列剩余空位_4
    消费了一个日志1543640105042__队列剩余空位_5
    pool-1-thread-6写入到队列中1543640105573__队列剩余空位_4
    消费了一个日志1543640105573__队列剩余空位_5
    pool-1-thread-6发现请求停止,抛出异常
    消费了一个日志1543640104777__队列剩余空位_5
    pool-1-thread-5写入到队列中1543640104777__队列剩余空位_5
    pool-1-thread-5发现请求停止,抛出异常
    pool-1-thread-4写入到队列中1543640105562__队列剩余空位_4
    pool-1-thread-4发现请求停止,抛出异常
    消费了一个日志1543640105562__队列剩余空位_5
    所有线程已经停止
  • 相关阅读:
    微信小程序开发常用方法
    HTML5 video常用属性
    移动端键盘定制
    移动端弹性滑动以及滑动出界解决方案
    vue移动端Ui组件 mint-ui 使用指南
    vue.js的ajax和jsonp请求
    获取用户地理位置
    如何将一个已有的项目托管到github或是码云上?git的配置
    mvvm模式和mvc模式 概述总结对比
    使用Java的BlockingQueue实现生产者-消费者
  • 原文地址:https://www.cnblogs.com/da-peng/p/10052535.html
Copyright © 2011-2022 走看看