zoukankan      html  css  js  c++  java
  • SPARK第一天

    并行
    -------------
    集群计算。
    并行计算。

    并发
    -------------
    并发执行。

    Spark
    ------------------------
    Lightning-fast cluster computing。
    快如闪电的集群计算。
    大规模快速通用的计算引擎。
    速度: 比hadoop 100x,磁盘计算快10x
    使用: java / Scala /R /python
    提供80+算子(操作符),容易构建并行应用。
    通用: 组合SQL ,流计算 + 复杂分析。

    运行: Hadoop, Mesos, standalone, or in the cloud,local.
    Spark模块
    ----------------
    Spark core //核心模块
    Spark SQL //SQL
    Spark Streaming //流计算
    Spark MLlib //机器学习
    Spark graph //图计算

    DAG //direct acycle graph,有向无环图。

    安装Spark
    -------------------
    1.下载spark-2.1.0-bin-hadoop2.7.tgz
    ..
    2.解压
    ..
    3.环境变量
    [/etc/profile]
    SPARK_HOME=/soft/spark
    PATH=$PATH:$SPARK_HOME/bin:$SPARK_HOME/sbin

    [source]
    $>source /etc/profile

    4.验证spark

    $>cd /soft/spark
    $>./spark-shell

    5.webui
    http://s201:4040/

    体验spark
    -------------------

    0.sc
    SparkContext,Spark程序的入口点,封装了整个spark运行环境的信息。

    1.进入spark-shell
    $>spark-shell
    $scala>sc


    API
    --------------
    [SparkContext]
    Spark程序的入口点,封装了整个spark运行环境的信息。

    [RDD]
    resilient distributed dataset,弹性分布式数据集。等价于集合。


    spark实现word count
    ------------------------
    //加载文本文件,以换行符方式切割文本.Array(hello world2,hello world2 ,...)
    val rdd1 = sc.textFile("/home/centos/test.txt");

    //单词统计1
    $scala>val rdd1 = sc.textFile("/home/centos/test.txt")
    $scala>val rdd2 = rdd1.flatMap(line=>line.split(" "))
    $scala>val rdd3 = rdd2.map(word = > (word,1))
    $scala>val rdd4 = rdd3.reduceByKey(_ + _)
    $scala>rdd4.collect

    //单词统计2
    sc.textFile("/home/centos/test.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_ + _).collect

    //统计所有含有wor字样到单词个数。filter

    //过滤单词
    sc.textFile("/home/centos/test.txt").flatMap(_.split(" ")).filter(_.contains("wor")).map((_,1)).reduceByKey(_ + _).collect

    [API]
    SparkContext:
    Spark功能的主要入口点。代表到Spark集群的连接,可以创建RDD、累加器和广播变量.
    每个JVM只能激活一个SparkContext对象,在创建sc之前需要stop掉active的sc。

    SparkConf:
    spark配置对象,设置Spark应用各种参数,kv形式。


    编写scala程序,引入spark类库,完成wordcount
    ----------------------------------------------
    1.创建Scala模块,并添加pom.xml
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.it18zhang</groupId>
    <artifactId>SparkDemo1</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
    <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.1.0</version>
    </dependency>
    </dependencies>
    </project>

    2.编写scala文件
    import org.apache.spark.{SparkConf, SparkContext}

    /**
    * Created by Administrator on 2017/4/20.
    */
    object WordCountDemo {
    def main(args: Array[String]): Unit = {
    //创建Spark配置对象
    val conf = new SparkConf();
    conf.setAppName("WordCountSpark")
    //设置master属性
    conf.setMaster("local") ;

    //通过conf创建sc
    val sc = new SparkContext(conf);

    //加载文本文件
    val rdd1 = sc.textFile("d:/scala/test.txt");
    //压扁
    val rdd2 = rdd1.flatMap(line => line.split(" ")) ;
    //映射w => (w,1)
    val rdd3 = rdd2.map((_,1))
    val rdd4 = rdd3.reduceByKey(_ + _)
    val r = rdd4.collect()
    r.foreach(println)
    }
    }


    java版单词统计
    ------------------
    import org.apache.spark.SparkConf;
    import org.apache.spark.SparkContext;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import scala.Tuple2;

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    /**
    * java版
    */
    public class WordCountJava2 {
    public static void main(String[] args) {
    //创建SparkConf对象
    SparkConf conf = new SparkConf();
    conf.setAppName("WordCountJava2");
    conf.setMaster("local");

    //创建java sc
    JavaSparkContext sc = new JavaSparkContext(conf);
    //加载文本文件
    JavaRDD<String> rdd1 = sc.textFile("d:/scala//test.txt");

    //压扁
    JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
    public Iterator<String> call(String s) throws Exception {
    List<String> list = new ArrayList<String>();
    String[] arr = s.split(" ");
    for(String ss :arr){
    list.add(ss);
    }
    return list.iterator();
    }
    });

    //映射,word -> (word,1)
    JavaPairRDD<String,Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
    public Tuple2<String, Integer> call(String s) throws Exception {
    return new Tuple2<String, Integer>(s,1);
    }
    });

    //reduce化简
    JavaPairRDD<String,Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
    public Integer call(Integer v1, Integer v2) throws Exception {
    return v1 + v2;
    }
    });

    //
    List<Tuple2<String,Integer>> list = rdd4.collect();
    for(Tuple2<String, Integer> t : list){
    System.out.println(t._1() + " : " + t._2());
    }
    }
    }


    Spark2.1.0最新版是基于Scala2.11.8版本,因此安装scala2.11.8版本,
    否则如果基于2.12.0版本编译会出现找不到包的问题。
    ----------------------------------------------
    1.卸载原来的scala.
    2.重新安装scala2.11.8版本
    3.配置idea的全局库
    project settings -> global library -> 删除原来的scala sdk
    project settings -> global library -> 添加sdk -> browser -> 定位scala安装目录 ->选中scala-compiler.jar +
    scala-library.jar +
    scala-reflect.jar

    4.在模块中添加scala sdk 2.11.8版本

    5.重新编译项目 -> 导入jar ->丢到集群运行。


    提交作业到spark集群运行
    --------------------------
    1.导出jar包
    2.spark-submit提交命令运行job
    //Scala版本
    $>spark-submit --master local --name MyWordCount --class com.it18zhang.spark.scala.WordCountScala SparkDemo1-1.0-SNAPSHOT.jar /home/centos/test.txt
    //java版
    $>spark-submit --master local --name MyWordCount --class com.it18zhang.spark.java.WordCountJava SparkDemo1-1.0-SNAPSHOT.jar /home/centos/test.txt

    Spark集群模式
    -----------------
    1.local
    nothing!
    spark-shell --master local; //默认

    2.standalone
    独立。
    a)复制spark目录到其他主机
    b)配置其他主机的所有环境变量
    [/etc/profile]
    SPARK_HOME
    PATH

    c)配置master节点的slaves
    [/soft/spark/conf/slaves]
    s202
    s203
    s204

    d)启动spark集群
    /soft/spark/sbin/start-all.sh

    e)查看进程
    $>xcall.jps jps
    master //s201
    worker //s202
    worker //s203
    worker //s204
    e)webui
    http://s201:8080/


    提交作业jar到完全分布式spark集群
    --------------------------------
    1.需要启动hadoop集群(只需要hdfs)
    $>start-dfs.sh
    2.put文件到hdfs.

    3.运行spark-submit
    $>spark-submit
    --master spark://s201:7077
    --name MyWordCount
    --class com.it18zhang.spark.scala.WordCountScala
    SparkDemo1-1.0-SNAPSHOT.jar
    hdfs://s201:8020/user/centos/test.txt

    脚本分析
    -----------------------
    [start-all.sh]
    sbin/spark-config.sh
    sbin/spark-master.sh //启动master进程
    sbin/spark-slaves.sh //启动worker进程

    [start-master.sh]
    sbin/spark-config.sh
    org.apache.spark.deploy.master.Master
    spark-daemon.sh start org.apache.spark.deploy.master.Master --host --port --webui-port ...

    [spark-slaves.sh]
    sbin/spark-config.sh
    slaves.sh //conf/slaves

    [slaves.sh]
    for conf/slaves{
    ssh host start-slave.sh ...
    }

    [start-slave.sh]
    CLASS="org.apache.spark.deploy.worker.Worker"
    sbin/spark-config.sh
    for (( .. )) ; do
    start_instance $(( 1 + $i )) "$@"
    done

    $>cd /soft/spark/sbin
    $>./stop-all.sh //停掉整个spark集群.
    $>./start-master.sh //停掉整个spark集群.
    $>./start-master.sh //启动master节点
    $>./start-slaves.sh //启动所有worker节点

  • 相关阅读:
    python3.6下安装wingIDE破解方法
    Python 列表list 和 字符串str 互转
    c# 读取txt文件并分隔
    基础连接已经关闭: 未能为 SSL/TLS 安全通道建立信任关系
    Jquery ajax动态更新下拉列表的内容
    vs2015使用技巧-------- 查看类关系图
    Dapper 批量操作sql
    Linq批量建表
    WebRequest的get及post提交
    git -- 常用命令
  • 原文地址:https://www.cnblogs.com/jeasonit/p/9922544.html
Copyright © 2011-2022 走看看