  • Spark JDBC Development (Database Connection Test)


    All of the following steps run in Spark local mode:

    1. In Eclipse 4.5, create a project named RDDToJDBC and add a lib folder to hold third-party driver JARs

    [hadoop@CloudDeskTop software]$ cd /project/RDDToJDBC/
    [hadoop@CloudDeskTop RDDToJDBC]$ mkdir -p lib
    [hadoop@CloudDeskTop RDDToJDBC]$ ls
    bin lib src

    2. Set up the required environment

    2.1. Copy the MySQL JDBC driver JAR into the lib directory of the RDDToJDBC project
    [hadoop@CloudDeskTop software]$ cp -a /software/hive-1.2.2/lib/mysql-connector-java-3.0.17-ga-bin.jar /project/RDDToJDBC/lib/
    2.2. Add the Spark development library Spark2.1.1-All to the classpath of the RDDToJDBC project (for example, by defining it as an Eclipse user library)

    3. Prepare the source data for Spark:

    [hadoop@CloudDeskTop spark]$ cd /home/hadoop/test/jdbc/
    [hadoop@CloudDeskTop jdbc]$ ls
    myuser  testJDBC.txt
    [hadoop@CloudDeskTop jdbc]$ cat myuser 
    lisi 123456 165 1998-9-9
    lisan 123ss 187 2009-10-19
    wangwu 123qqwe 177 1990-8-3
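
    Each line of myuser holds four space-separated fields: user name, password, height, and birthday. The following is a minimal sketch of how such a line could be parsed and validated before its values are bound to the INSERT statement. The class name UserLineParser and the holder UserRecord are hypothetical and do not appear in the project. java.sql.Date.valueOf accepts months and days without a leading zero, so a value such as 1998-9-9 parses as is.

```java
import java.sql.Date;

// Hypothetical helper, not part of the RDDToJDBC project.
final class UserLineParser {

    /** Immutable holder for one parsed record. */
    static final class UserRecord {
        final String userName;
        final String passWord;
        final double height;
        final Date birthday;

        UserRecord(String userName, String passWord, double height, Date birthday) {
            this.userName = userName;
            this.passWord = passWord;
            this.height = height;
            this.birthday = birthday;
        }
    }

    /**
     * Parse a line such as "lisi 123456 165 1998-9-9".
     * Throws IllegalArgumentException when the line does not have exactly
     * four fields or when the height or date is malformed.
     */
    static UserRecord parse(String line) {
        String[] fields = line.trim().split("\\s+");
        if (fields.length != 4) {
            throw new IllegalArgumentException(
                    "Expected 4 fields but got " + fields.length + ": " + line);
        }
        double height = Double.parseDouble(fields[2]); // NumberFormatException is an IllegalArgumentException
        Date birthday = Date.valueOf(fields[3]);       // expects yyyy-[m]m-[d]d
        return new UserRecord(fields[0], fields[1], height, birthday);
    }
}
```

    Rejecting malformed lines up front keeps a single bad record from surfacing later as an ArrayIndexOutOfBoundsException inside the Spark task.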

    4. Source code:

    package com.mmzs.bigdata.spark.core.local;
    
    import java.io.File;
    import java.sql.Connection;
    import java.sql.Date;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    
    import scala.Tuple4;
    
    public class TestMain {
        /**
         * Global counter of the rows added to the JDBC batch
         */
        private static int count;
        
        /**
         * Database connection
         */
        private static Connection conn;
        
        /**
         * Precompiled INSERT statement
         */
        private static PreparedStatement pstat;
        
        private static final File OUT_PATH=new File("/home/hadoop/test/jdbc/output");
        
        static{
            delDir(OUT_PATH);
            try {
                String sql="insert into myuser(userName,passWord,height,birthday) values(?,?,?,?)";
                // The host must be the machine where MySQL runs; 127.0.0.1 only works when MySQL is on the same machine as this program
                String url="jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8";
                Class.forName("com.mysql.jdbc.Driver");
                conn=DriverManager.getConnection(url, "root", "123456");
                pstat=conn.prepareStatement(sql);
            } catch (ClassNotFoundException | SQLException e) {
                // Fail fast: without a connection every later insert would throw a NullPointerException
                throw new ExceptionInInitializerError(e);
            }
        }
        /**
         * Recursively delete a file or directory
         * @param f the file or directory to delete
         */
        private static void delDir(File f){
            if(!f.exists())return;
            // listFiles() returns null for a regular file (or on an I/O error)
            File[] files=f.listFiles();
            if(files!=null){
                for(File fp:files)delDir(fp);
            }
            f.delete();
        }
        
        private static void batchSave(Tuple4<String,String,Double,Date> line,boolean isOver){
            try{
                pstat.setString(1, line._1());
                pstat.setString(2, line._2());
                pstat.setDouble(3, line._3());
                pstat.setDate(4, line._4());
                
                // Add the current row to the batch
                pstat.addBatch();
                count++;
                // Send the batch to the database on the last record, or whenever a full batch of 100 rows has accumulated
                if(isOver||count%100==0){
                    pstat.executeBatch();
                    pstat.clearBatch();
                    pstat.clearParameters();
                }
            }catch(SQLException e){
                e.printStackTrace();
            }
        }
        
        /**
         * Store the records of the RDD in the MySQL relational database
         * @param statResRDD RDD of space-separated text lines
         */
        private static void saveToDB(JavaRDD<String> statResRDD){
            // count() runs a job over the whole RDD; the total is used to detect the last record
            final long rddNum=statResRDD.count();
            statResRDD.foreach(new VoidFunction<String>(){
                // Each task works on its own copy of this counter, so the last-record check below assumes a single partition (the case for this small file in local mode)
                private long count=0;
                @Override
                public void call(String line) throws Exception {
                    String[] fields=line.split(" ");
                    String userName=fields[0];
                    String passWord=fields[1];
                    Double height=Double.parseDouble(fields[2]);
                    Date birthday=Date.valueOf(fields[3]);
                    Tuple4<String,String,Double,Date> fieldTuple=new Tuple4<String,String,Double,Date>(userName,passWord,height,birthday);
                    if(++count<rddNum){
                        batchSave(fieldTuple,false);
                    }else{
                        batchSave(fieldTuple,true);
                    }
                }
            });
            
            // Release the JDBC resources once all records have been written
            try{
                if(null!=pstat)pstat.close();
                if(null!=conn)conn.close();
            }catch(SQLException e){
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) {
            SparkConf conf=new SparkConf();
            conf.setAppName("Java Spark local");
            conf.setMaster("local");
            
            // Create the Spark context from the configuration
            JavaSparkContext jsc=new JavaSparkContext(conf);
            
            // Read the local text file into an in-memory RDD
            JavaRDD<String> lineRdd=jsc.textFile("/home/hadoop/test/jdbc/myuser");
            
            // ........... other transformations or aggregations ...........
            
            // Alternative: save the result to text files on disk
            //lineRdd.saveAsTextFile("/home/hadoop/test/jdbc/output");
            // Save the result to MySQL
            saveToDB(lineRdd);
            
            // Close the Spark context
            jsc.close();
        }
    }
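
    The batching rule in batchSave (execute after every 100 rows, then once more for the remainder) can be isolated from Spark and JDBC. The sketch below assumes Java 8 or later and uses a hypothetical BatchBuffer class whose sink callback stands in for executeBatch(). It illustrates the rule and is not the code used in this post. Because close() flushes whatever is left, the caller does not need to know the total record count in advance.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of the RDDToJDBC project.
final class BatchBuffer<T> implements AutoCloseable {
    private final int batchSize;
    private final Consumer<List<T>> sink;   // stands in for executeBatch()
    private final List<T> pending = new ArrayList<>();

    BatchBuffer(int batchSize, Consumer<List<T>> sink) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.sink = sink;
    }

    /** Buffer one item and flush as soon as a full batch has accumulated. */
    void add(T item) {
        pending.add(item);
        if (pending.size() >= batchSize) {
            flush();
        }
    }

    /** Hand the buffered items to the sink; does nothing when the buffer is empty. */
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(pending)); // pass a copy so the sink may keep it
        pending.clear();
    }

    /** Flush the remainder so the last partial batch is not lost. */
    @Override
    public void close() {
        flush();
    }
}
```

    If one such buffer were created per partition inside JavaRDD.foreachPartition, each partition would flush its own remainder, so the logic would no longer depend on the RDD having a single partition.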

    5. Initialize the MySQL database service (the node is 192.168.154.134)

    A. Start the MySQL database service

    [root@DB03 ~]# cd /software/mysql-5.5.32/multi-data/3306/
    [root@DB03 3306]# ls
    data my.cnf my.cnf.bak mysqld
    [root@DB03 3306]# ./mysqld start
    Starting MySQL...

    B. Create the test database

    [root@CloudDeskTop 3306]# cd /software/mysql-5.5.32/bin/
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
    +--------------------+
    | Database           |
    +--------------------+
    | information_schema |
    | mysql              |
    | performance_schema |
    +--------------------+
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create database test character set utf8;"
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "show databases;"
    +--------------------+
    | Database           |
    +--------------------+
    | information_schema |
    | mysql              |
    | performance_schema |
    | test               |
    +--------------------+

    C. Create the myuser table:

    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "create table if not exists test.myuser(uid int(11) auto_increment primary key,username varchar(30),password varchar(30),height double(10,1),birthday date)engine=myisam charset=utf8;"
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;show tables;"
    +-------------------+
    | Tables_in_test    |
    +-------------------+
    | myuser            |
    +-------------------+
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "use test;desc test.myuser;"
    +----------+--------------+------+-----+---------+----------------+
    | Field    | Type         | Null | Key | Default | Extra          |
    +----------+--------------+------+-----+---------+----------------+
    | uid      | int(11)      | NO   | PRI | NULL    | auto_increment |
    | username | varchar(30)  | YES  |     | NULL    |                |
    | password | varchar(30)  | YES  |     | NULL    |                |
    | height   | double(10,1) | YES  |     | NULL    |                |
    | birthday | date         | YES  |     | NULL    |                |
    +----------+--------------+------+-----+---------+----------------+
    
    # The table does not contain any data yet
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;"
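
    The mysql client commands above connect to 192.168.154.134:3306, while the url in TestMain uses 127.0.0.1. If the Spark program runs on a different machine than MySQL, the host in the JDBC URL has to be changed accordingly. A small helper like the following (hypothetical, not part of the project) keeps host, port, and database name in one place and reuses the query parameters from the original URL:

```java
// Hypothetical helper, not part of the RDDToJDBC project.
final class MysqlUrl {
    private MysqlUrl() {
    }

    /** Build a MySQL JDBC URL with the same query parameters as the url in TestMain. */
    static String build(String host, int port, String database) {
        if (port < 1 || port > 65535) {
            throw new IllegalArgumentException("Invalid port: " + port);
        }
        return "jdbc:mysql://" + host + ":" + port + "/" + database
                + "?useUnicode=true&characterEncoding=utf8";
    }
}
```

    For the node used in this step, MysqlUrl.build("192.168.154.134", 3306, "test") yields the URL to pass to DriverManager.getConnection.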

    6. Run the program and check the result in the database

    6.1. Run the Spark code directly in Eclipse 4.5 and watch the Eclipse console output
    6.2. Check that the rows have been written to the MySQL database
    [root@CloudDeskTop bin]# ./mysql -h192.168.154.134 -P3306 -uroot -p123456 -e "select * from test.myuser;"
    +-----+----------+----------+--------+------------+
    | uid | username | password | height | birthday   |
    +-----+----------+----------+--------+------------+
    |   1 | lisi     | 123456   |  165.0 | 1998-09-09 |
    |   2 | lisan    | 123ss    |  187.0 | 2009-10-19 |
    |   3 | wangwu   | 123qqwe  |  177.0 | 1990-08-03 |
    +-----+----------+----------+--------+------------+
  • Original article: https://www.cnblogs.com/mmzs/p/8476286.html