一.使用jdbcRDD的接口:
1 SparkConf conf = new SparkConf(); 2 conf.setAppName("Simple Application").setMaster("local"); 3 JavaSparkContext jsc = new JavaSparkContext(conf); 4 5 6 //1.直接使用jdbcRDD的构造函数 7 class DbConnection extends AbstractFunction0<Connection> implements 8 Serializable { 9 private static final long serialVersionUID = 1L; 10 private String driverClassName; 11 private String connectionUrl; 12 private String userName; 13 private String password; 14 15 public DbConnection(String driverClassName, String connectionUrl, 16 String userName, String password) { 17 this.driverClassName = driverClassName; 18 this.connectionUrl = connectionUrl; 19 this.userName = userName; 20 this.password = password; 21 } 22 23 @Override 24 public Connection apply() { 25 try { 26 Class.forName(driverClassName); 27 } catch (ClassNotFoundException e) { 28 } 29 Properties properties = new Properties(); 30 properties.setProperty("user", userName); 31 properties.setProperty("password", password); 32 Connection connection = null; 33 try { 34 connection = DriverManager.getConnection(connectionUrl, 35 properties); 36 } catch (SQLException e) { 37 } 38 return connection; 39 } 40 } 41 42 class MapResult extends AbstractFunction1<ResultSet, Object[]> 43 implements Serializable { 44 private static final long serialVersionUID = 1L; 45 46 public Object[] apply(ResultSet row) { 47 return JdbcRDD.resultSetToObjectArray(row); 48 } 49 } 50 51 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8"; 52 String Driver="com.mysql.jdbc.Driver"; 53 String UserName = "root"; 54 String password = "pd"; 55 DbConnection dbConnection = new DbConnection(Driver, 56 Connection_url, UserName, password); 57 sql = "select * from (" + sql + ") as tmp where 0=? and 0=?"; 58 //lowerBound,upperBound均设置0,where条件就为恒真,这个是个处理技巧 59 JdbcRDD<Object[]> jdbcRDD = new JdbcRDD<>(jsc.sc(), dbConnection, 60 sql, 0, 0, 1, new MapResult(), 61 ClassManifestFactory$.MODULE$.fromClass(Object[].class)); 62 JavaRDD<Object[]> javaRDD = JavaRDD.fromRDD(jdbcRDD, 63 ClassManifestFactory$.MODULE$.fromClass(Object[].class)); 64 65 66 //另外一种实现: 67 class DbConnectionFactory implements JdbcRDD.ConnectionFactory { 68 private static final long serialVersionUID = 1L; 69 private String driverClassName; 70 private String connectionUrl; 71 private String userName; 72 private String password; 73 74 public Connection getConnection() throws Exception { 75 Class.forName(driverClassName); 76 String url = connectionUrl; 77 Properties properties = new Properties(); 78 properties.setProperty("user", userName); 79 properties.setProperty("password", password); 80 return DriverManager.getConnection(url, properties); 81 } 82 83 public DbConnectionFactory(String driverClassName, String connectionUrl, 84 String userName, String password) { 85 this.driverClassName = driverClassName; 86 this.connectionUrl = connectionUrl; 87 this.userName = userName; 88 this.password = password; 89 } 90 91 } 92 93 String Connection_url = "jdbc:mysql://ip:port/dbname?useUnicode=true&characterEncoding=utf8"; 94 sql = "select * from (" + sql + ") as tmp where 0=? and 0=?"; 95 DbConnectionFactory ConnectFactory = new DbConnectionFactory(Driver, 96 Connection_url, UserName, password) 97 javaRDD = JdbcRDD.create(jsc, new DbConnectionFactory(Driver, 98 Connection_url, UserName, password), sql, 0, 0, 1,new Function<ResultSet,Object[]>() 99 { 100 private static final long serialVersionUID = 1L; 101 public Object[] call(ResultSet resultSet) 102 { 103 return JdbcRDD.resultSetToObjectArray(resultSet); 104 } 105 });//直接返回JavaRDD<Object[]>,这个底层调用的是JdbcRDD(SparkContext sc, Function0<Connection> getConnection, String sql, long lowerBound, long upperBound, int numPartitions, Function1<ResultSet, T> mapRow, ClassTag<T> evidence$1) 106 //javaRDD =JdbcRDD.create(jsc, ConnectFactory, sql, 0, 0, 1);//该方法更加简洁,底层调用上面的create(JavaSparkContext paramJavaSparkContext, ConnectionFactory paramConnectionFactory, String paramString, long paramLong1, long paramLong2, int paramInt, Function<ResultSet, T> paramFunction)
二.使用通过sparksession的接口:
1 SparkSession ss = SparkSession.builder().config(conf).enableHiveSupport().getOrCreate() 2 //读取方式1 3 String sql = "(select * from xxx) as tmp; //注意这里的sql格式,该sql也可以直接是一个表名 4 Dataset<Row> df = session.read().format("jdbc") 5 .option("url", jdbcURL) 6 .option("driver", driver) 7 .option("dbtable", sql) 8 .option("user", username) 9 .option("password", password) 10 .load(); 11 //读取方式2: 12 Properties connectionProperties = new Properties(); 13 connectionProperties.put("user", username); 14 connectionProperties.put("password", password); 15 connectionProperties.put("driver", driver); 16 session.read().jdbc(url, table, properties) 17 df = session.read().jdbc(jdbcURL,sql,connectionProperties); 18 19 20 //写入方式1: 21 String saveMode = "Overwrite"; 22 df.write().mode(saveMode).jdbc(jdbcURL, tablename, connectionProperties); 23 //写入方式2: 24 final String sql = "insert into tab_xxx (c1,c2,c3) values(?,?,?)"; 25 26 df.javaRDD().foreachPartition(new VoidFunction<Iterator<Row>>() { 27 private static final long serialVersionUID = -834520661839866305L; 28 @Override 29 public void call(Iterator<Row> t) throws Exception { 30 Class.forName(driver); 31 Connection conn = (Connection) DriverManager.getConnection(url, username, password); 32 conn.setAutoCommit(false); 33 try { 34 PreparedStatement pstmt = (PreparedStatement) conn.prepareStatement(sql); 35 int loop = 0; 36 while (t.hasNext()) { 37 Row row = t.next(); 38 for (int i = 0; i < 3; i++) { //这里的3是插入的列只有3列 39 pstmt.setObject(i + 1, row.get(i)); 40 } 41 pstmt.executeUpdate(); 42 if (++loop % 1000 == 0) { 43 conn.commit(); 44 } 45 } 46 conn.commit(); 47 pstmt.close(); 48 } finally { 49 conn.close(); 50 } 51 } 52 }); 53 }
1 //写入方法3 2 3.转换成List<Row>,可以批量写入,但是有可能导致Driver 内存承载过高 3 4 private static void InsertTmpTable(Connection conn,Dataset<Row> ndf,final String m_cols,final String tabname) { 5 System.out.println("InsertTmpTab "+tabname+" start!"); 6 String placeholderStr="values("; 7 int cnt = m_cols.split(",").length+1; 8 for(int i = 0;i<cnt;i++){ 9 placeholderStr+="?"+ (i == cnt-1 ? ")" : ","); 10 } 11 12 String sql = "insert into "+ tabname+"("+m_cols+",orderby_time)"+placeholderStr; 13 PreparedStatement pstmt = null; 14 15 try { 16 List<Row> lrow=ndf.collectAsList(); 17 pstmt = (PreparedStatement) conn.prepareStatement(sql); 18 19 for(int j =0;j<lrow.size();j++){ 20 for(int i=0;i<cnt;i++){ 21 pstmt.setObject(i+1,lrow.get(j).get(i)); 22 } 23 pstmt.executeUpdate(); 24 if(j%10000==0) //批量提交 25 conn.commit(); 26 } 27 conn.commit(); 28 pstmt.close(); 29 }catch (Exception e){ 30 System.out.println(e.getMessage()); 31 System.exit(-1); 32 } 33 System.out.println("InsertTmpTab "+tabname+" success!"); 34 }