  • Spark - red wine / white wine quality evaluation

    Storm
    ------------------
        Real-time computation with very low latency.
        Low throughput.
        Data is processed as tuples: tuple()
    
    Spark Streaming
    ------------------
        DStream: discretized stream computation.
        Equivalent to a sequence of RDDs.
        RDDs are cut by time slice (the batch interval).
        DStream partitions = the partitions of the underlying RDDs.
        Works on dynamic (continuously arriving) data.
        StreamingContext( , Seconds(2))
        Windowed operations are an extension of the batch model.
        High throughput.
        socketTextStream()            //socket source (a word-count sketch follows this list)
                                      //a block (partition) is formed every 200ms

        Kafka stream                  //one Kafka partition == one RDD partition
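
    A minimal Scala sketch of the socket source mentioned above (assumed host/port
    localhost:9999 and a 2-second batch interval; word count is just an illustration):

        import org.apache.spark.SparkConf
        import org.apache.spark.streaming.{Seconds, StreamingContext}

        object SocketWordCount {
          def main(args: Array[String]): Unit = {
            val conf = new SparkConf().setAppName("socket-stream").setMaster("local[*]")
            //one RDD is produced per 2-second batch
            val ssc = new StreamingContext(conf, Seconds(2))

            val lines = ssc.socketTextStream("localhost", 9999)   //DStream[String]
            val counts = lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
            counts.print()

            ssc.start()
            ssc.awaitTermination()
          }
        }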
    
    LocationStrategy
    ------------------
        Location strategy: controls on which nodes the topic partitions are consumed.
        PreferBrokers               //prefer running consumers on the Kafka brokers themselves
        PreferConsistent            //prefer spreading partitions evenly across executors
        PreferFixed                 //prefer a fixed, user-supplied mapping of partitions to hosts
    
    
    ConsumerStrategy
    -----------------
        Defines the scope of Kafka messages a consumer reads; a sketch follows this list.
        Assign                      //explicit: control down to individual partitions of a topic
        Subscribe                   //subscribe to a set of topics; no control over individual partitions
        SubscribePattern            //regex subscription, a generalization of Subscribe
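
    A small Scala sketch of the three strategies (topic names, partitions and the
    kafkaParams map are placeholder assumptions):

        import java.util.regex.Pattern
        import org.apache.kafka.common.TopicPartition
        import org.apache.spark.streaming.kafka010.ConsumerStrategies

        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "s102:9092",
          "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
          "group.id" -> "g1")

        //Assign: control down to individual partitions of a topic
        val assign = ConsumerStrategies.Assign[String, String](
          Seq(new TopicPartition("t1", 0), new TopicPartition("t1", 1)), kafkaParams)

        //Subscribe: whole topics; partition assignment is left to Kafka
        val subscribe = ConsumerStrategies.Subscribe[String, String](Seq("t1", "t2"), kafkaParams)

        //SubscribePattern: every topic whose name matches the regex
        val byPattern = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile("t.*"), kafkaParams)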
    
    Consumption semantics
    ----------------
        1. at most once
            commitOffset()            //commit the offset first
            consumeMessage()          //then consume; a crash in between loses messages

        2. at least once
            consumeMessage()          //consume first
            commitOffset()            //then commit; a crash in between re-delivers messages
            (a manual-commit sketch follows this list)

        3. exactly once
            Relies on the transaction support of an external transactional resource (e.g. a database):
            store the offsets in the same transactional store as the processing results.
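
        A sketch of at-least-once with Spark's direct Kafka stream: process the batch
        first, then commit the offsets back to Kafka (broker/topic/group values are the
        same placeholders used in the demo further below):

            import org.apache.spark.SparkConf
            import org.apache.spark.streaming.{Seconds, StreamingContext}
            import org.apache.spark.streaming.kafka010._

            object AtLeastOnceDemo {
              def main(args: Array[String]): Unit = {
                val ssc = new StreamingContext(
                  new SparkConf().setAppName("at-least-once").setMaster("local[*]"), Seconds(5))

                val kafkaParams = Map[String, Object](
                  "bootstrap.servers" -> "s102:9092",
                  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
                  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
                  "group.id" -> "g1",
                  "auto.offset.reset" -> "latest",
                  "enable.auto.commit" -> (false: java.lang.Boolean))

                val stream = KafkaUtils.createDirectStream[String, String](
                  ssc, LocationStrategies.PreferConsistent,
                  ConsumerStrategies.Subscribe[String, String](Seq("t1"), kafkaParams))

                stream.foreachRDD { rdd =>
                  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
                  rdd.foreach(record => println(record.value()))                   //1. consumeMessage()
                  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)  //2. commitOffset()
                }

                ssc.start()
                ssc.awaitTermination()
              }
            }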
    
    
    KafkaRDD partition computation
    ------------------
        The partitions are obtained from the KafkaRDD's consumer.assignment() call,
        and the consumer object itself is obtained via consumerStrategy.onStart.
        The number of KafkaRDD partitions therefore depends on the consumer strategy:
        in principle each topic partition maps to one RDD partition, though some cases
        need extra care, e.g. partitions with a rate limit applied.
    
    
    Java implementation: consuming Kafka messages with Spark Streaming
    ---------------------------------------------
        package com.oldboy.spark.java;
    
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.spark.SparkConf;
        import org.apache.spark.api.java.function.Function;
        import org.apache.spark.streaming.Durations;
        import org.apache.spark.streaming.api.java.JavaDStream;
        import org.apache.spark.streaming.api.java.JavaStreamingContext;
        import org.apache.spark.streaming.kafka010.*;
    
        import java.util.ArrayList;
        import java.util.HashMap;
        import java.util.List;
        import java.util.Map;
    
        /**
         * Created by Administrator on 2018/5/22.
         */
        public class SparkStreamingKafkaJavaDemo {
            public static void main(String[] args) throws Exception {
                SparkConf conf = new SparkConf();
                conf.setAppName("stream") ;
                conf.setMaster("local[*]") ;
    
                //create the streaming context with a 5-second batch interval
                JavaStreamingContext ssc = new JavaStreamingContext(conf , Durations.seconds(5)) ;

                //location strategy
                LocationStrategy loc = LocationStrategies.PreferConsistent();

                //consumer parameters
                /**
                 * "bootstrap.servers" -> "s102:9092,s103:9092",
                 "key.deserializer" -> classOf[StringDeserializer],
                 "value.deserializer" -> classOf[StringDeserializer]
                 "group.id" -> "g1",
                 "auto.offset.reset" -> "latest",
                 "enable.auto.commit" -> (false: java.lang.Boolean)
                 *
                 */
                Map<String,Object> kafkaParams = new HashMap<String,Object>() ;
                kafkaParams.put("bootstrap.servers" ,"s102:9092" ) ;
                kafkaParams.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
                kafkaParams.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer") ;
                kafkaParams.put("group.id" , "g1") ;
                kafkaParams.put("auto.offset.reset" , "latest") ;
                kafkaParams.put("enable.auto.commit" , false) ;
    
                //topics to consume
                List<String> topics = new ArrayList<String>() ;
                topics.add("t1") ;

                //consumer strategy
                ConsumerStrategy<String,String> con = ConsumerStrategies.<String,String>Subscribe(topics , kafkaParams) ;
                JavaDStream<ConsumerRecord<String,String>> ds1 = KafkaUtils.<String,String>createDirectStream(ssc , loc , con) ;
                JavaDStream<String> ds2 = ds1.map(new Function<ConsumerRecord<String,String>, String>() {
                    public String call(ConsumerRecord<String, String> v1) throws Exception {
                        return v1.value();
                    }
                }) ;
                ds2.print();
    
                ssc.start();
    
                ssc.awaitTermination();
    
            }
        }
    
    
    Machine learning
    ----------------------
        Algorithms that learn from data.
        machine learning.
    
    
    Math fundamentals
    ---------------
        1.mean
            average value
        2.median
            middle value of the sorted data

        3.mode
            most frequent value
        4.range
            max - min
        5.variance
            average of the squared deviations from the mean
                       (x1 - mean)^2 + (x2 - mean)^2 + ... + (xn - mean)^2
            variance = ----------------------------------------------------
                                              n
        6.standard deviation
            square root of the variance
            sqrt(variance)

        7.skewness
            symmetric distribution   : mean = median = mode
            left-skewed distribution : mean < median < mode
            right-skewed distribution: mean > median > mode
        8.kurtosis
            normal distribution : kurtosis = 3
            more peaked         : kurtosis > 3
            flatter             : kurtosis < 3
        (a small code sketch computing these follows this list)
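
    A small Scala sketch computing these statistics on an illustrative sample:

        object BasicStats {
          def main(args: Array[String]): Unit = {
            val xs = Seq(2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0)

            val mean     = xs.sum / xs.size                                     //5.0
            val sorted   = xs.sorted
            val median   = (sorted(xs.size / 2 - 1) + sorted(xs.size / 2)) / 2  //4.5 (even-sized sample)
            val mode     = xs.groupBy(identity).maxBy(_._2.size)._1             //4.0
            val range    = xs.max - xs.min                                      //7.0
            val variance = xs.map(x => math.pow(x - mean, 2)).sum / xs.size     //4.0 (population variance)
            val stddev   = math.sqrt(variance)                                  //2.0

            println(s"mean=$mean median=$median mode=$mode range=$range variance=$variance stddev=$stddev")
          }
        }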
    
    BI
    -------------
        Business Intelligence.
    
    
    Supervised vs. unsupervised
    --------------
        1. Supervised
            The training data is labeled.
            e.g. spam classification.
            Neural networks
            SVM
            Naive Bayes

        2. Unsupervised
            No labels.
            K-means
    
    
    
    Bayes' theorem
    ----------------
        The probability that event B occurs given that event A has occurred:
    
                    P(A|B) * P(B)
        P(B | A) = -----------------
                        P(A)
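
        A worked example with illustrative numbers: if P(A|B) = 0.9, P(B) = 0.2 and
        P(A) = 0.3, then

                    0.9 * 0.2
        P(B | A) = ----------- = 0.6
                       0.3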
    
    
    
    TF-IDF
    ----------
        1.TF
            term frequency: how often a word occurs, counted per document.
            Essentially a word count.
            Measures how strongly the word relates to the topic of that document.

            //j : the j-th document
            //i : the i-th word
                        N(ij)
            TF(ij) = --------------------
                        Sum(N(j))


        2.IDF
            inverse document frequency: computed over the whole document collection (corpus).
            Measures how well the word distinguishes documents of the corpus.

                               |D|
            idf(i) = log10 -----------------------------------------
                           number of documents containing word i + 1

        3.TF-IDF
            tf measures how important a word is within a single document;
            idf measures how useful the word is for discriminating documents across the corpus;
            tf-idf(ij) = TF(ij) * idf(i).
    
    
    Least squares
    ----------------
        Minimizes the sum of squared errors.
    
    Linear regression
    ---------------
        regression:
        assumes the output varies linearly with the inputs.
        The regression output is a continuous value.

    Logistic regression
    ------------------------
        The output is a discrete class value.
        Applying a binary decision to the output of a linear model gives logistic regression
        (a tiny thresholding sketch follows).
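
        A tiny Scala sketch of that binary decision (the weights and input are made-up numbers):

            val score = 0.8 * 1.5 - 2.0                  //linear part: w * x + b
            val p     = 1.0 / (1.0 + math.exp(-score))   //logistic (sigmoid) function, p in (0, 1)
            val label = if (p >= 0.5) 1.0 else 0.0       //threshold at 0.5 => class 0 or 1
            println(s"p = $p, label = $label")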
    
        
    Vectors
    ------------------------
        A row vector (1,2,3,4) or the corresponding column vector:
         1
         2
         3
         4

         (0,3,8,0,0,9,0,2)
    
    
    Sparse vector:
    -----------
        sparse vector,
        uses less memory.
        stored as (size, index:value, ...), e.g. (1000, 1:3, 2:8, 5:9, ...)


    Dense vector:
    -----------
        dense vector,
        (0,1,2,0,0,5,0,6)
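
    A small Scala sketch building both representations with spark-mllib's Vectors factory
    (the sparse form stores only size, indices and values):

        import org.apache.spark.ml.linalg.Vectors

        val dense  = Vectors.dense(0, 3, 8, 0, 0, 9, 0, 2)
        //size = 8, non-zero entries at indices 1, 2, 5, 7
        val sparse = Vectors.sparse(8, Array(1, 2, 5, 7), Array(3.0, 8.0, 9.0, 2.0))

        println(dense)    //[0.0,3.0,8.0,0.0,0.0,9.0,0.0,2.0]
        println(sparse)   //(8,[1,2,5,7],[3.0,8.0,9.0,2.0])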
    
    
    
    Worked TF / DF example
    ----------------------
        Sentence: "hello world , how are you, thank you!!"   (7 words)
        A word that occurs once has TF = 1/7; "you" occurs twice, so TF("you") = 2/7.

        Three documents:
        1. hello tom1
        2. hello tom2
        3. hello tom3

                      3
        DF("hello") = --- = 1        (it appears in every document)
                      3

        Linear model over the 11 wine features used below:
        y = a*x1 + b*x2 + ... + n*x11 + C
    
    
    Predicting wine quality with linear regression
    --------------------------
        1. Add the Maven dependency
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.1.0</version>
            </dependency>
        2. Code
            /**
              * Predict red wine quality with Spark's linear regression.
              */
    
            import org.apache.spark.SparkConf
            import org.apache.spark.ml.regression.LinearRegression
            import org.apache.spark.ml.linalg.Vectors
            import org.apache.spark.sql.SparkSession
    
            object SparkMLLibLinearRegress {
    
    
                def main(args: Array[String]): Unit = {
    
                    val conf = new SparkConf()
                    conf.setAppName("ml_linearRegress")
                    conf.setMaster("local[*]")
    
                    val spark = SparkSession.builder().config(conf).getOrCreate()
                    //1. define a case class for the wine records
                    case class Wine(FixedAcidity: Double, VolatileAcidity: Double, CitricAcid: Double,
                                    ResidualSugar: Double, Chlorides: Double, FreeSulfurDioxide: Double,
                                    TotalSulfurDioxide: Double, Density: Double, PH: Double,
                                    Sulphates: Double, Alcohol: Double,
                                    Quality: Double)
    
                    //2. load the red-wine csv file and transform it into an RDD
                    val file = "file:///D:/ml/data/red.csv"
                    val wineDataRDD = spark.sparkContext.textFile(file)
                        .map(_.split(";"))
                        .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
                            w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble,
                            w(9).toDouble, w(10).toDouble,
                            w(11).toDouble))
    
                    //import the SparkSession implicits so the RDD can be converted to a DataFrame
                    import spark.implicits._
                    val trainingDF = wineDataRDD.map(w => (w.Quality, Vectors.dense(w.FixedAcidity, w.VolatileAcidity,
                        w.CitricAcid, w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                        w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
                    trainingDF.show(100,false)
    
                    //3. create the linear regression estimator
                    val lr = new LinearRegression()

                    //4. set the estimator's parameters
                    lr.setMaxIter(2)

                    //5. fit the model on the training data
                    val model = lr.fit(trainingDF)
    
                    //6. build a test dataset
                    val testDF = spark.createDataFrame(Seq((5.0, Vectors.dense(7.4, 0.7, 0.0, 1.9, 0.076, 25.0, 67.0, 0.9968, 3.2, 0.68, 9.8)),
                        (5.0, Vectors.dense(7.8, 0.88, 0.0, 2.6, 0.098, 11.0, 34.0, 0.9978, 3.51, 0.56, 9.4)),
                        (7.0, Vectors.dense(7.3, 0.65, 0.0, 1.2, 0.065, 15.0, 18.0, 0.9968, 3.36, 0.57, 9.5))))
                        .toDF("label", "features")
    
                    //7. register the test dataset as a temporary view
                    testDF.createOrReplaceTempView("test")
    
                    //8. predict on the test data with the trained model and inspect the result
                    val tested = model.transform(testDF)
                    tested.show(100,false)
    
                    //select just the columns of interest
                    val tested2 = tested.select("features", "label", "prediction")
                    tested2.show(100,false)
    
                    //9. show the prediction result
                    tested.show()
    
                    //10. extract only the features column from the test data and use it as prediction input
                    val predictDF = spark.sql("select features from test")
                    predictDF.show(100,false)
                    model.transform(predictDF).show(1000,false)
                }
            }
    
    
    Model persistence
    ------------------
        //save the model
        val model = lr.fit(trainingDF)
        model.save("file:///d:/mr/model/linreg")

        //load the model
        val model = LinearRegressionModel.load("file:///d:/mr/model/linreg")
    
    
    Evaluating white wine quality (good/bad) with logistic regression
    -------------------------------
        /**
          * Logistic regression
          */
    
        import org.apache.spark.SparkConf
        import org.apache.spark.ml.classification.LogisticRegression
        import org.apache.spark.ml.linalg.Vectors
        import org.apache.spark.sql.SparkSession
    
        object WineLogisticRegressDemo {
            def main(args: Array[String]): Unit = {
    
                val conf = new SparkConf()
                conf.setAppName("logisticRegress")
                conf.setMaster("local[*]")
    
                val spark = SparkSession.builder().config(conf).getOrCreate()
                //1. define a case class for the wine records
                case class Wine(FixedAcidity: Double, VolatileAcidity: Double,
                                CitricAcid: Double, ResidualSugar: Double, Chlorides: Double,
                                FreeSulfurDioxide: Double, TotalSulfurDioxide: Double, Density: Double,
                                PH: Double, Sulphates: Double, Alcohol: Double, Quality: Double)
    
                //2. load the white-wine csv file and transform it into an RDD
                val file = "file:///D:/ml/data/white.csv"
                val wineDataRDD = spark.sparkContext.textFile(file)
                    .map(_.split(";"))
                    .map(w => Wine(w(0).toDouble, w(1).toDouble, w(2).toDouble, w(3).toDouble,
                        w(4).toDouble, w(5).toDouble, w(6).toDouble, w(7).toDouble, w(8).toDouble,
                        w(9).toDouble, w(10).toDouble,
                        w(11).toDouble))
    
                //import the SparkSession implicits so the RDD can be converted to a DataFrame
                import spark.implicits._
                val trainingDF = wineDataRDD.map(w => (if (w.Quality < 7) 0D else
                    1D, Vectors.dense(w.FixedAcidity, w.VolatileAcidity, w.CitricAcid,
                    w.ResidualSugar, w.Chlorides, w.FreeSulfurDioxide, w.TotalSulfurDioxide,
                    w.Density, w.PH, w.Sulphates, w.Alcohol))).toDF("label", "features")
    
                //3. create the logistic regression estimator
                val lr = new LogisticRegression()
                //set the estimator's parameters
                lr.setMaxIter(10).setRegParam(0.01)
    
                //4. fit the training data to produce a model
                val model = lr.fit(trainingDF)
    
                //5. build test data
                val testDF = spark.createDataFrame(Seq(
                    (1.0, Vectors.dense(6.1, 0.32, 0.24, 1.5, 0.036, 43, 140, 0.9894, 3.36, 0.64, 10.7)),
                    (0.0, Vectors.dense(5.2, 0.44, 0.04, 1.4, 0.036, 38, 124, 0.9898, 3.29, 0.42, 12.4)),
                    (0.0, Vectors.dense(7.2, 0.32, 0.47, 5.1, 0.044, 19, 65, 0.9951, 3.38, 0.36, 9)),
                    (0.0, Vectors.dense(6.4, 0.595, 0.14, 5.2, 0.058, 15, 97, 0.991, 3.03, 0.41, 12.6)))
                ).toDF("label", "features")
    
                testDF.createOrReplaceTempView("test")
    
                //predict on the test data
                val tested = model.transform(testDF).select("features", "label", "prediction")
    
                //extract only the features column as prediction input
                val realData = spark.sql("select features from test")
    
                model.transform(realData).select("features", "prediction").show(100, false)
            }
        }
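
    The two tables below appear to be show() output from Spark's standard TF-IDF feature
    pipeline (Tokenizer -> HashingTF -> IDF). A sketch that produces output of this shape,
    assuming numFeatures = 20:

        import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
        import org.apache.spark.sql.SparkSession

        object TfIdfDemo {
          def main(args: Array[String]): Unit = {
            val spark = SparkSession.builder().appName("tfidf").master("local[*]").getOrCreate()

            val sentenceData = spark.createDataFrame(Seq(
              (0.0, "Hi I heard about Spark"),
              (0.0, "I wish Java could use case classes"),
              (1.0, "Logistic regression models are neat")
            )).toDF("label", "sentence")

            //split each sentence into words
            val tokenizer = new Tokenizer().setInputCol("sentence").setOutputCol("words")
            val wordsData = tokenizer.transform(sentenceData)

            //hash every word into one of 20 buckets and count term frequencies
            val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(20)
            val featurizedData = hashingTF.transform(wordsData)

            //rescale the raw counts by the inverse document frequency
            val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
            val idfModel = idf.fit(featurizedData)
            val rescaledData = idfModel.transform(featurizedData)

            featurizedData.select("label", "sentence", "words", "rawFeatures").show(false)
            rescaledData.select("label", "sentence", "words", "rawFeatures", "features").show(false)
          }
        }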
    
    
    
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+
    |label|sentence                           |words                                     |rawFeatures                              |
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+
    |0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |
    |0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |
    |1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+
    
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
    |label|sentence                           |words                                     |rawFeatures                              |features                                                                                                              |
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
    |0.0  |Hi I heard about Spark             |[hi, i, heard, about, spark]              |(20,[0,5,9,17],[1.0,1.0,1.0,2.0])        |(20,[0,5,9,17],[0.6931471805599453,0.6931471805599453,0.28768207245178085,1.3862943611198906])                        |
    |0.0  |I wish Java could use case classes |[i, wish, java, could, use, case, classes]|(20,[2,7,9,13,15],[1.0,1.0,3.0,1.0,1.0]) |(20,[2,7,9,13,15],[0.6931471805599453,0.6931471805599453,0.8630462173553426,0.28768207245178085,0.28768207245178085]) |
    |1.0  |Logistic regression models are neat|[logistic, regression, models, are, neat] |(20,[4,6,13,15,18],[1.0,1.0,1.0,1.0,1.0])|(20,[4,6,13,15,18],[0.6931471805599453,0.6931471805599453,0.28768207245178085,0.28768207245178085,0.6931471805599453])|
    +-----+-----------------------------------+------------------------------------------+-----------------------------------------+----------------------------------------------------------------------------------------------------------------------+
    
    
    def aggregate[B](z: =>B)
                    (seqop: (B, A) => B, combop: (B, B) => B): B = foldLeft(z)(seqop)
    
    1: expansion of aggregate(zero)(seqop, combop) over 5 partitions
       ([10,11,12], [13,14,15,16], [7,8,9], [1,2,3], [4,5,6]):
    ----
    combop(combop(combop(combop(seqop(seqop(seqop(zero,10),11),12),
                                seqop(seqop(seqop(seqop(zero,13),14),15),16)),
                         seqop(seqop(seqop(zero,7),8),9)),
                  seqop(seqop(seqop(zero,1),2),3)),
           seqop(seqop(seqop(zero,4),5),6))
    
    RDD.treeAggregate and depth:
    ---------------------------
        Tree aggregation over an RDD. It behaves like aggregate(zeroValue)(seqOp, combOp):
        values are folded within a partition with seqOp and partition results are merged
        with combOp. treeAggregate additionally takes a depth; depth does not change the
        result, only the performance, by splitting one big merge into several smaller
        rounds. (A small sketch follows.)
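
        A small Scala sketch comparing aggregate and treeAggregate; both return the same
        (sum, count) pair, only the shape of the merge differs:

            import org.apache.spark.{SparkConf, SparkContext}

            object TreeAggregateDemo {
              def main(args: Array[String]): Unit = {
                val sc = new SparkContext(new SparkConf().setAppName("treeAgg").setMaster("local[*]"))
                val rdd = sc.parallelize(1 to 16, 4)

                val seqOp  = (u: (Int, Int), x: Int) => (u._1 + x, u._2 + 1)                 //within a partition
                val combOp = (a: (Int, Int), b: (Int, Int)) => (a._1 + b._1, a._2 + b._2)    //between partitions

                println(rdd.aggregate((0, 0))(seqOp, combOp))                 //(136,16)
                println(rdd.treeAggregate((0, 0))(seqOp, combOp, depth = 2))  //(136,16) - same result, staged merge
              }
            }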
    
    
    
    Spark IDF
    -------------
        The formula Spark uses for the (smoothed) inverse document frequency,
        where df(i) is the number of documents containing word i:

                         |D| + 1
           idf(i) = ln -----------
                        df(i) + 1
    
    Worked example: how treeAggregate reduces the partition count

    parts: 10000
    depth: 4

    //scale: the fan-in of each aggregation round; while the partition count exceeds the
    //threshold below, keep reducing
    scale = max(pow(10000, 1/4), 2) = 10

    round 1: 10000 > 10 + ceil(10000 / 10)  =>  currParts = 10000 / 10 = 1000
    round 2:  1000 > 10 + ceil(1000 / 10)   =>  currParts = 1000 / 10  = 100
    round 3:   100 > 10 + ceil(100 / 10)    =>  currParts = 100 / 10   = 10
    round 4:    10 > 10 + ceil(10 / 10) is false  =>  stop; the remaining 10 partition
                results are combined in a final reduce
    
    A sparse vector is stored as (size, indices, values), e.g.:

    size  indices    values
    100 , [1,3,5] , [100,200,300]
           k = 0,1,2

    Document-frequency update for one document vector:

    var k = 0
    while (k < indices.length) {
        if (values(k) > 0) {
            df(indices(k)) += 1L
        }
        k += 1
    }

    //document frequency of every word, kept as a vector
    df : vector
    
    Documents stored as sparse vectors, e.g. doc1: "hello tom1" -> (200 , [3,5] , [1,1]):
    doc1: hello tom1
    doc2: hello tom2
    doc3: hello tom3
    doc4: hello tom4
    doc5: hello tom5

    Using the earlier 3-document example, a word like "hello" that appears in every document gets

                 3 + 1
    idf  =  log ------- = 0
                 3 + 1

    while a word that appears in no document (df = 0) gets

                 3 + 1
    idf  =  log ------- = log(4)
                 0 + 1
    
    Sample training data in libsvm sparse format: each line is "label index:value index:value ..."
    (the lines below are truncated on the right).

    Label-0 samples:
    0 128:51 129:159 130:253 131:159 132:50 155:48 156:238 1
    0 130:64 131:253 132:255 133:63 157:96 158:205 159:251 1
    0 155:53 156:255 157:253 158:253 159:253 160:124 183:180
    0 128:73 129:253 130:227 131:73 132:21 156:73 157:251 15
    0 154:46 155:105 156:254 157:254 158:254 159:254 160:255
    0 152:56 153:105 154:220 155:254 156:63 178:18 179:166 1
    0 155:21 156:176 157:253 158:253 159:124 182:105 183:176

    Label-1 samples:
    1 100:166 101:222 102:55 128:197 129:254 130:218 131:5 1
    1 159:124 160:253 161:255 162:63 186:96 187:244 188:251 
    1 125:145 126:255 127:211 128:31 152:32 153:237 154:253 
    1 153:5 154:63 155:197 181:20 182:254 183:230 184:24 209
    1 152:1 153:168 154:242 155:28 180:10 181:228 182:254 18
    1 159:121 160:254 161:136 186:13 187:230 188:253 189:248
    1 155:178 156:255 157:105 182:6 183:188 184:253 185:216 
    1 130:7 131:176 132:254 133:224 158:51 159:253 160:253 1
    
    Per-label aggregation of the training vectors (toy example, numFeatures = 8):

    label-0 documents (sparse):
    0 1:2 3:1 5:4
    0 0:6 2:1 4:4
    0 2:2 3:6 4:4
    --------------
    index: 0 1 2 3 4 5 6 7
    sum  : 6 2 3 7 8 4 0 0

    label-1 documents (sparse):
    1 1:3 3:2 5:1
    1 1:1 3:2 5:3
    index: 0 1 2 3 4 5 6 7
    sum  : 0 4 0 4 0 4 0 0

         (doc count, summed features)     total term count   numFeatures
    0 => (3, (6 2 3 7 8 4 0 0))           30                  8
    1 => (2, (0 4 0 4 0 4 0 0))           12                  8
    
    //number of distinct labels (e.g. labels 0, 1, 2)
    val numLabels = aggregated.length
    //total number of documents: the sum of the per-label document counts
    val numDocuments = aggregated.map(_._2._1).sum


    //array of label values
    val labelArray = new Array[Double](numLabels)
    //log prior probability of each label
    val piArray = new Array[Double](numLabels)
    val thetaArray = new Array[Double](numLabels * numFeatures)

    //e.g.              log(1000 + 2 * 1) = log(1002)
    val piLogDenom = log(docs + labels * lambda)

    piArray(i) = log(count of label i + lambda) - piLogDenom

    //worked example
    docs: 5
    label 0: 3 documents
    label 1: 2 documents

    piLogDenom = log(5 + 2 * 1) = log(7)
    pi(0)      = log(3 + 1) - log(7) = log(4) - log(7) = 1.386 - 1.946 = -0.560
    (the sketch below re-does this calculation)
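
    A small Scala sketch re-doing that calculation with Laplace smoothing lambda = 1
    and the illustrative counts above (5 documents: 3 with label 0, 2 with label 1):

        object NaiveBayesPi {
          def main(args: Array[String]): Unit = {
            val lambda = 1.0
            val labelCounts = Array(3.0, 2.0)             //documents per label
            val numDocuments = labelCounts.sum            //5
            val numLabels = labelCounts.length            //2

            val piLogDenom = math.log(numDocuments + numLabels * lambda)          //log(7)
            val piArray = labelCounts.map(n => math.log(n + lambda) - piLogDenom) //log prior per label

            piArray.zipWithIndex.foreach { case (pi, label) =>
              println(f"pi($label) = $pi%.3f")            //pi(0) = -0.560, pi(1) = -0.847
            }
          }
        }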
    
    
    
    aggregateByKey[(Double, DenseVector)]
        ((0.0, Vectors.zeros(numFeatures).toDense))
        (
          seqOp = {
             case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
               requireValues(features)
               //BLAS.axpy(a, x, y) computes y = a * x + y
               BLAS.axpy(weight, features, featureSum)
               (weightSum + weight, featureSum)
          },
          combOp = {
             case ((weightSum1, featureSum1), (weightSum2, featureSum2)) =>
               BLAS.axpy(1.0, featureSum2, featureSum1)
               (weightSum1 + weightSum2, featureSum1)
          })
    
    U        : (0.0, Vectors.zeros(numFeatures).toDense)
    seqop    :{
                 case ((weightSum: Double, featureSum: DenseVector), (weight, features)) =>
                   requireValues(features)
                   BLAS.axpy(weight, features, featureSum)
                   (weightSum + weight, featureSum)
                }    
  • Original post: https://www.cnblogs.com/zyde/p/9076809.html