一.来源
此项目来源《Data Algorithms Recipes for Scaling Up with Hadoop and Spark》第11章,本程序利用spark3.0以及java8进行改写,
改写的有:
1.利用spark3.0与java8
2.直接利用spark生成最终的状态转移矩阵,不用分开统计
3.利用python加载状态转移矩阵进行预测
二.目的
用户的购买行为看起来是没有规律可循的,但其实从时间有序的角度看,也许是有规律可循的,例如,用户可能每一个月发工资时购买得多,每年某个时间(双十一、生日)等购买得比较多马尔科夫模型能够挖掘出时间上的规律,假设我们能够根据用户上一次购买记录推测其下一次购买时间,就可以在推测时间向其发送邮件进行营销至于营销的商品内容,可以根据其他推荐算法的结果。
三.程序
完整程序见,Markov马尔可夫智能邮件预测(https://github.com/jiangnanboy/spark_tutorial)
1.利用spark统计状态转移矩阵
/** * 建立状态转移概率矩阵 * @param session */ public static void buildStateTransitionMatrix(SparkSession session, Broadcast<Map<String, String>> broadcastStatesMap, Broadcast<List<Tuple2<String, List<Double>>>> broadcastInitStateList) { //customerID,transactionID,purchaseDate,amount(顾客ID,交易ID,交易日期,金额) String path = PropertiesReader.get("intermediate_smart_email_txt"); JavaRDD<String> javaRDD = session.read().textFile(path).toJavaRDD().coalesce(10); //key=customerID,v=(purchaseDate,amount) JavaPairRDD<String, Tuple2<Long, Integer>> javaPairRDD = javaRDD.mapToPair(line -> { String[] tokens = StringUtils.split(line, ","); if(4 != tokens.length) { return null; } long date = DateUtils.parseDate(tokens[2], "yyyy-MM-dd").getTime(); int amount = Integer.parseInt(tokens[3]); Tuple2<Long, Integer> t2 = new Tuple2<>(date, amount); return new Tuple2<>(tokens[0], t2); }); //group by customerID JavaPairRDD<String, Iterable<Tuple2<Long, Integer>>> customerRDD = javaPairRDD.groupByKey(); //创建状态序列 JavaPairRDD<String, List<String>> stateSequence = customerRDD.mapValues(dateAndAmount -> { List<Tuple2<Long, Integer>> list = toList(dateAndAmount); Collections.sort(list, TupleComparatorAscending.INSTANCE);//对list按日期排序 return toStateSequence(list); }); /** * customerID, List<State> * 所有状态的频率为1 =》((fromState, toState),1) * | S1 S2 S3 ... *---+----------------------- *S1 | <probability-value> * | *S2 | * | *S3 | * | *...| */ JavaPairRDD<Tuple2<String, String>, Integer> model = stateSequence.flatMapToPair(s -> { List<String> states = s._2; List<Tuple2<Tuple2<String, String>, Integer>> mapOut = new ArrayList<>(); if((null == states) || (states.size() < 2)) { return Collections.emptyIterator(); } for(int i = 0; i < (states.size() - 1); i++) { String fromState = states.get(i); String toState = states.get(i+1); Tuple2<String, String> t2 = new Tuple2<>(fromState, toState); mapOut.add(new Tuple2<>(t2, 1)); } return mapOut.iterator(); }); // 统计所有状态频率: ((fromState, toState), frequence) JavaPairRDD<Tuple2<String, String>, Integer> fromStateToStateFrequence1 = model.reduceByKey((i1, i2) -> i1 + i2); // ((fromState, toState), frequence) =》 (fromState, (toState, frequence)) JavaPairRDD<String, Tuple2<String, Integer>> fromStateToStateFrequence2 = fromStateToStateFrequence1.mapToPair(s -> { String key = s._1._1; Tuple2<String, Integer> value = new Tuple2<>(s._1._2, s._2); return new Tuple2<>(key, value); }); // group by fromState =》 fromState,List<Tuple2<toState, frequence>> => rowNumber,List<Tuple2<toState, frequence>> JavaPairRDD<String, Iterable<Tuple2<String, Integer>>> groupState = fromStateToStateFrequence2.groupByKey().mapToPair(st2 -> { String rowNumber = broadcastStatesMap.getValue().get(st2._1); return new Tuple2<>(rowNumber, st2._2); }); //初始化矩阵状态,value = 1.0 / size //List<Tuple2<String, List<Double>>> initStateList = initState(broadcastStatesMap.getValue().size()); JavaPairRDD<String, List<Double>> initStatePairRDD = JavaSparkContext.fromSparkContext(session.sparkContext()).parallelizePairs(broadcastInitStateList.getValue()); //initStatePairRDD.leftOuterJoin(groupState) JavaPairRDD<String, Tuple2<List<Double>, Optional<Iterable<Tuple2<String, Integer>>>>> joinPairRDD = initStatePairRDD.leftOuterJoin(groupState); //规范化转移矩阵,使行的概率和为“1” JavaPairRDD<String, List<Double>> resultJavaPairRDD = joinPairRDD.mapValues(lot2 -> { int size = broadcastStatesMap.getValue().size(); List<Double> listDouble = lot2._1; Optional<Iterable<Tuple2<String, Integer>>> option = lot2._2; if(option.isPresent()) { Iterable<Tuple2<String, Integer>> toStateFrequence = option.get(); Iterator<Tuple2<String, Integer>> iter = toStateFrequence.iterator(); List<Tuple2<String, Integer>> iterList = new ArrayList<>(); int sum = 0; while(iter.hasNext()) { Tuple2<String, Integer> t2 = iter.next(); iterList.add(t2); sum += t2._2; } //加入平滑,防止概率为0 if(iterList.size() < size) { sum += size; for(int i = 0; i < listDouble.size(); i ++) { listDouble.set(i, 1.0/sum); } } for(int i = 0; i < iterList.size(); i++) { String stateNumber = broadcastStatesMap.getValue().get(iterList.get(i)._1); double numalizeValue = iterList.get(i)._2 / (double)sum; listDouble.set(Integer.parseInt(stateNumber), numalizeValue); } } else { return listDouble; } return listDouble; }); //1.利用sortByKey对转移状态排序,最终的状态转移概率矩阵 //List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.sortByKey().collect(); //2.利用takeOrdered对转移状态排序,最终的状态转移概率矩阵 List<Tuple2<String, List<Double>>> stateResult = resultJavaPairRDD.takeOrdered(broadcastStatesMap.getValue().size(), StateTupleComparatorAscending.INSTANCE); //打印转移概率矩阵 for(Tuple2<String, List<Double>> s : stateResult) { StringBuilder sb = new StringBuilder(); sb.append(s._1).append(","); for(int i = 0; i < (s._2.size() - 1); i ++) { sb.append(s._2.get(i)).append(" "); } sb.append(s._2.get(s._2.size() - 1)); System.out.println(sb.toString()); } }
2.利用python加载状态转移矩阵并进行预测
import os import time import datetime # 根据(spark)Markov.java统计出的马尔可夫模型(model.txt),对validate.txt中的数据进行预测什么时间应该发出营销邮件 user_action = {} model = [] #9大状态 states = ["SL", "SE", "SG", "ML", "ME", "MG", "LL", "LE", "LG"] validate_path = os.path.join(os.getcwd(), "validate.txt") model_path = os.path.join(os.getcwd(), "model.txt") #读取validate data with open(validate_path, 'r', encoding='utf-8') as f_read: for line in f_read: items = line.strip().split(',') user_id = items[0] if user_id in user_action.keys(): hist = user_action[user_id] lst = [items[2], items[-1]] hist.append(lst) else: hist = [] hist.append([items[2], items[-1]]) user_action[user_id] = hist print(user_action) #读取model data with open(model_path, 'r', encoding='utf-8') as f_read: for line in f_read: items = line.strip().split() row = [] for item in items: row.append(float(item)) model.append(row) print(model) #根据最近客户的行为数据(至少两次交易)make prediciton for user_id,user_action_list in user_action.items(): if len(user_action_list) < 2: continue state_sequence = [] last_date = '' prior = user_action_list[0] for i in range(1, len(user_action_list)): current = user_action_list[i] prior_date = prior[0] current_date = current[0] #相隔天数 prior_date = time.strptime(prior_date, '%Y-%m-%d') current_date = time.strptime(current_date, '%Y-%m-%d') prior_date = datetime.datetime(prior_date[0], prior_date[1], prior_date[2]) current_date = datetime.datetime(current_date[0], current_date[1], current_date[2]) days_diff = (current_date - prior_date).days dd = 'L' if days_diff < 30: dd = 'S' elif days_diff < 60: dd = 'M' #相差金额 prior_amount = int(prior[1]) current_amount = int(current[1]) ad = 'G' if prior_amount < 0.9 * current_amount: ad = 'L' elif prior_amount < 1.1 * current_amount: ad = 'E' state_sequence.append(dd+ad) prior = current last_date = current_date if state_sequence: #根据最近一个状态发送营销邮件日期 last_state = state_sequence[-1] row_index = states.index(last_state) row_value = model[row_index] #转移矩阵中行号为row_index的这一行值 max_value = max(row_value) #row_value中最大值 col_index = row_value.index(max_value) #max_value的索引号 next_state = states[col_index] if next_state.startswith('S'): next_date = last_date + datetime.timedelta(15) elif next_state.startswith('E'): next_date = last_date + datetime.timedelta(45) else: next_date = last_date + datetime.timedelta(90) print('用户:{}, 预测下次邮件发送时间:{}'.format(user_id, next_date))
3.状态转移矩阵
4.396976638863118E-6 4.396976638863118E-6 0.8062208425486636 0.15858575643387607 4.396976638863118E-6 4.396976638863118E-6 0.035153828227710626 4.396976638863118E-6 4.396976638863118E-6 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.1111111111111111 0.8043794973142799 3.1671227323401235E-6 3.1671227323401235E-6 0.1594804651869869 3.1671227323401235E-6 3.1671227323401235E-6 0.03611153339414209 3.1671227323401235E-6 3.1671227323401235E-6 1.225940909648155E-5 1.225940909648155E-5 0.8156797842343999 1.225940909648155E-5 1.225940909648155E-5 0.15299742552408974 0.031212455559642024 1.225940909648155E-5 1.225940909648155E-5 0.010869565217391304 0.010869565217391304 0.7282608695652174 0.010869565217391304 0.010869565217391304 0.11956521739130435 0.05434782608695652 0.010869565217391304 0.010869565217391304 4.122521334047904E-5 4.122521334047904E-5 0.8190625386486375 0.14758626375891495 4.122521334047904E-5 4.122521334047904E-5 0.03298017067238323 4.122521334047904E-5 4.122521334047904E-5 4.703226413319537E-5 4.703226413319537E-5 0.8348697206283511 4.703226413319537E-5 4.703226413319537E-5 0.14081459881478695 4.703226413319537E-5 4.703226413319537E-5 0.023845357915530052 0.017857142857142856 0.017857142857142856 0.6785714285714286 0.017857142857142856 0.017857142857142856 0.125 0.017857142857142856 0.017857142857142856 0.03571428571428571 5.083884087442806E-4 5.083884087442806E-4 0.7961362480935434 5.083884087442806E-4 5.083884087442806E-4 0.16573462125063548 0.03304524656837824 5.083884087442806E-4 5.083884087442806E-4
4.预测结果
用户:user1, 预测下次邮件发送时间:2020-01-22 00:00:00 用户:user2, 预测下次邮件发送时间:2020-02-16 00:00:00 用户:user3, 预测下次邮件发送时间:2020-01-25 00:00:00 用户:user4, 预测下次邮件发送时间:2020-04-05 00:00:00 用户:user5, 预测下次邮件发送时间:2020-01-30 00:00:00 用户:user6, 预测下次邮件发送时间:2020-01-16 00:00:00