zoukankan      html  css  js  c++  java
  • spark读取mongodb 并使用fastjson对读取json进行处理

    package spark_read;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.bson.Document;

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.mongodb.MongoClient;
    import com.mongodb.MongoClientURI;
    import com.mongodb.spark.MongoSpark;
    import com.mongodb.spark.rdd.api.java.JavaMongoRDD;

    public class Read_Mongo {

    public static void main(String[] args) {
    JavaSparkContext jsc = createJavaSparkContext(args);

    JavaMongoRDD<Document> mongoRDD = MongoSpark.load(jsc);

    mongoRDD.foreach(new VoidFunction<Document>() {

    private static final long serialVersionUID = 1L;

    public void call(Document document) throws Exception {

    String data = document.toJson();

    JSONObject jsonObject = JSON.parseObject(data);

    JSONArray src = jsonObject.getJSONArray("_src");

    JSONObject src_obj = (JSONObject) src.get(0);

    System.out.println(src_obj.getString("site"));

    }
    });
    }

    /**
    创建spark连接,并设置mongodb读写路径信息
    */
    private static JavaSparkContext createJavaSparkContext(final String[] args) {
    String uri = getMongoClientURI(args);
    //dropDatabase(uri);
    SparkConf conf = new SparkConf()
    .setMaster("local")
    .setAppName("MongoSparkConnectorTest")
    .set("spark.app.id", "MongoSparkConnectorTour")
    .set("spark.mongodb.input.uri", uri)
    .set("spark.mongodb.output.uri", uri);

    return new JavaSparkContext(conf);
    }

    /**
    删除mongo已存在文件
    */
    private static void dropDatabase(final String connectionString) {
    MongoClientURI uri = new MongoClientURI(connectionString);
    new MongoClient(uri).dropDatabase(uri.getDatabase());
    }

    /**
    获取mondo读写路径
    */
    private static String getMongoClientURI(final String[] args) {
    String uri;
    if (args.length == 0) {
    uri = "mongodb://ip:27017/mongo库名.表名"; // default
    } else {
    uri = args[0];
    }
    return uri;
    }

    }

  • 相关阅读:
    第二章:变量和简单数据类型
    第四章:操作列表
    第三章:列表简介
    老男孩Day6作业:计算器
    老男孩Day5作业:电子银行购物商城
    老男孩Day4作业:员工信息查询系统
    老男孩Day3作业:工资管理系统
    老男孩Day2作业:购物车程序
    改进地图的vo类
    slam kf
  • 原文地址:https://www.cnblogs.com/hejianxin/p/8065485.html
Copyright © 2011-2022 走看看