zoukankan      html  css  js  c++  java
  • 使用数据库乐观锁解决高并发秒杀问题,以及如何模拟高并发的场景,CyclicBarrier和CountDownLatch类的用法

    数据库:mysql

    数据库的乐观锁:一般通过数据表加version来实现,相对于悲观锁的话,更能省数据库性能,废话不多说,直接看代码

    第一步:

    建立数据库表:

     

     CREATE TABLE `skill_activity` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '活动id',
      `name` varchar(20) NOT NULL COMMENT '活动名称',
      `num` bigint(10) NOT NULL COMMENT '活动数量限制',
      `surplus_num` bigint(10) NOT NULL COMMENT '活动剩余数量',
      `person_limit` bigint(10) NOT NULL COMMENT '单人上传限制',
      `version` bigint(10) DEFAULT '0',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
    
    CREATE TABLE `skill_activity_order` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键', 
    `activity_id` bigint(20) NOT NULL COMMENT '活动id',
    `thread_id` bigint(20) NOT NULL COMMENT '线程id',
    `create_at` datetime DEFAULT NULL COMMENT '创建时间',
    `name` varchar(20) NOT NULL COMMENT '活动名称',
    `url` varchar(20) DEFAULT NULL, PRIMARY KEY (`id`),
    KEY `index_thread_id` (`thread_id`) ) ENGINE=InnoDB AUTO_INCREMENT=106338 DEFAULT CHARSET=utf8

    往数据库活动表(skill_activity)插入一条数据

    num:商品数量;surplus_num:商品剩余数量;person_limit:单人上传数量限制;version:版本号,解决高并发问题

    具体的活动秒杀订单表(skill_activity_order):

    activity_id:就是上面活动表的id;thread_id:就是线程id,实际秒杀就是用户id,name和url就是秒杀填写的一些内容,不必关注

    第二步:

    java代码:

    1)2个表对应的mapper.java类

    public interface SkillActivityMapper {
        int deleteByPrimaryKey(Long id);
    
        int insert(SkillActivity record);
    
        int insertSelective(SkillActivity record);
    
        SkillActivity selectByPrimaryKey(Long id);
    
        int updateByPrimaryKeySelective(SkillActivity record);
    
        int updateByPrimaryKey(SkillActivity record);
        
        int updateSkillActivityNum(SkillActivity record);//秒杀修改剩余数量的方法
    }
    public interface SkillActivityOrderMapper {
        int deleteByPrimaryKey(Long id);
    
        int insert(SkillActivityOrder record);
    
        int insertSelective(SkillActivityOrder record);
    
        SkillActivityOrder selectByPrimaryKey(Long id);
        
        List<SkillActivityOrder> selectBythreadId(Long threadId);
        
        int updateByPrimaryKeySelective(SkillActivityOrder record);
    
        int updateByPrimaryKey(SkillActivityOrder record);
    }
    

      

    2)2个类对应的mapper.xml方法就不一一写出来了,看SkillActivityMapper的updateSkillActivityNum这个方法的sql语句:

    <update id="updateSkillActivityNum" parameterType="com.ouer.model.SkillActivity" >
        update skill_activity
        set 
          surplus_num = #{surplusNum,jdbcType=BIGINT},
          version = #{version,jdbcType=BIGINT}+1
        where id = #{id,jdbcType=BIGINT} and version=#{version,jdbcType=BIGINT} and surplus_num>0
      </update>

    还有SkillActivityOrderMapper.selectBythreadId方法

    <select id="selectBythreadId" resultMap="BaseResultMap"  >
        select 
        <include refid="Base_Column_List" />
        from skill_activity_order
        where thread_id = #{threadId,jdbcType=BIGINT}
      </select>

    3)version版本号是关键,update成功会导致version加1,而其他线程如果是原先的version就无法update。

    4)看一下service代码:

    @Override
        public SkillActivityResponse SkillActivity(SkillActivirtReq req) {
            SkillActivityResponse skillActivityResponse=new SkillActivityResponse();
            int failNum=0;
            SkillActivity skillActivity=skillActivityMapper.selectByPrimaryKey(req.getActivityId());
            List<String> urls=req.getUrls();
            if(skillActivity.getSurplusNum()<=0){
                skillActivityResponse.setErrorMsg("活动已经结束");
                skillActivityResponse.setFailNum(urls.size());
                skillActivityResponse.setSucceed(false);
                return skillActivityResponse;
            }else{
                //先查询用户上传了多少张
                int count=skillActivityOrderMapper.selectBythreadId(req.getThreadId()).size();//查询每个用户上传了多少张
                if(count>skillActivity.getPersonLimit()){
                    skillActivityResponse.setErrorMsg("已经超出上传上限,上传失败");
                    skillActivityResponse.setFailNum(urls.size());
                    skillActivityResponse.setSucceed(false);
                    return skillActivityResponse;
                }
                
                int index=(int) (skillActivity.getPersonLimit()-count);//表示还能上传的数量
                if(urls.size()<=index){
                    //都可以上传
                }else{
                    //表示只能上传index张图片
                    urls=urls.subList(0, index);
                }
                
                //上传订单
                for(int i=0;i<urls.size();i++){
                    skillActivity= skillActivityMapper.selectByPrimaryKey(req.getActivityId());
                    skillActivity.setSurplusNum(skillActivity.getSurplusNum()-1);
                    if(skillActivity.getSurplusNum()<0){
                        failNum++;
                        continue;
                    }
                    int result=skillActivityMapper.updateSkillActivityNum(skillActivity);//这个是关键
                    if(result>0){
                        //上传成功
                        SkillActivityOrder activityOrder=new SkillActivityOrder();
                        activityOrder.setActivityId(skillActivity.getId());
                        activityOrder.setCreateAt(new Date());
                        activityOrder.setName(skillActivity.getName());
                        activityOrder.setThreadId(req.getThreadId());
                        activityOrder.setUrl(urls.get(i));
                        skillActivityOrderMapper.insertSelective(activityOrder);
                    }else{
                        //上传失败
                        failNum++;
                    }
                }
                
                skillActivityResponse.setFailNum(failNum);
                skillActivityResponse.setSucceed(true);
                return skillActivityResponse;
            }
            
            
            
        }

    5)使用spring的junit来单元测试

    @RunWith(SpringJUnit4ClassRunner.class)  
    @ContextConfiguration(locations={"classpath:spring/applicationContext.xml"})
    public class SkillActivityServieTest {
        @Autowired
        private SkillActivityService skillActivityService;
        
        class MyRun implements Runnable{
            private CyclicBarrier barrier;
            
            private CountDownLatch countDownLatch;
            
            private Long threadId;
            
            
            
            public MyRun(CyclicBarrier barrier, CountDownLatch countDownLatch,
                    Long threadId) {
                super();
                this.barrier = barrier;
                this.countDownLatch = countDownLatch;
                this.threadId = threadId;
            }
    
    
    
            @Override
            public void run() {
                System.err.println("线程"+threadId+"准备完毕");
                try {
                    barrier.await();
                    SkillActivirtReq req=new SkillActivirtReq();
                    req.setActivityId(1L);
                    req.setThreadId(threadId);
                    req.setUrls(Lists.newArrayList("url1","url2","url3","url4","url5","url6","url7"
                            ,"url7","url8","url9","url10"));
                    SkillActivityResponse skillActivityResponse= skillActivityService.SkillActivity(req);
                    if(skillActivityResponse.isSucceed()){
                        System.err.println("线程:"+threadId+",failNum:"+skillActivityResponse.getFailNum());
                    }else{
                        System.err.println("线程:"+threadId+",errmsg:"+skillActivityResponse.getErrorMsg());
                    }
                } catch (InterruptedException | BrokenBarrierException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }finally{
                    countDownLatch.countDown();
                }
                
            }
            
        }
        
        @Test
        public void test01() throws InterruptedException{
            CyclicBarrier barrier = new CyclicBarrier(20000);// 让20000个线程同时进行操作 调用20000次方法await() 才会让20000个线程同时执行
            CountDownLatch countDownLatch=new CountDownLatch(20000);//统计耗时
            ExecutorService executorService= Executors.newCachedThreadPool();
            long start=System.currentTimeMillis();
            for(int i=1;i<=20000;i++){
                executorService.submit(new MyRun(barrier, countDownLatch, new Long(i+"")));
            }
            executorService.shutdown();
            countDownLatch.await();
            System.err.println("耗时:"+(System.currentTimeMillis()-start)+"ms");
            try {
                Thread.sleep(Integer.MAX_VALUE); //防止线程结束
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    6)运行junit

    会看见20000线程同时准备完毕后才会同时去秒杀商品,这个就是CyclicBarrier的作用

    然后可以看见耗时:160945ms也就是160秒多一点

    7)看看数据库表:

    surplus_num的数量:0,version:2000,在多运行就几次也是这个结果,通过使用数据库的乐观锁来实现高并发下的秒杀

    总结:数据库的乐观锁一般使用version版本号结合业务来实现,CyclicBarrier和CountDownLatch也是高并发下常用的工具类,CyclicBarrier的作用:就是让多个线程同时去操作,

    CountDownLatch一般可以用来统计总耗时,由于作者水平有限,如有不足请见谅.

  • 相关阅读:
    form表单中有bootstrap-switch时怎么提交表单
    Django:drf过滤、搜索、排序功能
    pycharm:使用豆瓣源安装第三方库
    pycharm安装包时:Non-zero exit code (1)
    js: 前端通过ajax方式获取后台数据填充下拉列表
    Django:drf框架中序列化组件—字段验证
    ROS Reality: A Virtual Reality Framework Using Consumer-Grade Hardware for ROS-Enabled Robots David
    Virtual and Augmented Reality for Robotics in Unity environment using Mimic plugin
    Human-machine collaboration in virtual reality for adaptive production engineering
    An open modular framework for efficient and interactive simulation and analysis of realistic human motions for professional applications
  • 原文地址:https://www.cnblogs.com/shangxinfeng/p/8891118.html
Copyright © 2011-2022 走看看