  • Spark installation and usage (getting started)

    1: Install the Java environment on Linux (install the JDK yourself)
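
    For reference, a minimal sketch of installing a JDK from a tarball; the jdk-7u79 file name and the /usr/lib/jdk1.7.0_79 path are only examples, so adjust them to the JDK you actually download:

    $ tar -zxf jdk-7u79-linux-x64.tar.gz
    $ sudo mv jdk1.7.0_79 /usr/lib
    $ sudo vim /etc/profile
    # add the following lines at the end (example path)
    export JAVA_HOME=/usr/lib/jdk1.7.0_79
    export PATH=$PATH:$JAVA_HOME/bin
    # save and exit vim
    # make the profile take effect immediately
    $ source /etc/profile
    # test
    $ java -version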

    2: Install Scala 2.9.3

    $ tar -zxf scala-2.9.3.tgz
    $ sudo mv scala-2.9.3 /usr/lib
    $ sudo vim /etc/profile
    # add the following lines at the end
    export SCALA_HOME=/usr/lib/scala-2.9.3
    export PATH=$PATH:$SCALA_HOME/bin
    # save and exit vim
    # make the profile take effect immediately
    $ source /etc/profile
    # test
    $ scala -version

    3: Install Spark

    Download the latest version of Spark from the official site; at the time of writing the latest is 1.5.1. Download page: http://spark.apache.org/downloads.html

    Make sure to pick a pre-built package: choose "Pre-built for Hadoop 2.6 and later". The downloaded file is spark-1.5.1-bin-hadoop2.6.tgz.

    Extract it:

    $ tar -zxf spark-1.5.1-bin-hadoop2.6.tgz

    Set the SPARK_EXAMPLES_JAR environment variable (the jar path below comes from an older Spark 0.7.2 source build; with the pre-built 1.5.1 package, point it at the spark-examples jar shipped under its lib/ directory instead)

    $ vim ~/.bash_profile
    # add the following lines at the end
    export SPARK_EXAMPLES_JAR=$HOME/spark-0.7.2/examples/target/scala-2.9.3/spark-examples_2.9.3-0.7.2.jar
    # save and exit vim
    #make the bash profile take effect immediately
    $ source ~/.bash_profile

    This step is actually the most critical one, and unfortunately neither the official documentation nor the blogs online mention it. I only stumbled on it through two posts, "Running SparkPi" and "Null pointer exception when running ./run spark.examples.SparkPi local", and added this step; before that I simply could not get SparkPi to run.

    (Optional) Set the SPARK_HOME environment variable and add SPARK_HOME/bin to PATH

    $ vim ~/.bash_profile
    # add the following lines at the end
    export SPARK_HOME=$HOME/spark-1.5.1-bin-hadoop2.6
    export PATH=$PATH:$SPARK_HOME/bin
    # save and exit vim
    #make the bash profile take effect immediately
    $ source ~/.bash_profile

    4: Spark configuration

    Configure the Spark environment variables

    cd $SPARK_HOME/conf 
    cp spark-env.sh.template spark-env.sh

    vi spark-env.sh and add the following (adjust the paths and addresses to your own environment):

    export JAVA_HOME=/usr/local/java-1.7.0
    export HADOOP_HOME=/opt/hadoop-2.3.0-cdh5.0.0
    export HADOOP_CONF_DIR=/etc/hadoop/conf
    export SCALA_HOME=/usr/local/scala-2.11.4
    export SPARK_HOME=/home/lxw1234/spark-1.3.1-bin-hadoop2.3
    export SPARK_MASTER_IP=127.0.0.1
    export SPARK_MASTER_PORT=7077
    export SPARK_MASTER_WEBUI_PORT=8099
     
    export SPARK_WORKER_CORES=3             # number of CPU cores each Worker may use
    export SPARK_WORKER_INSTANCES=1         # number of Worker instances started on each slave
    export SPARK_WORKER_MEMORY=10G          # amount of memory each Worker may use
    export SPARK_WORKER_WEBUI_PORT=8081     # Worker WebUI port
    export SPARK_EXECUTOR_CORES=1           # number of cores each Executor uses
    export SPARK_EXECUTOR_MEMORY=1G         # amount of memory each Executor uses
     
    export SPARK_CLASSPATH=/opt/hadoop-lzo/current/hadoop-lzo.jar   # needed because LZO compression is used
    export SPARK_CLASSPATH=$SPARK_CLASSPATH:$CLASSPATH
    export LD_LIBRARY_PATH=${LD_LIBRARY_PATH}:$HADOOP_HOME/lib/native
    • Configure slaves

    cp slaves.template slaves 
    vi slaves and add the following: 
    localhost

    5: Configure passwordless SSH login

    Because the master and the slave are on the same machine here, configure passwordless SSH from the local machine to itself. If there are additional slaves, passwordless SSH from the master to each slave must be configured as well.

    cd ~/
    ssh-keygen          # press Enter at every prompt to accept the defaults
    cd .ssh/
    cat id_rsa.pub >> authorized_keys
    chmod 600 authorized_keys
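
    You can verify it with a quick test: the following should now log in without prompting for a password.

    $ ssh localhost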

    6: Start the Spark master

    cd $SPARK_HOME/sbin/ 
    ./start-master.sh

    The startup log is written to the $SPARK_HOME/logs/ directory; a normal startup looks like this:

    15/06/05 14:54:16 INFO server.AbstractConnector: Started SelectChannelConnector@localhost:6066 
    15/06/05 14:54:16 INFO util.Utils: Successfully started service on port 6066. 
    15/06/05 14:54:16 INFO rest.StandaloneRestServer: Started REST server for submitting applications on port 6066 
    15/06/05 14:54:16 INFO master.Master: Starting Spark master at spark://127.0.0.1:7077 
    15/06/05 14:54:16 INFO master.Master: Running Spark version 1.3.1 
    15/06/05 14:54:16 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    15/06/05 14:54:16 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:8099 
    15/06/05 14:54:16 INFO util.Utils: Successfully started service ‘MasterUI’ on port 8099. 
    15/06/05 14:54:16 INFO ui.MasterWebUI: Started MasterWebUI at http://127.1.1.1:8099 
    15/06/05 14:54:16 INFO master.Master: I have been elected leader! New state: ALIVE

    7: Start the Spark slaves

    cd $SPARK_HOME/sbin/ 
    ./start-slaves.sh 


    This SSHes to each host configured in $SPARK_HOME/conf/slaves, one by one, and starts a Spark Worker on it.

    After a successful start, the master's Web UI shows that the Worker has registered.


    Open http://192.168.1.84:8099/ in a browser (use your master's IP address and the SPARK_MASTER_WEBUI_PORT configured above; the default port is 8080).

    8: A small example (the 50 most frequent words in a file)

    In the bin directory, run ./spark-shell directly:

    hadoop@Master:/usr/local/spark-1.5.1-bin-hadoop2.6/bin$ ./spark-shell
    log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory).
    log4j:WARN Please initialize the log4j system properly.
    log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
    Using Spark's repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
    To adjust logging level use sc.setLogLevel("INFO")
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _\ \/ _ \/ _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_\   version 1.5.1
          /_/
    
    Using Scala version 2.10.4 (OpenJDK 64-Bit Server VM, Java 1.7.0_79)
    Type in expressions to have them evaluated.
    Type :help for more information.
    15/10/13 19:12:16 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    Spark context available as sc.
    15/10/13 19:12:18 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    15/10/13 19:12:19 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    15/10/13 19:12:35 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    15/10/13 19:12:35 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
    15/10/13 19:12:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    15/10/13 19:12:39 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    15/10/13 19:12:39 WARN Connection: BoneCP specified but not present in CLASSPATH (or one of dependencies)
    SQL context available as sqlContext.


    I did not look into what all these WARN messages were about; once inside spark-shell, enter the following, one line at a time:

    // load the log file as an RDD of lines
    var srcFile = sc.textFile("/usr/local/kern.log")

    // split each line into words and count the occurrences of each word
    var a = srcFile.flatMap(line=>line.split(" ")).map(word=>(word,1)).reduceByKey(_+_)

    // swap to (count, word), sort by count in descending order, swap back, and print the top 50
    a.map(word=>(word._2,word._1)).sortByKey(false).map(word=>(word._2,word._1)).take(50).foreach(println)

    The result is printed in the terminal.

     

    The job's progress can be viewed on port 4040: http://192.168.1.84:4040/jobs/

    9: Spark Java programming (Spark and Spark Streaming)

    1: Spark batch processing: count the lines containing "a" and the lines containing "b" in a file: SimpleApp.java
    package org.apache.eagle.spark_streaming_kafka;
     
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
     
    public class SimpleApp {
     
        public static void main(String[] args) {
            String logFile = "/var/log/boot.log"; // should be some file on your system
            SparkConf conf = new SparkConf().setAppName("Simple Application");
            JavaSparkContext sc = new JavaSparkContext(conf);
            JavaRDD<String> logData = sc.textFile(logFile).cache();
     
            // count the lines that contain the letter "a"
            long numAs = logData.filter(new Function<String, Boolean>() {
                private static final long serialVersionUID = 1L;
     
                public Boolean call(String s) { return s.contains("a"); }
            }).count();
     
            // count the lines that contain the letter "b"
            long numBs = logData.filter(new Function<String, Boolean>() {
                private static final long serialVersionUID = 1L;
     
                public Boolean call(String s) { return s.contains("b"); }
            }).count();
     
            System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
     
            sc.stop();
        }
    }

    2: Spark Streaming: read data from Kafka and do a word count.

    package org.apache.eagle.spark_streaming_kafka;
     
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;
     
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
     
    import com.google.common.collect.Lists;
     
    import scala.Tuple2;
     
     
    /**
     * spark-streaming-kafka
     *
     */
    public class JavaKafkaWordCount 
    {
        private static final Pattern SPACE = Pattern.compile(" ");
     
          private JavaKafkaWordCount() {
          }
          
        public static void main( String[] args )
        {
            
            String zkQuorum = "10.64.255.161";  
            String group = "test-consumer-group";  
            SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
            // Create the context with 2 seconds batch size
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            topicMap.put("noise",1);
        // receive (topic, message) pairs from Kafka through the given ZooKeeper quorum
        JavaPairReceiverInputDStream<String, String> messages =
                KafkaUtils.createStream(jssc, zkQuorum, group, topicMap);
            
            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                  return tuple2._2();
                }
              });
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                public Iterable<String> call(String x) {
                  return Lists.newArrayList(SPACE.split(x));
                }
              });
     
              JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                new PairFunction<String, String, Integer>() {
                  public Tuple2<String, Integer> call(String s) {
                    return new Tuple2<String, Integer>(s, 1);
                  }
                }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                  public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                  }
                });
     
              wordCounts.print();
              jssc.start();
              jssc.awaitTermination();
        }
    }
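
    Before running this, the "noise" topic has to exist and contain some data. Here is a minimal sketch using the Kafka command-line tools, run from the Kafka installation directory; the ZooKeeper and broker addresses are assumptions based on the zkQuorum in the code, so adjust hosts and ports to your own cluster:

    # create the topic the job reads from (assumed ZooKeeper address)
    $ bin/kafka-topics.sh --create --zookeeper 10.64.255.161:2181 --replication-factor 1 --partitions 1 --topic noise
    # feed it a few test messages (assumed broker address)
    $ bin/kafka-console-producer.sh --broker-list 10.64.255.161:9092 --topic noise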
    A few things to note:
        1: Environment: make sure Spark is installed correctly on the local machine, following the steps above. The ZooKeeper cluster and the Kafka cluster must be up, and the Kafka topic must already be created (see the sketch above for sample commands).
        2: An earlier run failed because a jar could not be found (KafkaUtils); the cause was that the dependency jars had not been packaged into the final jar. Add the following to pom.xml:
    <build>
        <sourceDirectory>src/main/java</sourceDirectory>
        <testSourceDirectory>src/test/java</testSourceDirectory>
        <plugins>
          <!--
            Bind the maven-assembly-plugin to the package phase.
            This creates a single jar that bundles all of the dependencies,
            suitable for submitting to the cluster.
           -->
          <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
              <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
              </descriptorRefs>
              <archive>
                <manifest>
                  <mainClass>org.apache.eagle.spark_streaming_kafka.JavaKafkaWordCount</mainClass>
                </manifest>
              </archive>
            </configuration>
            <executions>
              <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                  <goal>single</goal>
                </goals>
              </execution>
            </executions>
          </plugin>
        </plugins>
    </build> 
     All required jars are packaged together, so the generated file will be quite large.
         3: How do you submit a job? Spark and Spark Streaming jobs are submitted the same way, with the $SPARK_HOME/bin/spark-submit script. Go into the bin directory.
               A Spark Streaming job is submitted like this:
    ./spark-submit  --master local[8] /home/zqin/workspace/spark-streaming-kafka/target/spark-streaming-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar
    

    Because the entry class is specified in pom.xml, --class is not needed; if it were not specified, the entry point would have to be given with --class on the command line.
     
              A Spark batch job is submitted like this:
    ./spark-submit  --class org.apache.eagle.spark_streaming_kafka.SimpleApp --master local[8] /home/zqin/workspace/spark-streaming-kafka/target/spark-streaming-kafka-0.0.1-SNAPSHOT-jar-with-dependencies.jar
              The program's main entry point has to be specified explicitly here.
       4: When running Spark Streaming, the console is flooded with log output, which makes the results hard to read. In Spark's conf directory, copy (or rename) log4j.properties.template to log4j.properties and change log4j.rootCategory=INFO, console to log4j.rootCategory=WARN, console; this stops Spark from printing INFO-level logs to the console. To see more detail instead, change INFO to DEBUG.
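
    For reference, a minimal sketch of that change as shell commands, assuming the default template shipped with Spark:

    cd $SPARK_HOME/conf
    cp log4j.properties.template log4j.properties
    # lower the root logger from INFO to WARN so INFO logs no longer flood the console
    sed -i 's/log4j.rootCategory=INFO, console/log4j.rootCategory=WARN, console/' log4j.properties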


    10: Shutting down Spark

    In the Spark directory, run: sbin/stop-all.sh