1.接收请求的控制类:
package ..CompletableFuture;

import java.util.Map;
import java.util.UUID;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

/**
 * Web entry point that hands every incoming request to
 * {@link CompletableFutureService} and waits for its result.
 */
@Controller
public class CompleController {

    /**
     * Batch-processing endpoint.
     *
     * <p>Tags the request with a freshly generated UUID, submits it through
     * {@code CompletableFutureService.query} (which blocks until a result is
     * available) and writes {@code <sent serialNo>----<returned serialNo>} to
     * the response body.
     *
     * @param request  the incoming HTTP request
     * @param response the HTTP response the result line is written to
     * @throws Exception if waiting for the result fails or the response cannot be written
     */
    @RequestMapping("/comple.do")
    public void index(HttpServletRequest request, HttpServletResponse response) throws Exception {
        request.setCharacterEncoding("UTF-8");
        // Fixed: the MIME type was misspelled as "text/piain".
        response.setContentType("text/plain;charset=UTF-8");

        String st_fj_id = "keys";
        // Unique marker used to verify that the answer belongs to this request.
        String serialNo = UUID.randomUUID().toString();

        Map<String, Object> map = CompletableFutureService.query(st_fj_id, serialNo);
        // String.valueOf avoids a NullPointerException when the result carries
        // no "serialNo" entry; the mismatch is then reported below instead.
        String fhserialNo = String.valueOf(map.get("serialNo"));

        if (!serialNo.equals(fhserialNo)) {
            // Only reached when the returned marker differs from the one sent.
            System.out.println("请求发送和结果返回的标识是否一致---->: " + serialNo + "---" + fhserialNo);
        }
        response.getWriter().print(serialNo + "----" + fhserialNo);
    }
}
2.批量请求处理实现类
package ..CompletableFuture;

import org.springframework.stereotype.Service;
import com.alibaba.fastjson.JSONObject;
import javax.annotation.PostConstruct;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.*;

/**
 * Collects individual requests in a queue and processes them periodically
 * from a single scheduled worker thread.
 *
 * <p>Callers of {@link #query(String, String)} block on a
 * {@link CompletableFuture} until the worker has handled their request.
 */
@Service
public class CompletableFutureService {

    /** One pending request together with the future its caller is waiting on. */
    static class Reques {
        String orderCode;
        String serialNo;
        CompletableFuture<Map<String, Object>> future;
    }

    /** Requests waiting to be picked up by the scheduled worker. */
    static LinkedBlockingDeque<Reques> queue = new LinkedBlockingDeque<>();

    /**
     * Enqueues a request and blocks until the scheduled worker completes it.
     *
     * @param orderCode business key forwarded to {@link #sendDB(JSONObject)}
     * @param serialNo  marker used to match the result to this request
     * @return the result map whose {@code "serialNo"} equals {@code serialNo}
     * @throws ExecutionException   if the worker completed the request exceptionally
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public static Map<String, Object> query(String orderCode, String serialNo)
            throws ExecutionException, InterruptedException {
        // The future is the callback handle the worker completes later.
        CompletableFuture<Map<String, Object>> future = new CompletableFuture<>();
        Reques request = new Reques();
        request.future = future;
        request.serialNo = serialNo;
        request.orderCode = orderCode;
        queue.put(request);
        // Wait for the worker to deliver the result.
        return future.get();
    }

    /**
     * Starts the single-threaded scheduler that inspects the queue every
     * 10 seconds, beginning immediately.
     */
    @PostConstruct
    public void init() {
        System.out.println("进入定时任务...");
        // Thread pool that runs the periodic task.
        ScheduledExecutorService service = Executors.newScheduledThreadPool(1);
        service.scheduleAtFixedRate(new Runnable() {
            @Override
            public void run() {
                try {
                    processPending();
                } catch (RuntimeException e) {
                    // An exception escaping run() would cancel every later
                    // execution of scheduleAtFixedRate, so it is contained here.
                    e.printStackTrace();
                }
            }
        }, 0, 10, TimeUnit.SECONDS); // check the queue every 10 seconds
    }

    /** Drains everything currently queued and completes each request's future. */
    private static void processPending() {
        List<Reques> batch = new ArrayList<>();
        queue.drainTo(batch);
        int size = batch.size();
        if (size == 0) {
            return;
        }
        System.out.println("size-------" + size);
        for (Reques pending : batch) {
            completeRequest(pending);
        }
    }

    /**
     * Sends one request through {@link #sendDB(JSONObject)} and completes its
     * future: normally with the matching row, exceptionally when no row matches
     * or the call fails, so the waiting caller is never left blocked.
     */
    private static void completeRequest(Reques pending) {
        try {
            // A fresh payload per request; sharing one object would let a later
            // request overwrite the values of an earlier one.
            JSONObject json = new JSONObject();
            json.put("serialNo", pending.serialNo);
            json.put("orderCode", pending.orderCode);

            List<Map<String, Object>> rows = sendDB(json);
            for (Map<String, Object> row : rows) {
                Object returned = row.get("serialNo");
                if (returned != null && returned.toString().equals(pending.serialNo)) {
                    pending.future.complete(row);
                    return; // first match wins
                }
            }
            pending.future.completeExceptionally(
                    new IllegalStateException("No result returned for serialNo " + pending.serialNo));
        } catch (RuntimeException e) {
            pending.future.completeExceptionally(e);
        }
    }

    /**
     * Placeholder for the real data access: returns a single-element list whose
     * map echoes the {@code "serialNo"} found in {@code json}.
     *
     * @param json request payload
     * @return list containing one result map
     */
    public static List<Map<String, Object>> sendDB(JSONObject json) {
        List<Map<String, Object>> list = new ArrayList<>();
        Map<String, Object> fhMap = new HashMap<>();
        // Business logic goes here.
        fhMap.put("serialNo", json.get("serialNo"));
        list.add(fhMap);
        return list;
    }
}
3.测试类
package ..CompletableFuture;

import ..utils.requestUtils.HttpApiClient;
import java.util.concurrent.CountDownLatch;

/**
 * Manual load-test driver: starts a fixed number of threads that all call
 * the {@code /comple.do} endpoint once they have been released together.
 */
public class Main {

    /** Number of client threads to start. */
    private static final int CLIENT_COUNT = 2000;

    /** Start gate: opens once every client thread has counted down. */
    private static final CountDownLatch START_GATE = new CountDownLatch(CLIENT_COUNT);

    /**
     * Starts all client threads and then pauses the calling thread for five seconds.
     *
     * @throws InterruptedException if the calling thread is interrupted during the pause
     */
    public static void cdSendData() throws InterruptedException {
        int started = 0;
        while (started < CLIENT_COUNT) {
            new Thread(Main::fireRequest).start();
            started++;
        }
        Thread.sleep(5000);
    }

    /** Work done by one client thread: wait at the gate, send the request, print the reply. */
    private static void fireRequest() {
        try {
            START_GATE.countDown();
            START_GATE.await();
            String url = "http://localhost:8080/comple.do";
            String result = HttpApiClient.getData(url);
            System.out.println("result: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        cdSendData();
    }
}