zoukankan      html  css  js  c++  java
  • spark操作数据库的几种方法

    一.使用jdbcRDD的接口:

      1 SparkConf conf = new SparkConf();
      2         conf.setAppName("Simple Application").setMaster("local");
      3         JavaSparkContext jsc = new JavaSparkContext(conf);
      4 
      5         
      6 //1.直接使用jdbcRDD的构造函数
      7 class DbConnection extends AbstractFunction0<Connection> implements
      8         Serializable {
      9     private static final long serialVersionUID = 1L;
     10     private String driverClassName;
     11     private String connectionUrl;
     12     private String userName;
     13     private String password;
     14 
     15     public DbConnection(String driverClassName, String connectionUrl,
     16             String userName, String password) {
     17         this.driverClassName = driverClassName;
     18         this.connectionUrl = connectionUrl;
     19         this.userName = userName;
     20         this.password = password;
     21     }
     22 
     23     @Override
     24     public Connection apply() {
     25         try {
     26             Class.forName(driverClassName);
     27         } catch (ClassNotFoundException e) {
     28         }
     29         Properties properties = new Properties();
     30         properties.setProperty("user", userName);
     31         properties.setProperty("password", password);
     32         Connection connection = null;
     33         try {
     34             connection = DriverManager.getConnection(connectionUrl,
     35                     properties);
     36         } catch (SQLException e) {
     37         }
     38         return connection;
     39     }
     40 }
     41 
     42 class MapResult extends AbstractFunction1<ResultSet, Object[]>
     43         implements Serializable {
     44     private static final long serialVersionUID = 1L;
     45 
     46     public Object[] apply(ResultSet row) {
     47         return JdbcRDD.resultSetToObjectArray(row);
     48     }
     49 }
     50 
     51 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
     52 String Driver="com.mysql.jdbc.Driver";
     53 String UserName = "root";
     54 String password = "pd";
     55 DbConnection dbConnection = new DbConnection(Driver,
     56         Connection_url, UserName, password);
     57 sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
     58 //lowerBound,upperBound均设置0,where条件就为恒真,这个是个处理技巧
     59 JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection,
     60         sql, 0, 0, 1, new MapResult(),
     61         ClassManifestFactory$.MODULE$.fromClass(Object[].class));
     62 JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD,
     63         ClassManifestFactory$.MODULE$.fromClass(Object[].class));
     64         
     65         
     66 //另外一种实现:
     67 class DbConnectionFactory implements JdbcRDD.ConnectionFactory {
     68     private static final long serialVersionUID = 1L;
     69     private String driverClassName;
     70     private String connectionUrl;
     71     private String userName;
     72     private String password;
     73     
     74     public Connection getConnection() throws Exception {
     75         Class.forName(driverClassName);
     76         String url = connectionUrl;
     77         Properties properties = new Properties();
     78         properties.setProperty("user", userName);
     79         properties.setProperty("password", password);
     80         return DriverManager.getConnection(url, properties);
     81     }
     82     
     83     public DbConnectionFactory(String driverClassName, String connectionUrl,
     84             String userName, String password) {
     85         this.driverClassName = driverClassName;
     86         this.connectionUrl = connectionUrl;
     87         this.userName = userName;
     88         this.password = password;
     89     }
     90 
     91 }
     92 
     String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8";
     // Wrap the query so that it carries the two '?' placeholders; with both bounds set to 0
     // below, the WHERE clause is always true.
     sql = "select * from (" + sql + ") as tmp where 0=? and 0=?";
     DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver,
             Connection_url, UserName, password);
     // JdbcRDD.create returns a JavaRDD<Object[]> directly. Underneath it calls
     // JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql,
     //         long lowerBound, long upperBound, int numPartitions,
     //         Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1).
     javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1,
             new Function<ResultSet, Object[]>() {
                 private static final long serialVersionUID = 1L;

                 @Override
                 public Object[] call(ResultSet resultSet) {
                     return JdbcRDD.resultSetToObjectArray(resultSet);
                 }
             });
     // A more concise form is the overload without a row mapper; underneath it calls the
     // create(JavaSparkContext, ConnectionFactory, String, long, long, int,
     // Function<ResultSet, T>) overload used above:
     // javaRDD = JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);

     二.使用SparkSession的接口:

     1 SparkSession ss = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate()
     2 //读取方式1
     3 String sql = "(select * from xxx) as tmp; //注意这里的sql格式,该sql也可以直接是一个表名
     4 Dataset<Row> df = session.read().format("jdbc")
     5                     .option("url", jdbcURL)
     6                     .option("driver", driver)
     7                     .option("dbtable", sql)
     8                     .option("user", username)
     9                     .option("password", password)
    10                     .load();
    11 //读取方式2:
    12 Properties connectionProperties = new Properties();
    13             connectionProperties.put("user", username);
    14             connectionProperties.put("password", password);
    15             connectionProperties.put("driver", driver);
    16                  session.read().jdbc(url, table, properties)
    17 df = session.read().jdbc(jdbcURL,sql,connectionProperties);
    18 
    19 
    20 //写入方式1:
    21 String saveMode = "Overwrite";
    22 df.write().mode(saveMode).jdbc(jdbcURL, tablename, connectionProperties);
    // Write option 2: insert the rows by hand, opening one JDBC connection per partition.
    // The statement text has its own name so it does not clash with the query string
    // declared by the read examples in this listing.
    final String insertSql = "insert into tab_xxx (c1,c2,c3) values(?,?,?)";
    // Number of columns being inserted; must match the number of '?' in insertSql.
    final int columnCount = 3;
    // Rows sent to the database and committed together.
    final int batchSize = 1000;

    // NOTE(review): "url" is not declared anywhere in this listing, while the read/write
    // examples use "jdbcURL" - confirm both refer to the same value.
    df.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() {
        private static final long serialVersionUID = -834520661839866305L;

        @Override
        public void call(Iterator<Row> t) throws Exception {
            Class.forName(driver);
            // try-with-resources closes the statement and the connection on every path.
            try (Connection conn = DriverManager.getConnection(url, username, password)) {
                conn.setAutoCommit(false);
                try (PreparedStatement pstmt = conn.prepareStatement(insertSql)) {
                    int pending = 0;
                    while (t.hasNext()) {
                        Row row = t.next();
                        for (int i = 0; i < columnCount; i++) {
                            pstmt.setObject(i + 1, row.get(i));
                        }
                        pstmt.addBatch();
                        if (++pending == batchSize) {
                            pstmt.executeBatch();
                            conn.commit();
                            pending = 0;
                        }
                    }
                    if (pending > 0) {
                        pstmt.executeBatch();
                    }
                    conn.commit();
                } catch (Exception e) {
                    // Discard the uncommitted part of the current batch before failing the task.
                    conn.rollback();
                    throw e;
                }
            }
        }
    });
    53 }
     1 //写入方法3
     2 3.转换成List<Row>,可以批量写入,但是有可能导致Driver 内存承载过高
     3 
     4 private static void InsertTmpTable(Connection conn,Dataset<Row> ndf,final String m_cols,final String tabname) {
     5         System.out.println("InsertTmpTab "+tabname+" start!");
     6         String  placeholderStr="values(";
     7         int cnt = m_cols.split(",").length+1;
     8         for(int  i = 0;i<cnt;i++){
     9             placeholderStr+="?"+ (i == cnt-1 ? ")" : ",");
    10         }
    11         
    12          String sql = "insert into "+ tabname+"("+m_cols+",orderby_time)"+placeholderStr;
    13          PreparedStatement pstmt = null;
    14         
    15         try {
    16            List<Row> lrow=ndf.collectAsList();
    17            pstmt = (PreparedStatement) conn.prepareStatement(sql);
    18     
    19            for(int j =0;j<lrow.size();j++){
    20                for(int i=0;i<cnt;i++){
    21                 pstmt.setObject(i+1,lrow.get(j).get(i));
    22                }
    23                pstmt.executeUpdate();
    24                if(j%10000==0)  //批量提交
    25                    conn.commit(); 
    26            }
    27            conn.commit();
    28            pstmt.close();
    29         }catch (Exception e){
    30             System.out.println(e.getMessage());
    31             System.exit(-1);
    32         }
    33         System.out.println("InsertTmpTab "+tabname+" success!");
    34     }
  • 相关阅读:
    sed命令
    python常用库
    python标准库
    从 Python 打包到 CLI 工具
    pip
    python包自我理解
    docker常用命令
    chattr命令
    xmss
    live2d-widget.js
  • 原文地址:https://www.cnblogs.com/lyy-blog/p/9633926.html
Copyright © 2011-2022 走看看