  • Spark's DataFrame/SQL API (in Scala): reading and manipulating JSON, creating views, and running SQL queries

    JSON format (one object per line)

    {"user_id":3,"user_name":"张三3","user_age":17,"user_balance":20.53}
    {"user_id":4,"user_name":"张三4","user_age":15,"user_balance":20.53}
    {"user_id":5,"user_name":"张三5","user_age":19,"user_balance":20.54}
    {"user_id":6,"user_name":"张三6","user_age":15,"user_balance":20.55}

    Code: reading a JSON file

    import org.apache.spark.sql.{DataFrame, SQLContext, SparkSession}
    import org.apache.spark.{SparkConf, SparkContext}
    
    object Dome {
      /**
        * Word count
        */
      def fun2: Unit = {
        val conf = new SparkConf()
        conf.setAppName("wordCount")
        conf.setMaster("local")
        val sc = new SparkContext(conf)
        val line = sc.textFile("E:\\words.txt")
        line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,true).foreach(println(_))
        sc.stop()
      }
    
      /**
        * Read from HDFS; needs to be packaged as a jar and submitted (input/output paths come from args)
        */
      def fun3(args:Array[String]): Unit = {
        val conf = new SparkConf().setAppName("WC")
        val sc = new SparkContext(conf)
        val line = sc.textFile(args(0))
        line.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile(args(1))
        sc.stop()
      }
    
      /**
        * Count occurrences of adjacent word pairs, e.g. for input lines:
        * A;B;C;D;E;F
        * C;A;D;C;E;S
        * C;E;F
        */
      def fun(): Unit = {
        val conf = new SparkConf()
        conf.setAppName("wordCount")
        conf.setMaster("local")
        val sc = new SparkContext(conf)
        val line = sc.textFile("E:\\words.txt")
        line.map(_.split(";")).flatMap(x=>{
          for(i<-0 until x.length-1) yield (x(i)+","+x(i+1),1)
        }).foreach(println(_))
        sc.stop()
      }
    
      /**
        * Read JSON content (older SQLContext API):
        *   1. create a SparkConf
        *   2. create a SparkContext
        *   3. get an SQLContext
        *   4. get a DataFrame
        *   5. show the data in the DataFrame
        */
      def fun1(): Unit ={
        val conf = new SparkConf().setAppName("JSON").setMaster("local")
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        val people = sqlContext.read.format("json").load("E:\\user.json")
        people.show(30, 100)  // first arg: number of rows to show; second: column truncation width
        sc.stop()
      }
    
      /**
        * The non-deprecated approach using SparkSession (see the official docs)
        */
      def fun4(): Unit ={
        val conf = new SparkConf().setAppName("JSON").setMaster("local")
        val spark = SparkSession.builder().appName("JSONDome")
          .config(conf).getOrCreate()
        val people: DataFrame = spark.read.json("E:\\user.json")
        people.show(4, 100)
        people.printSchema()  // print the schema
        people.select("user_name").show()
        import spark.implicits._  // needed for the $"column" expression syntax below
        people.select($"user_name", $"user_age" + 10).show()
        people.select($"user_name", $"user_age" > 15).show()
        people.select("user_name", "user_age").filter($"user_age" > 15).show()
        people.groupBy("user_age").count().show()  // returns a DataFrame
        spark.stop()
      }
      /**
        * Create temporary views and query them with SQL
        */
      def fun5(): Unit ={
        val conf = new SparkConf().setAppName("JSON").setMaster("local")
        val spark = SparkSession.builder().appName("JSONDome")
          .config(conf).getOrCreate()
        val people: DataFrame = spark.read.json("E:\\user.json")

        // createOrReplaceTempView creates a session-scoped temporary view, replacing any existing one
        people.createOrReplaceTempView("user")
        // spark.sql(...) also returns a DataFrame
        spark.sql("select * from user").show()
        spark.sql("select user_id, user_name, user_age, user_balance yue from user where user_age > 15").show()  // column aliases cannot use Chinese characters
        // spark.sql("update user set user_age = user_age + 100")  // data in a view cannot be modified
        spark.sql("select * from user").show()

        people.createGlobalTempView("user1")  // global temporary view, registered under the global_temp database
        spark.sql("select * from global_temp.user1").show()
        spark.newSession().sql("select * from global_temp.user1").show()  // accessible from other sessions
        spark.sql("select user_id, user_name, user_age, user_balance yue from global_temp.user1 where user_age > 15").show()

        people.createTempView("user2")  // unlike createOrReplaceTempView, this throws if a view with the same name already exists
        spark.sql("select * from user2").show()
        // spark.sql("alter table user2 drop COLUMN balance RESTRICT")  // the view's schema cannot be altered
        spark.sql("select * from user2").show()
        spark.stop()
      }
    
      /**
        * Read multiple files via a glob pattern
        */
      def fun6(): Unit ={
        val conf = new SparkConf().setAppName("JSON").setMaster("local")
        val spark = SparkSession.builder().appName("JSONDome")
          .config(conf).getOrCreate()
        val people: DataFrame = spark.read.json("E:\\empAndDept\\*.json")
        people.show(100)
        spark.stop()
      }
    
      def main(args: Array[String]): Unit = {
        fun6()
      }
    }
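
    As an aside (not in the original post), the same JSON can also be read into a strongly typed Dataset by defining a case class whose fields match the JSON keys. A minimal sketch, assuming the sample file above:

    import org.apache.spark.sql.{Dataset, SparkSession}

    // hypothetical case class mirroring the sample JSON's keys
    case class User(user_id: Long, user_name: String, user_age: Long, user_balance: Double)

    object DatasetSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("DatasetSketch").master("local").getOrCreate()
        import spark.implicits._  // provides the Encoder needed by .as[User]
        val users: Dataset[User] = spark.read.json("E:\\user.json").as[User]
        users.filter(_.user_age > 15).show()
        spark.stop()
      }
    }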

    Code: read table data from a remote database and write it to disk

    package sql
    
    import java.util.Properties
    
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    
    object SQLDome {
      /**
        * Read tables from a remote MySQL database
        */
      def fun1(): Unit ={
        val conf = new SparkConf().setAppName("JSON").setMaster("local")
        val spark = SparkSession.builder().appName("JSONDome")
          .config(conf).getOrCreate()
        val url = "jdbc:mysql://192.168.188.130/mydatabase?user=root&password=root"
        val tableEmp = "emp"
        val tableDept = "dept"
        val pop = new Properties()
        pop.put("Driver","com.mysql.jdbc.Driver")
        val emp = spark.read.jdbc(url,tableEmp,pop).createOrReplaceTempView("emp")
        val dept = spark.read.jdbc(url,tableDept,pop).createTempView("dept")
    
        spark.sql("select * from emp").show(10)
        spark.sql("select * from dept").show()
    
        // join the two tables with a filter condition
        val p = spark.sql("select * from dept,emp where dept.dept_id=emp.d_id and dept.dept_id = 3")
        p.show()
    
        /**
          * overwrite: replace existing output
          * append:    append to existing output
          * error:     fail if the output already exists (default)
          * ignore:    do nothing if the output already exists
          */
        p.write.mode("overwrite").json("E:\\empAndDept")
        spark.sql("select * from dept").write.mode("overwrite").json("E:\\dept")
      }
    
      def main(args: Array[String]): Unit = {
        fun1()
      }
    }
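
    Writing a DataFrame back to MySQL works the same way through DataFrameWriter.jdbc. A hedged sketch (not in the original) that could be appended inside fun1, reusing p and url from above; the target table emp_dept is hypothetical:

        // not in the original: write the joined result back to MySQL
        val writeProps = new Properties()
        writeProps.put("user", "root")
        writeProps.put("password", "root")
        writeProps.put("driver", "com.mysql.jdbc.Driver")
        p.write.mode("append").jdbc(url, "emp_dept", writeProps)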

    pom.xml dependencies and plugin configuration (IntelliJ IDEA)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com2</groupId>
        <artifactId>spark2</artifactId>
        <version>1.0-SNAPSHOT</version>
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <encoding>UTF-8</encoding>
            <scala.version>2.11.11</scala.version>
            <spark.version>2.1.1</spark.version>
            <hadoop.version>2.7.3</hadoop.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.45</version>
            </dependency>
        </dependencies>
        <build>
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
    
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
  • Original post: https://www.cnblogs.com/han-guang-xue/p/10036987.html