  • Smart email marketing prediction with a Markov model on Spark

    I. Source

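      This project is based on Chapter 11 of "Data Algorithms: Recipes for Scaling Up with Hadoop and Spark", rewritten here with Spark 3.0 and Java 8.

    Changes from the original:

      1. It uses Spark 3.0 and Java 8.

      2. Spark produces the final state transition matrix directly, with no separate counting step.

      3. A Python script loads the state transition matrix and makes the predictions.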

    II. Purpose

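      A user's purchases may look random, but viewed as a time-ordered sequence they may follow a pattern. For example, a user may buy more each month on payday, or around certain dates each year (Singles' Day on November 11, a birthday). A Markov model can uncover such temporal patterns. If we can estimate the time of a user's next purchase from the previous purchase record, we can send a marketing email around that estimated time. What to promote in the email can come from the output of a separate recommendation algorithm.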

    III. Program

    The full program is available at: Markov smart email prediction (https://github.com/jiangnanboy/spark_tutorial)
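
    The Spark listing in the next subsection calls a helper, toStateSequence, whose body is not reproduced in this post. The following is a minimal Python sketch of the state encoding, assuming it uses the same thresholds as the Python prediction script in subsection 2 (30 and 60 days for the time gap, 0.9 and 1.1 for the amount ratio). The function name encode_state and the sample purchases are hypothetical, and the reading of the letters (S/M/L for a small, medium or large gap; L/E/G for a prior amount that is less than, roughly equal to, or greater than the current one) is an interpretation rather than something the post states.

```python
from datetime import date


def encode_state(prior_date, prior_amount, current_date, current_amount):
    """Encode two consecutive purchases as a two-letter state such as 'SG'."""
    days_diff = (current_date - prior_date).days

    # First letter: time gap between the two purchases
    if days_diff < 30:
        gap = "S"
    elif days_diff < 60:
        gap = "M"
    else:
        gap = "L"

    # Second letter: prior amount compared with the current amount
    if prior_amount < 0.9 * current_amount:
        amount = "L"
    elif prior_amount < 1.1 * current_amount:
        amount = "E"
    else:
        amount = "G"

    return gap + amount


# Hypothetical purchases, for illustration only
print(encode_state(date(2020, 1, 1), 100, date(2020, 1, 20), 50))   # SG
print(encode_state(date(2020, 1, 1), 100, date(2020, 3, 15), 100))  # LE
```

    Three gap letters times three amount letters give the nine states used throughout the rest of the post.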

    1. Computing the state transition matrix with Spark

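        /**
         * Build the state transition probability matrix.
         * @param session the active SparkSession
         * @param broadcastStatesMap maps each state name to its row/column number (as a string)
         * @param broadcastInitStateList initial matrix rows, keyed by row number
         */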
        public static void buildStateTransitionMatrix(SparkSession session, Broadcast<Map<String, String>> broadcastStatesMap, Broadcast<List<Tuple2<String, List<Double>>>> broadcastInitStateList) {
            //Input line format: customerID,transactionID,purchaseDate,amount
            String path = PropertiesReader.get("intermediate_smart_email_txt");
            JavaRDD<String> javaRDD = session.read().textFile(path).toJavaRDD().coalesce(10);
    
            //key=customerID, value=(purchaseDate, amount)
            //Malformed lines are filtered out first: returning null from mapToPair would fail later in groupByKey
            JavaPairRDD<String, Tuple2<Long, Integer>> javaPairRDD = javaRDD
                    .filter(line -> 4 == StringUtils.split(line, ",").length)
                    .mapToPair(line -> {
                String[] tokens = StringUtils.split(line, ",");
                long date = DateUtils.parseDate(tokens[2], "yyyy-MM-dd").getTime();
                int amount = Integer.parseInt(tokens[3]);
                Tuple2<Long, Integer> t2 = new Tuple2<>(date, amount);
                return new Tuple2<>(tokens[0], t2);
            });
    
            //group by customerID
            JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = javaPairRDD.groupByKey();
    
            //Build the state sequence of each customer
            JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(dateAndAmount -> {
                List<Tuple2<Long, Integer>> list = toList(dateAndAmount);
                Collections.sort(list, TupleComparatorAscending.INSTANCE);//sort the list by purchase date
                return toStateSequence(list);
            });
    
            /**
             * customerID, List<State>
             * Emit every observed transition with a count of 1 => ((fromState, toState), 1)
             *   | S1   S2   S3   ...
             *---+-----------------------
             *S1 | <probability-value>
             *   |
             *S2 |
             *   |
             *S3 |
             *   |
             *...|
             */
            JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> {
                List<String> states = s._2;
                List<Tuple2<Tuple2<String, String>, Integer>> mapOut = new ArrayList<>();
                if((null == states) || (states.size() < 2)) {
                    return Collections.emptyIterator();
                }
                for(int i = 0; i < (states.size() - 1); i++) {
                    String fromState = states.get(i);
                    String toState = states.get(i+1);
                    Tuple2<String, String> t2 = new Tuple2<>(fromState, toState);
                    mapOut.add(new Tuple2<>(t2, 1));
                }
                return mapOut.iterator();
            });
    
            // Sum the counts of each transition: ((fromState, toState), frequency)
            JavaPairRDD<Tuple2<String, String>, Integer> fromStateToStateFrequence1 = model.reduceByKey((i1, i2) -> i1 + i2);
    
            // ((fromState, toState), frequency) => (fromState, (toState, frequency))
            JavaPairRDD<String, Tuple2<String, Integer>> fromStateToStateFrequence2 = fromStateToStateFrequence1.mapToPair(s -> {
                String key = s._1._1;
                Tuple2<String, Integer> value = new Tuple2<>(s._1._2, s._2);
                return new Tuple2<>(key, value);
            });
    
            // group by fromState => fromState,List<Tuple2<toState, frequency>> => rowNumber,List<Tuple2<toState, frequency>>
            JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupState = fromStateToStateFrequence2.groupByKey().mapToPair(st2 -> {
                String rowNumber = broadcastStatesMap.getValue().get(st2._1);
                return new Tuple2<>(rowNumber, st2._2);
            });
    
            //Initial matrix rows, every value = 1.0 / size
            //List<Tuple2<String, List<Double>>> initStateList = initState(broadcastStatesMap.getValue().size());
            JavaPairRDD<String, List<Double>> initStatePairRDD = JavaSparkContext.fromSparkContext(session.sparkContext()).parallelizePairs(broadcastInitStateList.getValue());
    
            //initStatePairRDD.leftOuterJoin(groupState)
            JavaPairRDD<String, Tuple2<List<Double>, Optional<Iterable<Tuple2<String, Integer>>>>> joinPairRDD = initStatePairRDD.leftOuterJoin(groupState);
    
            //Normalize each row of the transition matrix so that its probabilities sum to (approximately) 1
            JavaPairRDD<String, List<Double>> resultJavaPairRDD = joinPairRDD.mapValues(lot2 -> {
                int size = broadcastStatesMap.getValue().size();
                List<Double> listDouble = lot2._1;
                Optional<Iterable<Tuple2<String, Integer>>> option = lot2._2;
                if(option.isPresent()) {
                    Iterable<Tuple2<String, Integer>> toStateFrequence = option.get();
                    Iterator<Tuple2<String, Integer>> iter = toStateFrequence.iterator();
                    List<Tuple2<String, Integer>> iterList = new ArrayList<>();
                    int sum = 0;
                    while(iter.hasNext()) {
                        Tuple2<String, Integer> t2 = iter.next();
                        iterList.add(t2);
                        sum += t2._2;
                    }
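                    //Smoothing so that no probability is 0: every unseen transition gets 1/sum.
                    //Observed counts are not incremented, so a smoothed row sums to slightly less than 1.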
                    if(iterList.size() < size) {
                        sum += size;
                        for(int i = 0; i < listDouble.size(); i ++) {
                            listDouble.set(i, 1.0/sum);
                        }
                    }
    
                    for(int i = 0; i < iterList.size(); i++) {
                        String stateNumber = broadcastStatesMap.getValue().get(iterList.get(i)._1);
                        double numalizeValue = iterList.get(i)._2 / (double)sum;
                        listDouble.set(Integer.parseInt(stateNumber), numalizeValue);
                    }
    
                } else {
                    return listDouble;
                }
                return listDouble;
            });
    
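            //Option 1: sort the rows with sortByKey to obtain the final state transition probability matrix
            //List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.sortByKey().collect();
    
            //Option 2: sort the rows with takeOrdered to obtain the final state transition probability matrix
            List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.takeOrdered(broadcastStatesMap.getValue().size(), StateTupleComparatorAscending.INSTANCE);
    
            //Print the transition probability matrix, one line per row: rowNumber,p0 p1 ...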
            for(Tuple2<String, List<Double>> s : stateResult) {
                StringBuilder sb = new StringBuilder();
                sb.append(s._1).append(",");
                for(int i = 0; i < (s._2.size() - 1); i ++) {
                    sb.append(s._2.get(i)).append(" ");
                }
                sb.append(s._2.get(s._2.size() - 1));
                System.out.println(sb.toString());
            }
    
        }
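
    To make the data flow of the Spark job easier to follow, here is a small single-machine sketch in Python of the same two steps: counting (fromState, toState) pairs and normalizing each row. It is an illustration, not a port of the listing. In particular it uses standard add-one (Laplace) smoothing, which adds 1 to every cell, whereas the Java code above only fills in the unseen cells. The function name build_transition_matrix and the sample sequences are hypothetical.

```python
from collections import Counter

STATES = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]


def build_transition_matrix(sequences, states=STATES):
    """Count (from, to) transitions and normalize each row with add-one smoothing."""
    n = len(states)
    counts = Counter()
    for seq in sequences:
        # Consecutive pairs, like the flatMapToPair step in the Java listing
        for from_state, to_state in zip(seq, seq[1:]):
            counts[(from_state, to_state)] += 1

    matrix = []
    for from_state in states:
        row_counts = [counts[(from_state, to_state)] for to_state in states]
        total = sum(row_counts) + n  # add-one smoothing: +1 for each of the n cells
        matrix.append([(c + 1) / total for c in row_counts])
    return matrix


# Hypothetical state sequences for two customers
sequences = [["SG", "SL", "SG", "ML"], ["SG", "SL"]]
matrix = build_transition_matrix(sequences)
sg_row = matrix[STATES.index("SG")]
print(sg_row[STATES.index("SL")])  # SG -> SL was seen twice out of three SG transitions
print(sum(sg_row))
```

    A state that never appears as a fromState ends up with a uniform row of 1/9, the same fallback the Spark job gets from its initial matrix.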

    2. Loading the state transition matrix in Python and predicting

    import os
    import time
    import datetime
    
    # Using the Markov model (model.txt) produced by the Spark job Markov.java,
    # predict for each customer in validate.txt when the marketing email should be sent
    user_action = {}
    model = []
    # The 9 states
    states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]
    
    validate_path = os.path.join(os.getcwd(), "validate.txt")
    model_path = os.path.join(os.getcwd(), "model.txt")
    
    # Read the validation data (items[0] = customer ID, items[2] = purchase date, last field = amount)
    with open(validate_path, 'r', encoding='utf-8') as f_read:
        for line in f_read:
            items = line.strip().split(',')
            user_id = items[0]
            # Keep [purchase date, amount] per user, in file order
            user_action.setdefault(user_id, []).append([items[2], items[-1]])
    print(user_action)
    
    # Read the model: one matrix row per line, values separated by whitespace
    with open(model_path, 'r', encoding='utf-8') as f_read:
        for line in f_read:
            model.append([float(item) for item in line.strip().split()])
    print(model)
    
    # Predict from each customer's recent behavior (at least two transactions are required)
    for user_id,user_action_list in user_action.items():
        if len(user_action_list) < 2:
            continue
        state_sequence = []
        last_date = ''
        prior = user_action_list[0]
        for i in range(1, len(user_action_list)):
            current = user_action_list[i]
            prior_date = prior[0]
            current_date = current[0]
    
            # Days between the two purchases
            prior_date = datetime.datetime.strptime(prior_date, '%Y-%m-%d')
            current_date = datetime.datetime.strptime(current_date, '%Y-%m-%d')
            days_diff = (current_date - prior_date).days
    
            dd = 'L'
            if days_diff < 30:
                dd = 'S'
            elif days_diff < 60:
                dd = 'M'
    
            # Difference in amount
            prior_amount = int(prior[1])
            current_amount = int(current[1])
    
            ad = 'G'
            if prior_amount < 0.9 * current_amount:
                ad = 'L'
            elif prior_amount < 1.1 * current_amount:
                ad = 'E'
    
            state_sequence.append(dd+ad)
    
            prior = current
            last_date = current_date
    
        if state_sequence:
            # Choose the email date from the most recent state
            last_state = state_sequence[-1]
            row_index = states.index(last_state)
            row_value = model[row_index]  # row row_index of the transition matrix
            max_value = max(row_value)  # largest probability in that row
            col_index = row_value.index(max_value)  # column index of max_value
            next_state = states[col_index]
    
            # The first letter of a state is the time gap: S, M or L
            if next_state.startswith('S'):
                next_date = last_date + datetime.timedelta(15)
            elif next_state.startswith('M'):
                next_date = last_date + datetime.timedelta(45)
            else:
                next_date = last_date + datetime.timedelta(90)
    
            print('User: {}, predicted next email date: {}'.format(user_id, next_date))
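
    One detail to watch when moving the matrix from Spark to Python: the Java listing prints each row as "rowNumber,p0 p1 ...", while the reader in the script above expects only whitespace-separated numbers. The sketch below is one possible tolerant reader that accepts both forms. The helper names parse_model_line and load_model are hypothetical, and the sketch assumes the lines are already in row order, as they are after the takeOrdered call.

```python
def parse_model_line(line):
    """Parse one matrix row, accepting 'p0 p1 ...' or 'rowNumber,p0 p1 ...'."""
    line = line.strip()
    if "," in line:
        # Drop the row label that the Java listing prints before the comma
        _, line = line.split(",", 1)
    return [float(value) for value in line.split()]


def load_model(lines):
    """Build the matrix from an iterable of lines, skipping blank ones."""
    return [parse_model_line(line) for line in lines if line.strip()]


# Tiny made-up input covering both line formats and a blank line
sample = ["0,0.2 0.8", "0.5 0.5", ""]
print(load_model(sample))  # [[0.2, 0.8], [0.5, 0.5]]
```

    Passing an open file object to load_model works as well, since a file iterates over its lines.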

    3. State transition matrix

    4.396976638863118E-6 4.396976638863118E-6 0.8062208425486636 0.15858575643387607 4.396976638863118E-6 4.396976638863118E-6 0.035153828227710626 4.396976638863118E-6 4.396976638863118E-6
    0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111
    0.8043794973142799 3.1671227323401235E-6 3.1671227323401235E-6 0.1594804651869869 3.1671227323401235E-6 3.1671227323401235E-6 0.03611153339414209 3.1671227323401235E-6 3.1671227323401235E-6
    1.225940909648155E-5 1.225940909648155E-5 0.8156797842343999 1.225940909648155E-5 1.225940909648155E-5 0.15299742552408974 0.031212455559642024 1.225940909648155E-5 1.225940909648155E-5
    0.010869565217391304 0.010869565217391304 0.7282608695652174 0.010869565217391304 0.010869565217391304 0.11956521739130435 0.05434782608695652 0.010869565217391304 0.010869565217391304
    4.122521334047904E-5 4.122521334047904E-5 0.8190625386486375 0.14758626375891495 4.122521334047904E-5 4.122521334047904E-5 0.03298017067238323 4.122521334047904E-5 4.122521334047904E-5
    4.703226413319537E-5 4.703226413319537E-5 0.8348697206283511 4.703226413319537E-5 4.703226413319537E-5 0.14081459881478695 4.703226413319537E-5 4.703226413319537E-5 0.023845357915530052
    0.017857142857142856 0.017857142857142856 0.6785714285714286 0.017857142857142856 0.017857142857142856 0.125 0.017857142857142856 0.017857142857142856 0.03571428571428571
    5.083884087442806E-4 5.083884087442806E-4 0.7961362480935434 5.083884087442806E-4 5.083884087442806E-4 0.16573462125063548 0.03304524656837824 5.083884087442806E-4 5.083884087442806E-4
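
    As a quick sanity check, the first row of the matrix above can be inspected in Python. The sketch assumes that rows and columns follow the order of the states list in the prediction script, so the first row belongs to SL. The variable names are only for illustration.

```python
STATES = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"]

# First row of the matrix shown above, copied verbatim
first_row = [
    4.396976638863118e-6, 4.396976638863118e-6, 0.8062208425486636,
    0.15858575643387607, 4.396976638863118e-6, 4.396976638863118e-6,
    0.035153828227710626, 4.396976638863118e-6, 4.396976638863118e-6,
]

# Most likely next state: the column with the largest probability
best_index = max(range(len(first_row)), key=first_row.__getitem__)
print(STATES[best_index])  # SG

# The row total falls slightly short of 1
row_sum = sum(first_row)
shortfall = 1.0 - row_sum
print(row_sum, shortfall)
```

    The most likely next state starts with S, which the prediction script maps to an email 15 days after the last purchase. The shortfall equals three times the smallest entry, which is consistent with the smoothing step in the Java listing: the three observed transitions in this row keep their raw counts while the denominator grows by the number of states.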

    4. Prediction results

    User: user1, predicted next email date: 2020-01-22 00:00:00
    User: user2, predicted next email date: 2020-02-16 00:00:00
    User: user3, predicted next email date: 2020-01-25 00:00:00
    User: user4, predicted next email date: 2020-04-05 00:00:00
    User: user5, predicted next email date: 2020-01-30 00:00:00
    User: user6, predicted next email date: 2020-01-16 00:00:00
  • Original article: https://www.cnblogs.com/little-horse/p/14018540.html