zoukankan      html  css  js  c++  java
  • Spark开发-关联分析

    在机器学习中,常用的主题有分类,回归,聚类和关联分析。而关联分析,在实际中的应用场景,有部分是用于商品零售的分析。在Spark中有相应的案例

    在关联分析中,有一些概念要熟悉。 频繁项集,关联规则,支持度,置信度,提升度。其中 频繁项集(frequent item sets) 是经常出现在一块的物品的集合,关联规则(association rules)    两种物品之间可能存在很强的关系

      1)支持度 support (x => y)     = P(x y) = freq(x y)/total = confidence (x=>y)* freq(x)/total, 事件x和事件y共同出现的概率;用频次也可以计算,也可以通过置信度计算
      2)置信度 confidence (x => y)  = P(y|x) = freq(xy)/freq(x), ,出现事件x的事件中出现事件y的概率;
      3)提升度 lift (x => y)               = P(y|x)/P(x) = P(xy)/(P(x)*P(y)) = confidence (x=>y)*total/ freq(y),出现x的条件下出现事件y的概率和没有条件x出现y的概率
           4) item_two(x=>y)                   = confidence (x=>y)* freq(x),出现x的条件下出现事件y的概率和没有条件x出现y的概率

     主要涉及概率中的乘法公式。条件概率公式,上述的这些知识点可以复习一下基本的概念
    实现的算法有两种。apriori 和FP-growth 其中  Apriori及其变形算法需要多次扫描数据库,并需要生成指数级的候选项集。FP-growth 算法通过构建FP-tree来压缩事务数据库中的信息,从而更加有效地产生频繁项集 (Frequent Patterns)。在这些主要是使用这些算法来驱动业务,所以 主要关注FP-growth 解决实际的频繁项集和关联规则挖掘问题。Demo的具体实现参考了Spark的案例,做了部分修改。在生产环境中,基本上要和业务对接,然后针对具体的场景,做参数的调节和关注输入数据的各种数据预处理。

    import java.util.Arrays;
    import java.util.List;
    import org.apache.spark.ml.fpm.FPGrowth;
    import org.apache.spark.ml.fpm.FPGrowthModel;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.*;
    import static org.apache.spark.sql.functions.col;
    import static org.apache.spark.sql.functions.lit;

    public class JavaFPGrowth {

    public static void main(String[] args) {
    SparkSession spark = SparkSession
    .builder()
    .master("local[2]")
    .appName("JavaFPGrowth_spark2.3")
    .getOrCreate();
    //设定本地日志
    spark.sparkContext().setLogLevel("WARN");

    List<Row> data = Arrays.asList(
    RowFactory.create("1", Arrays.asList("1 2 5".split(" "))),
    RowFactory.create("2",Arrays.asList("1 2 3 5".split(" "))),
    RowFactory.create("4",Arrays.asList("6".split(" "))),
    RowFactory.create("3",Arrays.asList("1 2".split(" ")))
    );
    StructType schema = new StructType(new StructField[]{
    new StructField("user", DataTypes.StringType, false, Metadata.empty()),
    new StructField(
    "items", new ArrayType(DataTypes.StringType, true), false, Metadata.empty())
    });
    Dataset<Row> itemsDF = spark.createDataFrame(data, schema);
    long lengthdata = itemsDF.count();

    FPGrowthModel model = new FPGrowth()
    .setItemsCol("items")
    .setMinSupport(0.1)
    .setMinConfidence(0.6)
    .fit(itemsDF);

    // Display frequent itemsets.
    // | items|freq
    Dataset<Row> ItemFreq = model.freqItemsets().withColumn("total", lit(lengthdata));
    ItemFreq.show();
    ItemFreq.createOrReplaceTempView("test_freq_view_d");

    // Display generated association rules.
    // antecedent 表示前项 consequent 表示后项 confidence 表示规则的置信度
    Dataset<Row> ItemRules = model.associationRules() ;
    // 计算支持度
    Dataset<Row> res = ItemFreq.join(ItemRules, col("items").equalTo(col("antecedent")), "inner");
    Dataset<Row> supportDF = res.withColumn("support",col("freq").divide(lengthdata).multiply(col("confidence")));
    //使用视图sql方式 计算支持度 提升度 
    ItemRules.createOrReplaceTempView("test_rules_view_d");
    String support_sql = "select tt1.antecedent, tt1.antecedent_freq " +
    ", tt1.consequent , tt2.freq as consequent_freq " +
    " , tt1.total , tt1.two_item_freq , tt1.support , tt1.confidence " +
    " , tt1.confidence * tt1.total/ tt2.freq as lift " +
    " from(select t2.freq as antecedent_freq, t2.total, t2.freq/t2.total*t1.confidence as support, " +
    " t2.freq*t1.confidence as two_item_freq, " +
    " t1.antecedent, t1.consequent, t1.confidence " +
    "from test_rules_view_d t1 inner join test_freq_view_d t2 " +
    "on t2.items = t1.antecedent)tt1 inner join test_freq_view_d tt2 " +
    "on tt2.items = tt1.consequent";
    Dataset<Row> suppoerdf = spark.sql(support_sql);
    suppoerdf.show();

    // items | prediction|
    model.transform(itemsDF).show();

    spark.stop();
    }
    }

     以上是实现的代码,也就是根据 model.freqItemsets() 和 model.associationRules()  的计算结果来计算其他相关的数据,在这里保留了两种查询风格 DSL与SQL,一种是使用DSL的方式。一种是createOrReplaceTempView 使用SQL的方式来实现。很多示例都是scala语言写的,而Java的方式有所不同,尝试一下基本就可以确定。

    参考:

       频繁项集与关联规则 FP-growth 的原理和实现 https://developer.ibm.com/zh/articles/machine-learning-hands-on2-fp-growth/

  • 相关阅读:
    操作系统(32-45)
    异或运算( ^ )
    计算机网络(16—30)
    操作系统(13-30)
    win7 linux双系统删除linux
    ubuntu安装matplotlib包
    vmware+CentOS 7 无法上网
    Python命令行清屏的简单办法
    jupyter notebook 工作目录修改
    ipython notebook设置工作路径和自动保存.py文件 ipython_notebook_config.py
  • 原文地址:https://www.cnblogs.com/ytwang/p/13528409.html
Copyright © 2011-2022 走看看