zoukankan      html  css  js  c++  java
  • 大数据实践(十) Spark多种开发语言、与Hive集成

    Spark 可以使用scala、Java、Sql、Python、R语言进行开发。

    在bin目录下也提供了spark-shell、spark-sql、sparkR、pyspark等交互方式。

    SparkSQL实现了Hive的模型、Hive在新版本中也建议使用Spark作为计算引擎。

    一、Spark实现wordCount(TopK)

    使用以下文本进行词频统计。

    Java hadoop Spark  Hbase
    Spark hadoop Java 
    hive mysql
    hadoop Spark hive	ClickHouse
    Spark Flink hadoop
    Java scala hadoop
    Spark hadoop Java,hadoop
    
    0、HiveQL/SparkSQL

    在hive中就是写sql,然后转换为MR。现在Hive已经建议使用SparkTez等作为计算引擎。

    hive命令行和spark-sql命令行都是写sql,语句基本一样。

    Spark bin目录下也有sparkR工具可以使用,和这种方式基本一样,就是写SQL.

    select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word;
    
    --hive beeline命令行
    +-----------------+--------+
    |      word       | count  |
    +-----------------+--------+
    |                 | 1      |
    | Flink           | 1      |
    | Hbase           | 1      |
    | Java            | 3      |
    | Java,hadoop     | 1      |
    | Spark           | 5      |
    | hadoop          | 6      |
    | hive            | 1      |
    | hiveClickHouse  | 1      |
    | mysql           | 1      |
    | scala           | 1      |
    +-----------------+--------+
    11 rows selected (2.162 seconds)
    
    
    
    --求topkey也很方便,写sql就行
    select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word order by count desc limit 3;
    
    +---------+--------+
    |  word   | count  |
    +---------+--------+
    | hadoop  | 6      |
    | Spark   | 5      |
    | Java    | 3      |
    +---------+--------+
    3 rows selected (3.122 seconds)
    
    
    1、Scala

    Scala开发Spark比较方便快捷。

    在交互式环境下连接到HDFS,使用RDD进行词频统计、排序。

    scala> var res=sc.textFile("hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt")
    res: org.apache.spark.rdd.RDD[String] = hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt MapPartitionsRDD[15] at textFile at <console>:24
    
    scala> var line=res.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    line: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:25
    
    
    scala> line.foreach(println)
    (hive,1)
    (mysql,1)
    (hello,1)
    (java,2)
    (spark,2)
    (hadoop,2)
    
    
    //所有参数都可以显式指定
      def main(args: Array[String]): Unit = {
    
        val conf:SparkConf=new SparkConf().setAppName(s"${args(0)}")
          .setMaster("local[3]") //提交到Spark中,这个参数就不要了
        val sparkContext = new SparkContext(conf)
    
        val lines: RDD[String] = sparkContext.textFile(s"${args(1)}")
    
        val text=lines.flatMap(_.split("\s+")).map(_.toUpperCase).map((_,1)).reduceByKey((_+_))
    
        text.map(_.swap).sortByKey(ascending = false).map(_.swap).take(3).foreach(println)
    
      }
    
    2、 Python

    pyspark是Spark的Python实现,api基本和scala版本一样。

    在bin目录下也可以直接使用pyspark进行python编程(需要有python环境)。

    >>> text=sc.textFile('/usr/local/SparkDemo.txt')
    
    >>> text.first()
    'java hadoop spark'                                                             
    
    >>> lines=text.flatMap(lambda line:line.split(' '))
    >>> line=lines.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
    
    >>> line.collect()
    [('java', 3), ('hadoop', 5), ('hive', 2), ('Spark', 2), ('Flink', 1), ('scala', 1), ('spark', 2), ('hello', 1), ('mysql', 1), ('SparkHadoopJava', 1)]
                           
    >>> line.foreach(lambda t:print(t))
    ('spark', 2)
    ('hello', 1)
    ('mysql', 1)
    ('SparkHadoopJava', 1)
    ('java', 3)
    ('hadoop', 5)
    ('hive', 2)
    ('Spark', 2)
    ('Flink', 1)
    ('scala', 1)
    >>> 
    
    
    #写在文件中
    from pyspark import SparkContext
    from pyspark.sql.session import SparkSession
    
    
    # conf = SparkConf().setAppName('test_parquet')
    sc = SparkContext('local[6]', 'test')
    # spark = SparkSession(sc)
    # sc.setLogLevel("INFO")
    
    text = sc.textFile(name ="wordcount.txt")
    import re
    
    #压扁、分割、聚合
    word = text.flatMap(lambda x:re.split("\W+", x)).map(lambda x:(x, 1)).reduceByKey(lambda a, b: a + b)
    
    #收集
    top = word.collect()
    
    print(top)
    
    sc.stop()
    
    
    3、Java

    在Java中写wordCount,有Java8的函数式编程支持,写起来还好。

    public static void main(String[] args) {
    
            SparkConf conf = new SparkConf();
    
            //wordCount local[3]  dir/wordcount.txt
            conf.setAppName(args[0]).setMaster(args[1]);
    
            //转换成Java上下文
            JavaSparkContext sC = new JavaSparkContext(conf);
    
            //读取文件
            JavaRDD<String> rdd = sC.textFile(args[2]);
    
            //压扁,分割,要返回迭代器
            JavaRDD<String> javaRDD = rdd.flatMap(lines -> Stream.of(lines.split("\W+")).iterator()).
                    map(String::toUpperCase);//转大写
    
    
    //        JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
    //
    //            @Override
    //            public Tuple2<String, Integer> call(String s) throws Exception {
    //                return new Tuple2<String, Integer>(s, 1);
    //            }
    //        });
    
            //转成元组,(word,1)的格式
            JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(w -> Tuple2.<String, Integer>apply(w, 1));
    
            //求和
            JavaPairRDD<String, Integer> reduceRDD = mapRDD.reduceByKey(Integer::sum);
    
            //交换、排序,取TopK
            List<Tuple2<String, Integer>> topK = reduceRDD.map(Tuple2::swap).sortBy(Tuple2::_1, false, 2).
                    map(Tuple2::swap).take(3);
    
            //打印
            topK.forEach(System.out::println);
    
            sC.stop();
    
        }
    

    二、Spark操作Hive

    Spark操作hive:

    1、将hive中conf目录下的hive-site.xml移动Spark的conf目录下。
    2、spark执行命令中加入数据库驱动
    	#如果报错,将jars换成--driver-class-path
    	spark-shell --master local[2] --jars /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    3、hdfs要启动,即使是Spark本地模式,但是hive的数据是在Hdfs中的。
    

    spark-shell中使用Spark的api进行操作,pyspark的api一样。

    #启动Spark-shell命令行,使用sparkSQL的api操作hive
    spark-shell --master local[2] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    
    scala> spark.sql("select count(name),name from sparkdemotable group by name").show()
    +-----------+--------------------+                                              
    |count(name)|                name|
    +-----------+--------------------+
    |          1|     SparkHadoopJava|
    |          1|spark hadoop java...|
    |          1|   java scala hadoop|
    |          1|          hive mysql|
    |          1|   hadoop Spark hive|
    |          1|  Spark Flink hadoop|
    |          2|   java hadoop spark|
    +-----------+--------------------+
    

    spark-sql中就是写sql,方式和hive中一样

    #启动spark-sql命令行,使用sql的方式操作hive
    >>spark-sql  --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    
    select * from sparkdemotable;
    
    java hadoop spark
    java hadoop spark
    spark hadoop java hello
    hive mysql
    hadoop Spark hive
    Spark Flink hadoop
    java scala hadoop
    SparkHadoopJava
    
    #两种方式出来的数据格式都不一样,
    #方式一好快啊,方式二慢了许多,底层应该都是一样的RDD
    
  • 相关阅读:
    springmvc与ajax交互
    [PAT] A1052 Linked List Sorting
    [PAT] A1032 Sharing
    [PAT] A1076 Forwards on Weibo
    [PAT] A1034 Head of a Gang
    [PAT] A1030 Travel Plan
    [PAT] A1031 Hello World for U
    [PAT] A1029 Median
    [PAT] A1028 List Sorting
    [PAT] A1026 Table Tennis
  • 原文地址:https://www.cnblogs.com/cgl-dong/p/14034964.html
Copyright © 2011-2022 走看看