zoukankan      html  css  js  c++  java
  • redis队列及多线程应用

      由于xxx平台上自己的博客已经很久没更新了,一直以来都是用的印象笔记来做工作中知识的积累存根,不知不觉印象笔记里已经有了四、五百遍文章。为了从新开始能与广大攻城狮共同提高技术能力与水平,随决心另起炉灶在新的博客与大家分享

      经过一段时间项目的沉淀之后,对实际应用中的多线程开发及队列使用产生了深厚的兴趣,也将<<java并发编程实战>>仔细的阅读了两三遍,也看了很多并发编程的实践项目,也有了深刻的理解与在实践中合理应用队列、多线程开发的应用场景

      1、真实应用场景描述:

       由于一段时间以来要针对公司整个电商平台包括官网、移动端所有的交易数据进行统计,统计指标包括:pv、uv、实付金额、转化率、毛利率等等,按照各种不同的维度来统计计算出当前交易系统的各个指标的数据,但要求该项目是独立的,没有任务其它资源的协助及接品提供。经过一番xxxx思考讨论之后。业务上决定用以下解决方案:

        A: 用一个定时服务每隔10秒去别的系统数据库抓取上一次查询时间以来新确认的订单(这种订单表示已经支付完在或者客户已经审核确认了),然后将这些订单的唯一编号放入redis队列。

        B: 由于用到了队列,根据经验自然而然的想到了  启动单独的线程去redis队列中不断获取要统计处理的订单编号,然后将获取到的订单编号放入线程池中进行订单的统计任务处理。

        开发实现:

        FetchConfirmOrdersFromErpJob.java

     1 /**
     2      * 1、从redis中获取上次查询的时间戳
     3      * 2、将当前时间戳放入到redis中,以便 下次按这个时间查询
     4      * 3、去erp订单表查询confirm_time>=上次查询的时间的订单,放入队列中
     5      */
     6     @Scheduled(cron = "0/30 * * * * ?")
     7     public void start(){
     8         logger.info("FetchConfirmOrdersFromErpJob start................."+ new Date());
     9         StopWatch watch=new StopWatch();
    10         watch.start();
    11         //上次查询的时间
    12         String preQueryTimeStr=this.readRedisService.get(Constans.CACHE_PREQUERYORDERTIME);
    13         
    14         Date now=new Date();
    15         if(StringUtils.isBlank(preQueryTimeStr)){
    16             preQueryTimeStr=DateFormatUtils.format(DateUtils.addHours(now, -1), Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS);//第一次查询之前一个小时的订单
    17 //            preQueryTimeStr="2015-05-07 10:00:00";//本地测试的时候使用
    18         }
    19         //设置当前时间为上次查询的时间
    20         this.writeRedisService.set(Constans.CACHE_PREQUERYORDERTIME, DateFormatUtils.format(now, Constans.DATEFORMAT_PATTERN_YYYYMMDDHHMMSS));
    21         
    22         List<Map<String, Object>> confirmOrderIds = this.erpOrderService.selectOrderIdbyConfirmtime(preQueryTimeStr);
    23         if(confirmOrderIds==null){
    24             logger.info("query confirmOrderIds is null,without order data need dealth..........");
    25             return;
    26         }
    27         for (Map<String, Object> map : confirmOrderIds) {
             //将订单编号放入队列中
    28 this.writeRedisService.lpush(Constans.CACHE_ORDERIDS, map.get("channel_orderid").toString()); 29 logger.info("=======lpush orderid:"+map.get("channel_orderid").toString()); 30 } 31 32 watch.stop(); 33 logger.info("FetchConfirmOrdersFromErpJob end................."+ new Date()+" total cost time:"+watch.getTime()+" dealth data count:"+confirmOrderIds.size()); 34 }

        OrderCalculate.java    队列获取订单线程

     1 public class OrderCalculate {
     2 
     3     private static final Log logger = LogFactory.getLog(OrderCalculate.class);
     4     
     5     @Autowired
     6     private static WriteRedisService writeRedisService;
     7     
     8     private static ExecutorService threadPool=Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()*4
     9             ,new TjThreadFactory("CalculateAmount"));
    10     static{
    11         Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
    12             @Override
    13             public void run() {
    14                 QueuePop.stop();
    15                 threadPool.shutdown();
    16             }
    17         }));
    18     }
    19     
    20     public void init(){
    21         if(writeRedisService==null){
    22             writeRedisService=SpringContext.getBean(WriteRedisService.class);
    23         }
    24         new Thread(new QueuePop(),"OrderIdQueuePop").start();//由于是用redis做的队列,所以只要使用一个线程从队列里拿就ok
    25     }
    26     
    27     static class QueuePop implements Runnable{
    28 
    29         volatile static boolean stop=false;
    30         
    31         @Override
    32         public void run() {
    33             while(!stop){
    34                 //不断循环从队列里取出订单id
    35                 String orderId=null;
    36                 try {
    37                     orderId = writeRedisService.rpop(Constans.CACHE_ORDERIDS);
    38                     if(orderId!=null){
    39                         logger.info("pop orderId:"+orderId);
                    //将获取的订单编号交给订单统计任务处理线程处理
    40 threadPool.submit(new CalculateAmount(Integer.parseInt(orderId),new Date())); 41 } 42 } catch (Exception e1) { 43 logger.error("",e1); 44 } 45 //根据上线后的业务反馈来确定是否改成wait/notify策略来及时处理确认的订单 46 try { 47 Thread.sleep(10); 48 } catch (InterruptedException e) { 49 logger.error("",e); 50 // Thread.currentThread().interrupt(); 51 //stop=true;//线程被打算继续执行,不应该被关闭,保证该线程永远不会死掉 52 } 53 } 54 } 55 56 public static void stop(){ 57 stop=true; 58 } 59 60 } 61 62 }

          CalculateAmoiunt.java   订单任务处理

      1 public class CalculateAmount implements Runnable {
      2     private static final Log logger = LogFactory.getLog(CalculateAmount.class);
      3     private int orderId;
      4     private Date now;//确认时间  这个时间有一定的延迟,基本可以忽略,如果没什么用
      5     private OrderService orderServices;
      6     private OrdHaveProductService ordHaveProductService;
      7     private OrdPayByCashbackService ordPayByCashbackService;
      8     private OrdPayByCouponService ordPayByCouponService;
      9     private OrdPayByGiftCardService ordPayByGiftCardService;
     10     private StatisticsService statisticsService;
     11     private WriteRedisService writeRedisService;
     12     private ReadRedisService readRedisService;
     13     private ErpOrderGoodsService erpOrderGoodsService;
     14     private ErpOrderService erpOrderService;
     15     
     16     
     17     public CalculateAmount(int orderId,Date now) {
     18         super();
     19         this.orderId = orderId;
     20         this.now=now;
     21         orderServices=SpringContext.getBean(OrderService.class);
     22         ordHaveProductService=SpringContext.getBean(OrdHaveProductService.class);
     23         ordPayByCashbackService=SpringContext.getBean(OrdPayByCashbackService.class);
     24         ordPayByCouponService=SpringContext.getBean(OrdPayByCouponService.class);
     25         ordPayByGiftCardService=SpringContext.getBean(OrdPayByGiftCardService.class);
     26         statisticsService=SpringContext.getBean(StatisticsService.class);
     27         writeRedisService=SpringContext.getBean(WriteRedisService.class);
     28         readRedisService=SpringContext.getBean(ReadRedisService.class);
     29         erpOrderGoodsService=SpringContext.getBean(ErpOrderGoodsService.class);
     30         erpOrderService=SpringContext.getBean(ErpOrderService.class);
     31     }
     32 
     33     @Override
     34     public void run() {
     35         logger.info("CalculateAmount task run start........orderId:"+orderId);
     36         StopWatch watch=new StopWatch();
     37         watch.start();
     38         /**
     39          * 取出订单相关的所有数据同步到统计的库中
     40          */
     41         //TODO  考虑要不要将下面所有操作放到一个事务里面
     42         List<Map<String, Object>> orders = this.orderServices.selectOrderById(orderId);
     43         if(orders!=null&&orders.size()>0){
     44             Map<String, Object> order = orders.get(0);
     45             
     46             String orderSN=U.nvl(order.get("OrderSN"));//订单编号
     47             Integer userId=U.nvlInt(order.get("usr_UserID"),null);//用户d
     48             Integer status=U.nvlInt(order.get("Status"),null);//状态
     49             Date createTime=now;//(Date)order.get("CreateTime");//创建时间
     50             Date modifyTime=now;//(Date)order.get("ModifyTime");// 更新时间
     51             BigDecimal discountPrice=U.nvlDecimal(order.get("DiscountPrice"),null);//优惠总额  满减金额
     52             BigDecimal payPrice=U.nvlDecimal(order.get("PayPrice"), null);//实付金额
     53             BigDecimal totalPrice=U.nvlDecimal(order.get("TotalPrice"), null);//总金额
     54             
     55             //从erp里查询出订单的确认时间
     56             int dbConfirmTime=0;
     57             try {
     58                 dbConfirmTime = this.erpOrderService.selectConfirmTimeByOrderId(orderId);
     59             } catch (Exception e2) {
     60                 logger.error("",e2);
     61             }
     62             Date ct=new Date(dbConfirmTime*1000L);
     63             
     64             int[] dates=U.getYearMonthDayHour(ct);//
     65             if(modifyTime!=null){
     66                 dates=U.getYearMonthDayHour(modifyTime);//
     67             }
     68             int year=dates[0];//
     69             int month=dates[1];//
     70             int day=dates[2];//
     71             int hour=dates[3];//小时
     72             
     73             String ordersId=orderId+"";//生成订单id
     74             
     75             //查询订单的来源和搜索引擎关键字
     76             String source="";
     77             String seKeyWords="";
     78             List<OrdersData> orderDataList=this.statisticsService.selectOrdersDataByOrdersId(orderSN);
     79             if(orderDataList!=null&&!orderDataList.isEmpty()){
     80                 OrdersData ordersData = orderDataList.get(0);
     81                 source=ordersData.getSource();
     82                 seKeyWords=ordersData.getSeKeyWords();
     83             }
     84             
     85             //TODO 将订单入库
     86             ArrayList<RelOrders> relOrdersList = Lists.newArrayList();
     87             RelOrders relOrders=new RelOrders(orderSN,userId+"",Byte.valueOf(status+""),source,seKeyWords,IsCal.未计算.getFlag(),(byte)U.getSimpleYearByYear(year),(byte)month,(byte)day,(byte)hour,ct,createTime,modifyTime);
     88             relOrdersList.add(relOrders);
     89             
     90             try {
     91                 relOrders.setConfirmTime(ct);
     92                 //查询RelOrders是否存在
     93                 RelOrders dbOrders=this.statisticsService.selectByPrimaryKey(orderSN);
     94                 if(dbOrders!=null){
     95                     //更新
     96                     dbOrders.setStatus(Byte.valueOf(status+""));
     97                     dbOrders.setConfirmTime(ct);
     98                     dbOrders.setModifyTime(modifyTime);
     99                     this.statisticsService.updateByPrimaryKeySelective(dbOrders);
    100                     return;
    101                 }else{
    102                     Integer relResult=this.statisticsService.insertRelOrdersBatch(relOrdersList);
    103                 }
    104             } catch (Exception e) {
    105                 logger.error("insertRelOrdersBatch error",e);
    106             }
    107             /**
    108              * 查这个订单的返现、优惠券、礼品卡  的金额
    109              */
    110             List<Map<String, Object>> cashs = this.ordPayByCashbackService.selectDecutionPriceByOrderId(orderId);
    111             List<Map<String, Object>> coupons = this.ordPayByCouponService.selectDecutionPriceByOrderId(orderId);
    112             
    113             BigDecimal cashAmount=U.getValueByKey(cashs, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//返现金额
    114             BigDecimal couponAmont=U.getValueByKey(coupons, "DeductionPrice", BigDecimal.class, BigDecimal.ZERO);//红包金额
    115             /**
    116              * 查询出这个订单的所有商品
    117              */
    118             List<Map<String, Object>> products=null;
    119             Map<String,Object> productToKeyWordMap=Maps.newHashMap();
    120             try {
    121                 products = this.ordHaveProductService.selectByOrderId(orderId);
    122                 List<OrdersItemData> ordersItemDataList=this.statisticsService.selectOrdersItemDataByOrdersId(orderSN);
    123                 if(ordersItemDataList!=null){
    124                     for (OrdersItemData ordersItemData : ordersItemDataList) {
    125                         productToKeyWordMap.put(ordersItemData.getItemId(), ordersItemData.getKeyWords());
    126                     }
    127                 }
    128             } catch (Exception e1) {
    129                 logger.error("",e1);
    130             }
    131             if(products!=null){
    132                 ArrayList<RelOrdersItem> relOrdersItemList = Lists.newArrayList();
    133                 for (Map<String, Object> product : products) {
    134                     Integer productId=U.nvlInt(product.get("pro_ProductID"), null);//商品Id
    135                     Integer buyNo=U.nvlInt(product.get("BuyNo"), 0);//购买数量
    136                     String SN=U.nvl(product.get("SN"),"");
    137                     BigDecimal buyPrice=U.nvlDecimal(product.get("BuyPrice"), BigDecimal.ZERO);//购买价格
    138                     BigDecimal buyTotalPrice=U.nvlDecimal(product.get("BuyTotalPrice"), null);//购买总价格
    139                     BigDecimal productPayPrice=U.nvlDecimal(product.get("PayPrice"), null);//单品实付金额
    140                     
    141                     BigDecimal cost=null;//商品成本  TODO 调别人的接口
    142                     BigDecimal realtimeAmount=null;//实付金额
    143                     
    144                     BigDecimal pdCashAmount=BigDecimal.ZERO;//每个商品的返现
    145                     BigDecimal pdcouponAmont=BigDecimal.ZERO;//每个商品的优惠券
    146                     
    147                     //商品价格所占订单比例
    148                     if(buyTotalPrice!=null&&totalPrice!=null&&totalPrice.doubleValue()!=0){
    149                         pdCashAmount=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(cashAmount).setScale(2,BigDecimal.ROUND_HALF_UP);
    150                         pdcouponAmont=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(couponAmont).setScale(2,BigDecimal.ROUND_HALF_UP);
    151                         discountPrice=buyTotalPrice.divide(totalPrice,8,BigDecimal.ROUND_HALF_UP).multiply(discountPrice).setScale(2,BigDecimal.ROUND_HALF_UP);
    152                     }
    153                     
    154                     realtimeAmount=buyTotalPrice.subtract((pdCashAmount.add(pdcouponAmont).add(discountPrice))).setScale(2,BigDecimal.ROUND_HALF_UP);
    155                     
    156                     RelOrdersItem item=new RelOrdersItem(U.randomUUID(),orderSN,productId,SN,buyNo,realtimeAmount,U.nvl(productToKeyWordMap.get(productId)));
    157                     
    158                     relOrdersItemList.add(item);
    159                     
    160                     //如果确认时间属于同一天的话,将商品实付金额放入到redis排行榜中
    161                     if((status==1||status==5||status==6||status==7||status==11)&&DateUtils.isSameDay(new Date(), ct)){
    162                         //如果订单的状态是这几种,刚将该商品加入到实付金额的排行 榜中
    163                         dates=U.getYearMonthDayHour(ct);//
    164                         int days=dates[2];
    165                         //某一个商品某一天的实付金额
    166                         BigDecimal itemRelAmount=BigDecimal.ZERO;
    167                         //从redis里取出这个商品的实付金额,然后累加
    168                         String itemRelAmountStr=readRedisService.get(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
    169                         if(StringUtils.isNotBlank(itemRelAmountStr)){
    170                             itemRelAmount=new BigDecimal(itemRelAmountStr);
    171                         }
    172                         realtimeAmount=itemRelAmount.add(realtimeAmount);
    173                         writeRedisService.set(Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days, realtimeAmount.toPlainString());
    174                         writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_PERITEMRELAMOUNTSS_KEY_PREFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
    175                         writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTSS_KEY+days, realtimeAmount.doubleValue(), productId+"");
    176                         //确认的销量
    177                         Long itemCount= writeRedisService.incrBy(Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days,buyNo);
    178                         writeRedisService.zadd(Constans.CACHE_ITEMSALES_SS_KEY_PRDFIX+days, itemCount, productId+"");
    179                         
    180                         String itemType="";
    181                         Map<String, String> pMap = this.readRedisService.hmget(Constans.CACHE_PRODUCT_KEY+productId);
    182                         itemType=pMap.get("categoryId");
    183                         if(StringUtils.isNotBlank(itemType)){
    184                             if(ProductCategory.isGuanBai(itemType)){
    185                                 //如果是白酒  官白的访客数排行 
    186                                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTWHITESS_KEY+days,  realtimeAmount.doubleValue(), productId+"");//
    187                                 //确认的销量排行
    188                                  this.writeRedisService.zadd(Constans.CACHE_ITEMSALESWHITE_SS_KEY_PRDFIX+days, itemCount, productId+"");//
    189                             }else if(ProductCategory.isGuanHong(itemType)){
    190                                 //官红的访客数排行 
    191                                 this.writeRedisService.zadd(Constans.CACHE_ITEMREALAMOUNTREDSS_KEY+days,  realtimeAmount.doubleValue(), productId+"");//
    192                                 //确认的销量排行
    193                                  this.writeRedisService.zadd(Constans.CACHE_ITEMSALESRED_SS_KEY_PRDFIX+days, itemCount, productId+"");//
    194                             }
    195                         }
    196                         
    197                         //某一个商品的销量加入删除列表
    198                         writeRedisService.lpush(Constans.CACHE_DELKEYS_KEY_PRDFIX+days, Constans.CACHE_ITEMSALES_KEY_PRDFIX+productId+Constans.CACHE_KEY_SEPARATOR+days);
    199                     }
    200                 }
    201                 try {
    202                     //TODO 将订单商品明细入库
    203                     this.statisticsService.insertRelOrdersItemBatch(relOrdersItemList);
    204                     //再将订单的状态改为已计算
    205                     this.statisticsService.updateIsCal(orderSN,IsCal.已计算.getFlag());//将是否计算改成已计算
    206                     //该订单的所有商品的成本同步到现在的库中。
    207                     this.calOrderProductCostSync(orderId,orderSN,products);
    208                 } catch (Exception e) {
    209                     logger.error("insertRelOrdersItemBatch or updateIsCal error",e);
    210                 }
    211             }
    212         }
    213         watch.stop();
    214         logger.info("CalculateAmount task run end........total cost time:"+watch.getTime()+"   orderId:"+orderId);
    215     }
    216   
    217     private void calOrderProductCostSync(int orderId,String orderSN,List<Map<String, Object>> products){
    218         List<Map<String, Object>> ordersList = this.erpOrderGoodsService.selectProductCostByOrderSN(orderSN);
    219         if(ordersList==null||ordersList.isEmpty()){
    220             logger.error("according orderId to query some data from erp return is null.........");
    221             return;
    222         }
    223         Map<String, String> itemIdToItemSnMap = U.convertToMapByList(products, "pro_ProductID", "SN");
    224         
    225         List<RelItemCosts> list=Lists.newArrayList();
    226         for (Map<String, Object> map : ordersList) {
    227             RelItemCosts itemCost=new RelItemCosts();
    228             if(map==null){
    229                 continue;
    230             }
    231             Integer itemId=U.nvlInt(map.get("goods_id"),-99);
    232             BigDecimal costs=U.nvlDecimal(map.get("Dynamic_price"), BigDecimal.ZERO);
    233             itemCost.setId(U.randomUUID());
    234             itemCost.setOrdersId(orderId+"");
    235             itemCost.setOrdersNo(orderSN);
    236             itemCost.setItemId(itemId);
    237             itemCost.setItemNo(itemIdToItemSnMap.get(itemId+""));
    238             itemCost.setCosts(costs);
    239             itemCost.setCreateTime(new Date());
    240             itemCost.setModifyTime(new Date());
    241             list.add(itemCost);
    242         }
    243         
    244         this.statisticsService.insertRelItemCostsBatch(list);
    245         
    246     }
    247     
    248 }

      注意:

        1、redis2.6版本使用lpush、rpop出列的时候会丢失数据。换成2.8及以上的版本运行正常。

        2、由于应用会部署到多个结点,所以无法直接采用java的BlockingQueue阻塞队列,帮采用redis提供的队列支持。

        3、如果要做到统计的绝对实时,最好采用大数据的实时计算的解决方案:kafka+storm 来实现

      以上为队列结合线程的实践案例,供大家一起探讨。

        转载请注明出处 ,请大家尊重作者的劳动成果。

  • 相关阅读:
    day17---无参装饰器
    day17---作业
    python面向过程的编程思想
    算法之二分法
    python递归函数
    pyth作业3/25
    三元表达式、生成式、生成器表达式
    python 3/24作业
    python生成器
    python 迭代器
  • 原文地址:https://www.cnblogs.com/jhoney/p/4492324.html
Copyright © 2011-2022 走看看