zoukankan      html  css  js  c++  java
  • JAVA简易数据连接池Condition

    用Condition和synchronized:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.LinkedList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    public class MyDataSource {
    	// 不停的增删改
    	private LinkedList<Connection> pool = new LinkedList<>();
    	private static final int INIT_CONNECTIONS = 10;
    	private static final String DRIVER_CLASS = "";
    	private static final String USER = "";
    	private static final String PASSWORD = "";
    	private static final String URL = "";
    
    	private Lock lock = new ReentrantLock();
    	private Condition c1 = lock.newCondition();
    
    	static {
    		try {
    			Class.forName("com.mysql.jdbc.Driver");
    		} catch (ClassNotFoundException e) {
    			e.printStackTrace();
    		}
    	}
    
    	public MyDataSource() {
    		for (int i = 0; i < INIT_CONNECTIONS; i++) {
    			try {
    				Connection conn = DriverManager.getConnection(URL, USER, PASSWORD);
    				pool.addLast(conn);
    			} catch (SQLException e) {
    				e.printStackTrace();
    			}
    		}
    	}
    
    	public Connection getConnection() {
    		Connection result = null;
    		lock.lock();
    		try {
    			while (pool.size() <= 0) {
    				try {
    					c1.await();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if (!pool.isEmpty()) {
    				result = pool.removeFirst();
    			}
    
    			return result;
    		} finally {
    			lock.unlock();
    		}
    	}
    
    	public Connection getConnectionSynchronized() {
    		Connection result = null;
    		synchronized (pool) {
    			while (pool.size() <= 0) {
    				try {
    					wait();
    				} catch (InterruptedException e) {
    					e.printStackTrace();
    				}
    			}
    			if (!pool.isEmpty()) {
    				result = pool.removeFirst();
    			}
    		}
    		return result;
    	}
        public void release(Connection conn){
        	if(conn!=null){
        		lock.lock();
        		try {
    				pool.addLast(conn);
    				c1.signal(); 
    			} finally {
    				lock.unlock();
    			}
        	}
        }
    	public void releaseSynchronized(Connection conn) {
    		if (conn != null) {
    			synchronized (pool) {
    				pool.addLast(conn);
    				notifyAll();
    			}
    		}
    	}
    }
    

      线程之间的通信-join

    public class Demo {
        public void a(Thread joinThread){
        	System.out.println("方法a执行了。。。。");
        	joinThread.start();
        	try {
    			joinThread.join();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
        	System.out.println("a方法执行完毕。。。");
        }
        public void b(){
        	System.out.println("加塞线程开始执行");
        	try {
    			Thread.sleep(1000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
        	System.out.println("加塞线程执行完毕...");
        }
        public static void main(String[] args) {
    		Demo demo=new Demo();
    		Thread joinThread=new Thread(new Runnable() {
    			@Override
    			public void run() {
    				demo.b();
    			}
    		});
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				demo.a(joinThread);
    			}
    		}).start();
    	}
    }
    

      ThreadLocal:

    public class Demo99 {
    	private ThreadLocal<Integer> count = 
    			new ThreadLocal<Integer>() {
    		protected Integer initialValue() {
    			return new Integer(0);
    		};
    	};
    
    	public int getNext() {
    		Integer value = count.get();
    		value++;
    		count.set(value);
    		return value;
    	}
    
    	public static void main(String[] args) {
    		Demo99 d=new Demo99();
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				while (true) {
    					System.out.println(
    							Thread.currentThread().getName() + " " + 
    					d.getNext());
    					try {
    						Thread.sleep(1000);
    					} catch (InterruptedException e) {
    						
    						e.printStackTrace();
    					}
    				}
    			}
    		}).start();
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				while (true) {
    					System.out.println(
    							Thread.currentThread().getName() + " " + 
    					d.getNext());
    					try {
    						Thread.sleep(1000);
    					} catch (InterruptedException e) {
    						
    						e.printStackTrace();
    					}
    				}
    			}
    		}).start();
    		
    	}
    }
    

      线程之间的通讯:CountDownLatch

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    public class Demo {
    	private int[] nums;
    
    	public Demo(int line) {
    		nums = new int[line];
    	}
    
    	public void calc(String line, int index) {
    
    		String[] nus = line.split(",");// 切分出每一个值
    		int total = 0;
    		for (String num : nus) {
    			total += Integer.parseInt(num);
    		}
    		nums[index] = total;// 把计算的结果放到数组中指点的位置
    		System.out.println(Thread.currentThread().getName() + "行计划任务...." + line + "结果为:" + total);
    	}
    
    	public void sum() {
    		System.out.println("汇总线程开始执行....");
    		int total = 0;
    		for (int i = 0; i < nums.length; i++) {
    			total += nums[i];
    		}
    		System.out.println("最终的结果为:" + total);
    	}
    
    	public static void main(String[] args) {
    		List<String> contents = readFile();
    		int lineCount = contents.size();
    		Demo d = new Demo(lineCount);
    		for (int i = 0; i < lineCount; i++) {
    			final int j = i;
    			new Thread(new Runnable() {
    				@Override
    				public void run() {
    					d.calc(contents.get(j), j);
    				}
    			}).start();
    		}
    		while (Thread.activeCount() > 1) {
    
    		}
    		d.sum();
    	}
    
    	private static List<String> readFile() {
    		List<String> contents = new ArrayList<>();
    		String line = null;
    		BufferedReader br = null;
    		try {
    			br = new BufferedReader(new FileReader("e:\num.txt"));
    			while ((line = br.readLine()) != null) {
    				contents.add(line);
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			if (br != null) {
    				try {
    					br.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		return contents;
    	}
    }
    

      

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    public class Demo2 {
    	private int[] nums;
    
    	public Demo2(int line) {
    		nums = new int[line];
    	}
    
    	public void calc(String line, int index,CountDownLatch latch) {
    
    		String[] nus = line.split(",");// 切分出每一个值
    		int total = 0;
    		for (String num : nus) {
    			total += Integer.parseInt(num);
    		}
    		nums[index] = total;// 把计算的结果放到数组中指点的位置
    		System.out.println(Thread.currentThread().getName() + "行计划任务...." + line + "结果为:" + total);
    	    latch.countDown();
    	}
    
    	public void sum() {
    		System.out.println("汇总线程开始执行....");
    		int total = 0;
    		for (int i = 0; i < nums.length; i++) {
    			total += nums[i];
    		}
    		System.out.println("最终的结果为:" + total);
    	}
    
    	public static void main(String[] args) {
    		List<String> contents = readFile();
    		int lineCount = contents.size();
    		CountDownLatch latch=new CountDownLatch(lineCount);
    		Demo2 d = new Demo2(lineCount);
    		for (int i = 0; i < lineCount; i++) {
    			final int j = i;
    			new Thread(new Runnable() {
    				@Override
    				public void run() {
    					d.calc(contents.get(j), j,latch);
    				}
    			}).start();
    		}
    		try {
    			latch.await();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		d.sum();
    	}
    
    	private static List<String> readFile() {
    		List<String> contents = new ArrayList<>();
    		String line = null;
    		BufferedReader br = null;
    		try {
    			br = new BufferedReader(new FileReader("e:\num.txt"));
    			while ((line = br.readLine()) != null) {
    				contents.add(line);
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			if (br != null) {
    				try {
    					br.close();
    				} catch (IOException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		return contents;
    	}
    }
    

      线程之间的通信:CyclicBarrier  达到屏障点之后,后面的线程才继续执行

    import java.util.Random;
    import java.util.concurrent.CyclicBarrier;
    
    public class Demo {
    	Random random = new Random();
    
    	public void meeting(CyclicBarrier barrier) {
    		try {
    			Thread.sleep(random.nextInt(4000));
    		} catch (InterruptedException e1) {
    			// TODO Auto-generated catch block
    			e1.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName() + "到达会议室,等待开会...");
    		if(Thread.currentThread().getName().equals("Thread-1")){
    			throw new RuntimeException();
    		}
    		try {
    			barrier.await();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		System.out.println(Thread.currentThread().getName()+"发言");
    	}
    
    	public static void main(String[] args) {
    		Demo demo = new Demo();
    		CyclicBarrier barrier = new CyclicBarrier(10, new Runnable() {
    			@Override
    			public void run() {
    				System.out.println("好,我们开始开会.....");
    			}
    		});
    		for (int i = 0; i < 10; i++) {
    			new Thread(new Runnable() {
    				@Override
    				public void run() {
    					demo.meeting(barrier);
    				}
    			}).start();
    		}
    	}
    }  

     线程之间的通讯:Semaphore:能控制被多少个线程同时访问

    import java.util.concurrent.Semaphore;
    
    public class Demo22 {
        public void method(Semaphore semaphore){
        	try {
    			semaphore.acquire();
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
        	System.out.println(Thread.currentThread().getName()+"is run...");
        	try {
    			Thread.sleep(2000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
        	semaphore.release();
        }
        public static void main(String[] args) {
    		Demo22 d=new Demo22();
    		Semaphore semaphore=new Semaphore(10);
    		while (true) {
    			new Thread(new Runnable() {
    				@Override
    				public void run() {
    					d.method(semaphore);
    				}
    			}).start();
    			
    		}
    	}
    }
    

      Exchanger:

    import java.util.concurrent.Exchanger;
    
    public class Demo33 {
        public void a(Exchanger<String> exch){
        	System.out.println("a 方法执行....");
        	try {
        		System.out.println("a 线程正在抓取数据....");
    			Thread.sleep(2000);
    			System.out.println("a 抓取到数据.....");
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
        	String res="12345";
        	try {
        		System.out.println("等待对比结果....");
    			exch.exchange(res);
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
        }
        public void b(Exchanger<String> exch){
        	System.out.println("b 方法开始执行....");
        	try {
        		System.out.println("b 方法开始抓取数据....");
    			Thread.sleep(4000);
    			System.out.println("b 方法抓取数据结束....");
    		} catch (InterruptedException e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
        	String res="12345";
        	try {
        		String value=exch.exchange(res);
        		System.out.println("开始进行比对.....");
        		System.out.println("比对结果为:"+value.equals(res));
    			
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
        }
        public static void main(String[] args) {
        	Demo33 d=new Demo33();
        	Exchanger<String> exch=new Exchanger<>();
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    			    d.a(exch);
    			}
    		}).start();
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    			    d.b(exch);
    			}
    		}).start();
    	}
    }
    

      提前完成任务Future使用:

    import java.util.concurrent.Callable;
    import java.util.concurrent.FutureTask;
    
    public class Demo44 {
    public static void main(String[] args)  throws Exception{
    	Callable<Integer> call=new Callable<Integer>() {
    		@Override
    		public Integer call() throws Exception{
    			System.out.println("正在计算结果....");
    			Thread.sleep(3000);
    			return 1;
    		}
    	};
    	FutureTask<Integer> task=new FutureTask<>(call);
    	Thread thread=new Thread(task);
    	thread.start();
    	
    	//do something
    	System.out.println("干点别的...");
    	Integer result=task.get();
    	System.out.println("拿到的结果为:"+result);
    }
    }
    

      用买蛋糕和去上班的例子说明 :

    public class Product {
         private int id;
         private String name;
         public int getId() {
    		return id;
    	}
         public void setId(int id) {
    		this.id = id;
    	}
         public String getName() {
    		return name;
    	}
        public void setName(String name) {
    		this.name = name;
    	}
        @Override
        public String toString() {
        	return "Product [id="+id+",name="+name+"]";
        }
        public Product(int id,String name) {
        	try {
        		System.out.println("开始生产"+name);
    			Thread.sleep(4000);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    		this.id=id;
    		this.name=name;
    		System.out.println(name+"生产完毕");
    	}
    }
    

      

    public class Future {
       private Product product;
       private boolean down;
       public synchronized void setProduct(Product product){
    	   if(down) return;
           
    	   this.product=product;
    	   this.down=true;
    	   notifyAll();
       }
       public synchronized Product get(){
    	   while(!down){
    		   try {
    			wait();
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		}
    	   }
    	   return product;
       }
    }
    

      

    import java.util.Random;
    
    public class ProductFactory {
        public Future createProduct(String name){
        	Future f=new Future();//创建一个订单
        	System.out.println("下单成功,你可以去上班了...");
        	 //生产产品
        	new Thread(new Runnable() {
    			@Override
    			public void run() {
    				Product p=new Product(new Random().nextInt(), name);
    		    	f.setProduct(p);
    			}
    		}).start();
    
        	return f;
        }
    }
    

      

    import java.util.Random;
    
    public class ProductFactory {
        public Future createProduct(String name){
        	Future f=new Future();//创建一个订单
        	System.out.println("下单成功,你可以去上班了...");
        	 //生产产品
        	new Thread(new Runnable() {
    			@Override
    			public void run() {
    				Product p=new Product(new Random().nextInt(), name);
    		    	f.setProduct(p);
    			}
    		}).start();
    
        	return f;
        }
    }
    

      Callalbe和Runnable的区别:

    Runnable run方法是被线程调用的,在run方法是异步执行的

    Callable的call方法,不是异步执行的,是由Future的Run方法调用的

    ForkJoin:
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.RecursiveTask;
    
    public class Demo extends RecursiveTask<Integer>{
    	private int begin;
    	private int end;
    	public Demo(int begin,int end){
    		this.begin=begin;
    		this.end=end;
    	}
    	@Override
    	protected Integer compute(){
    		int sum=0;
    		
    		//拆分任务
    		if(end-begin<=2){
    			//计算
    			for(int i=begin;i<=end;i++){
    				sum+=i;
    			}
    		}else{ //拆分
    			Demo d1=new Demo(begin,(begin+end)/2);
    			Demo d2=new Demo((begin+end)/2+1,end);
    			//执行任务
    			d1.fork();
    			d2.fork();
    			
    			Integer a=d1.join();
    			Integer b=d2.join();
    			sum=a+b;
    		}
    		return sum;
    	}
    	public static void main(String[] args) throws Exception {
    		ForkJoinPool pool=new ForkJoinPool();
    	    ForkJoinTask<Integer> future=pool.submit(new Demo(1,100));
    	    System.out.println("......");
    	    System.out.println("计算的值为:"+future.get());
    	}
    
    }
    

      

  • 相关阅读:
    Flask + vue 前后端分离的 二手书App
    Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster
    Kafka topic Schema version mismatch error
    ORM的多表查询详述
    ORM多表操作之创建关联表及添加表记录
    ORM的单表操作
    Django的模板层简介
    Django的视图层简介
    Django中的路由配置简介
    Django简介及Django项目的创建详述
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/11182185.html
Copyright © 2011-2022 走看看