SDP(9): MongoDB-Scala

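    MongoDB is a document-oriented database. It imposes no fixed schema on the data it stores, so data can be written and read in flexible, varied shapes. MongoDB is also a distributed database. Unlike a traditional relational database, it does not support relational table joins; the aggregation stage $lookup offers only a limited left outer join. As a result, designing its collections is very different from designing relational tables. Distributed databases follow a different data-modeling philosophy: the structure of the collections has to be designed first of all around the queries the application needs to run. Relational design follows normalization, and the related tables are joined back together at query time. Because a distributed database cannot join, reading data that spans several collections takes several separate queries, which hurts performance.

    The defining feature of MongoDB as a document database is that it allows embedded Documents. Related Documents can be nested inside the Document they belong to, so all the data comes back in a single read, which gives a denormalized data model. In this respect MongoDB is stronger than Cassandra. MongoDB also supports a wide variety of index types, which makes it an excellent choice when a distributed database has to serve reads efficiently. In addition, it provides sort, aggregation and map-reduce for rich statistical analysis of large data sets.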
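To make the contrast between normalized and embedded data concrete, here is a minimal stdlib-only Scala sketch that does not touch MongoDB at all. The same order is held once as header and detail "tables" that the client has to stitch together, and once as a single document with its lines embedded. The names EmbeddingSketch, PoHeader, PoLine and PoDoc are hypothetical and serve only to illustrate the idea.

```scala
object EmbeddingSketch {
  // Normalized form: header rows and detail rows linked by ponum.
  final case class PoHeader(ponum: String, vendor: String)
  final case class PoLine(ponum: String, item: String, qty: Int)

  // Denormalized form: the detail lines live inside the order itself.
  final case class PoDoc(ponum: String, vendor: String, lines: List[(String, Int)])

  val headers: List[PoHeader] = List(PoHeader("po18022002", "The Samsung company"))
  val detailRows: List[PoLine] = List(
    PoLine("po18022002", "samsung galaxy s8", 100),
    PoLine("po18022002", "samsung galaxy s7", 1000)
  )

  // Without joins the client reads both "tables" and assembles the order itself.
  def assemble(ponum: String): Option[PoDoc] =
    headers.find(_.ponum == ponum).map { h =>
      val ls = detailRows.filter(_.ponum == ponum).map(l => (l.item, l.qty))
      PoDoc(h.ponum, h.vendor, ls)
    }

  // With embedding, a single lookup returns the complete order.
  val embedded: List[PoDoc] = List(
    PoDoc("po18022002", "The Samsung company",
      List(("samsung galaxy s8", 100), ("samsung galaxy s7", 1000)))
  )

  def fetch(ponum: String): Option[PoDoc] = embedded.find(_.ponum == ponum)
}
```

Both lookups yield the same PoDoc. The difference is that assemble scans two collections, which in a database without joins means two round trips.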

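    Before using MongoDB we need to understand its data model and design philosophy. In today's big-data era, the way data is produced and consumed has changed fundamentally, and the traditional relational data model is no longer adequate for many modern information systems. Take a personal-information table as an example. Some people have two addresses, some have none, some have a fax number, and others have yet other attributes. In a relational schema we have to make trade-offs and drop some of these attributes. MongoDB, as a document database, lets every Document have its own format, so all of this data can be captured and stored completely. Below is a Document design for a purchase order: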

      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "podate" -> podate1,
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        "podate" -> podate2,
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )

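    po1 and po2 each embed several line-item Documents under the podtl key. The two orders differ in structure: po1 has two extra keys, remarks and handler. The embedded Documents also differ from one another, since some have packing and others have payterm. The example also deliberately demonstrates the date, binary and array types: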

      val ca = Calendar.getInstance()
      ca.set(2011,10,23)
      val podate1 = ca.getTime
      ca.set(2012,12,23)
      val podate2 = ca.getTime
    
      val pic = FileToByteArray("/users/tiger-macpro/sample.png", 3.seconds)
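java.util.Calendar numbers its months from 0 and is lenient by default. As a result, the two set calls above produce November 23, 2011 and January 23, 2013, not October and December. The stdlib-only sketch below checks this. It also shows a java.time alternative, where months are numbered from 1. CalendarMonths and its methods are hypothetical helpers, not part of the post's code.

```scala
import java.time.{LocalDate, ZoneId}
import java.util.{Calendar, Date, GregorianCalendar}

object CalendarMonths {
  // Mirrors ca.set(year, month, day) from the post and reports the
  // resulting (year, 1-based month, day).
  def normalized(year: Int, zeroBasedMonth: Int, day: Int): (Int, Int, Int) = {
    val ca = new GregorianCalendar()
    ca.clear() // drop the current time of day that a fresh calendar carries
    ca.set(year, zeroBasedMonth, day) // lenient: month 12 rolls into the next year
    (ca.get(Calendar.YEAR), ca.get(Calendar.MONTH) + 1, ca.get(Calendar.DAY_OF_MONTH))
  }

  // java.time counts months from 1, so 12 really means December.
  def toDate(year: Int, month: Int, day: Int): Date =
    Date.from(LocalDate.of(year, month, day).atStartOfDay(ZoneId.systemDefault()).toInstant)
}
```

The sketch calls ca.clear() so that its results do not depend on the current time. The post's code skips that step, which is why both dates in its output carry the time of day at which the program ran.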

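    In the driver, a MongoDB date is represented as java.util.Date, which can be built with Calendar. Keep in mind that Calendar months are zero-based: set(2011,10,23) gives November 23, 2011, and set(2012,12,23) rolls over to January 23, 2013, which is exactly what the test-run output shows. Next, here is the type mapping used when converting Documents into case classes: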

      case class PO (
                     ponum: String,
                     podate: java.util.Date,
                     vendor: String,
                     remarks: Option[String],
                     podtl: Option[BsonArray],
                     handler: Option[BsonBinary]
                     )
      def toPO(doc: Document): PO = {
          val ks = doc.keySet
          PO(
            ponum = doc.getString("ponum"),
            podate = doc.getDate("podate"),
            vendor = doc.getString("vendor"),
            remarks = {
              if (ks.contains("remarks"))
                Some(doc.getString("remarks"))
              else
                None
            },
            podtl = {
              if (ks.contains("podtl"))
                doc.get("podtl").asInstanceOf[Option[BsonArray]]
              else
                None
            },
            handler = {
              if (ks.contains("handler"))
                doc.get("handler").asInstanceOf[Option[BsonBinary]]
              else
                None
            }
          )
        }
    
       case class PODTL(
                       item: String,
                       price: Double,
                       qty: Int,
                       packing: Option[String],
                       payTerm: Option[String]
                       )
       def toPODTL(podtl: Document): PODTL = {
         val ks = podtl.keySet
         PODTL(
           item = podtl.getString("item"),
           price = podtl.getDouble("price"),
           qty = podtl.getInteger("qty"),
           packing = {
             if (ks.contains("packing"))
               Some(podtl.getString("packing"))
             else None
           },
           payTerm = {
             if(ks.contains("payterm"))
               Some(podtl.getString("payterm"))
             else None
           }
         )
       }
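toPO reads its optional fields with doc.get(...).asInstanceOf[Option[BsonArray]]. Because of type erasure that cast is unchecked: if the stored value had some other type, the error would only appear later as a ClassCastException. Below is a minimal, driver-free sketch of a checked alternative. It assumes a document modelled as a plain Map[String, Any], and SafeGet.getAs is a hypothetical helper, not a driver API.

```scala
import scala.reflect.ClassTag

object SafeGet {
  // Returns Some(value) only when the key is present AND the stored value
  // has runtime type T. The ClassTag turns `case v: T` into a real runtime
  // check instead of an unchecked, erased cast.
  def getAs[T: ClassTag](doc: Map[String, Any], key: String): Option[T] =
    doc.get(key).collect { case v: T => v }
}
```

As far as I know, the Scala driver's Document.get also accepts a type parameter (for example doc.get[BsonArray]("podtl")) and performs a similar runtime check. Confirm this against the scaladoc of the driver version you use.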

    Note the BsonBinary and BsonArray types and how they are used. A key inside an embedded Document can also serve as a query condition, written in dot notation:

       poCollection.find(equal("podtl.qty",100)).toFuture().onComplete {
        case Success(docs) => docs.map(toPO).foreach (showPO)
          println("-------------------------------")
        case Failure(e) => println(e.getMessage)
      }
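podtl is an array of embedded Documents, so equal("podtl.qty", 100) matches a PO when at least one element of that array has qty equal to 100. The server then returns the whole PO, including the lines that do not match, which is why the samsung galaxy s7 line with qty 1000 appears in the test-run output. The driver-free sketch below imitates that matching rule in plain Scala. DotNotationSketch and its types are hypothetical.

```scala
object DotNotationSketch {
  final case class Line(item: String, qty: Int)
  final case class Po(ponum: String, podtl: List[Line])

  // Rough imitation of equal("podtl.qty", n) when podtl is an array of
  // embedded documents: a PO matches if ANY element has qty == n.
  def matchesQty(po: Po, n: Int): Boolean = po.podtl.exists(_.qty == n)

  // As with find(...), the whole PO is returned, not only the matching lines.
  def find(pos: List[Po], n: Int): List[Po] = pos.filter(matchesQty(_, n))
}
```

To get back only the matching array elements, MongoDB provides projection operators such as $elemMatch or the positional $ operator.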

    toPO and toPODTL map po and podtl onto case classes, which can then be used in a strongly typed way:

       def showPO(po: PO) = {
         println(s"po number: ${po.ponum}")
         println(s"po date: ${po.podate.toString}")
         println(s"vendor: ${po.vendor}")
         if (po.remarks != None)
           println(s"remarks: ${po.remarks.get}")
         po.podtl match {
           case Some(barr) =>
             val docs = barr.getValues.asScala.toList
             docs.map { dc =>
               toPODTL(dc.asInstanceOf[org.bson.BsonDocument])
             }.foreach { doc: PODTL =>
                 print(s"==>Item: ${doc.item} ")
                 print(s"price: ${doc.price} ")
                 print(s"qty: ${doc.qty} ")
                 doc.packing.foreach(pk => print(s"packing: ${pk} "))
                 doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                 println("")
               }
           case _ =>
         }
    
         po.handler match {
           case Some(bs) =>
             val fileName = s"/users/tiger-macpro/${po.ponum}.png"
             ByteArrayToFile(bs.getData,fileName)
             println(s"picture saved to ${fileName}")
           case None => println("no picture provided")
         }
       }
       poCollection.find(equal("podtl.qty",100)).toFuture().onComplete {
         case Success(docs) => docs.map(toPO).foreach (showPO)
           println("------------------------------")
         case Failure(e) => println(e.getMessage)
       }
       poCollection.find().toFuture().onComplete {
        case Success(docs) => docs.map(toPO).foreach (showPO)
          println("-------------------------------")
        case Failure(e) => println(e.getMessage)
      }
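showPO mixes formatting with printing. Here is a minimal sketch of the line-item part rewritten as a pure function, which makes it easy to test. It redeclares a PODTL with the same fields as the post's case class so that it compiles on its own. The object name PodtlFormat is hypothetical.

```scala
object PodtlFormat {
  // Same fields as the post's PODTL, redeclared so this compiles on its own.
  final case class PODTL(item: String, price: Double, qty: Int,
                         packing: Option[String], payTerm: Option[String])

  // Pure version of the line-item printing in showPO: optional fields are
  // appended only when present, each followed by a space as in the original.
  def render(d: PODTL): String = {
    val base = s"==>Item: ${d.item} price: ${d.price} qty: ${d.qty} "
    val extras = d.packing.map(pk => s"packing: $pk ").toList ++
      d.payTerm.map(pt => s"payTerm: $pt ").toList
    base + extras.mkString
  }
}
```

With a helper like this, the foreach body inside showPO shrinks to a single println of the rendered line.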

    A test run prints the following:

    po number: po18022002
    po date: Wed Jan 23 11:57:50 HKT 2013
    vendor: The Samsung compay
    ==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard 
    ==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days 
    ==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury 
    no picture provided
    -------------------------------
    po number: po18012301
    po date: Wed Nov 23 11:57:50 HKT 2011
    vendor: The smartphone compay
    remarks: urgent, rush order
    ==>Item: sony smartphone price: 2389.0 qty: 1239 packing: standard 
    ==>Item: ericson smartphone price: 897.0 qty: 1000 payTerm: 30 days 
    picture saved to /users/tiger-macpro/po18012301.png
    po number: po18022002
    po date: Wed Jan 23 11:57:50 HKT 2013
    vendor: The Samsung compay
    ==>Item: samsung galaxy s8 price: 2300.0 qty: 100 packing: standard 
    ==>Item: samsung galaxy s7 price: 1897.0 qty: 1000 payTerm: 30 days 
    ==>Item: apple iphone7 price: 6500.0 qty: 100 packing: luxury 
    no picture provided
    ------------------------------
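The two queries in the program start independently, and each one prints from its own onComplete callback. The order of the two result blocks is therefore not guaranteed; in this run the filtered query happened to finish first. If a fixed order matters, the futures can be chained. Below is a stdlib sketch in which filteredQuery and allQuery are hypothetical stand-ins for poCollection.find(...).toFuture(). It uses the global ExecutionContext instead of the post's system.dispatcher.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global

object SequencedQueries {
  // Hypothetical stand-ins for poCollection.find(...).toFuture().
  def filteredQuery(): Future[Seq[String]] = Future(Seq("po18022002"))
  def allQuery(): Future[Seq[String]] = Future(Seq("po18012301", "po18022002"))

  // The second query starts only after the first has completed and been
  // logged, so the log order is fixed regardless of thread scheduling.
  def runInOrder(log: String => Unit): Future[Unit] =
    for {
      filtered <- filteredQuery()
      _ = log(s"filtered: ${filtered.mkString(",")}")
      all <- allQuery()
      _ = log(s"all: ${all.mkString(",")}")
    } yield ()
}
```

For example, Await.result(SequencedQueries.runInOrder(s => println(s)), 5.seconds) always prints the filtered result before the full listing.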

    Below is the complete source code for this demo:

    build.sbt

    name := "learn-mongo"
    
    version := "0.1"
    
    scalaVersion := "2.12.4"
    
    libraryDependencies ++= Seq(
      "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1",
      "com.lightbend.akka" %% "akka-stream-alpakka-mongodb" % "0.17"
    )

    FileStreaming.scala

    import java.io.{ByteArrayInputStream, InputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.Materializer
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    
    import scala.concurrent.Await
    import akka.util._
    import scala.concurrent.duration._
    
    object FileStreaming {
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer):ByteBuffer = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toByteBuffer
      }
    
      def FileToByteArray(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer): Array[Byte] = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        (Await.result(fut, timeOut)).toArray
      }
    
      def FileToInputStream(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer): InputStream = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString()) { case (hd, bs) =>
          hd ++ bs
        }
        val buf = (Await.result(fut, timeOut)).toArray
        new ByteArrayInputStream(buf)
      }
    
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        val baInput = new ByteArrayInputStream(ba)
        val source = StreamConverters.fromInputStream(() => baInput)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) = {
        val baInput = new ByteArrayInputStream(bytes)
        val source = StreamConverters.fromInputStream(() => baInput)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    }
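FileStreaming goes through Akka Streams, and ByteArrayToFile returns a Future that completes only once the write has finished. showPO therefore prints "picture saved to ..." before the file is necessarily on disk. For small files the JDK alone is enough. Below is a synchronous sketch built on java.nio.file.Files; the object FileBytes and its method names are hypothetical.

```scala
import java.nio.file.{Files, Path}

object FileBytes {
  // Reads the whole file into memory; fine for small attachments such as
  // the sample picture, not for large files.
  def fileToByteArray(path: Path): Array[Byte] = Files.readAllBytes(path)

  // Creates the file (or truncates an existing one) and returns only after
  // the bytes have been written.
  def byteArrayToFile(bytes: Array[Byte], path: Path): Path = Files.write(path, bytes)
}
```

The trade-off is that these calls block the calling thread, whereas the Akka Streams versions do not.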

    MongoScala103.scala

    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import java.util.Calendar
    
    import org.bson.BsonBinary
    
    import scala.util._
    import FileStreaming._
    
    import scala.concurrent.duration._
    import org.mongodb.scala._
    import org.mongodb.scala.bson.{BsonArray, BsonDocument}
    
    import scala.collection.JavaConverters._
    
    import org.mongodb.scala.connection.ClusterSettings
    import org.mongodb.scala.model.Filters._
    object MongoScala103 extends App {
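  import Helpers._  // supplies headResult(); not listed in this post (presumably Helpers.scala from the mongo-scala-driver quick tour)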
    
      val clusterSettings = ClusterSettings.builder()
        .hosts(List(new ServerAddress("localhost:27017")).asJava).build()
      val clientSettings = MongoClientSettings.builder().clusterSettings(clusterSettings).build()
      val client = MongoClient(clientSettings)
    
      implicit val system = ActorSystem()
      implicit val mat = ActorMaterializer()
      implicit val ec = system.dispatcher
    
    
      val db: MongoDatabase = client.getDatabase("testdb")
      val poOrgCollection: MongoCollection[Document] = db.getCollection("po")
      poOrgCollection.drop.headResult()
      val poCollection: MongoCollection[Document] = db.getCollection("po")
    
    
      val ca = Calendar.getInstance()
      ca.set(2011,10,23)
      val podate1 = ca.getTime
      ca.set(2012,12,23)
      val podate2 = ca.getTime
    
  val pic = FileToByteArray("/users/tiger-macpro/sample.png", 3.seconds)
    
      val po1 = Document (
        "ponum" -> "po18012301",
        "vendor" -> "The smartphone compay",
        "podate" -> podate1,
        "remarks" -> "urgent, rush order",
        "handler" -> pic,
        "podtl" -> Seq(
          Document("item" -> "sony smartphone", "price" -> 2389.00, "qty" -> 1239, "packing" -> "standard"),
          Document("item" -> "ericson smartphone", "price" -> 897.00, "qty" -> 1000, "payterm" -> "30 days")
        )
      )
    
      val po2 = Document (
        "ponum" -> "po18022002",
        "vendor" -> "The Samsung compay",
        "podate" -> podate2,
        "podtl" -> Seq(
          Document("item" -> "samsung galaxy s8", "price" -> 2300.00, "qty" -> 100, "packing" -> "standard"),
          Document("item" -> "samsung galaxy s7", "price" -> 1897.00, "qty" -> 1000, "payterm" -> "30 days"),
          Document("item" -> "apple iphone7", "price" -> 6500.00, "qty" -> 100, "packing" -> "luxury")
        )
      )
    
    
      poCollection.insertMany(Seq(po1,po2)).headResult()
    
      case class PO (
                     ponum: String,
                     podate: java.util.Date,
                     vendor: String,
                     remarks: Option[String],
                     podtl: Option[BsonArray],
                     handler: Option[BsonBinary]
                     )
      def toPO(doc: Document): PO = {
          val ks = doc.keySet
          PO(
            ponum = doc.getString("ponum"),
            podate = doc.getDate("podate"),
            vendor = doc.getString("vendor"),
            remarks = {
              if (ks.contains("remarks"))
                Some(doc.getString("remarks"))
              else
                None
            },
            podtl = {
              if (ks.contains("podtl"))
                doc.get("podtl").asInstanceOf[Option[BsonArray]]
              else
                None
            },
            handler = {
              if (ks.contains("handler"))
                doc.get("handler").asInstanceOf[Option[BsonBinary]]
              else
                None
            }
          )
        }
    
       case class PODTL(
                       item: String,
                       price: Double,
                       qty: Int,
                       packing: Option[String],
                       payTerm: Option[String]
                       )
       def toPODTL(podtl: Document): PODTL = {
         val ks = podtl.keySet
         PODTL(
           item = podtl.getString("item"),
           price = podtl.getDouble("price"),
           qty = podtl.getInteger("qty"),
           packing = {
             if (ks.contains("packing"))
               Some(podtl.getString("packing"))
             else None
           },
           payTerm = {
             if(ks.contains("payterm"))
               Some(podtl.getString("payterm"))
             else None
           }
         )
       }
    
       def showPO(po: PO) = {
         println(s"po number: ${po.ponum}")
         println(s"po date: ${po.podate.toString}")
         println(s"vendor: ${po.vendor}")
         if (po.remarks != None)
           println(s"remarks: ${po.remarks.get}")
         po.podtl match {
           case Some(barr) =>
             val docs = barr.getValues.asScala.toList
             docs.map { dc =>
               toPODTL(dc.asInstanceOf[org.bson.BsonDocument])
             }.foreach { doc: PODTL =>
                 print(s"==>Item: ${doc.item} ")
                 print(s"price: ${doc.price} ")
                 print(s"qty: ${doc.qty} ")
                 doc.packing.foreach(pk => print(s"packing: ${pk} "))
                 doc.payTerm.foreach(pt => print(s"payTerm: ${pt} "))
                 println("")
               }
           case _ =>
         }
    
         po.handler match {
           case Some(bs) =>
             val fileName = s"/users/tiger-macpro/${po.ponum}.png"
             ByteArrayToFile(bs.getData,fileName)
             println(s"picture saved to ${fileName}")
           case None => println("no picture provided")
         }
    
       }
    
       poCollection.find().toFuture().onComplete {
         case Success(docs) => docs.map(toPO).foreach (showPO)
           println("------------------------------")
         case Failure(e) => println(e.getMessage)
       }
    
    
       poCollection.find(equal("podtl.qty",100)).toFuture().onComplete {
        case Success(docs) => docs.map(toPO).foreach (showPO)
          println("-------------------------------")
        case Failure(e) => println(e.getMessage)
      }
    
    
  scala.io.StdIn.readLine()
  client.close()
  system.terminate()
    
    }
Original article: https://www.cnblogs.com/tiger-xc/p/8513685.html