  • Spark Streaming with MLlib Machine Learning

    
    
    Background: a trained machine learning model can be deployed inside a Spark Streaming application, for example one that consumes a Kafka data source.
    Below is my initial approach; comments and corrections from those with more experience are welcome.
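
    Before the full MLaaS-specific program below, here is a minimal sketch of the core pattern: load a saved model once, turn each Kafka micro-batch into a DataFrame, score it, and emit the predictions. It is only an illustration under assumed names: the model path, topic, broker list and the three-column message layout are hypothetical, and it assumes a Spark 2.x build with the spark-streaming-kafka-0-8 connector (the same direct-stream API used in the full program).

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.ml.PipelineModel
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object StreamingScoringSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("StreamingScoringSketch")
        val ssc = new StreamingContext(conf, Seconds(20))

        // Direct stream over the input topic (Kafka 0.8 direct API, as in the full program).
        val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
        val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
          ssc, kafkaParams, Set("input_topic"))

        // Load the saved model once on the driver; the path is a placeholder.
        val model = PipelineModel.load("/user/mlp/mlaas/savedModel/some-model")

        stream.map(_._2).foreachRDD { rdd =>
          if (!rdd.isEmpty()) {
            val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
            import spark.implicits._
            // Assume each message is "id,feature1,feature2" (comma-separated, like the full program).
            val df = rdd.map(_.split(","))
              .map(a => (a(0), a(1).toDouble, a(2).toDouble))
              .toDF("id", "feature1", "feature2")
            // Assumes the pipeline produces a "prediction" column.
            val scored = model.transform(df)
            // The full program writes each partition back to Kafka; here we just show the result.
            scored.select("id", "prediction").show(10, truncate = false)
          }
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

    The full program follows.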
     
    import java.util
    import java.util.Properties

    import mlaas.spark.listener.utils.JSONUtil
    import mlaas.spark.main.SparkJob
    import mlaas.spark.main.utils.JsonUtils
    import kafka.javaapi.producer.Producer
    import kafka.producer.{KeyedMessage, ProducerConfig}
    import kafka.serializer.StringDecoder
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.catalyst.expressions.{GenericRow, GenericRowWithSchema}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    import scala.io.Source
    import scala.xml.{Elem, XML}

    /**
    * Created by 15020218 on 2017/1/17.
    * A trained machine learning model can be deployed in a Spark Streaming application, e.g. one consuming a Kafka data source.
    * Processing logic:
    * 1. Convert each batch of the Kafka stream into a DataFrame.
    * 2. Apply the model to the DataFrame, producing a new DataFrame of predictions.
    * 3. Write the final DataFrame back to Kafka through a producer.
    * Highlights:
    * 1. A complete machine-learning experiment pipeline runs inside the DataFrame-to-DataFrame transformation.
    * 2. Checkpointing is supported.
    * 3. Rate control is supported (spark.streaming.kafka.maxRatePerPartition).
    *
    * Notes on this first version:
    * 1. Only a single input and a single output Kafka source are supported.
    * 2. Inputs: the experiment configuration sample.xml, the schema of the input source, the field delimiter of the input data, and the topic & brokerLst of the input and output Kafka sources.
    *
    * The submit command looks like this:
    * source change_spark_version spark-2.1.0.2;export HADOOP_USER_NAME=mlp; /home/bigdata/software/spark-2.0.2.3-bin-2.4.0.10/bin/spark-submit --master yarn-cluster --driver-memory 4g --num-executors 4 --executor-memory 4g --conf spark.yarn.maxAppAttempts=1 --jars /home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar,/home/mlp/mlaas/lib/json-serde.jar --class com.suning.mlaas.spark.StreamingExecutePref --jars ssmpConfigration.json,expConfigration.xml mlaas-mllib.jar
    */
    object StreamingExecutePref extends Logging { // Logging (which provides log) is assumed to come from the project's own utilities; it is not among the imports above

    var reader_topic = ""
    var reader_brokerLst = ""
    var writer_topic = ""
    var writer_brokerLst = ""
    var tableName = ""
    var jsonStr = ""
    // used as part of the temporary directory when copying the model
    var ssmpJobId = ""
    def main(args: Array[String]) {
    val argfile = Source.fromFile("ssmpConfigration.json","UTF-8") // read as UTF-8 to avoid garbled Chinese content
    val jsonArgStr = argfile.mkString
    log.info("argfile is " + jsonArgStr)
    val jsonArg = JsonUtils.parseObject(jsonArgStr)
    val batchTime = Integer.parseInt(jsonArg.getString("batchTime"))
    ssmpJobId = jsonArg.getString("id")
    val mlData = jsonArg.getJSONArray("mlData")
    if(mlData.size() != 2){
    log.error("Only one input and one output data source are supported for now")
    System.exit(1)
    }

    val char_Sep = ","
    val dataSource1 = mlData.getJSONObject(0)
    val dataSource2 = mlData.getJSONObject(1)
    List(dataSource1,dataSource2).map(ds => {
    if("IN".equalsIgnoreCase(ds.getString("dsType"))){
    reader_topic = ds.getString("topic")
    reader_brokerLst = ds.getString("brokerList")
    tableName = ds.getString("dsName")
    log.info("reader_topic is " + reader_topic )
    log.info("reader_brokerLst is " + reader_brokerLst )

    if(tableName.indexOf(".") != -1){
    tableName = tableName.substring(tableName.indexOf(".")+1)
    }
    log.info("tableName is " + tableName )
    jsonStr = ds.getJSONArray("dsSchema").toString
    log.info("jsonStr is " + jsonStr )
    }else {
    writer_topic = ds.getString("topic")
    writer_brokerLst = ds.getString("brokerList")
    log.info("writer_topic is " + writer_topic )
    log.info("writer_brokerLst is " + writer_brokerLst )
    }
    })

    val checkPointDir = s"/user/mlp/mlaas/ssmpcheckpoint/$ssmpJobId/"
    val jobXml = XML.loadFile("expConfigration.xml")
    /**
    * 1. Create the StreamingContext (or recover it from the checkpoint directory)
    */
    // @transient
    val ssc = StreamingContext.getActiveOrCreate(checkPointDir,
    () => {
    createSsc(checkPointDir, batchTime, jobXml, char_Sep)
    }
    )
    ssc.start()
    ssc.awaitTermination()
    }

    // sep: input field delimiter, e.g. comma; currently only one input and one output source are supported
    def kafkaTopic2tmptable(jobXml: Elem, sep: String, srcTableName: String, jsonStr: String, topics: String, brokerLst: String, writeKafkaTopic: String, writeKafkaBrokerLst: String, ssc: StreamingContext) = {
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokerLst)
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topicsSet)

    val orderedType = Map(
    "boolean" -> BooleanType,
    "byte" -> ByteType,
    "short" -> ShortType,
    "int" -> IntegerType,
    "long" -> LongType,
    "float" -> FloatType,
    "double" -> DoubleType,
    // "decimal" -> DecimalType,
    "time" -> TimestampType,
    "date" -> DateType,
    "string" -> StringType,
    "binary" -> BinaryType
    )

    val colArr = JsonUtils.parseArray(jsonStr)
    var ii = 0
    val fields = new util.ArrayList[StructField]()
    val smap = scala.collection.mutable.HashMap[Int, DataType]()
    while (ii < colArr.size()) {
    val obj = colArr.getJSONObject(ii)
    smap += (ii -> orderedType.getOrElse(obj.getString("columnType"), StringType))
    val sf = StructField(obj.getString("columnName"), orderedType.getOrElse(obj.getString("columnType"), StringType), nullable = true)
    fields.add(sf)
    ii = ii + 1
    }
    val schema = StructType(fields)
    log.info("******************************************************************************")
    log.info("Input source schema: " + schema.mkString(","))
    val rowRDD = messages.map(_._2).map(attr => {
    (attr.toString.split(sep))
    })
    log.info("******************************************************************************")
    log.info("go to foreachRDD: " + schema.mkString(","))
    rowRDD.foreachRDD(rdd => {
    // @transient
    val sqlContext = new SQLContext(rdd.sparkContext)
    // @transient
    val hiveContext = new HiveContext(rdd.sparkContext)
    if (rdd.count() > 0) {
    // println(rdd.take(1)(0).mkString(":") + " ; count is :"+ rdd.count())
    // wrap each split line as a generic row, then convert its fields to typed values according to the schema
    val rowRdd = rdd.map(arr => new GenericRow(arr.asInstanceOf[Array[Any]]))
    .map(genericToRow(_, schema, smap))
    val df = sqlContext.createDataFrame(rowRdd, schema)
    df.createOrReplaceTempView(srcTableName)
    log.info("******************************************************************************")
    log.info("table count is : " + sqlContext.table(srcTableName).count())
    // println("table count is : " + sqlContext.table(srcTableName).count())
    /**
    * 3. Process the converted DataFrame the same way an offline (cbt) job would:
    *    read-source nodes are not executed;
    *    write-source nodes are replaced by writing to the Kafka stream.
    */
    val saveNodeSourceTable = processExpLogic(jobXml, hiveContext)
    /**
    * 4. Write the final DataFrame to the Kafka sink. Steps 3 and 4 both run inside the DStream's foreachRDD.
    */
    val targetdf = sqlContext.table(saveNodeSourceTable)
    targetdf.foreachPartition(it => writeKafka(it, sep, srcTableName, writeKafkaTopic, writeKafkaBrokerLst))
    }
    })
    }

    def getValue(value: String, dataType: DataType): Any = {
    if (IntegerType == dataType) {
    Integer.parseInt(value)
    } else if (DoubleType == dataType) {
    java.lang.Double.parseDouble(value)
    } else if (LongType == dataType) {
    java.lang.Long.parseLong(value)
    } else {
    value
    }
    }

    def genericToRow(row: GenericRow, schema: StructType, smap: scala.collection.mutable.HashMap[Int, DataType]): Row = {
    val cols: Seq[Any] = (0 to smap.size - 1).map(x => getValue(row.getAs(x).toString, smap.get(x).get))
    new GenericRowWithSchema(cols.toArray, schema)
    }

    /**
    * Runs every experiment node except the read-source node and the write-source node.
    *
    * @param jobXml
    * @param hiveContext
    */
    def processExpLogic(jobXml: Elem, hiveContext: HiveContext): String = {
    var saveNodeSourceTable = ""
    val list = (jobXml \ "job").map(n => {
    (n \ "@CDATA", n.text);
    n.text
    })
    val it = list.iterator
    while (it.hasNext) {
    val value = it.next().replace("\\", "")
    val json = JSONUtil.toJSONString(value)
    val classStr = json.get("class").toString

    // every node except the model node and the write-source node (those are handled below)
    if (!classStr.contains("DealWithSave") && !classStr.contains("DealWithModel")) {
    val argsStr = json.get("params").toString
    /** ********************* registerTempTable cannot take a db-prefixed name such as mlp.zpc, so strip the prefix - begin **/
    val params = JsonUtils.parseObject(argsStr)
    val keySet = params.keySet()
    val itk = keySet.iterator()
    while (itk.hasNext) {
    val key = itk.next().toString
    if (key.startsWith("source") || key.startsWith("target_")) {
    val value = params.get(key).toString
    if (value.contains(".")) {
    val newVal = value.substring(value.indexOf(".") + 1)
    params.put(key, newVal)
    }
    }
    }
    /** ********************* strip the db prefix for registerTempTable - end **/
    val obj: SparkJob = Class.forName(classStr).getMethod("self").invoke(null).asInstanceOf[SparkJob]
    obj.runCbt(hiveContext, params)
    } else if (classStr.contains("DealWithSave")) {
    val argsStr = json.get("params").toString
    /** ********************* registerTempTable cannot take a db-prefixed name such as mlp.zpc, so strip the prefix - begin **/
    val params = JsonUtils.parseObject(argsStr)
    val keySet = params.keySet()
    val itk = keySet.iterator()
    while (itk.hasNext) {
    val key = itk.next().toString
    if (key.startsWith("source_")) {
    val value = params.get(key).toString
    if (value.contains(".")) {
    val newVal = value.substring(value.indexOf(".") + 1)
    saveNodeSourceTable = newVal
    //params.put(key, newVal)
    }
    }
    }
    }
    else if (classStr.contains("DealWithModel")) { // model node: copy the model into the job's execution directory before running it
    val argsStr = json.get("params").toString
    /** ********************* check the temp directory, copy the model files, and rewrite the path property in params - begin **/
    val params = JsonUtils.parseObject(argsStr)
    val keySet = params.keySet()
    val itk = keySet.iterator()
    while (itk.hasNext) {
    val key = itk.next().toString
    if (key.startsWith("path") ) {
    val value = params.get(key).toString
    // the original path looks like: /user/mlp/mlaas/savedModel/transformers/experiment-2852/node-17144/Model-20170223202226
    val newPath = value.replace("savedModel", "ssmpJob/" + ssmpJobId)
    params.put(key, newPath)
    log.info("******************************************************************************")
    log.info("newPath : " + newPath + " oldPath : " + value)
    val conf = new Configuration()
    val fs = FileSystem.get(conf)
    if(!fs.exists(new Path(newPath))){
    val src = new Path(value)
    val dst = new Path(newPath)
    FileUtil.copy(fs, src, fs, dst, false, true, conf)
    }
    }
    }
    /** ********************* check the temp directory, copy the model files, and rewrite the path property in params - end **/
    val obj: SparkJob = Class.forName(classStr).getMethod("self").invoke(null).asInstanceOf[SparkJob]
    obj.runCbt(hiveContext, params)
    }
    }
    saveNodeSourceTable
    }

    /**
    * Notes on checkpoint support:
    * 1. All of the streaming setup logic must live inside this method body.
    * 2. The SQLContext must be created from rdd.sparkContext,
    *    not from the StreamingContext passed in as a parameter,
    *    e.g. val sqlContext = new SQLContext(rdd.sparkContext)
    * @param checkPointDir
    * @param sec
    * @param jobXml
    * @param char_sep
    * @return
    */
    def createSsc(checkPointDir: String, sec : Int, jobXml: Elem, char_sep: String): StreamingContext = {
    // Create the context with a batch interval of sec seconds
    val sparkConf = new SparkConf().setAppName("StreamingExecutePref")
    .set("spark.streaming.kafka.maxRatePerPartition", "10000")
    val ssc = new StreamingContext(sparkConf, Seconds(sec))
    // val ssc = new StreamingContext("local[*]", "KafkaExample", Seconds(sec))
    /**
    * 2. Handle the data sources: map the Kafka RDD of each batch to a temp table so the other job classes can process it.
    *    Each input/output source is described by brokerLst, topics and colNameList(name, type); the input format is comma-separated by default.
    *    Creates a direct Kafka stream from the given brokers and topics.
    */
    kafkaTopic2tmptable(jobXml, char_sep, tableName, jsonStr, reader_topic, reader_brokerLst, writer_topic, writer_brokerLst, ssc)

    ssc.checkpoint(checkPointDir)
    ssc
    }

    def writeKafka(it: Iterator[Row], sep: String, srcTableName: String, topic: String, brokerLst: String)
    = {

    val props = new Properties()
    props.put("metadata.broker.list", brokerLst)
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    props.put("request.required.acks", "1")
    props.put("producer.type", "async")
    val kafkaConfig = new ProducerConfig(props)
    val producer = new Producer[String, String](kafkaConfig)
    while (it.hasNext) {
    val row = it.next()
    log.info("******************************************************************************")
    log.info("topic : " + topic + " msg : " + row.mkString(sep))
    producer.send(new KeyedMessage[String, String](topic, row.mkString(sep)))
    }
    // close the producer so that each partition does not leak connections
    producer.close()
    }
    }
     



    The experiment configuration sample.xml looks like this:
    <?xml version="1.0" encoding="UTF-8" standalone="no"?>
    <jobs>
        <job><![CDATA[{"class":"mlaas.spark.main.DealWithSelect","expId":3109,"libjars":"/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar,/home/mlp/mlaas/lib/json-serde.jar","nodeId":18788,"params":{"expId":3109,"nodeId":18788,"partitioncheckbox":"0","source":"mlp.adult_income","targetType":[{"targetName":"target_1","targetType":"dataset"}],"target_1":"mlp.tmp_mlp_table_3109_18788_1","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
        <job><![CDATA[{"class":"mlaas.spark.main.DealWithModel","expId":3109,"libjars":"/home/mlp/mlaas/lib/scalaz-core_2.10-7.2.2.jar,/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18791,"params":{"expId":3109,"modelId":"564","nodeId":18791,"path":"/user/mlp/mlaas/savedModel/transformers/experiment-2852/node-17144/Model-20170223202226","targetType":[{"targetName":"target_1","targetType":"model"}],"target_1":"/user/mlp/mlaas/transformers/experiment-3109/node-18791/","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
        <job><![CDATA[{"class":"mlaas.spark.main.DealWithSort","expId":3109,"nodeId":18789,"params":{"expId":3109,"k":"1000","nodeId":18789,"outputs":"age","sortOrder":"asc","source_1":"mlp.tmp_mlp_table_3109_18788_1","targetType":[{"targetName":"target_1","targetType":"dataset"}],"target_1":"mlp.tmp_mlp_table_3109_18789_1","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
        <job><![CDATA[{"class":"mlaas.spark.main.DealWithMutiMinMax","expId":3109,"libjars":"/home/mlp/mlaas/lib/scalaz-core_2.10-7.2.2.jar,/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18790,"params":{"expId":3109,"features":"fnlwgt,education_num,hours_per_week,capital_gain,age,capital_loss","keepOriginal":"1","nodeId":18790,"source_1":"mlp.tmp_mlp_table_3109_18789_1","targetType":[{"targetName":"target_1","targetType":"dataset"},{"targetName":"target_2","targetType":"transformer"}],"target_1":"mlp.tmp_mlp_table_3109_18790_1","target_2":"/user/mlp/mlaas/transformers/experiment-3109/node-18790/","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
        <job><![CDATA[{"class":"mlaas.spark.main.DealWithApplyRegModel","expId":3109,"libjars":"/home/mlp/mlaas/lib/scalaz-core_2.10-7.2.2.jar,/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18792,"params":{"expId":3109,"nodeId":18792,"source_1":"/user/mlp/mlaas/transformers/experiment-3109/node-18791/","source_2":"mlp.tmp_mlp_table_3109_18790_1","targetColumn":"sex","targetType":[{"targetName":"target_1","targetType":"dataset"}],"target_1":"mlp.tmp_mlp_table_3109_18792_1","user":"mlp"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
        <job><![CDATA[{"class":"mlaas.spark.main.DealWithSave","expId":3109,"libjars":"/home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar","nodeId":18793,"params":{"expId":3109,"nodeId":18793,"source_1":"mlp.tmp_mlp_table_3109_18792_1","target":"mlp.zpcstreamingresult","targetType":[],"user":"mlp","writeType":"create"},"runnablejar":"/home/mlp/mlaas/mllib/mlaas-mllib.jar","user":"mlp"}]]></job>
    </jobs>
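
    For reference, the way processExpLogic walks this file reduces to the small sketch below (plain scala.xml): each <job> element wraps one JSON job definition in a CDATA section, node.text yields that raw JSON, and the real program then dispatches on its class field (DealWithSelect, DealWithModel, DealWithSave, ...).

    import scala.xml.XML

    object SampleXmlParseSketch {
      def main(args: Array[String]): Unit = {
        val jobXml = XML.loadFile("expConfigration.xml") // or sample.xml; same layout
        // node.text returns the content of the CDATA section, i.e. the raw JSON job definition
        val jobJsons: Seq[String] = (jobXml \ "job").map(_.text.trim)
        jobJsons.foreach(println)
      }
    }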
    
    

     The submit command looks like this:

    spark-2.0.2.3-bin-2.4.0.10/bin/spark-submit --master yarn-cluster --driver-memory 4g  --num-executors 4  --executor-memory 4g  --conf spark.yarn.maxAppAttempts=1 --jars /home/mlp/mlaas/lib/spring-jms-3.1.2.RELEASE.jar,/home/mlp/mlaas/lib/json-serde.jar --class mlaas.spark.StreamingExecutePref  --jars sample.xml mlaas-mllib.jar "{"batchTime":20,"mlData":[{"topic":"mlaas_event","brokerList":"10.27.189.238:9092,10.27.189.239:9092","dsName":"mlp.adult_income","dsSchema":[{"columnName":"age","columnType":"int","dataType":"none"},{"columnName":"workclass","columnType":"string","dataType":"none"},{"columnName":"fnlwgt","columnType":"int","dataType":"none"},{"columnName":"education","columnType":"string","dataType":"none"},{"columnName":"education_num","columnType":"int","dataType":"none"},{"columnName":"marital_status","columnType":"string","dataType":"none"},{"columnName":"occupation","columnType":"string","dataType":"none"},{"columnName":"relationship","columnType":"string","dataType":"none"},{"columnName":"race","columnType":"string","dataType":"none"},{"columnName":"sex","columnType":"string","dataType":"none"},{"columnName":"capital_gain","columnType":"int","dataType":"none"},{"columnName":"capital_loss","columnType":"int","dataType":"none"},{"columnName":"hours_per_week","columnType":"int","dataType":"none"},{"columnName":"native_country","columnType":"string","dataType":"none"},{"columnName":"income","columnType":"string","dataType":"none"}],"dsType":"IN","nodeInstanceId":18788},{"topic":"mlaas_writeTopic","brokerList":"10.27.189.238:9092,10.27.189.239:9092","dsName":"mlp.zpcstreamingresult","dsType":"OUT","nodeInstanceId":18793}]}"
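
    For readability, the JSON argument embedded in the command above is reproduced pretty-printed below. Note that main() actually reads this configuration from a local ssmpConfigration.json distributed with the job, and additionally expects an id field (used as ssmpJobId) that is not present in this example.

    {
      "batchTime": 20,
      "mlData": [
        {
          "topic": "mlaas_event",
          "brokerList": "10.27.189.238:9092,10.27.189.239:9092",
          "dsName": "mlp.adult_income",
          "dsSchema": [
            {"columnName": "age",            "columnType": "int",    "dataType": "none"},
            {"columnName": "workclass",      "columnType": "string", "dataType": "none"},
            {"columnName": "fnlwgt",         "columnType": "int",    "dataType": "none"},
            {"columnName": "education",      "columnType": "string", "dataType": "none"},
            {"columnName": "education_num",  "columnType": "int",    "dataType": "none"},
            {"columnName": "marital_status", "columnType": "string", "dataType": "none"},
            {"columnName": "occupation",     "columnType": "string", "dataType": "none"},
            {"columnName": "relationship",   "columnType": "string", "dataType": "none"},
            {"columnName": "race",           "columnType": "string", "dataType": "none"},
            {"columnName": "sex",            "columnType": "string", "dataType": "none"},
            {"columnName": "capital_gain",   "columnType": "int",    "dataType": "none"},
            {"columnName": "capital_loss",   "columnType": "int",    "dataType": "none"},
            {"columnName": "hours_per_week", "columnType": "int",    "dataType": "none"},
            {"columnName": "native_country", "columnType": "string", "dataType": "none"},
            {"columnName": "income",         "columnType": "string", "dataType": "none"}
          ],
          "dsType": "IN",
          "nodeInstanceId": 18788
        },
        {
          "topic": "mlaas_writeTopic",
          "brokerList": "10.27.189.238:9092,10.27.189.239:9092",
          "dsName": "mlp.zpcstreamingresult",
          "dsType": "OUT",
          "nodeInstanceId": 18793
        }
      ]
    }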

    Test helper classes (KafkaProducer feeds sample records into the input topic mlaas_event; KafkaReader consumes the output topic mlaas_writeTopic to verify the results):

    import java.util.Properties
    
    import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
    
    import scala.io.Source
    import scala.reflect.io.Path
    /**
      * Created by zpc on 2017/2/23.
      */
    object KafkaProducer {
    
      def main(args: Array[String]): Unit = {
        val BROKER_LIST = "10.27.189.238:9092,10.27.189.239:9092"
        val TARGET_TOPIC = "mlaas_event" //"new"
        val DIR = "/root/Documents/"
    
        /**
          * 1. Configure the producer properties
          * metadata.broker.list : brokers of the Kafka cluster; listing two is enough
          * serializer.class : how outgoing messages are serialized
          * request.required.acks : 1 means the broker must acknowledge the message; the default is 0
          * producer.type : the default is synchronous (sync)
          */
        val props = new Properties()
        props.put("metadata.broker.list", BROKER_LIST)
        props.put("serializer.class", "kafka.serializer.StringEncoder")
        props.put("request.required.acks", "1")
        props.put("producer.type", "async")
    
        /**
          * 2. Create the producer
          */
        val config = new ProducerConfig(props)
        val producer = new Producer[String, String](config)
        while(true){
          val line = "50,Self-emp-not-inc,83311,Bachelors,13,Married-civ-spouse,Exec-managerial,Husband,White,Male,0,0,13,United-States,<=50K"
          val message = new KeyedMessage[String, String](TARGET_TOPIC, line)
          println(line)
          producer.send(message)
          Thread.sleep(2000)
        }
    
      }
    
    }



    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    /**
    * Created by zpc on 2017/2/23.
    */
    object KafkaReader {

    def main(args: Array[String]): Unit = {
    val brokers = "10.27.189.238:9092,10.27.189.239:9092"
    // val topics = "mlaas_event"
    val topics = "mlaas_writeTopic"

    val sparkconf = new SparkConf().setAppName("kafkastreaming").setMaster("local[2]")
    val ssc = new StreamingContext(sparkconf,Seconds(20))

    // ssc.checkpoint("w_checkpoints") // Windows path

    val topicSet = topics.split(",").toSet
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)


    //{"@timestamp":"2016-12-14T16:26:21.746Z","beat":{"hostname":"root","name":"root","version":"5.1.1"},"metricset":{"module":"system","name":"process","rtt":28025},"system":{"process":{"cmdline":""C:\WINDOWS\system32\SearchFilterHost.exe" 0 624 628 644 8192 632 ","cpu":{"start_time":"2016-12-14T16:24:15.240Z","total":{"pct":0.000000}},"memory":{"rss":{"bytes":7495680,"pct":0.000400},"share":0,"size":1806336},"name":"SearchFilterHost.exe","pgid":0,"pid":8776,"ppid":2524,"state":"running","username":"NT AUTHORITY\SYSTEM"}},"type":"metricsets"}
    val lines = KafkaUtils.createDirectStream[String, String,StringDecoder, StringDecoder](ssc,kafkaParams,topicSet)
    //val message = lines.map(_._1) // the key (_._1) is empty (null)
    val message = lines.map(_._2) // _._2 is the actual message payload written to Kafka
    val words = message.flatMap(_.split(","))

    val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
    wordCounts.print()
    //message.print() checked

    ssc.start()
    ssc.awaitTermination()
    }
    }
     
     
  • Original article: https://www.cnblogs.com/drawwindows/p/6519931.html