zoukankan      html  css  js  c++  java
  • JAVA5 并发库的使用

    一、线程池

       1: public static void main(String[] args) {
       2:     // 产生线程池,有3个线程,使用固定线程池创建
       3:     //ExecutorService threadPool = Executors.newFixedThreadPool(3);
       4:     //产生线程池,动态创建线程池的大小
       5:     ExecutorService threadPool = Executors.newCachedThreadPool();
       6:     //向线程池添加10个任务
       7:     for (int i=1; i<10; i++) {
       8:         final int task = i;
       9:         threadPool.execute(new Runnable() {
      10:             @Override
      11:             public void run() {
      12:                 for (int j=0; j<10; j++) {
      13:                     try {
      14:                         Thread.sleep(20);
      15:                     } catch (InterruptedException e) {
      16:                         // TODO Auto-generated catch block
      17:                         e.printStackTrace();
      18:                     }
      19:                     System.out.println(
      20:                             Thread.currentThread().getName() +
      21:                             " is  looping of " + j +
      22:                             " for task " + task);
      23:                 }
      24:                 
      25:             }
      26:         });
      27:     }
      28:     
      29:     //线程调度的使用
      30:     //该功能为定时运行6秒以后运行,然后每隔2秒运行一次
      31:     Executors.newScheduledThreadPool(3).scheduleAtFixedRate(
      32:             new Runnable() {
      33:                 @Override
      34:                 public void run() {
      35:                     // TODO Auto-generated method stub
      36:                     System.out.println("bomb....");
      37:                 }
      38:             }, 
      39:             6,
      40:             2,
      41:             TimeUnit.SECONDS);
      42: }

    二、并发锁(lock)

    Lock比传统线程模型中的synchronized方式更加面向对象。

       1: Lock lock = new ReentrantLock();
       2: lock.lock();
       3: try {
       4: ........
       5: } finally {
       6:   lock.unlock();
       7: }
    读写锁:读锁不互斥,读锁与写锁互斥,写锁与写锁互斥
       1: //从JAVA文档中找到的,如果数据没有读到,就解开读锁,加上写锁
       2: class CachedData {
       3:    Object data;
       4:    volatile boolean cacheValid;
       5:    ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
       6:  
       7:    void processCachedData() {
       8:      rwl.readLock().lock();
       9:      if (!cacheValid) {
      10:         // Must release read lock before acquiring write lock
      11:         rwl.readLock().unlock();
      12:         rwl.writeLock().lock();
      13:         // Recheck state because another thread might have acquired
      14:         //   write lock and changed state before we did.
      15:         if (!cacheValid) {
      16:           data = ...
      17:           cacheValid = true;
      18:         }
      19:         // Downgrade by acquiring read lock before releasing write lock
      20:         //我的理解,写上读锁,下面的写锁降级成更新锁
      21:         rwl.readLock().lock();
      22:         rwl.writeLock().unlock(); // Unlock write, still hold read
      23:      }
      24:  
      25:      use(data);
      26:      rwl.readLock().unlock();
      27:    }
      28:  }

    三、锁的条件(Condition)

    Condition的功能就是在传统线程技术中的wait和notify的功能。

       1: /**
       2:  * 每个方法各执行分10次和5次运行
       3:  *
       4:  */
       5: public class ConditionTest {
       6:  
       7:     public static void main(String[] args) {
       8:         ExecutorService service = Executors.newSingleThreadExecutor();
       9:         final Business2 business = new Business2();
      10:         service.execute(new Runnable(){
      11:  
      12:             public void run() {
      13:                 for(int i=0;i<50;i++){
      14:                     business.sub();
      15:                 }
      16:             }
      17:             
      18:         });
      19:         
      20:         for(int i=0;i<50;i++){
      21:             business.main();
      22:         }
      23:     }
      24:  
      25: }
      26:  
      27: class Business2{
      28:     Lock lock = new ReentrantLock();
      29:     //为了达到线程间通信
      30:     Condition condition = lock.newCondition();
      31:     boolean bShouldSub = true;
      32:     public void sub(){
      33:         lock.lock();
      34:         if(!bShouldSub)
      35:             try {
      36:                 condition.await();//线程等待
      37:             } catch (InterruptedException e) {
      38:                 // TODO Auto-generated catch block
      39:                 e.printStackTrace();
      40:             }
      41:         try
      42:         {
      43:             for(int i=0;i<10;i++){
      44:                 System.out.println(Thread.currentThread().getName() + " : " + i);
      45:             }
      46:             bShouldSub = false;
      47:             //通知其他的线程
      48:             condition.signal();
      49:         }finally{
      50:             lock.unlock();
      51:         }
      52:     }
      53:     
      54:     public void main(){
      55:         lock.lock();
      56:         if(bShouldSub)
      57:             try {
      58:                 condition.await();
      59:             } catch (InterruptedException e) {
      60:                 e.printStackTrace();
      61:             }        
      62:         try
      63:         {
      64:             for(int i=0;i<5;i++){
      65:                 System.out.println(Thread.currentThread().getName() + " : " + i);
      66:             }
      67:             bShouldSub = true;
      68:             condition.signal();            
      69:         }finally{
      70:             lock.unlock();
      71:         }        
      72:     }
      73: }

    另外,再来一个JDK中的例子,更加经典

       1: //从JDK中找到的例子,还是JDK得例子经典
       2: //本例子是一个可阻塞的队列
       3: class BoundedBuffer {
       4:    final Lock lock = new ReentrantLock();
       5:    //此处用到2个Condition,是为了区别取和放操作
       6:    final Condition notFull  = lock.newCondition(); 
       7:    final Condition notEmpty = lock.newCondition(); 
       8:    //队列为100
       9:    final Object[] items = new Object[100];
      10:    int putptr, takeptr, count;
      11:  
      12:    public void put(Object x) throws InterruptedException {
      13:      lock.lock();
      14:      try {
      15:        while (count == items.length) 
      16:          notFull.await();//队列满,等待
      17:        items[putptr] = x; 
      18:        if (++putptr == items.length) 
      19:       putptr = 0;//指针越界
      20:        ++count;
      21:        notEmpty.signal();
      22:      } finally {
      23:        lock.unlock();
      24:      }
      25:    }
      26:  
      27:    public Object take() throws InterruptedException {
      28:      lock.lock();
      29:      try {
      30:        while (count == 0) 
      31:          notEmpty.await();//队列空
      32:        Object x = items[takeptr]; 
      33:        if (++takeptr == items.length) takeptr = 0;
      34:        --count;
      35:        notFull.signal();
      36:        return x;
      37:      } finally {
      38:        lock.unlock();
      39:      }
      40:    } 
      41:  }

    四、信号灯(Semaphore)

    维护当前访问自身的线程个数,并提供同步机制。

       1: public class SemaphoreTest {
       2:     public static void main(String[] args) {
       3:         ExecutorService service = Executors.newCachedThreadPool();
       4:         final  Semaphore sp = new Semaphore(3);//只允许3个线程的并发
       5:         for(int i=0;i<10;i++){
       6:             Runnable runnable = new Runnable(){
       7:                     public void run(){
       8:                     try {
       9:                         sp.acquire();//是否可以让当前线程执行
      10:                     } catch (InterruptedException e1) {
      11:                         e1.printStackTrace();
      12:                     }
      13:                     System.out.println("线程" + Thread.currentThread().getName() + 
      14:                             "进入,当前已有" + (3-sp.availablePermits()) + "个并发");
      15:                     try {
      16:                         Thread.sleep((long)(Math.random()*10000));
      17:                     } catch (InterruptedException e) {
      18:                         e.printStackTrace();
      19:                     }
      20:                     System.out.println("线程" + Thread.currentThread().getName() + 
      21:                             "即将离开");                    
      22:                     sp.release();//释放信号量
      23:                     //下面代码有时候执行不准确,因为其没有和上面的代码合成原子单元
      24:                     System.out.println("线程" + Thread.currentThread().getName() + 
      25:                             "已离开,当前已有" + (3-sp.availablePermits()) + "个并发");                    
      26:                 }
      27:             };
      28:             service.execute(runnable);            
      29:         }
      30:     }
      31:  
      32: }

    五、其他同步工具
    CyclicBarrier:需要有多个线程同时到达才向下执行

       1:  
       2: public class CyclicBarrierTest {
       3:         //模拟旅游集合的情况
       4:     public static void main(String[] args) {
       5:         ExecutorService service = Executors.newCachedThreadPool();
       6:         final  CyclicBarrier cb = new CyclicBarrier(3);
       7:         for(int i=0;i<3;i++){
       8:             Runnable runnable = new Runnable(){
       9:                     public void run(){
      10:                     try {
      11:                         Thread.sleep((long)(Math.random()*10000));    
      12:                         System.out.println("线程" + Thread.currentThread().getName() + 
      13:                                 "即将到达集合地点1,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
      14:                         cb.await();//只有3个线程都到此处,程序才会往下走
      15:                         
      16:                         Thread.sleep((long)(Math.random()*10000));    
      17:                         System.out.println("线程" + Thread.currentThread().getName() + 
      18:                                 "即将到达集合地点2,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
      19:                         cb.await();    
      20:                         Thread.sleep((long)(Math.random()*10000));    
      21:                         System.out.println("线程" + Thread.currentThread().getName() + 
      22:                                 "即将到达集合地点3,当前已有" + cb.getNumberWaiting() + "个已经到达,正在等候");                        
      23:                         cb.await();                        
      24:                     } catch (Exception e) {
      25:                         e.printStackTrace();
      26:                     }                
      27:                 }
      28:             };
      29:             service.execute(runnable);
      30:             
      31:         }
      32:         service.shutdown();
      33:     }
      34:     
      35: }


    CountDownLatch:犹如倒计数器,等计数器减少到0,程序向下执行

       1: public class CountdownLatchTest {
       2:    //模拟赛跑的情况
       3:     public static void main(String[] args) {
       4:         ExecutorService service = Executors.newCachedThreadPool();
       5:          //计数器
       6:         final CountDownLatch cdOrder = new CountDownLatch(1);
       7:         final CountDownLatch cdAnswer = new CountDownLatch(3);        
       8:         for(int i=0;i<3;i++){
       9:             Runnable runnable = new Runnable(){
      10:                     public void run(){
      11:                     try {
      12:                         System.out.println("线程" + Thread.currentThread().getName() + 
      13:                                 "正准备接受命令");                        
      14:                         cdOrder.await();
      15:                         System.out.println("线程" + Thread.currentThread().getName() + 
      16:                         "已接受命令");                                
      17:                         Thread.sleep((long)(Math.random()*10000));    
      18:                         System.out.println("线程" + Thread.currentThread().getName() + 
      19:                                 "回应命令处理结果");                        
      20:                         cdAnswer.countDown();                        
      21:                     } catch (Exception e) {
      22:                         e.printStackTrace();
      23:                     }                
      24:                 }
      25:             };
      26:             service.execute(runnable);
      27:         }    
      28:         //主线程    
      29:         try {
      30:             Thread.sleep((long)(Math.random()*10000));
      31:         
      32:             System.out.println("线程" + Thread.currentThread().getName() + 
      33:                     "即将发布命令");
      34:             //计数器减1                        
      35:             cdOrder.countDown();
      36:             System.out.println("线程" + Thread.currentThread().getName() + 
      37:             "已发送命令,正在等待结果");    
      38:             cdAnswer.await();//等待计数器为0,然后主线程往下走
      39:             System.out.println("线程" + Thread.currentThread().getName() + 
      40:             "已收到所有响应结果");    
      41:         } catch (Exception e) {
      42:             e.printStackTrace();
      43:         }                
      44:         service.shutdown();
      45:  
      46:     }
      47: }

    Exchanger:实现线程间的数据交换

       1: public class ExchangerTest {
       2:  
       3:     public static void main(String[] args) {
       4:         ExecutorService service = Executors.newCachedThreadPool();
       5:         final Exchanger exchanger = new Exchanger();
       6:         service.execute(new Runnable(){
       7:             public void run() {
       8:                 try {                
       9:                     Thread.sleep((long)(Math.random()*10000));
      10:                     String data1 = "zxx";
      11:                     System.out.println("线程" + Thread.currentThread().getName() + 
      12:                     "正在把数据" + data1 +"换出去");
      13:                     String data2 = (String)exchanger.exchange(data1);//交换数据
      14:                     System.out.println("线程" + Thread.currentThread().getName() + 
      15:                     "换回的数据为" + data2);
      16:                 }catch(Exception e){
      17:                     
      18:                 }
      19:             }    
      20:         });
      21:         service.execute(new Runnable(){
      22:             public void run() {
      23:                 try {                
      24:                     Thread.sleep((long)(Math.random()*10000));
      25:                     String data1 = "lhm";
      26:                     System.out.println("线程" + Thread.currentThread().getName() + 
      27:                     "正在把数据" + data1 +"换出去");
      28:                     String data2 = (String)exchanger.exchange(data1);
      29:                     System.out.println("线程" + Thread.currentThread().getName() + 
      30:                     "换回的数据为" + data2);
      31:                 }catch(Exception e){
      32:                     
      33:                 }                
      34:             }    
      35:         });        
      36:     }
      37:  
      38: }

    六、可阻塞的队列

       1: public class BlockingQueueCondition {
       2:     //现在2个线程的交替操作
       3:     public static void main(String[] args) {
       4:         ExecutorService service = Executors.newSingleThreadExecutor();
       5:         final Business3 business = new Business3();
       6:         service.execute(new Runnable(){
       7:  
       8:             public void run() {
       9:                 for(int i=0;i<50;i++){
      10:                     business.sub();
      11:                 }
      12:             }
      13:             
      14:         });
      15:         
      16:         for(int i=0;i<50;i++){
      17:             business.main();
      18:         }
      19:     }
      20:  
      21: }
      22:  
      23: class Business3{
      24:     BlockingQueue subQueue = new ArrayBlockingQueue(1);
      25:     BlockingQueue mainQueue = new ArrayBlockingQueue(1);
      26:     //匿名构造方法,相当于一个构造方法
      27:         {
      28:         try {
      29:             mainQueue.put(1);//让主队列满,不能put操作
      30:         } catch (InterruptedException e) {
      31:             e.printStackTrace();
      32:         }
      33:     }
      34:     public void sub(){
      35:         try
      36:         {
      37:             mainQueue.take();
      38:             for(int i=0;i<10;i++){
      39:                 System.out.println(Thread.currentThread().getName() + " : " + i);
      40:             }
      41:             subQueue.put(1);
      42:         }catch(Exception e){
      43:  
      44:         }
      45:     }
      46:     
      47:     public void main(){
      48:         
      49:         try
      50:         {
      51:             subQueue.take();
      52:             for(int i=0;i<5;i++){
      53:                 System.out.println(Thread.currentThread().getName() + " : " + i);
      54:             }
      55:             mainQueue.put(1);
      56:         }catch(Exception e){
      57:         }        
      58:     }
      59: }

    七、同步集合类

  • 相关阅读:
    ado.net 完整修改删除,攻击防攻击
    ado.net 修改,查询
    navicat连接sqlserver未指定默认驱动程序
    设计模式之Proxy(代理)(转)
    设计模式之Prototype(原型)(转)
    设计模式之Observer(观察者)(转)
    设计模式之Memento(备忘机制)(转)
    设计模式之Mediator(中介者)(转)
    设计模式之Interpreter(解释器)(转)
    信步漫谈之Xfire—基础介绍
  • 原文地址:https://www.cnblogs.com/sodmecai/p/2483437.html
Copyright © 2011-2022 走看看