zoukankan      html  css  js  c++  java
  • Java线程同步类容器和并发容器(四)

    同步类容器都是线程安全的,在某些场景下,需要枷锁保护符合操作,最经典ConcurrentModifiicationException,原因是当容器迭代的过程中,被并发的修改了内容。

    for (Iterator iterator = tickets.iterator(); iterator.hasNext();) {
            			String string = (String) iterator.next();
            			tickets.remove(20);
           		}
    

      

    //多线程使用Vector或者HashTable的示例(简单线程同步问题)
    public class Tickets {
        public static void main(String[] args) {
            //初始化火车票池并添加火车票:避免线程同步可采用Vector替代ArrayList  HashTable替代HashMap
            final Vector<String> tickets = new Vector<String>();
    
            Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
    
            for(int i = 1; i<= 1000; i++){
                tickets.add("火车票"+i);
            }
    
    //        		for (Iterator iterator = tickets.iterator(); iterator.hasNext();) {
    //        			String string = (String) iterator.next();
    //        			tickets.remove(20);
    //       		}
    
            for(int i = 1; i <=10; i ++){
                new Thread("线程"+i){
                    public void run(){
                        while(true){
                            if(tickets.isEmpty()) break;
                            System.out.println(Thread.currentThread().getName() + "---" + tickets.remove(0));
                        }
                    }
                }.start();
            }
        }
    }
    

      同步类容器:如古老的Vector、HashTable。都是通过Collections.synchronized等工厂方法去创建实现的,底层用传统的synchronized关键字对每个共用的方法进行同步,使得每次只能有一个线程访问容器的状态。

    并发类容器是专门针对并发设计的。ConCurrentHashMap替代HashTable。使用CopyOnWriteArrayList代替Voctor,CopyonWriteArraySet,并发的Queue,ConcurrentLinkedQueue高性能队列,LinkedBlockingQueue阻塞形式的队列

    ConCurrentHashMap接口有两个实现

    ConcurentHashMap

    ConcurentSkipListMap(支持排序功能)

    ConcurentHashMap内部使用段(Segment)来表示不同的部分,每个段就是一个小的HashTable,有自己的锁,只要多个修改操作,发生在不同的段上,就可以并发进行。

    把一个整体分成16个段,最高支持16个线程的并发修改操作,在多线程中减小锁的粒度,从而降低锁竞争的一种方案,共享变量使用了volatile关键字声明,目的是第一时间获取修改的内容。

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class UseConcurrentMap {
        public static void main(String[] args) {
            ConcurrentHashMap<String,Object> chm=new ConcurrentHashMap<String, Object>();
            chm.put("k1", "v1");
            chm.put("k1", "v7777");//覆盖
            chm.put("k2", "v2");
            chm.put("k3", "v3");
            chm.putIfAbsent("k4", "vvvv"); //存在 就不放入了
            chm.putIfAbsent("k4", "555");
            System.out.println(chm.get("k2"));
            System.out.println(chm.size());
            for (Map.Entry<String,Object> me:chm.entrySet()){
                System.out.println("key:" + me.getKey() + ",value:" + me.getValue());
            }
        }
    }
    

      

    import java.util.concurrent.TimeUnit;
    
    public class TimeUinitTest {
        private TimeUnit timeUnit = TimeUnit.DAYS;
        public static void main(String[] TimeUinitTest) {
            TimeUinitTest tut = new TimeUinitTest();
            tut.outInfo();
        }
        public void outInfo() {
            System.out.println(timeUnit.name());
            System.out.println(timeUnit.toDays(1));
            System.out.println(timeUnit.toHours(1));
            System.out.println(timeUnit.toMinutes(1));
            System.out.println(timeUnit.toSeconds(1));
            System.out.println(timeUnit.toMillis(1));
            System.out.println(timeUnit.toMicros(1));
            System.out.println(timeUnit.toNanos(1));
            System.out.println((timeUnit.convert(1, TimeUnit.DAYS)) + timeUnit.name());
            System.out.println((timeUnit.convert(24, TimeUnit.HOURS)) + timeUnit.name());
            System.out.println((timeUnit.convert(1440, TimeUnit.MINUTES)) + timeUnit.name());
            System.out.println("-------------------");
        }
    }
    

      

    public class Task implements Comparable<Task>{
        private int id ;
        private String name;
        public int getId() {
            return id;
        }
        public void setId(int id) {
            this.id = id;
        }
        public String getName() {
            return name;
        }
        public void setName(String name) {
            this.name = name;
        }
        @Override
        public int compareTo(Task task){
            return this.id>task.id?1:(this.id<task.id?-1:0);
        }
        public String toString(){
            return this.id + "," + this.name;
        }
    }
    

      

    Copy-On-Write容器:程序设计中的优化策略

    两种:CopyOneWriteArrayList和CopyOnWriteArraySet

    Copy-On-Write容器为写时复制的容器,当往一个容器添加元素的时候,不直接往当前容器添加,而是将当前容器就行Copy,复制出一个新的容器,然后新的容器中添加元素,添加完元素之后,在将原容器的引用指向新容器。

    可以对CopyOneWrite容器进行并发的读,不需要枷锁,当前容器不会添加任何元素,所以CopyOnWrite也是一种读写分离的思想,读和写在不同的容器中

    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.CopyOnWriteArraySet;
    
    public class UseCopyOnWrite {
        public static void main(String[] args) {
            CopyOnWriteArrayList<String> cwal=new CopyOnWriteArrayList<String>();
            CopyOnWriteArraySet<String> cwas = new CopyOnWriteArraySet<String>();
        }
    }
    

      并发Queue:

     

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    

      

    public class UseQueue {
        public static void main(String[] args) throws Exception {
    
            //高性能无阻塞无界队列:ConcurrentLinkedQueue  offer和add是没有区别的
    
             ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
             q.offer("a");
             q.offer("b");
             q.offer("c");
             q.offer("d");
             q.add("e");
    
             System.out.println(q.poll());	//a 从头部取出元素,并从队列里删除
             System.out.println(q.size());	//4
             System.out.println(q.peek());	//b
             System.out.println(q.size());	//4
    
    
    
             ArrayBlockingQueue<String> array = new ArrayBlockingQueue<String>(5);
             array.put("a");
             array.put("b");
             array.add("c");
             array.add("d");
             array.add("e");
             array.add("f");
             //System.out.println(array.offer("a", 3, TimeUnit.SECONDS));
    
    
    
    
             //阻塞队列
             LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
             q.offer("a");
             q.offer("b");
             q.offer("c");
             q.offer("d");
             q.offer("e");
             q.add("f");
             //System.out.println(q.size());
    
             //		for (Iterator iterator = q.iterator(); iterator.hasNext();) {
             //			String string = (String) iterator.next();
             //			System.out.println(string);
             //		}
    
             List<String> list = new ArrayList<String>();
             System.out.println(q.drainTo(list, 3));
             System.out.println(list.size());
             for (String string : list) {
             System.out.println(string);
             }
    
    
    
            final SynchronousQueue<String> q = new SynchronousQueue<String>();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println(q.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t1.start();
            Thread t2 = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    q.add("asdasd");
                }
            });
            t2.start();
        }
    }
    

      BlockingQueue接口:(下面的全是阻塞的)

    import java.util.concurrent.PriorityBlockingQueue;
    
    public class UsePriorityBlockingQueue {
        public static void main(String[] args) throws Exception{
    
            PriorityBlockingQueue<Task> q = new PriorityBlockingQueue<Task>();
    
            Task t1 = new Task();
            t1.setId(3);
            t1.setName("id为3");
            Task t2 = new Task();
            t2.setId(4);
            t2.setName("id为4");
            Task t3 = new Task();
            t3.setId(1);
            t3.setName("id为1");
    
            //return this.id > task.id ? 1 : 0;
            q.add(t1);	//3
            q.add(t2);	//4
            q.add(t3);  //1
    
            // 1 3 4
            System.out.println("容器:" + q);
            System.out.println(q.take().getId());
            System.out.println("容器:" + q);
    		System.out.println(q.take().getId());
    		System.out.println(q.take().getId());
        }
    }
    

      

     //没有缓存的队列,生产者产生的数据会被消费者直接消费掉
            final SynchronousQueue<String> q = new SynchronousQueue<String>();
    
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("进入到t1线程中,阻塞等待获取元素");
                        System.out.println("消费"+q.take());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
            t1.start();
            TimeUnit.SECONDS.sleep(2);
            Thread t2 = new Thread(new Runnable() {
    
                @Override
                public void run() {
                    q.add("asdasd");
                }
            });
            t2.start();
        }
    

      

    import java.util.concurrent.DelayQueue;
    
    public class WangBa implements Runnable{
    private DelayQueue<Wangmin> queue=new DelayQueue<Wangmin>();
        public boolean yinye =true;
    
        public void shangji(String name,String id,int money){
            Wangmin man = new Wangmin(name, id, 1000 * money + System.currentTimeMillis());
            System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"交钱"+money+"块,开始上机...");
            this.queue.add(man);
        }
        public void xiaji(Wangmin man){
            System.out.println("网名"+man.getName()+" 身份证"+man.getId()+"时间到下机...");
        }
    
        @Override
        public void run() {
            while(yinye){
                try {
                    Wangmin man = queue.take();
                    xiaji(man);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public static void main(String args[]){
            try{
                System.out.println("网吧开始营业");
                WangBa siyu = new WangBa();
                Thread shangwang = new Thread(siyu);
                shangwang.start();
    
                siyu.shangji("路人甲", "123", 1);
                siyu.shangji("路人乙", "234", 10);
                siyu.shangji("路人丙", "345", 5);
            }
            catch(Exception e){
                e.printStackTrace();
            }
        }
    }
    

      

    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class Wangmin implements Delayed {
        private String name;
        //身份证
        private String id;
        //截止时间
        private long endTime;
        //定义时间工具类
        private TimeUnit timeUnit = TimeUnit.SECONDS;
    
        public Wangmin(String name,String id,long endTime){
            this.name=name;
            this.id=id;
            this.endTime = endTime;
        }
    
        public String getName(){
            return this.name;
        }
    
        public String getId(){
            return this.id;
        }
    
        /**
         * 用来判断是否到了截止时间
         */
        @Override
        public long getDelay(TimeUnit unit) {
            //return unit.convert(endTime, TimeUnit.MILLISECONDS) - unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            return endTime - System.currentTimeMillis();
        }
    
        /**
         * 相互批较排序用
         */
        @Override
        public int compareTo(Delayed delayed) {
            Wangmin w = (Wangmin)delayed;
            return this.getDelay(this.timeUnit) - w.getDelay(this.timeUnit) > 0 ? 1:0;
        }
    }
    

      AQS锁:

    Thread A = new Thread(new Runnable() {
    			
    			@Override
    			public void run() {
    				int sum = 0;
    				for(int i =0; i < 10; i ++){
    					sum += i;
    				}
    				try {
    					Thread.sleep(4000);
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    				LockSupport.park();	//后执行
    				System.err.println("sum: " + sum);
    			}
    		});
    		
    		A.start();
    		
    		Thread.sleep(1000);
    		
    		LockSupport.unpark(A);	//先执行
    

      

  • 相关阅读:
    1055. [HAOI2008]玩具取名【区间DP】
    BZOJ2435:[NOI2011]道路修建 (差分)
    1084. [SCOI2005]最大子矩阵【网格DP】
    1060. [ZJOI2007]时态同步【树形DP】
    1050. [HAOI2006]旅行【并查集+枚举】
    2463. [中山市选2009]谁能赢呢?【博弈论】
    luogu P1195 口袋的天空
    luogu P1162 填涂颜色
    luogu P1223 排队接水
    luogu P1331 海战
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/10843769.html
Copyright © 2011-2022 走看看