zoukankan      html  css  js  c++  java
  • Java 多线程进阶-并发协作控制

    Java 多线程进阶-并发协作控制

    • 线程协作对比

      • Thread/Executor/Fork-Join
        • 线程启动, 运行, 结束.
        • 线程之间缺少协作.
      • synchronized 同步
        • 互斥, 限定只有一个线程才能进入关键区.
        • 简单粗暴, 性能损失有点大>_<.
    • Lock 锁

      • Lock 也可以实现同步的效果
        • 实现更复杂的临界区结构.
        • tryLock 方法可以预判锁是否空闲.
        • 允许分离读写的操作, 多读单写.
        • 性能更好.
      • ReentrantLock 类, 可重入的互斥锁.
      • RenntrantReadWriteLock 类, 可重入的读写锁.
      • tryLock()/lock()/unlock() 函数.
          package concurrentDemo0421;
      
          import java.time.LocalDateTime;
          import java.util.concurrent.locks.ReentrantLock;
          import java.util.concurrent.locks.ReentrantReadWriteLock;
      
          public class LockExample {
              private static final ReentrantLock queueLock111 = new ReentrantLock(); // 可重入锁
              private static final ReentrantReadWriteLock orderLock111 = new ReentrantReadWriteLock(); // 可重入读写锁
      
              /**
               * 学校门口有家奶茶店, 学生们点单有时需要排队
               * 1. 买奶茶
               * 假设想买奶茶的同学如果看到需要排队, 就决定不买了
               * (一次只有一个买)
               * <p>
               * 2. 操作奶茶账本
               * 假设奶茶店有老板和多名员工, 记录方式比较原始, 只有一个订单本
               * (多个读, 一个写)
               * 老板负责写新订单, 员工不断查看订单本得到信息来制作奶茶, 在老板写新订单的时候员工不能查看订单本
               * (写时, 不能读)
               * 多个员工可以同时查看订单本, 此时老板不能写新订单
               * (读时, 不能写)
               *
               * @param args 1
               */
              public static void main(String[] args) throws InterruptedException {
                  // 1. 买奶茶的例子
                  buyMilkTea(); // 使用可重入锁
                  // 2. 操作奶茶账本的例子
                  handleOrder(); // 使用读写锁
              }
      
              public static void buyMilkTea() throws InterruptedException {
                  LockExample lockExample = new LockExample();
                  int STUDENTS_COUNT = 10;
                  Thread[] students = new Thread[STUDENTS_COUNT];
                  for (int i = 0; i < students.length; i++) {
                      students[i] = new Thread(new Runnable() { // 匿名的线程类, 没有名字的
                          @Override
                          public void run() {
                              try {
                                  long walkingTime = (long) (Math.random() * 1000);
                                  Thread.sleep(walkingTime);
                                  lockExample.tryToBuyMilk();
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                          }
                      });
                      students[i].start();
                  }
      
                  for (Thread student : students) {
                      student.join();
                  }
              }
      
              private void tryToBuyMilk() throws InterruptedException {
                  boolean flag = true;
                  while (flag) {
                      if (queueLock111.tryLock()) { // 查一下现在是否锁住, 锁住了在下面的flag地方等一下再来操作.
                          // tryLock()实际包含了两个操作, 先 try 再 lock; 如果没有锁再锁住.
                          long thinkingTime = (long) (Math.random() * 500);
                          Thread.sleep(thinkingTime);
                          System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 来一杯珍珠奶茶, 不要珍珠");
                          flag = false;
                          queueLock111.unlock();
                      } else {
                          System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 再等等");
                      }
                      if (flag) {
                          Thread.sleep(1000);
                      }
                  }
              }
      
              /**
               * 处理订单
               */
              static void handleOrder() {
                  LockExample lockExample = new LockExample();
      
                  Thread boss = new Thread(() -> {
                      while (true) {
                          try {
                              lockExample.addOrder(); // 老板加新单子
                              long waitingTime = (long) (Math.random() * 1000);
                              Thread.sleep(waitingTime);
                          } catch (InterruptedException e) {
                              e.printStackTrace();
                          }
                      }
                  });
                  boss.start();
      
                  int workerCount = 3;
                  Thread[] workers = new Thread[workerCount];
                  for (int i = 0; i < workerCount; i++) {
                      workers[i] = new Thread(new Runnable() {
                          @Override
                          public void run() {
                              while (true) {
                                  try {
                                      lockExample.viewOrder(); // 员工取出单子
                                      long workingTime = (long) (Math.random() * 5000);
                                      Thread.sleep(workingTime);
                                  } catch (InterruptedException e) {
                                      e.printStackTrace();
                                  }
                              }
                          }
                      });
                      workers[i].start();
                  }
              }
      
              /**
               * 向订单本录入新订单
               */
              private void addOrder() throws InterruptedException {
                  orderLock111.writeLock().lock(); // writeLock 写锁, 排他的, 只能一个线程拥有
                  long writingTime = (long) (Math.random() * 1000);
                  Thread.sleep(writingTime);
                  System.out.println(LocalDateTime.now() + " => " + "老板新加一个订单");
                  orderLock111.writeLock().unlock();
              }
      
              /**
               * 查看订单本
               */
              private void viewOrder() throws InterruptedException {
                  orderLock111.readLock().lock(); // readLock 读锁, 可以多个线程共享(同时访问)
      
                  long readingTime = (long) (Math.random() * 500);
                  Thread.sleep(readingTime);
                  System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 查看了订单本");
                  orderLock111.readLock().unlock();
              }
      
          }
      
    • Semaphore 信号量

      • 由1965年Dijkstra提出的.
      • 信号量: 本质上是一个计数器.
      • 计数器大于0, 可以使用, 等于0不能使用.
      • 可以设置多个并发量, 例如限制10个访问.
      • Semaphore
        • acquire 获取.
        • release 释放.
      • 比 Lock 更进一步, 可以控制多个同时访问关键区.
          package concurrentDemo0421;
      
          import java.time.LocalDateTime;
          import java.util.concurrent.Semaphore;
          import java.util.concurrent.TimeUnit;
      
          /**
           * 控制同时访问代码块的线程数
           * 现在有一个地下车库, 共有5个车位, 有10辆车需要停放, 每次停放时, 去申请信号量
           */
          public class SemaphoreExample {
              private final Semaphore placeSemaphore = new Semaphore(5);
      
              public static void main(String[] args) throws InterruptedException {
                  SemaphoreExample example = new SemaphoreExample();
      
                  int tryToParkCount = 10;
                  Thread[] parkers = new Thread[tryToParkCount];
                  for (int i = 0; i < parkers.length; i++) {
                      parkers[i] = new Thread(new Runnable() {
                          @Override
                          public void run() {
                              try {
                                  long randomTime = (long) (Math.random() * 1000);
                                  Thread.sleep(randomTime); // 过一段时间来停车
                                  if (example.parking()) {
                                      long parkingTime = (long) (Math.random() * 1200);
                                      Thread.sleep(parkingTime); // 停一段时间离开
                                      example.leaving();
                                  }
                              } catch (InterruptedException e) {
                                  e.printStackTrace();
                              }
                          }
                      });
                      parkers[i].start();
                  }
      
                  for (Thread t : parkers) {
                      t.join();
                  }
      
      
                  TimeUnit.SECONDS.sleep(60);
              }
      
              private boolean parking() {
                  if (placeSemaphore.tryAcquire()) { // 查看是否有剩余的信号量(剩余的车位)
                      System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 停车成功!");
                      return true;
                  } else {
                      System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 没有空位");
                      return false;
                  }
              }
      
              private void leaving() {
                  placeSemaphore.release();
                  System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 开走了");
              }
          }
      
    • Latch 等待锁

      • 是一个同步辅助类.
      • 用来同步执行任务的一个或多个线程
      • 不是用来保护临界区或共享资源
      • CountDownLatch
        • countDown() 计数减一.
        • await() 等待 latch 变成 0.
          package concurrentDemo0421;
      
          import java.time.LocalDateTime;
          import java.util.concurrent.CountDownLatch;
      
          /**
           * 设想百米赛跑, 发令枪发出信号后选手开始跑, 全部选手跑到终点后比赛结束.
           */
          public class CountDownLatchExample {
              public static void main(String[] args) throws InterruptedException {
                  CountDownLatch startSignal = new CountDownLatch(1);
                  CountDownLatch doneSignal = new CountDownLatch(10);
      
                  int runnerCount = 10; // 选手数量
                  for (int i = 0; i < runnerCount; i++) { // create and start threads
                      new Thread(new Runner(startSignal, doneSignal)).start(); // 所有选手开始跑~
                  }
                  System.out.println(LocalDateTime.now() + " => " + "准备就绪..");
                  startSignal.countDown(); // let all threads proceed
                  System.out.println(LocalDateTime.now() + " => " + "比赛开始!");
                  doneSignal.await(); // wait for all threads to finish
                  System.out.println(LocalDateTime.now() + " => " + "比赛结束!");
              }
          }
      
          class Runner implements Runnable {
              private final CountDownLatch startSignal;
              private final CountDownLatch doneSignal;
      
              Runner(CountDownLatch startSignal, CountDownLatch doneSignal) {
                  this.startSignal = startSignal;
                  this.doneSignal = doneSignal;
              }
      
              @Override
              public void run() {
                  try {
                      startSignal.await();
                      doWork();
                      doneSignal.countDown();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
      
              private void doWork() throws InterruptedException {
                  long time = (long) (Math.random() * 10 * 1000);
                  Thread.sleep(time); // 随机在十秒内跑完
                  System.out.printf(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 跑完全程, 用时 %d 秒 
      ", time/1000);
              }
          }
      
    • Barrier/ˈbæriər/ n.障碍物

      • 集合点, 也是一个同步辅助类
      • 允许多个线程在某一个点上进行同步
      • CyclicBarrier
        • 构造函数是需要同步的线程数量.
        • await 等待其他线程, 到达数量后就放行.
          package concurrentDemo0421;
      
          import java.time.LocalDateTime;
          import java.util.concurrent.BrokenBarrierException;
          import java.util.concurrent.CyclicBarrier;
      
          /**
           * Barrier/ˈbæriər/ n.障碍物
           * 假定有三行数字, 用三个线程分别计算每一行的和, 最后计算综合
           */
          public class BarrierExample {
              public static void main(String[] args) {
                  int rowCount = 3;
                  int colCount = 5;
                  final int[][] numbers = new int[rowCount][colCount];
                  final int[] results = new int[rowCount];
                  numbers[0] = new int[]{1, 2, 3, 4, 5};
                  numbers[1] = new int[]{6, 7, 8, 9, 10};
                  numbers[2] = new int[]{11, 12, 13, 14, 15};
      
                  CalcFinalSum111 finalResult = new CalcFinalSum111(results);
                  CyclicBarrier cyclicBarrier = new CyclicBarrier(rowCount, finalResult);
                  // 当有3个线程在 barrier上await时, 就执行最终计算
      
                  for (int i = 0; i < rowCount; i++) {
                      CalcRowSum111 eachRow = new CalcRowSum111(numbers, i, results, cyclicBarrier);
                      new Thread(eachRow).start();
                  }
      
              }
          }
      
          class CalcRowSum111 implements Runnable {
              final int[][] numbers;
              final int rowNumber;
              final int[] result;
              final CyclicBarrier barrier;
      
              CalcRowSum111(int[][] numbers, int rowNumber, int[] result, CyclicBarrier barrier) {
                  this.numbers = numbers;
                  this.rowNumber = rowNumber;
      
                  this.result = result;
                  this.barrier = barrier;
              }
      
              @Override
              public void run() {
                  int[] row = numbers[rowNumber];
                  int sum = 0;
                  for (int data : row) {
                      sum += data;
                      result[rowNumber] = sum;
                  }
                  try {
                      System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + " : 计算第" + (rowNumber + 1) + "行结束, 结果为: " + sum);
                      barrier.await(); // 等待! 只要超过(Barrier的构造参数填入的数量)的个数, 就放行
                  } catch (InterruptedException | BrokenBarrierException e) {
                      e.printStackTrace();
                  }
              }
          }
      
          class CalcFinalSum111 implements Runnable {
              final int[] eachRowResult;
              int finalResult;
      
              CalcFinalSum111(int[] eachRowResult) {
                  this.eachRowResult = eachRowResult;
              }
      
              @Override
              public void run() {
                  int sum = 0;
                  for (int data : eachRowResult) {
                      sum += data;
                  }
                  finalResult = sum;
                  System.out.println(LocalDateTime.now() + " => " + "最终结果为: " + finalResult);
              }
          }
      
    • Phaser 阶段性控制多个线程

      • 允许执行并发多阶段任务, 同步辅助类.
      • 在每一个阶段结束的位置对线程进行同步, 当所有的线程都到达这一步, 再进行下一步.
      • Phaser
        • arrive()
        • arriveAndAwaitAdvance()
          package concurrentDemo0421;
      
          import java.time.LocalDateTime;
          import java.util.concurrent.Phaser;
      
          /**
           * 假设举行考试, 总共三道大题, 每次下发一道题目, 等所有学生都完成之后再进行下一道题
           */
          public class PhaserExample {
              public static void main(String[] args) {
                  int studentCount = 5;
                  Phaser phaser = new Phaser(studentCount);
      
                  for (int i = 0; i < studentCount; i++) {
                      new Thread(null, new Student111(phaser), "学生" + i).start();
                  }
              }
          }
      
          class Student111 implements Runnable {
              private final Phaser phaser;
      
              Student111(Phaser phaser) {
                  this.phaser = phaser;
              }
      
              @Override
              public void run() {
                  try {
                      doTesting(1);
                      phaser.arriveAndAwaitAdvance(); // 等到所有线程都到达了, 才放行
                      doTesting(2);
                      phaser.arriveAndAwaitAdvance();
                      doTesting(3);
                      phaser.arriveAndAwaitAdvance();
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
      
              private void doTesting(int i) throws InterruptedException {
                  String name = Thread.currentThread().getName();
                  System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "开始答第" + i + "题");
                  long thinkingTime = (long) (Math.random() * 1000);
                  Thread.sleep(thinkingTime); // 模拟学生答题时间
                  System.out.println(LocalDateTime.now() + " => " + Thread.currentThread().getName() + "第" + i + "道题答题结束");
              }
          }
      
    • Exchanger 两个线程间交换数据

      • 允许在并发线程中互相交换消息.
      • 允许在2个线程中定义同步点, 当两个线程都到达同步点, 他们交换数据结构
      • Exchanger
        • exchange(), 线程双方互相交互数据
        • 交换数据是双向的
          package concurrentDemo0421;
      
          import java.time.LocalDateTime;
          import java.util.Scanner;
          import java.util.concurrent.Exchanger;
      
          /**
           * 通过Exchanger实现学生成绩查询, 两个线程间简单的数据交换,
           * 把自己线程的内容输出给另一个线程(只能简单的双向传送, 不能向MPI一样随意点对点的传输, 线程1给线程3 线程3向线程2...这样)
           */
          public class ExchangerExample {
              public static void main(String[] args) throws InterruptedException {
                  Exchanger<String> exchanger = new Exchanger<>();
                  BackgroundWorker111 backgroundWorker111 = new BackgroundWorker111(exchanger);
                  new Thread(backgroundWorker111).start();
      
                  Scanner scanner = new Scanner(System.in);
                  while (true) {
                      System.out.println(LocalDateTime.now() + " => " + "请输入要查询的学生名字:");
                      String input = scanner.nextLine().trim();
                      exchanger.exchange(input);
                      String exResult = exchanger.exchange(null); // 拿到线程反馈的结果
                      // 当两个线程都同时执行到同一个exchanger.exchange()方法, 两个线程就互相交换数据, 交换是双向的.
                      if ("exit".equals(exResult)) {
                          System.out.println(LocalDateTime.now() + " => " + "退出查询~");
                          break;
                      }
                      System.out.println(LocalDateTime.now() + " => " + "查询结果: " + exResult);
                  }
              }
          }
      
          class BackgroundWorker111 implements Runnable {
              final Exchanger<String> exchanger;
      
              BackgroundWorker111(Exchanger<String> exchanger) {
                  this.exchanger = exchanger;
              }
      
              @Override
              public void run() {
                  while (true) {
                      try {
                          String item = exchanger.exchange(null);
                          switch (item) {
                              case "zhangsan":
                                  exchanger.exchange("90");
                                  break;
                              case "lisi":
                                  exchanger.exchange("80");
                                  break;
                              case "wangwu":
                                  exchanger.exchange("70");
                                  break;
                              case "exit":
                                  exchanger.exchange("exit");
                                  return; // 退出run, 即结束当前线程
                              default:
                                  exchanger.exchange("no body!");
                          }
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      
    • 总结

      • java.util.concurrent 包提供了很多并发编程的控制协作类.
      • 根据业务特点, 使用正确的线程并发控制协作.
  • 相关阅读:
    集合
    元组 与 字典
    列表
    练习题
    字符串的类型补充
    3月19号练习题及作业
    变量,用户交互的补充内容,以及数据内容
    Python 入门
    计算机基础
    决策树的概念及其训练
  • 原文地址:https://www.cnblogs.com/sweetXiaoma/p/12749714.html
Copyright © 2011-2022 走看看