zoukankan      html  css  js  c++  java
  • 请求合并实战版本

    package com.example.demo.concurrent.requestmerge;
    
    import com.example.demo.schedule.entity.ISysJobRepository;
    import com.example.demo.schedule.entity.SysJobPO;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    import javax.annotation.PostConstruct;
    import javax.annotation.PreDestroy;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.*;
    
    
    /**
     * Request merging: under highly concurrent calls, individual single-id
     * lookups are queued and resolved together by one batched repository query.
     *
     * <p>Callers of {@link #selectByPrimaryKey(Integer)} enqueue a request and block
     * on its future; a single scheduler thread drains the queue every 10 ms, issues
     * one batched query, and completes each caller's future with its own row.
     */
    @Service(value="JobServiceImpl2")
    public class JobServiceImpl2 implements IJobService {
        /** One pending lookup: the requested id plus the future its caller is blocked on. */
        private static final class Request {
            final Integer jobId;
            final CompletableFuture<SysJobPO> result = new CompletableFuture<>();

            Request(Integer jobId) {
                this.jobId = jobId;
            }
        }

        /** Holds the parameters submitted by concurrently calling threads. */
        private final LinkedBlockingQueue<Request> requestQue = new LinkedBlockingQueue<>();

        /** Single-threaded scheduler that runs the batch flush; created in {@link #init()}. */
        private ScheduledExecutorService threadPool;

        @Autowired
        private ISysJobRepository jobRepository;

        /**
         * Starts the periodic batch flush. Annotated with {@code @PostConstruct}, so the
         * container invokes it once after dependency injection has completed.
         */
        @PostConstruct
        public void init() {
            System.out.println("合并请求");
            threadPool = Executors.newScheduledThreadPool(1);
            // Arguments: task, initial delay (0 = start immediately), period, time unit.
            // The queue is flushed in one batch every 10 milliseconds.
            threadPool.scheduleAtFixedRate(this::flushPendingRequests, 0, 10, TimeUnit.MILLISECONDS);
        }

        /**
         * Stops the scheduler and fails every request still waiting in the queue so
         * that no caller stays blocked after the bean is destroyed.
         */
        @PreDestroy
        public void shutdown() {
            if (threadPool != null) {
                threadPool.shutdownNow();
            }
            List<Request> abandoned = new ArrayList<>();
            requestQue.drainTo(abandoned);
            for (Request request : abandoned) {
                request.result.completeExceptionally(
                        new IllegalStateException("Request merger was shut down before jobId "
                                + request.jobId + " was looked up"));
            }
        }

        /**
         * Drains all currently queued requests, resolves them with one batched query,
         * and hands each caller its result (or {@code null} when no row matched its id).
         */
        private void flushPendingRequests() {
            List<Request> batch = new ArrayList<>();
            requestQue.drainTo(batch);
            if (batch.isEmpty()) {
                return;
            }
            System.out.println("批量查询大小" + batch.size());

            try {
                List<Integer> jobIdList = new ArrayList<>(batch.size());
                for (Request request : batch) {
                    jobIdList.add(request.jobId);
                }
                // Fetch all results with a single batched query.
                List<SysJobPO> jobList = getSysJobListByJobIdList(jobIdList);
                // Index the results: key is the jobId, value is the row that was found.
                Map<Integer, SysJobPO> jobsById = new HashMap<>();
                for (SysJobPO job : jobList) {
                    jobsById.put(job.getJobId(), job);
                }
                // Dispatch each result to its waiting caller via the request's future.
                for (Request request : batch) {
                    request.result.complete(jobsById.get(request.jobId));
                }
            } catch (RuntimeException e) {
                // An exception escaping a scheduleAtFixedRate task suppresses every later
                // run, and the drained callers would block forever. Fail this batch and
                // keep the scheduler alive. Futures already completed are unaffected.
                for (Request request : batch) {
                    request.result.completeExceptionally(e);
                }
            }
        }

        /**
         * Looks up one job by id by joining the next batched query.
         *
         * @param jobId id of the job to look up
         * @return the matching job, or {@code null} if the batch result contained no row
         *         for {@code jobId} or the calling thread was interrupted while waiting
         * @throws IllegalStateException if the batched lookup failed or the service was
         *         shut down before the request was processed
         */
        @Override
        public SysJobPO selectByPrimaryKey(Integer jobId) {
            Request request = new Request(jobId);
            requestQue.add(request);
            try {
                // Blocks until the scheduler thread completes the future.
                return request.result.get();
            } catch (InterruptedException e) {
                // Restore the flag so callers further up the stack can observe the interrupt.
                Thread.currentThread().interrupt();
                return null;
            } catch (ExecutionException e) {
                throw new IllegalStateException("Batched lookup failed for jobId " + jobId, e.getCause());
            }
        }

        /**
         * Fetches all jobs whose id is contained in {@code jobIds} with one repository call.
         *
         * @param jobIds ids to look up
         * @return whatever {@code jobRepository.findByJobIdIn} returns for those ids
         */
        @Override
        public List<SysJobPO> getSysJobListByJobIdList(List<Integer> jobIds) {
            return jobRepository.findByJobIdIn(jobIds);
        }
    }
    View Code
    package com.example.demo.concurrent.requestmerge;
    
    import com.example.demo.schedule.entity.SysJobPO;
    
    import java.util.List;
    
    /**
     * Lookup operations for {@link SysJobPO} records, by a single id or by a list of ids.
     */
    public interface IJobService {
        /**
         * Looks up the jobs for a list of ids.
         *
         * @param jobIds ids of the jobs to fetch
         * @return the jobs found for the given ids
         */
        List<SysJobPO> getSysJobListByJobIdList(List<Integer> jobIds);

        /**
         * Looks up a single job by its id.
         *
         * @param jobId id of the job to fetch
         * @return the job found for {@code jobId}
         */
        SysJobPO selectByPrimaryKey(Integer jobId);
    }
    View Code
    package com.example.demo.concurrent;
    
    import com.example.demo.DemoApplication;
    import com.example.demo.concurrent.requestmerge.IJobService;
    import com.example.demo.schedule.entity.SysJobPO;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import org.springframework.test.context.web.WebAppConfiguration;
    import javax.annotation.Resource;
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    // Request-merging tests
    @SpringBootTest(classes= DemoApplication.class)
    @WebAppConfiguration
    @RunWith(SpringRunner.class)
    public class SpringConcurrentTest {
        /** Number of threads that call the service at the same time in {@link #bingfa()}. */
        private static final int CONCURRENT_CALLERS = 1000;

        @Resource(name="JobServiceImpl2")
        private IJobService jobService;

        /** Single lookup through the merging service; prints whatever comes back. */
        @Test
        public void test1(){
            SysJobPO job = jobService.selectByPrimaryKey(2);
            System.out.println(job);
        }

        /**
         * Concurrency test: starts {@value #CONCURRENT_CALLERS} threads that all wait on
         * a shared start gate and then call the service at once, and waits until every
         * worker has finished.
         *
         * <p>{@code IOException} stays in the signature for compatibility even though the
         * body no longer reads from stdin.
         */
        @Test
        public void bingfa() throws InterruptedException, IOException {
            CountDownLatch startGate = new CountDownLatch(CONCURRENT_CALLERS);
            CountDownLatch finished = new CountDownLatch(CONCURRENT_CALLERS);
            for (int i = 0; i < CONCURRENT_CALLERS; i++) {
                final int jobId = i;
                new Thread(() -> {
                    try {
                        // Park until the last worker has been started, then fire together.
                        startGate.await();
                        jobService.selectByPrimaryKey(jobId);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        finished.countDown();
                    }
                }).start();
                // The gate opens once this has been called for every started worker.
                startGate.countDown();
            }
            // Wait for the workers rather than blocking on stdin, which may be closed
            // (returns immediately) or never written to (blocks forever) under a test runner.
            if (!finished.await(60, java.util.concurrent.TimeUnit.SECONDS)) {
                throw new AssertionError("Only " + (CONCURRENT_CALLERS - finished.getCount())
                        + " of " + CONCURRENT_CALLERS + " concurrent lookups finished within 60 seconds");
            }
        }
    }
    View Code
  • 相关阅读:
    go语言教程零基础入门到精通
    php探针文件内容
    一篇文章揭穿创业公司的套路
    Google资深工程师深度讲解Go语言面向接口(五)
    完全解析<atlalloc.h>
    巧妙的Section — — 剖析ATL OBJECT_MAP的自动建立
    ATL中的各种CriticalSection
    C++中的INL
    如何剖析一个类
    ATL线程模型解析
  • 原文地址:https://www.cnblogs.com/sunny-miss/p/13057754.html
Copyright © 2011-2022 走看看