zoukankan      html  css  js  c++  java
  • 并发编程之基础( 五)

    并发编程实战1


     

    该示例代码模拟银行出纳员服务的过程。银行中随时都会来顾客,而且每位顾客的服务时间长短不确定。当顾客数量多时需要增加出纳,当顾客数量少时需要减少出纳并让出纳去干别的事情。

      关键字

    • Runnable
    • Comparable<T>
    • Thread.interrupted()
    • wait/notifyAll
    • PriorityQueue
    • ExecutorService
      1 package com.dy.xidian;
      2 
      3 import java.util.LinkedList;
      4 import java.util.PriorityQueue;
      5 import java.util.Queue;
      6 import java.util.Random;
      7 import java.util.concurrent.ArrayBlockingQueue;
      8 import java.util.concurrent.ExecutorService;
      9 import java.util.concurrent.Executors;
     10 import java.util.concurrent.TimeUnit;
     11 
     12 class Customer {
     13     private final int serviceTime;
     14     public Customer(int tm) {
     15         serviceTime = tm;
     16     }
     17     public int getService() {
     18         return serviceTime;
     19     }
     20     public String toString() {
     21         return "[" + serviceTime + "]";
     22     }
     23 }
     24 
     25 @SuppressWarnings("serial")
     26 class CustomerLine extends ArrayBlockingQueue<Customer> {
     27 
     28     public CustomerLine(int maxLineSize) {
     29         super(maxLineSize);
     30     }
     31 
     32     public String toString() {
     33         if (this.size() == 0)
     34             return "[Empty]";
     35         StringBuilder result = new StringBuilder();
     36         for (Customer customer : this)
     37             result.append(customer);
     38         return result.toString();
     39     }
     40 
     41 }
     42 
     43 class CustomerGenerator implements Runnable {
     44     private CustomerLine customers;
     45     private static Random rand = new Random(47);
     46 
     47     public CustomerGenerator(CustomerLine cq) {
     48         customers = cq;
     49     }
     50 
     51     @Override
     52     public void run() {
     53         try {
     54             while (!Thread.interrupted()) {
     55                 TimeUnit.MILLISECONDS.sleep(rand.nextInt(300));
     56                 customers.put(new Customer(rand.nextInt(1000)));
     57             }
     58         } catch (InterruptedException e) {
     59             System.out.println("CustomerGenerator interrupted!");
     60         }
     61         System.out.println("CustomerGenerator terminating");
     62     }
     63 }
     64 
     65 class Teller implements Runnable {
     66     private static int counter = 0;
     67     private final int id = counter++;
     68     private int customerServed = 0;
     69     private CustomerLine customers;
     70     private boolean servingCustomerLine = true;
     71 
     72     public Teller(CustomerLine customers) {
     73         super();
     74         this.customers = customers;
     75     }
     76 
     77     @Override
     78     public void run() {
     79         try {
     80             while (!Thread.interrupted()) {
     81                 Customer customer = customers.take();
     82                 TimeUnit.MILLISECONDS.sleep(customer.getService());
     83                 synchronized (this) {
     84                     customerServed++;
     85                     while (!servingCustomerLine)
     86                         wait();
     87                 }
     88             }
     89         } catch (InterruptedException e) {
     90             System.out.println(this + "interrupted!");
     91         }
     92         System.out.println(this + "terminating!");
     93     }
     94 
     95     public synchronized void doSomethingElse() {
     96         customerServed = 0;
     97         servingCustomerLine = false;
     98     }
     99 
    100     public synchronized void serveCustomerLine() {
    101         assert !servingCustomerLine : "already serving" + this;
    102         servingCustomerLine = true;
    103         notifyAll();
    104     }
    105 
    106     public String toString() {
    107         return "Teller " + id + " ";
    108     }
    109 
    110     public String shortString() {
    111         return "T" + id;
    112     }
    113 
    114     public synchronized int compateTo(Teller other) {
    115         if (customerServed > other.customerServed)
    116             return 1;
    117         if (customerServed < other.customerServed)
    118             return -1;
    119         return 0;
    120     }
    121 }
    122 
    123 class TellerManger implements Runnable {
    124     private ExecutorService exec;
    125     private CustomerLine customers;
    126     private PriorityQueue<Teller> workingTellers = new PriorityQueue<Teller>();
    127     private Queue<Teller> tellersDoingOtherThings = new LinkedList<Teller>();
    128     private int adjustmentPeriod;
    129 
    130     public TellerManger(ExecutorService ex, CustomerLine customers,
    131             int adjustmentPeriod) {
    132         super();
    133         this.exec = ex;
    134         this.customers = customers;
    135         this.adjustmentPeriod = adjustmentPeriod;
    136         Teller teller = new Teller(customers);
    137         exec.execute(teller);
    138         workingTellers.add(teller);
    139     }
    140 
    141     public void adjustTellerNumber() {
    142         if (customers.size() / workingTellers.size() > 2) {
    143             Teller teller = tellersDoingOtherThings.remove();
    144             teller.serveCustomerLine();
    145             workingTellers.offer(teller);
    146             return;
    147         }
    148         if (workingTellers.size() > 1
    149                 && customers.size() / workingTellers.size() < 2)
    150             reassignOneTeller();
    151         if (customers.size() == 0)
    152             while (workingTellers.size() > 1)
    153                 reassignOneTeller();
    154 
    155     }
    156 
    157     private void reassignOneTeller() {
    158         Teller teller = workingTellers.poll();
    159         teller.doSomethingElse();
    160         tellersDoingOtherThings.offer(teller);
    161     }
    162 
    163     @Override
    164     public void run() {
    165         try {
    166             while (!Thread.interrupted()) {
    167                 TimeUnit.MILLISECONDS.sleep(adjustmentPeriod);
    168                 adjustTellerNumber();
    169                 System.out.println(customers + " { ");
    170                 for (Teller teller : workingTellers) {
    171                     System.out.println(teller.shortString() + " ");
    172                 }
    173                 System.out.println("}");
    174             }
    175         } catch (InterruptedException e) {
    176             System.out.println(this + "interrupted");
    177         }
    178         System.out.println(this + "terminating");
    179     }
    180 }
    181 
    182 public class BankTellerSimulation {
    183     static final int MAX_LINE_SIZE = 50;
    184     static final int ADJUSTMENT_PERIOD = 1000;
    185 
    186     public static void main(String[] args) throws Exception {
    187         ExecutorService exec = Executors.newCachedThreadPool();
    188         CustomerLine customers = new CustomerLine(MAX_LINE_SIZE);
    189         exec.execute(new CustomerGenerator(customers));
    190         exec.execute(new TellerManger(exec, customers, ADJUSTMENT_PERIOD));
    191         if (args.length > 0)
    192             TimeUnit.SECONDS.sleep(new Integer(args[0]));
    193         else {
    194             System.out.println("press 'Enter' to quit");
    195             System.in.read();
    196         }
    197         exec.shutdownNow();
    198     }
    199 }
    View Code

    并发编程实战2


      该代码模拟的了汽车组装线,每辆Car通过流水线生产,通过机器人来安装汽车发动机、车厢和轮子。Car是通过CarQueue(阻塞队列)从一个地方传送到另一个地方。ChassisBuilder创建了一个未加修饰的Car,并将它放在一个CarQueue中。Assembler从CarQueue中取走Car,并雇佣Robot对其进行加工。CyclicBarrier使Assembler等待,等到所有的Robot都完成后,将Car放置在即将离开它的CarQueue中,然后被传送至下一个操作。最终的CarQueue的消费者是一个Reporter对象,它只打印Car,显示所有的任务已经正确完成。Robot是在池中管理的,当需要完成工作时,就会从池中雇佣适当的Robot,使用完之后会归还Robot.

      关键字

    • Runnable
    • Thread.interrupted()
    • wait/notifyAll
    • CyclicBarrier
    • 对象池
    • synchronized
    • 泛型
      1 package com.dy.xidian;
      2 
      3 import java.util.HashSet;
      4 import java.util.Set;
      5 import java.util.concurrent.BrokenBarrierException;
      6 import java.util.concurrent.CyclicBarrier;
      7 import java.util.concurrent.ExecutorService;
      8 import java.util.concurrent.Executors;
      9 import java.util.concurrent.LinkedBlockingQueue;
     10 import java.util.concurrent.TimeUnit;
     11 
     12 class Car {
     13     private final int id;
     14     private boolean engine = false, driveTrain = false, wheels = false;
     15 
     16     public Car(int idn) {
     17         id = idn;
     18     }
     19 
     20     public Car() {
     21         id = -1;
     22     }
     23 
     24     public synchronized int gerId() {
     25         return id;
     26     }
     27 
     28     public synchronized void addEngine() {
     29         engine = true;
     30     }
     31 
     32     public synchronized void addDriveTrain() {
     33         driveTrain = true;
     34     }
     35 
     36     public synchronized void addWheels() {
     37         wheels = true;
     38     }
     39 
     40     public synchronized String toString() {
     41         return "Car " + id + " engine " + engine + " driveTrain: " + driveTrain
     42                 + " wheels: " + wheels + " ]";
     43     }
     44 }
     45 
     46 class CarQueue extends LinkedBlockingQueue<Car> {
     47 }
     48 
     49 class ChassisBuilder implements Runnable {
     50     private CarQueue carQueue;
     51     public int counter = 0;
     52     public ChassisBuilder(CarQueue cq) {
     53         carQueue = cq;
     54     }
     55 
     56     @Override
     57     public void run() {
     58         try {
     59             while (!Thread.interrupted()) {
     60                 TimeUnit.MILLISECONDS.sleep(500);
     61                 Car c = new Car(counter++);
     62                 System.out.println("ChassisBuilder created " + c);
     63                 carQueue.put(c);
     64             }
     65         } catch (InterruptedException e) {
     66             System.out.println("Interrupted: ChassisBuilder");
     67         }
     68         System.out.println("ChassisBuilder off");
     69     }
     70 
     71 }
     72 
     73 abstract class Robot implements Runnable {
     74     private RobotPool pool;
     75     public Robot(RobotPool p) {
     76         pool = p;
     77     }
     78     protected Assembler assembler;
     79 
     80     public Robot assignAssembler(Assembler assembler) {
     81         this.assembler = assembler;
     82         return this;
     83     }
     84 
     85     private boolean engage = false;
     86 
     87     public synchronized void engage() {
     88         engage = true;
     89         notifyAll();
     90     }
     91 
     92     abstract protected void performService();
     93 
     94     public void run() {
     95         try {
     96             powerDown();
     97             while (!Thread.interrupted()) {
     98                 performService();
     99                 try {
    100                     assembler.barrier().await();
    101                 } catch (BrokenBarrierException e) {
    102                     throw new RuntimeException(e);
    103                 }
    104                 powerDown();
    105             }
    106         } catch (InterruptedException e) {
    107             System.out.println("Exiting " + this + " via interrupt");
    108         }
    109         System.out.println(this + " off");
    110     }
    111 
    112     private synchronized void powerDown() throws InterruptedException {
    113         engage = false;
    114         assembler = null;
    115         pool.release(this);
    116         while (engage == false)
    117             wait();
    118     }
    119 
    120     public String toString() {
    121         return getClass().getName();
    122     }
    123 }
    124 
    125 class EngineRobot extends Robot {
    126     public EngineRobot(RobotPool pool) {
    127         super(pool);
    128     }
    129     protected void performService() {
    130         System.out.println(this + " installing engine");
    131         assembler.car().addEngine();
    132     }
    133 }
    134 
    135 class DriveTrainRobot extends Robot {
    136 
    137     public DriveTrainRobot(RobotPool p) {
    138         super(p);
    139     }
    140 
    141     @Override
    142     protected void performService() {
    143         System.out.println(this + " installing DriveTrain");
    144         assembler.car().addDriveTrain();
    145     }
    146 
    147 }
    148 
    149 class WheelRobot extends Robot {
    150 
    151     public WheelRobot(RobotPool p) {
    152         super(p);
    153     }
    154 
    155     @Override
    156     protected void performService() {
    157         System.out.println(this + " installing wheels");
    158         assembler.car().addWheels();
    159     }
    160 }
    161 
    162 class RobotPool {
    163     private Set<Robot> pool = new HashSet<Robot>();
    164 
    165     public synchronized void add(Robot r) {
    166         pool.add(r);
    167         notifyAll();
    168     }
    169 
    170     public synchronized void hire(Class<? extends Robot> robotType, Assembler d)
    171             throws InterruptedException {
    172         for (Robot r : pool)
    173             if (r.getClass().equals(robotType)) {
    174                 pool.remove(r);
    175                 r.assignAssembler(d);
    176                 r.engage();
    177                 return;
    178             }
    179         wait();
    180         hire(robotType, d);
    181     }
    182 
    183     public synchronized void release(Robot r) {
    184         add(r);
    185     }
    186 }
    187 
    188 class Assembler implements Runnable {
    189     private CarQueue chassisQueue, finishingQueue;
    190     private Car car;
    191     private CyclicBarrier barrier = new CyclicBarrier(4);
    192     private RobotPool robotPool;
    193 
    194     public Assembler(CarQueue chassisQueue, CarQueue finishingQueue,
    195             RobotPool robotPool) {
    196         super();
    197         this.chassisQueue = chassisQueue;
    198         this.finishingQueue = finishingQueue;
    199         this.robotPool = robotPool;
    200     }
    201 
    202     public Car car() {
    203         return car;
    204     }
    205 
    206     public CyclicBarrier barrier() {
    207         return barrier;
    208     }
    209 
    210     @Override
    211     public void run() {
    212         try {
    213             while (!Thread.interrupted()) {
    214                 car = chassisQueue.take();
    215                 robotPool.hire(EngineRobot.class, this);
    216                 robotPool.hire(DriveTrainRobot.class, this);
    217                 robotPool.hire(WheelRobot.class, this);
    218                 try {
    219                     barrier.await();
    220                 } catch (BrokenBarrierException e) {
    221                     throw new RuntimeException(e);
    222                 }
    223                 finishingQueue.put(car);
    224             }
    225         } catch (InterruptedException e) {
    226             System.out.println("Exiting Assembler via interrupt");
    227         }
    228         System.out.println("Assemble off");
    229     }
    230 
    231 }
    232 
    233 class Reporter implements Runnable {
    234     private CarQueue carQueue;
    235     public Reporter(CarQueue cq) {
    236         carQueue = cq;
    237     }
    238 
    239     @Override
    240     public void run() {
    241         try {
    242             while (!Thread.interrupted())
    243                 System.out.println(carQueue.take());
    244         } catch (InterruptedException e) {
    245             System.out.println("Exiting Reporter via interrupt");
    246         }
    247         System.out.println("Reporter off");
    248     }
    249 }
    250 
    251 public class CarBuilder {
    252     public static void main(String[] args) throws InterruptedException {
    253         CarQueue chassisQueue = new CarQueue(), finishingQueue = new CarQueue();
    254         ExecutorService exec = Executors.newCachedThreadPool();
    255         RobotPool robotPool = new RobotPool();
    256         exec.execute(new EngineRobot(robotPool));
    257         exec.execute(new DriveTrainRobot(robotPool));
    258         exec.execute(new WheelRobot(robotPool));
    259         exec.execute(new Assembler(chassisQueue, finishingQueue, robotPool));
    260         exec.execute(new Reporter(finishingQueue));
    261         exec.execute(new ChassisBuilder(chassisQueue));
    262         TimeUnit.SECONDS.sleep(7);
    263         exec.shutdownNow();
    264     }
    265 }
    View Code

     

  • 相关阅读:
    一篇文章搞懂密码学基础及SSL/TLS协议
    如何编写一个多进程性能测试程序
    自动化测试用例编写日志总结(一)
    Python lambda匿名函数
    Python map() 函数
    python中sorted函数的理解(对list列表排序,对dict字典排序)
    python list列表冒泡排序
    range的用法
    同时安装python2和python3是使用pip的方法:
    cmd输出文件树结构
  • 原文地址:https://www.cnblogs.com/xidongyu/p/5339491.html
Copyright © 2011-2022 走看看