zoukankan      html  css  js  c++  java
  • Exchanger, Changing data between concurrent tasks

    The Java concurrency API provides a synchronization utility that allows the interchange of data between two concurrent tasks. In more detail, the Exchanger class allows the definition of a synchronization point between two threads. When the two threads arrive to this point, they interchange a data structure so the data structure of the first thread goes to the second one and the data structure of the second thread goes to the first one.

    This class may be very useful in a situation similar to the producer-consumer problem. This is a classic concurrent problem where you have a common buffer of data, one or more producers of data, and one or more consumers of data. As the Exchanger class only synchronizes two threads, you can use it if you have a producer-consumer problem with one producer and one consumer.

    In this recipe, you will learn how to use the Exchanger class to solve the producer-consumer problem with one producer and one consumer.

    1. First, let's begin by implementing the producer. Create a class named Producer and specify that it implements the Runnable interface.

    package com.packtpub.java7.concurrency.chapter3.recipe7.task;
    
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    /**
     * This class implements the producer
     *
     */
    public class Producer implements Runnable {
    
        /**
         * Buffer to save the events produced
         */
        private List<String> buffer;
        
        /**
         * Exchager to synchronize with the consumer
         */
        private final Exchanger<List<String>> exchanger;
        
        /**
         * Constructor of the class. Initializes its attributes
         * @param buffer Buffer to save the events produced
         * @param exchanger Exchanger to syncrhonize with the consumer
         */
        public Producer (List<String> buffer, Exchanger<List<String>> exchanger){
            this.buffer=buffer;
            this.exchanger=exchanger;
        }
        
        /**
         * Main method of the producer. It produces 100 events. 10 cicles of 10 events.
         * After produce 10 events, it uses the exchanger object to synchronize with 
         * the consumer. The producer sends to the consumer the buffer with ten events and
         * receives from the consumer an empty buffer
         */
        @Override
        public void run() {
            int cycle=1;
            
            for (int i=0; i<10; i++){
                System.out.printf("Producer: Cycle %d
    ",cycle);
                
                for (int j=0; j<10; j++){
                    String message="Event "+((i*10)+j);
                    System.out.printf("Producer: %s
    ",message);
                    buffer.add(message);
                }
                
                try {
                    /*
                     * Change the data buffer with the consumer
                     */
                    buffer=exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.printf("Producer: %d
    ",buffer.size());
                
                cycle++;
            }
            
        } 
    
    }

    2. Second, implement the consumer. Create a class named Consumer and specify that it implements the Runnable interface.

    package com.packtpub.java7.concurrency.chapter3.recipe7.task;
    
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    /**
     * This class implements the consumer of the example
     *
     */
    public class Consumer implements Runnable {
    
        /**
         * Buffer to save the events produced
         */
        private List<String> buffer;
        
        /**
         * Exchager to synchronize with the consumer
         */
        private final Exchanger<List<String>> exchanger;
    
        /**
         * Constructor of the class. Initializes its attributes
         * @param buffer Buffer to save the events produced
         * @param exchanger Exchanger to syncrhonize with the consumer
         */
        public Consumer(List<String> buffer, Exchanger<List<String>> exchanger){
            this.buffer=buffer;
            this.exchanger=exchanger;
        }
        
        /**
         * Main method of the producer. It consumes all the events produced by the Producer. After
         * processes ten events, it uses the exchanger object to synchronize with 
         * the producer. It sends to the producer an empty buffer and receives a buffer with ten events
         */
        @Override
        public void run() {
            int cycle=1;
            
            for (int i=0; i<10; i++){
                System.out.printf("Consumer: Cycle %d
    ",cycle);
    
                try {
                    // Wait for the produced data and send the empty buffer to the producer
                    buffer=exchanger.exchange(buffer);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                System.out.printf("Consumer: %d
    ",buffer.size());
                
                for (int j=0; j<10; j++){
                    String message=buffer.get(0);
                    System.out.printf("Consumer: %s
    ",message);
                    buffer.remove(0);
                }
                
                cycle++;
            }
            
        }
    
    }

    3. Finally, implement the main class of the example by creating a class named Core and add the main() method to it.

    package com.packtpub.java7.concurrency.chapter3.recipe7.core;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Exchanger;
    
    import com.packtpub.java7.concurrency.chapter3.recipe7.task.Consumer;
    import com.packtpub.java7.concurrency.chapter3.recipe7.task.Producer;
    
    /**
     * Main class of the example
     *
     */
    public class Main {
    
        /**
         * Main method of the example
         * @param args
         */
        public static void main(String[] args) {
            
            // Creates two buffers
            List<String> buffer1=new ArrayList<>();
            List<String> buffer2=new ArrayList<>();
            
            // Creates the exchanger
            Exchanger<List<String>> exchanger=new Exchanger<>();
            
            // Creates the producer
            Producer producer=new Producer(buffer1, exchanger);
            // Creates the consumer
            Consumer consumer=new Consumer(buffer2, exchanger);
            
            // Creates and starts the threads
            Thread threadProducer=new Thread(producer);
            Thread threadConsumer=new Thread(consumer);
            
            threadProducer.start();
            threadConsumer.start();
    
        }
    
    }

    The consumer begins with an empty buffer and calls Exchanger to synchronize with the producer. It needs data to consume. The producer begins its execution with an empty buffer. It creates 10 strings, stores it in the buffer, and uses the exchanger to synchronize with the consumer.

    At this point, both threads (producer and consumer) are in Exchanger and it changes the data structures, so when the consumer returns from the exchange() method, it will have a buffer with 10 strings. When the producer returns from the exchange() method, it will have an empty buffer to fill again. This operation will be repeated 10 times.

    If you execute the example, you will see how producer and consumer do their jobs concurrently and how the two objects interchange their buffers in every step. As it occurs with other synchronization utilities, the first thread that calls the exchange() method was put to sleep until the other threads arrived.

  • 相关阅读:
    压测场景下的 TIME_WAIT 处理
    拥抱云原生,Fluid结合JindoFS :阿里云OSS加速利器
    从DHTML、HTC、XHTML到AJAX
    altas(ajax)控件(一):多功能面板控件Accordion
    fedora7 常用软件安装
    Fedora7安装后的配置
    .net程序员的盲点(六):StringBuilder 和 String 的区别
    .net程序员的盲点(五):告诉你一个不一样的new
    .net程序员的盲点(四):索引器Indexers
    员工究竟渴望学到的是什么?-(杂谈-20070816)
  • 原文地址:https://www.cnblogs.com/huey/p/5554078.html
Copyright © 2011-2022 走看看