  • RFM customer value segmentation with Spark

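    I. RFM
    RFM is an important tool for measuring a customer's current and potential value.
    When designing operations and marketing strategies, we want to apply different strategies to different customers, so that operations are precisely targeted and the conversion rate is as high as possible. Targeted operations depend on customer relationship management, and the core of customer relationship management is customer classification.
    Customer classification segments the customer base, separating low-value customers from high-value ones, so that each group can receive its own tailored service and limited resources can be allocated sensibly across customers of different value to maximize returns.
    Within customer classification, the RFM model is a classic segmentation model. It uses the three most essential dimensions of a typical transaction:
      1. Recency: the time elapsed since the customer's last purchase
      2. Frequency: the total number of purchases
      3. Monetary: the total amount the customer has spent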
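
    As a small illustration of the three dimensions, the sketch below computes R, F, and M for a single customer in plain Java, without Spark. The class name RfmSketch, the Purchase type, and the sample purchases are assumptions made for this example; Frequency is counted as the number of distinct invoices.

```java
import java.time.LocalDate;
import java.time.temporal.ChronoUnit;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class RfmSketch {

    /** One purchase line (hypothetical type): invoice id, purchase date, line amount. */
    static class Purchase {
        final String invoiceNo;
        final LocalDate date;
        final double amount;

        Purchase(String invoiceNo, LocalDate date, double amount) {
            this.invoiceNo = invoiceNo;
            this.date = date;
            this.amount = amount;
        }
    }

    /** Recency: days between the customer's latest purchase and the reference date. */
    static long recency(List<Purchase> purchases, LocalDate referenceDate) {
        LocalDate latest = purchases.get(0).date;
        for (Purchase p : purchases) {
            if (p.date.isAfter(latest)) {
                latest = p.date;
            }
        }
        return ChronoUnit.DAYS.between(latest, referenceDate);
    }

    /** Frequency: number of distinct invoices. */
    static int frequency(List<Purchase> purchases) {
        Set<String> invoices = new HashSet<>();
        for (Purchase p : purchases) {
            invoices.add(p.invoiceNo);
        }
        return invoices.size();
    }

    /** Monetary: total amount spent. */
    static double monetary(List<Purchase> purchases) {
        double total = 0.0;
        for (Purchase p : purchases) {
            total += p.amount;
        }
        return total;
    }

    public static void main(String[] args) {
        // Illustrative data: two lines on invoice A1, one line on invoice A2.
        List<Purchase> purchases = Arrays.asList(
                new Purchase("A1", LocalDate.of(2011, 11, 1), 20.0),
                new Purchase("A1", LocalDate.of(2011, 11, 1), 5.5),
                new Purchase("A2", LocalDate.of(2011, 12, 1), 10.0));
        LocalDate referenceDate = LocalDate.of(2011, 12, 9);

        System.out.println(recency(purchases, referenceDate)); // 8
        System.out.println(frequency(purchases));              // 2
        System.out.println(monetary(purchases));               // 35.5
    }
}
```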

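    II. Segmentation methods

    The computed RFM statistics then need to be partitioned so that different levels of customer value can be separated and analyzed, i.e. the customers are segmented. There are three ways to do this:
      1. By experience: people familiar with the business define the thresholds, which need continual adjustment
      2. By quantiles: compute the quantiles of each column and split on them (Spark provides approximate quantiles through Dataset.stat().approxQuantile; Python can also be used)
      3. By clustering: let a clustering algorithm split the customers automatically (using r, f, and m as features; further features can be added)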
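
    The quantile approach (method 2) can be sketched in plain Java as follows. This is a minimal illustration: the class name QuantileScoring, the nearest-rank quantile rule, and the sample Monetary values are choices made for this example, not taken from the original program. Each value receives a score from 1 to 4 according to the quartile it falls into.

```java
import java.util.Arrays;

class QuantileScoring {

    /** Returns the cut points at the given probabilities using the nearest-rank method. */
    static double[] cutPoints(double[] values, double[] probabilities) {
        double[] sorted = values.clone();
        Arrays.sort(sorted);
        double[] cuts = new double[probabilities.length];
        for (int i = 0; i < probabilities.length; i++) {
            // 1-based rank of the smallest value covering the requested share of the data
            int rank = (int) Math.ceil(probabilities[i] * sorted.length);
            cuts[i] = sorted[Math.max(rank, 1) - 1];
        }
        return cuts;
    }

    /** Score = 1 + number of cut points strictly below the value, so 1..cuts.length + 1. */
    static int score(double value, double[] cuts) {
        int score = 1;
        for (double cut : cuts) {
            if (value > cut) {
                score++;
            }
        }
        return score;
    }

    public static void main(String[] args) {
        // Illustrative Monetary values for eight customers.
        double[] monetary = {40.95, 151.65, 483.25, 598.83, 672.44, 876.29, 1421.43, 2709.12};
        double[] cuts = cutPoints(monetary, new double[] {0.25, 0.5, 0.75});

        System.out.println(Arrays.toString(cuts)); // [151.65, 598.83, 876.29]
        System.out.println(score(40.95, cuts));    // 1
        System.out.println(score(2709.12, cuts));  // 4
    }
}
```

    For Frequency and Monetary a higher score means a more valuable customer. For Recency a smaller value is better, so the score would be inverted (for example, 5 minus the score) before the three are combined.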

    III. Data

    The attributes of the dataset [online_retail.csv] are described below (the dataset comes from https://archive.ics.uci.edu/ml/datasets/online+retail):
      1.InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with letter 'c', it indicates a cancellation.
      2.StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
      3.Description: Product (item) name. Nominal.
      4.Quantity: The quantities of each product (item) per transaction. Numeric.
      5.InvoiceDate: Invoice Date and time. Numeric, the day and time when each transaction was generated.
      6.UnitPrice: Unit price. Numeric, Product price per unit in sterling.
      7.CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
      8.Country: Country name. Nominal, the name of the country where each customer resides.
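
    Two of these attributes need interpretation before use: InvoiceNo marks cancellations with a leading letter, and InvoiceDate arrives as text such as 2011/7/27 15:12 without zero padding. The sketch below shows one way to handle both with the standard java.time API. The class name RetailFieldSketch and its method names are assumptions for this example, and the date pattern is inferred from the sample values shown in this article.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

class RetailFieldSketch {

    // Single pattern letters (M, d, H) accept both one-digit and two-digit values when parsing.
    private static final DateTimeFormatter INVOICE_DATE =
            DateTimeFormatter.ofPattern("yyyy/M/d H:mm");

    /** An invoice number starting with the letter C (either case) denotes a cancellation. */
    static boolean isCancellation(String invoiceNo) {
        return invoiceNo.startsWith("C") || invoiceNo.startsWith("c");
    }

    /** Parses raw InvoiceDate text such as "2011/7/27 15:12" or "2010/12/01 08:26". */
    static LocalDateTime parseInvoiceDate(String raw) {
        return LocalDateTime.parse(raw.trim(), INVOICE_DATE);
    }

    public static void main(String[] args) {
        System.out.println(isCancellation("C536379"));           // true
        System.out.println(isCancellation("536365"));            // false
        System.out.println(parseInvoiceDate("2011/7/27 15:12")); // 2011-07-27T15:12
    }
}
```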

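    IV. Code (Spark 3.0, Java 1.8)

    The full code is available at RFM customer value segmentation (https://github.com/jiangnanboy/spark_tutorial). After computing the RFM statistics, the program uses k-means to cluster customers by value automatically.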

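        import java.sql.Timestamp;
        import java.util.List;

        import org.apache.spark.api.java.function.MapFunction;
        import org.apache.spark.sql.Dataset;
        import org.apache.spark.sql.Row;
        import org.apache.spark.sql.RowFactory;
        import org.apache.spark.sql.SparkSession;
        import org.apache.spark.sql.catalyst.encoders.RowEncoder;
        import org.apache.spark.sql.functions;
        import org.apache.spark.sql.types.DataTypes;

        import static org.apache.spark.sql.functions.col;

        // PropertiesReader and InitSchema are project helper classes (not part of Spark),
        // presumably defined in the linked repository.

        /**
         * Computes the per-customer RFM statistics (Recency, Frequence, Monetary) from online_retail.csv.
         *
         * @param session the active SparkSession
         * @return one row per CustomerID with the columns Recency, Frequence, and Monetary
         */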
        public static Dataset<Row> rfmStatistics(SparkSession session) {
            /**
             * Attribute description of the dataset [online_retail.csv] (the dataset comes from https://archive.ics.uci.edu/ml/datasets/online+retail):
             *
             * InvoiceNo: Invoice number. Nominal, a 6-digit integral number uniquely assigned to each transaction. If this code starts with letter 'c', it indicates a cancellation.
             * StockCode: Product (item) code. Nominal, a 5-digit integral number uniquely assigned to each distinct product.
             * Description: Product (item) name. Nominal.
             * Quantity: The quantities of each product (item) per transaction. Numeric.
             * InvoiceDate: Invoice Date and time. Numeric, the day and time when each transaction was generated.
             * UnitPrice: Unit price. Numeric, Product price per unit in sterling.
             * CustomerID: Customer number. Nominal, a 5-digit integral number uniquely assigned to each customer.
             * Country: Country name. Nominal, the name of the country where each customer resides.
             */
            String path = PropertiesReader.get("intermediate_customer_value_csv");
            Dataset<Row> dataset = session.read()
                    .option("sep", ",")
                    .option("header", "true")
                    .option("inferSchema", "true")
                    .csv(path);
    
            /** Count the non-null values in each column
             * +----------------+---------------+-----------------+--------------+-----------------+---------------+----------------+-------------+
             * |InvoiceNo_count |StockCode_count|Description_count|Quantity_count|InvoiceDate_count|UnitPrice_count|CustomerID_count|Country_count|
             * +----------------+---------------+-----------------+--------------+-----------------+---------------+----------------+-------------+
             * |          541909|         541909|           540455|        541909|           541909|         541909|          406829|       541909|
             * +----------------+---------------+-----------------+--------------+-----------------+---------------+----------------+-------------+
             */
            //datasetColumnCount(dataset);
    
            // The counts above show that CustomerID has missing values. Drop every row containing null/NaN; "any" removes a row if any of its columns is missing.
            dataset = dataset.na().drop("any");
    
            /** Column counts after dropping the rows with missing values
             * +----------------+---------------+-----------------+--------------+-----------------+---------------+----------------+-------------+
             * |InvoiceNo_count |StockCode_count|Description_count|Quantity_count|InvoiceDate_count|UnitPrice_count|CustomerID_count|Country_count|
             * +----------------+---------------+-----------------+--------------+-----------------+---------------+----------------+-------------+
             * |          406829|         406829|           406829|        406829|           406829|         406829|          406829|       406829|
             * +----------------+---------------+-----------------+--------------+-----------------+---------------+----------------+-------------+
             */
            //datasetColumnCount(dataset);
    
            // Convert the non-standard dates in the InvoiceDate column to a zero-padded standard format
            dataset = dataset.map((MapFunction<Row, Row>) row -> {
    
                    StringBuilder sb = new StringBuilder();
                    String invoiceDate = row.getString(4);
                    String[] tokens = invoiceDate.trim().split("\\s+"); // e.g. 2011/7/27 15:12
    
                    // year/month/day
                    String yearMonthDay = tokens[0];
                    String[] yMD = yearMonthDay.split("/");
                    sb.append(yMD[0]).append("/");
                    if(1 == yMD[1].length()) {
                        sb.append("0").append(yMD[1]).append("/");
                    } else {
                        sb.append(yMD[1]).append("/");
                    }
                    if(1 == yMD[2].length()) {
                        sb.append("0").append(yMD[2]).append(" ");
                    } else {
                        sb.append(yMD[2]).append(" ");
                    }
    
                    // hour/min
                    String hourMin = tokens[1];
                    String[] hm = hourMin.split(":");
                    if(1 == hm[0].length()) {
                        sb.append("0").append(hm[0]).append(":");
                    } else {
                        sb.append(hm[0]).append(":");
                    }
                    if(1 == hm[1].length()) {
                        sb.append("0").append(hm[1]);
                    } else {
                        sb.append(hm[1]);
                    }
                    return RowFactory.create(row.getString(0), row.getString(1), row.getString(2), row.getInt(3), sb.toString(), row.getDouble(5), row.getInt(6), row.getString(7));
    
            }, RowEncoder.apply(InitSchema.initOnlineRetailSchema()));
    
            /**
             * Parse the InvoiceDate column into a timestamp and add it as a new column, NewInvoiceDate
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+
             * |InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|     NewInvoiceDate|
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+
             * |   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/01 08:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|
             * |   536365|    71053| WHITE METAL LANTERN|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
             * |   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/01 08:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|
             * |   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
             * |   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+
             */
            //dataset = dataset.withColumn("NewInvoiceDate",functions.to_utc_timestamp(functions.unix_timestamp(col("InvoiceDate"), "yyyy/MM/dd HH:mm").cast("timestamp"), "UTC"));
            dataset = dataset.withColumn("NewInvoiceDate", functions.unix_timestamp(col("InvoiceDate"),"yyyy/MM/dd HH:mm").cast(DataTypes.TimestampType));
    
            /**
             * Compute the line total: Quantity * UnitPrice, stored in the new column TotalPrice
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+----------+
             * |InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|     NewInvoiceDate|TotalPrice|
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+----------+
             * |   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/01 08:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|      15.3|
             * |   536365|    71053| WHITE METAL LANTERN|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|
             * |   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/01 08:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|      22.0|
             * |   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|
             * |   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+----------+
             */
            dataset = dataset.withColumn("TotalPrice", functions.round(col("Quantity").multiply(col("UnitPrice")), 2));
    
            // Get the latest timestamp in the NewInvoiceDate column
            List<Row> maxInvoiceRow = dataset.select(functions.max(col("NewInvoiceDate")).as("MaxInvoiceDate")).collectAsList();
            Timestamp maxTimeStamp = maxInvoiceRow.get(0).getTimestamp(0);
    
            /**
             * Compute the time difference = latest timestamp in NewInvoiceDate - each row's timestamp, stored in the new column Duration as a number of days
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+----------+--------+
             * |InvoiceNo|StockCode|         Description|Quantity|     InvoiceDate|UnitPrice|CustomerID|       Country|     NewInvoiceDate|TotalPrice|Duration|
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+----------+--------+
             * |   536365|   85123A|WHITE HANGING HEA...|       6|2010/12/01 08:26|     2.55|     17850|United Kingdom|2010-12-01 08:26:00|      15.3|     373|
             * |   536365|    71053| WHITE METAL LANTERN|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|     373|
             * |   536365|   84406B|CREAM CUPID HEART...|       8|2010/12/01 08:26|     2.75|     17850|United Kingdom|2010-12-01 08:26:00|      22.0|     373|
             * |   536365|   84029G|KNITTED UNION FLA...|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|     373|
             * |   536365|   84029E|RED WOOLLY HOTTIE...|       6|2010/12/01 08:26|     3.39|     17850|United Kingdom|2010-12-01 08:26:00|     20.34|     373|
             * +---------+---------+--------------------+--------+----------------+---------+----------+--------------+-------------------+----------+--------+
             */
            dataset = dataset.withColumn("Duration", functions.datediff(functions.lit(maxTimeStamp), col("NewInvoiceDate")));
    
            /**
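             * Compute RFM => Recency, Frequency, Monetary
             *      Recency: how recently did the customer purchase? The number of days since the last purchase
             *      Frequency: how often do they purchase? The number of distinct invoices
             *      Monetary: how much did they spend? The total amount spent by the customer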
             *
             * +----------+-------+
             * |CustomerID|Recency|
             * +----------+-------+
             * |     17420|     50|
             * |     16861|     59|
             * |     16503|    106|
             * |     15727|     16|
             * |     17389|      0|
             * +----------+-------+
             *
             * +----------+---------+
             * |CustomerID|Frequence|
             * +----------+---------+
             * |     15619|        1|
             * |     17389|       43|
             * |     12940|        4|
             * |     13623|        7|
             * |     14450|        3|
             * +----------+---------+
             *
             * +----------+--------+
             * |CustomerID|Monetary|
             * +----------+--------+
             * |     17420|  598.83|
             * |     16861|  151.65|
             * |     16503| 1421.43|
             * |     15727| 5178.96|
             * |     17389|31300.08|
             * +----------+--------+
             *
             */
            Dataset<Row> recencyDataset = dataset.groupBy("CustomerID").agg(functions.min(col("Duration")).as("Recency"));
            Dataset<Row> frequencyDataset = dataset.groupBy("CustomerID", "InvoiceNo").count().groupBy("CustomerID").agg(functions.count("*").as("Frequence"));
            Dataset<Row> monetaryDataset = dataset.groupBy("CustomerID").agg(functions.round(functions.sum("TotalPrice"), 2).as("Monetary"));
    
            /**
             * Join recencyDataset, frequencyDataset, and monetaryDataset to obtain the RFM statistics
             * +----------+-------+---------+--------+
             * |CustomerID|Recency|Frequence|Monetary|
             * +----------+-------+---------+--------+
             * |     12940|     46|        4|  876.29|
             * |     13285|     23|        4| 2709.12|
             * |     13623|     30|        7|  672.44|
             * |     13832|     17|        2|   40.95|
             * |     14450|    180|        3|  483.25|
             * +----------+-------+---------+--------+
             */
            Dataset<Row> rfmDataset = recencyDataset.join(frequencyDataset, "CustomerID").join(monetaryDataset, "CustomerID");
    
            return rfmDataset;
        }
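
    The RFM table returned above is the input to the clustering step. The linked repository does this with Spark; the sketch below is not that code but a dependency-free illustration of the k-means idea (Lloyd's algorithm) on standardized R, F, M vectors. The class name KMeansSketch, the sample rows, and the deterministic initialization from the first k rows are assumptions made for this example.

```java
import java.util.Arrays;

class KMeansSketch {

    /** Rescales every column to zero mean and unit variance so that no single RFM dimension dominates the distance. */
    static double[][] standardize(double[][] rows) {
        int n = rows.length;
        int d = rows[0].length;
        double[][] scaled = new double[n][d];
        for (int j = 0; j < d; j++) {
            double mean = 0.0;
            for (double[] row : rows) {
                mean += row[j] / n;
            }
            double variance = 0.0;
            for (double[] row : rows) {
                variance += (row[j] - mean) * (row[j] - mean) / n;
            }
            double std = Math.sqrt(variance);
            for (int i = 0; i < n; i++) {
                scaled[i][j] = std == 0.0 ? 0.0 : (rows[i][j] - mean) / std;
            }
        }
        return scaled;
    }

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int j = 0; j < a.length; j++) {
            sum += (a[j] - b[j]) * (a[j] - b[j]);
        }
        return sum;
    }

    /** Lloyd's algorithm; centroids start at the first k points so the result is deterministic. */
    static int[] cluster(double[][] points, int k, int maxIterations) {
        int n = points.length;
        int d = points[0].length;
        double[][] centroids = new double[k][];
        for (int c = 0; c < k; c++) {
            centroids[c] = points[c].clone();
        }
        int[] labels = new int[n];
        Arrays.fill(labels, -1);
        for (int iter = 0; iter < maxIterations; iter++) {
            // Assignment step: attach every point to its nearest centroid.
            boolean changed = false;
            for (int i = 0; i < n; i++) {
                int best = 0;
                for (int c = 1; c < k; c++) {
                    if (squaredDistance(points[i], centroids[c]) < squaredDistance(points[i], centroids[best])) {
                        best = c;
                    }
                }
                if (labels[i] != best) {
                    labels[i] = best;
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged: no point switched cluster
            }
            // Update step: move every centroid to the mean of its assigned points.
            double[][] sums = new double[k][d];
            int[] counts = new int[k];
            for (int i = 0; i < n; i++) {
                counts[labels[i]]++;
                for (int j = 0; j < d; j++) {
                    sums[labels[i]][j] += points[i][j];
                }
            }
            for (int c = 0; c < k; c++) {
                if (counts[c] > 0) {
                    for (int j = 0; j < d; j++) {
                        centroids[c][j] = sums[c][j] / counts[c];
                    }
                }
            }
        }
        return labels;
    }

    public static void main(String[] args) {
        // Illustrative rows: {Recency, Frequence, Monetary}
        double[][] rfm = {
            {2, 40, 30000}, {300, 1, 50}, {5, 35, 28000},
            {280, 2, 80}, {1, 43, 31300}, {310, 1, 40}
        };
        int[] labels = cluster(standardize(rfm), 2, 20);
        System.out.println(Arrays.toString(labels)); // [0, 1, 0, 1, 0, 1]
    }
}
```

    Standardizing first matters because Monetary spans a far larger numeric range than Recency or Frequence; without it the Euclidean distance would be driven almost entirely by the amount spent.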
  • Original article: https://www.cnblogs.com/little-horse/p/14014812.html