zoukankan      html  css  js  c++  java
  • 分发计数器

    分发计数器依赖于redis,分发时支持指定范围优先分发,支持分发阈值上限设置,范围成员每次被分发计数后,从小到大排列,能保证尽量均匀分发。

    分发执行器业务主类:

    package com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask.basic;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections.CollectionUtils;
    
    import java.util.*;
    import java.util.stream.Collectors;
    
    /**
     * Created by zzq on 2021/3/10.
     */
    @Slf4j
    public abstract class AbstractDistributionCounter<Type> {
        /**
         * Prefix that {@code membersRefresh} puts in front of the ZSet key to name its refresh lock.
         */
        public final String lockPrefix = "distributionCounterLockTag";
        // Expiry, in seconds, of the per-member lock taken in syncDistributionToMember.
        public final Integer lockExpireInSeconds = 15;
        // Offset used for the first page fetched by getBatchMembersInfo.
        public final Integer offset = 0;
        // Page size: number of members requested per ZSet range query; also the step added to the offset.
        public final Integer count = 10;
    
        /**
         * Returns the key of the ZSet that stores all members and their distribution counts.
         * The same value is also used as the prefix of every lock key built by {@code lock}/{@code unLock}.
         * Subclasses may override it; the default is {@code "allMember"}.
         *
         * @return the ZSet key
         */
        protected String getMembersZSetKey() {
            return "allMember";
        }
    
        /**
         * Selects the direction in which {@code getRangeMembers} reads and sorts the ZSet:
         * {@code true} uses the ascending range query, {@code false} the reverse one.
         *
         * @return {@code true} by default (smallest count first)
         */
        protected Boolean orderASC() {
            return true;
        }
    
        /**
         * ========== Abstract ZSet / key primitives supplied by the subclass ==========
         */
        /**
         * Converts one native element returned by the range queries into the framework's
         * {@code MemberInfoTuple}.
         *
         * @param t native element of the subclass-chosen type
         * @return the converted tuple
         */
        protected abstract MemberInfoTuple convertToMemberInfoTuple(Type t);
    
        // Adds memberId to the ZSet `allMember` with score `l` (addOne passes 0).
        protected abstract void zadd(String allMember, double l, Object memberId);
    
        // Removes the members whose rank lies in [l, l1]; removeMembersAllRank passes 0 and -1.
        protected abstract void zremrangeByRank(String allMember, long l, long l1);
    
        // Adds `count` to the score of memberId and returns a Double (increaseOne converts it to a long).
        protected abstract Double zincrby(String key, double count, Object memberId);
    
        // Used as the lock primitive by lock(); presumably returns true only when the key was newly set — confirm.
        protected abstract Boolean setnx(String key, Object value, int expireInSeconds);
    
        // NOTE: despite the parameter name, unLock passes the full lock key (ZSet key + member id), not a bare member id.
        protected abstract Boolean delete(String memberId);
    
        // NOTE: despite the parameter name, unLock passes the full lock key (ZSet key + member id), not a bare member id.
        protected abstract Boolean exists(String memberId);
    
        // Ascending score-range query with paging; getRangeMembers passes (minCount, maxCount, offset, count).
        protected abstract Set<Type> zrangeByScoreWithScore(String key, double min, double max, int offset, int count);
    
        // NOTE(review): declared as (max, min), but getRangeMembers calls this with (minCount, maxCount) in
        // that positional order. Implementations therefore receive the lower bound in `max` and the upper
        // bound in `min`; the sample POIDistributionCounter compensates by swapping them again. Confirm
        // before changing either side on its own.
        protected abstract Set<Type> zrevrangeByScoreWithScore(String key, double max, double min, int offset, int count);
    
        // Returns the score of memberId; getCountByMember dereferences the result without a null check.
        protected abstract Double zscore(String key, Object memberId);
    
        /**
         * ==========均匀分发业务操作==========
         */
        /**
         * Rebuilds the member ZSet: removes every existing member and re-adds each given id with score 0.
         * <p>
         * The rebuild runs under a lock named {@code lockPrefix + getMembersZSetKey()} that expires after
         * 30 seconds; if the lock cannot be acquired the call returns without doing anything. The lock is
         * always released in {@code finally}.
         * <p>
         * NOTE: the clear and the re-adds are separate calls, so a failure part-way leaves the ZSet
         * partially populated. Such a failure is logged (with its stack trace) and not rethrown.
         *
         * @param members member ids to load; nothing happens when this is null or empty
         */
        public void membersRefresh(Collection<String> members) {
            if (CollectionUtils.isEmpty(members)) {
                log.info("params is null");
                return;
            }
            String lockName = lockPrefix.concat(getMembersZSetKey());
            // setnx may hand back null; treat anything other than TRUE as "not acquired" instead of unboxing it.
            if (!Boolean.TRUE.equals(lock(lockName, 30))) {
                log.info("membersRefreshing。。。");
                return;
            }
            log.info("membersRefreshing-locked=[{}]", lockName);
            try {
                removeMembersAllRank();
                for (String memberId : members) {
                    addOne(memberId);
                }
            } catch (Exception e) {
                // Pass the throwable as the trailing argument so the cause and stack trace reach the log.
                log.error("membersRefreshing invoke fail", e);
            } finally {
                unLock(lockName);
            }
        }
    
        /**
         * Full-range matching: walks the member ZSet one page at a time and tries to distribute to a
         * member of the current page.
         * <p>
         * Each pass hands the page to {@code syncDistributionToMember}. A non-null answer ends the walk and
         * is returned as is; a null answer bumps the page's {@code times} counter and loops. The next page
         * is fetched only once {@code getTimes()} exceeds 1 (the initial value of {@code times} is defined
         * in {@code MemberInfoSetTuple}, which is not visible here — presumably 1, confirm).
         *
         * @param distributionProcessor business callback and threshold provider
         * @return {@code false} when a page comes back empty, otherwise the decision of
         *         {@code syncDistributionToMember}
         */
        public Boolean fullRangeExecute(DistributionProcessor distributionProcessor) {
            MemberInfoSetTuple page = getBatchMembersInfo(distributionProcessor);
            while (true) {
                if (page.getTimes() > 1) {
                    page = getBatchMembersInfo(page, distributionProcessor);
                }
                List<MemberInfoTuple> candidates = page.getMemberInfo();
                if (CollectionUtils.isEmpty(candidates)) {
                    return false;
                }
                Boolean outcome = syncDistributionToMember(distributionProcessor, candidates, true);
                if (outcome == null) {
                    // Nobody on this page could take the item; move on.
                    page.increaseTimes();
                    continue;
                }
                return outcome;
            }
        }
    
        /**
         * Fixed-range matching: tries to distribute only to the members named by
         * {@code distributionProcessor.getRangeMembers()}.
         *
         * @param distributionProcessor business callback; also supplies the preferred member ids
         * @return {@code true} when a preferred member accepted the item; {@code false} when no range was
         *         given, none of the ids could be resolved, or no member accepted
         */
        public Boolean rangeExecute(DistributionProcessor distributionProcessor) {
            List<String> preferredIds = distributionProcessor.getRangeMembers();
            if (CollectionUtils.isEmpty(preferredIds)) {
                return false;
            }

            List<MemberInfoTuple> candidates = getMembersInfoByIds(preferredIds);
            if (CollectionUtils.isEmpty(candidates)) {
                return false;
            }

            Boolean outcome = syncDistributionToMember(distributionProcessor, candidates, false);
            // A null outcome (no member decided) counts as "not distributed".
            return outcome != null ? outcome : Boolean.FALSE;
        }
    
        /**
         * Entry point: tries the fixed range first and falls back to full-range matching when the fixed
         * range did not place the item.
         *
         * @param distributionProcessor business callback and threshold provider
         * @return {@code true} if either stage distributed the item, otherwise the result of the
         *         full-range stage
         */
        public Boolean execute(DistributionProcessor distributionProcessor) {
            if (rangeExecute(distributionProcessor)) {
                return true;
            }
            return fullRangeExecute(distributionProcessor);
        }
    
        /**
         * Fetches one page of members from the ZSet and stores it, together with the paging position, in a
         * {@code MemberInfoSetTuple}.
         * <p>
         * With a null {@code memberInfoSetTuple} a fresh holder is created and the page starts at
         * {@code offset}; otherwise the given holder is reused and its offset is advanced by {@code count}.
         * Score bounds come from the processor, defaulting to 60 (max) and 0 (min) when it returns null.
         *
         * @param memberInfoSetTuple    holder of the previous page, or null for the first page
         * @param distributionProcessor supplies the min/max score bounds
         * @return the holder, updated with the new offset, page size and member list
         */
        private MemberInfoSetTuple getBatchMembersInfo(MemberInfoSetTuple memberInfoSetTuple, DistributionProcessor distributionProcessor) {
            final MemberInfoSetTuple page;
            final Integer pageOffset;
            if (memberInfoSetTuple != null) {
                page = memberInfoSetTuple;
                pageOffset = memberInfoSetTuple.getOffset() + count;
            } else {
                // First query: start from a fresh holder at the configured initial offset.
                page = new MemberInfoSetTuple();
                pageOffset = offset;
            }

            Long upperBound = distributionProcessor.getMaxCount();
            Long lowerBound = distributionProcessor.getMinCount();
            if (upperBound == null) {
                upperBound = 60L;
            }
            if (lowerBound == null) {
                lowerBound = 0L;
            }

            List<MemberInfoTuple> members = getRangeMembers(lowerBound, upperBound, pageOffset, count);
            page.setOffset(pageOffset);
            page.setCount(count);
            page.setMemberInfo(members);
            return page;
        }
    
        /**
         * Fetches the first page of members, i.e. pages without a previous holder.
         *
         * @param distributionProcessor supplies the min/max score bounds
         * @return a new holder positioned on the first page
         */
        private MemberInfoSetTuple getBatchMembersInfo(DistributionProcessor distributionProcessor) {
            final MemberInfoSetTuple noPreviousPage = null;
            return getBatchMembersInfo(noPreviousPage, distributionProcessor);
        }
    
        /**
         * Walks the candidates in order, locks one member at a time and lets the processor try to place
         * the item on it; on success the member's count in the ZSet is incremented.
         * <p>
         * Changes compared with the previous version:
         * <ul>
         * <li>the threshold re-check after locking now runs inside {@code try}, so the lock is released
         *     on every exit path (it used to return with the lock still held);</li>
         * <li>a failing {@code increaseOne} after a successful {@code process} no longer falls through to
         *     the next member — the item has already been dispatched, so the method logs the counter
         *     failure and returns {@code true} instead of risking a second dispatch;</li>
         * <li>null results from {@code lock} and {@code process} are treated as "no" instead of being
         *     unboxed;</li>
         * <li>failures are logged at error level with the throwable as trailing argument, so the stack
         *     trace is kept.</li>
         * </ul>
         * NOTE(review): {@code unLock} deletes the lock key without checking who owns it; if processing
         * outlives {@code lockExpireInSeconds} another worker's lock may be removed.
         *
         * @param distributionProcessor business callback and threshold provider
         * @param membersInfo           candidates, expected smallest count first
         * @param fullRangeMembers      passed through to the processor; true for full-range matching
         * @return {@code true} when a member accepted the item, {@code false} when a candidate at or above
         *         the threshold was reached, {@code null} when every candidate was skipped or declined
         *         (the caller may then try the next page)
         */
        private Boolean syncDistributionToMember(DistributionProcessor distributionProcessor, List<MemberInfoTuple> membersInfo, Boolean fullRangeMembers) {
            for (MemberInfoTuple item : membersInfo) {
                // Candidates are ordered by count, so the first one at/over the threshold ends the search.
                if (!geCount(item, distributionProcessor)) {
                    return false;
                }
                String memberId = item.getMemberId();
                if (!Boolean.TRUE.equals(lock(memberId, lockExpireInSeconds))) {
                    // Someone else is working on this member; try the next one.
                    continue;
                }
                log.info("locked success memberId=[{}],lockExpireInSeconds=[{}]", memberId, lockExpireInSeconds);
                try {
                    // Re-check under the lock; being inside try guarantees the unlock below.
                    if (!geCount(item, distributionProcessor)) {
                        return false;
                    }
                    if (Boolean.TRUE.equals(distributionProcessor.process(memberId, fullRangeMembers))) {
                        try {
                            increaseOne(memberId);
                            log.info("distributionCounter#process,#increaseOne invoke success | memberId=[{}],currMembers=[{}]", memberId, membersInfo);
                        } catch (Exception counterFailure) {
                            // The item is already placed; do not retry on another member.
                            log.error("distributionCounter#increaseOne invoke fail after successful process | memberId=[{}],currMembers=[{}]", memberId, membersInfo, counterFailure);
                        }
                        return true;
                    }
                } catch (Exception e) {
                    log.error("distributionCounter#process,#increaseOne invoke fail | memberId=[{}],currMembers=[{}]", memberId, membersInfo, e);
                } finally {
                    unLock(memberId);
                    log.info("unLocked success memberId=[{}]", memberId);
                }
            }
            return null;
        }
    
        /**
         * Tells whether a member is still below the distribution threshold.
         * <p>
         * A null {@code getMaxCount()} falls back to 60, the same default {@code getBatchMembersInfo}
         * applies; previously a null threshold caused a NullPointerException here.
         *
         * @param item                  member and its current count
         * @param distributionProcessor supplies the threshold
         * @return {@code false} when the member's count is greater than or equal to the threshold,
         *         {@code true} otherwise
         */
        private Boolean geCount(MemberInfoTuple item, DistributionProcessor distributionProcessor) {
            Long maxCount = distributionProcessor.getMaxCount();
            if (maxCount == null) {
                maxCount = 60L;
            }
            return item.getCount() < maxCount;
        }
    
        /**
         * Looks up the current count of each given member id and returns the members sorted by count,
         * smallest first.
         * <p>
         * The previous version compared each count only with the preceding one, so the result was not
         * actually ordered, although {@code syncDistributionToMember} stops at the first member that is
         * at or above the threshold and therefore relies on ascending order. The list is now sorted
         * explicitly; the sort is stable, so members with equal counts keep their input order.
         * <p>
         * Ids whose count cannot be read (for example because they are not in the ZSet) are logged and
         * left out of the result.
         *
         * @param memberIds ids to resolve; must not be null
         * @return resolved members in ascending count order, possibly empty
         */
        private List<MemberInfoTuple> getMembersInfoByIds(List<String> memberIds) {
            List<MemberInfoTuple> ret = new ArrayList<>(memberIds.size());
            for (String memberId : memberIds) {
                Long memberCount;
                try {
                    memberCount = getCountByMember(memberId);
                } catch (Exception e) {
                    // Throwable goes last, without a placeholder, so SLF4J logs the stack trace.
                    log.error("memberId=[{}]在ZSet中不存在,已经从rangeMembers中放弃该数据", memberId, e);
                    continue;
                }
                MemberInfoTuple info = new MemberInfoTuple();
                info.setMemberId(memberId);
                info.setCount(memberCount);
                ret.add(info);
            }
            ret.sort(Comparator.comparing(MemberInfoTuple::getCount));
            return ret;
        }
    
        /**
         * Picks, from the given ids, the member with the smallest current count; on ties the earliest id
         * in the list wins.
         *
         * @param memberIds candidate ids
         * @return the member with the lowest count, or null when the list is null or empty
         */
        private MemberInfoTuple getMinCountMember(List<String> memberIds) {
            if (CollectionUtils.isEmpty(memberIds)) {
                return null;
            }

            String bestId = null;
            Long bestCount = null;
            for (String candidateId : memberIds) {
                Long candidateCount = getCountByMember(candidateId);
                // The first element seeds the minimum; later ones replace it only when strictly smaller.
                if (bestCount == null || candidateCount < bestCount) {
                    bestCount = candidateCount;
                    bestId = candidateId;
                }
            }

            MemberInfoTuple winner = new MemberInfoTuple();
            winner.setCount(bestCount);
            winner.setMemberId(bestId);
            return winner;
        }
    
        /**
         * ==========基于ZSet操作==========
         */
        /**
         * Registers a member in the ZSet with an initial score of 0.
         *
         * @param memberId id of the member to add
         */
        private void addOne(String memberId) {
            final String zsetKey = getMembersZSetKey();
            final double initialScore = 0L;
            zadd(zsetKey, initialScore, memberId);
        }
    
        /**
         * Empties the member ZSet by removing the rank range 0 to -1.
         */
        private void removeMembersAllRank() {
            final long firstRank = 0L;
            final long lastRank = -1L;
            zremrangeByRank(getMembersZSetKey(), firstRank, lastRank);
        }
    
        /**
         * Adds 1 to the score of the given member in the ZSet.
         *
         * @param memberId id of the member whose count is incremented
         * @return the value returned by {@code zincrby}, truncated to a long
         */
        private Long increaseOne(String memberId) {
            Double updatedScore = zincrby(getMembersZSetKey(), 1L, memberId);
            return updatedScore.longValue();
        }
    
        /**
         * Tries to take the lock for a member by calling {@code setnx} on the key
         * {@code getMembersZSetKey() + memberId} with value 1 and the given expiry.
         *
         * @param memberId        id (or any name) to lock
         * @param expireInSeconds expiry passed to {@code setnx}
         * @return whatever {@code setnx} returns
         */
        private Boolean lock(String memberId, Integer expireInSeconds) {
            final String lockKey = getMembersZSetKey().concat(memberId);
            return setnx(lockKey, 1, expireInSeconds);
        }
    
        /**
         * Releases the lock for a member by deleting the key {@code getMembersZSetKey() + memberId}.
         * When the key does not exist the method reports success without deleting anything.
         *
         * @param memberId id (or any name) that was locked
         * @return {@code true} when the key was absent, otherwise the result of {@code delete}
         */
        private Boolean unLock(String memberId) {
            if (!exists(getMembersZSetKey().concat(memberId))) {
                return true;
            }
            return delete(getMembersZSetKey().concat(memberId));
        }
    
        /**
         * Reads one page of members whose score lies between {@code minCount} and {@code maxCount} and
         * returns them as a list sorted by count — ascending when {@code orderASC()} is true, descending
         * otherwise.
         * <p>
         * A null result from the underlying range query is now treated as "no members" and yields an
         * empty list instead of a NullPointerException.
         * <p>
         * NOTE(review): the descending branch passes {@code (minCount, maxCount)} although
         * {@code zrevrangeByScoreWithScore} declares {@code (max, min)}. The sample implementation swaps
         * the two again, so the pair currently works together; the argument order is deliberately left
         * untouched here because changing one side alone would break the other.
         *
         * @param minCount lower score bound
         * @param maxCount upper score bound
         * @param offset   number of matching members to skip
         * @param count    maximum number of members to return
         * @return the converted members in the configured order, never null
         */
        private List<MemberInfoTuple> getRangeMembers(Long minCount, Long maxCount, Integer offset, Integer count) {
            boolean ascending = orderASC();
            Set<Type> page = ascending
                    ? zrangeByScoreWithScore(getMembersZSetKey(), minCount, maxCount, offset, count)
                    : zrevrangeByScoreWithScore(getMembersZSetKey(), minCount, maxCount, offset, count);
            if (page == null) {
                return new ArrayList<>();
            }
            Comparator<MemberInfoTuple> byCount = Comparator.comparing(MemberInfoTuple::getCount);
            if (!ascending) {
                byCount = byCount.reversed();
            }
            return page.stream()
                    .map(this::convertToMemberInfoTuple)
                    .sorted(byCount)
                    .collect(Collectors.toList());
        }
    
        /**
         * Reads the current count (ZSet score, truncated to a long) of one member.
         * <p>
         * When {@code zscore} returns null the method now throws a descriptive
         * {@link NoSuchElementException} instead of failing with an anonymous NullPointerException;
         * callers that catch {@code Exception} behave as before.
         *
         * @param memberId id of the member to look up
         * @return the member's count
         * @throws NoSuchElementException if no score is stored for the member
         */
        private Long getCountByMember(String memberId) {
            Double score = zscore(getMembersZSetKey(), memberId);
            if (score == null) {
                throw new NoSuchElementException(
                        "memberId=[" + memberId + "] has no score in ZSet [" + getMembersZSetKey() + "]");
            }
            return score.longValue();
        }
    }
    View Code

    分发处理器实现接口:

    package com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask.basic;
    
    import java.util.List;
    
    /**
     * Callback contract consumed by {@code AbstractDistributionCounter}: supplies the business action to
     * run on a chosen member, an optional preferred member list, and the score bounds used when paging
     * through the ZSet.
     * <p>
     * Created by zzq on 2021/3/11.
     */
    public interface DistributionProcessor {
        /**
         * Applies the distribution rule to one member.
         *
         * @param memberId         id of the member currently being tried
         * @param fullRangeMembers whether the call comes from full-range matching over the whole ZSet
         *                         (false when the member came from {@link #getRangeMembers()})
         * @return true when the item was placed on this member; any other value makes the counter try
         *         the next candidate
         */
        Boolean process(String memberId, Boolean fullRangeMembers);
    
        /**
         * Returns the preferred member ids. Because the ids are the ZSet members themselves, the counter
         * can read each one's score directly instead of paging through the set.
         *
         * @return preferred member ids; null or empty means there is no preferred range
         */
        List<String> getRangeMembers();
    
        /**
         * Upper bound for a member's count.
         *
         * @return the upper bound; the counter's paging falls back to 60 when this is null
         */
        Long getMaxCount();
    
        /**
         * Lower bound for a member's count.
         *
         * @return the lower bound; the counter's paging falls back to 0 when this is null
         */
        Long getMinCount();
    }
    View Code

    分发执行器实现类(示例):

    package com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask.basic;
    
    import com.dianping.squirrel.client.StoreKey;
    import com.dianping.squirrel.client.impl.redis.RedisStoreClient;
    import com.dianping.squirrel.client.impl.redis.StoreTuple;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.util.Set;
    
    /**
     * Example {@code AbstractDistributionCounter} implementation: every ZSet and key primitive is
     * forwarded to a {@code RedisStoreClient}, with the key wrapped in a {@code StoreKey} built from
     * {@code memberIdSetKeyPrefix}.
     * <p>
     * Created by zzq on 2021/3/12.
     */
    @Service
    @Slf4j
    public class POIDistributionCounter extends AbstractDistributionCounter<StoreTuple> {
        // First constructor argument of every StoreKey created by getCategory.
        public final String memberIdSetKeyPrefix = "strategy_poi_set_";
        @Resource(name = "redisClient")
        private RedisStoreClient redisClient;
    
        /**
         * ========================== ZSet Redis operations ============================
         */
    
        // Wraps a raw key into the StoreKey form the client calls below take.
        private StoreKey getCategory(String key) {
            return new StoreKey(memberIdSetKeyPrefix, key);
        }
    
        // Forwards to the client's zadd.
        @Override
        public void zadd(String key, double count, Object memberId) {
            redisClient.zadd(getCategory(key), count, memberId);
        }
    
        // Forwards to the client's zincrby and returns its result.
        @Override
        public Double zincrby(String key, double count, Object memberId) {
            return redisClient.zincrby(getCategory(key), count, memberId);
        }
    
        // Forwards to the client's zscore and returns its result unchanged.
        @Override
        public Double zscore(String key, Object memberId) {
            return redisClient.zscore(getCategory(key), memberId);
        }
    
        // Ascending score-range query with paging.
        @Override
        public Set<StoreTuple> zrangeByScoreWithScore(String key, double min, double max, int offset, int count) {
            return redisClient.zrangeByScoreWithScore(getCategory(key), min, max, offset, count);
        }
    
        // NOTE(review): the parameters are named (min, max) here, while the overridden abstract method
        // declares (max, min). The base class's getRangeMembers passes (minCount, maxCount) positionally,
        // so the swap in the call below hands the client (maxCount, minCount). Presumably the client
        // expects max before min, as Redis ZREVRANGEBYSCORE does — confirm. Changing only this method or
        // only the call site would invert the bounds.
        // NOTE(review): this calls the "Bytes" variant, unlike the ascending counterpart — confirm that is intended.
        @Override
        public Set<StoreTuple> zrevrangeByScoreWithScore(String key, double min, double max, int offset, int count) {
            return redisClient.zrevrangeBytesByScoreWithScore(getCategory(key), max, min, offset, count);
        }
    
        // Forwards to the client's setnx; the base class uses it as its lock primitive.
        @Override
        public Boolean setnx(String key, Object value, int expireInSeconds) {
            return redisClient.setnx(getCategory(key), value, expireInSeconds);
        }
    
        // Forwards to the client's delete.
        @Override
        public Boolean delete(String key) {
            return redisClient.delete(getCategory(key));
        }
    
        // Forwards to the client's exists.
        @Override
        public Boolean exists(String key) {
            return redisClient.exists(getCategory(key));
        }
    
        // Forwards to the client's zremrangeByRank.
        @Override
        public void zremrangeByRank(String key, long start, long end) {
            redisClient.zremrangeByRank(getCategory(key), start, end);
        }
    
        // Extra helper that is not part of the abstract contract: forwards to the client's zrem.
        public Long zrem(String key, String[] ary) {
            return redisClient.zrem(getCategory(key), ary);
        }
    
        // Maps a StoreTuple to the framework tuple: score (truncated to long) -> count, element -> member id.
        @Override
        protected MemberInfoTuple convertToMemberInfoTuple(StoreTuple storeTuple) {
            MemberInfoTuple memberInfoTuple = new MemberInfoTuple();
            memberInfoTuple.setCount(storeTuple.getScore().longValue());
            memberInfoTuple.setMemberId(storeTuple.getElement());
            return memberInfoTuple;
        }
    }
    View Code

    分发处理器实现类(示例):

    package com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask;
    
    import com.google.common.collect.Lists;
    import com.sankuai.grocerywms.logistics.strategy.common.constant.MccConfigConstants;
    import com.sankuai.grocerywms.logistics.strategy.dal.mapper.RemeasureItemMapper;
    import com.sankuai.grocerywms.logistics.strategy.dal.model.RemeasureItemDO;
    import com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask.basic.DistributionProcessor;
    import com.sankuai.grocerywms.logistics.strategy.domain.remeasuretask.model.RemeasureItemModelBO;
    import com.sankuai.grocerywms.logistics.strategy.gateway.WMSGateWay;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.collections.CollectionUtils;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.stream.Collectors;
    
    /**
     * Created by zzq on 2021/3/14.
     */
    @Slf4j
    public class DistributionRule implements DistributionProcessor {
        // Message being distributed; also the source of sale, skuId, poiIds and businessId.
        private final RemeasureItemModelBO message;
        // Value of message.getIsSale(), captured at construction time.
        private final Integer sale;
        // Value of message.getSkuId(), captured at construction time.
        private final String skuId;
        // Gateway queried for available inventory.
        private final WMSGateWay wmsGateWay;
        // Service used to push the message to MQ.
        private final POIDistributionService poiDistributionService;
        // Per-member upper bound; getMaxCount falls back to 60 when this is null.
        private final Long poiMaxCount;
        // Mapper used to persist the assigned warehouse when no MQ message is sent.
        private final RemeasureItemMapper remeasureItemMapper;
    
        /**
         * Creates a rule for one message.
         *
         * @param message                the message to distribute; its sale flag and sku id are read here
         * @param wmsGateWay             inventory gateway
         * @param poiDistributionService MQ sender
         * @param remeasureItemMapper    persistence mapper
         * @param poiMaxCount            per-member upper bound, may be null
         */
        public DistributionRule(RemeasureItemModelBO message, WMSGateWay wmsGateWay, POIDistributionService poiDistributionService, RemeasureItemMapper remeasureItemMapper, Long poiMaxCount) {
            this.message = message;
            this.sale = message.getIsSale();
            this.skuId = message.getSkuId();
            this.wmsGateWay = wmsGateWay;
            this.poiDistributionService = poiDistributionService;
            this.remeasureItemMapper = remeasureItemMapper;
            this.poiMaxCount = poiMaxCount;
        }
    
        /**
         * Tries to place the message on one warehouse.
         * <ol>
         * <li>Queries the available inventory of the sku in the warehouse. If it is greater than 0 the
         *     message is sent to MQ and the method returns true.</li>
         * <li>Without inventory, a call that did not come from full-range matching
         *     ({@code fullRangeMembers} is false) still assigns the warehouse: the row identified by the
         *     message's business id is updated with the warehouse id, no MQ message is sent, and the
         *     method returns true.</li>
         * <li>In every other case, including any exception, the method returns false so the counter can
         *     try another warehouse.</li>
         * </ol>
         * Inventory is queried per call rather than cached because it may change between calls.
         * <p>
         * Changes compared with the previous version: a null inventory result and a null
         * {@code fullRangeMembers} are handled explicitly (both still lead to false) instead of relying on
         * a caught NullPointerException, and failures are logged at error level with the throwable as
         * trailing argument so the stack trace is kept ({@code printStackTrace} removed).
         * <p>
         * NOTE(review): the original inline comment says the update also sets {@code send_mq_state=1},
         * but only the warehouse id is set on the update object here — confirm against the mapper.
         *
         * @param poiId            warehouse (member) id being tried
         * @param fullRangeMembers whether the call comes from full-range matching
         * @return true when the message was placed on this warehouse, false otherwise
         */
        @Override
        public Boolean process(String poiId, Boolean fullRangeMembers) {
            log.info("range匹配,skuId=[{}],poiId=[{}]", skuId, poiId);
            try {
                Long availableNum = wmsGateWay.getAvailableInventory(poiId, skuId);
                if (availableNum == null) {
                    log.warn("poiDistributionService#getAvailableInventory returned null,skuId=[{}],poiId=[{}]", skuId, poiId);
                    return false;
                }
                if (availableNum > 0) {
                    log.info("sku=[{}]已经找到有库存的仓库,poiId=[{}],正在分配... inventoryQuery=[{}]", skuId, poiId, availableNum);
                    poiDistributionService.sendMessageToMQ(message, poiId);
                    return true;
                }
                log.info("poiDistributionService#getAvailableInventory没有库存,未发送MQ到库内,skuId=[{}],poiId=[{}],inventoryQuery=[{}]", skuId, poiId, availableNum);
                // Preferred-range call without inventory: record the assignment in MySQL only, no MQ.
                if (Boolean.FALSE.equals(fullRangeMembers)) {
                    RemeasureItemDO newDO = new RemeasureItemDO();
                    newDO.setDeliveryPoiId(poiId);
                    RemeasureItemDO oldDO = new RemeasureItemDO();
                    oldDO.setBusinessId(message.getBusinessId());
                    // A failed update is tolerable: the row is picked up again by the next task run.
                    remeasureItemMapper.updateSelective(newDO, oldDO);
                    log.info("档期sku数据分配成功,mysql-Update sku=[{}],poiId=[{}],msgInfo=[{}]", message.getSkuId(), poiId, message);
                    return true;
                }
            } catch (Exception e) {
                log.error("poiDistributionService#getAvailableInventory库存接口调用异常,skuId=[{}],poiId=[{}]", skuId, poiId, e);
            }
            return false;
        }
    
        /**
         * 分发到指定仓库的范围,返回null,从所有仓库开始匹配发送,返回list,则仅仅向所在的list进行商品分发
         *
         * @return
         */
        @Override
        public List<String> getRangeMembers() {
            if (sale == 1) {
                String poiIds = message.getPoiIds();
                String[] poiIdAry = poiIds.split("\,");
                ArrayList<String> priorityPoiIds = Lists.newArrayList(poiIdAry);
                ArrayList<String> distributionPoiIds = MccConfigConstants.DISTRIBUTION_POIIDS;
                if (CollectionUtils.isNotEmpty(distributionPoiIds)) {
                    List<String> overPoiIds = priorityPoiIds.stream().filter(item -> distributionPoiIds.contains(item)).collect(Collectors.toList());
                    return overPoiIds;
                }
                return priorityPoiIds;
            }
            return null;
        }
    
        /**
         * Upper bound for a warehouse's count.
         *
         * @return the bound given at construction time, or 60 when none was given
         */
        @Override
        public Long getMaxCount() {
            return poiMaxCount == null ? Long.valueOf(60L) : poiMaxCount;
        }
    
        /**
         * Lower bound for a warehouse's count.
         *
         * @return always 0
         */
        @Override
        public Long getMinCount() {
            return 0L;
        }
    }
    /*
     if (sale != 1 || fullRangeMembers) {
                log.info("fullRange匹配,skuId=[{}],poiId=[{}]", skuId, poiId);
    
                try {
                    Long availableNum = poiDistributionService.getAvailableInventory(poiId, skuId);
                    if (availableNum > 0) {
                        log.info("sku=[{}]已经找到有库存的仓库,poiId=[{}],正在分配... inventoryQuery=[{}]", skuId, poiId, availableNum);
                        poiDistributionService.sendMessageToMQ(message, poiId);
                        return true;
                    }
                    log.info("poiDistributionService#getAvailableInventory没有库存,skuId=[{}],poiId=[{}],inventoryQuery=[{}]", skuId, poiId, availableNum);
                } catch (Exception e) {
                    e.printStackTrace();
                    log.info("poiDistributionService#getAvailableInventory库存接口调用异常,skuId=[{}],poiId=[{}],error=[{}]", skuId, poiId, e);
                }
                return false;
    
            }
                    log.info("range匹配,skuId=[{}],poiId=[{}]", skuId, poiId);
                    poiDistributionService.sendMessageToMQ(message, poiId);
                    return true;
     */
    View Code

    使用代码示例:

     /**
             * step 1 如果sku在异常上报期内,则不插入mysql
             */
            RemeasureItemDO remeasureItemDO = remeasureItemMapper.selectToDayOneBySKU(message.getSkuId());
            if (remeasureItemDO != null) {
                log.info("算法下发重复数据-已经丢弃,MQ-message=[{}]", message);
                return ConsumeStatus.CONSUME_SUCCESS;
            }
            /**
             * step 2 先保存数据库
             */
            try {
                addRemeasureItem(message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("接收算法MQ消息,解析后插入mysql失败,error msg=[{}]", e);
            }
            /**
             * step 3 将mq消息分配到对应的大仓
             *
             * (1) 档期内则直接获取算法提供的POI列表,进行分发
             * (2) 非档期数据,需要拿到拥有最少量异常商品的大仓,并分配
             */
            message.setVersion(MccConfigConstants.task_version);
            DistributionRule distributionRule = new DistributionRule
                    (message, wmsGateWay, this, remeasureItemMapper, Long.valueOf(MccConfigConstants.POI_LIMIT));
            try {
                distributionExecutor.execute(distributionRule);
            } catch (Exception e) {
                e.printStackTrace();
                return ConsumeStatus.RECONSUME_LATER;
            }
            return ConsumeStatus.CONSUME_SUCCESS;
  • 相关阅读:
    C++基础学习6:内联函数
    lvm
    yum源
    mysql性能优化
    PXE
    dns配置
    进程命令
    ssh免密登陆和加密解密
    RAID阵列
    快速部署postfix邮件服务器
  • 原文地址:https://www.cnblogs.com/zzq-include/p/14593226.html
Copyright © 2011-2022 走看看