zoukankan      html  css  js  c++  java
  • 读写锁+阻塞队列+线程池

    1. 读写锁

     1 // 独占锁(写锁):一次只能被一个线程占有
     2 // 共享锁(读锁):可以被多个线程同时占有
     3 // 运行结果分析:写锁保证操作的原子性,读锁不能保证操作的原子性
     4 // 注意:lock锁一定要配对,不然可能会导致死锁
     5 // 个人理解:写操作即为修改数据,会改变原有的数据,因此必须保证其写操作的完整性,
     6 // 不能被中途打断,所以不能同时被多个线程调用
     7 // 读操作即为查询数据,原有数据不会改变,因此可被多个线程同时调用
     8 
     9 public class ReadWriteLockDemo1 {
    10     public static void main(String[] args) {
    11         MyCacheLock myCacheLock = new MyCacheLock();
    12 
    13         for (int i = 1; i <= 5; i++) {
    14             int temp = i;
    15             new Thread(() -> {
    16                 myCacheLock.write(temp, "_" + temp + "_");
    17             }).start();
    18         }
    19 
    20         for (int i = 1; i <= 5; i++) {
    21             int temp = i;
    22             new Thread(() -> {
    23                 myCacheLock.read(temp);
    24             }).start();
    25         }
    26     }
    27 }
    28 
    29 class MyCacheLock {
    30     private volatile Map<Integer, Object> map = new HashMap<>();
    31     // 创建读写锁
    32     ReadWriteLock lock = new ReentrantReadWriteLock();
    33 
    34     //
    35     public void write(Integer key, Object value) {
    36         // 写锁加锁
    37         lock.writeLock().lock();
    38         try {
    39             // 业务编码
    40             System.out.println(Thread.currentThread().getName() + "写入" + key);
    41             map.put(key, value);
    42             System.out.println(Thread.currentThread().getName() + "写入成功");
    43         } catch (Exception e) {
    44             e.printStackTrace();
    45         } finally {
    46             // 释放写锁
    47             lock.writeLock().unlock();
    48         }
    49     }
    50 
    51     //
    52     public void read(Integer key) {
    53         // 读锁加锁
    54         lock.readLock().lock();
    55         try {
    56             // 业务编码
    57             System.out.println(Thread.currentThread().getName() + "读取" + key);
    58             Object value = map.get(key);
    59             System.out.println(Thread.currentThread().getName() + "读取结果" + value);
    60         } catch (Exception e) {
    61             e.printStackTrace();
    62         } finally {
    63             // 释放读锁
    64             lock.readLock().unlock();
    65         }
    66     }
    67 }

    2. 阻塞队列

      

       阻塞:当队列满时,还往里存,此时就会阻塞; 当队列为空时,还往外取,此时也会阻塞。

      我们为什么要使用阻塞队列?

        在多线程下,线程之间通信,我们需要关心的是线程的 —唤醒!

        例如:在消息投递过程中,消息发送方和接受方 可通过消息中间件MQ 来异步解决阻塞问题。

     1 // put(E e)  一直等待
     2 // offer(E e, long timeout, TimeUnit unit) 超时等待,队列还是满即返回false
     3 // offter(E e) 返回false,不报异常
     4 // add(E e) 执行这行报异常 java.lang.IllegalStateException: Queue full
     5 public class BlockingDemo {
     6     public static void main(String[] args) throws InterruptedException {
     7         // 参数:队列初始容量  设置为3
     8         ArrayBlockingQueue queue = new ArrayBlockingQueue<>(3);
     9 
    10         // 添加一个元素
    11         queue.put(1);
    12         // 取两次
    13         System.out.println(queue.take());
    14         System.out.println(queue.take()); // 执行到此,一直等待
    15 
    16 
    17         // put(E e)添加元素
    18         /*queue.put(1);
    19         queue.put(2);
    20         queue.put(3);
    21         queue.put(4); // 执行到此,一直等待
    22 
    23         System.out.println(queue.poll()); // 1
    24         System.out.println(queue.poll()); // 2
    25         System.out.println(queue.poll()); // 3
    26         System.out.println(queue.poll()); // null
    27         */
    28 
    29         // -----------------------------------------------------------
    30         // offer(E e, long timeout, TimeUnit unit)添加元素
    31         /*System.out.println(queue.offer(1, 3L, TimeUnit.SECONDS)); // true
    32         System.out.println(queue.offer(2, 3L, TimeUnit.SECONDS)); // true
    33         System.out.println(queue.offer(3, 3L, TimeUnit.SECONDS)); // true
    34         System.out.println(queue.offer(4, 3L, TimeUnit.SECONDS)); // 等待3秒,false (超时等待,队列还是满即返回false)
    35         */
    36 
    37         // ----------------------------------------------------------------
    38         // offter(E e)添加元素
    39         /*System.out.println(queue.offer(1)); // true
    40         System.out.println(queue.offer(2)); // true
    41         System.out.println(queue.offer(3)); // true
    42         System.out.println(queue.offer(4)); // 返回false,不报异常
    43 
    44         System.out.println(queue.poll()); // 1
    45         System.out.println(queue.poll()); // 2
    46         System.out.println(queue.poll()); // 3
    47         System.out.println(queue.poll()); // null
    48         */
    49 
    50         // -------------------------------------------------
    51         // add(E e)添加元素
    52         /*queue.add(1);
    53         queue.add(2);
    54         queue.add(3);
    55         // 添加第4个元素
    56         queue.add(4);  //执行这行报异常 java.lang.IllegalStateException: Queue full
    57 
    58         System.out.println(queue.remove());
    59         System.out.println(queue.remove());
    60         System.out.println(queue.remove());
    61         System.out.println(queue.remove());
    62         */
    63     }
    64 }

       特殊的阻塞队列——同步队列

     1 // 同步队列
     2 public class SynchronousQueueDemo {
     3     public static void main(String[] args) {
     4         // 特殊的阻塞队列  容量固定为1
     5         SynchronousQueue<Object> queue = new SynchronousQueue<>();
     6 
     7         // 添加元素
     8         new Thread(() -> {
     9             try {
    10                 // 延时 以便看效果
    11                 TimeUnit.SECONDS.sleep(3);
    12                 queue.put(1);
    13                 TimeUnit.SECONDS.sleep(3);
    14                 queue.put(2);
    15                 TimeUnit.SECONDS.sleep(3);
    16                 queue.put(3);
    17             } catch (InterruptedException e) {
    18                 e.printStackTrace();
    19             }
    20         },"A").start();
    21 
    22         // 取出元素
    23         new Thread(() -> {
    24             try {
    25                 TimeUnit.SECONDS.sleep(3);
    26                 System.out.println(queue.take());
    27                 TimeUnit.SECONDS.sleep(3);
    28                 System.out.println(queue.take());
    29                 TimeUnit.SECONDS.sleep(3);
    30                 System.out.println(queue.take());
    31             } catch (InterruptedException e) {
    32                 e.printStackTrace();
    33             }
    34         },"B").start();
    35     }
    36 }

    3. 线程池

      池化技术:程序运行会占用系统资源,提高程序的使用率,降低我们一个性能消耗

        线程池,连接池,内存池,对象池......

      为什么要使用线程池?  线程复用

      线程池:

        三大方法:

          1.  Executors.newSingleThreadExecutor()  单例,只有一个线程

          2.  Executors.newFixedThreadPool(5) 固定线程数  参数:线程数量

          3.  Executors.newCachedThreadPool()  线程数可变,可伸缩

        七大参数:

          1.  int corePoolSize  // 核心池线程数

          2.  int maximumPoolSize  // 最大线程数

          3.  long  keepAliveTime   // 超时等待时间

          4.  TimeUnit unit  // keepAliveTime参数的时间单位

          5.  BlockingQueue< Runnable > workQueue  // 阻塞队列

          6.  ThreadFactory threadFactory  // 线程工厂

          7.  RejectedExecutionHandler handler  // 拒绝策略 (见下面)

        四种拒绝策略:

          1.  AbortPolicy()  // 直接报异常,丢弃任务

          2.  DiscardOldestPolicy()  // 线程池关闭之前,尝试去获取

          3.  DiscardPolicy() //  直接返回

          4.  CallerRunsPolicy()  // 从哪来回哪去 

     1 public class ThreadPoolExecutorDemo1 {
     2     public static void main(String[] args) {
     3         ThreadPoolExecutor executor = new ThreadPoolExecutor(
     4                 2, // 核心池线程数
     5                 5, // 最大线程数
     6                 5L,   // 超时等待时间
     7                 TimeUnit.SECONDS,  // keepAliveTime参数的时间单位
     8                 new LinkedBlockingDeque<>(3), // 阻塞队列
     9                 Executors.defaultThreadFactory(), // 线程工厂
    10                 new ThreadPoolExecutor.AbortPolicy()  // 拒绝策略
    11         );
    12 
    13         /*
    14         * new ThreadPoolExecutor.CallerRunsPolicy()  // 从哪来回哪去
    15          * new ThreadPoolExecutor.DiscardOldestPolicy() // 线程池关闭之前,尝试去获取
    16         * new ThreadPoolExecutor.DiscardPolicy() //  直接返回
    17         * new ThreadPoolExecutor.AbortPolicy()// 直接报异常,丢弃任务
    18         */
    19 
    20         try {
    21             for (int i = 0; i < 15; i++) {
    22                 executor.execute(() -> {
    23                     System.out.println(Thread.currentThread().getName() + "__ok");
    24                 });
    25             }
    26         } catch (Exception e) {
    27             e.printStackTrace();
    28         } finally {
    29             // 关闭线程池
    30             executor.shutdown();
    31         }
    32     }
    33 }

      最大线程池 该如何设置?

        CPU密集型: Runtime.getRuntime().availableProcessors() 获取当前CPU核数

        IO密集型:  假设当前有50个线程都是进程经常操作大IO资源的,比较耗时! 我们则需要保证设置的线程池数量大于 50

  • 相关阅读:
    数据库学习笔记5---MySQL字符串函数、日期时间函数
    关于hibernate的AnnotationConfiguration的问题
    浅谈Java web 中request的setAttribute()用法
    JAVA常见面试题之Forward和Redirect的区别
    JSP页面中<%!%>与<%%>与<%=%>
    Servlet的生命周期
    JavaEE学习路线图
    java web项目WEB-INF与META-INF的作用
    iOS-申请邓白氏编码的超详细流程介绍
    从高版本JDK换成低版本JDK报错
  • 原文地址:https://www.cnblogs.com/ShallowPen/p/12423945.html
Copyright © 2011-2022 走看看