zoukankan      html  css  js  c++  java
  • Exchanger学习

    1. Java并发新构件之Exchanger
    2. JDK API

    Exchaner 介绍

    JDK API 解释

    A synchronization point at which threads can pair and swap elements within pairs. Each thread presents some object on entry to the exchange method, matches with a partner thread, and receives its partner's object on return. An Exchanger may be viewed as a bidirectional form of a SynchronousQueue. Exchangers may be useful in applications such as genetic algorithms and pipeline designs.
    Sample Usage: Here are the highlights of a class that uses an Exchanger to swap buffers between threads so that the thread filling the buffer gets a freshly emptied one when it needs it, handing off the filled one to the thread emptying the buffer.

    我的翻译:
    Exchanger可以看成是一个同步点,在这个同步点上两个线程可以结对并且交换各自的数据。每个线程都可传入某个对象给exchange方法,然后与对应的伙伴线程匹配,并且接收伙伴线程交换的某个对象。Exchanger可以看成是一个双向的SynchronousQueue队列。Exchanger可以被用在诸如遗传算法和通道设计等应用里。

    张孝祥老师的比喻
    Exchaner好比两个毒贩要进行交易,一手交money,一手交drug,不管是谁先到接头地点后,就处于等待状态;当另外一个方也达到接头地点(所谓到达接头地点,也就是到达了准备接头的状态)时,两者的数据就立即交换了,然后就可以各忙各的了。

    我的理解
    Exchanger的作用:两个结对线程之间交换数据的工具类。

    简单使用

    import java.util.concurrent.Exchanger;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by 58 on 2017-7-11.
     */
    public class ExchangerTest1 {
    
        private static final Exchanger<String> exchanger = new Exchanger<>();
    
        private static ExecutorService threadPool = Executors.newFixedThreadPool(3) ;
    
        public static void main(String[] args) {
    
            threadPool.execute(() -> {
                String A = "A" ;
                try {
                    System.out.println("A--->" + exchanger.exchange(A)) ;
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            });
            threadPool.execute(() -> {
    
                String B = "B" ;
                try {
                    System.out.println("B--->" + exchanger.exchange(B));
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            });
            threadPool.execute(() -> {
    
                String C = "C" ;
                try {
                    System.out.println("C--->" + exchanger.exchange(C));
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            });
        }
    }
    

    应用场景

    一个任务在创建对象,而这些对象的生产代价很高,另一个任务在消费这些对象。通过这种方式,可以有很多的对象被创建的同时被消费。

    
    import java.util.List;
    import java.util.concurrent.*;
    
    public class ExchangerDemo {
        static int size = 10;
        static int delay = 5; //秒
        public static void main(String[] args) throws Exception {
            ExecutorService exec = Executors.newCachedThreadPool();
            // 这个特定的List变体允许列表在被遍历的时候调用remove()方法,而不会抛出ConcurrentModifiedException异常
            List<Fat> producerList = new CopyOnWriteArrayList<>();
            List<Fat> consumerList = new CopyOnWriteArrayList<>();
            Exchanger<List<Fat>> exchanger = new Exchanger<>();
            exec.execute(new ExchangerProducer(exchanger, producerList));
            exec.execute(new ExchangerConsumer(exchanger, consumerList));
            TimeUnit.SECONDS.sleep(delay);
            exec.shutdownNow();
        }
    }
    
    
    class ExchangerProducer implements Runnable {
        private List<Fat> holder;
        private Exchanger<List<Fat>> exchanger;
        public ExchangerProducer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
            this.exchanger = exchanger;
            this.holder = holder;
        }
        @Override
        public void run() {
            try {
                while(!Thread.interrupted()) {
                    //生产对象
                    for (int i = 0;i < ExchangerDemo.size; i++) {
                        holder.add(new Fat());
                    }
                    //生产完毕后 等待进行交换
                    holder = exchanger.exchange(holder);
                }
            } catch (InterruptedException e) {
            }
            System.out.println("Producer stopped.");
        }
    }
    
    class ExchangerConsumer implements Runnable {
        private List<Fat> holder;
        private Exchanger<List<Fat>> exchanger;
        private volatile Fat value;
        private static int num = 0;
        public ExchangerConsumer(Exchanger<List<Fat>> exchanger, List<Fat> holder) {
            this.exchanger = exchanger;
            this.holder = holder;
        }
        @Override
        public void run() {
            try {
                while(!Thread.interrupted()) {
                    //等待交换
                    holder = exchanger.exchange(holder);
                    //取到生产者生产的数据 开始模拟消费的场景
                    for (Fat x : holder) {
                        num ++;
                        value = x;
                        //消费 在循环内删除元素,这对于CopyOnWriteArrayList是没有问题的
                        holder.remove(x);
                    }
                    if (num % 10000 == 0) {
                        System.out.println("Exchanged count=" + num);
                    }
                }
            } catch (InterruptedException e) {
    
            }
            System.out.println("Consumer stopped. Final value: " + value);
        }
    }
    
    class Fat {
        private volatile double d;
        private static int counter = 1;
        private final int id = counter++;
        public Fat() {
            //执行一段耗时的操作
            for (int i = 1; i<10000; i++) {
                d += (Math.PI + Math.E) / (double)i;
            }
        }
        public String toString() {return "Fat id=" + id;}
    }
    
  • 相关阅读:
    LeetCode153.寻找旋转排序数组中的最小值
    LeetCode88.合并两个有序数组
    分析树
    LeetCode119.杨辉三角 II
    ssh传输文件
    ubuntu arm妙算加载cp210x驱动
    terminator终端工具
    ros使用rplidar hector_mapping建地图
    launch文件
    eclipse配置ros cakin编译环境
  • 原文地址:https://www.cnblogs.com/boothsun/p/7158867.html
Copyright © 2011-2022 走看看