  • SDP(3): ScalikeJDBC - JDBC-Engine: Fetching

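      ScalikeJDBC covers the basic JDBC functionality fairly completely and implements it concisely, which should also keep its runtime overhead low. In theory, ScalikeJDBC is a good fit for a JDBC-Engine: it sits between the various JDBC tool libraries and the database instances, accepts JDBC instructions, connects to the target database, runs them and returns the results. Generally speaking, JDBC tool libraries such as ORM and FRM software use their own DSLs to program data management against complex table relationships, and in the end they produce SQL, that is a (prepared) statement plus parameters, which is handed to the JDBC driver of the chosen database type to execute and produce a result. Described this way, the main job of the JDBC-Engine is to support the following function: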

    jdbcRunSQL(context: JDBCContext): JDBCResultSet

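    The caller supplies a value of type JDBCContext; jdbcRunSQL then carries out the operation and returns the result. Seen this way, JDBCContext needs to provide at least the following properties:

    1. Database connection: which connection pool to use

    2. Execution parameters: fetchSize, queryTimeout, queryTags. These all apply to the SQL currently being run

    3. Query parameters:

        Query type: select/execute/update, single/batch, pre-/post-action, generateKey

        SQL: statements: Seq[String], parameters: Seq[Seq[Any]]

    The JDBCContext type is defined as follows: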

    import java.sql.PreparedStatement
    import scala.collection.generic.CanBuildFrom
    import scalikejdbc._
    
      object JDBCContext {
        type SQLTYPE = Int
        val SQL_SELECT: Int = 0
        val SQL_EXECUTE = 1
        val SQL_UPDATE = 2
    
        def returnColumnByIndex(idx: Int) = Some(idx)
    
        def returnColumnByName(col: String) = Some(col)
      }
    
      case class JDBCContext(
                              dbName: Symbol,
                              statements: Seq[String],
                              parameters: Seq[Seq[Any]] = Nil,
                              fetchSize: Int = 100,
                              queryTimeout: Option[Int] = None,
                              queryTags: Seq[String] = Nil,
                              sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                              batch: Boolean = false,
                              returnGeneratedKey: Option[Any] = None,
                              // no return: None, return by index: Some(1), by name: Some("id")
                              preAction: Option[PreparedStatement => Unit] = None,
                              postAction: Option[PreparedStatement => Unit] = None)
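    The returnGeneratedKey field is typed Option[Any], so whatever consumes the context has to tell the three documented cases apart at runtime. Below is a minimal sketch of one way to do that. The helper describeKeyReturn is hypothetical and not part of the original code:

```scala
object GeneratedKeySketch {
  // Hypothetical helper: interprets the three documented shapes of returnGeneratedKey.
  def describeKeyReturn(returnGeneratedKey: Option[Any]): String =
    returnGeneratedKey match {
      case None              => "no generated key requested"
      case Some(idx: Int)    => s"return generated key from column index $idx"
      case Some(col: String) => s"return generated key from column '$col'"
      case Some(other)       =>
        // Option[Any] also admits values the engine cannot interpret.
        throw new IllegalArgumentException(s"unsupported key selector: $other")
    }
}
```

    Because the field is Option[Any], a value such as Some(1.5) type-checks and is only rejected at runtime; a small sealed trait would move that check to compile time.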
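    On reflection, splitting JDBC reads and writes into two separate functions makes the engine easier to use, because it matches common programming patterns and habits. So it is best to run SQL with sqlType = SQL_SELECT in a function of its own: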
      def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
          ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
          implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
        ctx.sqlType match {
          case SQL_SELECT => {
            // a SELECT runs a single statement, so only the first parameter list is used
            val params: Seq[Any] = ctx.parameters match {
              case Nil => Nil
              case p@_ => p.head
            }
            // start without an extractor; rowConverter is attached below via map
            val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor(""))
            // apply the per-query settings carried by the context
            ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
            ctx.queryTags.foreach(rawSql.tags(_))
            rawSql.fetchSize(ctx.fetchSize)
            // use the connection pool registered under ctx.dbName
            implicit val session = NamedAutoSession(ctx.dbName)
            val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
            // build the result as the collection type C chosen by the caller
            sql.collection.apply[C]()
          }
          case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
        }
      }
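    jdbcQueryResult reads ctx.statements.head and the first parameter list directly, so an empty statements sequence surfaces as a low-level exception from head rather than a descriptive error. The sketch below isolates that selection step using only the standard library. MiniContext and selectStatement are hypothetical names for illustration and not part of the engine:

```scala
object SelectGuardSketch {
  val SQL_SELECT = 0
  val SQL_UPDATE = 2

  // Hypothetical, trimmed-down stand-in for JDBCContext.
  final case class MiniContext(
      statements: Seq[String],
      parameters: Seq[Seq[Any]] = Nil,
      sqlType: Int = SQL_SELECT)

  // Returns the single statement and parameter list a SELECT would run with,
  // or a message describing why the context cannot be used for fetching.
  def selectStatement(ctx: MiniContext): Either[String, (String, Seq[Any])] =
    if (ctx.sqlType != SQL_SELECT) Left("sqlType must be 'SQL_SELECT'!")
    else ctx.statements.headOption match {
      case None       => Left("no SQL statement supplied")
      case Some(stmt) => Right((stmt, ctx.parameters.headOption.getOrElse(Nil)))
    }
}
```

    Returning Either instead of throwing is a design choice of this sketch only; the original function throws IllegalStateException for a wrong sqlType.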

    A noExtractor function is also needed to satisfy the parameter signature that SQLToCollectionImpl expects:

      private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
          throw new IllegalStateException(message)
        }
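    This works because Nothing is a subtype of every type, so a function returning Nothing can stand in wherever a WrappedResultSet => A is expected, whatever A is. Here is a small standard-library sketch of the same idea, with String standing in for WrappedResultSet. The names placeholder, asIntExtractor and outcome are assumptions made for illustration:

```scala
import scala.util.Try

object NoExtractorSketch {
  // Same shape as noExtractor, with String in place of WrappedResultSet.
  def placeholder(message: String): String => Nothing =
    (_: String) => throw new IllegalStateException(message)

  // Function1 is covariant in its result type, so String => Nothing
  // conforms to String => Int (or any other result type).
  val asIntExtractor: String => Int = placeholder("extractor not set")

  // Calling the placeholder always fails; a real extractor must replace it first.
  val outcome: Try[Int] = Try(asIntExtractor("row"))
}
```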
    Let's try out jdbcQueryResult:
    import scalikejdbc._
    import JDBCEngine._
    import configdbs._
    import org.joda.time._
    object JDBCQueryDemo extends App {
      ConfigDBsWithEnv("dev").setupAll()
    
      val ctx = JDBCContext(
        dbName = 'h2,
        statements = Seq("select * from members where id = ?"),
        parameters = Seq(Seq(2))
      )
    
      //data model
      case class Member(
                         id: Long,
                         name: String,
                         description: Option[String] = None,
                         birthday: Option[LocalDate] = None,
                         createdAt: DateTime)
    
      //data row converter
      val toMember = (rs: WrappedResultSet) => Member(
        id = rs.long("id"),
        name = rs.string("name"),
        description = rs.stringOpt("description"),
        birthday = rs.jodaLocalDateOpt("birthday"),
        createdAt = rs.jodaDateTime("created_at")
      )
    
      val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)
    
      println(s"members in vector: $vecMember")
    
      val ctx1 = ctx.copy(dbName = 'mysql)
    
      val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})
    
      println(s"selected name: $names")
    
      val ctx2 = ctx1.copy(dbName = 'postgres)
      val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})
    
      println(s"selected id+name: $idname")
    }
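    The demo asks for a Vector in one call and a List in the next; the implicit CanBuildFrom parameter is what lets the caller choose. Below is a minimal sketch of that mechanism for Scala 2.12 (the version in build.sbt; CanBuildFrom was removed in Scala 2.13), using a plain Iterator in place of a result set. collectRows is a hypothetical name and not a ScalikeJDBC API:

```scala
import scala.collection.generic.CanBuildFrom
import scala.language.higherKinds

object CollectSketch {
  // Drains `rows` into whichever collection type C the caller names.
  // The compiler finds the CanBuildFrom instance in C's companion object.
  def collectRows[C[_], A](rows: Iterator[A])(
      implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
    val builder = cbf()
    rows.foreach(builder += _)
    builder.result()
  }
}
```

    A call such as CollectSketch.collectRows[Vector, Int](Iterator(1, 2, 3)) builds a Vector, while naming List instead builds a List from the same rows.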

    If we do our database programming with the Slick DSL, this is how it can be hooked up to the JDBC-Engine:

     object SlickDAO {
        import slick.jdbc.H2Profile.api._
    
        case class CountyModel(id: Int, name: String)
        case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
          def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
          def name = column[String]("NAME",O.Length(64))
          def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
        }
        val CountyQuery = TableQuery[CountyTable]
        val filter = "Kansas"
        val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
        val statement = qry.result.statements.head
      }
      import SlickDAO._
    
    
      val slickCtx = JDBCContext(
        dbName = 'h2,
        statements = Seq(statement),
      )
    
      val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
        rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
      vecCounty.foreach(r => println(s"${r.id},${r.name}"))

    The output is correct.

    Here is the source code for this demonstration:

     build.sbt

    name := "learn-scalikeJDBC"
    
    version := "0.1"
    
    scalaVersion := "2.12.4"
    
    // Scala 2.10, 2.11, 2.12
    libraryDependencies ++= Seq(
      "org.scalikejdbc" %% "scalikejdbc"       % "3.1.0",
      "org.scalikejdbc" %% "scalikejdbc-test"   % "3.1.0"   % "test",
      "org.scalikejdbc" %% "scalikejdbc-config"  % "3.1.0",
      "com.h2database"  %  "h2"                % "1.4.196",
      "mysql" % "mysql-connector-java" % "6.0.6",
      "org.postgresql" % "postgresql" % "42.2.0",
      "commons-dbcp" % "commons-dbcp" % "1.4",
      "org.apache.tomcat" % "tomcat-jdbc" % "9.0.2",
      "com.zaxxer" % "HikariCP" % "2.7.4",
      "com.jolbox" % "bonecp" % "0.8.0.RELEASE",
      "com.typesafe.slick" %% "slick" % "3.2.1",
      "ch.qos.logback"  %  "logback-classic"   % "1.2.3"
    )

    resources/application.conf, covering H2, MySQL and PostgreSQL

    # JDBC settings
    test {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "commons-dbcp2"
        }
      }
    
      db.mysql.driver = "com.mysql.cj.jdbc.Driver"
      db.mysql.url = "jdbc:mysql://localhost:3306/testdb"
      db.mysql.user = "root"
      db.mysql.password = "123"
      db.mysql.poolInitialSize = 5
      db.mysql.poolMaxSize = 7
      db.mysql.poolConnectionTimeoutMillis = 1000
      db.mysql.poolValidationQuery = "select 1 as one"
      db.mysql.poolFactoryName = "bonecp"
    
      # scalikejdbc Global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }
    dev {
      db {
        h2 {
          driver = "org.h2.Driver"
          url = "jdbc:h2:tcp://localhost/~/slickdemo"
          user = ""
          password = ""
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
        mysql {
          driver = "com.mysql.cj.jdbc.Driver"
          url = "jdbc:mysql://localhost:3306/testdb"
          user = "root"
          password = "123"
          poolInitialSize = 5
          poolMaxSize = 7
          poolConnectionTimeoutMillis = 1000
          poolValidationQuery = "select 1 as one"
          poolFactoryName = "bonecp"
    
        }
        postgres {
          driver = "org.postgresql.Driver"
          url = "jdbc:postgresql://localhost:5432/testdb"
          user = "root"
          password = "123"
          poolFactoryName = "hikaricp"
          numThreads = 10
          maxConnections = 12
          minConnections = 4
          keepAliveConnection = true
        }
      }
      # scalikejdbc Global settings
      scalikejdbc.global.loggingSQLAndTime.enabled = true
      scalikejdbc.global.loggingSQLAndTime.logLevel = info
      scalikejdbc.global.loggingSQLAndTime.warningEnabled = true
      scalikejdbc.global.loggingSQLAndTime.warningThresholdMillis = 1000
      scalikejdbc.global.loggingSQLAndTime.warningLogLevel = warn
      scalikejdbc.global.loggingSQLAndTime.singleLineMode = false
      scalikejdbc.global.loggingSQLAndTime.printUnprocessedStackTrace = false
      scalikejdbc.global.loggingSQLAndTime.stackTraceDepth = 10
    }
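
    The top-level test and dev blocks are alternative environments. HikariConfig.scala looks up each database under envPrefix + "db." + dbName.name, so ConfigDBsWithEnv("dev") should read dev.db.h2, dev.db.mysql and dev.db.postgres. The sketch below only mirrors that string composition. It assumes the prefix is the environment name followed by a dot, and dbConfigPath is a hypothetical helper:

```scala
object EnvPathSketch {
  // Assumption: with an environment set, the prefix is "<env>."; without one it is empty.
  def dbConfigPath(env: Option[String], dbName: Symbol): String = {
    val envPrefix = env.map(_ + ".").getOrElse("")
    envPrefix + "db." + dbName.name
  }
}
```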

    HikariConfig.scala: HikariCP connection pool implementation

    package configdbs
    import scala.collection.mutable
    import scala.concurrent.duration.Duration
    import scala.language.implicitConversions
    import com.typesafe.config._
    import java.util.concurrent.TimeUnit
    import java.util.Properties
    import scalikejdbc.config._
    import com.typesafe.config.Config
    import com.zaxxer.hikari._
    import scalikejdbc.ConnectionPoolFactoryRepository
    
    /** Extension methods to make Typesafe Config easier to use */
    class ConfigExtensionMethods(val c: Config) extends AnyVal {
      import scala.collection.JavaConverters._
    
      def getBooleanOr(path: String, default: => Boolean = false) = if(c.hasPath(path)) c.getBoolean(path) else default
      def getIntOr(path: String, default: => Int = 0) = if(c.hasPath(path)) c.getInt(path) else default
      def getStringOr(path: String, default: => String = null) = if(c.hasPath(path)) c.getString(path) else default
      def getConfigOr(path: String, default: => Config = ConfigFactory.empty()) = if(c.hasPath(path)) c.getConfig(path) else default
    
      def getMillisecondsOr(path: String, default: => Long = 0L) = if(c.hasPath(path)) c.getDuration(path, TimeUnit.MILLISECONDS) else default
      def getDurationOr(path: String, default: => Duration = Duration.Zero) =
        if(c.hasPath(path)) Duration(c.getDuration(path, TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) else default
    
      def getPropertiesOr(path: String, default: => Properties = null): Properties =
        if(c.hasPath(path)) new ConfigExtensionMethods(c.getConfig(path)).toProperties else default
    
      def toProperties: Properties = {
        def toProps(m: mutable.Map[String, ConfigValue]): Properties = {
          val props = new Properties(null)
          m.foreach { case (k, cv) =>
            val v =
              if(cv.valueType() == ConfigValueType.OBJECT) toProps(cv.asInstanceOf[ConfigObject].asScala)
              else if(cv.unwrapped eq null) null
              else cv.unwrapped.toString
            if(v ne null) props.put(k, v)
          }
          props
        }
        toProps(c.root.asScala)
      }
    
      def getBooleanOpt(path: String): Option[Boolean] = if(c.hasPath(path)) Some(c.getBoolean(path)) else None
      def getIntOpt(path: String): Option[Int] = if(c.hasPath(path)) Some(c.getInt(path)) else None
      def getStringOpt(path: String) = Option(getStringOr(path))
      def getPropertiesOpt(path: String) = Option(getPropertiesOr(path))
    }
    
    object ConfigExtensionMethods {
      @inline implicit def configExtensionMethods(c: Config): ConfigExtensionMethods = new ConfigExtensionMethods(c)
    }
    
    trait HikariConfigReader extends TypesafeConfigReader {
      self: TypesafeConfig =>      // with TypesafeConfigReader => //NoEnvPrefix =>
    
      import ConfigExtensionMethods.configExtensionMethods
    
      def getFactoryName(dbName: Symbol): String = {
        val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
        c.getStringOr("poolFactoryName", ConnectionPoolFactoryRepository.COMMONS_DBCP)
      }
    
      def hikariCPConfig(dbName: Symbol): HikariConfig = {
    
        val hconf = new HikariConfig()
        val c: Config = config.getConfig(envPrefix + "db." + dbName.name)
    
        // Connection settings
        if (c.hasPath("dataSourceClass")) {
          hconf.setDataSourceClassName(c.getString("dataSourceClass"))
        } else {
          Option(c.getStringOr("driverClassName", c.getStringOr("driver"))).foreach(hconf.setDriverClassName)
        }
        hconf.setJdbcUrl(c.getStringOr("url", null))
        c.getStringOpt("user").foreach(hconf.setUsername)
        c.getStringOpt("password").foreach(hconf.setPassword)
        c.getPropertiesOpt("properties").foreach(hconf.setDataSourceProperties)
    
        // Pool configuration
        hconf.setConnectionTimeout(c.getMillisecondsOr("connectionTimeout", 1000))
        hconf.setValidationTimeout(c.getMillisecondsOr("validationTimeout", 1000))
        hconf.setIdleTimeout(c.getMillisecondsOr("idleTimeout", 600000))
        hconf.setMaxLifetime(c.getMillisecondsOr("maxLifetime", 1800000))
        hconf.setLeakDetectionThreshold(c.getMillisecondsOr("leakDetectionThreshold", 0))
        hconf.setInitializationFailFast(c.getBooleanOr("initializationFailFast", false))
        c.getStringOpt("connectionTestQuery").foreach(hconf.setConnectionTestQuery)
        c.getStringOpt("connectionInitSql").foreach(hconf.setConnectionInitSql)
        val numThreads = c.getIntOr("numThreads", 20)
        hconf.setMaximumPoolSize(c.getIntOr("maxConnections", numThreads * 5))
        hconf.setMinimumIdle(c.getIntOr("minConnections", numThreads))
        hconf.setPoolName(c.getStringOr("poolName", dbName.name))
        hconf.setRegisterMbeans(c.getBooleanOr("registerMbeans", false))
    
        // Equivalent of ConnectionPreparer
        hconf.setReadOnly(c.getBooleanOr("readOnly", false))
        c.getStringOpt("isolation").map("TRANSACTION_" + _).foreach(hconf.setTransactionIsolation)
        hconf.setCatalog(c.getStringOr("catalog", null))
    
        hconf
    
      }
    }
    
    import scalikejdbc._
    trait ConfigDBs {
      self: TypesafeConfigReader with TypesafeConfig with HikariConfigReader =>
    
      def setup(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        getFactoryName(dbName) match {
          case "hikaricp" => {
            val hconf = hikariCPConfig(dbName)
            val hikariCPSource = new HikariDataSource(hconf)
            if (hconf.getDriverClassName != null && hconf.getDriverClassName.trim.nonEmpty) {
              Class.forName(hconf.getDriverClassName)
            }
            ConnectionPool.add(dbName, new DataSourceConnectionPool(hikariCPSource))
          }
          case _ => {
            val JDBCSettings(url, user, password, driver) = readJDBCSettings(dbName)
            val cpSettings = readConnectionPoolSettings(dbName)
            if (driver != null && driver.trim.nonEmpty) {
              Class.forName(driver)
            }
            ConnectionPool.add(dbName, url, user, password, cpSettings)
          }
        }
      }
    
      def setupAll(): Unit = {
        loadGlobalSettings()
        dbNames.foreach { dbName => setup(Symbol(dbName)) }
      }
    
      def close(dbName: Symbol = ConnectionPool.DEFAULT_NAME): Unit = {
        ConnectionPool.close(dbName)
      }
    
      def closeAll(): Unit = {
        ConnectionPool.closeAll
      }
    
    }
    
    
    object ConfigDBs extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader
    
    case class ConfigDBsWithEnv(envValue: String) extends ConfigDBs
      with TypesafeConfigReader
      with StandardTypesafeConfig
      with HikariConfigReader
      with EnvPrefix {
    
      override val env = Option(envValue)
    }
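
    hikariCPConfig derives the pool bounds from three optional keys: numThreads defaults to 20, maxConnections to numThreads * 5, and minConnections to numThreads. Here is a standard-library sketch of just that defaulting rule, with a Map standing in for one db.<name> section of the Typesafe Config. PoolSize and poolSize are hypothetical names:

```scala
object PoolSizeSketch {
  final case class PoolSize(maximumPoolSize: Int, minimumIdle: Int)

  // Mirrors the defaults used in hikariCPConfig; `settings` stands in for one db.<name> section.
  def poolSize(settings: Map[String, Int]): PoolSize = {
    val numThreads = settings.getOrElse("numThreads", 20)
    PoolSize(
      maximumPoolSize = settings.getOrElse("maxConnections", numThreads * 5),
      minimumIdle = settings.getOrElse("minConnections", numThreads))
  }
}
```

    With the dev.db.h2 settings (numThreads = 10, maxConnections = 12, minConnections = 4) this yields a maximum pool size of 12 and a minimum idle count of 4; with no keys set it falls back to 100 and 20.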

    JDBCEngine.scala: implementation of the jdbcQueryResult function

    import java.sql.PreparedStatement
    import scala.collection.generic.CanBuildFrom
    import scalikejdbc._
    
      object JDBCContext {
        type SQLTYPE = Int
        val SQL_SELECT: Int = 0
        val SQL_EXECUTE = 1
        val SQL_UPDATE = 2
    
        def returnColumnByIndex(idx: Int) = Some(idx)
    
        def returnColumnByName(col: String) = Some(col)
      }
    
      case class JDBCContext(
                              dbName: Symbol,
                              statements: Seq[String],
                              parameters: Seq[Seq[Any]] = Nil,
                              fetchSize: Int = 100,
                              queryTimeout: Option[Int] = None,
                              queryTags: Seq[String] = Nil,
                              sqlType: JDBCContext.SQLTYPE = JDBCContext.SQL_SELECT,
                              batch: Boolean = false,
                              returnGeneratedKey: Option[Any] = None,
                              // no return: None, return by index: Some(1), by name: Some("id")
                              preAction: Option[PreparedStatement => Unit] = None,
                              postAction: Option[PreparedStatement => Unit] = None)
    
      object JDBCEngine {
    
        import JDBCContext._
    
        private def noExtractor(message: String): WrappedResultSet => Nothing = { (rs: WrappedResultSet) =>
          throw new IllegalStateException(message)
        }
    
        def jdbcQueryResult[C[_] <: TraversableOnce[_], A](
             ctx: JDBCContext, rowConverter: WrappedResultSet => A)(
              implicit cbf: CanBuildFrom[Nothing, A, C[A]]): C[A] = {
          ctx.sqlType match {
            case SQL_SELECT => {
              val params: Seq[Any] = ctx.parameters match {
                case Nil => Nil
                case p@_ => p.head
              }
              val rawSql = new SQLToCollectionImpl[A, NoExtractor](ctx.statements.head, params)(noExtractor("boom!"))
              ctx.queryTimeout.foreach(rawSql.queryTimeout(_))
              ctx.queryTags.foreach(rawSql.tags(_))
              rawSql.fetchSize(ctx.fetchSize)
              implicit val session = NamedAutoSession(ctx.dbName)
              val sql: SQL[A, HasExtractor] = rawSql.map(rowConverter)
              sql.collection.apply[C]()
            }
            case _ => throw new IllegalStateException("sqlType must be 'SQL_SELECT'!")
          }
        }
    
      }

    JDBCQueryDemo.scala: functional test code

    import scalikejdbc._
    import JDBCEngine._
    import configdbs._
    import org.joda.time._
    object JDBCQueryDemo extends App {
      ConfigDBsWithEnv("dev").setupAll()
    
      val ctx = JDBCContext(
        dbName = 'h2,
        statements = Seq("select * from members where id = ?"),
        parameters = Seq(Seq(2))
      )
    
      //data model
      case class Member(
                         id: Long,
                         name: String,
                         description: Option[String] = None,
                         birthday: Option[LocalDate] = None,
                         createdAt: DateTime)
    
      //data row converter
      val toMember = (rs: WrappedResultSet) => Member(
        id = rs.long("id"),
        name = rs.string("name"),
        description = rs.stringOpt("description"),
        birthday = rs.jodaLocalDateOpt("birthday"),
        createdAt = rs.jodaDateTime("created_at")
      )
    
      val vecMember: Vector[Member] = jdbcQueryResult[Vector,Member](ctx,toMember)
    
      println(s"members in vector: $vecMember")
    
      val ctx1 = ctx.copy(dbName = 'mysql)
    
      val names: List[String] = jdbcQueryResult[List,String](ctx1,{rs: WrappedResultSet => rs.string("name")})
    
      println(s"selected name: $names")
    
      val ctx2 = ctx1.copy(dbName = 'postgres)
      val idname: List[(Long,String)] = jdbcQueryResult[List,(Long,String)](ctx2,{rs: WrappedResultSet => (rs.long("id"),rs.string("name"))})
    
      println(s"selected id+name: $idname")
    
    
      object SlickDAO {
        import slick.jdbc.H2Profile.api._
    
        case class CountyModel(id: Int, name: String)
        case class CountyTable(tag: Tag) extends Table[CountyModel](tag,"COUNTY") {
          def id = column[Int]("ID",O.AutoInc,O.PrimaryKey)
          def name = column[String]("NAME",O.Length(64))
          def * = (id,name)<>(CountyModel.tupled,CountyModel.unapply)
        }
        val CountyQuery = TableQuery[CountyTable]
        val filter = "Kansas"
        val qry = CountyQuery.filter {_.name.toUpperCase like s"%${filter.toUpperCase}%"}
        val statement = qry.result.statements.head
      }
      import SlickDAO._
    
    
      val slickCtx = JDBCContext(
        dbName = 'h2,
        statements = Seq(statement),
      )
    
      val vecCounty: Vector[CountyModel] = jdbcQueryResult[Vector,CountyModel](slickCtx,{
        rs: WrappedResultSet => CountyModel(id=rs.int("id"),name=rs.string("name"))})
      vecCounty.foreach(r => println(s"${r.id},${r.name}"))
    
    
    }
     

     

     

  • Original article: https://www.cnblogs.com/tiger-xc/p/8375984.html