zoukankan      html  css  js  c++  java
  • java并发包下的并发工具类

    1.Exchanger

    功能:用于线程间数据的交换

    应用场景:1)遗传算法,目前还不是特别理解  2)校对工作,假设A,B线程做同一件任务,可以通过数据校验判断两线程是否正确的工作

    例子:是一个简单的校对工作例子

    public class TestExchanger {
        public static void main(String[] args) {
            Exchanger<String> exchanger = new Exchanger<>();
            ExecutorService es = Executors.newFixedThreadPool(2);  //拥有两个线程的线程池
            es.execute(new Runnable() {
                @Override
                public void run() {
                    String A = "银行流水A";
                    try {
                        exchanger.exchange(A);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
    
            es.execute(new Runnable() {
                @Override
                public void run() {
                    String B = "银行流水B";
                    try {
                        String A = exchanger.exchange(B);
                        System.out.println("A和B数据是否一致: " + A.equals(B) + ",A: " + A + ",B: " + B);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    }

    Exchanger类中最重要的一个方法就是exchange(),该方法用于交换信息,并且接受来自另外一个线程的数据,exchange()方法里面还可以加参数。exchange(V x,long timeout,TimeUnit unit)设置一个最大等待时间避免一直等待

    注意:信息交换是在同步点(Exchager提供一个同步点)进行交换,而不是两个线程调用了exchange()方法就进行交换。

    底层实现:Lock+Condition,暂时还没深入,学习了进行补充

    2.Semaphore

    功能:控制同时访问特定资源的线程数量

    应用场景:流量控制,比如数据库的连接

    例子:

    public class TestSemaphore {
        public static void main(String[] args) {
            // 线程池
            ExecutorService exec = Executors.newCachedThreadPool();
            //一次只能5个线程同时访问
            final Semaphore semp = new Semaphore(5);
            // 模拟20个客户端访问
            for (int index = 0; index < 20; index++) {
                final int NO = index;
                Runnable run = new Runnable() {
                    public void run() {
                        try {
                            // 获取许可
                            semp.acquire();
                            System.out.println("Accessing: " + NO);
                            Thread.sleep((long) (Math.random() * 10000));
                            // 访问完后,释放
                            semp.release();
                            System.out.println("-----------------"+semp.availablePermits());
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
    
                    }
    
                };
                exec.execute(run);
            }
    
            // 退出线程池
            exec.shutdown();
    
        }
    }

    同时有五个线程可以执行,获取资源后打印语句后随机睡眠,最后释放资源,semp.availablePermits(),可以获得的许可数量,释放一个后将有一个没有被分发的许可证,当有多的许可证时,会采取先到先得的方式分配许可

    Semaphore有两个构造函数,Semaphore(int)和Semaphore(int,boolean)。参数中的int表示该信号量拥有的许可数量,boolean表示获取许可的时候是否是公平的,如果是公平的那么,当有多个线程要获取许可时,会按照线程来的先后顺序分配许可,否则,线程获得许可的顺序是不定的

    3.CyclicBarrier

    功能:控制同时访问特定资源的线程数量

    应用场景:希望创建一组任务,并行执行,在进行下一个步骤之前等待,直到所有任务完成

    例子:赛马比赛。只有当所有马一起到达栅栏时,才能继续跑下一个栅栏

    Horse类:

    public class Horse implements Runnable {
        private static int counter = 0;
        private final int id = counter++;
        private int strides = 0;
        private static Random rand = new Random(47);
        private static CyclicBarrier barrier;
        public Horse(CyclicBarrier b) { barrier = b; }
        public synchronized int getStrides() { return strides; }
        public void run() {
            try {
                while(!Thread.interrupted()) {
                    synchronized(this) {
                        strides += rand.nextInt(3); // Produces 0, 1 or 2
                    }
                    barrier.await();  //在栅栏处等待
                }
            } catch(InterruptedException e) {
                // A legitimate way to exit
            } catch(BrokenBarrierException e) {
                // This one we want to know about
                throw new RuntimeException(e);
            }
        }
        public String toString() { return "Horse " + id + " "; }
        public String tracks() {
            StringBuilder s = new StringBuilder();
            for(int i = 0; i < getStrides(); i++)
                s.append("*");
            s.append(id);
            return s.toString();
        }
    } 

    HorseRace类:

    public class HorseRace {
        static final int FINISH_LINE = 75;
        private List<Horse> horses = new ArrayList<Horse>();
        private ExecutorService exec =
                Executors.newCachedThreadPool();
        private CyclicBarrier barrier;
        public HorseRace(int nHorses, final int pause) {
            barrier = new CyclicBarrier(nHorses, new Runnable() {
                public void run() {
                    System.out.println("执行runnable方法的线程: " + Thread.currentThread().getName());
                    StringBuilder s = new StringBuilder();
                    for(int i = 0; i < FINISH_LINE; i++)
                        s.append("="); // The fence on the racetrack
                    System.out.println(s);
                    for(Horse horse : horses)
                        System.out.println(horse.tracks());
                    for(Horse horse : horses)
                        if(horse.getStrides() >= FINISH_LINE) {
                            System.out.println(horse + "won!");
                            exec.shutdownNow();
                            return;
                        }
                    try {
                        TimeUnit.MILLISECONDS.sleep(pause);
                    } catch(InterruptedException e) {
                        System.out.println("barrier-action sleep interrupted");
                    }
                }
            });
    
            for(int i = 0; i < nHorses; i++) {
                Horse horse = new Horse(barrier);
                horses.add(horse);
                exec.execute(horse);
            }
        }
    
        public static void main(String[] args) {
            int nHorses = 7;
            int pause = 200;
    
            new HorseRace(nHorses, pause);
        }
    }

    CyclicBarrier有两个构造函数:

    public CyclicBarrier(int parties, Runnable barrierAction) {}
    public CyclicBarrier(int parties) {}

    parties代表一次要并行执行的任务,barrierAction当这些线程都到达barries状态时要执行的任务,会选择一个线程去启动这个任务

    重载方法await()

    public int await() throws InterruptedException, BrokenBarrierException { } 
    public int await(long timeout, TimeUnit unit)throws InterruptedException,BrokenBarrierException,TimeoutException { }

    第一个方法用来挂起当前线程,直至所有线程都到达barrier状态再同时执行后续任务。第二个方法是让这些线程等待至一定的时间,如果还有线程没有到达barrier状态就直接让到达barrier的线程执行后续任务

    4.CountDownLatch

    功能:允许一个或多个线程等待其他线程完成操作

    应用场景:

    例子:

    public class TestCountDown {
        private static CountDownLatch c = new CountDownLatch(2);  //等待线程的执行数量为2
        public static void main(String[] args) throws InterruptedException {
            new Thread(
                    new Runnable() {
                        @Override
                        public void run() {
                            System.out.println(1);
                            c.countDown();
                        }
                    }
            ).start();
    
            new Thread(new Runnable() {
                @Override
                public void run() {
                    System.out.println(2);
                    c.countDown();
                }
            }).start();
    
            c.await();  //阻塞当前线程,即main线程等待其他线程完成任务以后才能执行
            System.out.println(3);
        }
    
    
    }
  • 相关阅读:
    moss jscript
    CSS两端对准
    简单的日期格式验证
    .NET中使用Exchange 2007 Webservice来读取邮件
    张二狗
    spcontext
    Infopath表单用改控件值提交或作别的更改时一定要在控件属性浏览器表单的回发设置为始终.
    使用 Web 服务
    SharePoint 2010中开发模式的改进 COM 客户端对象模型
    Retrieve data from Separate Oracle System
  • 原文地址:https://www.cnblogs.com/Hdaydayup/p/7986433.html
Copyright © 2011-2022 走看看