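I: Purpose
1. Purpose
If this works, Spark can be used in place of Sqoop: a Spark program reads the Hive tables and writes them to MySQL. The program below does exactly that.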
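II: Hive operations
1. Prepare the data
Start Hive first.
Otherwise the program reports an error, because a configuration item was set when Hive was integrated with Spark.
Later I came across this article, which explains the problem I ran into quite well: https://blog.csdn.net/freedomboy319/article/details/44828337
2. Create the department and employee tables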
-- Create the employee table
create table emp(
  empno int,
  ename string,
  job string,
  mgr int,
  hiredate string,
  sal double,
  comm double,
  deptno int
)
row format delimited fields terminated by ' ';
load data local inpath '/opt/datas/emp.txt' into table emp;

-- Create the department table
create table dept(
  deptno int,
  dname string,
  loc string
)
row format delimited fields terminated by ' ';
load data local inpath '/opt/datas/dept.txt' into table dept;
3. Result
III: Program
1. Outline
2. Prerequisite
The program needs hive-site.xml so that Spark can find the Hive metastore.
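When the program is run locally (for example from an IDE), a common arrangement is to put hive-site.xml on the classpath, such as under src/main/resources. That location is an assumption here, not something this post states. The sketch below only checks whether the file is visible on the classpath; the object name `HiveSiteCheck` is hypothetical.

```scala
object HiveSiteCheck {
  // Returns the classpath location of the given resource, if it can be found.
  def locate(name: String = "hive-site.xml"): Option[java.net.URL] =
    Option(getClass.getClassLoader.getResource(name))

  def main(args: Array[String]): Unit =
    locate() match {
      case Some(url) => println(s"hive-site.xml found at $url")
      case None      => println("hive-site.xml is not on the classpath")
    }
}
```

Running this before the Spark job gives a quick hint as to whether a missing configuration file is behind a metastore connection failure.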
3. Required dependencies
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-hive -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.10</artifactId>
    <version>${spark.version}</version>
    <scope>provided</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/mysql/mysql-connector-java -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>6.0.4</version>
</dependency>
4. The following error is reported
Exception in thread "main" java.sql.SQLNonTransientConnectionException: CLIENT_PLUGIN_AUTH is required
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:550)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:537)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:527)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:512)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:480)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:498)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:494)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:72)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1634)
    at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:637)
    at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:351)
    at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:224)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:61)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$2.apply(JdbcUtils.scala:52)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:278)
    at com.scala.it.HiveToMysql$.main(HiveToMysql.scala:28)
    at com.scala.it.HiveToMysql.main(HiveToMysql.scala)
Caused by: com.mysql.cj.core.exceptions.UnableToConnectException: CLIENT_PLUGIN_AUTH is required
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
    at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73)
    at com.mysql.cj.mysqla.io.MysqlaProtocol.rejectConnection(MysqlaProtocol.java:319)
    at com.mysql.cj.mysqla.authentication.MysqlaAuthenticationProvider.connect(MysqlaAuthenticationProvider.java:207)
    at com.mysql.cj.mysqla.io.MysqlaProtocol.connect(MysqlaProtocol.java:1361)
    at com.mysql.cj.mysqla.MysqlaSession.connect(MysqlaSession.java:132)
    at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1754)
    at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1624)
    ... 8 more
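Cause:
The mysql-connector-java version (6.0.4 in the dependency list above) does not match the MySQL server. Switching to version 5.1.17 resolves the error.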
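After swapping the connector, it can help to confirm which JDBC driver actually ends up on the classpath, since an old jar may still be picked up. The sketch below lists the drivers registered with `java.sql.DriverManager` together with the version numbers they report. It uses only the standard JDBC API; the object name `JdbcDriverReport` is hypothetical.

```scala
import java.sql.DriverManager
import scala.collection.mutable.ListBuffer

object JdbcDriverReport {
  // Collects (driver class name, major version, minor version) for every
  // JDBC driver currently registered with DriverManager.
  def registeredDrivers(): List[(String, Int, Int)] = {
    val found = ListBuffer.empty[(String, Int, Int)]
    val drivers = DriverManager.getDrivers
    while (drivers.hasMoreElements) {
      val d = drivers.nextElement()
      found += ((d.getClass.getName, d.getMajorVersion, d.getMinorVersion))
    }
    found.toList
  }

  def main(args: Array[String]): Unit =
    registeredDrivers().foreach { case (name, major, minor) =>
      println(s"$name version $major.$minor")
    }
}
```

A driver class under the `com.mysql.cj` package, as seen in the stack trace above, indicates that the newer connector line is still being loaded.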
5. Program
package com.scala.it

import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveToMysql {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hive-to-mysql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)
    val (url, username, password) = ("jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09", "root", "123456")
    val props = new Properties()
    props.put("user", username)
    props.put("password", password)

    // ==================================
    // Step 1: sync the Hive dept table to MySQL
    sqlContext
      .read
      .table("hadoop09.dept") // database.tablename
      .write
      .mode(SaveMode.Overwrite) // overwrite the target table if it already exists
      .jdbc(url, "mysql_dept", props)
  }
}
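The program above covers only the first step, the dept table. As a possible follow-up, the sketch below joins emp with dept in Hive and writes the result to MySQL through the same `DataFrameWriter.jdbc` call. It assumes the emp table also lives in the hadoop09 database. The object name `HiveEmpToMysql`, the helper `jdbcProps`, and the target table `mysql_emp_dept` are hypothetical names chosen for illustration.

```scala
import java.util.Properties

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object HiveEmpToMysql {
  // Builds the credentials object that DataFrameWriter.jdbc expects.
  def jdbcProps(user: String, password: String): Properties = {
    val props = new Properties()
    props.setProperty("user", user)
    props.setProperty("password", password)
    props
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")
      .setAppName("hive-emp-to-mysql")
    val sc = SparkContext.getOrCreate(conf)
    val sqlContext = new HiveContext(sc)
    val url = "jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09"
    val props = jdbcProps("root", "123456")

    // Assumption: emp sits in the same Hive database (hadoop09) as dept.
    sqlContext
      .sql(
        """SELECT e.empno, e.ename, e.sal, d.dname
          |FROM hadoop09.emp e
          |JOIN hadoop09.dept d ON e.deptno = d.deptno""".stripMargin)
      .write
      .mode(SaveMode.Overwrite) // replace mysql_emp_dept if it already exists
      .jdbc(url, "mysql_emp_dept", props)
  }
}
```

Doing the join in Hive SQL keeps the MySQL side as a plain sink, which is roughly how a Sqoop export would be used.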
6. Result
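One way to check the result without opening a MySQL client is to count the rows of the target table over plain JDBC. The sketch below reuses the URL, credentials, and `mysql_dept` table name from the program above. The object name `VerifyMysqlDept` and its helpers are hypothetical, and it needs the MySQL connector on the classpath at run time.

```scala
import java.sql.DriverManager

object VerifyMysqlDept {
  // Builds the counting query. The table name is trusted input here.
  def countSql(table: String): String = s"SELECT COUNT(*) FROM $table"

  // Opens a connection, runs the count, and always closes the resources.
  def rowCount(url: String, user: String, password: String, table: String): Long = {
    val conn = DriverManager.getConnection(url, user, password)
    try {
      val stmt = conn.createStatement()
      try {
        val rs = stmt.executeQuery(countSql(table))
        rs.next()
        rs.getLong(1)
      } finally stmt.close()
    } finally conn.close()
  }

  def main(args: Array[String]): Unit = {
    val url = "jdbc:mysql://linux-hadoop01.ibeifeng.com:3306/hadoop09"
    val n = rowCount(url, "root", "123456", "mysql_dept")
    println(s"mysql_dept rows: $n")
  }
}
```

The count should match the number of rows in the Hive dept table if the sync went through.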