zoukankan      html  css  js  c++  java
  • 线程通信

    线程通信

    一 使用Synchronized的线程

    1.当线程在系统内运行时,线程的调度具有一定的透明性,程序通常无法准确控制线程的轮换执行,但java也提供了一些机制来保证线程协调运行。
    Object类提供了wait(),notify()和notifyAll()三个方法,这三个方法属于Object类,但是必须由同步监视器来调用,可以分为以下两种情况:
    (1)对于使用synchronized修饰的同步方法,因为该类的默认实例this就是同步监视器,所以可以在同步方法中直接调用这三个方法
    (2)对于使用synchronized修饰的同步代码块,同步监视器是synchronized后括号里的对象,所以必须使用该对象调用这三个方法

    2.wait():导致当前线程等待,直到其他线程调用该同步监视器的notify()方法或notifyAll()方法来唤醒该线程。无时间参数的wait会一直等待,直到其他线程通知,带毫秒的wait会等到指定时间后自动苏醒。调用wait()方法的当前线程会释放对该同步监视器的锁定。
    notify():唤醒在此同步监视器上等待的单个线程。如果所有线程都是在此同步监视器上等待,则会选择唤醒其中一个线程,选择是任意的。
    notifyAll():唤醒在此同步监视器上等待的所有线程。

    3.假设在系统中有两个线程,分别代表存款者和取钱者,系统要求存款者和取钱者不断重复存钱、取钱的动作,而且要求每当存款者将钱存入指定账户后,取钱者就立即取出该笔钱,不允许存款者连续两次存钱,也不允许取钱者连续两次取钱。

    程序中可以通过一个旗标来标识账户中是否已有存款,当旗标为false时,账户中没有存款,存款者线程可以向下执行,当存款者存款后,旗标为true,并调用notify()或notifyAll()方法来唤醒其他线程,并调用wait()让存款者线程等待,取款者的操作类似。

    Account.java

    public class Account {
        private String accountNo;
        private double balance;
        private boolean flag=false;
        public Account(String accountNo,double balance){
            this.accountNo=accountNo;
            this.balance=balance;
        }
    
        //因为账户余额不可以随便更改,所以只为balance提供getter方法
        public double getBalance() {
            return balance;
        }
    
        public String getAccountNo() {
            return accountNo;
        }
    
        public void setAccountNo(String accountNo) {
            this.accountNo = accountNo;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
    
            Account account = (Account) o;
    
            return accountNo.equals(account.accountNo);
    
        }
    
        @Override
        public int hashCode() {
            return accountNo.hashCode();
        }
    
        public synchronized void draw(double drawAmount){
            try{
                if(!flag){
                    wait();
                }else{
                    System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                    balance-=drawAmount;
                    System.out.println("账户余额为:"+balance);
                    flag=false;
                    notifyAll();
                }
            }catch(InterruptedException ex){
                ex.printStackTrace();
            }
        }
    
        public synchronized void depoist(double depoistAmount){
            try{
                if(flag){
                    wait();
                }else{
                    System.out.println(Thread.currentThread().getName()+"存款:"+depoistAmount);
                    balance+=depoistAmount;
                    System.out.println("账户余额为:"+balance);
                    flag=true;
                    notifyAll();
                }
            }catch(InterruptedException ex){
                ex.printStackTrace();
            }
        }
    }

    DrawThread.java 

    public class DrawThread extends Thread {
        private Account account;
        private double drawAmount;
    
        public DrawThread(String name, Account account, double drawAmount) {
            super(name);
            this.account = account;
            this.drawAmount = drawAmount;
        }
        public void run(){
            for(int i=0;i<100;i++) {
                account.draw(drawAmount);
            }    
        }
    }
    

    DepositThread.java

    public class DepositThread extends Thread {
        private Account account;
        private double depositAmount;
        public DepositThread(String name, Account account, double depositAmount) {
            super(name);
            this.account = account;
            this.depositAmount = depositAmount;
        }
        public void run(){
            for(int i=0;i<100;i++){
                account.depoist(depositAmount);
            }
        }
    }

    DrawTest.java 

    public class DrawTest {
        public static void main(String[] args){
            Account acct=new Account("1234567",0);
            new DrawThread("取钱者",acct,800).start();
            new DepositThread("存款者甲",acct,800).start();
            new DepositThread("存钱者乙",acct,800).start();
            new DepositThread("存钱者丙",acct,800).start();
        }
    }

    结果:
    存款者甲存款:800.0
    账户余额为:800.0
    取钱者取钱:800.0
    账户余额为:0.0


    从结果可以发现,三个存款者线程随机的向账户中存款,只有一个取款者取钱。但是程序最后被阻塞无法继续向下执行,这是因为三个存款者线程共有300此存款操作,而一个取钱者线程只有100次取钱操作,程序最后被阻塞,如下:


    存钱者丙存款:800.0
    账户余额为:800.0

    这里的阻塞并不是死锁,只是取钱者线程已经执行完毕,而存款者线程还在等待其他线程来取钱而已,并不是等待其他线程释放同步监视器。

    二 使用Condition控制线程通信

    1.如果程序中直接使用Lock对象来保证同步,则系统中不存在隐式的同步监视器,就不能使用wait(),notify(),notifyAll()进行线程通信了。Java提供了一个Condition来保持协调,使用Condition可以让那些已经得到Lock对象却无法继续执行的线程释放Lock锁,也可以唤醒其他处于等待的线程。Condition实例被绑定在了一个Lock对象上,只要调用Lock对象的newCondition()方法即可。

    2.Condition提供了如下三个方法:
    await():同wait()方法,但是该方法有更多的变体,如long awaitNanos(long nanosTimeout),void awaitUninterruptibly(),awaitUtil(Date deadline)等
    signal():同notify()
    signalAll():同notifyAll()

    3.Account.java

    public class Account {
        private final Lock lock=new ReentrantLock();
        private final Condition cond=lock.newCondition();
        private String accountNo;
        private double balance;
        private boolean flag=false;
        public Account(String accountNo,double balance){
            this.accountNo=accountNo;
            this.balance=balance;
        }
    
        //因为账户余额不可以随便更改,所以只为balance提供getter方法
        public double getBalance() {
            return balance;
        }
    
        public String getAccountNo() {
            return accountNo;
        }
    
        public void setAccountNo(String accountNo) {
            this.accountNo = accountNo;
        }
    
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
    
            Account account = (Account) o;
    
            return accountNo.equals(account.accountNo);
    
        }
    
        @Override
        public int hashCode() {
            return accountNo.hashCode();
        }
    
        public synchronized void draw(double drawAmount){
            lock.lock();
            try{
                if(!flag){
                    cond.await();
                }else{
                    System.out.println(Thread.currentThread().getName()+"取钱:"+drawAmount);
                    balance-=drawAmount;
                    System.out.println("账户余额为:"+balance);
                    flag=false;
                    cond.signalAll();
                }
            }catch(InterruptedException ex){
                ex.printStackTrace();
            }finally{
                lock.unlock();
            }
        }
    
        public synchronized void deposit(double depositAmount){
            lock.lock();
            try{
                if(flag){
                    cond.await();
                }else{
                    System.out.println(Thread.currentThread().getName()+"存款:"+depositAmount);
                    balance+=depositAmount;
                    System.out.println("账户余额为:"+balance);
                    flag=true;
                    cond.signalAll();
                }
            }catch(InterruptedException ex){
                ex.printStackTrace();
            }finally{
                lock.unlock();
            }
        }
    }

    其他的几个java文件不变,运行结果是一样的。

    三 使用阻塞队列(BlockingQueue)控制线程通信
    1.BlockingQueue是Queue的子接口,但它的主要用途并不是作为容器,而是作为线程同步的工具。它的一个特征是:当生产者线程试图向BlockingQueue中放入元素时,如果该队列已满,则该线程被阻塞;当消费者线程试图从BlockingQueue中取出元素时,如果该队列已空,则该线程被阻塞。程序的两个线程交替向BlockingQueue中放入、取出元素,即可很好的控制线程的通信。

    2.提供了两个支持阻塞的方法:
    put(E e):试图把E元素放入BlockingQueue中,如果队列的元素已满,则阻塞该线程;
    take():尝试从BlockingQueue的头部取出元素,如果队列的元素已空,则阻塞该线程;

    3.BlockingQueue继承了Queue接口,可以使用Queue接口中的方法,归纳起来分为三组:
    (1)在队列尾部插入元素,包括add(E e),offer(E e),put(E e),当队列已满时,分别会抛出异常、返回false、阻塞队列;
    (2)在队列头部删除并返回删除的元素,包括remove(),poll()和take(),当队列已空时,分别会抛出异常、返回false、阻塞队列;
    (3)在队列头部取出但不删除元素,包括element()和peek()方法,当队列已空时,着三个方法分别抛出异常、返回false

    4.Java7之后又新增了一些阻塞队列,包含5个实现类:
    (1)ArrayBlockingQueue:基于数组实现的BlockingQueue队列;
    (2)LinkedBlockingQueue:基于链表实现的BlockingQueue队列;
    (3)PriorityBlockingQueue:它并不是标准的阻塞队列,当调用remove()扥方法取出元素时,并不是取出队列中存在时间最长的元素,而是队列中最小的元素。判断元素的大小可根据元素(实现Comparable接口)的本身大小来自然排序,也可以使用Comparator进行定制排序。
    (4)SynchronousQueue:同步队列,对该队列的存取操作必须交替进行;
    (5)DelayQueue:它是一个特殊的BlockingQueue,底层基于PriorityBlockingQueue实现,并要求所有的集合元素都要实现Delay接口。

    5.举例

    public class BlockingQueueTest{
        public static void main(String[] args) throws Exception{
            BlockingQueue<String> bq=new ArrayBlockingQueue<String>(2);
            bq.put("java");
            bq.put("study");
            bq.put("javaee");  //阻塞线程
        }
    }

    四 线程池
    1.系统启动一个新线程的成本是比较高的,因为涉及到与操作系统的交互,使用线程池可以很好的提高性能,尤其是当程序中需要创建大量生存期很短的线程时,更要考虑使用线程池。线程池在系统启动时就创建大量的空闲的线程,程序将一个Runnable对象或Callable对象传给线程池,线程池就会启动一个线程来执行它的run()或call()方法,当方法结束后,该线程并不会死亡,而是再次返回线程池中成为空闲状态,等待下一个Runnable对象的run()或call()方法。使用线程池还可以控制系统中并发线程的数量,当系统中包含大量并发线程时,会导致系统性能剧烈下降,甚至导致JVM崩溃。

    2.Java5之后支持内建线程池,通过一个Executors工厂类来产生线程池,该工厂类包含以下几个静态工厂方法来创建线程池:
    (1)newCachedThreadPool():创建一个具有缓存功能的线程池
    (2)newFixedThreadPool( int nThreads):创建一个可重用的、具有固定线程数的线程池
    (3)newSingleThreadPool():创建一个只有单线程的线程池
    (4)newScheduledThreadPool(int corePoolSize):创建具有指定线程数的线程池,它可以在指定延迟后执行线程任务。
    (6)newSingleThreadScheduledExecutor():创建只有一个线程的线程池,它可以在指定延迟后执行线程任务。
    (7)ExecutorService newWorkStealingPool(int parallelism):创建持有足够的线程的线程池来支持给定的并行级别,该方法还会使用多个队列来减少竞争
    (8)ExecutorService newWorkStealingPool():上一个方法的简化版本,如果当前机器有4个CPU,则目标并行级别被设置为4,也就是相当于前一个方法传入4作为参数
    注意:前三个方法返回一个ExecutorService对象,该对象代表一个线程池,可以执行Runnable或Callable对象所代表的线程;中间两个方法返回ScheduledExecutorService线程池,它可以在指定延迟后执行线程任务;后两个是Java8新增的,充分利用了多CPU并行的能力,这两个方法生成的work stealing池,相当于后台线程池,如果所有的后台线程都死亡了,work stealing池中的线程会自动死亡。

    3.ExecutorService里提供了如下三个方法:
    (1)Future<?> submit(Runnable task): 将一个Runnable对象提交给指定的线程池,线程池将在有空闲线程时执行Runnable对象代表的任务。这里的Future对象代表Runnable任务的返回值,但run()方法无返回值,所以在run()方法执行结束后返回null。但可以调用Future的isDone(),isCancelled()来获得Runnable对象的执行状态。
    (2)<T> Future<T> submit(Runnable task, T result): 这里的result显式指定线程执行结束的返回值,所以Future对象将在run()方法执行结束后返回result
    (3)<T> Future<T> submit(Callable<T> task): 这里提交的是Callable对象

    4.ScheduledExecutorService提供了4个方法:
    (1)ScheduledFuture<V> schedule(Callable<V> callable,long delay,TimeUnit unit):指定callable任务将在delay延迟后执行。
    (2)ScheduledFuture<?> schedule(Runnable command,long delay,TimeUnit unit):指定command任务将在delay延迟后执行。
    (3)ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit): 指定command任务将在delay延迟后执行,而且以设定频率重复执行,也就是说,在initialDelay后开始执行,一次在initialDelay+period,initialDelay+2*period...处重复执行。
    (4)ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit):
    创建并执行一个在给定初始延迟后首次启用的定期操作,随后在每一次执行终止和下一次执行开始之间都会存在给定的延迟。如果任务在某次执行时遇到异常,就会取消后续执行;否则,只会通过程序来显示取消或终止该任务。

    5.用完一个线程池后,应该调用该线程池的shutdown()方法,该方法将启动线程池的关闭序列,线程池不再接收新任务,但会将所有已提交任务执行完成;也可以调用shutdownNow()方法来关闭线程池,该方法试图停止所有正在执行的活动任务,暂停处理正在等待的任务,并返回等待执行的任务列表。

    6.举例

    public class ThreadPoolTest {
        public static void main(String[] args) throws Exception{
            ExecutorService pool= Executors.newFixedThreadPool(6);
            Runnable target= ()->{
                for(int i=0;i<100;i++){
                    System.out.println(Thread.currentThread().getPriority()+"的i值为:"+i);
                }
            };
            //向线程池中提交两个线程
            pool.submit(target)
            pool.submit(target);
            pool.shutdown();
        }
    }

    五 ThreadLocal类

    1.java为ThreadLocal类增加了泛型支持,使用ThreadLocal类可以简化多线程编程时的并发访问,使用这个工具类可以很简捷的隔离多线程程序的竞争资源。

    2.ThreadLocal,是Thread Local Variable (线程局部变量) 的意思,它的功能很简单,就是为每一个使用该变量的线程都提供一个变量值的副本,使每一个线程都可以独立的改变自己的副本,而不会与其他线程的副本冲突。从线程的角度看,就好像每一个线程都完全拥有该变量一样。

    3.ThreadLocal类只提供了三个public方法:
    T get():返回此线程局部变量中当前线程副本中的值
    void remove():删除此线程局部变量中当前线程的值
    void set(T value):设置此线程局部变量中当前线程副本中的值

    4.ThreadLocalTest.java

    class Account{
        //定义一个ThreadLocal类型的变量,该变量将是一个线程局部变量,每个线程都会保留该变量的一个副本
        private ThreadLocal<String> name=new ThreadLocal<>();
        public Account(String str){
            this.name.set(str);
            //用于访问当前线程的name的副本
            System.out.println("---"+this.name.get());
        }
        public String getName(){
            return name.get();
        }
        public void setName(String str){
            this.name.set(str);
        }
    }
    class MyTest extends Thread{
        private Account account;
    
        public MyTest(String name, Account account) {
            super(name);
            this.account = account;
        }
        public void run(){
            for(int i=0;i<10;i++){
                if(i==6) {
                    account.setName(getName());
                }
                System.out.println(account.getName()+"账户的i值:"+i);
            }
        }
    }
    public class ThreadLocalTest {
        public static void main(String[] args){
            //虽然两个线程共享同一个账户,即只有一个账户名,但由于账户名是ThreadLocal类型的,所以每个线程都完全拥有各自的账户名副本,因此在i==6之后,将看到两个线程访问同一个账户时出现不同的账户名
            Account at=new Account("初始名");
            new MyTest("线程甲",at).start();
            new MyTest("线程乙",at).start();
        }
    }

    结果:

    ---初始名
    null账户的i值:0
    null账户的i值:0
    null账户的i值:1
    null账户的i值:1
    null账户的i值:2
    null账户的i值:2
    null账户的i值:3
    null账户的i值:3
    null账户的i值:4
    null账户的i值:4
    null账户的i值:5
    null账户的i值:5
    线程甲账户的i值:6
    线程乙账户的i值:6
    线程甲账户的i值:7
    线程乙账户的i值:7
    线程甲账户的i值:8
    线程乙账户的i值:8
    线程甲账户的i值:9
    线程乙账户的i值:9

    实际上账户名有三个副本,主线程一个,另外启动的两个线程各一个,它们的值互不干扰,每个线程都完全拥有自己的ThreadLocal变量。ThreadLocal将需要并发访问的资源复制多份,每个线程拥有一份资源,每个线程都拥有自己的线程副本,从而也就没有必要对该变量进行同步了。ThreadLocal提供了线程安全的共享对象,在编写多线程代码时,可以把不安全的整个变量封装进ThreadLocal,或者把该对象与线程相关的状态使用ThreadLocal保存。

    5.ThreadLocal并不能代替同步机制,两者面向的问题领域不同,同步机制是为了同步多个线程对相同资源的并发访问,是多个线程之间进行通信的有效方式,而ThreadLocal是为了隔离多个线程的数据共享,从根本上避免多个线程之间对共享资源的竞争。

     

  • 相关阅读:
    [LeetCode] 394. Decode String 解码字符串
    [LeetCode] 393. UTF-8 Validation 编码验证
    [LeetCode] Is Subsequence 是子序列
    [LintCode] Maximum Gap 求最大间距
    [LintCode] Nuts & Bolts Problem 螺栓螺母问题
    [LintCode] Kth Smallest Number in Sorted Matrix 有序矩阵中第K小的数字
    [LeetCode] Perfect Rectangle 完美矩形
    LaTex Remove Left Margin 去除左边空间
    LaTex Font Size 字体大小命令
    [LintCode] Continuous Subarray Sum 连续子数组之和
  • 原文地址:https://www.cnblogs.com/lyy-2016/p/6286279.html
Copyright © 2011-2022 走看看