Spark - red-wine / white-wine quality evaluation

    Storm
    ------------------
        Real-time computation with very low latency.
        Low throughput.
        tuple()
    
    Spark Streaming
    ------------------
        DStream: discretized stream computation.
        Equivalent to a sequence of RDDs.
        RDDs are cut along time slices (batch intervals).
        DStream partitions = the partitions of the underlying RDDs.
        Operates on dynamic (continuously arriving) data.
        StreamingContext(conf, Seconds(2))
        Window operations are an extension over batches (a window spans multiple batch intervals).
        High throughput.
        socketTextStream()            //socket source (see the sketch below)
                                      //the receiver forms one block (one partition) every 200ms by default

        Kafka stream                  //one Kafka partition == one RDD partition
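
        A minimal Scala sketch of a socket-based DStream word count; the host "localhost",
        port 9999 and the 2-second batch interval are arbitrary choices for illustration:

        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        object SocketWordCountDemo {
            def main(args: Array[String]): Unit = {
                val conf = new SparkConf().setAppName("socket-stream").setMaster("local[*]")
                //one RDD is produced per 2-second batch
                val ssc = new StreamingContext(conf, Seconds(2))
                val lines = ssc.socketTextStream("localhost", 9999)   //one line of text per record
                val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
                counts.print()
                ssc.start()
                ssc.awaitTermination()
            }
        }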
    
    LocationStrategy
    ------------------
        Location strategy: controls on which executor node a topic partition is consumed.
        PreferBrokers               //prefer executors on the same hosts as the Kafka brokers
        PreferConsistent            //prefer an even distribution across the available executors
        PreferFixed                 //prefer a fixed mapping of partitions to hosts
    
    
    ConsumerStrategy
    -----------------
        控制消费者对kafka消息的消费范围界定。
        Assign                        //指定,控制到主题下的分区.
        Subscribe                    //订阅主题集合,控制不到主题下的某个分区。
        SubscribePattern            //正则消费,对Subscribe的增强,支持正则表达式.
    
    Consumption semantics
    ----------------
        1. at most once
            commitOffset()
            consumeMessage()

        2. at least once (see the sketch below)
            consumeMessage()
            commitOffset()

        3. exactly once
            Relies on the transaction support of an external transactional resource (e.g. a database):
            the offsets are stored in that transactional store together with the processing results.
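
        A minimal Scala sketch of at-least-once processing with manual offset commits. It
        assumes "stream" is a direct stream created with KafkaUtils.createDirectStream (as in
        the Java demo below) and that enable.auto.commit is false:

        import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

        stream.foreachRDD { rdd =>
            //capture this batch's offset ranges before any shuffle
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

            //1. consumeMessage(): process the batch first
            rdd.foreach(record => println(record.value()))

            //2. commitOffset(): commit back to Kafka only after processing succeeded
            stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        }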
    
    
    KafkaRDD partition computation
    ------------------
        Obtained from the KafkaRDD consumer's assignment() result.
        The consumer object itself is created via ConsumerStrategy.onStart.
        The number of KafkaRDD partitions therefore depends on the consumer strategy:
        in principle each topic partition maps to one RDD partition, though some cases
        need extra consideration, e.g. partitions with rate limiting.
    
    
    Consuming Kafka messages with Spark Streaming (Java)
    ---------------------------------------------
        package com.oldboy.spark.java;
    
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaDStream;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;
        import org.apache.spark.streaming.kafka010.*;
    
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
    
        /**
         * Created by Administrator on 2018/5/22.
         */
        public class SparkStreamingKafkaJavaDemo {
            public static void main(String[] args) throws Exception {
                SparkConf conf = new SparkConf();
                conf.setAppName("stream") ;
                conf.setMaster("local[*]") ;
    
                //create the streaming context
                JavaStreamingContext ssc = new JavaStreamingContext(conf , Durations.seconds(5)) ;
    
                //location strategy
                LocationStrategy loc = LocationStrategies.PreferConsistent();
    
                //consumer parameters
                /**
                 * "bootstrap.servers" -> "s102:9092,s103:9092",
                 "key.deserializer" -> classOf[StringDeserializer],
                 "value.deserializer" -> classOf[StringDeserializer]
                 "group.id" -> "g1",
                 "auto.offset.reset" -> "latest",
                 "enable.auto.commit" -> (false: java.lang.Boolean)
                 *
                 */
                Map<String,Object> kafkaParams = new HashMap<String,Object>() ;
                kafkaParams.put("bootstrap.servers" ,"s102:9092" ) ;
                kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
                kafkaParams.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
                kafkaParams.put("group.id" , "g1") ;
                kafkaParams.put("auto.offset.reset" , "latest") ;
                kafkaParams.put("enable.auto.commit" , false) ;
    
                //topics to consume
                List<String> topics = new ArrayList<String>() ;
                topics.add("t1") ;
    
                //consumer strategy
                ConsumerStrategy<String, String> con = ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams);
                JavaDStream<ConsumerRecord<String,String>> ds1 = KafkaUtils.<String,String>createDirectStream(ssc , loc , con) ;
                JavaDStream<String> ds2 = ds1.map(new Function<ConsumerRecord<String,String>, String>() {
                    public String call(ConsumerRecord<String, String> v1) throws Exception {
                        return v1.value();
                    }
                }) ;
                ds2.print();
    
                ssc.start();
    
                ssc.awaitTermination();
    
            }
        }
    
    
    Machine learning
    ----------------------
        Algorithms.
    
    
    Math basics
    ---------------
        1. mean
            the arithmetic average (k-means clustering finds k of these)
        2. median
            the middle value

        3. mode
            the most frequent value
        4. range
            max - min
        5. variance
            the average of the squared deviations from the mean
            (x1-x̄)^2 + (x2-x̄)^2 + ...
            ----------------------------
                        n
        6. standard deviation
            the square root of the variance
            sqrt(variance)

        7. skewness
            symmetric distribution   : mean = median = mode
            left-skewed distribution : mean < median < mode
            right-skewed distribution: mean > median > mode
        8. kurtosis
            normal distribution : kurtosis = 3
            more peaked         : kurtosis > 3
            flatter             : kurtosis < 3
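
        A minimal Scala sketch of these measures on a plain collection (the values are chosen
        so the intermediate results come out as round numbers):

        val xs = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)

        val mean     = xs.sum / xs.size                                    //5.0
        val range    = xs.max - xs.min                                     //7.0
        val variance = xs.map(x => math.pow(x - mean, 2)).sum / xs.size    //4.0
        val stdDev   = math.sqrt(variance)                                 //2.0
        //skewness and kurtosis are the 3rd and 4th standardized moments
        val skewness = xs.map(x => math.pow((x - mean) / stdDev, 3)).sum / xs.size
        val kurtosis = xs.map(x => math.pow((x - mean) / stdDev, 4)).sum / xs.size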
    
    BI
    -------------
        Business intelligence.
    
    
    Supervised vs. unsupervised
    --------------
        1. Supervised
            All training data is labeled.
            e.g. spam classification.
            Neural networks
            SVM
            Naive Bayes

        2. Unsupervised
            No labels.
            k-means
    
    
    
    Bayes' theorem
    ----------------
        The probability that event B occurs given that event A has occurred:
    
                    P(A|B) * P(B)
        P(B | A) = -----------------
                        P(A)
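
        A small worked example with made-up numbers (B = "email is spam", A = "email contains
        the word free"):

        val pB  = 0.2               //P(B): 20% of mail is spam
        val pAB = 0.6               //P(A|B): 60% of spam contains "free"
        val pA  = 0.3               //P(A): 30% of all mail contains "free"
        val pBA = pAB * pB / pA     //P(B|A) = 0.6 * 0.2 / 0.3 = 0.4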
    
    
    
    TF-IDF
    ----------
        1. TF
            term frequency; computed per document.
            essentially a word count.
            measures how relevant a word is to the topic of that document.

            //j : the j-th document
            //i : the i-th word
                        N(ij)
            TF(ij) = --------------------
                        Sum(N(j))


        2. IDF
            inverse document frequency; computed over the whole document collection (the corpus).
            measures how well a word distinguishes documents within the corpus.

                                            |D|
            idf(i) = log10 --------------------------------------------
                            number of documents containing word i + 1

        3. TF-IDF
            tf measures how important a word is within a single document.
            idf measures how important a word is for telling documents of the corpus apart.
    
    
    Least squares
    ----------------
        Minimizes the sum of squared errors.
    
    Linear regression
    ---------------
        regression;
        the relationship is modeled as a straight line.
        The regression output is a continuous, varying value.
    
    Logistic regression
    ------------------------
        The output is a discrete class value.
        Putting a binary decision on top of linear regression gives logistic regression.
    
        
    Vectors
    ------------------------
        Row form:    (1, 2, 3, 4)
        Column form: (1, 2, 3, 4)^T

        (0, 3, 8, 0, 0, 9, 0, 2)
    
    
    Sparse vector:
    -----------
        sparse vector;
        uses little memory: stores the size plus index:value pairs of the non-zero entries.
        (1000, 1:3, 2:8, 5:9, ...)
    
    
    Dense vector:
    -----------
        dense vector;
        stores every entry (see the sketch below).
        (0, 1, 2, 0, 0, 5, 0, 6)
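
    A minimal spark.ml sketch of the two representations, using the 8-element example from the
    Vectors section above; both encode the same vector:

        import org.apache.spark.ml.linalg.Vectors

        //dense: stores every entry
        val dv = Vectors.dense(0.0, 3.0, 8.0, 0.0, 0.0, 9.0, 0.0, 2.0)

        //sparse: stores (size, indices, values) only for the non-zero entries
        val sv = Vectors.sparse(8, Array(1, 2, 5, 7), Array(3.0, 8.0, 9.0, 2.0))

        println(sv.toDense == dv)   //true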
    
    
    
    Worked TF / DF example
    ----------------------
        Document: "hello world , how are you, thank you!!"   (7 words)
        TF of most words = 1/7 ; TF("you") = 2/7

        A corpus of three documents:
        1. hello tom1
        2. hello tom2
        3. hello tom3

                      3
        hello DF = ------- = 1        //"hello" appears in all 3 documents, so it does not
                      3                 //help tell the documents apart

    Linear model used for the wine-quality prediction below (11 features):
        y = a*x1 + b*x2 + ... + n*x11 + C
    
    
    Predicting wine quality with linear regression
    --------------------------
        1. Add the Maven dependency
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
        2. Code
            /**
              * Predict red-wine quality with Spark's linear regression.
              */
    
            import org.apache.spark.{SparkConf, SparkContext}
            import org.apache.spark.ml.regression.LinearRegression
            import org.apache.spark.ml.param.ParamMap
            import org.apache.spark.ml.linalg.{Vector, Vectors}
            import org.apache.spark.sql.{Row, SparkSession}
    
            object SparkMLLibLinearRegress {
    
    
                def main(args: Array[String]): Unit = {
    
                    val conf = new SparkConf()
                    conf.setAppName("ml_linearRegress")
                    conf.setMaster("local[*]")
    
                    val spark = SparkSession.builder().config(conf).getOrCreate()
                    //1. define the case class
                    case class Wine(FixedAcidity: Double, VolatileAcidity: Double, CitricAcid: Double,
                                    ResidualSugar: Double, Chlorides: Double, FreeSulfurDioxide: Double,
                                    TotalSulfurDioxide: Double, Density: Double, PH: Double,
                                    Sulphates: Double, Alcohol: Double,
                                    Quality: Double)
    
                    //2. load the red-wine csv file and transform it into an RDD
                    val file = "file:///D:/ml/data/red.csv"
                    val wineDataRDD = spark.sparkContext.textFile(file)
                        .map(_.split(";"))
                        .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
                            w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble,
                            w(9).toDouble, w(10).toDouble,
                            w(11).toDouble))
    
                    //import the SparkSession implicits so the RDD can be converted to a DataFrame
                    import spark.implicits._
                    val trainingDF = wineDataRDD.map(w => (w.Quality, Vectors.dense(w.FixedAcidity, w.VolatileAcidity,
                        w.CitricAcid, w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                        w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
                    trainingDF.show(100,false)
    
                    //3. create the linear regression estimator
                    val lr = new LinearRegression()

                    //4. set its parameters
                    lr.setMaxIter(2)

                    //5. fit the model against the training data
                    val model = lr.fit(trainingDF)
    
                    //6. build the test dataset
                    val testDF = spark.createDataFrame(Seq((5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
                        (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
                        (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))))
                        .toDF("label", "features")
    
                    //7. register the test dataset as a temporary view
                    testDF.createOrReplaceTempView("test")
    
                    //8. predict on the test dataset with the trained model and inspect the output
                    val tested = model.transform(testDF)
                    tested.show(100,false)
    
                    //keep only the columns of interest
                    val tested2 = tested.select("features", "label", "prediction")
                    tested2.show(100,false)
    
                    //9. show the prediction results
                    tested.show()
    
                    //10. extract only the features column from the test view and use it as prediction input
                    val predictDF = spark.sql("select features from test")
                    predictDF.show(100,false)
                    model.transform(predictDF).show(1000,false)
                }
            }
    
    
    Model persistence
    ------------------
        //save the model
        val model = lr.fit(trainingDF)
        model.save("file:///d:/mr/model/linreg")

        //load the model back
        val model = LinearRegressionModel.load("file:///d:/mr/model/linreg")
    
    
    Evaluating white-wine quality (good vs. bad) with logistic regression
    -------------------------------
        /**
          * Logistic regression.
          */
    
        import org.apache.spark.SparkConf
        import org.apache.spark.ml.classification.LogisticRegression
        import org.apache.spark.ml.linalg.Vectors
        import org.apache.spark.sql.SparkSession
    
        object WineLogisticRegressDemo {
            def main(args: Array[String]): Unit = {
    
                val conf = new SparkConf()
                conf.setAppName("logisticRegress")
                conf.setMaster("local[*]")
    
                val spark = SparkSession.builder().config(conf).getOrCreate()
                //1. define the case class
                case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                                CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                                FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
                                PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)
    
                //2. load the white-wine csv file and transform it into an RDD
                val file = "file:///D:/ml/data/white.csv"
                val wineDataRDD = spark.sparkContext.textFile(file)
                    .map(_.split(";"))
                    .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
                        w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble,
                        w(9).toDouble, w(10).toDouble,
                        w(11).toDouble))
    
                //import the SparkSession implicits so the RDD can be converted to a DataFrame
                import spark.implicits._
                val trainingDF = wineDataRDD.map(w => (if (w.Quality < 7) 0D else
                    1D, Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
                    w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                    w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
    
                //3. create the logistic regression estimator and set its parameters
                val lr = new LogisticRegression()
                lr.setMaxIter(10).setRegParam(0.01)
    
                //4. fit the training data to produce a model
                val model = lr.fit(trainingDF)
    
                //5. build the test data
                val testDF = spark.createDataFrame(Seq(
                    (1.0, Vectors.dense(6.1, 0.32, 0.24, 1.5, 0.036, 43, 140, 0.9894, 3.36, 0.64, 10.7)),
                    (0.0, Vectors.dense(5.2, 0.44, 0.04, 1.4, 0.036, 38, 124, 0.9898, 3.29, 0.42, 12.4)),
                    (0.0, Vectors.dense(7.2, 0.32, 0.47, 5.1, 0.044, 19, 65, 0.9951, 3.38, 0.36, 9)),
                    (0.0, Vectors.dense(6.4, 0.595, 0.14, 5.2, 0.058, 15, 97, 0.991, 3.03, 0.41, 12.6)))
                ).toDF("label", "features")
    
                testDF.createOrReplaceTempView("test")
    
                //predict on the test data
                val tested = model.transform(testDF).select("features", "label", "prediction")
    
                //extract feature-only rows (no label) from the temp view
                val realData = spark.sql("select features from test")
    
                model.transform(realData).select("features", "prediction").show(100, false)
            }
        }
    
    
    
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+
    |label|sentence                           |words                                     |rawFeatures                              |
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+
    |0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |
    |0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |
    |1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+
    
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
    |label|sentence                           |words                                     |rawFeatures                              |features                                                                                                              |
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
    |0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
    |0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
    |1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
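
    The two tables above look like the output of Spark ML's HashingTF and IDF stages with
    numFeatures = 20. A minimal Scala sketch that produces output of this shape (it assumes an
    existing SparkSession named spark):

        import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

        val sentenceData = spark.createDataFrame(Seq(
            (0.0, "Hi I heard about Spark"),
            (0.0, "I wish Java could use case classes"),
            (1.0, "Logistic regression models are neat")
        )).toDF("label", "sentence")

        val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
        val wordsData = tokenizer.transform(sentenceData)

        val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
        val featurizedData = hashingTF.transform(wordsData)

        val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
        val idfModel = idf.fit(featurizedData)
        val rescaledData = idfModel.transform(featurizedData)

        featurizedData.show(false)   //first table above
        rescaledData.show(false)     //second table above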
    
    
    def aggregate[B](z: =>B)
                    (seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
    
    Example expansion over 5 partitions (zero is applied once per partition; combop merges the partition results):
    ----
    comop(comop(comop(comop(seqop(seqop(seqop(zero,10),11),12),
                            seqop(seqop(seqop(seqop(zero,13),14),15),16)),
                      seqop(seqop(seqop(zero,7),8),9)),
                seqop(seqop(seqop(zero,1),2),3)),
          seqop(seqop(seqop(zero,4),5),6))
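
    A minimal sketch of aggregate on an RDD (assumes an existing SparkContext named sc):

        //sum 1..12 over 4 partitions: seqop folds elements into the partition-local
        //accumulator, comop merges the partition results
        val rdd = sc.parallelize(1 to 12, 4)
        val sum = rdd.aggregate(0)(
            (acc, x) => acc + x,    //seqop, applied inside each partition
            (a, b)   => a + b)      //comop, applied across partitions
        println(sum)                //78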
    
    RDD.treeAggregate and depth:
    ---------------------------
        Tree aggregation on an RDD. Its result is the same as aggregate(zeroValue)(seqOp, combOp):
        elements are aggregated inside each partition with seqOp, and partition results are merged
        with combOp. Tree aggregation additionally takes a depth; depth does not change the result,
        only the performance, by splitting the single final merge into several rounds.
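
    A minimal sketch (assumes an existing SparkContext named sc); depth changes only how the
    partition results are merged, not the value:

        val rdd  = sc.parallelize(1 to 1000, 100)
        val flat = rdd.aggregate(0)(_ + _, _ + _)                  //single merge at the driver
        val tree = rdd.treeAggregate(0)(_ + _, _ + _, depth = 4)   //multi-round merge
        println(flat == tree)                                      //true, both are 500500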
    
    
    
    Spark DF / IDF
    -------------
        The inverse-document-frequency formula (computed from the document frequencies):

                              |D| + 1
           idf = ln ----------------------------------------------
                     number of documents containing the word + 1
    
    parts: 10000

    depth: 4

    //scale: the branching factor of the tree aggregation; the partition count keeps being
    //reduced while it is greater than scale + ceil(currParts / scale)
    scale = max(pow(10000, 1/4), 2) = 10

    iteration 1: 10000 > 10 + ceil(10000 / 10)  =>  currParts = 10000 / 10 = 1000
    iteration 2:  1000 > 10 + ceil(1000 / 10)   =>  currParts = 1000 / 10  = 100
    iteration 3:   100 > 10 + ceil(100 / 10)    =>  currParts = 100 / 10   = 10
    stop:           10 <= 10 + ceil(10 / 10)
    
    A sparse feature vector is stored as (size, indices, values), e.g.
    size = 100 , indices = [1,3,5] , values = [100,200,300]
    (k runs over positions 0,1,2 of the indices/values arrays)

    Updating the document-frequency vector df for one document:

    k = 0
    while (k < indices.length) {
        if (values(k) > 0) {
            df(indices(k)) += 1L
        }
        k += 1
    }

    //df holds the document frequency of every term (one slot per feature index)
    df : vector
    
    doc1: hello tom1 -> (200 , [3,5] , [1,1])
    doc2: hello tom2
    doc3: hello tom3
    doc4: hello tom4
    doc5: hello tom5

                3 + 1
    hello:  ------------ -> idf = ln(1) = 0; a word that appears in every document carries no weight
                3 + 1
    
    
    With numFeatures = 20, a term with document frequency 0 in a 3-document corpus gets:

             3 + 1
    idf = ln ------ = ln(4)
             0 + 1
    
    0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 1
    0 130:64 131:253 132:255 133:63 157:96 158:205 159:251 1
    0 155:53 156:255 157:253 158:253 159:253 160:124 183:180
    0 128:73 129:253 130:227 131:73 132:21 156:73 157:251 15
    0 154:46 155:105 156:254 157:254 158:254 159:254 160:255
    0 152:56 153:105 154:220 155:254 156:63 178:18 179:166 1
    0 155:21 156:176 157:253 158:253 159:124 182:105 183:176
    
    //label-0 samples above, label-1 samples below (libsvm format: label index:value ..., lines truncated)
    
    1 100:166 101:222 102:55 128:197 129:254 130:218 131:5 1
    1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 
    1 125:145 126:255 127:211 128:31 152:32 153:237 154:253 
    1 153:5 154:63 155:197 181:20 182:254 183:230 184:24 209
    1 152:1 153:168 154:242 155:28 180:10 181:228 182:254 18
    1 159:121 160:254 161:136 186:13 187:230 188:253 189:248
    1 155:178 156:255 157:105 182:6 183:188 184:253 185:216 
    1 130:7 131:176 132:254 133:224 158:51 159:253 160:253 1
    
    label 0 samples (libsvm format):
    0 1:2 3:1 5:4
    0 0:6 2:1 4:4
    0 2:2 3:6 4:4
    --------------
    feature index:  0 1 2 3 4 5 6 7
    summed counts:  6 2 3 7 8 4 0 0

    label 1 samples:
    1 1:3 3:2 5:1
    1 1:1 3:2 5:3
    feature index:  0 1 2 3 4 5 6 7

                 (docCount, summed feature vector)    total term count   numFeatures
    0 => (3 , (6 2 3 7 8 4 0 0))                           30                 8
    1 => (2 , (0 4 0 4 0 4 0 0))                           12                 8
    
    //number of labels (e.g. labels 0, 1, 2)
    val numLabels = aggregated.length
    //total number of documents: the sum of the per-label document counts
    val numDocuments = aggregated.map(_._2._1).sum


    //label array
    val labelArray = new Array[Double](numLabels)
    //log class priors and log conditional probabilities
    val piArray = new Array[Double](numLabels)
    val thetaArray = new Array[Double](numLabels * numFeatures)

    //e.g. log(1000 + 2 * 1) = log(1002)
    val piLogDenom = log(numDocuments + numLabels * lambda)

    piArray(i) = log(labelCount(i) + lambda) - piLogDenom

    //worked example (see the sketch below)
    docs: 5
    label 0 : 3 documents
    label 1 : 2 documents

    piLogDenom = log(5 + 2 * 1) = log(7)
    pi(0)      = log(3 + 1) - log(7) = log(4) - log(7) = 1.386 - 1.946 = -0.56
    
    
    aggregateByKey[(Double, DenseVector)]
        ((0.0, Vectors.zeros(numFeatures).toDense))
        (
          seqOp = {
             case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
               requireValues(features)
                       //BLAS.axpy(a, x, y) computes y = a * x + y   (here a = weight)
               BLAS.axpy(weight, features, featureSum)
               (weightSum + weight, featureSum)
          },
          combOp = {
             case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
               BLAS.axpy(1.0, featureSum2, featureSum1)
               (weightSum1 + weightSum2, featureSum1)
          })
    
    U        : (0.0, Vectors.zeros(numFeatures).toDense)
    seqop    :{
                 case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
                   requireValues(features)
                   BLAS.axpy(weight, features, featureSum)
                   (weightSum + weight, featureSum)
                }    
Original post: https://www.cnblogs.com/zyde/p/9076809.html