1、任何的高并发,请求总是会有一个顺序的
2、java的队列的数据结构是先进先出的取值顺序
3、BlockingQueue类(线程安全)(使用方法可以百度)
一般使用LinkedBlockingQueue
利用以上几点,我们可以把高并发时候的请求放入一个队列,队列的大小可以自己定义,比如队列容量为1000个数据,那么可以利用过滤器或者拦截器把当前的请求放入队列,如果队列的容量满了,其余的请求可以丢掉或者作出相应回复
具体实施:
利用生产者、消费者模型:
将队列的请求一一处理完。
上代码:
1 /** 2 * @author fuguangli 3 * @description 前沿消费者类 4 * @Create date: 2017/3/7 5 * @using EXAMPLE 6 */ 7 public class Customer implements Runnable{ 8 9 10 /** 11 * 抛出异常 特殊值 阻塞 超时 12 插入 add(e) offer(e) put(e) offer(e, time, unit) 13 移除 remove() poll() take() poll(time, unit) 14 检查 element() peek() 不可用 不可用 15 16 */ 17 private BlockingQueue blockingQueue; 18 private AtomicInteger count = new AtomicInteger(); 19 public Customer(BlockingQueue blockingQueue) { 20 this.blockingQueue = blockingQueue; 21 } 22 23 /** 24 * When an object implementing interface <code>Runnable</code> is used 25 * to create a thread, starting the thread causes the object's 26 * <code>run</code> method to be called in that separately executing 27 * thread. 28 * <p/> 29 * The general contract of the method <code>run</code> is that it may 30 * take any action whatsoever. 31 * 32 * @see Thread#run() 33 */ 34 @Override 35 public void run() { 36 System.out.println("消费者线程启动..."); 37 LockFlag.setCustomerRunningFlag(true); 38 try { 39 while (LockFlag.getProducerRunningFlag()){ 40 System.out.println(Thread.currentThread().getId()+"I'm Customer.Queue current size="+blockingQueue.size()); 41 String data = (String) blockingQueue.poll(10, TimeUnit.SECONDS); 42 if(data!=null){ 43 System.out.println(Thread.currentThread().getId()+"*************正在消费数据 data="+data); 44 }else{ 45 //表示超过取值时间,视为生产者不再生产数据 46 System.out.println(Thread.currentThread().getId()+"队列为空无数据,请检查生产者是否阻塞"); 47 } 48 Thread.sleep(50); 49 } 50 System.err.println("消费者程序执行完毕"); 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 System.err.println("消费者程序退出"); 54 LockFlag.setCustomerRunningFlag(false);//异常退出线程 55 Thread.currentThread().interrupt(); 56 } 57 } 58 }
1 package com.qysxy.framework.queue; 2 3 import java.util.concurrent.BlockingQueue; 4 import java.util.concurrent.TimeUnit; 5 import java.util.concurrent.atomic.AtomicInteger; 6 7 /** 8 * @author fuguangli 9 * @description 队列生产者类 10 * @Create date: 2017/3/7 11 * @using EXAMPLE 12 */ 13 public class Producer implements Runnable{ 14 15 16 /** 17 * 抛出异常 特殊值 阻塞 超时 18 插入 add(e) offer(e) put(e) offer(e, time, unit) 19 移除 remove() poll() take() poll(time, unit) 20 检查 element() peek() 不可用 不可用 21 22 */ 23 private BlockingQueue blockingQueue; 24 private AtomicInteger count = new AtomicInteger(); 25 public Producer(BlockingQueue blockingQueue) { 26 this.blockingQueue = blockingQueue; 27 } 28 29 /** 30 * When an object implementing interface <code>Runnable</code> is used 31 * to create a thread, starting the thread causes the object's 32 * <code>run</code> method to be called in that separately executing 33 * thread. 34 * <p/> 35 * The general contract of the method <code>run</code> is that it may 36 * take any action whatsoever. 37 * 38 * @see Thread#run() 39 */ 40 @Override 41 public void run() { 42 System.out.println("生产者线程启动..."); 43 LockFlag.setProducerRunningFlag(true); 44 try { 45 while (LockFlag.getProducerRunningFlag()){ 46 String data = "data:"+count.incrementAndGet(); 47 if(blockingQueue.offer(data,10, TimeUnit.SECONDS)){ 48 //返回true表示生产数据正确 49 System.out.println("^^^^^^^^^^^^^^正在生产数据 data="+data); 50 }else { 51 //表示阻塞时间内还没有生产者生产数据 52 System.out.println("生产者异常,无法生产数据"); 53 } 54 Thread.sleep(50); 55 56 } 57 } catch (InterruptedException e) { 58 e.printStackTrace(); 59 System.err.println("生产者程序退出"); 60 LockFlag.setProducerRunningFlag(false);//异常退出线程 61 Thread.currentThread().interrupt(); 62 } 63 } 64 }
1 package com.qysxy.framework.queue; 2 3 /** 4 * @author fuguangli 5 * @description 前沿生产者消费者模型的锁类 6 * @Create date: 2017/3/7 7 */ 8 public class LockFlag { 9 /** 10 * 生产者互斥锁 11 */ 12 private static Boolean producerRunningFlag = false; 13 /** 14 * 消费者互斥锁 15 */ 16 private static Boolean customerRunningFlag = false; 17 18 public static Boolean getProducerRunningFlag() { 19 return producerRunningFlag; 20 } 21 22 public static void setProducerRunningFlag(Boolean producerRunningFlag) { 23 LockFlag.producerRunningFlag = producerRunningFlag; 24 } 25 26 public static Boolean getCustomerRunningFlag() { 27 return customerRunningFlag; 28 } 29 30 public static void setCustomerRunningFlag(Boolean customerRunningFlag) { 31 LockFlag.customerRunningFlag = customerRunningFlag; 32 } 33 }
1 package com.qysxy.framework.queue; 2 3 import javax.servlet.http.HttpServletRequest; 4 import javax.servlet.http.HttpServletResponse; 5 import java.util.Queue; 6 import java.util.concurrent.*; 7 8 /** 9 * @author fuguangli 10 * @description 前沿队列实用类,用于大量并发用户 11 * @Create date: 2017/3/7 12 */ 13 public class BlockingQueueHelper { 14 15 16 private static final Integer maxQueueSize = 1000; 17 private static BlockingQueue blockingQueue = new LinkedBlockingQueue(maxQueueSize); 18 private static ExecutorService threadPool = Executors.newCachedThreadPool(); 19 20 21 public static BlockingQueue getBlockingQueue() { 22 if (blockingQueue == null) { 23 blockingQueue = new LinkedBlockingQueue(maxQueueSize); 24 } 25 return blockingQueue; 26 } 27 28 /** 29 * @param o 队列处理对象(包含request,response,data) 30 */ 31 public static void requestQueue(Object o) { 32 //检测当前的队列大小 33 if (blockingQueue != null && blockingQueue.size() < maxQueueSize) { 34 //可以正常进入队列 35 if (blockingQueue.offer(o)) { 36 //添加成功,检测数据处理线程是否正常 37 if (LockFlag.getCustomerRunningFlag()) { 38 //说明处理线程类正常运行 39 } else { 40 //说明处理线程类停止,此时,应重新启动线程进行数据处理 41 LockFlag.setCustomerRunningFlag(true); 42 43 //example:run 44 Customer customer = new Customer(blockingQueue); 45 threadPool.execute(customer); 46 47 } 48 49 } else { 50 //进入队列失败,做出相应的处理,或者尝试重新进入队列 51 52 } 53 } else { 54 //队列不正常,或队列大小已达上限,做出相应处理 55 56 } 57 58 } 59 }
好了,这时候,利用过滤器或者拦截器将每个请求封装成队列元素进行处理就行。
当然了,对于多应用服务器的部署架构来说,数据库也需要加锁,数据库隔离级别下篇再说。