zoukankan      html  css  js  c++  java
  • sparksql加载mongodb指定字段,并对加载进来的json做解析

    如果是要读取mongo全表的数据的话,推荐使用mongo-spark,更简单方便

    我个人的需求是要读取mongo的指定列,因为全表数据量太大,

    并对加载进来的json数据进行解析,解析框架用的是alibaba封装的fastjson框架。

    package spark_read;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientURI;

    public class Read_Mongo {

    public static void main(String[] args) {
    JavaSparkContext jsc = createJavaSparkContext(args);
    String uri = getMongoClientURI(args);

    SQLContext sqlContext = new SQLContext(jsc);

    //只加载需要读取的字段,减少内存的消耗,并用sparksql的方式进行数据融合,减少资源消耗并提升开发效率以及代码简洁性
    String list = "_record_id,_in_time,_src";
    String[] strings = list.split(",");

    //对想要获取的字段切割后,创建structFields,然后创建StructType(DF的schema)
    List<StructField> structFields = new ArrayList<StructField>();
    for (String s : strings) {
    structFields.add(DataTypes.createStructField(s, DataTypes.StringType, true ));
    }
    StructType schema = DataTypes.createStructType( structFields );
    //将options的配置信息存储到一个map里
    Map<String, String> map = new HashMap<String, String>();
    map.put("uri",uri);
    map.put("database", " ");
    map.put("collection"," ");
    //利用上面自定义的DF的schema 和配置信息读取momngo指定表里的指定列,对读取的列声明成虚拟表,可以进一步操作,如有需要可以转换成RDD进行操作
    Dataset<Row> load = sqlContext.read().schema(schema).format("com.mongodb.spark.sql").options(map).load();
    load.registerTempTable("mongo");
    Dataset<Row> result = sqlContext.sql("select * from mongo");

    result.toJavaRDD().foreach(new VoidFunction<Row>() {
    private static final long serialVersionUID = 1L;
    public void call(Row arg0) throws Exception {
    System.out.println(arg0.toString());
    }
    });


    //一次性加载momgo表里的所有数据,利用alibaba封装的json转换工具转换成想要的格式
    // JavaMongoRDD<Document> mongoRDD = MongoSpark.load(jsc);
    // mongoRDD.foreach(new VoidFunction<Document>() {
    //
    // private static final long serialVersionUID = 1L;
    //
    // public void call(Document document) throws Exception {
    //
    // String data = document.toJson();
    //
    // JSONObject jsonObject = JSON.parseObject(data);
    //
    // JSONArray src = jsonObject.getJSONArray("_src");
    //
    // JSONObject src_obj = (JSONObject) src.get(0);
    //
    // System.out.println(src_obj.getString("site"));

    // }
    // });
    }

    /**
    创建spark连接,并设置mongodb读写路径信息
    */
    private static JavaSparkContext createJavaSparkContext(final String[] args) {
    String uri = getMongoClientURI(args);
    //dropDatabase(uri);
    SparkConf conf = new SparkConf()
    .setMaster("local")
    .setAppName("MongoSparkConnectorTest")
    .set("spark.app.id", "MongoSparkConnectorTour")
    .set("spark.mongodb.input.uri", uri)
    .set("spark.mongodb.output.uri", uri);

    return new JavaSparkContext(conf);
    }

    /**
    删除mongo已存在文件
    */
    private static void dropDatabase(final String connectionString) {
    MongoClientURI uri = new MongoClientURI(connectionString);
    new MongoClient(uri).dropDatabase(uri.getDatabase());
    }

    /**
    获取mondo读写路径
    */
    private static String getMongoClientURI(final String[] args) {
    String uri;
    if (args.length == 0) {
    uri = "mongodb://ip:27017"; // default
    } else {
    uri = args[0];
    }
    return uri;
    }

    }

  • 相关阅读:
    设计模式——设计模式之禅day2
    和阿文一起学H5——音乐素材
    mysql数据库问题
    vue-cli脚手架工具
    webpack总结
    sql和nosql区别
    MongoDB基本命令
    >nbsp修改
    字典
    列表
  • 原文地址:https://www.cnblogs.com/hejianxin/p/8066705.html
Copyright © 2011-2022 走看看