zoukankan      html  css  js  c++  java
  • Spark 连接mysql 及MongoDB

    在spark 运算过程中,常常需要连接不同类型的数据库以获取或者存储数据,这里将提及Spark如何连接mysql和MongoDB.

    1. 连接mysql , 在1.3版本提出了一个新概念DataFrame ,因此以下方式获取到的是DataFrame,但是可通过JavaRDD<Row> rows = jdbcDF.toJavaRDD()转化为JavaRDD。

    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SQLContext;
    
    public class Main implements Serializable {
    
        private static final org.apache.log4j.Logger LOGGER = org.apache.log4j.Logger.getLogger(Main.class);
    
        private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
        private static final String MYSQL_USERNAME = "expertuser";
        private static final String MYSQL_PWD = "expertuser123";
        private static final String MYSQL_CONNECTION_URL =
                "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;
    
        private static final JavaSparkContext sc =
                new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));
    
        private static final SQLContext sqlContext = new SQLContext(sc);
    
        public static void main(String[] args) {
            //Data source options
            Map<String, String> options = new HashMap<>();
            options.put("driver", MYSQL_DRIVER);
            options.put("url", MYSQL_CONNECTION_URL); //getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
            options.put("dbtable",
                        "(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name");
    //     sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:"select title, author from books where ? < = id and id <= ?"
            options.put("partitionColumn", "emp_no");//进行分区的表字段
            options.put("lowerBound", "10001");
    //     owerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
            options.put("upperBound", "499999");
            options.put("numPartitions", "10");
    
            //Load MySQL query result as DataFrame
            DataFrame jdbcDF = sqlContext.load("jdbc", options);
            JavaRDD<Row> rows = jdbcDF.toJavaRDD(); 
    List
    <Row> employeeFullNameRows = jdbcDF.collectAsList(); for (Row employeeFullNameRow : employeeFullNameRows) { LOGGER.info(employeeFullNameRow); } } }

    2. 连接mongoDB

    可参考 https://github.com/mongodb/mongo-hadoop/wiki/Spark-Usage

  • 相关阅读:
    visio中插入顶边大括号
    undefined reference to `SetPduPowerConsumptionCnt'的解决办法
    JMS : Java Message Service (Java消息服务)
    C#自己编写的一个函数 可以删除字符串中指定开头和结尾中间的字符串
    完全JSP分页代码
    用ASP+Access创建网站RSS格式内容摘要
    微软考试杭州考点
    split 分隔字符串
    JSP连接SQL Server 2000系统配置
    全球测试管理系统TestDirector(上)
  • 原文地址:https://www.cnblogs.com/xuexue-bit/p/5091005.html
Copyright © 2011-2022 走看看