  • SDP (6): A Distributed Database Computing Environment - Cassandra-Engine

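        Modern information systems can hardly avoid big-data processing, so a general-purpose system integration tool must be able to store and read large volumes of data. Cassandra is a distributed database with the high availability such databases are designed for, which makes it a core pillar of a large real-time distributed integration system. Compared with traditional relational databases, Cassandra differs sharply in its storage structure and in how data is read. For example, Cassandra table design is denormalized, and table structures are designed backwards from the queries they must serve. Fortunately, Cassandra provides CQL (CQL 3 in current releases) for database operations. Put simply, CQL is Cassandra's SQL: a query language whose syntax is close to SQL. Most importantly, CQL presents Cassandra's underlying storage model in SQL-like form, so programmers familiar with relational SQL can get started with Cassandra easily. Like SQL, CQL is a plain-text language, and CQL scripts can be run through many client interfaces, including the Java client. Several ready-made Cassandra client libraries are available, and some provide a LINQ-style DSL (language-integrated query) for type safety. Because we need to serve all kinds of Cassandra users, however, we decided to provide a CQL script execution environment instead: Cassandra-Engine accepts CQL scripts and runs them to produce results.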

    The execution model closely resembles JDBC: with CQL you also build a statement first and then execute it. Unlike JDBC, the driver also offers non-blocking execution:

       /**
         * Executes the provided query asynchronously.
         * <p/>
         * This method does not block. It returns as soon as the query has been
         * passed to the underlying network stack. In particular, returning from
         * this method does not guarantee that the query is valid or has even been
         * submitted to a live node. Any exception pertaining to the failure of the
         * query will be thrown when accessing the {@link ResultSetFuture}.
         * <p/>
         * Note that for queries that don't return a result (INSERT, UPDATE and
         * DELETE), you will need to access the ResultSetFuture (that is, call one of
         * its {@code get} methods to make sure the query was successful.
         *
         * @param statement the CQL query to execute (that can be any {@code Statement}).
         * @return a future on the result of the query.
         * @throws UnsupportedFeatureException if the protocol version 1 is in use and
         *                                     a feature not supported has been used. Features that are not supported by
         *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary
         *                                     values in RegularStatement.
         */
        ResultSetFuture executeAsync(Statement statement);

    The ResultSetFuture returned by executeAsync is a Guava ListenableFuture. An implicit conversion turns it into a Scala Future:

     implicit def listenableFutureToFuture[T](
                   listenableFuture: ListenableFuture[T]): Future[T] = {
        val promise = Promise[T]()
        Futures.addCallback(listenableFuture, new FutureCallback[T] {
          def onFailure(error: Throwable): Unit = {
            promise.failure(error)
            ()
          }
          def onSuccess(result: T): Unit = {
            promise.success(result)
            ()
          }
        })
        promise.future
      }

    With this implicit conversion in scope, the result of executeAsync is converted to a Scala Future automatically, as in:

      def cqlSingleUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    ...
          session.executeAsync(boundStmt).map(_.wasApplied())
      }
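    The promise-bridging pattern used for the Guava future can be tried without the Cassandra driver. The sketch below applies the same idea to the JDK's CompletableFuture instead of ListenableFuture (a swapped-in type, chosen only so the example is self-contained); the names PromiseBridge and toScalaFuture are hypothetical.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

object PromiseBridge {
  // Complete a Scala Promise from the Java future's completion callback,
  // then hand out the Promise's read-only Future side.
  def toScalaFuture[T](cf: CompletableFuture[T]): Future[T] = {
    val promise = Promise[T]()
    cf.whenComplete(new BiConsumer[T, Throwable] {
      def accept(result: T, error: Throwable): Unit = {
        if (error != null) promise.failure(error) else promise.success(result)
        ()
      }
    })
    promise.future
  }
}
```

    Because the callback completes the promise exactly once, a failure in the underlying future surfaces as a failed Scala Future rather than as an exception at the call site.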

    As before, a context object carries everything needed to build a complete, executable statement:

    case class CQLContext(
                           statements: Seq[String],
                           parameters: Seq[Seq[Object]] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                         ) { ctx =>
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
        ctx.copy(consistency = Some(_consistency))
      def setCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
      def appendCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = ctx.statements :+ _statement,
          parameters = ctx.parameters ++ Seq(_parameters))
    }

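    What sets it apart from JDBCContext is the consistency level. Because data is replicated across multiple cluster nodes, a consistency level is needed to state how many replicas must acknowledge a read or write: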

      def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
        consistency match {
          case ALL => ConsistencyLevel.ALL
          case ONE => ConsistencyLevel.ONE
          case TWO => ConsistencyLevel.TWO
          case THREE => ConsistencyLevel.THREE
          case ANY => ConsistencyLevel.ANY
          case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
          case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
          case QUORUM => ConsistencyLevel.QUORUM
          case SERIAL => ConsistencyLevel.SERIAL
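          case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
          // LOCAL_QUORUM is declared in CQLContext, so it needs a mapping here too
          case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM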
        }
      }
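    To make the effect of these levels concrete, here is a small arithmetic sketch. It relies on two standard rules: a quorum is a majority of the replicas, and a read is guaranteed to overlap the latest acknowledged write when the read and write replica counts add up to more than the replication factor. The object and method names are hypothetical.

```scala
object ConsistencyMath {
  // Number of replicas forming a quorum for a given replication factor.
  def quorum(replicationFactor: Int): Int = replicationFactor / 2 + 1

  // True when every read set must intersect every write set (R + W > RF).
  def readsSeeLatestWrite(readReplicas: Int, writeReplicas: Int, replicationFactor: Int): Boolean =
    readReplicas + writeReplicas > replicationFactor
}
```

    With a replication factor of 3, QUORUM reads and QUORUM writes each touch 2 replicas and therefore overlap, whereas ONE for both does not.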

    CQL statements come as SimpleStatement, PreparedStatement and BoundStatement. A BoundStatement can cover every kind of CQL statement we need to build. Here is an example of building one:

       val prepStmt = session.prepare(ctx.statement)
    
        var boundStmt =  prepStmt.bind()
        if (ctx.parameter != Nil) {
          val params = processParameters(ctx.parameter)
          boundStmt = prepStmt.bind(params:_*)
        }

    CQL parameter types are fairly complex. Values such as date and timestamp must be preprocessed by the processParameters function:

      case class CQLDate(year: Int, month: Int, day: Int)
      case object CQLTodayDate
      case class CQLDateTime(year: Int, Month: Int,
                             day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
      case object CQLDateTimeNow
    
      def processParameters(params: Seq[Object]): Seq[Object] = {
        params.map { obj =>
          obj match {
            case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
            case CQLTodayDate =>
              val today = java.time.LocalDate.now()
              LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
            case CQLDateTimeNow => Instant.now()
            case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
              // Instant.parse expects zero-padded ISO-8601 in UTC, e.g. 2018-02-22T10:30:15.250Z
              Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
            case p@_ => p
          }
        }
      }
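    As an alternative to formatting and re-parsing a string, a timestamp can be assembled directly from its calendar fields with java.time. This is only a sketch: InstantBuilder is a hypothetical helper, and it assumes the fields are meant as UTC.

```scala
import java.time.{Instant, LocalDateTime, ZoneOffset}

object InstantBuilder {
  // Build an Instant from calendar fields, interpreting them as UTC (an assumption).
  // LocalDateTime.of takes nanoseconds, so milliseconds are scaled by 1,000,000.
  def toInstant(year: Int, month: Int, day: Int,
                hour: Int, minute: Int, second: Int, millis: Int = 0): Instant =
    LocalDateTime.of(year, month, day, hour, minute, second, millis * 1000000)
      .toInstant(ZoneOffset.UTC)
}
```

    Out-of-range fields (month 13, for instance) raise a DateTimeException at construction time instead of a parse error later on.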

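    CassandraEngine update operations come in two forms: single updates and batch updates. A batch update has something in common with a transaction: a logged batch (the BatchStatement default) is atomic, so either all of its statements are eventually applied or none are, although it does not provide the isolation or rollback semantics of a relational transaction: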

     def cqlExecute(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        if (ctx.statements.size == 1)
          cqlSingleUpdate(ctx)
        else
          cqlMultiUpdate(ctx)
      }
      def cqlSingleUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    
          val prepStmt = session.prepare(ctx.statements.head)
    
          var boundStmt =  prepStmt.bind()
          // bind parameters only when some were actually supplied
          if (ctx.parameters.nonEmpty && ctx.parameters.head.nonEmpty) {
            val params = processParameters(ctx.parameters.head)
            boundStmt = prepStmt.bind(params:_*)
          }
    
        ctx.consistency.foreach {consistency =>
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
          session.executeAsync(boundStmt).map(_.wasApplied())
      }
    
      def cqlMultiUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
        val batch = new BatchStatement()
        commands.foreach { case (stm, params) =>
          val prepStmt = session.prepare(stm)
          if (params == Nil)
            batch.add(prepStmt.bind())
          else {
            val p = processParameters(params)
            batch.add(prepStmt.bind(p: _*))
          }
        }
    
        ctx.consistency.foreach {consistency =>
          batch.setConsistencyLevel(consistencyLevel(consistency))}
        session.executeAsync(batch).map(_.wasApplied())
      }
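    cqlMultiUpdate pairs each statement with its parameters by position, so the two sequences must stay aligned. The sketch below uses Commands, a trimmed and hypothetical stand-in for CQLContext, to show how appending a statement and its parameters together keeps that alignment, including for statements without parameters.

```scala
object BatchPairing {
  // Trimmed stand-in for CQLContext: only the two fields that matter here.
  final case class Commands(statements: Seq[String] = Nil,
                            parameters: Seq[Seq[Any]] = Nil) {
    // Append a statement and its parameter list in one step.
    def append(statement: String, params: Any*): Commands =
      copy(statements = statements :+ statement, parameters = parameters :+ params)
  }

  def build(): Commands = Commands()
    .append("insert into testdb.members(id,name) values(uuid(),?)", "alan xu")
    .append("truncate testdb.members_audit") // hypothetical table, no parameters

  // The same pairing step that cqlMultiUpdate performs before binding.
  def paired(cmds: Commands): Seq[(String, Seq[Any])] =
    cmds.statements zip cmds.parameters
}
```

    Note that zip silently drops unmatched trailing elements, so a context built by hand with fewer parameter lists than statements would lose statements.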

    A CassandraEngine update returns its status as a Future[Boolean]. Here is an update demo:

      val createCQL ="""
      CREATE TABLE testdb.members (
        id UUID primary key,
        name TEXT,
        description TEXT,
        birthday DATE,
        created_at TIMESTAMP,
        picture BLOB
      )"""
    
      val ctxCreate = CQLContext().setCommand(createCQL)
    
      val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" +
        " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png"))
      
      val createData = for {
        createTable <- cqlExecute(ctxCreate)
        insertData <- cqlExecute(ctxInsert)
      } yield(createTable, insertData)
    
      createData.onComplete {
        case Success((c,i)) => println(s"Create Table: $c, Insert Data $i")
        case Failure(e) => println(e.getMessage)
      }

    In this example a for-comprehension chains the operations in sequence. Note that the example already covers date, datetime and blob input parameters.
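    The sequencing behaviour of a for-comprehension over Futures can be checked in isolation. In this sketch the two steps are plain Futures standing in for the cqlExecute calls; SequencedSteps and its members are hypothetical names.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object SequencedSteps {
  // Stand-ins for cqlExecute(ctxCreate) and cqlExecute(ctxInsert).
  def createTable(): Future[Boolean] = Future(true)
  def insertData(): Future[Boolean] = Future(true)

  // insert() is only invoked after create() has completed successfully;
  // a failed first step short-circuits the rest of the chain.
  def createThenInsert(create: () => Future[Boolean],
                       insert: () => Future[Boolean]): Future[(Boolean, Boolean)] =
    for {
      c <- create()
      i <- insert()
    } yield (c, i)
}
```

    This is why the table creation and the insert cannot race each other even though both are asynchronous.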

    The statement-building information for a fetch query looks like this:

    case class CQLQueryContext[M](
                           statement: String,
                           parameter: Seq[Object] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                           extractor: Row => M
                         )

    Fetch queries are also run through execute:

        /**
         * Executes the provided query.
         * <p/>
         * This method blocks until at least some result has been received from the
         * database. However, for SELECT queries, it does not guarantee that the
         * result has been received in full. But it does guarantee that some
         * response has been received from the database, and in particular
         * guarantees that if the request is invalid, an exception will be thrown
         * by this method.
         *
         * @param statement the CQL query to execute (that can be any {@link Statement}).
         * @return the result of the query. That result will never be null but can
         * be empty (and will be for any non SELECT query).
         * @throws NoHostAvailableException    if no host in the cluster can be
         *                                     contacted successfully to execute this query.
         * @throws QueryExecutionException     if the query triggered an execution
         *                                     exception, i.e. an exception thrown by Cassandra when it cannot execute
         *                                     the query with the requested consistency level successfully.
         * @throws QueryValidationException    if the query if invalid (syntax error,
         *                                     unauthorized or any other validation problem).
         * @throws UnsupportedFeatureException if the protocol version 1 is in use and
         *                                     a feature not supported has been used. Features that are not supported by
         *                                     the version protocol 1 include: BatchStatement, ResultSet paging and binary
         *                                     values in RegularStatement.
         */
        ResultSet execute(Statement statement);

    The returned ResultSet is converted into a Scala collection:

      def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
        implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
    
        val prepStmt = session.prepare(ctx.statement)
    
        var boundStmt =  prepStmt.bind()
        if (ctx.parameter != Nil) {
          val params = processParameters(ctx.parameter)
          boundStmt = prepStmt.bind(params:_*)
        }
    
        ctx.consistency.foreach {consistency =>
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
    
        val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
        (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C])
    
      }

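    fetchResultPage reads with a page size, and fetchMoreResults can be used to keep reading. Keep in mind that, as the Javadoc below notes, iterating a ResultSet also fetches further pages automatically when they are needed: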

        /**
         * Force fetching the next page of results for this result set, if any.
         * <p/>
         * This method is entirely optional. It will be called automatically while
         * the result set is consumed (through {@link #one}, {@link #all} or iteration)
         * when needed (i.e. when {@code getAvailableWithoutFetching() == 0} and
         * {@code isFullyFetched() == false}).
         * <p/>
         * You can however call this method manually to force the fetching of the
         * next page of results. This can allow to prefetch results before they are
         * strictly needed. For instance, if you want to prefetch the next page of
         * results as soon as there is less than 100 rows readily available in this
         * result set, you can do:
         * <pre>
         *   ResultSet rs = session.execute(...);
         *   Iterator<Row> iter = rs.iterator();
         *   while (iter.hasNext()) {
         *       if (rs.getAvailableWithoutFetching() == 100 && !rs.isFullyFetched())
         *           rs.fetchMoreResults();
         *       Row row = iter.next()
         *       ... process the row ...
         *   }
         * </pre>
         * This method is not blocking, so in the example above, the call to {@code
         * fetchMoreResults} will not block the processing of the 100 currently available
         * rows (but {@code iter.hasNext()} will block once those rows have been processed
         * until the fetch query returns, if it hasn't yet).
         * <p/>
         * Only one page of results (for a given result set) can be
         * fetched at any given time. If this method is called twice and the query
         * triggered by the first call has not returned yet when the second one is
         * performed, then the 2nd call will simply return a future on the currently
         * in progress query.
         *
         * @return a future on the completion of fetching the next page of results.
         * If the result set is already fully retrieved ({@code isFullyFetched() == true}),
         * then the returned future will return immediately but not particular error will be
         * thrown (you should thus call {@link #isFullyFetched()} to know if calling this
         * method can be of any use}).
         */
        ListenableFuture<S> fetchMoreResults();

    Here is the implementation of continued page-by-page reading:

      def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
           extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
        if (resultSet.isFullyFetched) {
          (resultSet, None)
        } else {
          try {
            val result = Await.result(resultSet.fetchMoreResults(), timeOut)
            (result, Some((result.asScala.view.map(extractor)).to[C]))
          } catch { case scala.util.control.NonFatal(_) => (resultSet, None) } // let fatal errors propagate
        }

    We use these two functions to read back the data that the ctxInsert command above wrote to Cassandra:

      //data model
      case class Member(
                         id: String,
                         name: String,
                         description: Option[String] = None,
                         birthday: LocalDate,
                         createdAt: java.util.Date,
                         picture: Option[ByteBuffer] = None)
    
      //data row converter
      val toMember = (rs: Row) => Member(
        id = rs.getUUID("id").toString,
        name = rs.getString("name"),
        // Option(...) maps a null column value to None
        description = Option(rs.getString("description")),
        birthday = rs.getDate("birthday"),
        createdAt = rs.getTimestamp("created_at"),
        picture = {
          val pic = rs.getBytes("picture")
          if (pic == null)
            None
          else
            Some(pic)
    
        }
      )
    
     try {
       val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
       val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)
    
       var vecMembers: Vector[Member] = vecResults
    
       var isExh = resultSet.isExhausted
       var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults))
       while (!isExh) {
         nextPage = fetchMorePages[Vector,Member](nextPage._1,1 second)(toMember)
         nextPage._2.foreach {vec =>
           vecMembers = vecMembers ++ vec
         }
         isExh = resultSet.isExhausted
       }
       vecMembers.foreach { m =>
         println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
         println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}")
         m.picture match {
           case Some(buf) =>
             val fname = s"/users/tiger/pic-${m.name}.png"
             cqlBytesToFile(buf,fname)
             println(s"saving picture to $fname")
           case _ => println("empty picture!")
         }
       }
     } catch {
       case e: Exception => println(e.getMessage)
     }
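    The while loop with mutable variables in the demo can also be written as a small recursive accumulator. The sketch below is generic and does not touch the driver: fetchNext is a hypothetical function that returns the next state together with one page of results, or None when nothing is left.

```scala
import scala.annotation.tailrec

object PageAccumulator {
  // Repeatedly ask for the next page and append it until fetchNext returns None.
  def collectPages[S, A](initial: S)(fetchNext: S => Option[(S, Vector[A])]): Vector[A] = {
    @tailrec
    def loop(state: S, acc: Vector[A]): Vector[A] =
      fetchNext(state) match {
        case Some((next, page)) => loop(next, acc ++ page)
        case None               => acc
      }
    loop(initial, Vector.empty)
  }
}
```

    With the engine, the state would be the ResultSet and fetchNext a thin wrapper around fetchMorePages; that wiring is left out here because it depends on a live session.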

    The demo also relies on a few helper functions:

     def cqlFileToBytes(fileName: String): ByteBuffer = {
        // read the whole file at once: no stream is left open and no padding byte is added
        val bytes = java.nio.file.Files.readAllBytes(java.nio.file.Paths.get(fileName))
        ByteBuffer.wrap(bytes)
      }
    
    
      def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
            implicit mat: Materializer): Future[IOResult] = {
        val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
        val outputFormat = new java.text.SimpleDateFormat(fmt)
        outputFormat.format(date)
      }
    
      def useJava8DateTime(cluster: Cluster) = {
        //for jdk8 datetime format
        cluster.getConfiguration().getCodecRegistry()
          .register(InstantCodec.instance)
      }
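    SimpleDateFormat instances are not thread-safe, so where formatting happens on several threads a java.time formatter may be preferable. The following sketch shows the equivalent formatting for an Instant (a java.util.Date can be converted with its toInstant method). InstantFormat is a hypothetical helper, and defaulting to UTC is an assumption.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

object InstantFormat {
  // Format an Instant with the given pattern in the given zone (UTC by default).
  def format(instant: Instant, pattern: String, zone: ZoneId = ZoneOffset.UTC): String =
    DateTimeFormatter.ofPattern(pattern).withZone(zone).format(instant)
}
```

    The pattern letters used in the demo ("yyyy-MM-dd HH:mm:ss.SSS") carry the same meaning in DateTimeFormatter.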

    A ByteBufferInputStream type is also needed to read blob content:

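     class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
        // InputStream.read() must return 0..255, or -1 at end of stream
        override def read(): Int = {
          if (!buf.hasRemaining) -1
          else buf.get() & 0xFF
        }
    
        // returns -1 once the buffer is drained so that consumers see end of stream
        override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
          if (len == 0) 0
          else if (!buf.hasRemaining) -1
          else {
            val length: Int = Math.min(len, buf.remaining)
            buf.get(bytes, off, length)
            length
          }
        }
      }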
      object ByteBufferInputStream {
        def apply(buf: ByteBuffer): ByteBufferInputStream = {
          new ByteBufferInputStream(buf)
        }
      }
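    One detail matters when exposing a ByteBuffer as an InputStream: read() is specified to return a value from 0 to 255, or -1 at end of stream, while a JVM Byte is signed. The short sketch below (SignedByteDemo is a hypothetical name) shows why a byte should be masked before it is returned as an Int.

```scala
object SignedByteDemo {
  val raw: Byte = 0xFF.toByte   // bit pattern 1111 1111
  val widened: Int = raw.toInt  // plain widening sign-extends the byte
  val masked: Int = raw & 0xFF  // masking keeps only the low 8 bits
}
```

    Without the mask, a data byte of 0xFF widens to -1 and is indistinguishable from the end-of-stream marker, which can truncate binary content such as images.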

    Here is the complete source code for this discussion:

    build.sbt

    name := "learn_cassandra"
    
    version := "0.1"
    
    scalaVersion := "2.12.4"
    
    libraryDependencies := Seq(
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
      "com.typesafe.akka" %% "akka-actor" % "2.5.4",
      "com.typesafe.akka" %% "akka-stream" % "2.5.4",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.16",
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      "com.h2database"  %  "h2"                % "1.4.196",
      "mysql" % "mysql-connector-java" % "6.0.6",
      "org.postgresql" % "postgresql" % "42.2.0",
      "commons-dbcp" % "commons-dbcp" % "1.4",
      "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.2.1",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3")

    CassandraEngine.scala

    import com.datastax.driver.core._
    import scala.concurrent._
    import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
    import scala.collection.JavaConverters._
    import scala.collection.generic.CanBuildFrom
    import scala.concurrent.duration.Duration
    
    object CQLContext {
      // Consistency Levels
      type CONSISTENCY_LEVEL = Int
      val ANY: CONSISTENCY_LEVEL          =                                        0x0000
      val ONE: CONSISTENCY_LEVEL          =                                        0x0001
      val TWO: CONSISTENCY_LEVEL          =                                        0x0002
      val THREE: CONSISTENCY_LEVEL        =                                        0x0003
      val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
      val ALL: CONSISTENCY_LEVEL          =                                        0x0005
      val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
      val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
      val LOCAL_ONE: CONSISTENCY_LEVEL    =                                      0x000A
      val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                     0x000B
      val SERIAL: CONSISTENCY_LEVEL       =                                      0x000C
    
      def apply(): CQLContext = CQLContext(statements = Nil)
    
      def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
        consistency match {
          case ALL => ConsistencyLevel.ALL
          case ONE => ConsistencyLevel.ONE
          case TWO => ConsistencyLevel.TWO
          case THREE => ConsistencyLevel.THREE
          case ANY => ConsistencyLevel.ANY
          case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
          case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
          case QUORUM => ConsistencyLevel.QUORUM
          case SERIAL => ConsistencyLevel.SERIAL
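          case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
          // LOCAL_QUORUM is declared above, so it needs a mapping here too
          case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM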
        }
      }
    
    }
    case class CQLQueryContext[M](
                           statement: String,
                           parameter: Seq[Object] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
                           extractor: Row => M
                         )
    
    case class CQLContext(
                           statements: Seq[String],
                           parameters: Seq[Seq[Object]] = Nil,
                           consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None
                         ) { ctx =>
    
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
        ctx.copy(consistency = Some(_consistency))
      def setCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = Seq(_statement), parameters = Seq(_parameters))
      def appendCommand(_statement: String, _parameters: Object*): CQLContext =
        ctx.copy(statements = ctx.statements :+ _statement,
          parameters = ctx.parameters ++ Seq(_parameters))
    }
    
    object CQLEngine {
      import CQLContext._
      import CQLHelpers._
    
      def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext[A], pageSize: Int = 100)(
        implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {
    
        val prepStmt = session.prepare(ctx.statement)
    
        var boundStmt =  prepStmt.bind()
        if (ctx.parameter != Nil) {
          val params = processParameters(ctx.parameter)
          boundStmt = prepStmt.bind(params:_*)
        }
    
        ctx.consistency.foreach {consistency =>
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
    
        val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
        (resultSet,(resultSet.asScala.view.map(ctx.extractor)).to[C])
      }
      def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
           extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
        if (resultSet.isFullyFetched) {
          (resultSet, None)
        } else {
          try {
            val result = Await.result(resultSet.fetchMoreResults(), timeOut)
            (result, Some((result.asScala.view.map(extractor)).to[C]))
          } catch { case scala.util.control.NonFatal(_) => (resultSet, None) } // let fatal errors propagate
        }
      def cqlExecute(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        if (ctx.statements.size == 1)
          cqlSingleUpdate(ctx)
        else
          cqlMultiUpdate(ctx)
      }
      def cqlSingleUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    
          val prepStmt = session.prepare(ctx.statements.head)
    
          var boundStmt =  prepStmt.bind()
          // bind parameters only when some were actually supplied
          if (ctx.parameters.nonEmpty && ctx.parameters.head.nonEmpty) {
            val params = processParameters(ctx.parameters.head)
            boundStmt = prepStmt.bind(params:_*)
          }
    
        ctx.consistency.foreach {consistency =>
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))}
          session.executeAsync(boundStmt).map(_.wasApplied())
      }
      def cqlMultiUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
        val commands: Seq[(String,Seq[Object])] = ctx.statements zip ctx.parameters
        val batch = new BatchStatement()
        commands.foreach { case (stm, params) =>
          val prepStmt = session.prepare(stm)
          if (params == Nil)
            batch.add(prepStmt.bind())
          else {
            val p = processParameters(params)
            batch.add(prepStmt.bind(p: _*))
          }
        }
        ctx.consistency.foreach {consistency =>
          batch.setConsistencyLevel(consistencyLevel(consistency))}
        session.executeAsync(batch).map(_.wasApplied())
      }
    }
    object CQLHelpers {
      import java.nio.ByteBuffer
      import java.io._
      import java.nio.file._
      import com.datastax.driver.core.LocalDate
      import com.datastax.driver.extras.codecs.jdk8.InstantCodec
      import java.time.Instant
      import akka.stream.scaladsl._
      import akka.stream._
    
      implicit def listenableFutureToFuture[T](
                   listenableFuture: ListenableFuture[T]): Future[T] = {
        val promise = Promise[T]()
        Futures.addCallback(listenableFuture, new FutureCallback[T] {
          def onFailure(error: Throwable): Unit = {
            promise.failure(error)
            ()
          }
          def onSuccess(result: T): Unit = {
            promise.success(result)
            ()
          }
        })
        promise.future
      }
    
      case class CQLDate(year: Int, month: Int, day: Int)
      case object CQLTodayDate
      case class CQLDateTime(year: Int, Month: Int,
                             day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
      case object CQLDateTimeNow
    
      def processParameters(params: Seq[Object]): Seq[Object] = {
        params.map { obj =>
          obj match {
            case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
            case CQLTodayDate =>
              val today = java.time.LocalDate.now()
              LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
            case CQLDateTimeNow => Instant.now()
            case CQLDateTime(yy, mm, dd, hr, ms, sc, mi) =>
              // Instant.parse expects zero-padded ISO-8601 in UTC, e.g. 2018-02-22T10:30:15.250Z
              Instant.parse(f"$yy%04d-$mm%02d-$dd%02dT$hr%02d:$ms%02d:$sc%02d.$mi%03dZ")
            case p@_ => p
          }
        }
      }
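      class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
        // InputStream.read() must return 0..255, or -1 at end of stream
        override def read(): Int = {
          if (!buf.hasRemaining) -1
          else buf.get() & 0xFF
        }
    
        // returns -1 once the buffer is drained so that consumers see end of stream
        override def read(bytes: Array[Byte], off: Int, len: Int): Int = {
          if (len == 0) 0
          else if (!buf.hasRemaining) -1
          else {
            val length: Int = Math.min(len, buf.remaining)
            buf.get(bytes, off, length)
            length
          }
        }
      }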
      object ByteBufferInputStream {
        def apply(buf: ByteBuffer): ByteBufferInputStream = {
          new ByteBufferInputStream(buf)
        }
      }
      class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {
    
        override def write(b: Int): Unit = {
          buf.put(b.toByte)
        }
    
        override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
          buf.put(bytes, off, len)
        }
      }
      object FixsizedByteBufferOutputStream {
        def apply(buf: ByteBuffer) = new FixsizedByteBufferOutputStream(buf)
      }
      class ExpandingByteBufferOutputStream(var buf: ByteBuffer, onHeap: Boolean) extends OutputStream {
    
        private val increasing = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR
    
        override def write(b: Array[Byte], off: Int, len: Int): Unit = {
          val position = buf.position
          val limit = buf.limit
          val newTotal: Long = position + len
          if(newTotal > limit){
            var capacity = (buf.capacity * increasing)
            while(capacity <= newTotal){
              capacity = (capacity*increasing)
            }
            increase(capacity.toInt)
          }
    
          buf.put(b, off, len) // honor the caller's offset into b
        }
    
        override def write(b: Int): Unit= {
          if (!buf.hasRemaining) increase((buf.capacity * increasing).toInt)
          buf.put(b.toByte)
        }
        protected def increase(newCapacity: Int): Unit = {
          buf.limit(buf.position)
          buf.rewind
          val newBuffer =
            if (onHeap) ByteBuffer.allocate(newCapacity)
            else  ByteBuffer.allocateDirect(newCapacity)
          newBuffer.put(buf)
          buf.clear
          buf = newBuffer
        }
        def size: Long = buf.position
        def capacity: Long = buf.capacity
        def byteBuffer: ByteBuffer = buf
      }
      object ExpandingByteBufferOutputStream {
        val DEFAULT_INCREASING_FACTOR = 1.5f
        def apply(size: Int, increasingBy: Float, onHeap: Boolean) = {
          if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
          val buffer: ByteBuffer =
            if (onHeap) ByteBuffer.allocate(size)
            else ByteBuffer.allocateDirect(size)
          new ExpandingByteBufferOutputStream(buffer,onHeap)
        }
        def apply(size: Int): ExpandingByteBufferOutputStream = {
          apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
        }
    
        def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
          apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
        }
    
        def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
          apply(size, increasingBy, false)
        }
    
      }
      def cqlFileToBytes(fileName: String): ByteBuffer = {
        // read the whole file at once: no stream is left open and no padding byte is added
        val bytes = Files.readAllBytes(Paths.get(fileName))
        ByteBuffer.wrap(bytes)
      }
      def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
            implicit mat: Materializer): Future[IOResult] = {
        val source = StreamConverters.fromInputStream(() => ByteBufferInputStream(bytes))
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
      def cqlDateTimeString(date: java.util.Date, fmt: String): String = {
        val outputFormat = new java.text.SimpleDateFormat(fmt)
        outputFormat.format(date)
      }
      def useJava8DateTime(cluster: Cluster) = {
        //for jdk8 datetime format
        cluster.getConfiguration().getCodecRegistry()
          .register(InstantCodec.instance)
      }
    }

    CQLEngineDemo.scala

    import scala.util._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import com.datastax.driver.core._
    import CQLEngine._
    import CQLHelpers._
    import com.datastax.driver.core.LocalDate
    import java.nio.ByteBuffer
    import scala.concurrent.duration._
    
    object CQLEngineDemo extends App {
    
      //#init-mat
      implicit val cqlsys = ActorSystem("cqlSystem")
      implicit val mat = ActorMaterializer()
      implicit val ec = cqlsys.dispatcher
    
      val cluster = new Cluster.Builder()
        .addContactPoints("localhost")
        .withPort(9042)
        .build()
    
      useJava8DateTime(cluster)
      implicit val session = cluster.connect()
    
      val createCQL ="""
      CREATE TABLE testdb.members (
        id UUID primary key,
        name TEXT,
        description TEXT,
        birthday DATE,
        created_at TIMESTAMP,
        picture BLOB
      )"""
    
      val ctxCreate = CQLContext().setCommand(createCQL)
    
      val ctxInsert = CQLContext().setCommand("insert into testdb.members(id,name,description,birthday,created_at,picture)" +
        " values(uuid(),?,?,?,?,?)", "alan xu", "alan-xu", CQLDate(1966, 11, 27), CQLDateTimeNow, cqlFileToBytes("/users/tiger/Nobody.png"))
    
      val createData = for {
        createTable <- cqlExecute(ctxCreate)
        insertData <- cqlExecute(ctxInsert)
      } yield(createTable, insertData)
    
      createData.onComplete {
        case Success((c,i)) => println(s"Create Table: $c, Insert Data $i")
        case Failure(e) => println(e.getMessage)
      }
      scala.io.StdIn.readLine()
      //data model
      case class Member(
                         id: String,
                         name: String,
                         description: Option[String] = None,
                         birthday: LocalDate,
                         createdAt: java.util.Date,
                         picture: Option[ByteBuffer] = None)
    
      //data row converter
      val toMember = (rs: Row) => Member(
        id = rs.getUUID("id").toString,
        name = rs.getString("name"),
        // Option(...) maps a null column value to None
        description = Option(rs.getString("description")),
        birthday = rs.getDate("birthday"),
        createdAt = rs.getTimestamp("created_at"),
        picture = {
          val pic = rs.getBytes("picture")
          if (pic == null)
            None
          else
            Some(pic)
    
        }
      )
    
     try {
       val qtx = CQLQueryContext(statement = "select * from testdb.members", extractor = toMember)
       val (resultSet, vecResults) = fetchResultPage[Vector, Member](qtx)
    
       var vecMembers: Vector[Member] = vecResults
    
       var isExh = resultSet.isExhausted
       var nextPage: (ResultSet, Option[Vector[Member]]) = (resultSet, Some(vecResults))
       while (!isExh) {
         nextPage = fetchMorePages[Vector,Member](nextPage._1,1 second)(toMember)
         nextPage._2.foreach {vec =>
           vecMembers = vecMembers ++ vec
         }
         isExh = resultSet.isExhausted
       }
       vecMembers.foreach { m =>
         println(s"id: ${m.id}-name:${m.name}-${m.description} birthday: ${m.birthday.toString}")
         println(s"created_at: ${cqlDateTimeString(m.createdAt,"yyyy-MM-dd HH:mm:ss.SSS")}")
         m.picture match {
           case Some(buf) =>
             val fname = s"/users/tiger/pic-${m.name}.png"
             cqlBytesToFile(buf,fname)
             println(s"saving picture to $fname")
           case _ => println("empty picture!")
         }
       }
     } catch {
       case e: Exception => println(e.getMessage)
     }
      
      scala.io.StdIn.readLine()
      session.close()
      cluster.close()
      cqlsys.terminate()
      
    }
  • Original article: https://www.cnblogs.com/tiger-xc/p/8457938.html