zoukankan      html  css  js  c++  java
  • Spark集群 + Akka + Kafka + Scala 开发(1) : 配置开发环境

    目标

    • 配置一个spark standalone集群 + akka + kafka + scala的开发环境。
    • 创建一个基于spark的scala工程,并在spark standalone的集群环境中运行。
    • 创建一个基于spark+akka的scala工程,并在spark standalone的集群环境中运行。
    • 创建一个基于spark+kafka的scala工程,并在spark standalone的集群环境中运行。

    集群框架图

    本图主要是说明各个组件可以发布到不同的逻辑机器上。

    G Spark Cluster + AKKA + KAFKA akka_client AKKA Client akka_server AKKA Server (Spark) akka_client->akka_server spark_master Spark Master akka_server->spark_master spark_slave1 Spark Slave spark_master->spark_slave1 spark_slave2 Spark Slave spark_master->spark_slave2 spark_slave3 Spark Slave spark_master->spark_slave3 kafka_producer1 Kafka Producer kafka_cluster Kafka Cluster kafka_producer1->kafka_cluster kafka_producer2 Kafka Producer kafka_producer2->kafka_cluster kafka_producer3 Kafka Producer kafka_producer3->kafka_cluster kafka_consumer1 Kafka consumer (Spark) kafka_cluster->kafka_consumer1 kafka_consumer2 Kafka consumer (Spark) kafka_cluster->kafka_consumer2 kafka_consumer3 Kafka consumer (Spark) kafka_cluster->kafka_consumer3 kafka_consumer1->spark_master kafka_consumer2->spark_master kafka_consumer3->spark_master

    本文主要是为了配置一个用于开发和测试的环境,所以将所有组件都部署到了一台机器上。

    预装的软件配置

    SoftwareVersionLocationComment
    CentOS 7.2.1511  
    JDK Oracle JDK 1.8 x64 /opt/java
    Spark 2.0.0 /opt/spark
    Akka 2.4.10 /opt/akka
    Kafka 0.8.2.1 /opt/kafka
    Scala 2.11.8 /opt/scala  
    sbt 0.13.12   开发环境

    配置环境变量

    • 编辑 ~/.bash_profile 加上下面的语句:
    export JAVA_HOME=/opt/java
    export SCALA_HOME=/opt/scala
    export SPARK_HOME=/opt/spark
    export KAFKA_HOME=/opt/kafka
    export AKKA_HOME=/opt/akka
    export PATH=$PATH:$JAVA_HOME/bin:$SPARK_HOME/bin:$SCALA_HOME/bin:$KAFKA_HOME/bin
    • 应用配置 使上面的配置起效。
    source ~/.bash_profile

    下载并安装必要的软件

    tar --directory /opt -xzf jdk-8u102-linux-x64.tar.gz
    mv /opt/jdk-8u102 /opt/java
    • Spark 2.0.0

    Download URL: http://spark.apache.org/downloads.html 下载spark-2.0.0-bin-hadoop2.7.tgz 解压到目录/opt下,把目录名改成/opt/spark

    wget http://d3kbcqa49mib13.cloudfront.net/spark-2.0.0-bin-hadoop2.7.tgz
    tar --directory /opt -xzf spark-2.0.0-bin-hadoop2.7.tgz
    mv /opt/spark-2.0.0-bin-hadoop2.7 /opt/spark
    • Scala

    Download URL: http://www.scala-lang.org/download/ 下载scala-2.11.8.tgz 解压到目录/opt下,把目录名改成/opt/scala

    wget http://downloads.lightbend.com/scala/2.11.8/scala-2.11.8.tgz
    tar --directory /opt -xzf scala-2.11.8.tgz
    mv /opt/scala-2.11.8 /opt/scala
    • AKKA Download URL: http://akka.io/downloads/ 下载Standalone Distribution: akka_2.11-2.4.10.zip 解压到目录/opt下,把目录名改成/opt/akka
    wget http://downloads.typesafe.com/akka/akka_2.11-2.4.10.zip
    upzip -q akka_2.11-2.4.10.zip -d /opt 
    mv /opt/akka_2.11-2.4.10 /opt/akka
    • KAFKA

      由于我们将会使用Spark内置的Stream KAFKA功能,这个功能现在绑定了KAFKA 8.x. 如果你不用这个功能开发,而是写一个Kafka的Comsumer,可以下载最新的版本。

    Download URL: http://kafka.apache.org/downloads.html 下载: kafka_2.11-0.8.2.1.tgz 解压到目录/opt下,把目录名改成/opt/kafka

    wget http://mirrors.cnnic.cn/apache/kafka/0.10.0.1/kafka_2.11-0.8.2.1.tgz
    tar --directory /opt kafka_2.11-0.8.2.1.tgz
    mv /opt/kafka_2.11-0.8.2.1 /opt/kafka
    • sbt sbt包用于开发环境,在测试环境和生产环境可以不要。 可以使用yum安装。
    curl https://bintray.com/sbt/rpm/rpm | sudo tee /etc/yum.repos.d/bintray-sbt-rpm.repo
    sudo yum install sbt

    配置软件

    • 复制AKKA的类库到spark里,这样spark应用就可以使用AKKA的类库。 因为有两个spark配置需要设定class path,我们分别针对它们建两个目录。 一个是$SPARK_HOME/extraClassPath/driver。 另外一个是$SPARK_HOME/extraClassPath/executor。 复制/opt/akka/lib/akka/akka和/opt/akka/lib/akka/config-到这两个目录里。

      注意:不要复制所有的akka文件,akka类库中的某些文件的版本可能会和spark的有冲突, 在运行apark应用时,会出现java.lang.ExceptionInInitializerError。

    mkdir -p $SPARK_HOME/extraClassPath/driver
    cp -n /opt/akka/lib/akka/akka* $SPARK_HOME/extraClassPath/driver/
    cp -n /opt/akka/lib/akka/config-* $SPARK_HOME/extraClassPath/driver/
    
    mkdir -p $SPARK_HOME/extraClassPath/executor
    cp -n /opt/akka/lib/akka/akka* $SPARK_HOME/extraClassPath/executor/
    cp -n /opt/akka/lib/akka/config-* $SPARK_HOME/extraClassPath/executor/
    • 在spark的配置文件spark-defaults.conf中,指定classPath 进入到$SPARK_HOME/conf目录中,看有没有文件spark-defaults.conf。 如果没有复制spark-defaults.conf.templatespark-defaults.conf。 编辑spark-defaults.conf,加入下面两行。
    spark.driver.extraClassPath        /opt/spark/extraClassPath/driver/*
    spark.executor.extraClassPath      /opt/spark/extraClassPath/executor/*
    cp -n $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
    echo "spark.driver.extraClassPath        /opt/spark/extraClassPath/driver/*" >> $SPARK_HOME/conf/spark-defaults.conf
    echo "spark.executor.extraClassPath      /opt/spark/extraClassPath/executor/*" >> $SPARK_HOME/conf/spark-defaults.conf
    • 复制KAFKA的类库到spark里。
    mkdir -p $SPARK_HOME/extraClassPath/driver
    cp -n $KAFKA_HOME/libs/kafka_2.11-0.8.2.1.jar $SPARK_HOME/extraClassPath/driver/
    cp -n $KAFKA_HOME/libs/kafka-clients-0.8.2.1.jar $SPARK_HOME/extraClassPath/driver/
    cp -n $KAFKA_HOME/libs/metrics-core-2.2.0.jar $SPARK_HOME/extraClassPath/driver/
    
    mkdir -p $SPARK_HOME/extraClassPath/executor
    cp -n $KAFKA_HOME/libs/kafka_2.11-0.8.2.1.jar $SPARK_HOME/extraClassPath/executor/
    cp -n $KAFKA_HOME/libs/kafka-clients-0.8.2.1.jar $SPARK_HOME/extraClassPath/executor/
    cp -n $KAFKA_HOME/libs/metrics-core-2.2.0.jar $SPARK_HOME/extraClassPath/executor/

    验证安装的结果

    测试spark部署情况

    运行下面的命令:

    $SPARK_HOME/bin/run-example SparkPi 10

    正常的情况下,会有一大堆输出,看看有没有: Pi is roughly 3.14

    • 启动spark集群master server
    $SPARK_HOME/sbin/start-master.sh

    master服务,默认会使用7077这个端口。可以通过其日志文件查看实际的端口号。

    • 启动spark集群slave server
    $SPARK_HOME/sbin/start-slave.sh spark://$(hostname):7077

    后面的参数是master URL.

    • 在集群环境中,运行spark的sample
    $SPARK_HOME/bin/run-example --master spark://$(hostname):7077 SparkPi 10

    后面的参数是master URL. 正常的情况下,会有一大堆输出,看看有没有: 7077 Pi is roughly 3.14

    • 关闭spark的master服务
    $SPARK_HOME/sbin/stop-master.sh
    • 关闭spark的slave服务
    $SPARK_HOME/sbin/stop-slave.sh

    测试Kafka的部署情况

    • 启动kafka服务器
    # Start zookeeper server
    gnome-terminal -x sh -c '$KAFKA_HOME/bin/zookeeper-server-start.sh $KAFKA_HOME/config/zookeeper.properties; bash'
    
    # Wait zookeeper server is started.
    sleep 5s
    
    # Start kafka server
    gnome-terminal -x sh -c '$KAFKA_HOME/bin/kafka-server-start.sh $KAFKA_HOME/config/server.properties; bash'
    
    # Wait kafka server is started.
    sleep 5s

    注:使用Ctrl+C可以中断服务。

    • 创建一个topic
    # Create a topic
    $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
    
    # List topics
    $KAFKA_HOME/bin/kafka-topics.sh --list --zookeeper localhost:2181
    • 发一个Message
    # Send a message
    echo This is a message | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    • 启动一个consumer来接受消息。 新起一个终端。正常情况下可以收到This is a message。 不行的话,再发一遍消息。
    # Start a consumer
    $KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

    测试Scala的部署情况

    这个比较简单,运行:

    echo sys.exit | $SCALA_HOME/bin/scala

    可以看到Scala的版本号。检查是否正确。

    下面请看

    至此,我们已经部署好了一个spark集群+akka+scala的开发环境。下一步请看: Spark集群 + Akka + Kafka + Scala 开发(2) : 开发一个Spark应用 Spark集群 + Akka + Kafka + Scala 开发(3) : 开发一个Akka + Spark的应用 Spark集群 + Akka + Kafka + Scala 开发(4) : 开发一个Kafka + Spark的应用

    参照

  • 相关阅读:
    什么是方法以及evall()和isnan()和number()string()的使用
    js的本质/数据类型
    if条件的种类
    js中期知识点总结11月7日
    js中期知识点总结11月6日
    js中期知识点总结11月5日
    js中期知识点总结11月2日
    js中期总结11月1日
    js中期知识点总结10月31日
    html前期js知识点10月25日
  • 原文地址:https://www.cnblogs.com/liuys635/p/11052830.html
Copyright © 2011-2022 走看看