  • Java: Concurrency

    Thread life-cycle

    new: the thread has been created but has not yet started.

    runnable: the thread is executing its task.

    waiting: the thread waits for another thread to perform a task. When another thread notifies it to continue, it transitions back to runnable.

    timed waiting:

    1) A runnable thread transitions to timed waiting if it provides an optional wait interval when waiting for another thread to perform a task. When the thread is notified by another thread or when the specified timed interval expires – whichever comes first, it returns to runnable.

    2) A runnable thread also transitions to timed waiting if it is put to sleep for a designated sleep interval, after which it returns to runnable.

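    blocked: A runnable thread transitions to blocked when it attempts to perform a task that cannot be completed immediately (e.g. entering a synchronized block whose monitor lock is held by another thread) and it must temporarily wait until that task completes. Conceptually this also covers waiting for an I/O request, although the JVM's Thread.State reports a thread waiting on I/O as RUNNABLE and reserves BLOCKED for threads waiting for a monitor lock.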

    Timed waiting, waiting and blocked threads cannot use a processor even if one is available.

    terminated: a.k.a. dead. A runnable thread transitions to terminated when it successfully completes its task or otherwise terminates (e.g. due to an error).
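
    The states above can be observed with Thread.getState(). The following is a minimal sketch, not part of the original notes: the class name ThreadStateDemo and the use of a CountDownLatch to hold the worker in the waiting state are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class ThreadStateDemo {

    // Returns the states observed for one worker thread, in order.
    static List<Thread.State> observeStates() {
        List<Thread.State> seen = new ArrayList<>();
        CountDownLatch release = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            try {
                release.await(); // untimed wait: the worker enters the waiting state
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        seen.add(worker.getState()); // created but not started: NEW
        worker.start();

        try {
            // Poll until the worker is parked inside await().
            Thread.State state;
            while ((state = worker.getState()) != Thread.State.WAITING) {
                Thread.sleep(1);
            }
            seen.add(state);

            release.countDown(); // let the worker continue
            worker.join();       // wait for it to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while observing the worker", e);
        }

        seen.add(worker.getState()); // finished: TERMINATED
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observeStates()); // prints [NEW, WAITING, TERMINATED]
    }
}
```

    Replacing release.await() with a timed call such as Thread.sleep() would show TIMED_WAITING instead of WAITING.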

    Operating-system view of Runnable state

    The transitions between the ready and running states are handled solely by the OS; the JVM simply views both as runnable.

    Ready: the state a thread enters when it first transitions to runnable from the new state.

    Running: A ready thread transitions to running when the OS assigns it to a processor (i.e. dispatches the thread).

    In most OSs, each thread is given a quantum (a.k.a. timeslice) in which to perform its task. When its quantum expires, the thread returns to ready and the OS assigns another thread to the processor.

    Executor Framework

    e.g.

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class YourTask implements Runnable {

        private final String taskName;
        private final int sleepTime = 1000; // milliseconds; arbitrary value chosen for this demo

        public YourTask(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            try {
                /* perform task */
                // sleep() is called only for demonstration, because normally it is
                // unpredictable when and for how long each thread will perform its task
                Thread.sleep(sleepTime); // put the thread to sleep
            } catch (InterruptedException exception) {
                exception.printStackTrace();
                Thread.currentThread().interrupt(); // restore the interrupt status
            }
        }

        public static void main(String[] args) throws InterruptedException {
            YourTask task1 = new YourTask("task1");
            YourTask task2 = new YourTask("task2");
            YourTask task3 = new YourTask("task3");
            System.out.println("Starting Executor");

            ExecutorService executorService = Executors.newCachedThreadPool();

            executorService.execute(task1);
            executorService.execute(task2);
            executorService.execute(task3);

            // stop accepting new tasks; already submitted tasks keep running
            executorService.shutdown();
            // wait up to one minute for the submitted tasks to finish
            executorService.awaitTermination(1, TimeUnit.MINUTES);
        }
    }

    Runnable interface

    Specifies a task to execute concurrently with other tasks.

    run() contains the task to perform.

    Executor Interface

    Executes Runnable objects; implementations typically manage a thread pool.

    execute() accepts a Runnable argument, which is passed to one of the available threads in the thread pool (to perform its run()).

    ExecutorService interface

    Extends Executor.

    Obtained by calling class Executors’ static method newCachedThreadPool().

    shutdown() notifies the ExecutorService to stop accepting new tasks but continue executing tasks that have already been submitted.

    awaitTermination() returns control to its caller either when all tasks executing in the ExecutorService complete (returning true) or when the specified timeout elapses (returning false). It can be used to make the main thread wait for scheduled tasks to complete (because in practice the main thread can terminate early while other threads of the program continue executing).
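
    The shutdown() and awaitTermination() behaviour described above can be sketched as follows. This is an illustrative example rather than code from the original notes; the class ExecutorShutdownDemo and the counter used to check that every task ran are assumptions.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class ExecutorShutdownDemo {

    // Runs taskCount trivial tasks and returns how many completed,
    // or -1 if the pool did not terminate within the timeout.
    static int runTasks(int taskCount) {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService executorService = Executors.newCachedThreadPool();

        for (int i = 0; i < taskCount; i++) {
            executorService.execute(() -> completed.incrementAndGet());
        }

        // No new tasks are accepted from here on, but submitted tasks still run.
        executorService.shutdown();

        try {
            // true: all tasks finished; false: the timeout elapsed first
            boolean finished = executorService.awaitTermination(1, TimeUnit.MINUTES);
            return finished ? completed.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while awaiting termination", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(runTasks(3)); // prints 3
    }
}
```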

    Producer-Consumer

    // home-made interface Buffer shared by Producer and Consumer threads
    public interface Buffer {
        // called by Producer
        public void blockingPut(int value) throws InterruptedException;
        // called by Consumer
        public int blockingGet() throws InterruptedException;
    }

    java.util.concurrent -- using classes that encapsulate synchronization

    Classes from java.util.concurrent encapsulate the synchronization.

    ArrayBlockingQueue: A fully implemented thread-safe buffer class that implements interface BlockingQueue.

    put() places an element at the end of the BlockingQueue, waiting if the queue is full.

    take() removes an element from the head of the BlockingQueue, waiting if the queue is empty.

    import java.util.concurrent.ArrayBlockingQueue;

    public class BlockingBuffer implements Buffer {

        private final ArrayBlockingQueue<Integer> buffer;

        public BlockingBuffer() {
            buffer = new ArrayBlockingQueue<Integer>(1); // capacity of one element
        }

        @Override
        public void blockingPut(int value) throws InterruptedException {
            buffer.put(value); // waits while the queue is full
            /* other operations
            ...
            */
        }

        @Override
        public int blockingGet() throws InterruptedException {
            int readValue = buffer.take(); // waits while the queue is empty
            /* other operations
            ...
            */
            return readValue;
        }
    }
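
    To see put() and take() cooperate, here is a small self-contained sketch with one producer thread and one consumer thread. It uses an ArrayBlockingQueue directly instead of the Buffer interface so that it compiles on its own; the class name ProducerConsumerDemo and the summing logic are assumptions for illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {

    // The producer puts 1..count into a one-element queue;
    // the consumer takes them and adds them up.
    static int produceAndConsume(int count) {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        int[] sum = new int[1]; // written only by the consumer, read after join()

        Thread producer = new Thread(() -> {
            try {
                for (int value = 1; value <= count; value++) {
                    queue.put(value); // waits while the queue is full
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    sum[0] += queue.take(); // waits while the queue is empty
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return sum[0];
    }

    public static void main(String[] args) {
        System.out.println(produceAndConsume(10)); // prints 55
    }
}
```

    No synchronized, wait() or notify() appears in the example; the queue handles all of the coordination.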

     Monitors: synchronized, wait, notify & notifyAll

    Every object has a monitor and a monitor lock. The monitor ensures that the object’s monitor lock is held by a maximum of 1 thread at any time.

    synchronized

    synchronized (object) { //object is normally ‘this’
        /* statements */
    }

    Code placed in a synchronized statement is guarded by the monitor lock; a thread must acquire the lock to execute the guarded statements.

    synchronized methods

    Before executing, a synchronized instance method must acquire the lock on the object used to call the method.

    A static synchronized method must acquire the lock on the class used to call the method.

    Place all access to mutable data that may be shared by multiple threads inside synchronized statements or synchronized methods.

    Keep the duration of synchronized statements as short as possible to minimize the wait time for blocked threads, e.g. avoid performing I/O or lengthy calculations while holding the lock.
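
    As a minimal sketch of these guidelines (the class SynchronizedCounter is hypothetical and not from the original notes), the counter below keeps every access to its shared field inside short synchronized methods, so two threads incrementing it concurrently do not lose updates.

```java
class SynchronizedCounter {

    private int count = 0; // shared mutable data, accessed only while holding the lock

    public synchronized void increment() {
        count++; // read-modify-write is safe because the monitor lock is held
    }

    public synchronized int get() {
        return count;
    }

    // Two threads each increment the same counter incrementsPerThread times.
    static int countWithTwoThreads(int incrementsPerThread) {
        SynchronizedCounter counter = new SynchronizedCounter();
        Runnable work = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                counter.increment();
            }
        };

        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(10_000)); // prints 20000
    }
}
```

    Without synchronized on increment(), the two threads could interleave their updates and the final count could be lower than 20000.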

    wait(): When a thread holding the monitor lock on an object determines that it cannot continue its task until some condition is satisfied, it calls wait() on the synchronized object. This releases the monitor lock and places the thread in the waiting state, so other threads can enter the object’s synchronized statements/methods.

    notify(): When a thread executing a synchronized statement/method completes or satisfies the condition on which another thread may be waiting, it can call notify() on the synchronized object to allow a waiting thread to transition to runnable again.

    notifyAll(): If a thread calls notifyAll() on the synchronized object, all threads waiting on that object’s monitor become eligible to reacquire the lock.

    e.g.

    public class SynchronizedBuffer implements Buffer {

        private int buffer = -1;
        private boolean occupied = false;

        @Override
        public synchronized void blockingPut(int value) throws InterruptedException {
            // while there is no empty location, place the thread in the waiting state
            while (occupied) {
                System.out.println("Producer tries to write.");
                displayState("Buffer full. Producer waits.");
                wait();
            }
            // put value
            buffer = value;
            occupied = true;
            displayState("Producer writes " + buffer);
            // tell waiting threads to enter the runnable state
            notifyAll();
        } // release lock on SynchronizedBuffer

        @Override
        public synchronized int blockingGet() throws InterruptedException {
            // while there is no data to read, place the thread in the waiting state
            while (!occupied) {
                System.out.println("Consumer tries to read.");
                displayState("Buffer empty. Consumer waits.");
                wait();
            }
            // retrieve value
            occupied = false;
            displayState("Consumer reads " + buffer);
            // tell waiting threads to enter the runnable state
            notifyAll();
            return buffer;
        } // release lock on SynchronizedBuffer

        private synchronized void displayState(String operation) {
            System.out.printf("%-40s%d\t\t%b%n%n", operation, buffer, occupied);
        }
    }
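
    The same wait()/notifyAll() pattern also works for simpler conditions than a buffer. The sketch below is a hypothetical one-shot gate (the class Gate and its method names are assumptions, not part of the original notes): threads wait in a loop until a flag becomes true, and one call to notifyAll() releases all of them.

```java
import java.util.concurrent.atomic.AtomicInteger;

class Gate {

    private boolean isOpen = false; // the condition the waiting threads depend on

    // Waits until the gate has been opened.
    public synchronized void awaitOpen() throws InterruptedException {
        while (!isOpen) { // re-check the condition after every wake-up
            wait();       // releases the monitor lock while waiting
        }
    }

    // Opens the gate and wakes every waiting thread.
    public synchronized void open() {
        isOpen = true;
        notifyAll();
    }

    // Starts waiterCount threads that wait at the gate, opens it,
    // and returns how many threads got through.
    static int releaseWaiters(int waiterCount) {
        Gate gate = new Gate();
        AtomicInteger passed = new AtomicInteger();
        Thread[] waiters = new Thread[waiterCount];

        for (int i = 0; i < waiterCount; i++) {
            waiters[i] = new Thread(() -> {
                try {
                    gate.awaitOpen();
                    passed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            waiters[i].start();
        }

        gate.open();
        try {
            for (Thread waiter : waiters) {
                waiter.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return passed.get();
    }

    public static void main(String[] args) {
        System.out.println(releaseWaiters(3)); // prints 3
    }
}
```

    Because the flag is checked in a while loop, the example behaves the same whether a waiter reaches awaitOpen() before or after open() is called.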

    Lock & Condition

    Lock interface

    lock() & unlock(): A thread calls the Lock’s lock() method to acquire the lock. Once a Lock has been obtained by one thread, the Lock object will not allow another thread to obtain the Lock until the first thread releases the Lock (by calling the Lock’s unlock() method).

    Place unlock() in a finally block so that the lock is released even if an exception is thrown.

    ReentrantLock: a class that is a basic implementation of Lock. Its constructor takes a boolean argument that specifies the fairness policy – true: “the longest-waiting thread will acquire the lock when it’s available” (avoids indefinite postponement); false: no guarantee as to which waiting thread will acquire the lock when it’s available.

    newCondition(): returns an object that implements the Condition interface and is associated with that specific Lock.
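
    A minimal sketch of the lock()/unlock() idiom with a fair ReentrantLock follows. The class LockedCounter is hypothetical and only meant to show the try/finally shape described above.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounter {

    // true requests the fair policy: the longest-waiting thread gets the lock next
    private final Lock lock = new ReentrantLock(true);
    private int count = 0;

    public void increment() {
        lock.lock();
        try {
            count++;
        } finally {
            lock.unlock(); // always released, even if the guarded code throws
        }
    }

    public int get() {
        lock.lock();
        try {
            return count;
        } finally {
            lock.unlock();
        }
    }

    // Two threads each increment the same counter incrementsPerThread times.
    static int countWithTwoThreads(int incrementsPerThread) {
        LockedCounter counter = new LockedCounter();
        Runnable work = () -> {
            for (int i = 0; i < incrementsPerThread; i++) {
                counter.increment();
            }
        };

        Thread first = new Thread(work);
        Thread second = new Thread(work);
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while joining", e);
        }
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(countWithTwoThreads(1_000)); // prints 2000
    }
}
```

    Note that lock() is called before the try block, so unlock() in the finally block only runs when the lock was actually acquired.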

    Condition interface

    If a thread that owns a Lock determines that it cannot continue with its task until some condition is satisfied, the thread can wait on a Condition object.

    await(): A thread can call the Condition’s await(); this immediately releases the associated Lock and places the thread in the waiting state for that Condition object.

    signal(): When a runnable thread completes a task that another thread may be waiting on, it can call Condition method signal() to allow one thread in that Condition’s waiting state to return to runnable.

    signalAll(): If a thread calls Condition method signalAll(), all threads waiting for that Condition transition to the runnable state, but only one of them can obtain the Lock at a time; the others wait until the Lock becomes available again.

    e.g. /* For some reason this did not work properly for me on Windows 10: the lock did not seem to be released when the thread awaited a condition, and I don’t know why. It works fine on Mac OS X. */

    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;

    public class SynchronizedBuffer implements Buffer {

        private final Lock accessLock = new ReentrantLock();
        private final Condition canWrite = accessLock.newCondition();
        private final Condition canRead = accessLock.newCondition();
        private int buffer = -1;
        private boolean occupied = false;

        @Override
        public void blockingPut(int value) throws InterruptedException {
            accessLock.lock(); // lock this object
            try {
                while (occupied) {
                    System.out.println("Producer tries to write.");
                    displayState("Buffer full. Producer waits.");
                    canWrite.await(); // wait until the buffer is empty
                }
                buffer = value;
                occupied = true;
                displayState("Producer writes " + buffer);
                // signal any threads waiting on Condition canRead (full buffer)
                canRead.signalAll();
            } finally {
                accessLock.unlock();
            }
        }

        @Override
        public int blockingGet() throws InterruptedException {
            int readValue = 0;
            accessLock.lock();
            try {
                // while there is no data to read, place the thread in the waiting state
                while (!occupied) {
                    System.out.println("Consumer tries to read.");
                    displayState("Buffer empty. Consumer waits.");
                    canRead.await(); // wait on Condition canRead (full buffer); also releases the lock implicitly
                }
                occupied = false;
                readValue = buffer;
                displayState("Consumer reads " + readValue);
                // signal any threads waiting on Condition canWrite (empty buffer)
                canWrite.signalAll();
            } finally {
                accessLock.unlock();
            }
            return readValue;
        }

        private void displayState(String operation) {
            // acquire the lock before entering try, so that finally only
            // unlocks a lock that was actually obtained
            accessLock.lock();
            try {
                System.out.printf("%-40s%d\t\t%b%n%n", operation, buffer, occupied);
            } finally {
                accessLock.unlock();
            }
        }

    }

    SwingWorker – Multithreading with GUI

    event dispatch thread: All Swing applications have this single thread to handle interaction with the application’s GUI components. All tasks that require interaction with an application’s GUI are placed in an event queue and are executed sequentially by the event dispatch thread.

    Swing GUI components are not thread safe. Thread safety in GUI applications is achieved not by synchronizing thread actions, but by thread confinement.

    Thread confinement: allowing just one thread (the event dispatch thread) to access Swing components.

    Problem: If an application must perform a lengthy computation in response to a user interaction, the event dispatch thread cannot attend to other tasks in the event queue during the computation, which makes the GUI unresponsive.

    Solution: handle a long-running computation in a separate thread, freeing the event dispatch thread to continue managing other GUI interactions.

    SwingWorker<T,V> class: implements Runnable. When execute() is called, the object is scheduled to perform an asynchronous task in a worker thread and then update Swing components from the event dispatch thread based on the task’s results.

    T – the type returned by doInBackground().

    V – the type passed between publish() and process() to handle intermediate results.

    The GUI components that will be manipulated by SwingWorker methods (e.g. process() or done()) should be passed to the SwingWorker subclass’s constructor and stored in the subclass object.

    doInBackground() defines a long computation and is called in a worker thread.

    done() executes on the event dispatch thread when doInBackground() returns.

    execute() schedules the SwingWorker object to be executed in a worker thread.

    get() waits for the result to be ready if necessary. 

    publish() sends intermediate results from doInBackground() to process().

    process() receives intermediate results from publish() and processes them on the event dispatch thread.

    setProgress() sets the progress property to notify any property change listener on the event dispatch thread of progress bar updates.
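
    Before the GUI examples, the bare execute()/doInBackground()/get() cycle can be sketched without any Swing components. The class SumWorker is a hypothetical example, and calling get() from the calling thread (rather than from done()) is done here only to keep the sketch small; in a real GUI, get() would normally be called inside done() so the event dispatch thread is never blocked.

```java
import java.util.concurrent.ExecutionException;
import javax.swing.SwingWorker;

class SumWorker extends SwingWorker<Long, Void> {

    private final int n;

    SumWorker(int n) {
        this.n = n;
    }

    // Runs in a worker thread, not in the event dispatch thread.
    @Override
    protected Long doInBackground() {
        long sum = 0;
        for (int i = 1; i <= n; i++) {
            sum += i;
        }
        return sum;
    }

    // Schedules the worker and blocks the calling thread until the result is ready.
    static long computeSum(int n) {
        SumWorker worker = new SumWorker(n);
        worker.execute();
        try {
            return worker.get(); // waits for doInBackground() to return
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("computation failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        System.out.println(computeSum(100)); // prints 5050
    }
}
```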

    e.g. use SwingWorker subclass to perform background calculation

    import java.util.concurrent.ExecutionException;
    import javax.swing.JLabel;
    import javax.swing.SwingWorker;

    public class BackgroundCalculator extends SwingWorker<Long, Object> {

        private final int n; // input of the calculation
        private final JLabel resultJLabel; // some related UI object

        public BackgroundCalculator(int n, JLabel resultJLabel) {
            this.n = n;
            this.resultJLabel = resultJLabel;
        }

        @Override
        public Long doInBackground() {
            // long-running task to be performed in a worker thread
            // (a recursive Fibonacci is assumed here; the original leaves the body out)
            return fibonacci(n);
        }

        private long fibonacci(long number) {
            return number <= 1 ? number : fibonacci(number - 1) + fibonacci(number - 2);
        }

        @Override
        protected void done() {
            // code to run on the event dispatch thread when doInBackground() returns
            try {
                // get the result of doInBackground() and display it
                resultJLabel.setText(get().toString());
            } catch (InterruptedException ex) {
                /* thrown if the current thread is interrupted while waiting for get() to return.
                Since we call get() from done(), the computation is complete before get() is called,
                so this exception will not occur in this example. */
            } catch (ExecutionException ex) {
                /* thrown if an exception occurs during the computation */
            }
        }

    }

    import java.awt.event.ActionEvent;
    import java.awt.event.ActionListener;
    import javax.swing.JButton;
    import javax.swing.JFrame;
    import javax.swing.JLabel;

    public class FibonacciNumbers extends JFrame {
        // button to activate the SwingWorker subclass's task
        private final JButton goJButton = new JButton("Go");
        // UI component connected to the SwingWorker subclass
        private final JLabel fibonacciJLabel = new JLabel();

        // constructor
        public FibonacciNumbers() {
            /* super, setLayout, ...etc */
            goJButton.addActionListener(
                new ActionListener() {
                    @Override
                    public void actionPerformed(ActionEvent event) {
                        /* retrieve the user's input as the SwingWorker subclass's input,
                        e.g. integer n. */
                        int n = 40; // placeholder value; the original reads n from user input
                        // create a task to perform the calculation in the background
                        BackgroundCalculator task = new BackgroundCalculator(n, fibonacciJLabel);
                        task.execute(); // execute the task
                    }
                });
            /* add GUI components, etc. */
        }

    }

    e.g. update the GUI with intermediate results before the long calculation completes.

    -- using publish(), process() and setProgress().

    publish() -> sends prime numbers to process() as they’re found;

    process() -> displays found primes in a GUI component;

    setProgress() -> updates the progress property.

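    import java.util.Arrays;
    import java.util.List;
    import javax.swing.JButton;
    import javax.swing.JLabel;
    import javax.swing.JTextArea;
    import javax.swing.SwingWorker;

    public class PrimeCalculator extends SwingWorker<Integer, Integer> {

        /* GUI components & array for finding primes */
        private final boolean[] primes; // primes[i] stays true while i may still be prime
        private final JTextArea intermediateJTextArea;
        private final JLabel statusJLabel;
        private final JButton getPrimesJButton;
        private final JButton cancelJButton;

        // constructor
        public PrimeCalculator(int max, JTextArea intermediateJTextArea,
            JLabel statusJLabel, JButton getPrimesJButton,
            JButton cancelJButton) {
            /* store the GUI components passed as parameters */
            this.intermediateJTextArea = intermediateJTextArea;
            this.statusJLabel = statusJLabel;
            this.getPrimesJButton = getPrimesJButton;
            this.cancelJButton = cancelJButton;
            primes = new boolean[max];
            Arrays.fill(primes, true);
        }

        // calculation: find all primes below max
        @Override
        public Integer doInBackground() {
            int count = 0; // number of primes found so far
            // The loop is a sieve-style sketch standing in for the parts the original elides.
            for (int i = 2; i < primes.length; i++) {
                if (isCancelled()) {
                    return count;
                }
                // during calculation, set progress according to how much of the array has been examined
                setProgress(100 * (i + 1) / primes.length);
                if (!primes[i]) {
                    continue; // i is a multiple of a smaller prime
                }
                /* Put the worker thread to sleep for a few milliseconds between calls to publish().
                This slows down the calculation so the event dispatch thread can keep up with the
                update requests; too many requests piling up in the event queue can exhaust memory. */
                try {
                    Thread.sleep(5); // the interval is an assumed value
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return count;
                }
                // during calculation, make the found prime i available for display as an intermediate result
                publish(i);
                count++;
                for (int j = i + i; j < primes.length; j += i) {
                    primes[j] = false; // eliminate multiples of i
                }
            }
            return count;
        }

        // display found primes in a GUI component
        @Override
        protected void process(List<Integer> publishedVals) {
            for (int i = 0; i < publishedVals.size(); i++)
                intermediateJTextArea.append(publishedVals.get(i) + "\n");
        }

        // code to execute when doInBackground() completes
        @Override
        protected void done() {
            /* ... */
        }

    }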

    -- In the JFrame subclass, add a PropertyChangeListener to the SwingWorker subclass object to listen for progress changes.

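    import java.beans.PropertyChangeEvent;
    import java.beans.PropertyChangeListener;
    import javax.swing.JButton;
    import javax.swing.JFrame;
    import javax.swing.JLabel;
    import javax.swing.JProgressBar;
    import javax.swing.JTextArea;

    public class FindPrimes extends JFrame {

        // GUI components & the SwingWorker subclass
        // (button labels are assumed; the original does not show these declarations)
        private final JTextArea displayPrimesJTextArea = new JTextArea();
        private final JLabel statusJLabel = new JLabel();
        private final JButton getPrimesJButton = new JButton("Get Primes");
        private final JButton cancelJButton = new JButton("Cancel");
        private final JProgressBar progressJProgressBar = new JProgressBar();
        private PrimeCalculator calculator;

        // constructor
        public FindPrimes() {
            /* ...
            * Initialization
            * & reset GUI components
            * & get user input */
            // inside the event listener of the start button,
            // construct a new SwingWorker subclass object
            getPrimesJButton.addActionListener(event -> {
                int number = 1000; // placeholder value; the original reads it from user input
                calculator = new PrimeCalculator(number, displayPrimesJTextArea,
                    statusJLabel, getPrimesJButton, cancelJButton);
                // listen for progress property changes
                calculator.addPropertyChangeListener(
                    new PropertyChangeListener() {
                        @Override
                        public void propertyChange(PropertyChangeEvent e) {
                            // if the changed property is progress, update the progress bar
                            if (e.getPropertyName().equals("progress")) {
                                int newValue = (Integer) e.getNewValue();
                                progressJProgressBar.setValue(newValue);
                            }
                        }
                    });
                calculator.execute();
            });
        }

    }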

    Parallel Sort & Parallel Stream

    parallelSort()

    For sorting large arrays on multi-core systems, Arrays.parallelSort() is typically faster than Arrays.sort().

    The two can be compared by timing them with the Date/Time API:

    e.g.

    // requires java.time.Duration, java.time.Instant, java.text.NumberFormat and java.util.Arrays
    Instant sortStart = Instant.now();
    Arrays.sort(array1);
    Instant sortEnd = Instant.now();
    long sortTime = Duration.between(sortStart, sortEnd).toMillis();
    Instant parallelSortStart = Instant.now();
    Arrays.parallelSort(array2); // array2 is assumed to hold the same unsorted data as array1
    Instant parallelSortEnd = Instant.now();
    long parallelSortTime = Duration.between(parallelSortStart, parallelSortEnd).toMillis();
    String percentage = NumberFormat.getPercentInstance().format((double) sortTime / parallelSortTime);
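
    A timing comparison like the one above is only meaningful if both calls sort the same data and produce the same result. The following sketch checks that; the class ParallelSortCheck and the fixed random seed are assumptions for illustration. A single Instant-based measurement is also only a rough indication, since JIT warm-up and other system activity can distort it.

```java
import java.util.Arrays;
import java.util.Random;

class ParallelSortCheck {

    // Sorts two copies of the same random data, one with each method,
    // and reports whether the results are identical.
    static boolean sortsAgree(int size) {
        int[] array1 = new Random(42).ints(size).toArray(); // fixed seed for repeatability
        int[] array2 = array1.clone();                      // same unsorted data

        Arrays.sort(array1);
        Arrays.parallelSort(array2);

        return Arrays.equals(array1, array2);
    }

    public static void main(String[] args) {
        System.out.println(sortsAgree(100_000)); // prints true
    }
}
```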

    Parallel Stream

    Invoke parallel() on an existing stream to obtain a parallel stream, which can improve performance on multi-core systems.

    LongStream stream2=Arrays.stream(values).parallel();
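
    As a small sketch of parallel() in use (the class ParallelStreamDemo and its method names are hypothetical), the same reduction can be run on a parallel stream built from an array or from a range:

```java
import java.util.Arrays;
import java.util.stream.LongStream;

class ParallelStreamDemo {

    // Sums the elements of a long[] using a parallel stream.
    static long sumArray(long[] values) {
        LongStream stream = Arrays.stream(values).parallel();
        return stream.sum();
    }

    // Sums 1..n using a parallel stream over a range.
    static long sumTo(long n) {
        return LongStream.rangeClosed(1, n).parallel().sum();
    }

    public static void main(String[] args) {
        System.out.println(sumArray(new long[] {1, 2, 3, 4, 5})); // prints 15
        System.out.println(sumTo(1_000_000));                     // prints 500000500000
    }
}
```

    Whether the parallel version is actually faster depends on the data size and the number of available cores; for small inputs the coordination overhead can outweigh the gain.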

    Interface Callable

    Problem: Runnable’s run() cannot return a value, so passing a result back to the calling thread would require shared mutable data.

    Solution: Callable.

    Callable interface

    call(): returns a value representing the result of its task.

    ExecutorService interface

    submit(): executes its Callable argument and returns an object of type Future.

    Future interface

    Represents the Callable’s future result.

    get(): blocks the calling thread and waits for the Callable to complete and return its result.

    Also provides methods that cancel a Callable’s execution/determine whether Callable was cancelled/determine whether the Callable completed its task.
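
    The Callable, submit() and Future pieces described above fit together as in the following sketch. The class CallableDemo and the summing task are assumptions chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class CallableDemo {

    // Computes 1 + 2 + ... + n in a worker thread and returns the result.
    static int sumInBackground(int n) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> {
                int sum = 0;
                for (int i = 1; i <= n; i++) {
                    sum += i;
                }
                return sum; // unlike run(), call() returns a value
            };

            Future<Integer> future = executorService.submit(task);
            return future.get(); // blocks until the Callable has completed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the result", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(sumInBackground(100)); // prints 5050
    }
}
```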

    CompletableFuture class

    Implements the Future interface. Can asynchronously execute Runnables or Suppliers.

    e.g.

    // perform synchronous calculation tasks fibonacci()
    // TimeData is the specific return type of startFibonacci()
    TimeData synchronousResult1 = startFibonacci(45);
    TimeData synchronousResult2 = startFibonacci(44);

    // perform asynchronous calculation tasks fibonacci()
    CompletableFuture<TimeData> futureResult1 =
        CompletableFuture.supplyAsync(() -> startFibonacci(45));
    CompletableFuture<TimeData> futureResult2 =
        CompletableFuture.supplyAsync(() -> startFibonacci(44));
    // wait for results from the asynchronous operations
    TimeData asynchronousResult1 = futureResult1.get();
    TimeData asynchronousResult2 = futureResult2.get();
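
    A self-contained variant of the fragment above might look like the following. It is a sketch under stated assumptions: the class CompletableFutureDemo and its fibonacci() helper are hypothetical, plain long values stand in for TimeData, and thenCombine() is used to merge the two results instead of calling get() on each future.

```java
import java.util.concurrent.CompletableFuture;

class CompletableFutureDemo {

    // Deliberately slow recursive Fibonacci, used as the long-running task.
    static long fibonacci(int n) {
        return n <= 1 ? n : fibonacci(n - 1) + fibonacci(n - 2);
    }

    // Computes fibonacci(a) and fibonacci(b) asynchronously and adds the results.
    static long sumOfTwoFibonacci(int a, int b) {
        CompletableFuture<Long> first = CompletableFuture.supplyAsync(() -> fibonacci(a));
        CompletableFuture<Long> second = CompletableFuture.supplyAsync(() -> fibonacci(b));

        // thenCombine() runs once both futures have completed;
        // join() waits for the combined result without checked exceptions
        return first.thenCombine(second, Long::sum).join();
    }

    public static void main(String[] args) {
        System.out.println(sumOfTwoFibonacci(20, 21)); // prints 17711
    }
}
```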

     

  • Original article: https://www.cnblogs.com/RDaneelOlivaw/p/12125724.html