Spark 可以使用scala、Java、Sql、Python、R语言进行开发。
在bin目录下也提供了spark-shell、spark-sql、sparkR、pyspark等交互方式。
SparkSQL实现了Hive的模型、Hive在新版本中也建议使用Spark作为计算引擎。
一、Spark实现wordCount(TopK)
使用以下文本进行词频统计。
Java hadoop Spark Hbase
Spark hadoop Java
hive mysql
hadoop Spark hive ClickHouse
Spark Flink hadoop
Java scala hadoop
Spark hadoop Java,hadoop
0、HiveQL/SparkSQL
在hive中就是写sql,然后转换为MR。现在Hive已经建议使用SparkTez等作为计算引擎。
hive
命令行和spark-sql
命令行都是写sql,语句基本一样。
Spark
bin目录下也有sparkR
工具可以使用,和这种方式基本一样,就是写SQL.
select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word;
--hive beeline命令行
+-----------------+--------+
| word | count |
+-----------------+--------+
| | 1 |
| Flink | 1 |
| Hbase | 1 |
| Java | 3 |
| Java,hadoop | 1 |
| Spark | 5 |
| hadoop | 6 |
| hive | 1 |
| hiveClickHouse | 1 |
| mysql | 1 |
| scala | 1 |
+-----------------+--------+
11 rows selected (2.162 seconds)
--求topkey也很方便,写sql就行
select t.word word,count(word) as count from (select explode(split(name,'\s+')) as word from sparkdemotable) t group by word order by count desc limit 3;
+---------+--------+
| word | count |
+---------+--------+
| hadoop | 6 |
| Spark | 5 |
| Java | 3 |
+---------+--------+
3 rows selected (3.122 seconds)
1、Scala
Scala开发Spark比较方便快捷。
在交互式环境下连接到HDFS,使用RDD进行词频统计、排序。
scala> var res=sc.textFile("hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt")
res: org.apache.spark.rdd.RDD[String] = hdfs://192.168.x.x:9000/hadoop/SparkDemo.txt MapPartitionsRDD[15] at textFile at <console>:24
scala> var line=res.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
line: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[18] at reduceByKey at <console>:25
scala> line.foreach(println)
(hive,1)
(mysql,1)
(hello,1)
(java,2)
(spark,2)
(hadoop,2)
//所有参数都可以显式指定
def main(args: Array[String]): Unit = {
val conf:SparkConf=new SparkConf().setAppName(s"${args(0)}")
.setMaster("local[3]") //提交到Spark中,这个参数就不要了
val sparkContext = new SparkContext(conf)
val lines: RDD[String] = sparkContext.textFile(s"${args(1)}")
val text=lines.flatMap(_.split("\s+")).map(_.toUpperCase).map((_,1)).reduceByKey((_+_))
text.map(_.swap).sortByKey(ascending = false).map(_.swap).take(3).foreach(println)
}
2、 Python
pyspark是Spark的Python实现,api基本和scala版本一样。
在bin目录下也可以直接使用
pyspark
进行python编程(需要有python环境)。
>>> text=sc.textFile('/usr/local/SparkDemo.txt')
>>> text.first()
'java hadoop spark'
>>> lines=text.flatMap(lambda line:line.split(' '))
>>> line=lines.map(lambda x:(x,1)).reduceByKey(lambda a,b:a+b)
>>> line.collect()
[('java', 3), ('hadoop', 5), ('hive', 2), ('Spark', 2), ('Flink', 1), ('scala', 1), ('spark', 2), ('hello', 1), ('mysql', 1), ('SparkHadoopJava', 1)]
>>> line.foreach(lambda t:print(t))
('spark', 2)
('hello', 1)
('mysql', 1)
('SparkHadoopJava', 1)
('java', 3)
('hadoop', 5)
('hive', 2)
('Spark', 2)
('Flink', 1)
('scala', 1)
>>>
#写在文件中
from pyspark import SparkContext
from pyspark.sql.session import SparkSession
# conf = SparkConf().setAppName('test_parquet')
sc = SparkContext('local[6]', 'test')
# spark = SparkSession(sc)
# sc.setLogLevel("INFO")
text = sc.textFile(name ="wordcount.txt")
import re
#压扁、分割、聚合
word = text.flatMap(lambda x:re.split("\W+", x)).map(lambda x:(x, 1)).reduceByKey(lambda a, b: a + b)
#收集
top = word.collect()
print(top)
sc.stop()
3、Java
在Java中写wordCount,有Java8的函数式编程支持,写起来还好。
public static void main(String[] args) {
SparkConf conf = new SparkConf();
//wordCount local[3] dir/wordcount.txt
conf.setAppName(args[0]).setMaster(args[1]);
//转换成Java上下文
JavaSparkContext sC = new JavaSparkContext(conf);
//读取文件
JavaRDD<String> rdd = sC.textFile(args[2]);
//压扁,分割,要返回迭代器
JavaRDD<String> javaRDD = rdd.flatMap(lines -> Stream.of(lines.split("\W+")).iterator()).
map(String::toUpperCase);//转大写
// JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(new PairFunction<String, String, Integer>() {
//
// @Override
// public Tuple2<String, Integer> call(String s) throws Exception {
// return new Tuple2<String, Integer>(s, 1);
// }
// });
//转成元组,(word,1)的格式
JavaPairRDD<String, Integer> mapRDD = javaRDD.mapToPair(w -> Tuple2.<String, Integer>apply(w, 1));
//求和
JavaPairRDD<String, Integer> reduceRDD = mapRDD.reduceByKey(Integer::sum);
//交换、排序,取TopK
List<Tuple2<String, Integer>> topK = reduceRDD.map(Tuple2::swap).sortBy(Tuple2::_1, false, 2).
map(Tuple2::swap).take(3);
//打印
topK.forEach(System.out::println);
sC.stop();
}
二、Spark操作Hive
Spark操作hive:
1、将hive中conf目录下的hive-site.xml移动Spark的conf目录下。
2、spark执行命令中加入数据库驱动
#如果报错,将jars换成--driver-class-path
spark-shell --master local[2] --jars /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
3、hdfs要启动,即使是Spark本地模式,但是hive的数据是在Hdfs中的。
spark-shell中使用Spark的api进行操作,pyspark的api一样。
#启动Spark-shell命令行,使用sparkSQL的api操作hive
spark-shell --master local[2] --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
scala> spark.sql("select count(name),name from sparkdemotable group by name").show()
+-----------+--------------------+
|count(name)| name|
+-----------+--------------------+
| 1| SparkHadoopJava|
| 1|spark hadoop java...|
| 1| java scala hadoop|
| 1| hive mysql|
| 1| hadoop Spark hive|
| 1| Spark Flink hadoop|
| 2| java hadoop spark|
+-----------+--------------------+
spark-sql中就是写sql,方式和hive中一样
#启动spark-sql命令行,使用sql的方式操作hive
>>spark-sql --driver-class-path /usr/local/hive/lib/mysql-connector-java-8.0.20.jar
select * from sparkdemotable;
java hadoop spark
java hadoop spark
spark hadoop java hello
hive mysql
hadoop Spark hive
Spark Flink hadoop
java scala hadoop
SparkHadoopJava
#两种方式出来的数据格式都不一样,
#方式一好快啊,方式二慢了许多,底层应该都是一样的RDD