  • Practical Spark Project from 0 to 1 (8): Reading MongoDB Data with Spark and Writing It into a Hive Table

    1. Environment:

    spark-2.2.0;
    
    hive-1.1.0;
    
    scala-2.11.8;
    
    hadoop-2.6.0-cdh-5.15.0;
    
    jdk-1.8;

    mongodb-2.4.10;

    2. Data:

    MongoDB document format:
    {
        "_id" : ObjectId("5ba0569cafc9ec432bd310a3"),
        "id" : 7,
        "name" : "7mongoDBi am using mongodb now",
        "location" : "shanghai",
        "sex" : "male",
        "position" : "big data platform engineer"
    }
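
    For reference, a document like the one above can be fetched directly with the mongo-java-driver declared in the dependencies below. This is a minimal standalone sketch, not part of the original job; the host, database, and collection names are assumptions copied from the spark.mongodb.input.uri used later in the code (driver 3.x API):

    package com.mobanker.mongo2hive.Mongo2Hive;

    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoCollection;
    import org.bson.Document;

    public class PeekMongo {
        public static void main(String[] args) {
            // Connection details are assumptions taken from the Spark URI below
            MongoClient client = new MongoClient("10.40.20.47", 27017);
            try {
                MongoCollection<Document> coll =
                        client.getDatabase("test_db").getCollection("test_table");
                Document first = coll.find().first();
                System.out.println(first == null ? "empty collection" : first.toJson());
            } finally {
                client.close();
            }
        }
    }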
    Hive regular table (columns mirror the MongoDB document fields):
    
    create table mgtohive_2(
    id int,
    name string,
    location string,
    sex string,
    position string
    )
    row format delimited fields terminated by ' ';
    Hive partitioned table (partitioned on sex):

    create table mg_hive_external(
    id int,
    name string,
    location string,
    position string
    )
    PARTITIONED BY(sex string)
    row format delimited fields terminated by ' ';

    3. Eclipse + Maven + Java
    3.1 Dependencies:

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb</groupId>
        <artifactId>mongo-java-driver</artifactId>
        <version>3.6.3</version>
    </dependency>
    <dependency>
        <groupId>org.mongodb.spark</groupId>
        <artifactId>mongo-spark-connector_2.11</artifactId>
        <version>2.2.2</version>
    </dependency>

    3.2 Code:

    package com.mobanker.mongo2hive.Mongo2Hive;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.hive.HiveContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import org.bson.Document;
    
    import com.mongodb.spark.MongoSpark;
    
    import java.io.File;
    import java.util.ArrayList;
    import java.util.List;
    
    public class Mongo2Hive {
        public static void main(String[] args) {
            // Spark 2.x: build a SparkSession with Hive support and the MongoDB input URI
            String warehouseLocation = new File("spark-warehouse").getAbsolutePath();
            SparkSession spark = SparkSession.builder()
                    .master("local[2]")
                    .appName("SparkReadMgToHive")
                    .config("spark.sql.warehouse.dir", warehouseLocation)
                    .config("spark.mongodb.input.uri", "mongodb://10.40.20.47:27017/test_db.test_table")
                    .enableHiveSupport()
                    .getOrCreate();
            JavaSparkContext sc = new JavaSparkContext(spark.sparkContext());
    
            //  Spark 1.x equivalent:
            //  SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkReadMgToHive");
            //  conf.set("spark.mongodb.input.uri", "mongodb://127.0.0.1:27017/test.mgtest");
            //  conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
            //  JavaSparkContext sc = new JavaSparkContext(conf);
            //  sc.addJar("/Users/mac/zhangchun/jar/mongo-spark-connector_2.11-2.2.2.jar");
            //  sc.addJar("/Users/mac/zhangchun/jar/mongo-java-driver-3.6.3.jar");
            //  HiveContext sqlContext = new HiveContext(sc);
            //  // create a DataFrame from MongoDB
            //  Dataset<Row> df = MongoSpark.read(sqlContext).load().toDF();
            //  df.select("id", "name", "location").show();
    
            // This query drives both the column extraction and the Hive insert;
            // it reads from the temp view "mgtable" that is registered below
            String querysql = "select id,name,location,sex,position from mgtable b";
            // "O" = write to the ordinary table, "P" = write to the partitioned table
            String opType = "P";

            SQLUtils sqlUtils = new SQLUtils();
            List<String> column = sqlUtils.getColumns(querysql);
    
            // Create an RDD of MongoDB documents
            JavaRDD<Document> rdd = MongoSpark.load(sc);
            // Convert each Document into a list of its column values, as strings
            JavaRDD<Object> Ordd = rdd.map(new Function<Document, Object>() {
                public Object call(Document document) {
                    List<String> list = new ArrayList<String>();
                    for (int i = 0; i < column.size(); i++) {
                        list.add(String.valueOf(document.get(column.get(i))));
                    }
                    return list;
                }
            });
            System.out.println(Ordd.first());
            // Convert the RDD into a DataFrame programmatically:
            // build a schema string such as "id,name,location,sex,position"
            String schemaString = String.join(",", column);
            System.out.println(schemaString);
    
            // Every extracted column becomes a nullable string field
            List<StructField> fields = new ArrayList<StructField>();
            for (String fieldName : schemaString.split(",")) {
                fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));
            }
            StructType schema = DataTypes.createStructType(fields);

            JavaRDD<Row> rowRDD = Ordd.map((Function<Object, Row>) record -> {
                List<String> values = (List<String>) record;
                return RowFactory.create(values.toArray());
            });
    
            Dataset<Row> df = spark.createDataFrame(rowRDD,schema);
    
            // Write the DataFrame into Hive
            // Select the target Hive database
            spark.sql("use datalake");
            // Register a temp view (registerTempTable is deprecated in Spark 2.x)
            df.createOrReplaceTempView("mgtable");
    
            if ("O".equals(opType.trim())) {
                System.out.println("数据插入到Hive ordinary table");
                Long t1 = System.currentTimeMillis();
                spark.sql("insert into mgtohive_2 " + querysql + " " + "where b.id not in (select id from mgtohive_2)");
    
                System.out.println("insert into mgtohive_2 " + querysql + " ");
                
                Long t2 = System.currentTimeMillis();
                System.out.println("共耗时:" + (t2 - t1) / 60000 + "分钟");
            }else if ("P".equals(opType.trim())) {
    
            System.out.println("数据插入到Hive dynamic partition table");
            Long t3 = System.currentTimeMillis();
            //必须设置以下参数 否则报错
            spark.sql("set hive.exec.dynamic.partition.mode=nonstrict");
            //sex为分区字段   select语句最后一个字段必须是sex
            spark.sql("insert into mg_hive_external partition(sex) select id,name,location,position,sex from mgtable b where b.id not in (select id from mg_hive_external)");
            Long t4 = System.currentTimeMillis();
            System.out.println("共耗时:"+(t4 -t3)/60000+ "分钟");
            }
            spark.stop();
        }
    }
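
    As a quick sanity check, the written data can be read back through the same SparkSession just before spark.stop(). A minimal sketch, assuming the datalake database and the table names used above:

            // Hedged verification sketch: count the rows that landed in Hive
            spark.sql("use datalake");
            spark.sql("select count(*) from mg_hive_external").show();
            // For the partitioned table, the generated partitions can also be listed
            spark.sql("show partitions mg_hive_external").show(false);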

    Utility class:

    package com.mobanker.mongo2hive.Mongo2Hive;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class SQLUtils {
        // Extracts the column names from the select list of a simple query,
        // e.g. "select id,name from t" -> [id, name]; "select *" yields an empty list
        public List<String> getColumns(String querysql){
            List<String> column = new ArrayList<String>();
            String tmp = querysql.substring(querysql.indexOf("select") + 6,
                    querysql.indexOf("from")).trim();
            if (tmp.indexOf("*") == -1){
                String[] cols = tmp.split(",");
                for (String c : cols){
                    column.add(c);
                }
            }
            return column;
        }

        // Extracts the table name that follows "from", dropping any alias,
        // e.g. "select ... from mgtable b" -> "mgtable"
        public String getTBname(String querysql){
            String tmp = querysql.substring(querysql.indexOf("from") + 4).trim();
            int sx = tmp.indexOf(" ");
            if (sx == -1){
                return tmp;
            } else {
                return tmp.substring(0, sx);
            }
        }
    }
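
    A quick illustration of how the helper behaves (hypothetical standalone demo, not part of the original post):

    package com.mobanker.mongo2hive.Mongo2Hive;

    public class SQLUtilsDemo {
        public static void main(String[] args) {
            SQLUtils utils = new SQLUtils();
            String sql = "select id,name,location,sex,position from mgtable b";
            // Prints: [id, name, location, sex, position]
            System.out.println(utils.getColumns(sql));
            // Prints: mgtable (the alias "b" is stripped)
            System.out.println(utils.getTBname(sql));
        }
    }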

    4. Troubleshooting:

     Download the hive-site.xml file from the CDH cluster's Hive configuration, create a resources folder in the project, and place hive-site.xml inside it so that it is picked up from the classpath.
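
    If shipping hive-site.xml with the project is inconvenient, the metastore location can also be set programmatically when the SparkSession is built. A minimal sketch, assuming a Hive metastore Thrift service; the host and port below are placeholders to substitute, not values from the original post:

            // Hedged sketch: point Spark at the Hive metastore without hive-site.xml.
            // "thrift://metastore-host:9083" is an assumed address, not from the original post.
            SparkSession spark = SparkSession.builder()
                    .appName("SparkReadMgToHive")
                    .config("hive.metastore.uris", "thrift://metastore-host:9083")
                    .enableHiveSupport()
                    .getOrCreate();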

    5. Execution results:

    The run took 14 minutes and wrote 10,398,582 rows into the Hive table.

    Author: 大码王


  • Original article: https://www.cnblogs.com/huanghanyu/p/13634032.html