zoukankan      html  css  js  c++  java
  • 大数据实践(十) Spark多种开发语言、与Hive集成

    Spark 可以使用scala、Java、Sql、Python、R语言进行开发。

    在bin目录下也提供了spark-shell、spark-sql、sparkR、pyspark等交互方式。

    SparkSQL实现了Hive的模型、Hive在新版本中也建议使用Spark作为计算引擎。

    一、Spark实现wordCount(TopK)

    使用以下文本进行词频统计。

    Java hadoop Spark  Hbase
    Spark hadoop Java 
    hive mysql
    hadoop Spark hive	ClickHouse
    Spark Flink hadoop
    Java scala hadoop
    Spark hadoop Java,hadoop
    
    0、HiveQL/SparkSQL

    在hive中就是写sql,然后转换为MR。现在Hive已经建议使用SparkTez等作为计算引擎。

    hive命令行和spark-sql命令行都是写sql,语句基本一样。

    Spark bin目录下也有sparkR工具可以使用,和这种方式基本一样,就是写SQL.

    select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word;
    
    --hive beeline命令行
    +-----------------+--------+
    |      word       | count  |
    +-----------------+--------+
    |                 | 1      |
    | Flink           | 1      |
    | Hbase           | 1      |
    | Java            | 3      |
    | Java,hadoop     | 1      |
    | Spark           | 5      |
    | hadoop          | 6      |
    | hive            | 1      |
    | hiveClickHouse  | 1      |
    | mysql           | 1      |
    | scala           | 1      |
    +-----------------+--------+
    11 rows selected (2.162 seconds)
    
    
    
    --求topkey也很方便,写sql就行
    select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word order by count desc limit 3;
    
    +---------+--------+
    |  word   | count  |
    +---------+--------+
    | hadoop  | 6      |
    | Spark   | 5      |
    | Java    | 3      |
    +---------+--------+
    3 rows selected (3.122 seconds)
    
    
    1、Scala

    Scala开发Spark比较方便快捷。

    在交互式环境下连接到HDFS,使用RDD进行词频统计、排序。

    scala> var res=sc.textFile("hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt")
    res: org.apache.spark.rdd.RDD[String] = hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt MapPartitionsRDD[15] at textFile at <console>:24
    
    scala> var line=res.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
    line: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:25
    
    
    scala> line.foreach(println)
    (hive,1)
    (mysql,1)
    (hello,1)
    (java,2)
    (spark,2)
    (hadoop,2)
    
    
    //所有参数都可以显式指定
      def main(args: Array[String]): Unit = {
    
        val conf:SparkConf=new SparkConf().setAppName(s"${args(0)}")
          .setMaster("local[3]") //提交到Spark中,这个参数就不要了
        val sparkContext = new SparkContext(conf)
    
        val lines: RDD[String] = sparkContext.textFile(s"${args(1)}")
    
        val text=lines.flatMap(_.split("\s+")).map(_.toUpperCase).map((_,1)).reduceByKey((_+_))
    
        text.map(_.swap).sortByKey(ascending = false).map(_.swap).take(3).foreach(println)
    
      }
    
    2、 Python

    pyspark是Spark的Python实现,api基本和scala版本一样。

    在bin目录下也可以直接使用pyspark进行python编程(需要有python环境)。

    >>> text=sc.textFile('/usr/local/SparkDemo.txt')
    
    >>> text.first()
    'java hadoop spark'                                                             
    
    >>> lines=text.flatMap(lambda line:line.split(' '))
    >>> line=lines.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
    
    >>> line.collect()
    [('java', 3), ('hadoop', 5), ('hive', 2), ('Spark', 2), ('Flink', 1), ('scala', 1), ('spark', 2), ('hello', 1), ('mysql', 1), ('SparkHadoopJava', 1)]
                           
    >>> line.foreach(lambda t:print(t))
    ('spark', 2)
    ('hello', 1)
    ('mysql', 1)
    ('SparkHadoopJava', 1)
    ('java', 3)
    ('hadoop', 5)
    ('hive', 2)
    ('Spark', 2)
    ('Flink', 1)
    ('scala', 1)
    >>> 
    
    
    #写在文件中
    from pyspark import SparkContext
    from pyspark.sql.session import SparkSession
    
    
    # conf = SparkConf().setAppName('test_parquet')
    sc = SparkContext('local[6]', 'test')
    # spark = SparkSession(sc)
    # sc.setLogLevel("INFO")
    
    text = sc.textFile(name ="wordcount.txt")
    import re
    
    #压扁、分割、聚合
    word = text.flatMap(lambda x:re.split("\W+", x)).map(lambda x:(x, 1)).reduceByKey(lambda a, b: a + b)
    
    #收集
    top = word.collect()
    
    print(top)
    
    sc.stop()
    
    
    3、Java

    在Java中写wordCount,有Java8的函数式编程支持,写起来还好。

    public static void main(String[] args) {
    
            SparkConf conf = new SparkConf();
    
            //wordCount local[3]  dir/wordcount.txt
            conf.setAppName(args[0]).setMaster(args[1]);
    
            //转换成Java上下文
            JavaSparkContext sC = new JavaSparkContext(conf);
    
            //读取文件
            JavaRDD<String> rdd = sC.textFile(args[2]);
    
            //压扁,分割,要返回迭代器
            JavaRDD<String> javaRDD = rdd.flatMap(lines -> Stream.of(lines.split("\W+")).iterator()).
                    map(String::toUpperCase);//转大写
    
    
    //        JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
    //
    //            @Override
    //            public Tuple2<String, Integer> call(String s) throws Exception {
    //                return new Tuple2<String, Integer>(s, 1);
    //            }
    //        });
    
            //转成元组,(word,1)的格式
            JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(w -> Tuple2.<String, Integer>apply(w, 1));
    
            //求和
            JavaPairRDD<String, Integer> reduceRDD = mapRDD.reduceByKey(Integer::sum);
    
            //交换、排序,取TopK
            List<Tuple2<String, Integer>> topK = reduceRDD.map(Tuple2::swap).sortBy(Tuple2::_1, false, 2).
                    map(Tuple2::swap).take(3);
    
            //打印
            topK.forEach(System.out::println);
    
            sC.stop();
    
        }
    

    二、Spark操作Hive

    Spark操作hive:

    1、将hive中conf目录下的hive-site.xml移动Spark的conf目录下。
    2、spark执行命令中加入数据库驱动
    	#如果报错,将jars换成--driver-class-path
    	spark-shell --master local[2] --jars /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    3、hdfs要启动,即使是Spark本地模式,但是hive的数据是在Hdfs中的。
    

    spark-shell中使用Spark的api进行操作,pyspark的api一样。

    #启动Spark-shell命令行,使用sparkSQL的api操作hive
    spark-shell --master local[2] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    
    scala> spark.sql("select count(name),name from sparkdemotable group by name").show()
    +-----------+--------------------+                                              
    |count(name)|                name|
    +-----------+--------------------+
    |          1|     SparkHadoopJava|
    |          1|spark hadoop java...|
    |          1|   java scala hadoop|
    |          1|          hive mysql|
    |          1|   hadoop Spark hive|
    |          1|  Spark Flink hadoop|
    |          2|   java hadoop spark|
    +-----------+--------------------+
    

    spark-sql中就是写sql,方式和hive中一样

    #启动spark-sql命令行,使用sql的方式操作hive
    >>spark-sql  --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
    
    select * from sparkdemotable;
    
    java hadoop spark
    java hadoop spark
    spark hadoop java hello
    hive mysql
    hadoop Spark hive
    Spark Flink hadoop
    java scala hadoop
    SparkHadoopJava
    
    #两种方式出来的数据格式都不一样,
    #方式一好快啊,方式二慢了许多,底层应该都是一样的RDD
    
  • 相关阅读:
    [JSOI2007][BZOJ1031] 字符加密Cipher|后缀数组
    leetcode Flatten Binary Tree to Linked List
    leetcode Pascal's Triangle
    leetcode Triangle
    leetcode Valid Palindrome
    leetcode Word Ladder
    leetcode Longest Consecutive Sequence
    leetcode Sum Root to Leaf Numbers
    leetcode Clone Graph
    leetcode Evaluate Reverse Polish Notation
  • 原文地址:https://www.cnblogs.com/cgl-dong/p/14034964.html
Copyright © 2011-2022 走看看