zoukankan      html  css  js  c++  java
  • day 04 Java并发多线程


    http://www.cnblogs.com/hellocsl/p/3969768.html?utm_source=tuicool&utm_medium=referral
    PS:而JVM 每遇到一个线程,就为其分配一个Program Counter Register(程序计数器), VM Stack(虚拟机栈)和Native Method Stack (本地方法栈)
    引用别人的博客,关于Java内存管理,博客很好

     并发编程的挑战

     PS: 轻量级volatile

    ---------------------------------------------------

     

    PS: 上述   看清是四种   状态。   目的是为了  减小获得和释放锁的性能消耗

     


    PS: 出现那么多的锁就是为了减少 获得锁和释放锁的 性能消耗; 而且锁只能升级不能降级

    /**
     * Alipay.com Inc.
     * Copyright (c) 2004-2015 All Rights Reserved.
     */
    package chapter02;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * 计数器
     * 
     * @author tengfei.fangtf
     * @version $Id: Snippet.java, v 0.1 2015-7-31 下午11:32:42 tengfei.fangtf Exp $
     */
    public class Counter {
    
        private AtomicInteger atomicI = new AtomicInteger(0);
        private int           i       = 0;
    
        public static void main(String[] args) {
            final Counter cas = new Counter();
            List<Thread> ts = new ArrayList<Thread>(600);
            long start = System.currentTimeMillis();
            for (int j = 0; j < 100; j++) {
                Thread t = new Thread(new Runnable() {
                    @Override
                    public void run() {
                        for (int i = 0; i < 10000; i++) {
                            cas.count();
                            cas.safeCount();
                        }
                    }
                });
                ts.add(t);
            }
            for (Thread t : ts) {
                t.start();
    
            }
            // 等待所有线程执行完成
            for (Thread t : ts) {
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
    
            }
            System.out.println(cas.i);
            System.out.println(cas.atomicI.get()); //获取值
            System.out.println(System.currentTimeMillis() - start);
        }
    
        /**
         * 使用CAS实现线程安全计数器
         */
        private void safeCount() {
            for (;;) {
                int i = atomicI.get();
                boolean suc = atomicI.compareAndSet(i, ++i);
                if (suc) {
                    break;
                }
            }
        }
    
        /**
         * 非线程安全计数器
         */
        private void count() {
            i++;//一个cpu加了,但是另一个不一定加
        }
    
    }

    PS: 原子性的那个操作一直都不会变

       

     

     

    PS: main方法天生就是一个多线程

    package chapter04;
    
    import java.lang.management.ManagementFactory;
    import java.lang.management.ThreadInfo;
    import java.lang.management.ThreadMXBean;
    
    /**
     * 6-1
     */
    public class MultiThread {
    
        public static void main(String[] args) {
            // 获取Java线程管理MXBean
            ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();
            // 不需要获取同步的monitor和synchronizer信息,仅仅获取线程和线程堆栈信息
            ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(false, false);
            // 遍历线程信息,仅打印线程ID和线程名称信息
            for (ThreadInfo threadInfo : threadInfos) {
                System.out.println("[" + threadInfo.getThreadId() + "] " + threadInfo.getThreadName());
            }
        }
    }

    PS : suspend 、resume 、stop方法 

    PS: 优雅的结束线程

    package chapter04;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 6-9
     */
    public class Shutdown {
        public static void main(String[] args) throws Exception {
            Runner one = new Runner();
            Thread countThread = new Thread(one, "CountThread");
            countThread.start();
            // 睡眠1秒,main线程对CountThread进行中断,使CountThread能够感知中断而结束
            TimeUnit.SECONDS.sleep(1);
            countThread.interrupt();
            Runner two = new Runner();
            countThread = new Thread(two, "CountThread");
            countThread.start();
            // 睡眠1秒,main线程对Runner two进行取消,使CountThread能够感知on为false而结束
            TimeUnit.SECONDS.sleep(1);
            two.cancel();
        }
    
        private static class Runner implements Runnable {
            private long             i;
    
            private volatile boolean on = true;
    
            @Override
            public void run() {
                while (on && !Thread.currentThread().isInterrupted()) {
                    i++;
                }
                System.out.println("Count i = " + i);
            }
    
            public void cancel() {
                on = false;
            }
        }
    }

    PS: 当java虚拟机中存在 Daemon线程的时候,java虚拟机会退出

    package chapter04;
    
    /**
     * 6-5
     */
    public class Daemon {
    
        public static void main(String[] args) {
            Thread thread = new Thread(new DaemonRunner());
            thread.setDaemon(true);
            thread.start();
        }
    
        static class DaemonRunner implements Runnable {
            @Override
            public void run() {
                try {
                    SleepUtils.second(100);
                } finally {
                    System.out.println("DaemonThread finally run.");
                }
            }
        }
    }
    ///什么也没有执行

    Java并发编程:深入剖析ThreadLocal

    -------------------------------------------------------------------------------------------------------------------------

    PS:进程通俗的讲就是一个应用程序,  太会在  内存中分配独立的运行空间
    线程:
    它是位于进程中,负责当前进程中的某个具备独立运行资格的空间。

    .1.1.    synchronized

    package cn.itcast_01_mythread.thread.testThread;
    
    public class MySynchronized {
        public static void main(String[] args) {
            final MySynchronized mySynchronized = new MySynchronized();//这才是公用的锁
            final MySynchronized mySynchronized2 = new MySynchronized();
            new Thread("thread1") {
                public void run() {
                    synchronized (mySynchronized) {
                    try {
                        System.out.println(this.getName()+" start");
                        int i =1/0;   //如果发生异常,jvm会将锁释放
                        Thread.sleep(5000);
                        System.out.println(this.getName()+"醒了");
                        System.out.println(this.getName()+" end");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    }
                }
            }.start();
            new Thread("thread2") {
                public void run() {
                    synchronized (mySynchronized) {         //争抢同一把锁时,线程1没释放之前,线程2只能等待
    //                    synchronized (mySynchronized2) {    //如果不是一把锁,可以看到两句话同时打印
                        System.out.println(this.getName()+" start");
                        System.out.println(this.getName()+" end");
                        
                    }
                }
            }.start();
        }
    }
    • synchronized的缺陷
    
    

        synchronized是java中的一个关键字,也就是说是Java语言内置的特性。

    
    

         如果一个代码块被synchronized修饰了,当一个线程获取了对应的锁,并执行该代码块时,其他线程便只能一直等待,等待获取锁的线程释放锁,而这里获取锁的线程释放锁只会有两种情况:

    
    

         1)获取锁的线程执行完了该代码块,然后线程释放对锁的占有;

    
    

          2)线程执行发生异常(他挂了),此时JVM会让线程自动释放锁。

    ------------------------------------------------------------
    PS:也就是 释放锁有两种方式 自己释放;自己挂了。

    ---------------------------------------------------------------------------------------

    PS :因为Synchronoied使用起来不方便,java5以后出现了Lock

    PS : CountDownLatch、ReentrantLock和ReentrantReadWriteLock都是同步组件

    package chapter05;
    
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 10-20
     */
    public class ConditionUseCase {
        Lock      lock      = new ReentrantLock();
        Condition condition = lock.newCondition();
    
        public void conditionWait() throws InterruptedException {
            lock.lock();
            try {
                condition.await();
            } finally {
                lock.unlock();
            }
        }
    
        public void conditionSignal() throws InterruptedException {
            lock.lock();
            try {
                condition.signal();
            } finally {
                lock.unlock();
            }
        }
    }

     

    Answer: 1.因为HashMap会照成环形数据结构,一直有next,然后就死锁了

                  2.HashTable使用synchronied并发效率非常低下

     

    PS: Synchronizd

    PS :ReentrantLock是唯一实现Lock的接口

    PS:lock的用法

    import java.util.ArrayList;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class MyLockTest {
        private static ArrayList<Integer> arrayList = new ArrayList<Integer>();
        static Lock lock = new ReentrantLock(); // 注意这个地方,因为Lock是一个借口,通常ReentrantLock进行实现
        public static <E> void main(String[] args) {
            new Thread() {
                public void run() {
                    Thread thread = Thread.currentThread();
                    
                    lock.lock();//获取锁
                    try {
                        System.out.println(thread.getName() + "得到了锁");
                        for (int i = 0; i < 5; i++) {
                            arrayList.add(i);
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                    } finally {
                        System.out.println(thread.getName() + "释放了锁");
                        lock.unlock();//不释放就死锁了
                    }
    
                };
            }.start();
            
            new Thread() {
                public void run() {
                    Thread thread = Thread.currentThread();
                    lock.lock();
                    try {
                        System.out.println(thread.getName() + "得到了锁");
                        for (int i = 0; i < 5; i++) {
                            arrayList.add(i);
                        }
                    } catch (Exception e) {
                        // TODO: handle exception
                    } finally {
                        System.out.println(thread.getName() + "释放了锁");
                        lock.unlock();
                    }
    
                };
            }.start();
        }
    
    }
    PS:每次需要手动关闭锁

    tryLock()方法是有返回值的,它表示用来尝试获取锁,如果获取成功,则返回true,如果获取失败(即锁已被其他线程获取),则返回false,也就说这个方法无论如何都会立即返回。在拿不到锁时不会一直在那等待。

    PS:他会尝试获取锁
    
    

     

    import java.util.ArrayList;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    /**
     * 观察现象:一个线程获得锁后,另一个线程取不到锁,不会一直等待
     * @author
     *
     */
    public class MyTryLock {
    
        private static ArrayList<Integer> arrayList = new ArrayList<Integer>();
        static Lock lock = new ReentrantLock(); // 注意这个地方
        public static void main(String[] args) {
            
            new Thread() {
                public void run() {
                    Thread thread = Thread.currentThread();
                    boolean tryLock = lock.tryLock();
                    System.out.println(thread.getName()+" "+tryLock);
                    if (tryLock) {
                        try {
                            System.out.println(thread.getName() + "得到了锁");
                            for (int i = 0; i < 5; i++) {
                                arrayList.add(i);
                            }
                        } catch (Exception e) {
                            // TODO: handle exception
                        } finally {
                            System.out.println(thread.getName() + "释放了锁");
                            lock.unlock();
                        }
                    }
                };
            }.start();
    
            new Thread() {
                public void run() {
                    Thread thread = Thread.currentThread();
                    boolean tryLock = lock.tryLock();
                    System.out.println(thread.getName()+" "+tryLock);
                    if (tryLock) {
                        try {
                            System.out.println(thread.getName() + "得到了锁");
                            for (int i = 0; i < 5; i++) {
                                arrayList.add(i);
                            }
                        } catch (Exception e) {
                            // TODO: handle exception
                        } finally {
                            System.out.println(thread.getName() + "释放了锁");
                            lock.unlock();
                        }
                    }
    
                };
            }.start();
        }
    
    
    }
    PS:一旦某个线程tryLock false以后, 其他线程就获取不了 了


    当两个线程同时通过lock.lockInterruptibly()想获取某个锁时,假若此时线程A获取到了锁,而线程B只有等待,那么对线程B调用threadB.interrupt()方法能够中断线程B的等待过程(并非中断A的操作)

      注意,当一个线程获取了锁之后,是不会被interrupt()方法中断的。

      因此当通过lockInterruptibly()方法获取某个锁时,如果不能获取到,只有进行等待的情况下,是可以响应中断的。

    
    
    package cn.itcast_01_mythread.thread.lock;
    
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    /**
     * 观察现象:如果thread-0得到了锁,阻塞。。。thread-1尝试获取锁,如果拿不到,则可以被中断等待
     * @author
     *
     */
    public class MyInterruptibly {
         private Lock lock = new ReentrantLock();  
         
            public static void main(String[] args)  {
                MyInterruptibly test = new MyInterruptibly();
                MyThread thread0 = new MyThread(test);
                MyThread thread1 = new MyThread(test);
                thread0.start();
                thread1.start();
                 
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                thread1.interrupt();    //如果线程1 陷入等待,  这可以让他结束等待
                System.out.println("=====================");
            }  
             
            public void insert(Thread thread) throws InterruptedException{
                lock.lockInterruptibly();   //注意,如果需要正确中断等待锁的线程,必须将获取锁放在外面,然后将InterruptedException抛出
                try {  
                    System.out.println(thread.getName()+"得到了锁");

    //long startTime = System.currentTimeMillis();
    //for( ; ;) {
    /*if(System.currentTimeMillis() - startTime >= Integer.MAX_VALUE)
    break;*/
    //插入数据
    //}
                     Thread.sleep(5000);

    
                }
                finally {
                    System.out.println(Thread.currentThread().getName()+"执行finally");
                    lock.unlock();
                    System.out.println(thread.getName()+"释放了锁");
                }  
            }
        }
         
        class MyThread extends Thread {
            private MyInterruptibly test = null;
            public MyThread(MyInterruptibly test) {
                this.test = test;
            }
            @Override
            public void run() {
                 
                try {
                    test.insert(Thread.currentThread());
                } catch (Exception e) {
                    System.out.println(Thread.currentThread().getName()+"被中断");
                }
            }
    
    }

     PS : ReadWriteLock

    package cn.itcast_01_mythread.thread.lock;
    
    
    /**
     * 一个线程又要读又要写,用synchronize来实现的话,读写操作都只能锁住后一个线程一个线程地进行
     * @author
     *
     */
    public class MySynchronizedReadWrite {
        
        public static void main(String[] args)  {
            final MySynchronizedReadWrite test = new MySynchronizedReadWrite();
             
            new Thread(){
                public void run() {
                    test.get(Thread.currentThread());
                };
            }.start();
             
            new Thread(){
                public void run() {
                    test.get(Thread.currentThread());
                };
            }.start();
             
        }  
         
        public synchronized void get(Thread thread) {
            long start = System.currentTimeMillis();
            int i=0;
            while(System.currentTimeMillis() - start <= 1) {//执行时间不操过1s
                i++;
                if(i%4==0){
                System.out.println(thread.getName()+"正在进行写操作");
                }else {
                    System.out.println(thread.getName()+"正在进行读操作");    
                }
            }
            System.out.println(thread.getName()+"读写操作完毕");
        }
    
    }

    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行写操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行写操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行写操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行写操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行写操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行写操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0正在进行读操作
    Thread-0读写操作完毕
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行读操作
    Thread-1正在进行写操作
    Thread-1读写操作完毕

     
    读写锁:读的时候,其他线程还可以操作,写的时候绝对不行。   里面有两个方法
    import java.util.concurrent.locks.ReentrantReadWriteLock;
    
    /**
     * 使用读写锁,可以实现读写分离锁定,读操作并发进行,写操作锁定单个线程
     * 
     * 如果有一个线程已经占用了读锁,则此时其他线程如果要申请写锁,则申请写锁的线程会一直等待释放读锁。
     * 如果有一个线程已经占用了写锁,则此时其他线程如果申请写锁或者读锁,则申请的线程会一直等待释放写锁。
    PS : 多线程就是可以同时读操作,写操作的时候不可以读,读的时候不可以写 *
    @author * *bee:读的时候多个线程可以同时操作, 写线程不能操作 */

    public class MyReentrantReadWriteLock { private ReentrantReadWriteLock rwl = new ReentrantReadWriteLock(); public static void main(String[] args) { final MyReentrantReadWriteLock test = new MyReentrantReadWriteLock(); new Thread(){ public void run() { test.get(Thread.currentThread()); test.write(Thread.currentThread()); }; }.start(); new Thread(){ public void run() { test.get(Thread.currentThread()); test.write(Thread.currentThread()); }; }.start(); } /** * 读操作,用读锁来锁定 * @param thread */ public void get(Thread thread) { rwl.readLock().lock(); try { long start = System.currentTimeMillis(); while(System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName()+"正在进行读操作"); } System.out.println(thread.getName()+"读操作完毕"); } finally { rwl.readLock().unlock(); } } /** * 写操作,用写锁来锁定 * @param thread */ public void write(Thread thread) { rwl.writeLock().lock();; try { long start = System.currentTimeMillis(); while(System.currentTimeMillis() - start <= 1) { System.out.println(thread.getName()+"正在进行写操作"); } System.out.println(thread.getName()+"写操作完毕"); } finally { rwl.writeLock().unlock(); } } }

    ---------------------------------------------------

    1.3 关于volatile的介绍

    package cn.itcast_01_mythread.volatiletest;
    
    public class TestVolatile {
    
    	public static volatile int numb = 0;
    
    	public static void main(String[] args) throws Exception {
    
    		for (int i = 0; i < 100; i++) {
    
    			new Thread(new Runnable() {
    
    				@Override
    				public void run() {
    					for (int i = 0; i < 1000; i++) {
    						numb++;
    					}
    				}
    			}).start();
    
    		}
    		
    		Thread.sleep(2000);
    		System.out.println(numb);
    	}
    
    } 
    PS: 因为 每当启动一个线程的时候都会创建一个栈内存,他们共享着堆空间的数据。当想要对 堆上某个数据进行操作的时候,
    就会复制相应的数据到 自己的栈空间 进行操作, volatile就是为了各个线程间同步数据的问题。
    这和synchronized还不太一样,synchronized是完全把数据上锁了。 volatile还用是提高数据保存的位置。

    1.4 并发的执行

     

    PS:当抢小米手机的时候,通常会异步解耦。其实就是一个修改库存的过程,成千上万个用户同时访问服务器,服务器这边会使用线程池对线程进行管理,防止创建线程过多,服务器奔溃。
    同时,会使用任务消息队列 ,在java中使用 JMS规范的ActiveMQ(为了解决大并发的请求,放入编写好的消息队列中)来解决。比如,有5台手机,有100人发起请求,前五个线程可以获得手机,后面直接提示jj。这样一个解耦
    • PS: 线程是不可以无限增长的,所以用一个线程池进行管理
    package cn.itcast_01_mythread.pool;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry;
    
    public class TestPool {
    
        public static void main(String[] args) throws Exception {
            Future<?> submit = null;
            Random random = new Random();
            
            //创建固定数量线程池
            ExecutorService exec = Executors.newFixedThreadPool(4);
            
            //创建调度线程池
            //ScheduledExecutorService exec = Executors.newScheduledThreadPool(4);
            
            //用来记录各线程的返回结果
            ArrayList<Future<?>> results = new ArrayList<Future<?>>();
            
            for (int i = 0; i < 10; i++) {
                //fixedPool提交线程,runnable无返回值,callable有返回值
                submit = exec.submit(new TaskRunnable(i));
                /*submit = exec.submit(new TaskCallable(i));*/
                
                //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致
                /*submit = exec.submit(new TaskCallable(i));*/
                //对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行
                //submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
                //存储线程执行结果
                results.add(submit);
                
            }
            
            
            //打印结果
            for(Future f: results){
                boolean done = f.isDone();
                System.out.println(done?"已完成":"未完成");  //从结果的打印顺序可以看到,即使未完成,也会阻塞等待
                System.out.println("线程返回future结果: " + f.get());
            }
            
            exec.shutdown();
            
        }
    }
    pool-1-thread-1 启动时间:1510802909
    未完成
    pool-1-thread-3 启动时间:1510802909
    pool-1-thread-4 启动时间:1510802909
    pool-1-thread-2 启动时间:1510802909
    pool-1-thread-1 is working...0
    线程返回future结果: null
    未完成
    pool-1-thread-1 启动时间:1510802909
    pool-1-thread-3 is working...2
    pool-1-thread-3 启动时间:1510802909
    pool-1-thread-1 is working...4
    pool-1-thread-1 启动时间:1510802909
    pool-1-thread-2 is working...1
    线程返回future结果: null
    pool-1-thread-2 启动时间:1510802910
    已完成
    线程返回future结果: null
    未完成
    pool-1-thread-1 is working...6
    pool-1-thread-3 is working...5
    pool-1-thread-1 启动时间:1510802910
    pool-1-thread-3 启动时间:1510802910
    pool-1-thread-4 is working...3
    线程返回future结果: null
    已完成
    线程返回future结果: null
    已完成
    线程返回future结果: null
    已完成
    线程返回future结果: null
    未完成
    pool-1-thread-3 is working...9
    pool-1-thread-1 is working...8
    pool-1-thread-2 is working...7
    线程返回future结果: null
    已完成
    线程返回future结果: null
    已完成
    线程返回future结果: null
    

     PS :Runnable和Callanble的区别 

    package cn.itcast_01_mythread.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ThreadPoolWithRunable {
    
        
        /**
         * 通过线程池执行线程
         * @param args
         */
        public static void main(String[] args) {
            //创建一个线程池
            ExecutorService pool = Executors.newCachedThreadPool();
            for(int i = 1; i < 5; i++){
                pool.execute(new Runnable() {//提交任务
                    @Override
                    public void run() {
                        System.out.println("thread name: " + Thread.currentThread().getName());
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
            pool.shutdown();
        }
    
    }

    提交 Callable,该方法返回一个 Future 实例表示任务的状态

    调用submit提交任务, 匿名Callable,重写call方法, 有返回值, 获取返回值会阻塞,一直要等到线程任务返回结果

    见代码:ThreadPoolWithcallable

    pool-1-thread-1 启动时间:1510803239
    pool-1-thread-2 启动时间:1510803239
    pool-1-thread-2 is working...1
    pool-1-thread-2 启动时间:1510803239
    pool-1-thread-3 启动时间:1510803239
    未完成
    pool-1-thread-4 启动时间:1510803239
    pool-1-thread-4 is working...3
    pool-1-thread-4 启动时间:1510803239
    pool-1-thread-4 is working...5
    pool-1-thread-4 启动时间:1510803239
    pool-1-thread-1 is working...0
    线程返回future结果: 0
    已完成
    线程返回future结果: 1
    未完成
    pool-1-thread-1 启动时间:1510803240
    pool-1-thread-3 is working...2
    线程返回future结果: 2
    已完成
    线程返回future结果: 3
    未完成
    pool-1-thread-3 启动时间:1510803240
    pool-1-thread-2 is working...4
    线程返回future结果: 4
    pool-1-thread-2 启动时间:1510803241
    已完成
    线程返回future结果: 5
    未完成
    pool-1-thread-1 is working...7
    pool-1-thread-4 is working...6
    线程返回future结果: 6
    已完成
    线程返回future结果: 7
    未完成
    pool-1-thread-3 is working...8
    线程返回future结果: 8
    未完成
    pool-1-thread-2 is working...9
    线程返回future结果: 9
    

      

    package cn.itcast_01_mythread.pool;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Random;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    import com.sun.corba.se.impl.encoding.OSFCodeSetRegistry.Entry;
    
    public class TestPool {
    
        public static void main(String[] args) throws Exception {
            Future<?> submit = null;
            Random random = new Random();
            
            //创建固定数量线程池
            ExecutorService exec = Executors.newFixedThreadPool(4);
            
            //创建调度线程池
            //ScheduledExecutorService exec = Executors.newScheduledThreadPool(4);
            
            //用来记录各线程的返回结果
            ArrayList<Future<?>> results = new ArrayList<Future<?>>();
            
            for (int i = 0; i < 10; i++) {
                //fixedPool提交线程,runnable无返回值,callable有返回值
                //submit = exec.submit(new TaskRunnable(i));
                submit = exec.submit(new TaskCallable(i));
                
                //对于schedulerPool来说,调用submit提交任务时,跟普通pool效果一致
                /*submit = exec.submit(new TaskCallable(i));*/
                //对于schedulerPool来说,调用schedule提交任务时,则可按延迟,按间隔时长来调度线程的运行
                //submit = exec.schedule(new TaskCallable(i), random.nextInt(10), TimeUnit.SECONDS);
                //存储线程执行结果
                results.add(submit);
                
            }
            
            
            //打印结果
            for(Future f: results){
                boolean done = f.isDone();
                System.out.println(done?"已完成":"未完成");  //从结果的打印顺序可以看到,即使未完成,也会阻塞等待
                System.out.println("线程返回future结果: " + f.get());
            }
            
            exec.shutdown();
            
        }
    }
    package cn.itcast_01_mythread.pool;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    /**
     * callable 跟runnable的区别:
     * runnable的run方法不会有任何返回结果,所以主线程无法获得任务线程的返回值
     * 
     * callable的call方法可以返回结果,但是主线程在获取时是被阻塞,需要等待任务线程返回才能拿到结果
     *
     */
    public class ThreadPoolWithcallable {
    
        public static void main(String[] args) throws InterruptedException, ExecutionException {
            ExecutorService pool = Executors.newFixedThreadPool(4); 
            
            for(int i = 0; i < 10; i++){
                Future<String> submit = pool.submit(new Callable<String>(){//线程是有返回值的
                    @Override
                    public String call() throws Exception {
                        //System.out.println("a");
                        Thread.sleep(5000);
                        return "b--"+Thread.currentThread().getName();
                    }               
                   });
                //从Future中get结果,这个方法是会被阻塞的,一直要等到线程任务执行完成   才能返回结果!!!!!   
    所以一般不要使用,他会阻塞主线程,如果必须想要得到结果在使用
    System.out.println(submit.get());, } pool.shutdown(); } }
    package cn.itcast_01_mythread.pool;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    
    /**
     * 列出并发包中的各种线程池
     * @author
     *
     */
    
    public class ExecutorDemo {
        
        public static void main(String[] args) {
            ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();//单线程,任务顺序执行
            //线程池里有很多线程需要同时执行,老的可用线程将被新的任务触发重新执行,
            //如果线程超过60秒内没执行,那么将被终止并从池中删除,
            ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
            int cpuNums = Runtime.getRuntime().availableProcessors();
            System.out.println(cpuNums);//CPU的核数
            //在构造函数中的参数4是线程池的大小,你可以随意设置,也可以和cpu的核数量保持
            ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(cpuNums);
            //用来调度即将执行的任务的线程池,可能是不是直接执行, 每隔多久执行一次... 策略型的
            ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(8);
            
            //只有一个线程,用来调度任务在指定时间执行
            ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        }
    }
    PS:上面可以看到Runnable和callable分别是线程中的不同的实现方式。Runnable不会返回结果,而callable会返回结果,但是在拿结果的时候会阻塞。
    在线程池中通过固定数量的线程进行相同的操作,有不同 的实现 。

    .1.     java并发包消息队列及在开源软件中的应用

    BlockingQueue也是java.util.concurrent下的主要用来控制线程同步的工具。           类似与锁

    主要的方法是:put、take一对阻塞存取;add、poll一对非阻塞存取。

             插入:

                       1)add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常,不好

            2)offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.

            3)put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续, 有阻塞, 放不进去就等待

             读取:

            4)poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null; 

            5)take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止; 阻塞, 取不到就一直等

             其他

    int remainingCapacity();返回队列剩余的容量,在队列插入和获取的时候,不要瞎搞,数 据可能不准, 不能保证数据的准确性

    boolean remove(Object o); 从队列移除元素,如果存在,即移除一个或者更多,队列改    变了返回true

    public boolean contains(Object o); 查看队列是否存在这个元素,存在返回true

    int drainTo(Collection<? super E> c); //移除此队列中所有可用的元素,并将它们添加到给定 collection 中。取出放到集合中

    int drainTo(Collection<? super E> c, int maxElements); 和上面方法的区别在于,指定了移   动的数量; 取出指定个数放到集合

    BlockingQueue有四个具体的实现类,常用的两种实现类为:

    1、ArrayBlockingQueue:一个由数组支持的有界阻塞队列,规定大小的BlockingQueue,其构造函数必须带一个int参数来指明其大小.其所含的对象是以FIFO(先入先出)顺序排序的。

    2、LinkedBlockingQueue:大小不定的BlockingQueue,若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制,若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定.其所含的对象是以FIFO(先入先出)顺序排序的。

             LinkedBlockingQueue 可以指定容量,也可以不指定,不指定的话,默认最大是Integer.MAX_VALUE,其中主要用到put和take方法,put方法在队列满的时候会阻塞直到有队列成员被消费,take方法在队列空的时候会阻塞,直到有队列成员被放进来。

    LinkedBlockingQueue和ArrayBlockingQueue区别:

    LinkedBlockingQueue和ArrayBlockingQueue比较起来,它们背后所用的数据结构不一样,导致LinkedBlockingQueue的数据吞吐量要大于ArrayBlockingQueue,但在线程数量很大时其性能的可预见性低于ArrayBlockingQueue.

     

    生产者消费者的示例代码:

    见代码

    package cn.itcast_02_blockingqueue.main;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    
    import cn.itcast_02_blockingqueue.consumer.Consumer;
    import cn.itcast_02_blockingqueue.producer.Producer;
    
    public class Test {
        public static void main(String[] args) throws Exception {
            BlockingQueue<String> queue = new LinkedBlockingQueue<String>(2);
            // BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
            // 不设置的话,LinkedBlockingQueue默认大小为Integer.MAX_VALUE
            // BlockingQueue<String> queue = new ArrayBlockingQueue<String>(2);
            Consumer consumer = new Consumer(queue);
            Producer producer = new Producer(queue);
            for (int i = 0; i < 3; i++) {
                new Thread(producer, "Producer" + (i + 1)).start();//线程就是这样启动的
            }
            for (int i = 0; i < 5; i++) {
                new Thread(consumer, "Consumer" + (i + 1)).start();
            }
            
            new Thread(producer, "Producer" + (5)).start();
        }
    }
    package cn.itcast_02_blockingqueue.producer;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Producer implements Runnable {  
        BlockingQueue<String> queue;    
        public Producer(BlockingQueue<String> queue) {  
            this.queue = queue;  
        }    
        @Override  
        public void run() {  
            try {  
                
                System.out.println("I have made a product:"  
                        + Thread.currentThread().getName()); 
                String temp = "A Product, 生产线程:"  
                        + Thread.currentThread().getName();  
                queue.put(temp);//如果队列是满的话,会阻塞当前线程  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }    
    }  
    package cn.itcast_02_blockingqueue.consumer;
    
    import java.util.concurrent.BlockingQueue;
    
    public class Consumer implements Runnable{  
        BlockingQueue<String> queue; 
        public Consumer(BlockingQueue<String> queue){  
            this.queue = queue;  
        }        
        @Override  
        public void run() {  
            try {  
                String consumer = Thread.currentThread().getName();
                System.out.println(consumer);  
                String temp = queue.take();//如果队列为空,会阻塞当前线程  
                System.out.println(consumer+"get a product:"+temp);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
    }  

    4.kafka 和 redis的应用,Storm中用到了很多的Blockquene

    PS: 比如,双十一在大屏上显示实时的消费金额,  在上面我们讲到,kafka她类似于消息队列,spout从kafka中那数据,根据业务把数据分发到各个节点blot取出来处理。把每一个任务再细分拓扑,到redis,然后再显示到屏幕上。  
    比如你在淘宝上下订单,并不是保存到数据库那么简单的。

     

  • 相关阅读:
    poj 2773 利用欧拉函数求互质数
    poj3358:欧拉定理
    poj:2992 因子数量
    poj3696:同余方程,欧拉定理
    USACO5.4-Character Recognition
    hdu5017:补题系列之西安网络赛1011
    hdu5014:number sequence对称思想
    欧拉函数,欧拉定理例题整理
    POJ 3463 Sightseeing (次短路)
    POJ
  • 原文地址:https://www.cnblogs.com/bee-home/p/7839974.html
Copyright © 2011-2022 走看看