zoukankan      html  css  js  c++  java
  • Java的多线程安全

    一,创建线程的几种方式:

    public class futureTask_test {
            public static void main(String[] args) throws InterruptedException, ExecutionException {
                //创建futureTask
                FutureTask<Integer> task=new FutureTask<Integer>(()->{
                    System.out.println("hello");
                    Thread.sleep(3000);
                    
                    return 1000;
                });
                
                //传入thread中
                Thread t1=new Thread(task,"t1");
                t1.start();
                System.out.println(task.get());
                
                
            }
    }

     二,jconsole

    Jconsole支持对本地和远程Java进程的监控。 

     

    1.     监控本地Java进程

    对于本地的Java进程很简单,只需要在启动需要监控的Java程序后,启动Jconsole,在本地进程列表中选择该进程点击“连接”按钮即可。例如,我们先启动JDK自带的另一个监控工具JVisualVM,然后启动JConsole:

    在JConsole的本地进程列表中可以看到JVisualVM进程,选择并连接之。
     

    2.    监控远程Java进程

    2.1  设置远程监控相关选项

    对于远程Java进程的连接,会要麻烦些,首先需要在需监控的远程Java程序启动脚本中加入与JVM远程监控相关的选项:
    1)     开启JVM远程监控
            
    -Dcom.sun.management.jmxremote=true

    2)     监控的IP地址
             
    -Djava.rmi.server.hostname=192.168.91.166,远程进程所在主机的IP。

    3)     监控的端口
            
    -Dcom.sun.management.jmxremote.port=50013,这个端口值可以任意设置,但在之后用Jconsole连接这个远程进程的时候,远程进程中的port一定要和此处的设置一致,并且一定不要和远程进程的服务端口区分开。

    4)     是否禁用ssl验证
            
    -Dcom.sun.management.jmxremote.ssl,false为禁用,true为启用。

    5)     是否需要用户密码验证
            
    -Dcom.sun.management.jmxremote.authenticate,false为不需要验证,true为需要验证。但我在Win7(32位、64位)中tomcat5.5.23下试着将该选项设置为true,tomcat无法启动,命令行窗口一闪而过。

            有试过象参考文献中所说的,修改jdkjre及jre下的jmxremote.password和jmxremote.access,又有尝试象参考文献4中所说的,修改jdkjre及jre下结果也是一样的。结果也是一样的management.properties,结果均一样的。
            
             有哪位遇到过,并解决了这个问题,如果看到这儿的,希望可以帮忙解答一下,先谢过了!! 


    2.2  连接远程Java进程

    有两种连接远程Java进程的方法,效果是一样的。


    1)     命令行中连接远程Java进程
            直接在命令行中启动JConsole的时候,带远程进程参数:
     

    2)     JConsole图形界面上连接远程Java进程

    先在命令行中不带远程进程参数启动JConsole,然后在JConsole新建连接对话框中选择远程进程,填入远程进程参数:,最后点击“连接”按钮。

    java -cp . -Dcom.sun.management.jmxremote.port=8999 -Dcom.sun.managent.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false JConsoleTest
    二,两阶段终止模式

     1 public class TwoPhaseTermination {
     2 
     3  public static void main(String[] args) throws InterruptedException {
     4     Monitor monitor=new Monitor();
     5     monitor.startMon();
     6     
     7     Thread.sleep(3500);
     8     monitor.stop();
     9 }
    10 }
    11 
    12 class Monitor{
    13     private Thread monitor;//监视器线程
    14      
    15      //开始监视的方法
    16      public void startMon() {
    17          //定义线程
    18          monitor=new Thread(()->{
    19              while(true) {
    20                  //获取打断标记
    21                  boolean interrupted=Thread.currentThread().isInterrupted();
    22                  if(interrupted) {//被打断了
    23                      System.out.println("我被打断了,正在处理后事。。");
    24                      break;
    25                  }else {//未被打断
    26                      try {
    27                         Thread.sleep(1000);//让出时间片
    28                         //未出现异常
    29                         System.out.println("我开始执行监控事情");
    30                     } catch (InterruptedException e) {
    31                         // TODO: handle exception
    32                         //未出现异常,被打断了
    33                         //interrupted=true;不能用这种方式来改变打断标志
    34                         //
    35                         monitor.interrupt();
    36                         
    37                     }
    38                      
    39                  }
    40              }
    41          });
    42          
    43          //开启线程
    44          monitor.start();
    45          
    46      }
    47      
    48      //停止监控
    49      public  void stop() {
    50         monitor.interrupt();
    51     }
    52      
    53 }
    监控案例

     三,保护性暂停模式

     1 public class GuardedSuspension {
     2     private int id;
     3     
     4     public GuardedSuspension(int id) {
     5         this.id=id;
     6     }
     7 
     8     public int getId() {
     9         return id;
    10     }
    11 
    12     
    13 
    14     public Object getResponse() {
    15         return response;
    16     }
    17 
    18     
    19 
    20     private Object response;
    21     
    22     public Object get(long timeout) {
    23         synchronized (this) {
    24             long begin=System.currentTimeMillis();
    25             long passtime=0;
    26             if(response==null) {
    27                 //没有数据
    28                 try {
    29                     long waittime=timeout-passtime;
    30                     this.wait(waittime);
    31                 } catch (InterruptedException e) {
    32                     // TODO Auto-generated catch block
    33                     e.printStackTrace();
    34                 }
    35                 passtime=System.currentTimeMillis()-begin;
    36             }
    37             return response;
    38         }
    39     };
    40     
    41     public void set(Object o) {
    42         synchronized (this) {
    43             this.response=o;
    44             this.notifyAll();
    45             
    46         }
    47     };
    48 }
    Guarded

    升级的解耦:

     1 public class Mailboxs {
     2     private static  int id=1;
     3     private static synchronized int  generateId() {
     4         return id++;//自增id
     5     }
     6     private static Map<Integer, GuardedSuspension> boxes=new Hashtable<Integer, GuardedSuspension>();//保证线程安全
     7     public static GuardedSuspension createGuardedSuspension(int id) {
     8         
     9         //加入邮箱,取信
    10     return    boxes.remove(id);
    11         
    12         
    13     }
    14     public static GuardedSuspension getGuardedSuspension() {
    15         GuardedSuspension go=new GuardedSuspension(generateId());
    16         //加入邮箱,送信
    17         boxes.put(go.getId(),go);
    18         return go;
    19         
    20     }
    21     public static Set<Integer> getIds() {
    22         return boxes.keySet();
    23     }
    24     
    25     
    26 }
    View Code

     四,顺序执行

    public class Secquence extends ReentrantLock{
        
        public static void main(String[] args) {
            Secquence secquence=new Secquence();
            Condition a=secquence.newCondition();
            Condition b=secquence.newCondition();
            Condition c=secquence.newCondition();
            Thread t1=new Thread(()->{
                secquence.showInfo("a", a, b);
            },"t1");
            Thread t2=new Thread(()->{
                secquence.showInfo("b", b, c);
            },"t2");
            
            Thread t3=new Thread(()->{
                secquence.showInfo("c", c, a);
            },"t3");
            t1.start();
            t2.start();t3.start();
            
            //主线程开启
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            secquence.lock();
            try {
                System.out.println("开始");
                a.signal();
            }finally {
                secquence.unlock();
            }
        }
        
        //多线程的顺序执行
        
        private static int loopNumber=5;
        //打印方法
        public void showInfo(String str,Condition current,Condition next) {
            for(int i=0;i<loopNumber;i++) {
                lock();
                try {
                    
                    current.await();
                    System.out.print(str);
                    next.signal();
                    
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                } finally {
                    // TODO: handle exception
                    unlock();
                }
            }
            
        }
        
    }
    reentrantLock方式
     1 public class Secquence extends ReentrantLock{
     2     
     3     public static void main(String[] args) {
     4         Secquence secquence=new Secquence();
     5         Condition a=secquence.newCondition();
     6         Condition b=secquence.newCondition();
     7         Condition c=secquence.newCondition();
     8         Thread t1=new Thread(()->{
     9             secquence.showInfo("a", a, b);
    10         },"t1");
    11         Thread t2=new Thread(()->{
    12             secquence.showInfo("b", b, c);
    13         },"t2");
    14         
    15         Thread t3=new Thread(()->{
    16             secquence.showInfo("c", c, a);
    17         },"t3");
    18         t1.start();
    19         t2.start();t3.start();
    20         
    21         //主线程开启
    22         try {
    23             Thread.sleep(1000);
    24         } catch (InterruptedException e) {
    25             // TODO Auto-generated catch block
    26             e.printStackTrace();
    27         }
    28         secquence.lock();
    29         try {
    30             System.out.println("开始");
    31             a.signal();
    32         }finally {
    33             secquence.unlock();
    34         }
    35     }
    36     
    37     //多线程的顺序执行
    38     
    39     private static int loopNumber=5;
    40     //打印方法
    41     public void showInfo(String str,Condition current,Condition next) {
    42         for(int i=0;i<loopNumber;i++) {
    43             lock();
    44             try {
    45                 
    46                 current.await();
    47                 System.out.print(str);
    48                 next.signal();
    49                 
    50             } catch (InterruptedException e) {
    51                 // TODO Auto-generated catch block
    52                 e.printStackTrace();
    53             } finally {
    54                 // TODO: handle exception
    55                 unlock();
    56             }
    57         }
    58         
    59     }
    60     
    61 }
    unpark&park

    1,同步锁原理

    2,线程的状态图

    2.1无限等待状态的图示

     1 public class Demo01WaitAndNotify {
     2     public static void main(String[] args) {
     3         //创建唯一锁对象
     4         Object obj=new Object();
     5         
     6         //创建一个消费者线程
     7         new Thread() {
     8             @Override
     9             public void run() {
    10                 //也一直等着吃
    11                 while(true) {
    12                     //保证等待和唤醒只能有一个执行,
    13                     synchronized (obj) {
    14                         System.out.println("告诉老板要的包子的种类和数量");
    15                         try {
    16                             obj.wait();
    17                         } catch (InterruptedException e) {
    18                             // TODO Auto-generated catch block
    19                             e.printStackTrace();
    20                         }
    21                         //唤醒之后的代码
    22                         System.out.println("包子做好了,开饭了");
    23                         System.out.println("-----------------------");
    24                     }
    25                 }
    26             }
    27             
    28         }.start();
    29         
    30         //创建一个老板线程(生产者)
    31         new Thread() {
    32             @Override
    33             public void run() {
    34                 //一直在这做
    35                 while(true) {
    36                     try {
    37                         Thread.sleep(5000);
    38                     } catch (InterruptedException e) {
    39                         // TODO Auto-generated catch block
    40                         e.printStackTrace();
    41                     }
    42                     synchronized (obj) {
    43                         System.out.println("老板5秒之后做好包子,告知顾客可以吃 包子");
    44                         obj.notify();
    45                     }
    46                 }
    47             }
    48         }.start();
    49     }
    50     
    51 }
    生产者消费者模型

    等待与唤醒机制,线程间的通信 

    代码实现:

     1 package BaoZi;
     2 
     3 public class Baozi {
     4     //
     5     String pi;
     6     //
     7     String xian;
     8     //状态
     9     boolean flag=false;
    10 }
    Baozi .java
     1 package BaoZi;
     2 
     3 //注意使用同步技术,互斥
     4 //必须使用同一个锁对象,并且将包子对象作为参数进行传递
     5 
     6 public class BaoZiPu extends Thread {
     7     //包子变量
     8     private Baozi bz;
     9     
    10     //使用带参的构造方法,为包子变量赋值
    11     public BaoZiPu(Baozi bz) {
    12         this.bz=bz;
    13     }
    14     
    15     //设置线程任务(run)生产包子
    16     @Override
    17     public void run() {
    18         int count=0;//用于不同的馅的产生
    19         //使用同步机制
    20         while(true) {//一直生产包子
    21             synchronized (bz) {
    22                 if(bz.flag==true) {
    23                     try {
    24                         bz.wait();
    25                     } catch (InterruptedException e) {
    26                         // TODO Auto-generated catch block
    27                         e.printStackTrace();
    28                     }
    29                 }
    30                 
    31                 //被唤醒之后,生产包子
    32                 if(count%2==0) {
    33                     //生产薄皮 三鲜馅
    34                     bz.pi="薄皮";
    35                     bz.xian="三鲜馅";
    36                 }else {
    37                     bz.pi="冰皮";
    38                     bz.xian="牛肉大葱";
    39                 }
    40                 count++;
    41                 System.out.println("包子铺正在生产:"+bz.pi+bz.xian+"包子");
    42                 //生产包子要3秒
    43                 try {
    44                     Thread.sleep(3000);
    45                 } catch (InterruptedException e) {
    46                     // TODO Auto-generated catch block
    47                     e.printStackTrace();
    48                 }
    49                 //包子铺生产好了,修改状态
    50                 bz.flag=true;
    51                 bz.notify();//唤醒等待的吃货线程
    52                 System.out.println("包子铺已经生产:"+bz.pi+bz.xian+"包子好了,您可以吃了");
    53             }
    54         }
    55         
    56     }
    57 
    58 }
    BaoZiPu.java
     1 package BaoZi;
     2 
     3 public class ChiHuo extends Thread{
     4     //包子变量
     5         private Baozi bz;
     6         
     7         //使用带参的构造方法,为包子变量赋值
     8         public ChiHuo(Baozi bz) {
     9             this.bz=bz;
    10         }
    11         
    12     
    13     @Override
    14     public void run() {
    15         //使用死循环一直吃
    16         while(true) {
    17             synchronized (bz) {
    18                 if(bz.flag==false) {
    19                     //没有包子
    20                     try {
    21                         bz.wait();
    22                     } catch (InterruptedException e) {
    23                         // TODO Auto-generated catch block
    24                         e.printStackTrace();
    25                     }
    26                 }
    27                 
    28                 //被唤醒之后执行的代码
    29                 System.out.println("吃货正在吃"+bz.pi+bz.xian+"的包子");
    30                 bz.flag=false;
    31                 bz.notify();//唤醒包子铺
    32                 System.out.println("吃货已经吃完了:"+bz.pi+bz.xian+"的包子");
    33                 System.out.println("-------------------");
    34             }
    35         }
    36     }
    37 
    38 }
    ChiHuo.java
     1 package BaoZi;
     2 
     3 
     4 
     5 public class Test {
     6     public static void main(String[] args) {
     7         //创建包子对象
     8         Baozi bz=new Baozi();
     9         new BaoZiPu(bz).start();//创建包子铺线程
    10         new ChiHuo(bz).start();//创建吃货线程
    11     }
    12 }
    test.java

     五,线程池

    1,自定义线程池

     1 package ThreadPool;
     2 //线程池
     3 
     4 import java.util.HashSet;
     5 import java.util.concurrent.TimeUnit;
     6 
     7 public class TeadPool {
     8     //任务队列
     9  private BlockingQueue<Runnable> taskQueue;
    10  //核心线程
    11  private int coreSize;
    12  //线程集合,存放任务
    13  private HashSet<Worker> workers=new HashSet<TeadPool.Worker>();
    14  //获取任务的超时时间,以及单位
    15  private long timeout;
    16  private TimeUnit timeUnit;
    17      //拒绝策略
    18  RejectPolicy<Runnable> rejectPolicy;
    19  public TeadPool( int coreSize ,long timeout,TimeUnit timeUnit,int queueSize,RejectPolicy<Runnable> rejectPolicy) {
    20     
    21     taskQueue=new BlockingQueue<Runnable>(queueSize);
    22     this.coreSize = coreSize;
    23     
    24     this.timeout = timeout;
    25     this.timeUnit = timeUnit;
    26     this.rejectPolicy=rejectPolicy;
    27 }
    28 
    29  //执行任务
    30  public void execute(Runnable task) {//要加锁
    31      synchronized (workers) {
    32         //小于交给worker执行
    33          //if线程集合中的任务数大于核心线程数,放入阻塞队列
    34          if(workers.size()<coreSize) {
    35              Worker worker=new Worker(task);
    36              workers.add(worker);
    37              worker.start();
    38          }else {
    39              //使用策略模式,处理情况拒绝策略,将决策权交给线程池的使用者
    40              //抽象成接口,实现是有调用者实现
    41             //掉用策略的逻辑放在blockingquqeue
    42              taskQueue.tryPut(rejectPolicy,task);
    43          }
    44     }
    45      
    46  }
    47  
    48 //处理任务的类
    49  class Worker extends Thread{
    50      private Runnable task;
    51      
    52      public Worker(Runnable task) {
    53         
    54         this.task = task;
    55     }
    56 
    57     @Override
    58      
    59         public void run() {
    60             //执行任务
    61             //task不为空,执行任务
    62             //task执行完毕,去阻塞队列里取
    63             while(task!=null||(task=taskQueue.take())!=null) {
    64                 try {
    65                     task.run();
    66                 } catch (Exception e) {
    67                     // TODO: handle exception
    68                 }finally {
    69                     task=null;//执行完任务
    70                 }
    71                 
    72             }
    73             synchronized (workers) {
    74                 workers.remove(this);//没有任务要执行,移除
    75             }
    76         }
    77     
    78  }
    79 }
    threadPool
      1 import java.util.ArrayDeque;
      2 import java.util.Deque;
      3 import java.util.concurrent.TimeUnit;
      4 import java.util.concurrent.locks.Condition;
      5 import java.util.concurrent.locks.ReentrantLock;
      6 
      7 public class BlockingQueue<T> {
      8     //任务队列
      9     private Deque<T> queue=new ArrayDeque<T>();
     10     //
     11     ReentrantLock lock=new ReentrantLock();
     12     //容量
     13     private int capcity;
     14     //生产者等待条件变量
     15     private Condition fullWaitSet= lock.newCondition();
     16     //消费者等待条件变量
     17     private Condition emptyWaitSet=lock.newCondition();
     18     
     19     public BlockingQueue(int size) {
     20     this.capcity=size;
     21         // TODO Auto-generated constructor stub
     22     }
     23     //阻塞获取
     24     public T take() {//要加速操作
     25         lock.lock();
     26         try {
     27             
     28             while(queue.isEmpty()) {
     29                     //为空
     30                     try {
     31                         emptyWaitSet.await();
     32                     } catch (InterruptedException e) {
     33                         // TODO Auto-generated catch block
     34                         e.printStackTrace();
     35                     }
     36                 }
     37                     //不为空
     38                 T t=queue.removeFirst();//要移除
     39                     //获取后唤醒 full
     40                     fullWaitSet.signal();
     41                     return t;
     42                 
     43             
     44             
     45         } finally {
     46             // TODO: handle exception
     47             lock.unlock();
     48         }
     49         
     50     }
     51     //超时时间阻塞获取
     52         public T offer(long timeout,TimeUnit timeUnit) {//要加速操作
     53             lock.lock();
     54             
     55             try {
     56                 long nanos=timeUnit.toNanos(timeout);
     57                 while(queue.isEmpty()) {
     58                         //为空
     59                         try {
     60                             if(nanos<=0) {
     61                                 return null;
     62                             }
     63                             nanos=emptyWaitSet.awaitNanos(nanos);//该方法会返回等待的剩余时间
     64                             
     65                         } catch (InterruptedException e) {
     66                             // TODO Auto-generated catch block
     67                             e.printStackTrace();
     68                         }
     69                     }
     70                         //不为空
     71                     T t=queue.removeFirst();//要移除
     72                         //获取后唤醒 full
     73                         fullWaitSet.signal();
     74                         return t;
     75                     
     76                 
     77                 
     78             } finally {
     79                 // TODO: handle exception
     80                 lock.unlock();
     81             }
     82             
     83         }
     84     //阻塞添加
     85     public void put(T t) {
     86         lock.lock();
     87         try {
     88             while(queue.size()==capcity) {
     89                 try {
     90                     fullWaitSet.await();
     91                 } catch (InterruptedException e) {
     92                     // TODO Auto-generated catch block
     93                     e.printStackTrace();
     94                 }
     95             }
     96             //队列有剩余空间
     97             queue.addLast(t);
     98             //唤醒 空
     99             emptyWaitSet.signal();
    100             
    101         } finally {
    102             // TODO: handle finally clause
    103             lock.unlock();
    104         }
    105     }
    106     //阻塞添加的超时时间
    107         public boolean poll(T t,long timeout,TimeUnit timeUnit) {
    108             lock.lock();
    109             try {
    110                 long nanos=timeUnit.toNanos(timeout);
    111                 while(queue.size()==capcity) {
    112                     try {
    113                         if(nanos<=0) {
    114                             return false;
    115                         }
    116                         nanos=fullWaitSet.awaitNanos(nanos);
    117                     } catch (InterruptedException e) {
    118                         // TODO Auto-generated catch block
    119                         e.printStackTrace();
    120                     }
    121                 }
    122                 //队列有剩余空间
    123                 queue.addLast(t);
    124                 //唤醒 空
    125                 emptyWaitSet.signal();
    126                 return true;
    127             } finally {
    128                 // TODO: handle finally clause
    129                 lock.unlock();
    130             }
    131         }
    132     //获取大小
    133     public int getCapcity() {//也要枷锁
    134         lock.lock();
    135         try {
    136             return this.capcity;
    137         } finally {
    138             // TODO: handle finally clause
    139             lock.unlock();
    140         }
    141         
    142     }
    143     public void tryPut(RejectPolicy<T> rejectPolicy,T task) {
    144         // TODO Auto-generated method stub
    145         lock.lock();
    146         try {
    147             //任务队列满了
    148             if(queue.size()==capcity) {
    149                 //拒绝策略实现
    150                 rejectPolicy.reject(this, task);
    151             }else {
    152                 //队列有剩余空间
    153                 queue.addLast(task);
    154                 //唤醒 空
    155                 emptyWaitSet.signal();
    156             }
    157         } finally {
    158             // TODO: handle finally clause
    159             lock.unlock();
    160         }
    161     }
    162 }
    阻塞队列
    //拒绝策略
    public interface RejectPolicy<T> {
        void reject(BlockingQueue<T> queue,T task);
    }

    2,reentrantLock的tryAcquire的源码分析



    原文链接:https://blog.csdn.net/sscout/article/details/102616722

  • 相关阅读:
    人生,时间煮雨,岁月缝花
    Nginx入门到实践---nginx中间件
    CentOS 8 系统安装 Oracle 19c 数据库
    centos8 下安装Oracle jdk8(免安装版)
    Mysql忘记密码
    这个病秋季高发!调理不当最伤孩子体质,记得收好这2个方
    redis client-output-buffer-limit 设置
    redis主从同步收到以下参数影响
    linux:永久打开core文件功能
    linux:core文件的产生和调试
  • 原文地址:https://www.cnblogs.com/gjx1212/p/14734914.html
Copyright © 2011-2022 走看看