day7
hadoop 离线数据分析 批量;
spark
【spark】
* 环境配置:
安装spark - Local本地模式 ok
* spark学习
@Scala环境:
1 shell交互环境
启动:spark-shell;(默认进入且自带):
命令学习:
实验案例:
1 wordcount:
textFile("input"):读取本地文件input文件夹数据;
flatMap(_.split(" ")):压平操作,按照空格分割符将一行数据映射成一个个单词;
map((_,1)):对每一个元素操作,将单词映射为元组;
reduceByKey(_+_):按照key将值进行聚合,相加;
collect:将数据收集到Driver端展示。
*** RDD:
1 RDD认识:
概念认知:
分布式对象集合;
本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,
每个分区就是一个数据集片段,
并且一个RDD的不同分区可以被保存到集群中不同的节点上,
从而可以在集群中的不同节点上进行并行计算
弹性数据集;
RDD提供了一种高度受限的共享内存模型????;
RDD提供了一组丰富的操作以支持常见的数据运算;
只读
操作理解:
创建:
转换:理解 - 输入RDD,输出RDD;存在“父子”依赖关系 具体:父子RDD分区的对应关系;
行动:理解 - 输入RDD,输出值;
官方性名词理解:
分区:
逻辑上是分区
算子:
理解:一系列操作 transformations&actions;
目标 - 一个RDD转成另一个RDD;
依赖:
一个RDD转成另一个RDD时,有联系;
窄依赖:RDDs之间分区是一一对应的 / 多(父RDD)对一(子RDD)
宽依赖:下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系;
(窄宽-理解:依据分区的对应关系分类)
缓存:
目的:方便RDD重用;
checkpoint
问题:对于长时间迭代型应用,血缘关系越来越长,一旦在后续迭代过程中出错,
则需要通过非常长的血缘关系去重建,影响性能
目的:容错
实现:将数据保存到持久化的存储,切断血缘关系,直接在checkpoint处拿数据;
任务划分
Application,Job,Stage,Task;
Application->Job->Stage-> Task每一层都是1对n的关系
划分阶段(1-2)
1 根据依赖关系可构架一个DAG;
2 反向解析,存在宽依赖,就划开阶段
3 任务集:每个阶段都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合
每个任务会被任务调度器分发给各个工作节点(Worker Node)上的Executor去执行
2 RDD实践:
Lamda 表达式;??????????????? 我好难啊
tuple 元组<key,value>;
RDD:也是Map+reduce思路;
flatMap(将一个一个小集合合成一个大集合)
Map:
Reduce:
RDD transformations:
Value类型
partitions.size;:查看RDD的分区数
map();
mapPartitions();
mapPartitionsWithIndex();
flatMap();
glom()
groupBy();
filter();
sample(withReplacement,fraction,seed);
distinct([numTasks]);
coalesce(numPartitions);
repartition(numPartitions);
sortBy(func,[ascending], [numTasks]);
双Value类型交互:源RDD & 参数RDD
union(otherDataset);
subtract(otherDataset);
intersection(otherDataset);
cartesian(otherDataset);笛卡尔积-> 生成一系列元组<a,b>
zip(otherDataset);
Key-Value类型
partitionBy();
groupByKey();
reduceByKey(func, [numTasks]);
aggregateByKey();?
foldByKey();
combineByKey();?
sortByKey([ascending], [numTasks]);
mapValues();
join(otherDataset, [numTasks]);
cogroup(otherDataset, [numTasks]);
RDD Action
reduce(func);
collect();
count();
first();
take(n);
takeOrdered(n);
aggregate;
fold(num)(func);
saveAsTextFile(path);
saveAsSequenceFile(path) ;
saveAsObjectFile(path);
countByKey();
foreach(func);
RDD总结:
1 解决了什么?
实现(对大型数据集)高效计算;
2 怎么实现?
?
2 spark独立应用程序(注:支持java,scala,python)
* scala:
方式一 - 手动构建:
Scala编译打包工具sbt + 构建项目目录结构 + 编写核心代码文件 -> 打包成jar;
方式人 - 编译器:
* java:
方式一 - 手动构建:
java打包工具Maven+ 构建maven项目 -> 打包成jar;
方式人 - 编译器:
@Python环境:
0 配置与pyspark相关文件;
1 shell交互环境
启动:pyshpark;
【new】
问题1:在linux下转移文件到 /usr/local/....后需要 命令加sudo才可以执行;
方式:更改文件所有者 chown -R kouch:kouch(当前用户) **** ;
...................................................