zoukankan      html  css  js  c++  java
  • Reactive MySQL Client

    Reactive MySQL Client是MySQL的客户端,具有直观的API,侧重于可伸缩性和低开销。

    特征

    • 事件驱动

    • 轻量级

    • 内置连接池

    • 准备好的查询缓存

    • 游标支持

    • 行流

    • RxJava 1和RxJava 2

    • 将内存直接写入对象而不需要不必要

    • Java 8日期和时间

    • MySQL实用程序命令支持

    • 兼容MySQL 5.6和5.7

    用法

    要使用Reactive MySQL Client,请将以下依赖项添加到构建描述符dependencies部分:

    • Maven(在你的pom.xml):

    <dependency>
     <groupId>io.vertx</groupId>
     <artifactId>vertx-mysql-client</artifactId>
     <version>3.8.0</version>
    </dependency>
    • Gradle(在您的build.gradle文件中):

    dependencies {
     compile 'io.vertx:vertx-mysql-client:3.8.0'
    }

    入门

    这是连接,查询和断开连接的最简单方法

    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setPort(3306)
      .setHost("the-host")
      .setDatabase("the-db")
      .setUser("user")
      .setPassword("secret");
    
    // Pool options
    PoolOptions poolOptions = new PoolOptions()
      .setMaxSize(5);
    
    // Create the client pool
    MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);
    
    // A simple query
    client.query("SELECT * FROM users WHERE id='julien'", ar -> {
      if (ar.succeeded()) {
        RowSet result = ar.result();
        System.out.println("Got " + result.size() + " rows ");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    
      // Now close the pool
      client.close();
    });

    连接到MySQL

    大多数情况下,您将使用池连接到MySQL:

    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setPort(3306)
      .setHost("the-host")
      .setDatabase("the-db")
      .setUser("user")
      .setPassword("secret");
    
    // Pool options
    PoolOptions poolOptions = new PoolOptions()
      .setMaxSize(5);
    
    // Create the pooled client
    MySQLPool client = MySQLPool.pool(connectOptions, poolOptions);

    池化客户端使用连接池,任何操作都将从池中借用连接以执行操作并将其释放到池中。

    如果您使用Vert.x运行,可以将Vertx实例传递给它:

    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setPort(3306)
      .setHost("the-host")
      .setDatabase("the-db")
      .setUser("user")
      .setPassword("secret");
    
    // Pool options
    PoolOptions poolOptions = new PoolOptions()
      .setMaxSize(5);
    // Create the pooled client
    MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);

    您需要在不再需要时释放池:

    pool.close();

    当您需要在同一连接上执行多个操作时,您需要使用客户端 connection

    您可以从游泳池轻松获得一个:

    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setPort(3306)
      .setHost("the-host")
      .setDatabase("the-db")
      .setUser("user")
      .setPassword("secret");
    
    // Pool options
    PoolOptions poolOptions = new PoolOptions()
      .setMaxSize(5);
    
    // Create the pooled client
    MySQLPool client = MySQLPool.pool(vertx, connectOptions, poolOptions);
    
    // Get a connection from the pool
    client.getConnection(ar1 -> {
    
      if (ar1.succeeded()) {
    
        System.out.println("Connected");
    
        // Obtain our connection
        SqlConnection conn = ar1.result();
    
        // All operations execute on the same connection
        conn.query("SELECT * FROM users WHERE id='julien'", ar2 -> {
          if (ar2.succeeded()) {
            conn.query("SELECT * FROM users WHERE id='emad'", ar3 -> {
              // Release the connection to the pool
              conn.close();
            });
          } else {
            // Release the connection to the pool
            conn.close();
          }
        });
      } else {
        System.out.println("Could not connect: " + ar1.cause().getMessage());
      }
    });

    完成连接后,必须将其关闭以将其释放到池中,以便可以重复使用。

    组态

    您可以使用多种方法来配置客户端。

    数据对象

    配置客户端的一种简单方法是指定MySQLConnectOptions数据对象。

    MySQLConnectOptions connectOptions = new MySQLConnectOptions()
      .setPort(3306)
      .setHost("the-host")
      .setDatabase("the-db")
      .setUser("user")
      .setPassword("secret");
    
    // Pool Options
    PoolOptions poolOptions = new PoolOptions().setMaxSize(5);
    
    // Create the pool from the data object
    MySQLPool pool = MySQLPool.pool(vertx, connectOptions, poolOptions);
    
    pool.getConnection(ar -> {
      // Handling your connection
    });

    您还可以使用setPropertiesaddProperty方法配置连接属性注意setProperties将覆盖默认的客户端属性。

    MySQLConnectOptions connectOptions = new MySQLConnectOptions();
    
    // Add a connection attribute
    connectOptions.addProperty("_java_version", "1.8.0_212");
    
    // Override the attributes
    Map<String, String> attributes = new HashMap<>();
    attributes.put("_client_name", "myapp");
    attributes.put("_client_version", "1.0.0");
    connectOptions.setProperties(attributes);

    有关客户端连接属性的更多信息,请参阅MySQL参考手册

    连接uri

    除了使用MySQLConnectOptions数据对象进行配置之外,当您要使用连接URI进行配置时,我们还为您提供了另一种连接方式:

    String connectionUri = "mysql://dbuser:secretpassword@database.server.com:3211/mydb";
    
    // Create the pool from the connection URI
    MySQLPool pool = MySQLPool.pool(connectionUri);
    
    // Create the connection from the connection URI
    MySQLConnection.connect(vertx, connectionUri, res -> {
      // Handling your connection
    });

    有关连接字符串格式的更多信息,请参阅MySQL参考手册

    目前,客户端支持连接uri中的以下参数关键字

    • 主办

    • 港口

    • 用户

    • 密码

    • 模式

    • 插座

    运行查询

    当您不需要事务或运行单个查询时,您可以直接在池上运行查询; 池将使用其中一个连接来运行查询并将结果返回给您。

    以下是如何运行简单查询:

    client.query("SELECT * FROM users WHERE id='julien'", ar -> {
      if (ar.succeeded()) {
        RowSet result = ar.result();
        System.out.println("Got " + result.size() + " rows ");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    您可以使用准备好的查询执行相同操作。

    SQL字符串可以通过位置参考参数,使用$1$2等...

    client.preparedQuery("SELECT * FROM users WHERE id=?", Tuple.of("julien"), ar -> {
      if (ar.succeeded()) {
        RowSet rows = ar.result();
        System.out.println("Got " + rows.size() + " rows ");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    查询方法提供了一个RowSet适用于SELECT查询的异步实例

    client.preparedQuery("SELECT first_name, last_name FROM users", ar -> {
      if (ar.succeeded()) {
        RowSet rows = ar.result();
        for (Row row : rows) {
          System.out.println("User " + row.getString(0) + " " + row.getString(1));
        }
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    UPDATE / INSERT查询:

    client.preparedQuery("INSERT INTO users (first_name, last_name) VALUES (?, ?)", Tuple.of("Julien", "Viet"), ar -> {
      if (ar.succeeded()) {
        RowSet rows = ar.result();
        System.out.println(rows.rowCount());
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    Row让你通过索引访问您的数据

    System.out.println("User " + row.getString(0) + " " + row.getString(1));

    或按名称

    System.out.println("User " + row.getString("first_name") + " " + row.getString("last_name"));

    您可以访问各种类型

    String firstName = row.getString("first_name");
    Boolean male = row.getBoolean("male");
    Integer age = row.getInteger("age");

    您可以执行准备批处理

     

    您可以缓存准备好的查询:

     

    您可以在查询中使用“RETURNING”子句获取生成的键:

     

    使用连接

    当您需要执行顺序查询(没有事务)时,您可以创建新连接或从池中借用一个连接:

    pool.getConnection(ar1 -> {
      if (ar1.succeeded()) {
        SqlConnection connection = ar1.result();
    
        connection.query("SELECT * FROM users WHERE id='julien'", ar2 -> {
          if (ar1.succeeded()) {
            connection.query("SELECT * FROM users WHERE id='paulo'", ar3 -> {
              // Do something with rows and return the connection to the pool
              connection.close();
            });
          } else {
            // Return the connection to the pool
            connection.close();
          }
        });
      }
    });

    可以创建准备好的查询:

    connection.prepare("SELECT * FROM users WHERE first_name LIKE ?", ar1 -> {
      if (ar1.succeeded()) {
        PreparedQuery pq = ar1.result();
        pq.execute(Tuple.of("julien"), ar2 -> {
          if (ar2.succeeded()) {
            // All rows
            RowSet rows = ar2.result();
          }
        });
      }
    });
    注意
    准备好的查询缓存取决于setCachePreparedStatements并且不依赖于您是创建准备好的查询还是使用direct prepared queries

    PreparedQuery 可以执行有效的批处理:

     

    游标和流媒体

    默认情况下,准备好的查询执行会获取所有行,您可以使用a Cursor来控制要读取的行数:

    connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
      if (ar1.succeeded()) {
        PreparedQuery pq = ar1.result();
    
        // Create a cursor
        Cursor cursor = pq.cursor(Tuple.of(18));
    
        // Read 50 rows
        cursor.read(50, ar2 -> {
          if (ar2.succeeded()) {
            RowSet rows = ar2.result();
    
            // Check for more ?
            if (cursor.hasMore()) {
              // Repeat the process...
            } else {
              // No more rows - close the cursor
              cursor.close();
            }
          }
        });
      }
    });

    PostreSQL在事务结束时销毁游标,因此游标API将在事务中使用,否则您可能会收到34000PostgreSQL错误。

    游标过早释放时应关闭:

    cursor.read(50, ar2 -> {
      if (ar2.succeeded()) {
        // Close the cursor
        cursor.close();
      }
    });

    流API也可用于游标,这可以更方便,特别是使用Rxified版本。

    connection.prepare("SELECT * FROM users WHERE age > ?", ar1 -> {
      if (ar1.succeeded()) {
        PreparedQuery pq = ar1.result();
    
        // Fetch 50 rows at a time
        RowStream<Row> stream = pq.createStream(50, Tuple.of(18));
    
        // Use the stream
        stream.exceptionHandler(err -> {
          System.out.println("Error: " + err.getMessage());
        });
        stream.endHandler(v -> {
          System.out.println("End of stream");
        });
        stream.handler(row -> {
          System.out.println("User: " + row.getString("last_name"));
        });
      }
    });

    流按批次读取行50并流式传输,当行已传递给处理程序时,将50读取新批次,依此类推。

    流可以恢复或暂停,加载的行将保留在内存中,直到它们被传递并且光标将停止迭代。

    MySQL类型映射

    目前,客户端支持以下MySQL类型

    • BOOL,BOOLEAN(java.lang.Byte

    • TINYINT(java.lang.Byte

    • SMALLINT(java.lang.Short

    • MEDIUMINT(java.lang.Integer

    • INT,INTEGER(java.lang.Integer

    • BIGINT(java.lang.Long

    • FLOAT(java.lang.Float

    • DOUBLE(java.lang.Double

    • NUMERIC(io.vertx.sqlclient.data.Numeric

    • 日期(java.time.LocalDate

    • DATETIME(java.time.LocalDateTime

    • 时间(java.time.Duration

    • TIMESTAMP(java.time.LocalDateTime

    • 年(java.lang.Short

    • CHAR(java.lang.String

    • VARCHAR(java.lang.String

    • BINARY(io.vertx.core.buffer.Buffer

    • VARBINARY(io.vertx.core.buffer.Buffer

    • TINYBLOB(io.vertx.core.buffer.Buffer

    • TINYTEXT(java.lang.String

    • BLOB(io.vertx.core.buffer.Buffer

    • 文字(java.lang.String

    • MEDIUMBLOB(io.vertx.core.buffer.Buffer

    • MEDIUMTEXT(java.lang.String

    • LONGBLOB(io.vertx.core.buffer.Buffer

    • LONGTEXT(java.lang.String

    元组解码在存储值时使用上述类型

    处理BOOLEAN

    在MySQL中BOOLEANBOOL数据类型是同义词TINYINT(1)零值被视为假,非零值被视为真。一个BOOLEAN数据类型值存储在RowTuple作为java.lang.Byte类型,你可以调用Row#getValue来检索它的java.lang.Byte值,或者可以称之为Row#getBoolean检索它java.lang.Boolean的价值。

    client.query("SELECT graduated FROM students WHERE id = 0", ar -> {
      if (ar.succeeded()) {
        RowSet rowSet = ar.result();
        for (Row row : rowSet) {
          int pos = row.getColumnIndex("graduated");
          Byte value = row.get(Byte.class, pos);
          Boolean graduated = row.getBoolean("graduated");
        }
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    如果要使用BOOLEAN的参数执行预准备语句,只需将该java.lang.Boolean添加到参数列表即可。

    client.preparedQuery("UPDATE students SET graduated = ? WHERE id = 0", Tuple.of(true), ar -> {
      if (ar.succeeded()) {
        System.out.println("Updated with the boolean value");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    处理NUMERIC

    NumericJava类型用于表示MySQL的NUMERIC类型。

    Numeric numeric = row.get(Numeric.class, 0);
    if (numeric.isNaN()) {
      // Handle NaN
    } else {
      BigDecimal value = numeric.bigDecimalValue();
    }

    收集器查询

    您可以将Java收集器与查询API一起使用:

    Collector<Row, ?, Map<Long, String>> collector = Collectors.toMap(
      row -> row.getLong("id"),
      row -> row.getString("last_name"));
    
    // Run the query with the collector
    client.query("SELECT * FROM users",
      collector,
      ar -> {
        if (ar.succeeded()) {
          SqlResult<Map<Long, String>> result = ar.result();
    
          // Get the map created by the collector
          Map<Long, String> map = result.value();
          System.out.println("Got " + map);
        } else {
          System.out.println("Failure: " + ar.cause().getMessage());
        }
      });

    收集器处理不能保留对它的引用,Row因为有一行用于处理整个集合。

    Java Collectors提供了许多有趣的预定义收集器,例如,您可以直接从行集创建一个字符串:

    Collector<Row, ?, String> collector = Collectors.mapping(
      row -> row.getString("last_name"),
      Collectors.joining(",", "(", ")")
    );
    
    // Run the query with the collector
    client.query("SELECT * FROM users",
      collector,
      ar -> {
        if (ar.succeeded()) {
          SqlResult<String> result = ar.result();
    
          // Get the string created by the collector
          String list = result.value();
          System.out.println("Got " + list);
        } else {
          System.out.println("Failure: " + ar.cause().getMessage());
        }
      });

    MySQL实用程序命令

    有时您想使用MySQL实用程序命令,我们为此提供支持。可以在MySQL实用程序命令中找到更多信息

    COM_PING

    您可以使用COM_PING命令检查服务器是否处于活动状态。如果服务器响应PING,将通知处理程序,否则将永远不会调用处理程序。

    connection.ping(ar -> {
      System.out.println("The server has responded to the PING");
    });

    COM_RESET_CONNECTION

    您可以使用COM_RESET_CONNECTION命令重置会话状态,这将重置连接状态,如: - 用户变量 - 临时表 - 预准备语句

    connection.resetConnection(ar -> {
      if (ar.succeeded()) {
        System.out.println("Connection has been reset now");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    COM_CHANGE_USER

    您可以更改当前连接的用户,这将执行重新身份验证并重置连接状态COM_RESET_CONNECTION

    MySQLConnectOptions authenticationOptions = new MySQLConnectOptions()
      .setUser("newuser")
      .setPassword("newpassword")
      .setDatabase("newdatabase");
    connection.changeUser(authenticationOptions, ar -> {
      if (ar.succeeded()) {
        System.out.println("User of current connection has been changed.");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    COM_INIT_DB

    您可以使用COM_INIT_DB命令更改连接的默认架构。

    connection.specifySchema("newschema", ar -> {
      if (ar.succeeded()) {
        System.out.println("Default schema changed to newschema");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    COM_STATISTICS

    您可以使用COM_STATISTICS命令在MySQL服务器中获取一些人类可读的内部状态变量字符串。

    connection.getInternalStatistics(ar -> {
      if (ar.succeeded()) {
        System.out.println("Statistics: " + ar.result());
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    COM_DEBUG

    您可以使用COM_DEBUG命令将调试信息转储到MySQL服务器的STDOUT。

    connection.debug(ar -> {
      if (ar.succeeded()) {
        System.out.println("Debug info dumped to server's STDOUT");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });

    COM_SET_OPTION

    您可以使用COM_SET_OPTION命令为当前连接设置选项。目前只能CLIENT_MULTI_STATEMENTS设置。

    例如,您可以CLIENT_MULTI_STATEMENTS使用此命令禁用

    connection.setOption(MySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF, ar -> {
      if (ar.succeeded()) {
        System.out.println("CLIENT_MULTI_STATEMENTS is off now");
      } else {
        System.out.println("Failure: " + ar.cause().getMessage());
      }
    });
  • 相关阅读:
    myeclipse导入项目中的乱码问题的解决
    myeclipse中的jar包的引入与新建
    myeclipse如何修改默认存储文件路径
    oracle迁移数据到mysql
    如何设置myeclipse的编码格式
    tns的查找与修改
    在PL/SQL中输入SQL语句时关键字的首字母自动变成大写
    滤器处理中文编码
    题解导航
    莫队总结应用
  • 原文地址:https://www.cnblogs.com/endv/p/11257577.html
Copyright © 2011-2022 走看看