zoukankan      html  css  js  c++  java
  • 并发编程-concurrent指南-阻塞队列-延迟队列DelayQueue

    DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

    Delayed

    一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。

    此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

    下面例子是订单超时处理的具体代码:

    重点是DelayOrderComponent 和OrderMessage

    import com.concurrent.delayqueue.component.DelayOrderComponent;
    import com.concurrent.delayqueue.model.OrderInfo;
    import com.concurrent.delayqueue.service.OrderService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.util.Date;
    
    @RestController
    @RequestMapping("/order")
    public class OrderController {
        @Autowired
        private OrderService orderService;
    
        //创建订单
        @RequestMapping("insert")
        public void insert() {
            OrderInfo orderInfo = new OrderInfo();
            orderInfo.setCreateTime(new Date());
            orderInfo.setStatus(0);
            orderService.insert(orderInfo);
        }
    
        //取消订单
        @RequestMapping("cancel")
        public void cancel(Long orderId) {
            orderService.cancel(orderId);
        }
    
        //支付订单
        @RequestMapping("paysuccess")
        public void paysuccess(Long orderId) {
            orderService.paysuccess(orderId);
        }
    
        //查看队列中剩余处理数
        @RequestMapping("queuecount")
        public int queuecount() {
            return DelayOrderComponent.getDelayQueueCount();
        }
    }
    @Service
    public class OrderService {
        @Autowired
        private OrderInfoMapper orderInfoMapper;
        @Autowired
        private DelayOrderComponent delayOrderComponent;
    
        /**
         * 插入
         * @param orderInfo
         */
        @Transactional
        public void insert(OrderInfo orderInfo){
            orderInfoMapper.insert(orderInfo);
            //加入到延时队列中,用于超时未支付
            boolean flag = delayOrderComponent.addDelayQueue(new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime()));
            if(!flag){
                throw new RuntimeException();
            }
        }
    
        /**
         * 取消
         */
        @Transactional
        public void cancel(Long orderId){
            orderInfoMapper.updateByStatus(orderId,0,-1);
            delayOrderComponent.removeDelayQueue(orderId);
        }
    
        /**
         * 用户支付成功
         */
        public void paysuccess(Long orderId){
            orderInfoMapper.updateByStatus(orderId,0,1);
            delayOrderComponent.removeDelayQueue(orderId);
        }
    
    }
    import com.concurrent.delayqueue.mapper.OrderInfoMapper;
    import com.concurrent.delayqueue.message.OrderMessage;
    import com.concurrent.delayqueue.model.OrderInfo;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Lazy;
    import org.springframework.stereotype.Component;
    
    import javax.annotation.PostConstruct;
    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Executors;
    
    /**
     * 处理订单超时
     */
    @Component
    @Lazy(false)
    public class DelayOrderComponent {
        @Autowired
        private OrderInfoMapper orderInfoMapper;
    
        private static DelayQueue<OrderMessage> delayQueue = new DelayQueue<OrderMessage>();
        public static int getDelayQueueCount(){
            return delayQueue.size();
        }
    
        /**
         * 系统启动时,预先加载的数据@PostConstruct
         */
        @PostConstruct
        public void init(){
            /**初始化时加载数据库中需处理超时的订单**/
            System.out.println("获取数据库中需要处理的超时的订单");
            List<OrderInfo> list = orderInfoMapper.selectByStatus(0);
            for(int i=0;i<list.size();i++){
                OrderInfo orderInfo = list.get(i);
                OrderMessage orderMessage = new OrderMessage(orderInfo.getOrderId(),orderInfo.getCreateTime().getTime());
                this.addDelayQueue(orderMessage);//加入队列
            }
    
            /**
             * 启动线程,取延时消息
             */
            Executors.newSingleThreadExecutor().execute(new Runnable() {
                @Override
                public void run() {
                    while(true){
                        try {
                            OrderMessage orderMessage = delayQueue.take();
                            //处理超时订单
                            orderInfoMapper.updateByStatus(orderMessage.getOrderId(),0,2);//订单状态改成超时订单
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    
        /**
         * 加入延时队列
         * 用户下单时,调用此方法
         */
        public boolean addDelayQueue(OrderMessage orderMessage){
            return delayQueue.add(orderMessage);
        }
    
        /**
         * 从延时队列中删除
         * 用户主动取消,或者支付成功后,调用此方法
         */
        public boolean removeDelayQueue(Long orderId){
            for (Iterator<OrderMessage> iterator = delayQueue.iterator(); iterator.hasNext();) {
                OrderMessage queue = iterator.next();
                if(orderId.equals(queue.getOrderId())){
                    return delayQueue.remove(queue);
                }
            }
            return false;
        }
    
    }
    public class OrderMessage implements Delayed {
        private final static long DELAY = 15*60*1000L;//默认延迟15分钟
    
        private Long orderId;//订单号
        private Long expireTime;//过期时间
        public OrderMessage(Long orderId,Long createTime){
            this.orderId = orderId;
            this.expireTime = createTime + DELAY;
        }
    
        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(this.expireTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS);
        }
    
        @Override
        public int compareTo(Delayed other) {
            if (other == this){
                return 0;
            }
            if(other instanceof OrderMessage){
                OrderMessage otherRequest = (OrderMessage)other;
                long otherStartTime = otherRequest.expireTime;
                return (int)(this.expireTime - otherStartTime);
            }
            return 0;
        }
    
        public Long getOrderId() {
            return orderId;
        }
    
        public void setOrderId(Long orderId) {
            this.orderId = orderId;
        }
    
        public Long getExpireTime() {
            return expireTime;
        }
    
        public void setExpireTime(Long expireTime) {
            this.expireTime = expireTime;
        }
    }
    import java.util.Date;
    
    public class OrderInfo {
        private Long orderId;//订单状态
        private Date createTime;//创建时间
        private Integer status;//订单状态:0待支付1已支付-1取消2已超时
    
        public Long getOrderId() {
            return orderId;
        }
    
        public void setOrderId(Long orderId) {
            this.orderId = orderId;
        }
    
        public Date getCreateTime() {
            return createTime;
        }
    
        public void setCreateTime(Date createTime) {
            this.createTime = createTime;
        }
    
        public Integer getStatus() {
            return status;
        }
    
        public void setStatus(Integer status) {
            this.status = status;
        }
    }
    import com.concurrent.delayqueue.model.OrderInfo;
    import org.apache.ibatis.annotations.Mapper;
    import org.apache.ibatis.annotations.Param;
    
    import java.util.List;
    
    @Mapper
    public interface OrderInfoMapper {
        int deleteByPrimaryKey(Long orderId);
    
        int insert(OrderInfo record);
    
        int insertSelective(OrderInfo record);
    
        OrderInfo selectByPrimaryKey(Long orderId);
    
        int updateByPrimaryKeySelective(OrderInfo record);
    
        int updateByPrimaryKey(OrderInfo record);
    
        List<OrderInfo> selectByStatus(int status);
        int updateByStatus(@Param("orderId")Long orderId, @Param("oldstatus")Integer oldstatus,@Param("newstatus")Integer newstatus);
    }
    OrderInfoMapper.xml
    
    <?xml version="1.0" encoding="UTF-8" ?>
    <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" "http://mybatis.org/dtd/mybatis-3-mapper.dtd" >
    <mapper namespace="com.concurrent.delayqueue.mapper.OrderInfoMapper" >
      <resultMap id="BaseResultMap" type="com.concurrent.delayqueue.model.OrderInfo" >
        <id column="order_id" property="orderId" jdbcType="BIGINT" />
        <result column="create_time" property="createTime" jdbcType="TIMESTAMP" />
        <result column="status" property="status" jdbcType="INTEGER" />
      </resultMap>
      <sql id="Base_Column_List" >
        order_id, create_time, status
      </sql>
      <select id="selectByPrimaryKey" resultMap="BaseResultMap" parameterType="java.lang.Long" >
        select
        <include refid="Base_Column_List" />
        from t_order
        where order_id = #{orderId,jdbcType=BIGINT}
      </select>
      <delete id="deleteByPrimaryKey" parameterType="java.lang.Long" >
        delete from t_order
        where order_id = #{orderId,jdbcType=BIGINT}
      </delete>
      <insert id="insert" parameterType="com.concurrent.delayqueue.model.OrderInfo"
              useGeneratedKeys="true" keyProperty="orderId">
        insert into t_order (order_id, create_time, status
          )
        values (#{orderId,jdbcType=BIGINT}, #{createTime,jdbcType=TIMESTAMP}, #{status,jdbcType=INTEGER}
          )
      </insert>
      <insert id="insertSelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
        insert into t_order
        <trim prefix="(" suffix=")" suffixOverrides="," >
          <if test="orderId != null" >
            order_id,
          </if>
          <if test="createTime != null" >
            create_time,
          </if>
          <if test="status != null" >
            status,
          </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides="," >
          <if test="orderId != null" >
            #{orderId,jdbcType=BIGINT},
          </if>
          <if test="createTime != null" >
            #{createTime,jdbcType=TIMESTAMP},
          </if>
          <if test="status != null" >
            #{status,jdbcType=INTEGER},
          </if>
        </trim>
      </insert>
      <update id="updateByPrimaryKeySelective" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
        update t_order
        <set >
          <if test="createTime != null" >
            create_time = #{createTime,jdbcType=TIMESTAMP},
          </if>
          <if test="status != null" >
            status = #{status,jdbcType=INTEGER},
          </if>
        </set>
        where order_id = #{orderId,jdbcType=BIGINT}
      </update>
      <update id="updateByPrimaryKey" parameterType="com.concurrent.delayqueue.model.OrderInfo" >
        update t_order
        set create_time = #{createTime,jdbcType=TIMESTAMP},
          status = #{status,jdbcType=INTEGER}
        where order_id = #{orderId,jdbcType=BIGINT}
      </update>
    
    
      <select id="selectByStatus" resultMap="BaseResultMap" parameterType="java.lang.Integer" >
        select
        <include refid="Base_Column_List" />
        from t_order
        where status = #{status,jdbcType=INTEGER}
      </select>
      <update id="updateByStatus">
        update t_order
        set status = #{newstatus,jdbcType=INTEGER}
        where order_id = #{orderId,jdbcType=BIGINT}
        and status = #{oldstatus,jdbcType=INTEGER}
      </update>
    </mapper>
    application.properties
    
    spring.datasource.url = jdbc:mysql://localhost:3306/concurrent?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC
    spring.datasource.username = root
    spring.datasource.password =  123456
    
    mybatis.mapper-locations=classpath:/mybatis/*Mapper.xml

    源码地址:https://github.com/qjm201000/concurrent_delayqueue.git

    数据库sql文件:到源码里面查看readme,按照步骤来就行。

  • 相关阅读:
    NOIP2016-2020 复盘
    「笔记」线段树合并/分裂
    「笔记」线性基
    20210628模拟赛解题报告
    「笔记」左偏树
    题解 CF718C Sasha and Array
    一些杂碎的知识点
    20210614 模拟赛
    洛谷 P4249 [WC2007]剪刀石头布
    CF132E Bits of merry old England
  • 原文地址:https://www.cnblogs.com/qjm201000/p/10142004.html
Copyright © 2011-2022 走看看