1. Features
- Lightning-fast cluster computing: advertised as up to 100x faster than Hadoop MapReduce in memory, and about 10x faster on disk
- A fast, general-purpose engine for large-scale computing: supports Java/Scala/Python/R, offers 80+ operators that make it easy to build parallel applications, and combines SQL, streaming, and complex analytics
- Runtime environments: Hadoop YARN, Mesos, standalone, etc. (see the master URL sketch below)
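As a rough illustration (not part of the original notes), the deployment mode is chosen through the master URL passed to SparkConf.setMaster; the host names and ports below are placeholders:
// hypothetical sketch: picking the deployment mode via the master URL
val conf = new org.apache.spark.SparkConf()
  .setAppName("demo")
  .setMaster("local[*]")   // local mode, one worker thread per core
// alternatives: "spark://master:7077" (standalone), "yarn" (Hadoop YARN), "mesos://master:5050" (Mesos)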
2. Spark modules
- Spark Core: the core module
- Spark SQL
- Spark Streaming: stream processing
- Spark MLlib: machine learning
- Spark GraphX: graph computation (a dependency sketch for these modules follows)
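For reference, a minimal build.sbt sketch pulling in these modules; the artifact names are the official ones, while the Spark version 2.1.0 and Scala 2.11 are assumptions to adjust to your setup:
// build.sbt sketch (version numbers are assumptions)
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core"      % "2.1.0",
  "org.apache.spark" %% "spark-sql"       % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-mllib"     % "2.1.0",
  "org.apache.spark" %% "spark-graphx"    % "2.1.0"
)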
3. Spark installation
- When configuring the environment variables, remember to configure both the bin and sbin directories
4. Starting Spark
- Go into the bin directory and start spark-shell (a quick sanity check follows below)
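Once the shell is up it pre-creates a SparkContext named sc; a small sanity-check sketch (the actual output depends on your installation):
scala> sc.version   // prints the Spark version of the running shell
scala> sc.master    // prints the master URL the shell connected to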
5. Spark programming basics
- sc: the SparkContext, the entry point of a Spark program; it exposes the core API
- RDD: Resilient Distributed Dataset
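A tiny sketch in spark-shell showing sc as the entry point, building an RDD from a local collection (the sample data is made up):
scala> val nums = sc.parallelize(1 to 5)   // create an RDD from a local collection
scala> nums.map(_ * 2).collect()           // transformations are lazy; collect() runs the job -> Array(2, 4, 6, 8, 10)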
6. Spark starter example: word count
scala> val rdd1 = sc.textFile("/home/test.txt")
rdd1: org.apache.spark.rdd.RDD[String] = /home/test.txt MapPartitionsRDD[1] at textFile at <console>:24
scala> val rdd2 = rdd1.flatMap(line => line.split(" "))
rdd2: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:26
scala> val rdd3 = rdd2.ma
map mapPartitions mapPartitionsWithIndex max
scala> val rdd3 = rdd2.map(word => (word,1))
rdd3: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:28
scala> val rdd4 = rdd3.reduce
reduce reduceByKey reduceByKeyLocally
scala> val rdd4 = rdd3.reduceByKey
reduceByKey reduceByKeyLocally
scala> val rdd4 = rdd3.reduceByKey(_+_)
rdd4: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:30
scala> rdd4.collect()
res0: Array[(String, Int)] = Array((world2,1), (world1,1), (hello,3), (world3,1))
Now let's condense the code above into a single line:
scala> sc.textFile("/home/test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res3: Array[(String, Int)] = Array((world2,1), (world1,1), (hello,3), (world3,1))
7. Word filtering
scala> sc.textFile("/home/test.txt").flatMap(_.split(" ")).filter(_.contains("wor")).map((_,1)).reduceByKey(_+_).collect
res4: Array[(String, Int)] = Array((world2,1), (world1,1), (world3,1))
filter: keeps only the elements that satisfy the predicate (see the sketch below)
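A small standalone sketch of filter (the sample data is made up):
scala> val words = sc.parallelize(Seq("hello", "world1", "world2", "spark"))
scala> words.filter(_.contains("wor")).collect()   // keeps only matching elements -> Array(world1, world2)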
8. Running Spark word count from IntelliJ (Scala)
First, a problem I ran into: "is already defined as object". The cause was marking the whole project as a source root, so the same object was picked up twice and reported as a duplicate definition.
The code is below:
import org.apache.spark.{SparkConf, SparkContext}
object Demo111 {
def main(args: Array[String]): Unit = {
/**
* About SparkConf
*/
/*
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
*/
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
*
* Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
* values from any `spark.*` Java system properties set in your application as well. In this case,
* parameters you set directly on the `SparkConf` object take priority over system properties.
*
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what the system properties are.
*
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
*
* @param loadDefaults whether to also load values from Java system properties
*
* @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*/
//create a SparkConf configuration object
val conf = new SparkConf();
conf.setAppName("wordCount");
//set the master property
conf.setMaster("local");
//create the SparkContext from the conf
val sc = new SparkContext(conf);
//start the word count
/**
* textFile() function
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
* @param path path to the text file on a supported file system
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of lines of the text file
*/
//rdd1 holds the file line by line
val rdd1 = sc.textFile("D:\\test.txt");   //note: backslashes in the path must be escaped
//System.setProperty("hadoop.home.dir","E:\\hadoop\\hadoop-2.6.5")
/** flatMap() function
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
//split on spaces
val rdd2 = rdd1.flatMap(_.split(""))
// println(rdd2)
//initialize each word's count to 1
val rdd3 = rdd2.map((_,1))
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
//aggregate the counts by key
val rdd4 = rdd3.reduceByKey(_ + _);
/**
* Return an array that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
//collect the RDD into an array on the driver
val result = rdd4.collect()
for (i <- result) println(i)
}
}
The result, however, counts the occurrences of each character:
(d,1)
(w,1)
(e,1)
(h,1)
( ,1)
(o,2)
(r,1)
(l,3)
So the problem must be in how the lines are split into words:
split(" ") was missing the space between the quotes; splitting on the empty string breaks each line into single characters instead of words (a quick REPL check follows, then the corrected code).
import org.apache.spark.{SparkConf, SparkContext}
object Demo111 {
def main(args: Array[String]): Unit = {
/**
* About SparkConf
*/
/*
/** Create a SparkConf that loads defaults from system properties and the classpath */
def this() = this(true)
*/
/**
* Configuration for a Spark application. Used to set various Spark parameters as key-value pairs.
*
* Most of the time, you would create a SparkConf object with `new SparkConf()`, which will load
* values from any `spark.*` Java system properties set in your application as well. In this case,
* parameters you set directly on the `SparkConf` object take priority over system properties.
*
* For unit tests, you can also call `new SparkConf(false)` to skip loading external settings and
* get the same configuration no matter what the system properties are.
*
* All setter methods in this class support chaining. For example, you can write
* `new SparkConf().setMaster("local").setAppName("My app")`.
*
* @param loadDefaults whether to also load values from Java system properties
*
* @note Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified
* by the user. Spark does not support modifying the configuration at runtime.
*/
//create a SparkConf configuration object
val conf = new SparkConf();
conf.setAppName("wordCount");
//set the master property
conf.setMaster("local");
//create the SparkContext from the conf
val sc = new SparkContext(conf);
//start the word count
/**
* textFile() function
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
* @param path path to the text file on a supported file system
* @param minPartitions suggested minimum number of partitions for the resulting RDD
* @return RDD of lines of the text file
*/
//rdd1 holds the file line by line
val rdd1 = sc.textFile("D:\\test.txt");   //note: backslashes in the path must be escaped
//System.setProperty("hadoop.home.dir","E:\\hadoop\\hadoop-2.6.5")
/** flatMap() function
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
//split on spaces
val rdd2 = rdd1.flatMap(line => line.split(" "));
// println(rdd2)
//initialize each word's count to 1
val rdd3 = rdd2.map(word => (word, 1));
/**
* Merge the values for each key using an associative and commutative reduce function. This will
* also perform the merging locally on each mapper before sending results to a reducer, similarly
* to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
*/
//aggregate the counts by key
val rdd4 = rdd3.reduceByKey(_ + _);
/**
* Return an array that contains all of the elements in this RDD.
*
* @note This method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
//collect the RDD into an array on the driver
val result = rdd4.collect()
for (i <- result) println(i) // equivalent to result.foreach(println)
}
}
(hello,1)
(world,1)
Another odd issue: I had to create a Java project and then add Scala support to it before Spark could be pulled in; creating a Scala project directly did not let me add the Spark dependency.
One more small thing worth remembering:
//create the SparkContext from the conf
val sc = new SparkContext(conf);
The SparkContext is built from the SparkConf object conf, so the conf argument must always be passed in (see the chained sketch below).
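Since all SparkConf setters return the conf itself (see the scaladoc quoted above), the setup can also be written as one chained expression; a minimal sketch:
val sc = new SparkContext(new SparkConf().setAppName("wordCount").setMaster("local"))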
9. Java version of Spark word count
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class wordcount {
public static void main(String[] args) {
//likewise, create a configuration object conf
SparkConf conf = new SparkConf();
conf.setAppName("wordcount_java");
conf.setMaster("local");
//create a JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
//read the text file as an RDD of lines
JavaRDD<String> rdd1 = sc.textFile("D:\\test.txt");   //note: backslashes in the path must be escaped
/** FlatMapFunction: call() returns an iterator
* @FunctionalInterface
public interface FlatMapFunction<T, R> extends Serializable {
Iterator<R> call(T t) throws Exception;
}
*/
//split each line on spaces, flatten, and return an iterator over the resulting list
JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
String[] arr = s.split(" ");
List<String> list = new ArrayList<String>();
for(String ss: arr){
list.add(ss);
}
return list.iterator();
}
});
/**
* @FunctionalInterface
public interface PairFunction<T, K, V> extends Serializable {
Tuple2<K, V> call(T t) throws Exception;
}
*/
//map each word to (word, 1)
JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String,Integer>(s,1);
}
});
/**
* def reduceByKey(func: JFunction2[V, V, V]): JavaPairRDD[K, V] = {
fromRDD(reduceByKey(defaultPartitioner(rdd), func))
}
*/
//merge the values with the same key (the reduce step)
JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
/**
* Return an array that contains all of the elements in this RDD.
*
* @note this method should only be used if the resulting array is expected to be small, as
* all the data is loaded into the driver's memory.
*/
//print rdd4
List<Tuple2<String,Integer>> list = rdd4.collect();
for(Tuple2<String,Integer> t : list){
System.out.println(t._1 + "===" + t._2);
}
}
}