zoukankan      html  css  js  c++  java
  • PICE(3):CassandraStreaming

      在上一篇博文里我们介绍了通过gRPC实现JDBC数据库的streaming,这篇我们介绍关于cassandra的streaming实现方式。如果我们需要从一个未部署cassandra的节点或终端上读取cassandra数据,可以用gRPC来搭建一个数据桥梁来连接这两端。这时cassandra这端就是gRPC-Server端,由它提供cassandra的数据服务。

    在前面sdp系列讨论里我们已经实现了Cassandra-Engine。它的运作原理还是通过某种Context把指令提交给cassandra去执行。我们先设计一个创建库表的例子。CQL语句和Cassandra-Engine程序代码如下,这是客户端部分:

      // CQL that removes any previous copy of the demo table, so the DDL run can be repeated.
      val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"
    
      // CQL that creates the demo table; rowid is the only primary-key column.
      val createCQL ="""
      CREATE TABLE testdb.AQMRPT (
         rowid bigint primary key,
         measureid bigint,
         statename text,
         countyname text,
         reportyear int,
         value int,
         created timestamp
      )"""
    
      // Request message carrying both statements in order: drop first, then create.
      val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))
      /**
        * Client-side call of the runDDL rpc: sends the single `cqlddl` request through the stub
        * and exposes the server's reply as a stream.
        *
        * @return a source emitting the CQLResult returned for `cqlddl`
        */
      def createTbl: Source[CQLResult,NotUsed] = {
        log.info("running createTbl ...")
        val ddlRequest = Source.single(cqlddl)
        ddlRequest.via(stub.runDDL)
      }

    首先,我们在CQLUpdate这个protobuf消息(对应Cassandra-Engine的Context)里传入两条指令dropCQL和createCQL,可以预计这会是一种批次型(batch)方式。然后一如既往,我们使用了streaming编程模式。在.proto文件里用IDL来定义对应的Context和Service:

    // Request for executing one or more CQL update/DDL statements.
    message CQLUpdate {
        // CQL statements to execute.
        repeated string statements = 1;
        // Opaque serialized statement parameters.
        // NOTE(review): presumably a marshalled parameter list — confirm against the server code.
        bytes parameters = 2;
        // Optional consistency level (wrapper type, so "unset" is distinguishable from 0).
        google.protobuf.Int32Value consistency = 3;
        // Optional flag requesting batch execution.
        google.protobuf.BoolValue batch = 4;
    }
    
    service CQLServices {
      // Unary call: one CQLUpdate in, one CQLResult out.
      rpc runDDL(CQLUpdate) returns (CQLResult) {}
    }

    服务函数runDDL程序实现如下:

     /**
       * Server-side handler of the runDDL rpc. For every incoming CQLUpdate it builds a CQLContext
       * from the request's statements, executes it with cqlExecute and emits the marshalled outcome.
       *
       * NOTE(review): only `statements` is read from the request; `parameters`, `consistency`
       * and `batch` are not forwarded to the context — confirm this is intended.
       */
     override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] =
        Flow[CQLUpdate].flatMapConcat { request =>
          // unpack the CQLUpdate request and construct the engine context
          val ctx = CQLContext(request.statements)
          log.info(s"**** CQLContext => ${ctx} ***")
    
          val execution = Source.fromFuture(cqlExecute(ctx))
          execution.map(outcome => CQLResult(marshal(outcome)))
        }

    这里我们调用了Cassandra-Engine的cqlExecute(ctx)函数:

      /**
        * Executes the update statements carried by `ctx` and reports completion as a Future.
        *
        * Dispatch, as implemented below:
        *  - `ctx.batch` with at least two parameter sets: delegated to `cqlBatchUpdate`.
        *  - `ctx.batch` with fewer than two parameter sets: a warning is logged and the context
        *    is handled like a non-batch request.
        *  - exactly one statement: run through `cqlSingleUpdate` with the first parameter set
        *    (or Nil when there is none).
        *  - otherwise: statements are zipped with parameter sets (padded with Nil when there are
        *    fewer sets than statements) and run one after another via `cqlExecuteSync`.
        *
        * @param ctx     statements, parameter sets, consistency and batch flag to execute
        * @param session implicit Cassandra session; not referenced directly in this body —
        *                presumably consumed by the helper calls (confirm)
        * @param ec      execution context on which the sequential multi-statement Future runs
        * @return the Future of the selected helper; the multi-statement path yields `true` once
        *         every statement has been executed
        */
      def cqlExecute(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {
    
        // a batch needs at least two parameter sets; an empty parameter list also has size < 2
        val invalidBat = ctx.batch && ctx.parameters.size < 2
    
        if (ctx.batch && !invalidBat)
          cqlBatchUpdate(ctx)
        else {
          if (invalidBat)
            log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")
    
          if (ctx.statements.size == 1) {
            val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
            log.info(s"cqlExecute>  single-command: statement: ${ctx.statements.head} parameters: ${param}")
            cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
          }
          else {
            val shortBy = ctx.statements.size - ctx.parameters.size
            val params: Seq[Seq[Object]] =
              if (ctx.parameters.isEmpty)
                Seq.fill(ctx.statements.length)(Nil)
              else if (shortBy > 0) {
                log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
                ctx.parameters ++ Seq.fill(shortBy)(Nil)
              }
              else
                ctx.parameters
    
            // zip stops at the shorter side, so surplus parameter sets are ignored
            val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
            log.info(s"cqlExecute>  multi-commands: ${commands}")
    /*
            // alternative 1: cats `sequence` flips List[Future[Boolean]] => Future[List[Boolean]]
            // only valid when no command relies on the effect of a previous command
            val lstCmds: List[Future[Boolean]] = commands.map { case (stmt,param) =>
              cqlSingleUpdate(ctx.consistency, stmt, param)
            }.toList
    
            val futList = lstCmds.sequence.map(_ => true)   //must map to execute
            */
    /*
            // alternative 2: `traverse` gives some parallelism, total time = max(runtimes)
            // only valid when no command relies on the effect of a previous command
            val futList = Future.traverse(commands) { case (stmt,param)  =>
              cqlSingleUpdate(ctx.consistency, stmt, param)
            }.map(_ => true)
    
            Await.result(futList, 3 seconds)
            Future.successful(true)
    */
            // chosen approach: run the commands synchronously, in order, inside one Future
            Future {
              for ((stmt, args) <- commands)
                cqlExecuteSync(ctx.consistency, stmt, args)
              true
            }
          }
        }
      }

    特别展示这个函数的代码,是因为一个批次里的多条指令可能会涉及到non-blocking和并行运算。可参考上面代码中被注释掉的段落,了解如何用函数式方法(cats的sequence、traverse)实现对一串Future的运算。

    下一个例子是用流方式把JDBC数据库数据并入cassandra数据库里。.proto DDL内容如下:

    // Calendar date split into numeric components.
    message ProtoDate {
      int32 yyyy = 1;
      int32 mm   = 2;
      int32 dd   = 3;
    }
    
    // Time of day split into numeric components.
    // NOTE(review): the unit of nnn is not stated here — confirm (milliseconds vs. nanoseconds).
    message ProtoTime {
      int32 hh   = 1;
      int32 mm   = 2;
      int32 ss   = 3;
      int32 nnn  = 4;
    }
    
    // Date plus time of day.
    message ProtoDateTime {
       ProtoDate date = 1;
       ProtoTime time = 2;
    }
    
    // One row of the AQMRPT table as exchanged over gRPC.
    message AQMRPTRow {
        int64 rowid = 1;
        string countyname = 2;
        string statename = 3;
        int64 measureid = 4;
        int32 reportyear = 5;
        int32 value = 6;
        ProtoDateTime created = 7;
    }
    
    // Result of an update call; the payload is an opaque serialized value.
    message CQLResult {
      bytes result = 1;
    }
    
    // Request for executing one or more CQL update/DDL statements.
    message CQLUpdate {
        repeated string statements = 1;
        // Opaque serialized statement parameters.
        bytes parameters = 2;
        // Optional values use wrapper types so "unset" is distinguishable from the default.
        google.protobuf.Int32Value consistency = 3;
        google.protobuf.BoolValue batch = 4;
    }
    
    
    service CQLServices {
      // Bidirectional streaming: rows in, one CQLResult stream out.
      rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
      // Unary call: one CQLUpdate in, one CQLResult out.
      rpc runDDL(CQLUpdate) returns (CQLResult) {}
    }

    下面是服务函数的实现:

     /**
       * Converts an AQMRPTRow into the positional parameter list of the insert statement.
       * Numeric fields are boxed explicitly because the parameters are passed as `Object`.
       * The last parameter is `CQLDateTimeNow`, not the row's own `created` field.
       */
     val toParams: AQMRPTRow => Seq[Object] = { row =>
        Seq[Object](
          Long.box(row.rowid),
          Long.box(row.measureid),
          row.statename,
          row.countyname,
          Int.box(row.reportyear),
          Int.box(row.value),
          CQLDateTimeNow
        )
      }
      // Parameterised INSERT for testdb.AQMRPT with seven positional '?' placeholders,
      // one per listed column. stripMargin removes the leading '|' markers.
      val cqlInsert ="""
                       |insert into testdb.AQMRPT(
                       | rowid,
                       | measureid,
                       | statename,
                       | countyname,
                       | reportyear,
                       | value,
                       | created)
                       | values(?,?,?,?,?,?,?)
                     """.stripMargin
      
      // Action stream pairing the insert statement with the row-to-parameters mapper.
      // Parallelism is set to 2. NOTE(review): setProcessOrder(false) presumably allows
      // results to be emitted out of input order — confirm in CassandraActionStream.
      val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
        .setProcessOrder(false)
    
    /*
      val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
        Flow[AQMRPTRow]
          .via(cqlActionStream.performOnRow)
    */
    
      /**
        * Flow that inserts each incoming row into testdb.AQMRPT unless its rowid already exists.
        * Emits CQLResult(marshal(0)) for a row that was skipped and CQLResult(marshal(1)) after
        * an insert. The existence check is chained as a Future so that no thread is blocked
        * inside the mapAsync callback.
        */
      val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
        Flow[AQMRPTRow]
            .mapAsync(cqlActionStream.parallelism){ row =>
              rowExists(row.rowid).flatMap { exists =>
                if (exists)
                  Future.successful(CQLResult(marshal(0)))
                else
                  cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))}
              }
            }
      }
    
      /** Server-side handler of the transferRows rpc; delegates to cqlActionFlow. */
      override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
        Flow[AQMRPTRow]
          .via(cqlActionFlow)
      }
    
      /**
        * Asynchronously checks whether a row with the given rowid exists by running the lookup
        * query as a stream and taking its first element, if any.
        *
        * @param rowid primary-key value to look up
        * @return Future of true when the query returned at least one row
        */
      private def rowExists(rowid: Long): Future[Boolean] = {
        val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
        val param = Seq[Object](Long.box(rowid))
        val toRowId: Row => Long = r => r.getLong("rowid")
        val ctx = CQLQueryContext(cql,param)
        val src: Source[Long,NotUsed] = cassandraStream(ctx,toRowId)
    
        src.runWith(Sink.headOption).map { found =>
          log.info(s"checking existence: ${found}")
          found.isDefined
        }
      }
    
      /**
        * Blocking variant of rowExists, kept for existing callers: waits up to 3 seconds for the
        * lookup and throws if it does not complete in time. Do not call it from inside a stream
        * stage; use rowExists there instead.
        */
      private def IfExists(rowid: Long): Boolean =
        Await.result(rowExists(rowid), 3.seconds)

    在上面的代码里我们调用了Cassandra-Engine的CassandraActionStream类型的流处理方法。值得注意的是这里我们尝试在一个stream Flow里运算另一个流,如:IfExists函数里运算一个Source来确定rowid是否存在。不要在意这个函数的实际应用,它只是一个人为的例子。另外,rowid:Long这样的定义是硬性规定的:cassandra对数据类型的匹配要求非常严格,不提供任何自动类型转换。所以,Int与Long混用会被视为类型错误,而且捕获不到任何明确的错误信息。

    这项服务的客户端调用如下:

      // Client stub of the generated gRPC akka-stream service, bound to the given channel.
      val stub = CqlGrpcAkkaStream.stub(channel)
    
      // Query context for the local JDBC source: all AQMRPT rows of one state from database 'h2.
      val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
        dbName = 'h2,
        statement = "select * from AQMRPT where statename='Arkansas'"
      )
    
      /**
        * Row mapper for the JDBC side: reads the AQMRPT columns from a result-set row and builds
        * the protobuf AQMRPTRow. `created` is filled with a fixed placeholder timestamp
        * (1990-08-12 23:56:23.0) rather than a value read from the database.
        */
      def toAQMRPTRow: WrappedResultSet => AQMRPTRow = { rs =>
        val placeholderCreated =
          ProtoDateTime(Some(ProtoDate(1990, 8, 12)), Some(ProtoTime(23, 56, 23, 0)))
    
        AQMRPTRow(
          rowid = rs.long("ROWID"),
          measureid = rs.long("MEASUREID"),
          statename = rs.string("STATENAME"),
          countyname = rs.string("COUNTYNAME"),
          reportyear = rs.int("REPORTYEAR"),
          value = rs.int("VALUE"),
          created = Some(placeholderCreated)
        )
      }
    
      import scala.concurrent.duration._
    
      /**
        * Client-side call of the transferRows rpc: streams the rows selected by
        * `jdbcRows2transfer` from the local JDBC database through the stub.
        *
        * @return a source of the CQLResult messages sent back by the server
        */
      def transferRows: Source[CQLResult, NotUsed] = {
        log.info(s"**** calling transferRows ****")
        val localRows = jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
        // optional rate limiting, disabled:
        //      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
        localRows.via(stub.transferRows)
      }

    注意:JDBC在客户端本地,cassandra是远程服务。

    最后我们示范一下cassandra Query。.proto DDL 定义:

    // Request for running a single CQL query.
    message CQLQuery {
        // CQL text, with '?' placeholders for positional parameters.
        string statement = 1;
        // Opaque serialized parameter list; left empty when the statement has no parameters.
        bytes parameters = 2;
        // Optional values use wrapper types so "unset" is distinguishable from the default.
        google.protobuf.Int32Value consistency = 3;
        google.protobuf.Int32Value fetchSize = 4;
    }
    
    service CQLServices {
      // Bidirectional streaming: rows in, results out.
      rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
      // Server streaming: one query in, a stream of rows out.
      rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
      // Unary call: one CQLUpdate in, one CQLResult out.
      rpc runDDL(CQLUpdate) returns (CQLResult) {}
    }

    服务函数代码如下:

     /**
       * Reads the CREATED column of a Cassandra row and converts it to the protobuf date-time.
       *
       * @param rs the Cassandra row to read from
       * @return Some(ProtoDateTime) when the column holds a value; None when it is null or when
       *         reading/converting fails (the failure is logged instead of silently dropped)
       */
     def toCQLTimestamp(rs: Row): Option[ProtoDateTime] = {
        import scala.util.control.NonFatal
        try {
          // Option(...) turns the driver's null for an unset column into None
          Option(rs.getTimestamp("CREATED")).map { tm =>
            val localdt = cqlGetTimestamp(tm)
            // the fourth ProtoTime field receives getNano, i.e. nanosecond-of-second
            ProtoDateTime(
              Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
              Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))
          }
        }
        catch {
          // only non-fatal errors are turned into None; fatal ones keep propagating
          case NonFatal(e) =>
            log.warn(s"toCQLTimestamp> cannot read CREATED, returning None: ${e}")
            None
        }
      }
    
      /**
        * Row mapper for the Cassandra side: reads the AQMRPT columns from a driver Row and builds
        * the protobuf AQMRPTRow; `created` is converted through toCQLTimestamp.
        */
      val toAQMRow: Row => AQMRPTRow = { row =>
        AQMRPTRow(
          rowid = row.getLong("ROWID"),
          measureid = row.getLong("MEASUREID"),
          statename = row.getString("STATENAME"),
          countyname = row.getString("COUNTYNAME"),
          reportyear = row.getInt("REPORTYEAR"),
          value = row.getInt("VALUE"),
          created = toCQLTimestamp(row)
        )
      }
      /**
        * Server-side handler of the runQuery rpc. For each CQLQuery it unmarshals the optional
        * parameter bytes, builds a CQLQueryContext and streams the matching rows back.
        *
        * NOTE(review): `consistency` and `fetchSize` of the request are not read here — confirm.
        * NOTE(review): `unmarshal` is applied to bytes supplied by the client; if it relies on
        * Java serialization, untrusted clients are a deserialization risk — verify.
        */
      override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
        log.info("**** runQuery called on service side ***")
        Flow[CQLQuery]
          .flatMapConcat { q =>
            // unpack the CQLQuery request and construct the query context;
            // an empty byte string means "no parameters"
            val params: Seq[Object] =
              if (q.parameters.isEmpty) Nil
              else unmarshal[Seq[Object]](q.parameters)
            log.info(s"**** query parameters: ${params} ****")
            CQLEngine.cassandraStream(CQLQueryContext(q.statement,params), toAQMRow)
          }
      }

    这里值得看看的一是日期转换,二是对于cassandra parameter Seq[Object]的marshal和unmarshal。客户端代码:

      // Query used by queryRows: rows of one state with VALUE above a threshold.
      // The parameters are marshalled to bytes in placeholder order.
      val query = CQLQuery(
        statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
        parameters = marshal(Seq("Arkansas", 0.toInt))
      )
      // Additional sample queries; not referenced by queryRows.
      // NOTE(review): unlike `query`, these filter on the same non-key columns without
      // ALLOW FILTERING — confirm Cassandra accepts them as written.
      val query2 = CQLQuery (
        statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Colorado", 3.toInt))
      )
      val query3= CQLQuery (
        statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Arkansas", 8.toInt))
      )
      /**
        * Client-side call of the runQuery rpc: sends `query` through the stub and exposes the
        * rows streamed back by the server.
        *
        * @return a source of the AQMRPTRow messages returned for `query`
        */
      def queryRows: Source[AQMRPTRow,NotUsed] = {
        log.info("running queryRows ...")
        val request = Source.single(query)
        request.via(stub.runQuery)
      }

    这段相对直白。

    下面就是本次讨论涉及的完整源代码:

    project/scalapb.sbt

    // sbt plugin that runs protoc as part of the build.
    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.18")
    
    // Repository hosting the grpcakkastream artifacts.
    // NOTE(review): Bintray has been shut down; this resolver may no longer resolve — confirm.
    resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
    
    // Code generators available to the build definition: ScalaPB itself and the
    // akka-stream gRPC generator.
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "compilerplugin" % "0.7.4",
      "beyondthelines"         %% "grpcakkastreamgenerator" % "0.0.5"
    )

    build.sbt

    import scalapb.compiler.Version.scalapbVersion
    import scalapb.compiler.Version.grpcJavaVersion
    
    // Basic project coordinates.
    name := "gRPCCassandra"
    
    version := "0.1"
    
    scalaVersion := "2.12.6"
    
    // Repository hosting the grpcakkastream runtime.
    // NOTE(review): Bintray has been shut down; this resolver may no longer resolve — confirm.
    resolvers += Resolver.bintrayRepo("beyondthelines", "maven")
    
    // Compiler flag enabling partial unification; presumably required for the cats-core
    // dependency on Scala 2.12 — confirm.
    scalacOptions += "-Ypartial-unification"
    
    // Append to libraryDependencies with ++= rather than overwriting it with := so that
    // dependencies already contributed by sbt defaults or by plugins are preserved.
    libraryDependencies ++= Seq(
      "com.thesamet.scalapb" %% "scalapb-runtime" % scalapbVersion % "protobuf",
      "io.grpc" % "grpc-netty" % grpcJavaVersion,
      "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapbVersion,
      "io.monix" %% "monix" % "2.3.0",
      // for GRPC Akkastream
      "beyondthelines"         %% "grpcakkastreamruntime" % "0.0.5",
      // for scalikejdbc
      "org.scalikejdbc" %% "scalikejdbc"       % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.2.1"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-streams" % "3.2.1",
      "org.scalikejdbc" %% "scalikejdbc-joda-time" % "3.2.1",
      "com.h2database"  %  "h2"                % "1.4.196",
      "mysql" % "mysql-connector-java" % "6.0.6",
      "org.postgresql" % "postgresql" % "42.2.0",
      "commons-dbcp" % "commons-dbcp" % "1.4",
      "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.2.1",
      //for cassandra  340
      "com.datastax.cassandra" % "cassandra-driver-core" % "3.4.0",
      "com.datastax.cassandra" % "cassandra-driver-extras" % "3.4.0",
      "com.typesafe.akka" %% "akka-stream" % "2.5.13",
      "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "0.19",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3",
      "org.typelevel" %% "cats-core" % "1.1.0"
    )
    
    // Code generators run over the .proto files; both write into the managed-sources directory.
    PB.targets in Compile := Seq(
      // generate the ScalaPB message and service classes
      scalapb.gen() -> (sourceManaged in Compile).value,
      // generate the akka stream files
      grpc.akkastreams.generators.GrpcAkkaStreamGenerator() -> (sourceManaged in Compile).value
    )

    main/resources/application.conf

    # JDBC settings
    test {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "commons-dbcp2"
        }
      }
    
      db.mysql.driver = "com.mysql.cj.jdbc.Driver"
      db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
      db.mysql.user = "root"
      db.mysql.password = "123"
      db.mysql.poolInitialSize = 5
      db.mysql.poolMaxSize = 7
      db.mysql.poolConnectionTimeoutMillis = 1000
      db.mysql.poolValidationQuery = "select 1 as one"
      db.mysql.poolFactoryName = "bonecp"
    
      # scalikejdbc global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }
    dev {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
        mysql {
          driver = "com.mysql.cj.jdbc.Driver"
          url = "jdbc:mysql://localhost:3306/testdb"
          user = "root"
          password = "123"
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "bonecp"
    
        }
        postgres {
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/testdb"
          user = "root"
          password = "123"
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
      }
      # scalikejdbc global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }

    main/resources/logback.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <!-- One console appender shared by two package loggers and the root logger. -->
    
        <!-- Console output: timestamp, thread, level, logger name (shortened to 36 chars), message. -->
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <layout class="ch.qos.logback.classic.PatternLayout">
                <Pattern>
                    %d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n
                </Pattern>
            </layout>
        </appender>
    
        <!-- sdp.cql logs at info; additivity is off so events are not also sent to the root logger. -->
        <logger name="sdp.cql" level="info"
                additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
    
        <!-- demo.sdp.grpc.cql logs at info, likewise without propagating to the root logger. -->
        <logger name="demo.sdp.grpc.cql" level="info"
                additivity="false">
            <appender-ref ref="STDOUT" />
        </logger>
    
        <!-- Everything else is logged only at error level. -->
        <root level="error">
            <appender-ref ref="STDOUT" />
        </root>
    
    </configuration>

    main/protobuf/cql.proto

    syntax = "proto3";
    
    import "google/protobuf/wrappers.proto";
    import "google/protobuf/any.proto";
    import "scalapb/scalapb.proto";
    
    option (scalapb.options) = {
      // use a custom Scala package name
      // package_name: "io.ontherocks.introgrpc.demo"
    
      // don't append file name to package
      flat_package: true
    
      // generate one Scala file for all messages (services still get their own file)
      single_file: true
    
      // add imports to generated file
      // useful when extending traits or using custom types
      // import: "io.ontherocks.hellogrpc.RockingMessage"
    
      // code to put at the top of generated file
      // works only with `single_file: true`
      //preamble: "sealed trait SomeSealedTrait"
    };
    
    /*
     * Demonstrates various customization options provided by ScalaPB.
     */
    
    package sdp.grpc.services;
    
    // Calendar date split into numeric components.
    message ProtoDate {
      int32 yyyy = 1;
      int32 mm   = 2;
      int32 dd   = 3;
    }
    
    // Time of day split into numeric components.
    // NOTE(review): the unit of nnn is not stated here — confirm (milliseconds vs. nanoseconds).
    message ProtoTime {
      int32 hh   = 1;
      int32 mm   = 2;
      int32 ss   = 3;
      int32 nnn  = 4;
    }
    
    // Date plus time of day.
    message ProtoDateTime {
       ProtoDate date = 1;
       ProtoTime time = 2;
    }
    
    // One row of the AQMRPT table as exchanged over gRPC.
    message AQMRPTRow {
        int64 rowid = 1;
        string countyname = 2;
        string statename = 3;
        int64 measureid = 4;
        int32 reportyear = 5;
        int32 value = 6;
        ProtoDateTime created = 7;
    }
    
    // Result of an update call; the payload is an opaque serialized value.
    message CQLResult {
      bytes result = 1;
    }
    
    // Request for running a single CQL query.
    message CQLQuery {
        string statement = 1;
        // Opaque serialized parameter list; empty when the statement has no parameters.
        bytes parameters = 2;
        // Optional values use wrapper types so "unset" is distinguishable from the default.
        google.protobuf.Int32Value consistency = 3;
        google.protobuf.Int32Value fetchSize = 4;
    }
    
    // Request for executing one or more CQL update/DDL statements.
    message CQLUpdate {
        repeated string statements = 1;
        // Opaque serialized statement parameters.
        bytes parameters = 2;
        google.protobuf.Int32Value consistency = 3;
        google.protobuf.BoolValue batch = 4;
    }
    
    // Simple text message used by the clientStreaming rpc.
    message HelloMsg {
      string hello = 1;
    }
    
    service CQLServices {
      // Despite its name, this rpc is declared with streams in both directions.
      rpc clientStreaming(stream HelloMsg) returns (stream HelloMsg) {}
      // Bidirectional streaming: rows in, results out.
      rpc transferRows(stream AQMRPTRow) returns (stream CQLResult) {}
      // Server streaming: one query in, a stream of rows out.
      rpc runQuery(CQLQuery) returns (stream AQMRPTRow) {}
      // Unary call: one CQLUpdate in, one CQLResult out.
      rpc runDDL(CQLUpdate) returns (CQLResult) {}
    }

    logging/log.scala

    package sdp.logging
    
    import org.slf4j.Logger
    
    /**
      * Logger which just wraps org.slf4j.Logger internally.
      *
      * Every logging method takes its message by name, so the message expression is evaluated
      * only when the level is enabled both on this wrapper and on the underlying logger.
      *
      * @param logger the slf4j logger all calls are delegated to
      */
    class Log(logger: Logger) {
    
      // use var consciously to enable squeezing later
      // (each flag is initialised from the underlying logger and can be switched off afterwards)
      var isDebugEnabled: Boolean = logger.isDebugEnabled
      var isInfoEnabled: Boolean = logger.isInfoEnabled
      var isWarnEnabled: Boolean = logger.isWarnEnabled
      var isErrorEnabled: Boolean = logger.isErrorEnabled
    
      /**
        * Logs `msg` at the level named by `level` ('debug, 'info, 'warn or 'error, lower or
        * upper case). Unknown level names are ignored.
        *
        * @param level symbolic level name
        * @param msg   message, evaluated lazily
        * @param e     optional throwable; when non-null it is passed on to the logger
        *              (previously it was accepted but silently dropped)
        */
      def withLevel(level: Symbol)(msg: => String, e: Throwable = null): Unit = {
        level match {
          case 'debug | 'DEBUG => if (e == null) debug(msg) else debug(msg, e)
          case 'info | 'INFO => if (e == null) info(msg) else info(msg, e)
          case 'warn | 'WARN => if (e == null) warn(msg) else warn(msg, e)
          case 'error | 'ERROR => if (e == null) error(msg) else error(msg, e)
          case _ => // nothing to do
        }
      }
    
      /** Logs `msg` at debug level when debug is enabled. */
      def debug(msg: => String): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg)
        }
      }
    
      /** Logs `msg` together with `e` at debug level when debug is enabled. */
      def debug(msg: => String, e: Throwable): Unit = {
        if (isDebugEnabled && logger.isDebugEnabled) {
          logger.debug(msg, e)
        }
      }
    
      /** Logs `msg` at info level when info is enabled. */
      def info(msg: => String): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg)
        }
      }
    
      /** Logs `msg` together with `e` at info level when info is enabled. */
      def info(msg: => String, e: Throwable): Unit = {
        if (isInfoEnabled && logger.isInfoEnabled) {
          logger.info(msg, e)
        }
      }
    
      /** Logs `msg` at warn level when warn is enabled. */
      def warn(msg: => String): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg)
        }
      }
    
      /** Logs `msg` together with `e` at warn level when warn is enabled. */
      def warn(msg: => String, e: Throwable): Unit = {
        if (isWarnEnabled && logger.isWarnEnabled) {
          logger.warn(msg, e)
        }
      }
    
      /** Logs `msg` at error level when error is enabled. */
      def error(msg: => String): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg)
        }
      }
    
      /** Logs `msg` together with `e` at error level when error is enabled. */
      def error(msg: => String, e: Throwable): Unit = {
        if (isErrorEnabled && logger.isErrorEnabled) {
          logger.error(msg, e)
        }
      }
    
    }

    logging/LogSupport.scala

    package sdp.logging
    
    import org.slf4j.LoggerFactory
    
    /** Mix-in that gives a class a `log` member named after the concrete class. */
    trait LogSupport {
    
      /**
        * Logger bound to the runtime class of the instance mixing in this trait.
        */
      protected val log = {
        val underlying = LoggerFactory.getLogger(getClass)
        new Log(underlying)
      }
    
    }

    filestreaming/FileStreaming.scala

    package sdp.file
    
    import java.io.{ByteArrayInputStream, InputStream}
    import java.nio.ByteBuffer
    import java.nio.file.Paths
    
    import akka.stream.Materializer
    import akka.stream.scaladsl.{FileIO, StreamConverters}
    import akka.util._
    
    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    /**
      * Helpers converting between files and in-memory byte containers using akka-stream FileIO.
      * The FileToXxx functions block the calling thread until the whole file is read or the
      * timeout expires; the XxxToFile functions return the materialized value of the write
      * stream without waiting for it.
      */
    object Streaming {
    
      /** Reads the whole file into one ByteString, waiting at most `timeOut`. */
      private def readFully(fileName: String, timeOut: FiniteDuration)(
        implicit mat: Materializer): ByteString = {
        val fut = FileIO.fromPath(Paths.get(fileName)).runFold(ByteString.empty)(_ ++ _)
        Await.result(fut, timeOut)
      }
    
      /**
        * Loads a file into a ByteBuffer.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read (default 60 seconds)
        */
      def FileToByteBuffer(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer):ByteBuffer =
        readFully(fileName, timeOut).toByteBuffer
    
      /**
        * Loads a file into a byte array.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read (default 60 seconds)
        */
      def FileToByteArray(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): Array[Byte] =
        readFully(fileName, timeOut).toArray
    
      /**
        * Loads a file fully into memory and exposes it as an InputStream.
        *
        * @param fileName path of the file to read
        * @param timeOut  maximum time to wait for the read (default 60 seconds)
        */
      def FileToInputStream(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer): InputStream =
        new ByteArrayInputStream(readFully(fileName, timeOut).toArray)
    
      /**
        * Writes the remaining bytes of `byteBuf` to a file.
        * Note: reading the bytes advances the buffer's position to its limit.
        */
      def ByteBufferToFile(byteBuf: ByteBuffer, fileName: String)(
        implicit mat: Materializer) = {
        val ba = new Array[Byte](byteBuf.remaining())
        byteBuf.get(ba,0,ba.length)
        val baInput = new ByteArrayInputStream(ba)
        val source = StreamConverters.fromInputStream(() => baInput)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      /** Writes a byte array to a file. */
      def ByteArrayToFile(bytes: Array[Byte], fileName: String)(
        implicit mat: Materializer) = {
        val baInput = new ByteArrayInputStream(bytes)
        val source = StreamConverters.fromInputStream(() => baInput)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
      /** Copies everything readable from `is` into a file. */
      def InputStreamToFile(is: InputStream, fileName: String)(
        implicit mat: Materializer) = {
        val source = StreamConverters.fromInputStream(() => is)
        source.runWith(FileIO.toPath(Paths.get(fileName)))
      }
    
    }

    jdbc/JDBCConfig.scala

    package sdp.jdbc.config
    import scala.collection.mutable
    import scala.concurrent.duration.Duration
    import scala.language.implicitConversions
    import com.typesafe.config._
    import java.util.concurrent.TimeUnit
    import java.util.Properties
    import scalikejdbc.config._
    import com.typesafe.config.Config
    import com.zaxxer.hikari._
    import scalikejdbc.ConnectionPoolFactoryRepository
    
    /**
      * Extension methods to make Typesafe Config easier to use.
      *
      * Each `getXOr` accessor returns the value at `path` when the path is present and the
      * by-name `default` otherwise; each `getXOpt` accessor returns an Option instead.
      */
    class ConfigExtensionMethods(val c: Config) extends AnyVal {
      import scala.collection.JavaConverters._
    
      def getBooleanOr(path: String, default: => Boolean = false) =
        if (!c.hasPath(path)) default else c.getBoolean(path)
    
      def getIntOr(path: String, default: => Int = 0) =
        if (!c.hasPath(path)) default else c.getInt(path)
    
      def getStringOr(path: String, default: => String = null) =
        if (!c.hasPath(path)) default else c.getString(path)
    
      def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) =
        if (!c.hasPath(path)) default else c.getConfig(path)
    
      def getMillisecondsOr(path: String, default: => Long = 0L) =
        if (!c.hasPath(path)) default else c.getDuration(path, TimeUnit.MILLISECONDS)
    
      def getDurationOr(path: String, default: => Duration = Duration.Zero) =
        if (!c.hasPath(path)) default
        else Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS)
    
      def getPropertiesOr(path: String, default: => Properties = null): Properties =
        if (!c.hasPath(path)) default
        else new ConfigExtensionMethods(c.getConfig(path)).toProperties
    
      /**
        * Converts this config into java.util.Properties. Nested config objects become nested
        * Properties values; other entries are stored as their string form; null values are skipped.
        */
      def toProperties: Properties = {
        def toProps(entries: mutable.Map[String, ConfigValue]): Properties = {
          val props = new Properties(null)
          entries.foreach { case (key, cv) =>
            if (cv.valueType() == ConfigValueType.OBJECT)
              props.put(key, toProps(cv.asInstanceOf[ConfigObject].asScala))
            else
              for {
                raw  <- Option(cv.unwrapped)
                text <- Option(raw.toString)
              } props.put(key, text)
          }
          props
        }
        toProps(c.root.asScala)
      }
    
      def getBooleanOpt(path: String): Option[Boolean] =
        if (c.hasPath(path)) Some(c.getBoolean(path)) else None
    
      def getIntOpt(path: String): Option[Int] =
        if (c.hasPath(path)) Some(c.getInt(path)) else None
    
      // the null default of getStringOr / getPropertiesOr is what turns a missing path into None
      def getStringOpt(path: String) = Option(getStringOr(path))
      def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
    }
    
    /** Companion providing the implicit conversion that adds the extension methods to Config. */
    object ConfigExtensionMethods {
      // import this member to call getXOr / getXOpt directly on a Config value
      @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
    }
    
    /**
      * Reads per-database settings from the Typesafe config section `<envPrefix>db.<dbName>`
      * and turns them into a HikariConfig.
      */
    trait HikariConfigReader extends TypesafeConfigReader {
      self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>
    
      import ConfigExtensionMethods.configExtensionMethods
    
      /**
        * Returns the configured `poolFactoryName` of the database, falling back to the
        * commons-dbcp factory name when the key is absent.
        */
      def getFactoryName(dbName: Symbol): String = {
        val dbConf: Config = config.getConfig(s"${envPrefix}db.${dbName.name}")
        dbConf.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
      }
    
      /**
        * Builds the HikariConfig for the given database from its config section.
        * Keys that are absent fall back to the defaults written out below.
        */
      def hikariCPConfig(dbName: Symbol): HikariConfig = {
    
        val hconf = new HikariConfig()
        val c: Config = config.getConfig(s"${envPrefix}db.${dbName.name}")
    
        // Connection settings: an explicit data source class wins over a driver class name
        if (c.hasPath("dataSourceClass"))
          hconf.setDataSourceClassName(c.getString("dataSourceClass"))
        else
          Option(c.getStringOr("driverClassName", c.getStringOr("driver")))
            .foreach(hconf.setDriverClassName)
        hconf.setJdbcUrl(c.getStringOr("url", null))
        c.getStringOpt("user").foreach(hconf.setUsername)
        c.getStringOpt("password").foreach(hconf.setPassword)
        c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)
    
        // Pool configuration (timeouts in milliseconds)
        hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
        hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
        hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
        hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
        hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
        hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
        c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
        c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
    
        // pool size defaults are derived from numThreads: max = 5 x numThreads, min idle = numThreads
        val numThreads = c.getIntOr("numThreads", 20)
        hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
        hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
        hconf.setPoolName(c.getStringOr("poolName", dbName.name))
        hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))
    
        // Equivalent of ConnectionPreparer
        hconf.setReadOnly(c.getBooleanOr("readOnly", false))
        c.getStringOpt("isolation")
          .map(level => "TRANSACTION_" + level)
          .foreach(hconf.setTransactionIsolation)
        hconf.setCatalog(c.getStringOr("catalog", null))
    
        hconf
    
      }
    }
    
    import scalikejdbc._
    /**
      * Registers and closes scalikejdbc connection pools based on the Typesafe configuration.
      * Databases whose `poolFactoryName` is "hikaricp" get a HikariCP-backed pool; all others
      * use the standard scalikejdbc pool settings.
      */
    trait ConfigDBs {
      self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>
    
      /** Loads the JDBC driver class when a non-blank class name is configured. */
      private def loadDriver(driverClassName: String): Unit = {
        if (driverClassName != null && driverClassName.trim.nonEmpty) {
          Class.forName(driverClassName)
        }
        ()
      }
    
      /**
        * Creates the connection pool for `dbName` and registers it with ConnectionPool.
        *
        * @param dbName name of the database section in the configuration
        */
      def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        getFactoryName(dbName) match {
          case "hikaricp" =>
            val hconf = hikariCPConfig(dbName)
            // load the driver before the data source is created, not after
            loadDriver(hconf.getDriverClassName)
            val hikariCPSource = new HikariDataSource(hconf)
            // closing the pool closes the underlying Hikari data source
            val closer = new DataSourceCloser {
              override def close(): Unit = hikariCPSource.close()
            }
            ConnectionPool.add(dbName, new DataSourceConnectionPool(
              dataSource = hikariCPSource,
              settings = DataSourceConnectionPoolSettings(),
              closer = closer))
    
          case _ =>
            val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
            val cpSettings = readConnectionPoolSettings(dbName)
            loadDriver(driver)
            ConnectionPool.add(dbName, url, user, password, cpSettings)
        }
      }
    
      /** Loads the global scalikejdbc settings and sets up every configured database. */
      def setupAll(): Unit = {
        loadGlobalSettings()
        dbNames.foreach { dbName => setup(Symbol(dbName)) }
      }
    
      /** Closes the pool registered under `dbName`. */
      def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        ConnectionPool.close(dbName)
      }
    
      /** Closes all registered pools. */
      def closeAll(): Unit = {
        ConnectionPool.closeAll
      }
    
    }
    
    
    /** Ready-to-use [[ConfigDBs]] instance wired with the standard Typesafe config and the Hikari config reader. */
    object ConfigDBs extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader
    
    /**
     * [[ConfigDBs]] variant that carries an environment value.
     *
     * @param envValue environment name; wrapped with `Option(...)`, so `null` becomes `None`
     */
    case class ConfigDBsWithEnv(envValue: String)
      extends ConfigDBs
        with TypesafeConfigReader
        with StandardTypesafeConfig
        with HikariConfigReader
        with EnvPrefix {

      override val env: Option[String] = Option(envValue)
    }

    jdbc/JDBCEngine.scala

    package sdp.jdbc.engine
    import java.sql.PreparedStatement
    import scala.collection.generic.CanBuildFrom
    import akka.stream.scaladsl._
    import scalikejdbc._
    import scalikejdbc.streams._
    import akka.NotUsed
    import akka.stream._
    import java.time._
    import scala.concurrent.duration._
    import scala.concurrent._
    import sdp.file.Streaming._
    
    import scalikejdbc.TxBoundary.Try._
    
    import scala.concurrent.ExecutionContextExecutor
    import java.io.InputStream
    
    import sdp.logging.LogSupport
    
    /** Constants used when building a JDBC context and when dispatching it in the engine. */
    object JDBCContext {
      /** Integer code that tells which kind of SQL a context carries. */
      type SQLTYPE = Int

      /** DDL statements. */
      val SQL_EXEDDL: SQLTYPE = 1
      /** Update statements (single, multiple or batch). */
      val SQL_UPDATE: SQLTYPE = 2

      /** Readable flags for the "return generated key" boolean argument. */
      val RETURN_GENERATED_KEYVALUE: Boolean = true
      val RETURN_UPDATED_COUNT: Boolean = false

    }
    
    /**
     * Description of a single read query.
     *
     * @tparam M type tag carried by the context; it is not used by any field
     * @param dbName       name of the database to query
     * @param statement    SQL text
     * @param parameters   positional parameters for the statement
     * @param fetchSize    fetch size requested for the query
     * @param autoCommit   auto-commit flag applied to the connection by the streaming query
     * @param queryTimeout optional query timeout
     */
    case class JDBCQueryContext[M](
      dbName: Symbol,
      statement: String,
      parameters: Seq[Any] = Nil,
      fetchSize: Int = 100,
      autoCommit: Boolean = false,
      queryTimeout: Option[Int] = None
    )
    
    
    /**
     * Immutable description of one or more DDL/update statements to run against `dbName`.
     *
     * Every helper returns a modified copy; helpers that do not apply to the current
     * `sqlType` / `batch` combination log the problem and throw `IllegalStateException`.
     *
     * @param dbName             name of the target database
     * @param statements         SQL statements
     * @param parameters         one parameter list per statement, or per batch row in batch mode
     * @param fetchSize          fetch size applied to the session
     * @param queryTimeout       optional query timeout
     * @param queryTags          tags attached to the statements
     * @param sqlType            `JDBCContext.SQL_EXEDDL` or `JDBCContext.SQL_UPDATE`
     * @param batch              true when a single statement is run with many parameter rows
     * @param returnGeneratedKey no return: None, return by index: Some(1), by name: Some("id")
     * @param preAction          optional callback taking the PreparedStatement (single non-batch update only)
     * @param postAction         optional callback taking the PreparedStatement (single non-batch update only)
     */
    case class JDBCContext (
                            dbName: Symbol,
                            statements: Seq[String] = Nil,
                            parameters: Seq[Seq[Any]] = Nil,
                            fetchSize: Int = 100,
                            queryTimeout: Option[Int] = None,
                            queryTags: Seq[String] = Nil,
                            sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_UPDATE,
                            batch: Boolean = false,
                            returnGeneratedKey: Seq[Option[Any]] = Nil,
                            // no return: None, return by index: Some(1), by name: Some("id")
                            preAction: Option[PreparedStatement => Unit] = None,
                            postAction: Option[PreparedStatement => Unit] = None)
                  extends LogSupport {

      ctx =>

      //helper functions

      // pre/post actions are only accepted for exactly one non-batch update statement
      private def supportsStatementActions: Boolean =
        ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch && ctx.statements.size == 1

      // true when this context is a batch update
      private def isBatchUpdate: Boolean =
        ctx.sqlType == JDBCContext.SQL_UPDATE && ctx.batch

      def appendTag(tag: String): JDBCContext = ctx.copy(queryTags = ctx.queryTags :+ tag)

      def appendTags(tags: Seq[String]): JDBCContext = ctx.copy(queryTags = ctx.queryTags ++ tags)

      def setFetchSize(size: Int): JDBCContext = ctx.copy(fetchSize = size)

      def setQueryTimeout(time: Option[Int]): JDBCContext = ctx.copy(queryTimeout = time)

      /** Sets the pre-action; throws `IllegalStateException` unless this is a single non-batch update. */
      def setPreAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
        if (supportsStatementActions) {
          val nc = ctx.copy(preAction = action)
          log.info("setPreAction> set")
          nc
        }
        else {
          log.info("setPreAction> JDBCContex setting error: preAction not supported!")
          throw new IllegalStateException("JDBCContex setting error: preAction not supported!")
        }
      }

      /** Sets the post-action; throws `IllegalStateException` unless this is a single non-batch update. */
      def setPostAction(action: Option[PreparedStatement => Unit]): JDBCContext = {
        if (supportsStatementActions) {
          val nc = ctx.copy(postAction = action)
          log.info("setPostAction> set")
          nc
        }
        else {
          // fixed: this failure used to be logged under the "setPreAction>" prefix
          log.info("setPostAction> JDBCContex setting error: postAction not supported!")
          throw new IllegalStateException("JDBCContex setting error: postAction not supported!")
        }
      }

      /** Appends a DDL statement; only valid when `sqlType` is `SQL_EXEDDL`. */
      def appendDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_EXEDDL) {
          log.info(s"appendDDLCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
          val nc = ctx.copy(
            statements = ctx.statements :+ _statement,
            // fixed: the varargs were wrapped twice (Seq(Seq(_parameters))), which stored the
            // whole argument list as ONE parameter instead of one list per statement
            parameters = ctx.parameters :+ _parameters
          )
          log.info(s"appendDDLCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
          nc
        } else {
          log.info(s"appendDDLCommand> JDBCContex setting error: option not supported!")
          throw new IllegalStateException("JDBCContex setting error: option not supported!")
        }
      }

      /** Appends an update statement with its parameters; only valid for non-batch `SQL_UPDATE`. */
      def appendUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
        if (ctx.sqlType == JDBCContext.SQL_UPDATE && !ctx.batch) {
          log.info(s"appendUpdateCommand> appending: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
          val nc = ctx.copy(
            statements = ctx.statements :+ _statement,
            parameters = ctx.parameters :+ _parameters,
            returnGeneratedKey = ctx.returnGeneratedKey :+ (if (_returnGeneratedKey) Some(1) else None)
          )
          log.info(s"appendUpdateCommand> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
          nc
        } else {
          log.info(s"appendUpdateCommand> JDBCContex setting error: option not supported!")
          throw new IllegalStateException("JDBCContex setting error: option not supported!")
        }
      }

      /**
       * Appends one row of batch parameters. The row must have as many values as the
       * first row already stored; only valid for batch `SQL_UPDATE`.
       */
      def appendBatchParameters(_parameters: Any*): JDBCContext = {
        log.info(s"appendBatchParameters> appending:  parameters: ${_parameters}")
        if (!isBatchUpdate) {
          log.info(s"appendBatchParameters> JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
          throw new IllegalStateException("JDBCContex setting error: batch parameters only supported for SQL_UPDATE and batch = true!")
        }
        // an empty parameter list accepts any row; otherwise sizes must agree with the first row
        val matchParams = ctx.parameters.headOption.forall(_.size == _parameters.size)
        if (matchParams) {
          val nc = ctx.copy(
            parameters = ctx.parameters :+ _parameters
          )
          log.info(s"appendBatchParameters> appended: statement: ${nc.statements}, parameters: ${nc.parameters}")
          nc
        } else {
          log.info(s"appendBatchParameters> JDBCContex setting error: batch command parameters not match!")
          throw new IllegalStateException("JDBCContex setting error: batch command parameters not match!")
        }
      }


      /** Chooses whether a batch update returns generated keys; only valid for batch `SQL_UPDATE`. */
      def setBatchReturnGeneratedKeyOption(returnKey: Boolean): JDBCContext = {
        if (!isBatchUpdate)
          throw new IllegalStateException("JDBCContex setting error: only supported in batch update commands!")
        ctx.copy(
          returnGeneratedKey = if (returnKey) Seq(Some(1)) else Nil
        )
      }

      /** Replaces the content with a single DDL statement and switches to `SQL_EXEDDL`. */
      def setDDLCommand(_statement: String, _parameters: Any*): JDBCContext = {
        log.info(s"setDDLCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
        val nc = ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          sqlType = JDBCContext.SQL_EXEDDL,
          batch = false
        )
        log.info(s"setDDLCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
        nc
      }

      /** Replaces the content with a single non-batch update statement. */
      def setUpdateCommand(_returnGeneratedKey: Boolean, _statement: String,_parameters: Any*): JDBCContext = {
        log.info(s"setUpdateCommand> setting: returnGeneratedKey: ${_returnGeneratedKey}, statement: ${_statement}, parameters: ${_parameters}")
        val nc = ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters),
          returnGeneratedKey = if (_returnGeneratedKey) Seq(Some(1)) else Seq(None),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = false
        )
        log.info(s"setUpdateCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
        nc
      }

      /** Replaces the statement with a batch update statement; existing parameters are kept. */
      def setBatchCommand(_statement: String): JDBCContext = {
        log.info(s"setBatchCommand> appending: statement: ${_statement}")
        val nc = ctx.copy (
          statements = Seq(_statement),
          sqlType = JDBCContext.SQL_UPDATE,
          batch = true
        )
        log.info(s"setBatchCommand> set: statement: ${nc.statements}, parameters: ${nc.parameters}")
        nc
      }

    }
    
    object JDBCEngine extends LogSupport {
      import JDBCContext._
    
      // java.time aliases used for JDBC date/time values
      type JDBCDate = LocalDate
      type JDBCDateTime = LocalDateTime
      type JDBCTime = LocalTime

      /** Builds a date from year, month and day-of-month. */
      def jdbcSetDate(yyyy: Int, mm: Int, dd: Int): LocalDate =
        LocalDate.of(yyyy, mm, dd)

      /** Builds a time from hour, minute, second and nano-of-second. */
      def jdbcSetTime(hh: Int, mm: Int, ss: Int, nn: Int): LocalTime =
        LocalTime.of(hh, mm, ss, nn)

      /** Combines a date and a time. */
      def jdbcSetDateTime(date: JDBCDate, time: JDBCTime): LocalDateTime =
        LocalDateTime.of(date, time)

      /** Current date-time as returned by `LocalDateTime.now()`. */
      def jdbcSetNow: LocalDateTime = LocalDateTime.now()

      // conversions from java.sql values to java.time values
      def jdbcGetDate(sqlDate: java.sql.Date): java.time.LocalDate =
        sqlDate.toLocalDate

      def jdbcGetTime(sqlTime: java.sql.Time): java.time.LocalTime =
        sqlTime.toLocalTime

      def jdbcGetTimestamp(sqlTimestamp: java.sql.Timestamp): java.time.LocalDateTime =
        sqlTimestamp.toLocalDateTime
    
    
      /** Blob values are exchanged as input streams. */
      type JDBCBlob = InputStream

      /** Delegates to `FileToInputStream`; the default timeout is 60 seconds. */
      def fileToJDBCBlob(fileName: String, timeOut: FiniteDuration = 60.seconds)(
        implicit mat: Materializer) =
        FileToInputStream(fileName, timeOut)

      /** Delegates to `InputStreamToFile` to write `blob` into `fileName`. */
      def jdbcBlobToFile(blob: JDBCBlob, fileName: String)(
        implicit mat: Materializer) =
        InputStreamToFile(blob, fileName)
    
    
      // Placeholder extractor: the returned function throws IllegalStateException(message) when applied.
      private def noExtractor(message: String): WrappedResultSet => Nothing =
        _ => throw new IllegalStateException(message)
    
      /**
       * Runs the query described by `ctx` as a read-only stream and exposes it as an akka-stream Source.
       *
       * @param ctx       query description (database name, statement, parameters, timeout, fetch size, auto-commit)
       * @param extractor maps each result-set row to an `A`
       * @return a Source built from the `DatabasePublisher` returned by `readOnlyStream`
       */
      def jdbcAkkaStream[A](ctx: JDBCQueryContext[A],extractor: WrappedResultSet => A)
                           (implicit ec: ExecutionContextExecutor): Source[A,NotUsed] = {
          val publisher: DatabasePublisher[A] = NamedDB(ctx.dbName) readOnlyStream {
          // built with a placeholder extractor first; the real extractor is attached via map below
          val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
          // NOTE(review): relies on queryTimeout(...) configuring rawSql in place - confirm against scalikejdbc
          ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
          val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
          sql.iterator
            .withDBSessionForceAdjuster(session => {
              // apply the context's auto-commit flag and fetch size to the session used by the stream
              session.connection.setAutoCommit(ctx.autoCommit)
              session.fetchSize(ctx.fetchSize)
            })
        }
        log.info(s"jdbcAkkaStream> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
        Source.fromPublisher[A](publisher)
      }
    
    
      /**
       * Runs the query described by `ctx` and collects all rows into a `C[A]`.
       *
       * @param ctx       query description (database name, statement, parameters, timeout, fetch size)
       * @param extractor maps each result-set row to an `A`
       * @return the extracted rows in the requested collection type
       * @throws RuntimeException wrapping any exception raised while running the query;
       *                          the original exception is kept as the cause
       */
      def jdbcQueryResult[C[_] <: TraversableOnce[_], A](ctx: JDBCQueryContext[A],
                                                         extractor: WrappedResultSet => A)(
                                                          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
        // built with a placeholder extractor first; the real extractor is attached via map below
        val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statement, ctx.parameters)(noExtractor(""))
        ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
        rawSql.fetchSize(ctx.fetchSize)
        try {
          implicit val session = NamedAutoSession(ctx.dbName)
          log.info(s"jdbcQueryResult> Source: db: ${ctx.dbName}, statement: ${ctx.statement}, parameters: ${ctx.parameters}")
          val sql: SQL[A, HasExtractor] = rawSql.map(extractor)
          sql.collection.apply[C]()
        } catch {
          case e: Exception =>
            log.error(s"jdbcQueryResult> runtime error: ${e.getMessage}")
            // keep the original exception as the cause so the stack trace is not lost
            throw new RuntimeException(s"jdbcQueryResult> Error: ${e.getMessage}", e)
        }

      }
    
      /**
       * Executes every statement of a DDL context inside one local transaction.
       * The statements are executed without parameters.
       *
       * @return a Future holding a confirmation message, or a failed Future when
       *         `ctx.sqlType` is not `SQL_EXEDDL`
       */
      def jdbcExecuteDDL(ctx: JDBCContext)(implicit ec: ExecutionContextExecutor): Future[String] =
        ctx.sqlType match {
          case SQL_EXEDDL =>
            log.info(s"jdbcExecuteDDL> Source: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
            Future {
              NamedDB(ctx.dbName) localTx { implicit session =>
                for (stm <- ctx.statements) {
                  // no before/after hooks are needed for DDL
                  val ddl = new SQLExecution(statement = stm, parameters = Nil)(
                    before = _ => ())(
                    after = _ => ())
                  ddl.apply()
                }
                "SQL_EXEDDL executed succesfully."
              }
            }

          case _ =>
            log.info(s"jdbcExecuteDDL> JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!")
            Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_EXEDDL'!"))
        }
    
      /**
       * Runs a batch update: the first statement of `ctx` is executed once per parameter row,
       * inside one local transaction.
       *
       * @return generated keys when the context asks for them, otherwise an empty collection;
       *         a failed Future (IllegalStateException) when the context has no statement,
       *         is not `SQL_UPDATE`, or is not in batch mode
       */
      def jdbcBatchUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
                 cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        // The checks form ONE if/else chain: previously the failed Future for empty
        // statements was discarded and execution fell through to `statements.head`.
        if (ctx.statements.isEmpty) {
          log.info(s"jdbcBatchUpdate> JDBCContex setting error: statements empty!")
          Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
        }
        else if (ctx.sqlType != SQL_UPDATE) {
          log.info(s"jdbcBatchUpdate> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
          Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
        }
        else if (!ctx.batch) {
          log.info(s"jdbcBatchUpdate> JDBCContex setting error: must set batch = true !")
          Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = true !"))
        }
        else if (noReturnKey(ctx)) {
          log.info(s"jdbcBatchUpdate> batch updating no return: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
          val usql = SQL(ctx.statements.head)
            .tags(ctx.queryTags: _*)
            .batch(ctx.parameters: _*)
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[Seq]()
              // update counts are not reported in this mode
              Seq.empty[Long].to[C]
            }
          }
        }
        else {
          log.info(s"jdbcBatchUpdate> batch updating return genkey: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
          val usql = new SQLBatchWithGeneratedKey(ctx.statements.head, ctx.parameters, ctx.queryTags)(None)
          Future {
            NamedDB(ctx.dbName) localTx { implicit session =>
              ctx.queryTimeout.foreach(session.queryTimeout(_))
              usql.apply[C]()
            }
          }
        }
      }
      /**
       * Runs the single statement of `ctx` in a local transaction and returns the generated key.
       *
       * @return a one-element collection holding the generated key; a failed Future
       *         (IllegalStateException) when the context does not name a key to return
       */
      private def singleTxUpdateWithReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
              implicit ec: ExecutionContextExecutor,
                        cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        // headOption works for any Seq; the former `Some(key) :: xs` pattern only matched
        // a List and threw MatchError for other sequences or a missing key
        ctx.returnGeneratedKey.headOption.flatten match {
          case None =>
            log.info(s"singleTxUpdateWithReturnKey> JDBCContex setting error: returnGeneratedKey not set!")
            Future.failed(new IllegalStateException("JDBCContex setting error: returnGeneratedKey not set!"))

          case Some(key) =>
            val params: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
            log.info(s"singleTxUpdateWithReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
            val usql = new SQLUpdateWithGeneratedKey(ctx.statements.head, params, ctx.queryTags)(key)
            Future {
              NamedDB(ctx.dbName) localTx { implicit session =>
                session.fetchSize(ctx.fetchSize)
                ctx.queryTimeout.foreach(session.queryTimeout(_))
                val result = usql.apply()
                Seq(result).to[C]
              }
            }
        }
      }
    
      /**
       * Runs the single statement of `ctx` in a local transaction, applying the optional
       * pre/post actions, and returns the update count as a one-element collection.
       */
      private def singleTxUpdateNoReturnKey[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
              implicit ec: ExecutionContextExecutor,
                       cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        // used when the context carries no pre/post action
        val doNothing: PreparedStatement => Unit = _ => ()

        val firstParams: Seq[Any] = ctx.parameters.headOption.getOrElse(Nil)
        val runBefore = ctx.preAction.getOrElse(doNothing)
        val runAfter = ctx.postAction.getOrElse(doNothing)

        log.info(s"singleTxUpdateNoReturnKey> updating: db: ${ctx.dbName}, statement: ${ctx.statements}, parameters: ${ctx.parameters}")
        val update = new SQLUpdate(ctx.statements.head, firstParams, ctx.queryTags)(runBefore)(runAfter)

        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val affected = update.apply()
            Seq(affected.toLong).to[C]
          }
        }
      }
    
      /** Dispatches a single-statement update depending on whether a generated key is requested. */
      private def singleTxUpdate[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
                 cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        val wantsGeneratedKey = !noReturnKey(ctx)
        if (wantsGeneratedKey) singleTxUpdateWithReturnKey[C](ctx)
        else singleTxUpdateNoReturnKey[C](ctx)
      }
    
      /**
       * True when the context does not ask for a generated key, i.e. `returnGeneratedKey`
       * is empty or its first entry is `None`.
       *
       * Uses `headOption`, which works for any `Seq`; the former `k :: xs` pattern only
       * matched a `List` and threw `MatchError` for other sequence types.
       */
      private def noReturnKey(ctx: JDBCContext): Boolean =
        ctx.returnGeneratedKey.headOption.flatten.isEmpty
    
      /** A PreparedStatement callback that does nothing. */
      def noActon: PreparedStatement => Unit = _ => ()
    
      /**
       * Runs all statements of `ctx` one after another inside one local transaction.
       *
       * Statements without a matching parameter list run with no parameters, and statements
       * without a matching key option return their update count.
       *
       * @return one value per statement: the generated key when requested, else the update count
       */
      def multiTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
                 cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {

        val stmCount = ctx.statements.size
        // Pad to the number of statements: `zip` stops at the shortest sequence, so a
        // shorter parameter or key list used to silently drop the trailing statements.
        val keys: Seq[Option[Any]] = ctx.returnGeneratedKey.padTo(stmCount, None)
        val params: Seq[Seq[Any]] = ctx.parameters.padTo(stmCount, Nil)

        val sqlcmd = ctx.statements zip params zip keys
        log.info(s"multiTxUpdates> updating: db: ${ctx.dbName}, SQL Commands: ${sqlcmd}")
        Future {
          NamedDB(ctx.dbName) localTx { implicit session =>
            session.fetchSize(ctx.fetchSize)
            ctx.queryTimeout.foreach(session.queryTimeout(_))
            val results = sqlcmd.map { case ((stm, param), key) =>
              key match {
                case None =>
                  new SQLUpdate(stm, param, Nil)(noActon)(noActon).apply().toLong
                case Some(k) =>
                  new SQLUpdateWithGeneratedKey(stm, param, Nil)(k).apply().toLong
              }
            }
            results.to[C]
          }
        }
      }
    
    
      /**
       * Runs the non-batch update statement(s) of `ctx` in one transaction.
       *
       * @return the per-statement results; a failed Future (IllegalStateException) when the
       *         context has no statement, is not `SQL_UPDATE`, or is in batch mode
       */
      def jdbcTxUpdates[C[_] <: TraversableOnce[_]](ctx: JDBCContext)(
        implicit ec: ExecutionContextExecutor,
                 cbf: CanBuildFrom[Nothing, Long, C[Long]]): Future[C[Long]] = {
        // The checks form ONE if/else chain: previously the failed Future for empty
        // statements was discarded and the call went on to report success.
        if (ctx.statements.isEmpty) {
          log.info(s"jdbcTxUpdates> JDBCContex setting error: statements empty!")
          Future.failed(new IllegalStateException("JDBCContex setting error: statements empty!"))
        }
        else if (ctx.sqlType != SQL_UPDATE) {
          log.info(s"jdbcTxUpdates> JDBCContex setting error: sqlType must be 'SQL_UPDATE'!")
          Future.failed(new IllegalStateException("JDBCContex setting error: sqlType must be 'SQL_UPDATE'!"))
        }
        else if (ctx.batch) {
          log.info(s"jdbcTxUpdates> JDBCContex setting error: must set batch = false !")
          Future.failed(new IllegalStateException("JDBCContex setting error: must set batch = false !"))
        }
        else if (ctx.statements.size == 1)
          singleTxUpdate(ctx)
        else
          multiTxUpdates(ctx)
      }
    
      /**
       * Stream stage description: executes `statement` once for every element flowing
       * through, with parameters derived from the element, and passes the element on.
       *
       * @param dbName         database to run the statement on
       * @param parallelism    number of statements that may be in flight at once
       * @param processInOrder true to emit elements in input order, false for completion order
       * @param statement      SQL text
       * @param prepareParams  derives the statement parameters from an element
       */
      case class JDBCActionStream[R](dbName: Symbol, parallelism: Int = 1, processInOrder: Boolean = true,
                                     statement: String, prepareParams: R => Seq[Any]) extends LogSupport {
        jas =>

        def setDBName(db: Symbol): JDBCActionStream[R] =
          jas.copy(dbName = db)

        def setParallelism(parLevel: Int): JDBCActionStream[R] =
          jas.copy(parallelism = parLevel)

        def setProcessOrder(ordered: Boolean): JDBCActionStream[R] =
          jas.copy(processInOrder = ordered)

        // executes the statement for one element in auto-commit mode and hands the element back
        private def perform(row: R)(implicit ec: ExecutionContextExecutor) = {
          import scala.concurrent._
          val params = prepareParams(row)
          log.info(s"JDBCActionStream.perform>  db: ${dbName}, statement: ${statement}, parameters: ${params}")
          Future {
            NamedDB(dbName) autoCommit { session =>
              session.execute(statement, params: _*)
            }
            row
          }
        }

        /** Flow that runs the statement for each element, ordered or unordered per `processInOrder`. */
        def performOnRow(implicit ec: ExecutionContextExecutor): Flow[R, R, NotUsed] = {
          val rows = Flow[R]
          if (!processInOrder) rows.mapAsyncUnordered(parallelism)(row => perform(row))
          else rows.mapAsync(parallelism)(row => perform(row))
        }

      }
    
      object JDBCActionStream {
        /** Short constructor that keeps the default `parallelism` and `processInOrder`. */
        def apply[R](_dbName: Symbol, _statement: String, params: R => Seq[Any]): JDBCActionStream[R] = {
          new JDBCActionStream[R](
            dbName = _dbName,
            statement = _statement,
            prepareParams = params
          )
        }
      }
    
    }

    cql/CassandraEngine.scala

    package sdp.cql.engine
    
    import akka.NotUsed
    import akka.stream.alpakka.cassandra.scaladsl._
    import akka.stream.scaladsl._
    import com.datastax.driver.core._
    import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture}
    
    import scala.collection.JavaConverters._
    import scala.collection.generic.CanBuildFrom
    import scala.concurrent._
    import scala.concurrent.duration.Duration
    import sdp.logging.LogSupport
    
    /** Consistency-level codes and helpers for CQL contexts. */
    object CQLContext {
      // Consistency Levels
      type CONSISTENCY_LEVEL = Int
      val ANY: CONSISTENCY_LEVEL          =                                        0x0000
      val ONE: CONSISTENCY_LEVEL          =                                        0x0001
      val TWO: CONSISTENCY_LEVEL          =                                        0x0002
      val THREE: CONSISTENCY_LEVEL        =                                        0x0003
      val QUORUM : CONSISTENCY_LEVEL      =                                        0x0004
      val ALL: CONSISTENCY_LEVEL          =                                        0x0005
      val LOCAL_QUORUM: CONSISTENCY_LEVEL =                                        0x0006
      val EACH_QUORUM: CONSISTENCY_LEVEL  =                                        0x0007
      val LOCAL_ONE: CONSISTENCY_LEVEL    =                                        0x000A
      val LOCAL_SERIAL: CONSISTENCY_LEVEL =                                        0x000B
      val SERIAL: CONSISTENCY_LEVEL       =                                        0x000C

      /** Empty context with no statements. */
      def apply(): CQLContext = CQLContext(statements = Nil)

      /**
       * Maps an integer code defined above to the driver's `ConsistencyLevel`.
       *
       * @throws IllegalArgumentException for a code that is not one of the constants above
       */
      def consistencyLevel: CONSISTENCY_LEVEL => ConsistencyLevel = consistency => {
        consistency match {
          case ALL => ConsistencyLevel.ALL
          case ONE => ConsistencyLevel.ONE
          case TWO => ConsistencyLevel.TWO
          case THREE => ConsistencyLevel.THREE
          case ANY => ConsistencyLevel.ANY
          case EACH_QUORUM => ConsistencyLevel.EACH_QUORUM
          case LOCAL_ONE => ConsistencyLevel.LOCAL_ONE
          case QUORUM => ConsistencyLevel.QUORUM
          // fixed: LOCAL_QUORUM is declared above but had no case here (MatchError)
          case LOCAL_QUORUM => ConsistencyLevel.LOCAL_QUORUM
          case SERIAL => ConsistencyLevel.SERIAL
          case LOCAL_SERIAL => ConsistencyLevel.LOCAL_SERIAL
          case unknown =>
            throw new IllegalArgumentException(s"unknown consistency level code: ${unknown}")
        }
      }

    }
    /**
     * Description of a single CQL query.
     *
     * @param statement   CQL text
     * @param parameter   positional parameters for the statement
     * @param consistency optional consistency-level code (see `CQLContext`)
     * @param fetchSize   fetch size requested for the query
     */
    case class CQLQueryContext(
      statement: String,
      parameter: Seq[Object] = Nil,
      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
      fetchSize: Int = 100
    ) { ctx =>

      /** Copy with the given consistency-level code. */
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLQueryContext = {
        val level = Some(_consistency)
        ctx.copy(consistency = level)
      }

      /** Copy with the given fetch size. */
      def setFetchSize(pageSize: Int): CQLQueryContext = {
        ctx.copy(fetchSize = pageSize)
      }

      /** Copy with the given parameters. */
      def setParameters(param: Seq[Object]): CQLQueryContext = {
        ctx.copy(parameter = param)
      }
    }
    object CQLQueryContext {
      /** Builds a query context from a statement and its parameters, keeping the other defaults. */
      def apply[M](stmt: String, param: Seq[Object]): CQLQueryContext = {
        new CQLQueryContext(
          statement = stmt,
          parameter = param
        )
      }
    }
    
    /**
     * Description of one or more CQL update commands.
     *
     * @param statements  CQL statements
     * @param parameters  one parameter list per statement, or per batch row in batch mode
     * @param consistency optional consistency-level code
     * @param batch       true to run the first statement as a batch over all parameter lists
     */
    case class CQLContext(
      statements: Seq[String],
      parameters: Seq[Seq[Object]] = Nil,
      consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None,
      batch: Boolean = false
    ) extends LogSupport { ctx =>

      /** Copy with the batch flag set to `bat`. */
      def setBatch(bat: Boolean): CQLContext =
        ctx.copy(batch = bat)

      /** Copy with the given consistency-level code. */
      def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CQLContext =
        ctx.copy(consistency = Some(_consistency))

      /** Copy holding only the given statement and its parameters. */
      def setCommand(_statement: String, _parameters: Object*): CQLContext = {
        log.info(s"setCommand> setting: statement: ${_statement}, parameters: ${_parameters}")
        val replaced = ctx.copy(
          statements = Seq(_statement),
          parameters = Seq(_parameters)
        )
        log.info(s"setCommand> set: statements: ${replaced.statements}, parameters: ${replaced.parameters}")
        replaced
      }

      /** Copy with the given statement and its parameters added at the end. */
      def appendCommand(_statement: String, _parameters: Object*): CQLContext = {
        log.info(s"appendCommand> appending: statement: ${_statement}, parameters: ${_parameters}")
        val extended = ctx.copy(
          statements = ctx.statements :+ _statement,
          parameters = ctx.parameters ++ Seq(_parameters)
        )
        log.info(s"appendCommand> appended: statements: ${extended.statements}, parameters: ${extended.parameters}")
        extended
      }
    }
    
    object CQLEngine extends LogSupport {
      import CQLContext._
      import CQLHelpers._
    
      import cats._, cats.data._, cats.implicits._
      import scala.concurrent.{Await, Future}
      import scala.concurrent.duration._
    
      /**
       * Prepares, binds and executes the query of `ctx` with fetch size `pageSize`.
       *
       * @param ctx       query description (statement, parameters, consistency)
       * @param pageSize  fetch size set on the bound statement
       * @param extractor maps each row to an `A`
       * @return the driver result set together with the extracted rows
       */
      def fetchResultPage[C[_] <: TraversableOnce[_],A](ctx: CQLQueryContext, pageSize: Int = 100
                                                       ,extractor: Row => A)(
        implicit session: Session, cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet, C[A])= {

        val prepStmt = session.prepare(ctx.statement)

        // parameters are processed and bound only when the context carries any
        val hasParams = ctx.parameter.nonEmpty
        val params: Seq[Object] = if (hasParams) processParameters(ctx.parameter) else Nil
        val boundStmt = if (hasParams) prepStmt.bind(params: _*) else prepStmt.bind()
        log.info(s"fetchResultPage>  statement: ${prepStmt.getQueryString}, parameters: ${params}")

        for (consistency <- ctx.consistency)
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))

        val resultSet = session.execute(boundStmt.setFetchSize(pageSize))
        val rows = (resultSet.asScala.view.map(extractor)).to[C]
        (resultSet, rows)
      }
      /**
       * Fetches more results for `resultSet`, waiting at most `timeOut`.
       *
       * @param resultSet result set returned by a previous fetch
       * @param timeOut   maximum time to wait for the fetch
       * @param extractor maps each row to an `A`
       * @return the result set and `Some(rows)`; `None` when the result set is already fully
       *         fetched or when the fetch failed (the failure is logged)
       */
      def fetchMorePages[C[_] <: TraversableOnce[_],A](resultSet: ResultSet, timeOut: Duration)(
        extractor: Row => A)(implicit cbf: CanBuildFrom[Nothing, A, C[A]]): (ResultSet,Option[C[A]]) =
        if (resultSet.isFullyFetched) {
          (resultSet, None)
        } else {
          import scala.util.control.NonFatal
          try {
            val result = Await.result(resultSet.fetchMoreResults(), timeOut)
            (result, Some((result.asScala.view.map(extractor)).to[C]))
          } catch {
            // only non-fatal errors are treated as "no more pages"; fatal ones
            // (OutOfMemoryError, InterruptedException, ...) propagate
            case NonFatal(e) =>
              log.warn(s"fetchMorePages> failed to fetch more results: ${e.getMessage}")
              (resultSet, None)
          }
        }
    
      /**
       * Executes the command(s) of `ctx`.
       *
       *  - batch mode with at least two parameter sets: delegates to `cqlBatchUpdate`
       *  - exactly one statement: delegates to `cqlSingleUpdate`
       *  - otherwise: runs every statement synchronously, one after another, inside one Future
       *
       * A batch context with fewer than two parameter sets is downgraded to the
       * non-batch paths with a warning.
       */
      def cqlExecute(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

        // an empty parameter list has size 0, so it is covered by the same test
        val invalidBat = ctx.batch && ctx.parameters.size < 2

        if (ctx.batch && !invalidBat)
          cqlBatchUpdate(ctx)
        else {
          if (invalidBat)
            log.warn(s"cqlExecute> batch update must have at least 2 sets of parameters! change to single-command.")

          if (ctx.statements.size == 1) {
            val param: Seq[Object] = ctx.parameters.headOption.getOrElse(Nil)
            log.info(s"cqlExecute>  single-command: statement: ${ctx.statements.head} parameters: ${param}")
            cqlSingleUpdate(ctx.consistency, ctx.statements.head, param)
          }
          else {
            // one parameter list per statement; missing lists become Nil
            val params: Seq[Seq[Object]] =
              if (ctx.parameters.isEmpty)
                Seq.fill(ctx.statements.length)(Nil)
              else if (ctx.statements.size > ctx.parameters.size) {
                log.warn(s"cqlExecute> fewer parameters than statements! pad with 'Nil'.")
                ctx.parameters ++ Seq.fill(ctx.statements.size - ctx.parameters.size)(Nil)
              }
              else
                ctx.parameters

            val commands: Seq[(String,Seq[Object])] = ctx.statements zip params
            log.info(s"cqlExecute>  multi-commands: ${commands}")

            // Alternatives considered by the original author and left disabled:
            //  - mapping to cqlSingleUpdate and flipping List[Future[Boolean]] with `sequence`
            //  - Future.traverse over the commands followed by Await.result(_, 3 seconds)
            // Both require that no command relies on the effect of a previous command.

            // run sync directly
            Future {
              for ((stm, pars) <- commands)
                cqlExecuteSync(ctx.consistency, stm, pars)
              true
            }
          }
        }
      }
      /**
       * Prepares, binds and asynchronously executes one statement.
       *
       * @param cons   optional consistency-level code applied to the bound statement
       * @param stmt   CQL text
       * @param params raw parameters; run through `processParameters` before binding
       * @return the driver's `wasApplied` flag
       */
      def cqlSingleUpdate(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
             implicit session: Session, ec: ExecutionContext): Future[Boolean] = {

        val prepStmt = session.prepare(stmt)

        // parameters are processed and bound only when some were supplied
        val hasParams = params.nonEmpty
        val pars: Seq[Object] = if (hasParams) processParameters(params) else Nil
        val boundStmt = if (hasParams) prepStmt.bind(pars: _*) else prepStmt.bind()
        log.info(s"cqlSingleUpdate>  statement: ${prepStmt.getQueryString}, parameters: ${pars}")

        for (consistency <- cons)
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))

        session.executeAsync(boundStmt).map(rs => rs.wasApplied())
      }
    
      /**
       * Prepares, binds and synchronously executes one statement.
       *
       * @param cons   optional consistency-level code applied to the bound statement
       * @param stmt   CQL text
       * @param params raw parameters; run through `processParameters` before binding
       * @return the driver's `wasApplied` flag
       */
      def cqlExecuteSync(cons: Option[CQLContext.CONSISTENCY_LEVEL],stmt: String, params: Seq[Object])(
        implicit session: Session, ec: ExecutionContext): Boolean = {

        val prepStmt = session.prepare(stmt)

        // parameters are processed and bound only when some were supplied
        val hasParams = params.nonEmpty
        val pars: Seq[Object] = if (hasParams) processParameters(params) else Nil
        val boundStmt = if (hasParams) prepStmt.bind(pars: _*) else prepStmt.bind()
        log.info(s"cqlExecuteSync>  statement: ${prepStmt.getQueryString}, parameters: ${pars}")

        for (consistency <- cons)
          boundStmt.setConsistencyLevel(consistencyLevel(consistency))

        val resultSet = session.execute(boundStmt)
        resultSet.wasApplied()
      }
    
      /**
        * Executes one prepared statement once per parameter set, grouped into a
        * single driver `BatchStatement`.
        *
        * Only the first entry of `ctx.statements` is prepared. Every element of
        * `ctx.parameters` is cooked by `processParameters` and bound to it; when
        * `ctx.parameters` is empty, one empty parameter set is used per entry of
        * `ctx.statements`.
        *
        * @param ctx statements, parameter sets and optional consistency level
        * @return a future holding `wasApplied()` of the batch result, or a failed
        *         future carrying `IllegalArgumentException` when `ctx.statements`
        *         is empty (previously `.head` threw `NoSuchElementException`)
        */
      def cqlBatchUpdate(ctx: CQLContext)(
        implicit session: Session, ec: ExecutionContext): Future[Boolean] =
        ctx.statements.headOption match {
          case None =>
            Future.failed(new IllegalArgumentException("cqlBatchUpdate>  no statement to execute"))

          case Some(statement) =>
            val params: Seq[Seq[Object]] =
              if (ctx.parameters == Nil) Seq.fill(ctx.statements.length)(Nil)
              else ctx.parameters
            log.info(s"cqlBatchUpdate>  statement: ${statement}, parameters: ${params}")

            val prepStmt = session.prepare(statement)

            val batch = new BatchStatement()
            params.foreach { p =>
              log.info(s"cqlBatchUpdate>  batch with raw parameter: ${p}")
              val pars = processParameters(p)
              // Label fixed: this line used to be tagged "cqlMultiUpdate>".
              log.info(s"cqlBatchUpdate>  batch with cooked parameters: ${pars}")
              batch.add(prepStmt.bind(pars: _*))
            }
            ctx.consistency.foreach { consistency =>
              batch.setConsistencyLevel(consistencyLevel(consistency))
            }
            session.executeAsync(batch).map(_.wasApplied())
        }
    
      /**
        * Builds an akka-stream `Source` that runs a parameterised CQL query and
        * maps each row through `extractor`.
        *
        * @param ctx       query text, raw parameters, optional consistency level
        *                  and fetch size
        * @param extractor converts one driver `Row` into an `A`
        * @return a source of extracted values
        */
      def cassandraStream[A](ctx: CQLQueryContext,extractor: Row => A)
                            (implicit session: Session, ec: ExecutionContextExecutor): Source[A,NotUsed] = {

        val prepared = session.prepare(ctx.statement)
        val cooked = processParameters(ctx.parameter)
        val bound = prepared.bind(cooked: _*)
        ctx.consistency.foreach { level =>
          bound.setConsistencyLevel(consistencyLevel(level))
        }

        log.info(s"cassandraStream>  statement: ${prepared.getQueryString}, parameters: ${cooked}")
        val paged = bound.setFetchSize(ctx.fetchSize)
        CassandraSource(paged).map(extractor)
      }
    
      /**
        * Immutable description of a per-element CQL action that can be turned
        * into an akka-stream `Flow`.
        *
        * @param parallelism    parallelism passed to `mapAsync` / `mapAsyncUnordered`
        *                       and to the unlogged-batch flow
        * @param processInOrder selects `mapAsync` (true) or `mapAsyncUnordered` (false)
        * @param statement      CQL text prepared for every element
        * @param prepareParams  extracts the raw bind parameters from an element
        * @param consistency    optional consistency level for each bound statement
        */
      case class CassandraActionStream[R](parallelism: Int = 1, processInOrder: Boolean = true,
                                          statement: String, prepareParams: R => Seq[Object],
                                          consistency: Option[CQLContext.CONSISTENCY_LEVEL] = None){ self =>
        /** Returns a copy with the given parallelism. */
        def setParallelism(parLevel: Int): CassandraActionStream[R] =
          self.copy(parallelism = parLevel)

        /** Returns a copy that does (true) or does not (false) keep element order. */
        def setProcessOrder(ordered: Boolean): CassandraActionStream[R] =
          self.copy(processInOrder = ordered)

        /** Returns a copy that applies the given consistency level. */
        def setConsistencyLevel(_consistency: CQLContext.CONSISTENCY_LEVEL): CassandraActionStream[R] =
          self.copy(consistency = Some(_consistency))

        /**
          * Prepares and binds `statement` with the cooked parameters of `r`,
          * executes it asynchronously and yields `r` itself on completion.
          */
        def perform(r: R)(implicit session: Session, ec: ExecutionContext) = {
          val prepared = session.prepare(statement)
          val cooked = processParameters(prepareParams(r))
          val bound = prepared.bind(cooked: _*)
          consistency.foreach { level =>
            bound.setConsistencyLevel(CQLContext.consistencyLevel(level))
          }
          log.info(s"CassandraActionStream.perform>  statement: ${prepared.getQueryString}, parameters: ${cooked}")
          session.executeAsync(bound).map(_ => r)
        }

        /** Flow that runs `perform` on every element and passes the element through. */
        def performOnRow(implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
          val elements = Flow[R]
          if (processInOrder) elements.mapAsync(parallelism)(perform)
          else elements.mapAsyncUnordered(parallelism)(perform)
        }

        /**
          * Pass-through flow built with
          * `CassandraFlow.createUnloggedBatchWithPassThrough`, using the prepared
          * `statement`, the supplied binder and the partition-key extractor.
          */
        def unloggedBatch[K](statementBinder: (
          R, PreparedStatement) => BoundStatement,partitionKey: R => K)(
          implicit session: Session, ec: ExecutionContext): Flow[R, R, NotUsed] = {
          val prepared = session.prepare(statement)
          log.info(s"CassandraActionStream.unloggedBatch>  statement: ${prepared.getQueryString}")
          CassandraFlow.createUnloggedBatchWithPassThrough[R, K](
            parallelism,
            prepared,
            statementBinder,
            partitionKey)
        }

      }
      object CassandraActionStream {
        /**
          * Shorthand constructor: only the CQL text and the parameter extractor
          * are supplied, every other field keeps its declared default.
          */
        def apply[R](_statement: String, params: R => Seq[Object]): CassandraActionStream[R] = {
          val extractor: R => Seq[Object] = params
          new CassandraActionStream[R](prepareParams = extractor, statement = _statement)
        }
      }
    
    
    }
    
    object CQLHelpers extends LogSupport {
      import java.nio.ByteBuffer
      import java.io._
      import java.nio.file._
      import com.datastax.driver.core.LocalDate
      import com.datastax.driver.extras.codecs.jdk8.InstantCodec
      import java.time.Instant
      import akka.stream.scaladsl._
      import akka.stream._
    
      /**
        * Implicitly adapts a `ListenableFuture` to a Scala `Future` by completing
        * a `Promise` from a callback registered on the listenable future.
        */
      implicit def listenableFutureToFuture[T](
                                                listenableFuture: ListenableFuture[T]): Future[T] = {
        val bridge = Promise[T]()
        val relay = new FutureCallback[T] {
          def onSuccess(result: T): Unit = {
            bridge.success(result)
            ()
          }
          def onFailure(error: Throwable): Unit = {
            bridge.failure(error)
            ()
          }
        }
        Futures.addCallback(listenableFuture, relay)
        bridge.future
      }
    
      // Marker values that callers place in a parameter list; processParameters
      // replaces each of them with a concrete value before binding.

      // A calendar date; processParameters maps it with LocalDate.fromYearMonthDay.
      case class CQLDate(year: Int, month: Int, day: Int)
      // Placeholder for the current date at the time processParameters runs.
      case object CQLTodayDate
      // A date-time down to milliseconds; processParameters turns it into an Instant.
      // NOTE(review): the second field is spelled `Month` (capital M), unlike CQLDate.month.
      case class CQLDateTime(year: Int, Month: Int,
                             day: Int, hour: Int, minute: Int, second: Int, millisec: Int = 0)
      // Placeholder for the current instant at the time processParameters runs.
      case object CQLDateTimeNow
    
      /**
        * Converts a legacy date into the calendar date it denotes in the JVM's
        * default time zone.
        *
        * `java.sql.Date` and `java.sql.Time` throw `UnsupportedOperationException`
        * from `toInstant()`, so for those two types the instant is rebuilt from
        * the epoch milliseconds instead.
        *
        * @param dateToConvert any `java.util.Date`, including the `java.sql` subclasses
        * @return the local date in `ZoneId.systemDefault()`
        */
      def cqlGetDate(dateToConvert: java.util.Date): java.time.LocalDate = {
        val instant = dateToConvert match {
          case _: java.sql.Date | _: java.sql.Time =>
            java.time.Instant.ofEpochMilli(dateToConvert.getTime)
          case other =>
            other.toInstant()
        }
        instant.atZone(java.time.ZoneId.systemDefault()).toLocalDate()
      }
    
      /**
        * Converts a legacy date into the wall-clock time it denotes in the JVM's
        * default time zone.
        *
        * `java.sql.Date` and `java.sql.Time` throw `UnsupportedOperationException`
        * from `toInstant()`, so for those two types the instant is rebuilt from
        * the epoch milliseconds instead.
        *
        * @param dateToConvert any `java.util.Date`, including the `java.sql` subclasses
        * @return the local time in `ZoneId.systemDefault()`
        */
      def cqlGetTime(dateToConvert: java.util.Date): java.time.LocalTime = {
        val instant = dateToConvert match {
          case _: java.sql.Date | _: java.sql.Time =>
            java.time.Instant.ofEpochMilli(dateToConvert.getTime)
          case other =>
            other.toInstant()
        }
        instant.atZone(java.time.ZoneId.systemDefault()).toLocalTime()
      }
    
      /**
        * Converts a legacy date to a `LocalDateTime` by way of a
        * `java.sql.Timestamp` built from the same epoch milliseconds.
        */
      def cqlGetTimestamp(dateToConvert: java.util.Date): java.time.LocalDateTime= {
        val epochMillis = dateToConvert.getTime()
        val sqlStamp = new java.sql.Timestamp(epochMillis)
        sqlStamp.toLocalDateTime()
      }
    
      /**
        * Replaces the date/time marker values in a parameter list with concrete
        * values before binding; every other element is passed through unchanged.
        *
        *  - `CQLDate`        -> driver `LocalDate` built with `fromYearMonthDay`
        *  - `CQLTodayDate`   -> driver `LocalDate` for `java.time.LocalDate.now()`
        *  - `CQLDateTimeNow` -> `Instant.now()`
        *  - `CQLDateTime`    -> `Instant` for the given fields
        *
        * @param params raw parameters as supplied by the caller
        * @return a sequence of the same length with markers substituted
        */
      def processParameters(params: Seq[Object]): Seq[Object] = {
        log.info(s"[processParameters] input: ${params}")
        val outParams: Seq[Object] = params.map {
          case CQLDate(yy, mm, dd) => LocalDate.fromYearMonthDay(yy, mm, dd)
          case CQLTodayDate =>
            val today = java.time.LocalDate.now()
            LocalDate.fromYearMonthDay(today.getYear, today.getMonth.getValue, today.getDayOfMonth)
          // An Instant carries no zone, so the zone of the clock used before
          // had no effect on the value produced.
          case CQLDateTimeNow => Instant.now()
          case CQLDateTime(yy, mm, dd, hr, mn, sc, ms) =>
            // The old code built a string with space-padded fields and without
            // the '.' and 'Z' that Instant.parse requires, so it always threw.
            // NOTE(review): fields are read as UTC, the only zone Instant.parse
            // accepts - confirm this is the intended zone.
            java.time.LocalDateTime
              .of(yy, mm, dd, hr, mn, sc, ms * 1000000)
              .toInstant(java.time.ZoneOffset.UTC)
          case other => other
        }
        // Fixed: this line used to print the unprocessed input again.
        log.info(s"[processParameters] output: ${outParams}")
        outParams
      }
      /**
        * `InputStream` over the remaining bytes of a `ByteBuffer`. Reading
        * advances the position of the wrapped buffer.
        */
      class ByteBufferInputStream(buf: ByteBuffer) extends InputStream {
        /**
          * Returns the next byte as an unsigned value (0-255), or -1 once the
          * buffer is exhausted. The mask is essential: an unmasked byte 0xFF
          * would be returned as -1 and read as end-of-stream.
          */
        override def read: Int =
          if (buf.hasRemaining) buf.get() & 0xFF else -1

        /**
          * Copies up to `len` bytes into `bytes` starting at `off`.
          *
          * @return the number of bytes copied, 0 when `len` is 0, or -1 when no
          *         bytes remain (returning 0 there would keep consumers that
          *         wait for -1 reading forever)
          */
        override def read(bytes: Array[Byte], off: Int, len: Int): Int =
          if (len == 0) 0
          else if (!buf.hasRemaining) -1
          else {
            val length: Int = Math.min(len, buf.remaining)
            buf.get(bytes, off, length)
            length
          }
      }
      object ByteBufferInputStream {
        /** Wraps `buf` in a [[ByteBufferInputStream]]. */
        def apply(buf: ByteBuffer): ByteBufferInputStream =
          new ByteBufferInputStream(buf)
      }
      /**
        * `OutputStream` that appends to a caller-supplied `ByteBuffer` without
        * ever replacing or resizing it.
        */
      class FixsizedByteBufferOutputStream(buf: ByteBuffer) extends OutputStream {

        /** Appends the low-order byte of `b`. */
        override def write(b: Int): Unit = {
          val lowByte = b.toByte
          buf.put(lowByte)
          ()
        }

        /** Appends `len` bytes of `bytes`, starting at index `off`. */
        override def write(bytes: Array[Byte], off: Int, len: Int): Unit = {
          buf.put(bytes, off, len)
          ()
        }
      }
      object FixsizedByteBufferOutputStream {
        /** Wraps `buf` in a [[FixsizedByteBufferOutputStream]]. */
        def apply(buf: ByteBuffer) = {
          new FixsizedByteBufferOutputStream(buf)
        }
      }
      /**
        * `OutputStream` that appends to a `ByteBuffer` and swaps it for a larger
        * one whenever a write would not fit.
        *
        * @param buf          initial buffer; replaced by a bigger buffer on growth
        * @param onHeap       allocate replacement buffers with `ByteBuffer.allocate`
        *                     (true) or `ByteBuffer.allocateDirect` (false)
        * @param increasingBy growth factor applied to the capacity; previously the
        *                     factor passed to the companion `apply` was validated
        *                     but ignored and the default was always used
        */
      class ExpandingByteBufferOutputStream(
          var buf: ByteBuffer,
          onHeap: Boolean,
          increasingBy: Float = ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR
      ) extends OutputStream {

        private val increasing = increasingBy

        /** Appends `len` bytes of `b` starting at `off` (the offset used to be ignored). */
        override def write(b: Array[Byte], off: Int, len: Int): Unit = {
          ensureRoom(len)
          buf.put(b, off, len)
          ()
        }

        /** Appends the low-order byte of `b`. */
        override def write(b: Int): Unit= {
          ensureRoom(1)
          buf.put(b.toByte)
          ()
        }

        /** Grows the buffer when `extra` more bytes would exceed its limit. */
        private def ensureRoom(extra: Int): Unit = {
          // Widen before adding so position + extra cannot overflow Int.
          val required = buf.position().toLong + extra
          if (required > buf.limit()) increase(grownCapacity(required))
        }

        /**
          * Smallest capacity reached by repeatedly applying the growth factor
          * that holds `required` bytes. Each step grows by at least one byte so
          * the loop terminates even for capacity 0 or 1, where multiplying by
          * the factor and truncating makes no progress.
          */
        private def grownCapacity(required: Long): Int = {
          if (required > Int.MaxValue)
            throw new IOException(s"buffer cannot grow to $required bytes")
          var candidate = math.max(buf.capacity().toLong, 1L)
          while (candidate < required)
            candidate = math.max(candidate + 1, (candidate * increasing.toDouble).toLong)
          math.min(candidate, Int.MaxValue.toLong).toInt
        }

        /** Replaces `buf` with a buffer of `newCapacity` holding the bytes written so far. */
        protected def increase(newCapacity: Int): Unit = {
          // flip() == limit(position) + rewind: exposes exactly the written bytes.
          buf.flip()
          val newBuffer =
            if (onHeap) ByteBuffer.allocate(newCapacity)
            else  ByteBuffer.allocateDirect(newCapacity)
          newBuffer.put(buf)
          buf = newBuffer
        }
        /** Number of bytes written so far (the buffer position). */
        def size: Long = buf.position()
        /** Capacity of the current buffer. */
        def capacity: Long = buf.capacity()
        /** The current backing buffer; its position marks the end of the data. */
        def byteBuffer: ByteBuffer = buf
      }
      object ExpandingByteBufferOutputStream {
        val DEFAULT_INCREASING_FACTOR = 1.5f

        /**
          * Creates a stream over a fresh buffer of `size` bytes.
          *
          * @throws IllegalArgumentException when `increasingBy` is not greater than 1
          */
        def apply(size: Int, increasingBy: Float, onHeap: Boolean): ExpandingByteBufferOutputStream = {
          if (increasingBy <= 1) throw new IllegalArgumentException("Increasing Factor must be greater than 1.0")
          val buffer: ByteBuffer =
            if (onHeap) ByteBuffer.allocate(size)
            else ByteBuffer.allocateDirect(size)
          new ExpandingByteBufferOutputStream(buffer, onHeap, increasingBy)
        }
        /** Direct buffer with the default growth factor. */
        def apply(size: Int): ExpandingByteBufferOutputStream = {
          apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, false)
        }

        /** Default growth factor with heap or direct allocation chosen by `onHeap`. */
        def apply(size: Int, onHeap: Boolean): ExpandingByteBufferOutputStream = {
          apply(size, ExpandingByteBufferOutputStream.DEFAULT_INCREASING_FACTOR, onHeap)
        }

        /** Direct buffer with a caller-chosen growth factor. */
        def apply(size: Int, increasingBy: Float): ExpandingByteBufferOutputStream = {
          apply(size, increasingBy, false)
        }

      }
      /**
        * Reads a whole file into a `ByteBuffer` whose remaining bytes are exactly
        * the file content.
        *
        * Changes from the earlier version: the stream is always closed, the file
        * is read until end-of-stream instead of with a single `read` call, and
        * the buffer no longer carries one extra trailing zero byte (it used to be
        * sized `available + 1`).
        *
        * @param fileName path of the file to read
        * @return a buffer wrapping the file's bytes
        */
      def cqlFileToBytes(fileName: String): ByteBuffer = {
        val fis = new FileInputStream(fileName)
        try {
          val collected = new ByteArrayOutputStream()
          val chunk = new Array[Byte](8192)
          Iterator
            .continually(fis.read(chunk))
            .takeWhile(_ != -1)
            .foreach(count => collected.write(chunk, 0, count))
          ByteBuffer.wrap(collected.toByteArray)
        } finally fis.close()
      }
      /**
        * Streams the remaining bytes of `bytes` into the file at `fileName`.
        *
        * @return the materialized `IOResult` future of the file sink
        */
      def cqlBytesToFile(bytes: ByteBuffer, fileName: String)(
        implicit mat: Materializer): Future[IOResult] = {
        val fileSink = FileIO.toPath(Paths.get(fileName))
        StreamConverters
          .fromInputStream(() => ByteBufferInputStream(bytes))
          .runWith(fileSink)
      }
      /** Formats `date` with a `SimpleDateFormat` built from the pattern `fmt`. */
      def cqlDateTimeString(date: java.util.Date, fmt: String): String =
        new java.text.SimpleDateFormat(fmt).format(date)
      /**
        * Registers `InstantCodec.instance` with the cluster's codec registry
        * (for the jdk8 datetime format).
        */
      def useJava8DateTime(cluster: Cluster) = {
        val codecs = cluster.getConfiguration().getCodecRegistry()
        codecs.register(InstantCodec.instance)
      }
    }

    BytesConverter.scala

    package protobuf.bytes
    import java.io.{ByteArrayInputStream,ByteArrayOutputStream,ObjectInputStream,ObjectOutputStream}
    import com.google.protobuf.ByteString
    /**
      * Converts values to and from protobuf `ByteString` using Java object
      * serialization.
      */
    object Converter {

      /**
        * Serializes `value` with an `ObjectOutputStream`.
        *
        * @param value the object to serialize
        * @return the serialized bytes
        */
      def marshal(value: Any): ByteString = {
        val stream: ByteArrayOutputStream = new ByteArrayOutputStream()
        val oos = new ObjectOutputStream(stream)
        // close() also flushes, so it must run before the bytes are taken;
        // the finally makes sure it runs when writeObject throws as well.
        try oos.writeObject(value)
        finally oos.close()
        ByteString.copyFrom(stream.toByteArray())
      }

      /**
        * Deserializes `bytes` with an `ObjectInputStream` and casts the result
        * to `A`. The cast is unchecked, so always pass `A` explicitly: if it is
        * left to inference it can become `Nothing` and fail at the call site.
        *
        * SECURITY(review): Java deserialization of bytes received from a remote
        * peer can instantiate arbitrary classes on the classpath. Only use this
        * on trusted input, or add a class filter / switch to a schema-based
        * encoding.
        */
      def unmarshal[A](bytes: ByteString): A = {
        val ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
        try ois.readObject().asInstanceOf[A]
        finally ois.close()
      }

    }

    CQLServices.scala

    package demo.sdp.grpc.cql.server
    
    import akka.NotUsed
    import akka.stream.scaladsl._
    
    import protobuf.bytes.Converter._
    import com.datastax.driver.core._
    
    import scala.concurrent.ExecutionContextExecutor
    import sdp.grpc.services._
    import sdp.cql.engine._
    import CQLEngine._
    import CQLHelpers._
    import sdp.logging.LogSupport
    import scala.concurrent._
    import scala.concurrent.duration._
    import akka.stream.ActorMaterializer
    
    
    /**
      * Server-side implementation of the `CQLServices` gRPC stream API on top
      * of a Cassandra `Session`.
      *
      * @param ec      execution context for future callbacks
      * @param mat     materializer used to run the existence-check stream
      * @param session Cassandra session used by every operation
      */
    class CQLStreamingServices(implicit ec: ExecutionContextExecutor,
                               mat: ActorMaterializer,  session: Session)
      extends CqlGrpcAkkaStream.CQLServices with LogSupport{

      /** Bind parameters for `cqlInsert`, in column order; `created` is set to "now". */
      val toParams: AQMRPTRow => Seq[Object] = row => Seq[Object](
        row.rowid.asInstanceOf[Object],
        row.measureid.asInstanceOf[Object],
        row.statename,
        row.countyname,
        row.reportyear.asInstanceOf[Object],
        row.value.asInstanceOf[Object],
        CQLDateTimeNow
      )
      val cqlInsert ="""
                       |insert into testdb.AQMRPT(
                       | rowid,
                       | measureid,
                       | statename,
                       | countyname,
                       | reportyear,
                       | value,
                       | created)
                       | values(?,?,?,?,?,?,?)
                     """.stripMargin

      val cqlActionStream = CassandraActionStream(cqlInsert,toParams).setParallelism(2)
        .setProcessOrder(false)

    /*
      val cqlActionFlow: Flow[AQMRPTRow,AQMRPTRow,NotUsed] =
        Flow[AQMRPTRow]
          .via(cqlActionStream.performOnRow)
    */

      /**
        * Inserts every incoming row whose `rowid` is not stored yet. Emits a
        * marshalled 0 for a row that already exists and a marshalled 1 for a row
        * that was inserted.
        */
      val cqlActionFlow: Flow[AQMRPTRow,CQLResult,NotUsed] = {
        Flow[AQMRPTRow]
            .mapAsync(cqlActionStream.parallelism){ row =>
              // The existence check is composed as a Future, so no stream thread
              // is parked in Await while Cassandra answers.
              rowExists(row.rowid).flatMap { exists =>
                if (exists)
                  Future.successful(CQLResult(marshal(0)))
                else
                  cqlActionStream.perform(row).map {_ => CQLResult(marshal(1))}
              }
            }
      }

      override def transferRows: Flow[AQMRPTRow, CQLResult, NotUsed] = {
        Flow[AQMRPTRow]
          .via(cqlActionFlow)
      }


      /**
        * Asynchronously checks whether a row with `rowid` exists.
        *
        * The 3-second bound of the former blocking `Await.result` is kept as a
        * completion timeout on the query stream.
        */
      private def rowExists(rowid: Long): Future[Boolean] = {

        val cql = "SELECT * FROM testdb.AQMRPT WHERE ROWID = ? ALLOW FILTERING"
        val param = Seq(rowid.asInstanceOf[Object])
        val toRowId: Row => Long = r => r.getLong("rowid")
        val ctx = CQLQueryContext(cql,param)

        cassandraStream(ctx,toRowId)
          .completionTimeout(3.seconds)
          .runWith(Sink.headOption)
          .map { result =>
            log.info(s"checking existence: ${result}")
            result.isDefined
          }
      }

      /** Echo endpoint: prints each message and returns it unchanged. */
      override def clientStreaming: Flow[HelloMsg, HelloMsg, NotUsed] = {
        Flow[HelloMsg]
          .map {r => println(r) ; r}
      }

      /**
        * Runs the statements of each incoming `CQLUpdate` through `cqlExecute`
        * and emits the marshalled boolean outcome.
        */
      override def runDDL: Flow[CQLUpdate, CQLResult, NotUsed] = {
        Flow[CQLUpdate]
          .flatMapConcat { context =>
            //unpack CQLUpdate and construct the context
            val ctx = CQLContext(context.statements)
            log.info(s"**** CQLContext => ${ctx} ***")

            Source
              .fromFuture(cqlExecute(ctx))
              .map { r => CQLResult(marshal(r)) }
          }
      }

      /**
        * Reads the `CREATED` column as a `ProtoDateTime`.
        *
        * @return `None` when the column is null or cannot be read; a read
        *         failure is logged instead of being dropped silently
        */
      def toCQLTimestamp(rs: Row): Option[ProtoDateTime] =
        try {
          Option(rs.getTimestamp("CREATED")).map { tm =>
            val localdt = cqlGetTimestamp(tm)
            ProtoDateTime(
              Some(ProtoDate(localdt.getYear, localdt.getMonthValue, localdt.getDayOfMonth)),
              Some(ProtoTime(localdt.getHour, localdt.getMinute, localdt.getSecond, localdt.getNano)))
          }
        }
        catch {
          case scala.util.control.NonFatal(e) =>
            log.info(s"toCQLTimestamp>  cannot read CREATED: ${e.getMessage}")
            None
        }

      /** Maps one Cassandra row of `testdb.AQMRPT` to its protobuf message. */
      val toAQMRow: Row => AQMRPTRow = rs=> AQMRPTRow(
        rowid = rs.getLong("ROWID"),
        measureid = rs.getLong("MEASUREID"),
        statename = rs.getString("STATENAME"),
        countyname = rs.getString("COUNTYNAME"),
        reportyear = rs.getInt("REPORTYEAR"),
        value = rs.getInt("VALUE"),
        created = toCQLTimestamp(rs)
      )

      /**
        * Runs each incoming `CQLQuery` through `cassandraStream` and emits the
        * resulting rows. Empty `parameters` mean "no bind parameters".
        */
      override def runQuery: Flow[CQLQuery, AQMRPTRow, NotUsed] = {
        log.info("**** runQuery called on service side ***")
        Flow[CQLQuery]
          .flatMapConcat { q =>
            //unpack CQLQuery and construct the context
            val params: Seq[Object] =
              if (q.parameters.isEmpty) Nil
              else unmarshal[Seq[Object]](q.parameters)
            log.info(s"**** query parameters: ${params} ****")
            val ctx = CQLQueryContext(q.statement,params)
            CQLEngine.cassandraStream(ctx,toAQMRow)

          }
      }

    }

    CQLServer.scala

    package demo.sdp.grpc.cql.server
    
    import java.util.logging.Logger
    import com.datastax.driver.core._
    import akka.actor.ActorSystem
    import akka.stream.ActorMaterializer
    import io.grpc.Server
    import io.grpc.ServerBuilder
    import sdp.grpc.services._
    import sdp.cql.engine._
    import CQLHelpers._
    
    /** Small lifecycle wrapper around an `io.grpc.Server`. */
    class gRPCServer(server: Server) {

      val logger: Logger = Logger.getLogger(classOf[gRPCServer].getName)

      /** Starts the server, logs its port and registers a JVM shutdown hook. */
      def start(): Unit = {
        server.start()
        logger.info(s"Server started, listening on ${server.getPort}")
        registerShutdownHook()
      }

      /** Installs a hook that calls `stop()` when the JVM shuts down. */
      private def registerShutdownHook(): Unit = {
        sys.addShutdownHook {
          // stderr is used because the logger may already have been reset by
          // its own JVM shutdown hook.
          System.err.println("*** shutting down gRPC server since JVM is shutting down")
          stop()
          System.err.println("*** server shut down")
        }
        ()
      }

      /** Calls `shutdown()` on the wrapped server. */
      def stop(): Unit = {
        server.shutdown()
        ()
      }

      /**
        * Await termination on the main thread since the grpc library uses daemon threads.
        */
      def blockUntilShutdown(): Unit =
        server.awaitTermination()
    }
    
    /**
      * Entry point of the gRPC server: connects to Cassandra on localhost:9042,
      * serves `CQLStreamingServices` on port 50051 and shuts everything down
      * after a line is read from stdin.
      */
    object CQLServer extends App {
      implicit val cqlsys: ActorSystem = ActorSystem("cqlSystem")
      implicit val mat: ActorMaterializer = ActorMaterializer()
      implicit val ec: scala.concurrent.ExecutionContextExecutor = cqlsys.dispatcher

      val cluster = new Cluster.Builder()
        .addContactPoints("localhost")
        .withPort(9042)
        .build()

      useJava8DateTime(cluster)
      implicit val session: Session = cluster.connect()

      val server = new gRPCServer(
        ServerBuilder
          .forPort(50051)
          .addService(
            CqlGrpcAkkaStream.bindService(
              new CQLStreamingServices
            )
          ).build()
      )
      server.start()
      //  server.blockUntilShutdown()
      scala.io.StdIn.readLine()
      // Stop accepting calls before the session they depend on is closed;
      // previously the server kept serving against a closed session until the
      // JVM shutdown hook ran.
      server.stop()
      session.close()
      cluster.close()
      mat.shutdown()
      cqlsys.terminate()
    }

    CQLClient.scala

    package demo.sdp.grpc.cql.client
    
    import sdp.grpc.services._
    import protobuf.bytes.Converter._
    import akka.stream.scaladsl._
    import akka.NotUsed
    import akka.actor.ActorSystem
    import akka.stream.{ActorMaterializer, ThrottleMode}
    import io.grpc._
    import sdp.logging.LogSupport
    import sdp.jdbc.engine._
    import JDBCEngine._
    import scalikejdbc.WrappedResultSet
    import sdp.cql.engine.CQLHelpers.CQLDateTimeNow
    import scala.util._
    import scala.concurrent.ExecutionContextExecutor
    
    /**
      * Client-side wrapper around the `CqlGrpcAkkaStream` stub. Each operation
      * is exposed as an akka-stream `Source` that the caller runs.
      *
      * @param host gRPC server host
      * @param port gRPC server port
      */
    class CQLStreamClient(host: String, port: Int)(
      implicit ec: ExecutionContextExecutor) extends LogSupport {

      val channel = ManagedChannelBuilder
        .forAddress(host, port)
        .usePlaintext(true)
        .build()


      val stub = CqlGrpcAkkaStream.stub(channel)

      /** JDBC query selecting the rows that `transferRows` sends to the server. */
      val jdbcRows2transfer = JDBCQueryContext[AQMRPTRow](
        dbName = 'h2,
        statement = "select * from AQMRPT where statename='Arkansas'"
      )


      /** Maps a JDBC result-set row to the protobuf row; `created` is a fixed value. */
      def toAQMRPTRow: WrappedResultSet => AQMRPTRow = { rs =>
        val fixedDate = ProtoDate(1990, 8, 12)
        val fixedTime = ProtoTime(23, 56, 23, 0)
        AQMRPTRow(
          rowid = rs.long("ROWID"),
          measureid = rs.long("MEASUREID"),
          statename = rs.string("STATENAME"),
          countyname = rs.string("COUNTYNAME"),
          reportyear = rs.int("REPORTYEAR"),
          value = rs.int("VALUE"),
          created = Some(ProtoDateTime(Some(fixedDate), Some(fixedTime)))
        )
      }


      import scala.concurrent.duration._

      /** Streams the JDBC rows through the server's `transferRows` call. */
      def transferRows: Source[CQLResult, NotUsed] = {
        log.info(s"**** calling transferRows ****")
        val jdbcRows = jdbcAkkaStream(jdbcRows2transfer, toAQMRPTRow)
        //      .throttle(1, 500.millis, 1, ThrottleMode.shaping)
        jdbcRows.via(stub.transferRows)
      }

      /** Sends 100 copies of one greeting through the `clientStreaming` call. */
      def echoHello: Source[HelloMsg,NotUsed] = {
        val greeting = HelloMsg("hello world!")
        val greetings = List.fill[HelloMsg](100)(greeting)
        Source.fromIterator(() => greetings.iterator).via(stub.clientStreaming)
      }
      val query0 = CQLQuery(
        statement = "select * from testdb.AQMRPT"
      )

      val query = CQLQuery(
        statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE > ? ALLOW FILTERING;",
        parameters = marshal(Seq("Arkansas", 0))
      )
      val query2 = CQLQuery (
        statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Colorado", 3))
      )
      val query3= CQLQuery (
        statement = "select * from testdb.AQMRPT where STATENAME = ? and VALUE = ?",
        parameters = marshal(Seq("Arkansas", 8))
      )

      /** Runs `query` on the server and streams back the matching rows. */
      def queryRows: Source[AQMRPTRow,NotUsed] = {
        log.info(s"running queryRows ...")
        Source.single(query).via(stub.runQuery)
      }

      val dropCQL = "DROP TABLE IF EXISTS testdb.AQMRPT"

      val createCQL ="""
      CREATE TABLE testdb.AQMRPT (
         rowid bigint primary key,
         measureid bigint,
         statename text,
         countyname text,
         reportyear int,
         value int,
         created timestamp
      )"""

      /** DDL request that drops the table and creates it again. */
      val cqlddl = CQLUpdate(statements = Seq(dropCQL,createCQL))

      /** Sends `cqlddl` through the server's `runDDL` call. */
      def createTbl: Source[CQLResult,NotUsed] = {
        log.info(s"running createTbl ...")
        Source.single(cqlddl).via(stub.runDDL)
      }

    }
    
    
    /** Demo: sends the greetings to the server and prints every echoed message. */
    object EchoHelloClient extends App {
      implicit val system = ActorSystem("EchoNumsClient")
      implicit val mat = ActorMaterializer.create(system)
      implicit val ec = system.dispatcher
      val client = new CQLStreamClient("localhost", 50051)

      client.echoHello.runForeach(msg => println(msg))

      // Keep the process alive until the user presses enter, then shut down.
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()

    }
    
    
    /**
      * Demo: streams rows from the local h2 database to the server and prints
      * the sum of the per-row results returned by the server.
      */
    object TransferRows extends App {

      import sdp.jdbc.config._

      implicit val system = ActorSystem("JDBCServer")
      implicit val mat = ActorMaterializer.create(system)
      implicit val ec = system.dispatcher

      ConfigDBsWithEnv("dev").setup('h2)
      ConfigDBsWithEnv("dev").loadGlobalSettings()


      val client = new CQLStreamClient("localhost", 50051)
      val fut = client.transferRows.runFold(0) { (transferred, reply) =>
        transferred + unmarshal[Int](reply.result)
      }
      // Exactly one of the two callbacks runs, depending on the outcome.
      fut.foreach(cnt => println(s"done transfer ${cnt} rows."))
      fut.failed.foreach(e => println(s"!!!!!streaming error: ${e.getMessage}"))

      // Keep the process alive until the user presses enter, then shut down.
      scala.io.StdIn.readLine()
      ConfigDBsWithEnv("dev").close('h2)
      mat.shutdown()
      system.terminate()


    }
    
    /** Demo: runs the client's query on the server and prints every returned row. */
    object QueryRows extends App {
      implicit val system = ActorSystem("QueryRows")
      implicit val mat = ActorMaterializer.create(system)
      implicit val ec = system.dispatcher


      val client = new CQLStreamClient("localhost", 50051)

      val fut = client.queryRows.runForeach(row => println(row))
      // Exactly one of the two callbacks runs, depending on the outcome.
      fut.foreach(_ => println(s"done querying."))
      fut.failed.foreach(e => println(s"!!!!!query error: ${e.getMessage}"))

      // Keep the process alive until the user presses enter, then shut down.
      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()

    }
    
    /**
      * Demo: asks the server to drop and recreate the table, printing the
      * boolean outcome returned for the request.
      */
    object RunDDL extends App {
      implicit val system = ActorSystem("RunDDL")
      implicit val mat = ActorMaterializer.create(system)
      implicit val ec = system.dispatcher

      val client = new CQLStreamClient("localhost", 50051)

      // The type argument is required: without it `A` is inferred as Nothing and
      // the call site throws ClassCastException. The server marshals the Boolean
      // produced by cqlExecute, so Boolean is the matching type.
      val fut = client.createTbl.runForeach { r => println(unmarshal[Boolean](r.result)) }
      // Report stream failures, as the other demo clients do.
      fut.failed.foreach(e => println(s"!!!!!DDL error: ${e.getMessage}"))


      scala.io.StdIn.readLine()
      mat.shutdown()
      system.terminate()

    }
  • 相关阅读:
    supervisor 安装与使用
    CF39C-Moon Craters【dp】
    NWERC2020J-Joint Excavation【构造,贪心】
    CF25E-Test【AC自动机,bfs】
    CF19E-Fairy【树形结构,差分】
    CF11D-A Simple Task【状压dp】
    CF5E-Bindian Signalizing【单调栈】
    P6628-[省选联考 2020 B 卷] 丁香之路【欧拉回路,最小生成树】
    CF666E-Forensic Examination【广义SAM,线段树合并】
    CF235D-Graph Game【LCA,数学期望】
  • 原文地址:https://www.cnblogs.com/tiger-xc/p/9246136.html
Copyright © 2011-2022 走看看