  SDP(6): A Distributed Database Runtime Environment - Cassandra-Engine

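        Modern information systems can hardly avoid big-data processing, so a general-purpose system-integration tool must also be able to store and read large volumes of data. Cassandra is a distributed database with the high availability typical of distributed databases. That makes it a core pillar of any large, real-time, distributed integration system.

    Cassandra differs from a traditional relational database in almost every respect, from how data is stored to how it is read. For example, Cassandra tables are denormalized, and a table's structure is designed backwards from the queries it must answer. Fortunately, Cassandra provides CQL (Cassandra Query Language, currently at language version 3) for database operations. Put simply, CQL is Cassandra's SQL: a query language whose syntax is close to SQL. Most importantly, CQL describes Cassandra's underlying storage in SQL terms, so programmers who already know relational SQL can start using Cassandra easily. Like SQL, CQL is plain text, and it can be run through many client interfaces, including the Java client.

    Several ready-made Cassandra client libraries already exist, and some offer a type-safe LINQ-style DSL (language-integrated query). We need to serve all kinds of Cassandra users, however, so we decided to provide a CQL script runtime instead. Cassandra-Engine accepts CQL scripts, executes them and returns the results.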

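    The execution model is very similar to JDBC: a CQL operation first builds a statement and then executes it. Unlike JDBC, the driver also supports non-blocking (asynchronous) execution: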

       /**
         * Executes the provided query asynchronously.
         * <p/>
         * This method does not block. It returns as soon as the query has been
         * passed to the underlying network stack. In particular, returning from
         * this method does not guarantee that the query is valid or has even been
         * submitted to a live node. Any exception pertaining to the failure of the
         * query will be thrown when accessing the {@link ResultSetFuture}.
         * <p/>
         * Note that for queries that don't return a result (INSERT, UPDATE and
         * DELETE), you will need to access the ResultSetFuture (that is, call one of
         * its {@code get} methods to make sure the query was successful.
         *
         * @param statement the CQL query to execute (that can be any {@code Statement}).
         * @return a future on the result of the query.
         * @throws UnsupportedFeatureException if the protocol version 1 is in use and
         *                                     a feature not supported has been used. Features that are not supported by
         *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary
         *                                     values in RegularStatement.
         */
        ResultSetFuture executeAsync(Statement statement);

    executeAsync returns a ResultSetFuture, which is a Guava ListenableFuture. We can use an implicit conversion to turn it into a Scala Future:

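     import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
     import scala.concurrent.{Future, Promise}
     import scala.language.implicitConversions

     // complete a Scala Promise from Guava's callback and hand out its Future
     implicit def listenableFutureToFuture[T](
                   listenableFuture: ListenableFuture[T]): Future[T] = {
        val promise = Promise[T]()
        Futures.addCallback(listenableFuture, new FutureCallback[T] {
          def onFailure(error: Throwable): Unit = {
            promise.failure(error)
            ()
          }
          def onSuccess(result: T): Unit = {
            promise.success(result)
            ()
          }
        })
        promise.future
      }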
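    The Guava-to-Scala bridge above is one instance of a general pattern: complete a Promise from a callback. Guava is not part of the JDK, so here is a minimal sketch of the same technique applied to java.util.concurrent.CompletableFuture. FutureBridge is a hypothetical name used only for illustration:

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object FutureBridge {
  // Complete a Scala Promise from the Java future's completion callback;
  // no thread blocks while the Java future is still running.
  def toScala[T](cf: CompletableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      def accept(result: T, error: Throwable): Unit =
        if (error != null) promise.failure(error) else promise.success(result)
    })
    promise.future
  }

  // usage: wait for an already completed Java future through the bridge
  def demo(): String =
    Await.result(toScala(CompletableFuture.completedFuture("done")), 1.second)
}
```

    A failure on the Java side arrives as a failed Scala Future, so callers can keep using map, flatMap and onComplete as usual.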

    With this implicit conversion in scope, the ResultSetFuture returned by executeAsync is converted to a Scala Future automatically, as in:

      def cqlSingleUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    ...
          session.executeAsync(boundStmt).map(_.wasApplied())
      }

    As in the JDBC engine, we again use a Context to assemble a complete, executable statement:

    case class CQLContext(
                           statements: Seq[String],
                           parameters: Seq[Seq[Object]] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                         ) { ctx =>
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
        ctx.copy(consistency = Some(_consistency))
      def setCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
      def appendCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = ctx.statements :+ _statement,
          parameters = ctx.parameters ++ Seq(_parameters))
    }
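    The builder methods compose as follows. setCommand replaces whatever the context held. appendCommand keeps each statement and its parameter list at the same position, and cqlMultiUpdate later zips these two lists together. To keep the sketch runnable on its own, MiniContext and BatchBuilding are hypothetical, stripped-down stand-ins with no consistency field and no driver types:

```scala
// Hypothetical, stripped-down replica of CQLContext: same builder semantics,
// but without the consistency field or any driver types.
final case class MiniContext(statements: Seq[String] = Nil, parameters: Seq[Seq[AnyRef]] = Nil) {
  // replaces the whole command list with a single statement
  def setCommand(stmt: String, params: AnyRef*): MiniContext =
    copy(statements = Seq(stmt), parameters = Seq(params.toList))
  // appends a statement and keeps its parameter list at the same position
  def appendCommand(stmt: String, params: AnyRef*): MiniContext =
    copy(statements = statements :+ stmt, parameters = parameters :+ params.toList)
}

object BatchBuilding {
  // fold (statement, parameters) pairs into one context, e.g. for a batch
  def fromCommands(cmds: Seq[(String, Seq[AnyRef])]): MiniContext =
    cmds.foldLeft(MiniContext()) { case (ctx, (stmt, ps)) => ctx.appendCommand(stmt, ps: _*) }
}
```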

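    The difference from JDBCContext is the consistency level. Each piece of data is replicated on several nodes of the cluster, so the consistency level states how many replicas must respond before a read or write counts as successful: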

  def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
    // every constant needs a case: CONSISTENCY_LEVEL is a plain Int,
    // so an unmatched value fails at run time with a MatchError
    consistency match {
      case ALL => ConsistencyLevel.ALL
      case ONE => ConsistencyLevel.ONE
      case TWO => ConsistencyLevel.TWO
      case THREE => ConsistencyLevel.THREE
      case ANY => ConsistencyLevel.ANY
      case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
      case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
      case QUORUM => ConsistencyLevel.QUORUM
      case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
      case SERIAL => ConsistencyLevel.SERIAL
      case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
    }
  }
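    The number of replicas a level such as QUORUM involves follows from the replication factor (RF). Here is a sketch under the usual single-data-center assumptions. QUORUM needs floor(RF / 2) + 1 replicas, and a read is guaranteed to overlap the latest acknowledged write when R + W > RF. The LOCAL_ and EACH_ variants apply the same count per data center. The helper names are hypothetical:

```scala
object QuorumMath {
  // replicas that must answer at QUORUM for a given replication factor: floor(RF / 2) + 1
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // a read overlaps the latest acknowledged write when R + W > RF
  def readsSeeLatestWrite(readReplicas: Int, writeReplicas: Int, replicationFactor: Int): Boolean =
    readReplicas + writeReplicas > replicationFactor
}
```

    With RF = 3, reading and writing at QUORUM involves 2 + 2 > 3 replicas, so the two sets overlap. ONE/ONE (1 + 1) gives no such guarantee.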

    CQL statements come as SimpleStatement, PreparedStatement and BoundStatement. A BoundStatement can cover the construction needs of every kind of CQL statement. Here is an example of building a BoundStatement:

   val prepStmt = session.prepare(ctx.statement)
   // bind without values when there are no parameters; otherwise convert them first
   val boundStmt =
     if (ctx.parameter.isEmpty) prepStmt.bind()
     else prepStmt.bind(processParameters(ctx.parameter): _*)
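    When statements are assembled from text, the number of values can easily end up different from the number of `?` markers. A cheap client-side check can catch this before bind is called. This is a hypothetical helper, not part of the driver, and it is deliberately naive. It skips `?` inside single-quoted string literals, but it does not understand named markers (`:name`) or comments:

```scala
object PlaceholderCheck {
  // hypothetical helper: counts positional '?' markers, skipping single-quoted literals;
  // it does not understand named markers (:name) or comments
  def countPlaceholders(cql: String): Int = {
    var inLiteral = false
    var count = 0
    for (c <- cql) {
      if (c == '\'') inLiteral = !inLiteral   // an escaped '' toggles twice and cancels out
      else if (c == '?' && !inLiteral) count += 1
    }
    count
  }

  // compare the marker count with the number of values before calling bind
  def checkArity(cql: String, params: Seq[Any]): Either[String, Unit] = {
    val expected = countPlaceholders(cql)
    if (expected == params.size) Right(())
    else Left(s"statement expects $expected values but got ${params.size}")
  }
}
```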

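    CQL parameter types are relatively complex. Values such as dates and timestamps must first be preprocessed by processParameters (Instant values rely on the InstantCodec registered by useJava8DateTime):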

  case class CQLDate(year: Int, month: Int, day: Int)
  case object CQLTodayDate
  case class CQLDateTime(year: Int, month: Int,
                         day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
  case object CQLDateTimeNow

  def processParameters(params: Seq[Object]): Seq[Object] = {
    params.map { obj =>
      obj match {
        case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
        case CQLTodayDate =>
          val today = java.time.LocalDate.now()
          LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
        case CQLDateTimeNow => Instant.now()
        case CQLDateTime(yy, mm, dd, hr, mi, sc, ms) =>
          // fields are interpreted as UTC; the Instant is built directly instead of parsing text
          java.time.LocalDateTime.of(yy, mm, dd, hr, mi, sc, ms * 1000000)
            .toInstant(java.time.ZoneOffset.UTC)
        case p@_ => p
      }
    }
  }
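    Building a timestamp by formatting a string is fragile. Instant.parse accepts only ISO-8601 text such as 1966-11-27T08:05:03.007Z. Every field must therefore be zero-padded (`%02d`, not `%2d`), and the `.` before the milliseconds and the trailing `Z` must be present. The following sketch compares the string route with constructing the value directly, assuming the fields are meant as UTC. DateTimeParams is a hypothetical name:

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}

object DateTimeParams {
  // string route: every field zero-padded, '.' before the millis, trailing 'Z' (UTC)
  def viaIsoString(year: Int, month: Int, day: Int, hour: Int, minute: Int, second: Int, millis: Int): Instant =
    Instant.parse(f"$year%04d-$month%02d-$day%02dT$hour%02d:$minute%02d:$second%02d.$millis%03dZ")

  // direct route: no formatting or parsing; the fields are interpreted as UTC
  def viaFields(year: Int, month: Int, day: Int, hour: Int, minute: Int, second: Int, millis: Int): Instant =
    LocalDateTime.of(year, month, day, hour, minute, second, millis * 1000000).toInstant(ZoneOffset.UTC)
}
```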

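    CassandraEngine updates come in two forms: a single update and a batch update. A batch update works much like a transaction. cqlMultiUpdate builds its batch with new BatchStatement(), which creates a LOGGED batch. Cassandra therefore guarantees that either all of its statements are eventually applied or none of them are. It is not a full transaction, though. Nothing is rolled back, and statements that touch different partitions are not isolated, so other clients may briefly see a partially applied batch: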

 def cqlExecute(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    if (ctx.statements.size == 1)
      cqlSingleUpdate(ctx)
    else
      cqlMultiUpdate(ctx)
  }
  def cqlSingleUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

    val prepStmt = session.prepare(ctx.statements.head)
    // test the parameter list (not the statement list); headOption also copes
    // with a context whose parameters were left at the default Nil
    val boundStmt = ctx.parameters.headOption match {
      case Some(params) if params.nonEmpty => prepStmt.bind(processParameters(params): _*)
      case _ => prepStmt.bind()
    }

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
    session.executeAsync(boundStmt).map(_.wasApplied())
  }

  def cqlMultiUpdate(ctx: CQLContext)(
    implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
    // new BatchStatement() creates a LOGGED batch
    val batch = new BatchStatement()
    commands.foreach { case (stm, params) =>
      val prepStmt = session.prepare(stm)
      if (params.isEmpty)
        batch.add(prepStmt.bind())
      else {
        val p = processParameters(params)
        batch.add(prepStmt.bind(p: _*))
      }
    }

    ctx.consistency.foreach {consistency =>
      batch.setConsistencyLevel(consistencyLevel(consistency))}
    session.executeAsync(batch).map(_.wasApplied())
  }
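    cqlMultiUpdate pairs statements with parameter lists using zip, and zip silently drops unmatched elements. For example, a context created directly as CQLContext(statements = Seq(s1, s2)) keeps the default parameters = Nil. The zip is then empty and the batch contains no statements. The sketch below makes the check explicit. BatchArity.pairCommands is a hypothetical helper:

```scala
object BatchArity {
  // zip silently drops unmatched elements; report a length mismatch explicitly instead
  def pairCommands(statements: Seq[String],
                   parameters: Seq[Seq[AnyRef]]): Either[String, Seq[(String, Seq[AnyRef])]] =
    if (statements.size == parameters.size) Right(statements.zip(parameters))
    else Left(s"${statements.size} statements but ${parameters.size} parameter lists")
}
```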

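    A CassandraEngine update returns its status as Future[Boolean]. Execution errors surface as a failed Future. wasApplied() itself is false only for a conditional (IF ...) statement that was not applied, and is always true otherwise. Here is an update example (it assumes the keyspace testdb already exists):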

      val createCQL ="""
      CREATE TABLE testdb.members (
        id UUID primary key,
        name TEXT,
        description TEXT,
        birthday DATE,
        created_at TIMESTAMP,
        picture BLOB
      )"""
    
      val ctxCreate = CQLContext().setCommand(createCQL)
    
      val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" +
        " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png"))
      
      val createData = for {
        createTable <- cqlExecute(ctxCreate)
        insertData <- cqlExecute(ctxInsert)
      } yield(createTable, insertData)
    
      createData.onComplete {
        case Success((c,i)) => println(s"Create Table: $c, Insert Data $i")
        case Failure(e) => println(e.getMessage)
      }

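    In the example above, a for-comprehension runs the two operations one after the other: the insert starts only after the create has succeeded. Note that the example already covers date, datetime and blob input parameters.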
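    Here is a sketch that shows the sequencing, and the short-circuit on failure, without a cluster. A hypothetical fakeExecute stands in for cqlExecute and records which statements actually ran:

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Try

object Sequencing {
  implicit val ec: ExecutionContext = ExecutionContext.global

  // hypothetical stand-in for cqlExecute: records the statement name, then succeeds or fails
  def fakeExecute(name: String, log: ConcurrentLinkedQueue[String], fail: Boolean = false): Future[Boolean] =
    Future {
      log.add(name)
      if (fail) throw new RuntimeException(s"$name failed")
      true
    }

  // same shape as createData: "insert" starts only after "create" has succeeded
  def createThenInsert(log: ConcurrentLinkedQueue[String], createFails: Boolean): Future[(Boolean, Boolean)] =
    for {
      created  <- fakeExecute("create", log, createFails)
      inserted <- fakeExecute("insert", log)
    } yield (created, inserted)

  // usage: run the sequence, wait for it, and report which statements actually ran
  def runAndWait(createFails: Boolean): (Try[(Boolean, Boolean)], List[String]) = {
    val log = new ConcurrentLinkedQueue[String]()
    val outcome = Try(Await.result(createThenInsert(log, createFails), 2.seconds))
    (outcome, log.toArray().toList.map(_.toString))
  }
}
```

    If the first Future fails, flatMap never runs the second step, so a failed CREATE never triggers the INSERT.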

    The statement-building information for a fetch query is:

    case class CQLQueryContext[M](
                           statement: String,
                           parameter: Seq[Object] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                           extractor: Row => M
                         )

    A fetch query is likewise carried out with execute, in this case the blocking version:

        /**
         * Executes the provided query.
         * <p/>
         * This method blocks until at least some result has been received from the
         * database. However, for SELECT queries, it does not guarantee that the
         * result has been received in full. But it does guarantee that some
         * response has been received from the database, and in particular
         * guarantees that if the request is invalid, an exception will be thrown
         * by this method.
         *
         * @param statement the CQL query to execute (that can be any {@link Statement}).
         * @return the result of the query. That result will never be null but can
         * be empty (and will be for any non SELECT query).
         * @throws NoHostAvailableException    if no host in the cluster can be
         *                                     contacted successfully to execute this query.
         * @throws QueryExecutionException     if the query triggered an execution
         *                                     exception, i.e. an exception thrown by Cassandra when it cannot execute
         *                                     the query with the requested consistency level successfully.
         * @throws QueryValidationException    if the query if invalid (syntax error,
         *                                     unauthorized or any other validation problem).
         * @throws UnsupportedFeatureException if the protocol version 1 is in use and
         *                                     a feature not supported has been used. Features that are not supported by
         *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary
         *                                     values in RegularStatement.
         */
        ResultSet execute(Statement statement);

    The returned ResultSet is then converted into a Scala collection:

  def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
    implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

    val prepStmt = session.prepare(ctx.statement)
    val boundStmt =
      if (ctx.parameter.isEmpty) prepStmt.bind()
      else prepStmt.bind(processParameters(ctx.parameter): _*)

    ctx.consistency.foreach {consistency =>
      boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

    val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
    // iterating a ResultSet silently fetches all later pages, so read exactly
    // the rows of the page that is already buffered
    val available = resultSet.getAvailableWithoutFetching
    (resultSet, Iterator.fill(available)(ctx.extractor(resultSet.one())).to[C])
  }

    fetchResultPage reads one page (pageSize rows) at a time. The remaining pages can be read with fetchMoreResults:

        /**
         * Force fetching the next page of results for this result set, if any.
         * <p/>
         * This method is entirely optional. It will be called automatically while
         * the result set is consumed (through {@link #one}, {@link #all} or iteration)
         * when needed (i.e. when {@code getAvailableWithoutFetching() == 0} and
         * {@code isFullyFetched() == false}).
         * <p/>
         * You can however call this method manually to force the fetching of the
         * next page of results. This can allow to prefetch results before they are
         * strictly needed. For instance, if you want to prefetch the next page of
         * results as soon as there is less than 100 rows readily available in this
         * result set, you can do:
         * <pre>
         *   ResultSet rs = session.execute(...);
         *   Iterator<Row> iter = rs.iterator();
         *   while (iter.hasNext()) {
         *       if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched())
         *           rs.fetchMoreResults();
         *       Row row = iter.next()
         *       ... process the row ...
         *   }
         * </pre>
         * This method is not blocking, so in the example above, the call to {@code
         * fetchMoreResults} will not block the processing of the 100 currently available
         * rows (but {@code iter.hasNext()} will block once those rows have been processed
         * until the fetch query returns, if it hasn't yet).
         * <p/>
         * Only one page of results (for a given result set) can be
         * fetched at any given time. If this method is called twice and the query
         * triggered by the first call has not returned yet when the second one is
         * performed, then the 2nd call will simply return a future on the currently
         * in progress query.
         *
         * @return a future on the completion of fetching the next page of results.
         * If the result set is already fully retrieved ({@code isFullyFetched() == true}),
         * then the returned future will return immediately but not particular error will be
         * thrown (you should thus call {@link #isFullyFetched()} to know if calling this
         * method can be of any use}).
         */
        ListenableFuture<S> fetchMoreResults();

    Reading the remaining pages one page at a time can then be implemented as:

  def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
       extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = {
    // fetch only while nothing is buffered locally (a fetched page may be empty);
    // a timeout or query error propagates to the caller instead of being swallowed
    while (resultSet.getAvailableWithoutFetching == 0 && !resultSet.isFullyFetched)
      Await.result(resultSet.fetchMoreResults(), timeOut)
    val available = resultSet.getAvailableWithoutFetching
    if (available == 0) (resultSet, None)
    else (resultSet, Some(Iterator.fill(available)(extractor(resultSet.one())).to[C]))
  }
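    The paging logic can be tried out without a cluster. This sketch uses a hypothetical in-memory FakePagedRows whose members mirror the driver calls used above (getAvailableWithoutFetching, isFullyFetched, fetchMoreResults, one). The one difference is that fetching here is synchronous. A page is exactly the buffered rows, and a fetch happens only when the buffer is empty:

```scala
import scala.collection.mutable

// Hypothetical in-memory stand-in for a paged ResultSet: the "server" holds all rows,
// and rows become locally available one page at a time.
final class FakePagedRows[A](all: Vector[A], pageSize: Int) {
  private var fetched = 0                      // rows delivered by the server so far
  private val buffer = mutable.Queue.empty[A]  // rows available without fetching
  fetchMoreResults()                           // like execute(), the first page arrives up front

  def getAvailableWithoutFetching: Int = buffer.size
  def isFullyFetched: Boolean = fetched >= all.size
  def fetchMoreResults(): Unit =
    if (!isFullyFetched) {
      val page = all.slice(fetched, fetched + pageSize)
      fetched += page.size
      buffer ++= page
    }
  def one(): A = buffer.dequeue()
}

object Paging {
  // one page = exactly the rows already buffered; this never triggers a fetch
  def currentPage[A, B](rows: FakePagedRows[A])(extract: A => B): Vector[B] =
    Vector.fill(rows.getAvailableWithoutFetching)(extract(rows.one()))

  // fetch while the buffer is empty, then hand back the next page (None when done)
  def nextPage[A, B](rows: FakePagedRows[A])(extract: A => B): Option[Vector[B]] = {
    while (rows.getAvailableWithoutFetching == 0 && !rows.isFullyFetched) rows.fetchMoreResults()
    val page = currentPage(rows)(extract)
    if (page.isEmpty) None else Some(page)
  }
}
```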

    We use these two functions to read back the data that ctxInsert added to Cassandra above:

      //data model
      case class Member(
                         id: String,
                         name: String,
                         description: Option[String] = None,
                         birthday: LocalDate,
                         createdAt: java.util.Date,
                         picture: Option[ByteBuffer] = None)
    
  //data row converter: Option(...) turns a null (missing) column value into None
  val toMember = (rs: Row) => Member(
    id = rs.getUUID("id").toString,
    name = rs.getString("name"),
    description = Option(rs.getString("description")),
    birthday = rs.getDate("birthday"),
    createdAt = rs.getTimestamp("created_at"),
    picture = Option(rs.getBytes("picture"))
  )

 try {
   val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
   val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)

   var vecMembers: Vector[Member] = vecResults
   // keep reading pages until fetchMorePages reports that nothing is left
   var nextPage = fetchMorePages[Vector, Member](resultSet, 1.second)(toMember)._2
   while (nextPage.isDefined) {
     vecMembers = vecMembers ++ nextPage.get
     nextPage = fetchMorePages[Vector, Member](resultSet, 1.second)(toMember)._2
   }
   vecMembers.foreach { m =>
     println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
     println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}")
     m.picture match {
       case Some(buf) =>
         val fname = s"/users/tiger/pic-${m.name}.png"
         cqlBytesToFile(buf,fname)
         println(s"saving picture to $fname")
       case _ => println("empty picture!")
     }
   }
 } catch {
   case e: Exception => println(e.getMessage)
 }
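    The row converter relies on Option(x) returning None for null. That is how the driver's getString and getBytes report a column with no value. Here is a stand-alone sketch in which a Map stands in for a Row. The names are hypothetical:

```scala
object NullableColumns {
  final case class Person(name: String, description: Option[String])

  // hypothetical stand-in for Row.getString: returns null when the column has no value
  def getString(row: Map[String, String], column: String): String = row.getOrElse(column, null)

  // Option(x) yields None for null and Some(x) otherwise
  def toPerson(row: Map[String, String]): Person =
    Person(getString(row, "name"), Option(getString(row, "description")))
}
```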

    The demo above also relies on a few helper functions:

 def cqlFileToBytes(fileName: String): ByteBuffer = {
    // read the whole file; InputStream.available() is only an estimate of its size
    ByteBuffer.wrap(java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(fileName)))
  }
    
    
      def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
            implicit mat: Materializer): Future[IOResult] = {
        val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
        val outputFormat = new java.text.SimpleDateFormat(fmt)
        outputFormat.format(date)
      }
    
      def useJava8DateTime(cluster: Cluster) = {
        //for jdk8 datetime format
        cluster.getConfiguration().getCodecRegistry()
          .register(InstantCodec.instance)
      }
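    Here is a JDK-only sketch of the blob round trip. Files.readAllBytes reads the entire file, whereas InputStream.available() only estimates how much can be read without blocking. Writing from a duplicate() of the buffer leaves the caller's position untouched. BlobFiles is a hypothetical name; the demo itself writes files through Akka Streams:

```scala
import java.nio.ByteBuffer
import java.nio.channels.FileChannel
import java.nio.file.{Files, Path, StandardOpenOption}

object BlobFiles {
  // read the whole file into a buffer positioned at 0
  def fileToBytes(path: Path): ByteBuffer = ByteBuffer.wrap(Files.readAllBytes(path))

  // write the remaining bytes of a duplicate, so the caller's buffer position is not moved
  def bytesToFile(bytes: ByteBuffer, path: Path): Unit = {
    val channel = FileChannel.open(path, StandardOpenOption.CREATE,
      StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING)
    try {
      val view = bytes.duplicate()
      while (view.hasRemaining()) channel.write(view)
    } finally channel.close()
  }
}
```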

    A ByteBufferInputStream class is also needed to read blob contents:

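 class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
    // return the byte as 0..255: a raw signed 0xFF would read as -1 (end of stream)
    override def read(): Int = {
      if (!buf.hasRemaining) return -1
      buf.get() & 0xFF
    }

    override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
      if (len == 0) return 0
      // signal end of stream with -1 instead of returning 0 forever
      if (!buf.hasRemaining) return -1
      val length: Int = Math.min(len, buf.remaining)
      buf.get(bytes, off, length)
      length
    }
  }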
      object ByteBufferInputStream {
        def apply(buf: ByteBuffer): ByteBufferInputStream = {
          new ByteBufferInputStream(buf)
        }
      }
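    Two details of the InputStream contract matter when a stream feeds a consumer such as StreamConverters.fromInputStream. First, read() must return bytes as 0..255, because -1 means end of stream. Second, read(Array, off, len) must return -1 at the end rather than 0. Here is a JDK-only sketch of a variant that honours both. SafeByteBufferInputStream is a hypothetical name:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

class SafeByteBufferInputStream(buf: ByteBuffer) extends InputStream {
  // mask to 0..255 so that the byte 0xFF is not mistaken for end of stream
  override def read(): Int =
    if (!buf.hasRemaining()) -1 else buf.get() & 0xFF

  // return -1 once the buffer is drained; 0 only when no bytes were requested
  override def read(bytes: Array[Byte], off: Int, len: Int): Int =
    if (len == 0) 0
    else if (!buf.hasRemaining()) -1
    else {
      val n = math.min(len, buf.remaining())
      buf.get(bytes, off, n)
      n
    }
}
```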

    Here is the complete source code for this discussion:

    build.sbt

    name := "learn_cassandra"
    
    version := "0.1"
    
    scalaVersion := "2.12.4"
    
    libraryDependencies ++= Seq(
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
      "com.typesafe.akka" %% "akka-actor" % "2.5.4",
      "com.typesafe.akka" %% "akka-stream" % "2.5.4",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      "com.h2database"  %  "h2"                % "1.4.196",
      "mysql" % "mysql-connector-java" % "6.0.6",
      "org.postgresql" % "postgresql" % "42.2.0",
      "commons-dbcp" % "commons-dbcp" % "1.4",
      "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.2.1",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3")

    CassandraEngine.scala

    import com.datastax.driver.core._
    import scala.concurrent._
    import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
    import scala.collection.JavaConverters._
    import scala.collection.generic.CanBuildFrom
    import scala.concurrent.duration.Duration
    import scala.language.implicitConversions
    
    object CQLContext {
      // Consistency Levels
      type CONSISTENCY_LEVEL = Int
      val ANY: CONSISTENCY_LEVEL          =                                        0x0000
      val ONE: CONSISTENCY_LEVEL          =                                        0x0001
      val TWO: CONSISTENCY_LEVEL          =                                        0x0002
      val THREE: CONSISTENCY_LEVEL        =                                        0x0003
      val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
      val ALL: CONSISTENCY_LEVEL          =                                        0x0005
      val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
      val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
      val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A
      val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B
      val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C
    
      def apply(): CQLContext = CQLContext(statements = Nil)
    
      def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
        // every constant needs a case, otherwise the Int match throws a MatchError
        consistency match {
          case ALL => ConsistencyLevel.ALL
          case ONE => ConsistencyLevel.ONE
          case TWO => ConsistencyLevel.TWO
          case THREE => ConsistencyLevel.THREE
          case ANY => ConsistencyLevel.ANY
          case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
          case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
          case QUORUM => ConsistencyLevel.QUORUM
          case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
          case SERIAL => ConsistencyLevel.SERIAL
          case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
        }
      }
    
    }
    case class CQLQueryContext[M](
                           statement: String,
                           parameter: Seq[Object] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                           extractor: Row => M
                         )
    
    case class CQLContext(
                           statements: Seq[String],
                           parameters: Seq[Seq[Object]] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                         ) { ctx =>
    
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
        ctx.copy(consistency = Some(_consistency))
      def setCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
      def appendCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = ctx.statements :+ _statement,
          parameters = ctx.parameters ++ Seq(_parameters))
    }
    
    object CQLEngine {
      import CQLContext._
      import CQLHelpers._
    
      def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
        implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

        val prepStmt = session.prepare(ctx.statement)
        val boundStmt =
          if (ctx.parameter.isEmpty) prepStmt.bind()
          else prepStmt.bind(processParameters(ctx.parameter): _*)

        ctx.consistency.foreach {consistency =>
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))}

        val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
        // iterating a ResultSet silently fetches all later pages, so read exactly
        // the rows of the page that is already buffered
        val available = resultSet.getAvailableWithoutFetching
        (resultSet, Iterator.fill(available)(ctx.extractor(resultSet.one())).to[C])
      }
      def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
           extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) = {
        // fetch only while nothing is buffered locally (a fetched page may be empty);
        // a timeout or query error propagates to the caller instead of being swallowed
        while (resultSet.getAvailableWithoutFetching == 0 && !resultSet.isFullyFetched)
          Await.result(resultSet.fetchMoreResults(), timeOut)
        val available = resultSet.getAvailableWithoutFetching
        if (available == 0) (resultSet, None)
        else (resultSet, Some(Iterator.fill(available)(extractor(resultSet.one())).to[C]))
      }
      def cqlExecute(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        if (ctx.statements.size == 1)
          cqlSingleUpdate(ctx)
        else
          cqlMultiUpdate(ctx)
      }
      def cqlSingleUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

        val prepStmt = session.prepare(ctx.statements.head)
        // test the parameter list (not the statement list); headOption also copes
        // with a context whose parameters were left at the default Nil
        val boundStmt = ctx.parameters.headOption match {
          case Some(params) if params.nonEmpty => prepStmt.bind(processParameters(params): _*)
          case _ => prepStmt.bind()
        }

        ctx.consistency.foreach {consistency =>
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
        session.executeAsync(boundStmt).map(_.wasApplied())
      }
      def cqlMultiUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
        // new BatchStatement() creates a LOGGED batch
        val batch = new BatchStatement()
        commands.foreach { case (stm, params) =>
          val prepStmt = session.prepare(stm)
          if (params.isEmpty)
            batch.add(prepStmt.bind())
          else {
            val p = processParameters(params)
            batch.add(prepStmt.bind(p: _*))
          }
        }
        ctx.consistency.foreach {consistency =>
          batch.setConsistencyLevel(consistencyLevel(consistency))}
        session.executeAsync(batch).map(_.wasApplied())
      }
    }
    object CQLHelpers {
      import java.nio.ByteBuffer
      import java.io._
      import java.nio.file._
      import com.datastax.driver.core.LocalDate
      import com.datastax.driver.extras.codecs.jdk8.InstantCodec
      import java.time.Instant
      import akka.stream.scaladsl._
      import akka.stream._
    
      implicit def listenableFutureToFuture[T](
                   listenableFuture: ListenableFuture[T]): Future[T] = {
        val promise = Promise[T]()
        Futures.addCallback(listenableFuture, new FutureCallback[T] {
          def onFailure(error: Throwable): Unit = {
            promise.failure(error)
            ()
          }
          def onSuccess(result: T): Unit = {
            promise.success(result)
            ()
          }
        })
        promise.future
      }
    
      case class CQLDate(year: Int, month: Int, day: Int)
      case object CQLTodayDate
      case class CQLDateTime(year: Int, month: Int,
                             day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
      case object CQLDateTimeNow

      def processParameters(params: Seq[Object]): Seq[Object] = {
        params.map { obj =>
          obj match {
            case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
            case CQLTodayDate =>
              val today = java.time.LocalDate.now()
              LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
            case CQLDateTimeNow => Instant.now()
            case CQLDateTime(yy, mm, dd, hr, mi, sc, ms) =>
              // fields are interpreted as UTC; the Instant is built directly instead of parsing text
              java.time.LocalDateTime.of(yy, mm, dd, hr, mi, sc, ms * 1000000)
                .toInstant(java.time.ZoneOffset.UTC)
            case p@_ => p
          }
        }
      }
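      class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
        // return the byte as 0..255: a raw signed 0xFF would read as -1 (end of stream)
        override def read(): Int = {
          if (!buf.hasRemaining) return -1
          buf.get() & 0xFF
        }

        override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
          if (len == 0) return 0
          // signal end of stream with -1 instead of returning 0 forever
          if (!buf.hasRemaining) return -1
          val length: Int = Math.min(len, buf.remaining)
          buf.get(bytes, off, length)
          length
        }
      }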
      object ByteBufferInputStream {
        def apply(buf: ByteBuffer): ByteBufferInputStream = {
          new ByteBufferInputStream(buf)
        }
      }
      class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
    
        override def write(b: Int): Unit = {
          buf.put(b.toByte)
        }
    
        override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
          buf.put(bytes, off, len)
        }
      }
      object FixsizedByteBufferOutputStream {
        def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
      }
      class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {
    
        private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR
    
        override def write(b: Array[Byte], off: Int, len: Int): Unit = {
          val position = buf.position
          val limit = buf.limit
          val newTotal: Long = position + len
          if(newTotal > limit){
            var capacity = (buf.capacity * increasing)
            while(capacity <= newTotal){
              capacity = (capacity*increasing)
            }
            increase(capacity.toInt)
          }
    
          buf.put(b, off, len)
        }
    
        override def write(b: Int): Unit= {
          if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
          buf.put(b.toByte)
        }
        protected def increase(newCapacity: Int): Unit = {
          buf.limit(buf.position)
          buf.rewind
          val newBuffer =
            if (onHeap) ByteBuffer.allocate(newCapacity)
            else  ByteBuffer.allocateDirect(newCapacity)
          newBuffer.put(buf)
          buf.clear
          buf = newBuffer
        }
        def size: Long = buf.position
        def capacity: Long = buf.capacity
        def byteBuffer: ByteBuffer = buf
      }
      object ExpandingByteBufferOutputStream {
        val DEFAULT_INCREASING_FACTOR = 1.5f
        def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
          if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
          val buffer: ByteBuffer =
            if (onHeap) ByteBuffer.allocate(size)
            else ByteBuffer.allocateDirect(size)
          new ExpandingByteBufferOutputStream(buffer,onHeap)
        }
        def apply(size: Int): ExpandingByteBufferOutputStream = {
          apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
        }
    
        def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
          apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
        }
    
        def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
          apply(size, increasingBy, false)
        }
    
      }
      def cqlFileToBytes(fileName: String): ByteBuffer = {
        // read the whole file; InputStream.available() is only an estimate of its size
        ByteBuffer.wrap(Files.readAllBytes(Paths.get(fileName)))
      }
      def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
            implicit mat: Materializer): Future[IOResult] = {
        val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
      def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
        val outputFormat = new java.text.SimpleDateFormat(fmt)
        outputFormat.format(date)
      }
      def useJava8DateTime(cluster: Cluster) = {
        //for jdk8 datetime format
        cluster.getConfiguration().getCodecRegistry()
          .register(InstantCodec.instance)
      }
    }

    CQLEngineDemo.scala

    import scala.util._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import com.datastax.driver.core._
    import CQLEngine._
    import CQLHelpers._
    import com.datastax.driver.core.LocalDate
    import java.nio.ByteBuffer
    import scala.concurrent.duration._
    
    object CQLEngineDemo extends App {
    
      //#init-mat
      implicit val cqlsys = ActorSystem("cqlSystem")
      implicit val mat = ActorMaterializer()
      implicit val ec = cqlsys.dispatcher
    
      val cluster = new Cluster.Builder()
        .addContactPoints("localhost")
        .withPort(9042)
        .build()
    
      useJava8DateTime(cluster)
      implicit val session = cluster.connect()
    
      val createCQL ="""
      CREATE TABLE testdb.members (
        id UUID primary key,
        name TEXT,
        description TEXT,
        birthday DATE,
        created_at TIMESTAMP,
        picture BLOB
      )"""
    
      val ctxCreate = CQLContext().setCommand(createCQL)
    
      val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" +
        " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png"))
    
      val createData = for {
        createTable <- cqlExecute(ctxCreate)
        insertData <- cqlExecute(ctxInsert)
      } yield(createTable, insertData)
    
      createData.onComplete {
        case Success((c,i)) => println(s"Create Table: $c, Insert Data $i")
        case Failure(e) => println(e.getMessage)
      }
      scala.io.StdIn.readLine()
      //data model
      case class Member(
                         id: String,
                         name: String,
                         description: Option[String] = None,
                         birthday: LocalDate,
                         createdAt: java.util.Date,
                         picture: Option[ByteBuffer] = None)
    
      //data row converter
      val toMember = (rs: Row) => Member(
        id = rs.getUUID("id").toString,
        name = rs.getString("name"),
        description = {
          val d = rs.getString("description")
          if (d == null)
            None
          else
            Some(d)
    
          Some(rs.getColumnDefinitions.toString)
        },
        birthday = rs.getDate("birthday"),
        createdAt = rs.getTimestamp("created_at"),
        picture = {
          val pic = rs.getBytes("picture")
          if (pic == null)
            None
          else
            Some(pic)
    
        }
      )
    
     try {
       val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
       val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)
    
       var vecMembers: Vector[Member] = vecResults
    
       var isExh = resultSet.isExhausted
       var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults))
       while (!isExh) {
         nextPage = fetchMorePages[Vector,Member](nextPage._1,1 second)(toMember)
         nextPage._2.foreach {vec =>
           vecMembers = vecMembers ++ vec
         }
         isExh = resultSet.isExhausted
       }
       vecMembers.foreach { m =>
         println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
         println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}")
         m.picture match {
           case Some(buf) =>
             val fname = s"/users/tiger/pic-${m.name}.png"
             cqlBytesToFile(buf,fname)
             println(s"saving picture to $fname")
           case _ => println("empty picture!")
         }
       }
     } catch {
       case e: Exception => println(e.getMessage)
     }
      
      scala.io.StdIn.readLine()
      session.close()
      cluster.close()
      cqlsys.terminate()
      
    }
    Original article: https://www.cnblogs.com/tiger-xc/p/8457938.html