zoukankan      html  css  js  c++  java
  • 机器学习-微博精准营销

    思路

    • 研究特征
    • 所有的微博变成词袋
    • 一条微博形成一条向量,数量代表出现的次数
    • 使用TF-IDF计算词的重要性
    • 选取主要的重要的词进行KMeans聚类
    • 筛选出的N个次就能很好的代表当前类的主旨思想,可以给与到网络营销部分

    代码

    package com.test
    
    
    import scala.collection.mutable.ArrayBuffer
    import scala.collection.mutable.ListBuffer
    
    import org.apache.lucene.analysis.TokenStream
    import org.apache.lucene.analysis.tokenattributes.CharTermAttribute
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.clustering.KMeans
    import org.apache.spark.mllib.feature.HashingTF
    import org.apache.spark.mllib.feature.IDF
    import org.apache.spark.mllib.feature.IDFModel
    import org.apache.spark.rdd.RDD
    import org.wltea.analyzer.lucene.IKAnalyzer
    import org.apache.spark.mllib.linalg.Vector
    import org.apache.spark.mllib.clustering.KMeansModel
    
    
    object Test02 {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setAppName("BlogMeans").setMaster("local[*]")
        val sc = new SparkContext(conf)
        val rdd = sc.textFile("data/original.txt")
    
        var wordRDD: RDD[(String, ArrayBuffer[String])] = rdd.mapPartitions(iterator => {
          val list = new ListBuffer[(String, ArrayBuffer[String])]
          while (iterator.hasNext) {
            //创建分词对象   IKAnalyzer支持两种分词模式:最细粒度和智能分词模式,如果构造函数参数为false,那么使用最细粒度分词。
            val analyzer = new IKAnalyzer(true)
            val line = iterator.next()
            val textArr = line.split("	")
            val id = textArr(0)
            val text = textArr(1)
            //分词     第一个参数只是标识性,没有实际作用,第二个读取的数据
            val ts: TokenStream = analyzer.tokenStream("", text)
            // 得到相应词汇的内容
            val term: CharTermAttribute = ts.getAttribute(classOf[CharTermAttribute])
            //重置分词器,使得tokenstream可以重新返回各个分词
            ts.reset()
            val arr = new ArrayBuffer[String]()
            //遍历分词数据
            while (ts.incrementToken()) {
              arr.+=(term.toString)
            }
            list.append((id, arr))
            analyzer.close()
          }
          list.iterator
        })
    
        /**
         * action算子之后,才会将wordRDD 写入到内存中
         * 相当于 persist()
         */
        wordRDD = wordRDD.cache()
    
        /**
         * 1000:生成长度为1000的向量
         * 带来新的问题: hash碰撞的问题,(效率和碰撞问题取中间)
         * * 为了提高训练模型的效率,向量长度(特征个数)设置为1000个
         * * 向量不同的位置的值与每篇微博的单词并不是一一对应的
         */
        val hashingTF: HashingTF = new HashingTF(1000)
    
    
        /**
         * hashingTF.transform(x._2)
         * 按照hashingTF规则 计算分词频数(TF)
         */
        val tfRDD: RDD[(String, Vector)] = wordRDD.map(x => {
          (x._1, hashingTF.transform(x._2))
        })
    
    
        /**
         *
         * * tfRDD
         * * K:微博ID
         * * V:Vector(tf,tf,tf.....)
         */
        val idf: IDFModel = new IDF().fit(tfRDD.map(_._2))
    
        /**
         * K:微博 ID
         * V:每一个单词的TF-IDF值
         * tfIdfs这个RDD中的Vector就是训练模型的训练集
         * dfIdfs: 计算TFIDF值
         */
        val tfIdfs: RDD[(String, Vector)] = tfRDD.mapValues(idf.transform(_))
    
        //设置聚类个数
        val kCluster = 20
        val kMeans = new KMeans()
        kMeans.setK(kCluster)
    
        //使用的是kemans++算法来训练模型  "random" "k-means||" 代表  kmeans kmeans++
        kMeans.setInitializationMode("k-means||")
    
        // 设置最大迭代次数,本次中心的坐标
        kMeans.setMaxIterations(1000)
        //训练模型   聚类的模型(k个中心点的坐标)      线性回归(w0,w1)
        val kmeansModel: KMeansModel = kMeans.run(tfIdfs.map(_._2))
        // 打印模型的20个中心点
        println(kmeansModel.clusterCenters)
    
        /**
         * 模型预测
         */
        val modelBroadcast = sc.broadcast(kmeansModel)
    
        /**
         * predicetionRDD KV格式的RDD
         * K:微博ID V:分类号
         */
        val predicetionRDD: RDD[(String, Int)] = tfIdfs.mapValues(vetor => {
          val model = modelBroadcast.value
          model.predict(vetor)
        })
    
        /**
         * 总结预测结果
         * tfIdfs2wordsRDD:kv格式的RDD
         * K:微博ID
         * V:二元组(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....))
         */
        val tfIdfs2wordsRDD: RDD[(String, (Vector, ArrayBuffer[String]))] = tfIdfs.join(wordRDD)
    
        /**
         * result:KV
         * K:微博ID
         * V:(类别号,(Vector(tfidf1,tfidf2....),ArrayBuffer(word,word,word....)))
         */
        val result: RDD[(String, (Int, (Vector, ArrayBuffer[String])))] = predicetionRDD.join(tfIdfs2wordsRDD)
    
    
        result.filter(x => x._2._1 == 2).flatMap(line => {
          val tfIdfV: Vector = line._2._2._1
          val words: ArrayBuffer[String] = line._2._2._2
          val list = new ListBuffer[(Double, String)]
          for (i <- 0 until words.length) {
            list.append((tfIdfV(hashingTF.indexOf(words(i))), words(i)))
          }
          list
        }).sortBy(x => x._1, false)
          .map(_._2)
          .distinct()
          .take(30)
          .foreach(println)
        sc.stop()
    
    
      }
    
    }

    pom

            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>2.3.4</version>
            </dependency>
            <dependency>
                <groupId>com.janeluo</groupId>
                <artifactId>ikanalyzer</artifactId>
                <version>2012_u6</version>
            </dependency>
  • 相关阅读:
    Java快速教程
    让我们来了解一下:操作系统和平台相关性
    初窥Linux 之 我最常用的20条命令
    ES6学习笔记一
    Data时间管理大全
    generator多返回值写法
    箭头函数=>
    闭包
    高阶函数:map/reduce
    函数方法that与apply
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14643500.html
Copyright © 2011-2022 走看看