zoukankan      html  css  js  c++  java
  • 高并发编程基础(java.util.concurrent包常见类基础)

      JDK5中添加了新的java.util.concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能。因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。因此JDK5开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。与Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的并发容器主要解决了两个问题:

    1)根据具体场景进行设计,尽量避免synchronized,提供并发性。
    2)定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。

    这里先粗略的介绍下 concurrrent 包下面常见并发容器的简单应用,后续再针对性的去深入研究。

    并发容器:

    这些容器的关键方法大部分都实现了线程安全的功能,却不使用同步关键字(synchronized)。
     
    常见阻塞队列:
      BlockingQueue.class,阻塞队列接口
      DelayQueue.class,阻塞队列,无界队列,并且元素是Delay的子类,保证元素在达到一定时间后才可以取得到
    public class T07_DelayQueue {
    	// 执行定时任务
    	static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    	static Random r = new Random();
    
    	static class MyTask implements Delayed {
    
    		long runningTime;
    
    		MyTask(Long rt) {
    
    			this.runningTime = rt;
    		}
    		@Override
    		public int compareTo(Delayed o) {
    			if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
    				return -1;
    			if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS))
    				return 1;
    			else
    				return 0;
    		}
    		@Override
    		public long getDelay(TimeUnit unit) {
    			return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    		}
    		
    		@Override
    		public String toString() {
    			return "" + runningTime ;
    		}
    	}
    	public static void main(String[] args) throws InterruptedException {
    		long now = System.currentTimeMillis();
    		
    		MyTask t1 =new MyTask(now +1000);
    		MyTask t2 =new MyTask(now +2000);
    		MyTask t3 =new MyTask(now +1500);
    		MyTask t4 =new MyTask(now +2500);
    		MyTask t5 =new MyTask(now +500);
    		
    		tasks.put(t1);
    		tasks.put(t2);
    		tasks.put(t3);
    		tasks.put(t4);
    		tasks.put(t5);
    		
    		System.out.println(tasks);
    		
    		for(int i=0;i<5;i++) {
    			System.out.println(tasks.take());
    		}
    		
    	}
    }
      
      BlockingDeque.class,双端阻塞队列接口
      ArrayBlockingQueue.class,阻塞队列,数组实现。有界阻塞队列
    public class T06_ArrayBlockingQueue {
    	// 阻塞队列
    	static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10);
    	static Random r =new Random();
    	public static void main(String[] args) throws InterruptedException { 
    		for(int i=0;i<10;i++) {
    			strs.put("a"+i);
    		}
    //		strs.add("aaa");//Exception in thread "main" java.lang.IllegalStateException: Queue full
    //		strs.put("aaa"); // 满了会等待 阻塞
    //		strs.offer("aaa");//不会报异常 有返回值  true or false
    //		strs.offer("aaa",1,TimeUnit.SECONDS); // 按照时间段阻塞
    	}
    }

      LinkedBlockingDeque.class,阻塞双端队列,链表实现
      LinkedBlockingQueue.class,阻塞队列,链表实现,无界,以下是该类对与生产者消费者模型的简单实现:
    public class T05_LinkedBlockingQueue {
    	// 阻塞队列
    	static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
    	
    	static Random r =new Random();
    	//生产者消费者
    	public static void main(String[] args) {
    		new Thread(()->{
    			for(int i=0;i<100;i++) {
    				try {
    					strs.put("a"+i);//如果满了就会等待阻塞
    					System.err.println("a"+i);
    					TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    		},"p1").start();
    		
    		for(int i=0;i<5;i++) {
    			new Thread(()->{
    				while(true) {
    					try {
    						//take 如果空了 就会阻塞
    						System.out.println(Thread.currentThread().getName()+" take - "+ strs.take());
    					} catch (InterruptedException e) {
    						e.printStackTrace();
    					}
    				}
    			},"c" +i).start();
    		}
    	}
    }
      
      SynchronousQueue.class,同步队列,但是队列长度为0,生产者放入队列的操作会被阻塞,直到消费者过来取,所以这个队列根本不需要空间存放元素;有点像一个独木桥,一次只能一人通过,还不能在桥上停留
    public class T09_SynchronousQueue {
    	// 特殊的 transferQueue ,队列为空的
    	//任何添加东西都要直接丢给消费者的,不会往队列里加的
    	public static void main(String[] args) throws InterruptedException {
    		
    		BlockingQueue<String> strs = new SynchronousQueue<>();
    		
    		new Thread(()->{
    			try {
    				System.out.println(strs.take());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}).start();
    		strs.put("aaa");//阻塞,等待消费者消费
    //		strs.add("aaa"); // 报错
    		System.out.println(strs.size());
    	}		
    }
     
    常见非阻塞队列:
      ConcurrentLinkedDeque.class,非阻塞双端队列,链表实现,无界队列
      ConcurrentLinkedQueue.class,非阻塞队列,链表实现
    public class T04_ConcurrentQueue {
    
    	public static void main(String[] args) {
    		Queue<String> strs = new ConcurrentLinkedQueue<>();
    		Queue<String> strs2 = new ConcurrentLinkedDeque<>();//双端队列
    		
    		for(int i=0;i<10;i++) {
    			// 根据返回值来判断是否添加成功
    			strs.offer("a"+i);
    		}
    		
    		System.out.println(strs);//[a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
    		System.out.println(strs.size());// 10
    		System.out.println(strs.poll());// a0
    		System.out.println(strs.size());// 9
    		System.out.println(strs.peek());// a1
    		System.out.println(strs.size());// 9
    	}
    }
    

     

    转移队列:
      TransferQueue.class,转移队列接口,生产者要等消费者消费的队列,生产者尝试把元素直接转移给消费者
      LinkedTransferQueue.class,转移队列的链表实现,它比SynchronousQueue更快,transfer 方法有客户端准备消费,直接把消息直接传递给消费者,不放到队列里,没有消费者线程的话该线程会阻塞。但是可以调用 add put 王队列里丢,队列还是有容量的。
    public class T08_TransferQueue {
    	
    	public static void main(String[] args) throws InterruptedException {
    		
    		LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
    		
    		new Thread(()->{
    			try {
    				System.out.println(strs.take());
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		}).start();
    		strs.transfer("aaa");
    //		new Thread(()->{
    //			try {
    //				System.out.println(strs.take());
    //			} catch (InterruptedException e) {
    //				e.printStackTrace();
    //			}
    //		}).start();
    	}		
    }
    

      

    常见集合容器:
      ConcurrentMap.class,并发Map的接口,定义了putIfAbsent(k,v)、remove(k,v)、replace(k,oldV,newV)、replace(k,v)这四个并发场景下特定的方法
      ConcurrentHashMap.class,并发HashMap,ConcurrentHashMap在并发中效率比 HashTable高,因为 HashTable 在往里添加东西的时候药锁定整个对象,而 ConcurrentHashMap 分成了16段,插入的时候只锁定了其中的一段,其实就是把锁细粒度化了,因此在多线程情况下回比 hashTable高 ,同样也比 Collections.synchronizedMap(map1) 高。
      ConcurrentSkipListMap.class,跳表数据结构,它也是NavigableMap的实现类(要求元素之间可以比较),只有你确实需要快速的遍历操作,并且可以承受额外的插入开销的时候,在高并发中要求排序才去使用它。
      ConcurrentSkipListSet.class,和上面类似,只不过map变成了set
      以下代码可以简单的比较一下几个容器再并发的情况下的效率问题
    public class T01_ConcurrentMap {
    	public static void main(String[] args) {
    Map<String,String> map =new ConcurrentHashMap<>();//并发性比较高 // Map<String,String> map =new ConcurrentSkipListMap<>();//并发性高而且要求排序 // Map<String,String> map =new Hashtable<>();//并发性不是跟高 // Map<String,String> map1 =new HashMap<>(); // Map<String, String> map = Collections.synchronizedMap(map1);//并发性不是很高 Random r =new Random(); Thread[] ths =new Thread[100]; CountDownLatch latch =new CountDownLatch(ths.length); long start = System.currentTimeMillis(); for(int i=0;i<ths.length;i++) { ths[i]=new Thread(()->{ for(int j=0;j<10000;j++) { map.put("a"+r.nextInt(10000), "a"+r.nextInt(10000)); } latch.countDown(); }); } Arrays.asList(ths).forEach(o->o.start()); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } long end = System.currentTimeMillis(); System.out.println(end-start); } }
     
      CopyOnWriteArrayList.class,copy-on-write(写时复制)模式的array list,每当需要插入元素,不在原list上操作,而是会新建立一个list,然后将原先的引用指向副本。适合读远远大于写的场景
      CopyOnWriteArraySet.class,和上面类似,list变成set而已 
      以下代码可以简单的比较一下几个容器再并发的情况下的效率问题 
      
    public class T01_CopyOnWriteList {
    
    	public static void main(String[] args) {
    		List<String> lists = 
    //				new Vector<String>();//并发效率相对高
    //				new ArrayList<String>();//这个并发会有问题
    				new CopyOnWriteArrayList<String>();//效率比较低,读取非常快,读的时候不加锁
    		Random r =new Random();
    		Thread[] ths  =new Thread[100];
    		
    		for(int i=0;i<ths.length;i++) {
    			ths[i]=new Thread(()->{
    				for(int j=0;j<1000;j++) {
    					lists.add("a"+r.nextInt(10000) );
    				}
    			});
    		}
    		long start = System.currentTimeMillis();
    		
    		Arrays.asList(ths).forEach(o->o.start());
    		Arrays.asList(ths).forEach(o->{
    			try {
    				o.join();
    			} catch (InterruptedException e) {
    				e.printStackTrace();
    			}
    		});
    		
    		long end = System.currentTimeMillis();
    		System.out.println(end-start);
    		System.out.println(lists.size());
    	}
    }
    
  • 相关阅读:
    侧边框伸缩
    百度登录界面
    PHP 判断是否包含在某个字符串中
    三个等于号===和两个等于号==的区别
    PHP的魔法方法
    Apache和PHP环境配置
    群同构与线性空间同构的区别
    SciPy0.11.0(or higher)安装
    博客搬家
    简单的组件传值
  • 原文地址:https://www.cnblogs.com/wuzhenzhao/p/9928630.html
Copyright © 2011-2022 走看看