zoukankan      html  css  js  c++  java
  • spark读取mongodb 并使用fastjson对读取json进行处理

    package spark_read;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.bson.Document;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientURI;
    import com.mongodb.spark.MongoSpark;
    import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

    public class Read_Mongo {

    public static void main(String[] args) {
    JavaSparkContext jsc = createJavaSparkContext(args);

    JavaMongoRDD<Document> mongoRDD = MongoSpark.load(jsc);

    mongoRDD.foreach(new VoidFunction<Document>() {

    private static final long serialVersionUID = 1L;

    public void call(Document document) throws Exception {

    String data = document.toJson();

    JSONObject jsonObject = JSON.parseObject(data);

    JSONArray src = jsonObject.getJSONArray("_src");

    JSONObject src_obj = (JSONObject) src.get(0);

    System.out.println(src_obj.getString("site"));

    }
    });
    }

    /**
    创建spark连接,并设置mongodb读写路径信息
    */
    private static JavaSparkContext createJavaSparkContext(final String[] args) {
    String uri = getMongoClientURI(args);
    //dropDatabase(uri);
    SparkConf conf = new SparkConf()
    .setMaster("local")
    .setAppName("MongoSparkConnectorTest")
    .set("spark.app.id", "MongoSparkConnectorTour")
    .set("spark.mongodb.input.uri", uri)
    .set("spark.mongodb.output.uri", uri);

    return new JavaSparkContext(conf);
    }

    /**
    删除mongo已存在文件
    */
    private static void dropDatabase(final String connectionString) {
    MongoClientURI uri = new MongoClientURI(connectionString);
    new MongoClient(uri).dropDatabase(uri.getDatabase());
    }

    /**
    获取mondo读写路径
    */
    private static String getMongoClientURI(final String[] args) {
    String uri;
    if (args.length == 0) {
    uri = "mongodb://ip:27017/mongo库名.表名"; // default
    } else {
    uri = args[0];
    }
    return uri;
    }

    }

  • 相关阅读:
    【反射】Java反射机制
    Composer教程之常用命令
    Composer教程之基础用法
    Composer教程之初识Composer
    Composer 的结构详解
    现代 PHP 新特性系列(七) —— 内置的 HTTP 服务器
    现代 PHP 新特性系列(一) —— 命名空间
    现代 PHP 新特性系列(二) —— 善用接口
    现代 PHP 新特性系列(三) —— Trait 概览
    现代 PHP 新特性系列(四) —— 生成器的创建和使用
  • 原文地址:https://www.cnblogs.com/hejianxin/p/8065485.html
Copyright © 2011-2022 走看看