  • Spark on K8S (Kubernetes Native)

    Modes of running Spark on K8S

    Start Minikube

    sudo minikube start --driver=none --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers
    

    If startup fails, try deleting the cluster first with minikube delete

    Official Spark on K8S documentation

    https://spark.apache.org/docs/latest/running-on-kubernetes.html

    The page above says nothing about compatibility between Spark and K8S versions, but the versions do matter

    Download Spark

    https://archive.apache.org/dist/spark/

    Spark is fairly tightly coupled to Hadoop, so download a build that bundles Hadoop; that way the Hadoop jars are available, otherwise you may hit missing-package and missing-class errors even when the job never actually uses Hadoop

    Build Spark Image

    Spark ships a bin/docker-image-tool.sh utility for building images

    The tool locates the Dockerfiles under kubernetes/dockerfiles and, following them, builds the required Spark commands, tools, libraries, jars, Java runtime, examples, entrypoint.sh, and so on into the image

    Spark 2.3 supports only Java/Scala; 2.4 adds Python and R, so there are three Dockerfiles and three images get built, with the Python and R images layered on top of the Java/Scala one

    sudo ./bin/docker-image-tool.sh -t my_spark_2.4_hadoop_2.7 build
    

    If you hit an error like the following:

    WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.9/main/x86_64/APKINDEX.tar.gz: temporary error (try again later)
    WARNING: Ignoring http://dl-cdn.alpinelinux.org/alpine/v3.9/community/x86_64/APKINDEX.tar.gz: temporary error (try again later)
    ERROR: unsatisfiable constraints:
      bash (missing):
        required by: world[bash]
    

    This is a network problem. Edit ./bin/docker-image-tool.sh and add --network=host to the docker build command inside it, so the build container uses the host network (make sure the host network itself is working)
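    A sketch of that edit, demonstrated on a throwaway sample line (the exact docker build invocation varies by Spark version, so grep the script first; the sample line below is illustrative, not the script's real contents):

```shell
# Demonstrate the --network=host patch on a sample line; in practice apply the
# same sed to bin/docker-image-tool.sh (back it up first).
sample=$(mktemp)
printf 'docker build -t "$IMG" -f "$DOCKERFILE" .\n' > "$sample"
sed -i 's/docker build/docker build --network=host/' "$sample"
cat "$sample"   # the build line now carries --network=host
```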

    Submitting a Job from the host

    bin/spark-submit \
        --master k8s://https://<k8s-apiserver-host>:<k8s-apiserver-port> \
        --deploy-mode cluster \
        --name spark-pi \
        --class org.apache.spark.examples.SparkPi \
        --conf spark.executor.instances=5 \
        --conf spark.kubernetes.container.image=<spark-image> \
        local:///path/to/examples.jar
    

    Note that local:///path/to/examples.jar refers to the container's file system, not that of the machine running spark-submit. As the official docs put it: "Note that using application dependencies from the submission client's local file system is currently not yet supported."

    If you don't use local, schemes such as HTTP and HDFS also work; with no scheme specified, local is the default

    Because I started out with a Spark build that does not bundle Hadoop, spark-submit failed with ClassNotFound.
    I then either specified --jars or added the following to conf/spark-env.sh on the host:

    export SPARK_DIST_CLASSPATH=$(/home/lin/Hadoop/hadoop-2.8.3/bin/hadoop classpath)
    

    That got spark-submit through, but once the container was running it still reported ClassNotFound.
    It turns out the driver container invokes spark-submit again with a few parameters changed, e.g. cluster mode switched to client mode.
    After switching to the Spark build that bundles Hadoop the problem no longer appeared,
    so presumably jars passed to spark-submit via --jars also need to be present inside the container.
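    For intuition about that spark-env.sh line: SPARK_DIST_CLASSPATH is simply a colon-separated list of extra classpath entries Spark appends at launch, and `hadoop classpath` prints exactly such a list. A sketch with illustrative entries (the real ones come from your Hadoop install):

```shell
# SPARK_DIST_CLASSPATH is a colon-separated classpath; `hadoop classpath`
# emits one for the local install. These two entries are illustrative only.
HADOOP_HOME=/home/lin/Hadoop/hadoop-2.8.3
SPARK_DIST_CLASSPATH="$HADOOP_HOME/etc/hadoop:$HADOOP_HOME/share/hadoop/common/*"
# Show one entry per line, as Spark's launcher will see them:
echo "$SPARK_DIST_CLASSPATH" | tr ':' '\n'
```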

    Getting the K8S API Server address

    sudo kubectl cluster-info
    

    Suppose it returns

    https://192.168.0.107:8443
    

    Then the spark-submit command is:

    # --master points at the k8s API server
    # --conf spark.kubernetes.container.image names the image built with docker-image-tool.sh
    # the first wordcount.py is the application to run
    # the second wordcount.py is its argument, i.e. count the words in wordcount.py itself
    bin/spark-submit \
        --master k8s://https://192.168.0.107:8443 \
        --deploy-mode cluster \
        --name spark-test \
        --conf spark.executor.instances=3 \
        --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
        /opt/spark/examples/src/main/python/wordcount.py \
        /opt/spark/examples/src/main/python/wordcount.py
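    For intuition, the wordcount logic itself can be sketched with plain coreutils: split the input into one word per line, then count occurrences of each word (wordcount.py computes the same thing with Spark's flatMap/reduceByKey, distributed across executors):

```shell
# Word count with plain coreutils, mirroring what the Spark example computes:
# squeeze whitespace into newlines (one word per line), sort, count duplicates.
printf 'to be or not to be\n' | tr -s '[:space:]' '\n' | sort | uniq -c | sort -rn
```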
    

    This may fail with a certificate error and the Pod won't start; the certificates probably need configuring.
    The Spark on K8S docs mention a spark.kubernetes.authenticate.submission.caCertFile option, though I did not try it.
    In a test environment you can instead run a proxy, which exposes an address that needs no certificate authentication:

    kubectl proxy
    

    The spark-submit command then becomes:

    # the API server address becomes http://127.0.0.1:8001
    bin/spark-submit \
        --master k8s://http://127.0.0.1:8001 \
        --deploy-mode cluster \
        --name spark-test \
        --conf spark.executor.instances=3 \
        --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
        /opt/spark/examples/src/main/python/wordcount.py \
        /opt/spark/examples/src/main/python/wordcount.py
    

    This still errors out with a permissions failure, raised on the host or in the container; you need to configure a user with the right permissions in K8S

    Prepare a role.yaml file:

    apiVersion: v1
    kind: ServiceAccount
    metadata:
      name: spark
      namespace: default
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: Role
    metadata:
      namespace: default
      name: spark-role
    rules:
    - apiGroups: [""]
      resources: ["pods"]
      verbs: ["*"]
    - apiGroups: [""]
      resources: ["services"]
      verbs: ["*"]
    ---
    apiVersion: rbac.authorization.k8s.io/v1
    kind: RoleBinding
    metadata:
      name: spark-role-binding
      namespace: default
    subjects:
    - kind: ServiceAccount
      name: spark
      namespace: default
    roleRef:
      kind: Role
      name: spark-role
      apiGroup: rbac.authorization.k8s.io
    

    For reference: https://github.com/GoogleCloudPlatform/spark-on-k8s-operator/blob/master/manifest/spark-rbac.yaml

    Apply it:

    sudo kubectl apply -f role.yaml
    

    Check the result:

    sudo kubectl get role
    sudo kubectl get role spark-role -o yaml
    sudo kubectl get rolebinding
    sudo kubectl get rolebinding spark-role-binding -o yaml
    

    Resubmit:

    # added --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
    bin/spark-submit \
        --master k8s://http://127.0.0.1:8001 \
        --deploy-mode cluster \
        --name spark-test \
        --conf spark.executor.instances=3 \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
        --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
        /opt/spark/examples/src/main/python/wordcount.py \
        /opt/spark/examples/src/main/python/wordcount.py
    

    The permission error is gone, but other errors may still show up:

    20/07/09 06:32:23 INFO SparkContext: Successfully stopped SparkContext
    Traceback (most recent call last):
      File "/opt/spark/examples/src/main/python/wordcount.py", line 33, in <module>
        .appName("PythonWordCount")
      File "/opt/spark/python/lib/pyspark.zip/pyspark/sql/session.py", line 173, in getOrCreate
      File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 367, in getOrCreate
      File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 136, in __init__
      File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 198, in _do_init
      File "/opt/spark/python/lib/pyspark.zip/pyspark/context.py", line 306, in _initialize_context
      File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1525, in __call__
      File "/opt/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
    : org.apache.spark.SparkException: External scheduler cannot be instantiated
            at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2794)
            at org.apache.spark.SparkContext.<init>(SparkContext.scala:493)
            at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:58)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
            at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
            at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
            at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
            at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
            at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
            at py4j.Gateway.invoke(Gateway.java:238)
            at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
            at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
            at py4j.GatewayConnection.run(GatewayConnection.java:238)
            at java.lang.Thread.run(Thread.java:748)
    Caused by: io.fabric8.kubernetes.client.KubernetesClientException: Operation: [get]  for kind: [Pod]  with name: [spark-test-1594276334218-driver]  in namespace: [default]  failed.
            at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:64)
            at io.fabric8.kubernetes.client.KubernetesClientException.launderThrowable(KubernetesClientException.java:72)
            at io.fabric8.kubernetes.client.dsl.base.BaseOperation.getMandatory(BaseOperation.java:237)
            at io.fabric8.kubernetes.client.dsl.base.BaseOperation.get(BaseOperation.java:170)
            at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:57)
            at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator$$anonfun$1.apply(ExecutorPodsAllocator.scala:55)
            at scala.Option.map(Option.scala:146)
            at org.apache.spark.scheduler.cluster.k8s.ExecutorPodsAllocator.<init>(ExecutorPodsAllocator.scala:55)
            at org.apache.spark.scheduler.cluster.k8s.KubernetesClusterManager.createSchedulerBackend(KubernetesClusterManager.scala:89)
            at org.apache.spark.SparkContext$.org$apache$spark$SparkContext$$createTaskScheduler(SparkContext.scala:2788)
            ... 13 more
    Caused by: java.net.SocketException: Broken pipe (Write failed)
            at java.net.SocketOutputStream.socketWrite0(Native Method)
    

    This Broken pipe is most likely caused by the code and jars Spark uses being incompatible with the K8S version.
    I tried replacing the k8s client jars under Spark's jars directory:

    https://repo1.maven.org/maven2/io/fabric8/kubernetes-client/4.4.2/kubernetes-client-4.4.2.jar
    https://gitee.com/everworking/kubernetes-client
    

    That still left other problems, again apparently version incompatibilities.
    Looking at the jars directory of Spark 2.4.6 shows which K8S jars it bundles:

    kubernetes-client-4.6.1.jar
    kubernetes-model-4.6.1.jar
    kubernetes-model-common-4.6.1.jar
    

    The Kubernetes Client documentation,
    https://github.com/fabric8io/kubernetes-client#compatibility-matrix,
    shows that the highest Kubernetes version client 4.6.1 supports is 1.15 (the Spark site never spells out its K8S version compatibility),
    while the latest Minikube installs Kubernetes 1.18 by default.
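    A quick way to read the bundled client version off disk, so it can be checked against the matrix (demonstrated on a mock directory here; point it at "$SPARK_HOME/jars" for real):

```shell
# Extract the fabric8 kubernetes-client version from a Spark jars directory.
# Mock directory for demonstration; use "$SPARK_HOME/jars" in practice.
jars=$(mktemp -d)
touch "$jars/kubernetes-client-4.6.1.jar" "$jars/kubernetes-model-4.6.1.jar"
ls "$jars" | sed -n 's/^kubernetes-client-\(.*\)\.jar$/\1/p'   # prints 4.6.1
```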

    Delete the cluster and restart it with Kubernetes 1.15:

    sudo minikube stop
    sudo minikube delete
    
    sudo rm -rf ~/.kube
    sudo rm -rf ~/.minikube
    
    sudo minikube start --driver=none \
                        --image-repository=registry.cn-hangzhou.aliyuncs.com/google_containers \
                        --kubernetes-version="v1.15.3"
    

    Also download the 1.15 kubectl:

    curl -LO https://storage.googleapis.com/kubernetes-release/release/v1.15.3/bin/linux/amd64/kubectl
    

    Check the versions to make sure everything is on 1.15:

    sudo kubectl version --client
    

    Resubmit with the same command:

    # added --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark
    bin/spark-submit \
        --master k8s://http://127.0.0.1:8001 \
        --deploy-mode cluster \
        --name spark-test \
        --conf spark.executor.instances=3 \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
        --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
        /opt/spark/examples/src/main/python/wordcount.py \
        /opt/spark/examples/src/main/python/wordcount.py
    

    This time it succeeds; both the Driver and Executor Pods start:

    NAME                                   READY   STATUS    RESTARTS   AGE
    pythonwordcount-1595818025111-exec-1   1/1     Running   0          12s
    pythonwordcount-1595818025401-exec-2   1/1     Running   0          12s
    pythonwordcount-1595818025443-exec-3   0/1     Pending   0          12s
    spark-test-1595818015819-driver        1/1     Running   0          20s
    

    Check the corresponding Service:

    NAME                                  TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)             AGE
    kubernetes                            ClusterIP   10.96.0.1    <none>        443/TCP             27m
    spark-test-1595818015819-driver-svc   ClusterIP   None         <none>        7078/TCP,7079/TCP   22s
    

    Inspect the corresponding containers with docker:

    CONTAINER ID        IMAGE                                                                     COMMAND                  CREATED             STATUS              PORTS               NAMES
    a55500cdd00f        9cdc285a4fbb                                                              "/opt/entrypoint.sh …"   9 seconds ago       Up 8 seconds                            k8s_executor_pythonwordcount-1595818025401-exec-2_default_0624eb2d-aeab-454e-bce5-15c38b46f970_0
    37ddc67f3527        9cdc285a4fbb                                                              "/opt/entrypoint.sh …"   9 seconds ago       Up 8 seconds                            k8s_executor_pythonwordcount-1595818025111-exec-1_default_0d8fa5ac-07dc-41ea-a7fb-a75d1f5dfdf9_0
    5d9d9c5517e4        registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1             "/pause"                 10 seconds ago      Up 8 seconds                            k8s_POD_pythonwordcount-1595818025401-exec-2_default_0624eb2d-aeab-454e-bce5-15c38b46f970_0
    210ebc82c274        registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1             "/pause"                 10 seconds ago      Up 8 seconds                            k8s_POD_pythonwordcount-1595818025111-exec-1_default_0d8fa5ac-07dc-41ea-a7fb-a75d1f5dfdf9_0
    400f155d78f2        9cdc285a4fbb                                                              "/opt/entrypoint.sh …"   15 seconds ago      Up 14 seconds                           k8s_spark-kubernetes-driver_spark-test-1595818015819-driver_default_3198bca3-fcb7-4a5c-8821-c5fe7ef02dfa_0
    d4c7f82d90de        registry.cn-hangzhou.aliyuncs.com/google_containers/pause:3.1             "/pause"                 16 seconds ago      Up 14 seconds                           k8s_POD_spark-test-1595818015819-driver_default_3198bca3-fcb7-4a5c-8821-c5fe7ef02dfa_0
    

    There are only two Executor containers here, even though three were requested at submission;
    the Pod listing above likewise shows one Pod stuck in Pending.
    Describe the Pending Pod:

    sudo kubectl describe pod pythonwordcount-1595818025443-exec-3
    

    This returns a lot of information; at the very end you can see:

    Events:
      Type     Reason            Age                From               Message
      ----     ------            ----               ----               -------
      Warning  FailedScheduling  23s (x2 over 23s)  default-scheduler  0/1 nodes are available: 1 Insufficient cpu.
    

    So the Pod is Pending because there is not enough CPU.
    This does not prevent the Job from running normally, though.
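    If you did want all three executors to fit on a small node, one knob to try (an assumption on my part, not something tested in this walkthrough) is spark.kubernetes.executor.request.cores, which lowers the CPU each executor Pod requests:

```shell
# Untested sketch: request half a core per executor so more Pods fit per node.
bin/spark-submit \
    --master k8s://http://127.0.0.1:8001 \
    --deploy-mode cluster \
    --name spark-test \
    --conf spark.executor.instances=3 \
    --conf spark.kubernetes.executor.request.cores=0.5 \
    --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
    --conf spark.kubernetes.container.image=spark-py:my_spark_2.4_hadoop_2.7 \
    /opt/spark/examples/src/main/python/wordcount.py \
    /opt/spark/examples/src/main/python/wordcount.py
```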

    After the Spark Job finishes, the Executor and Driver containers all move to the Exit state.
    The Executor containers disappear shortly after exiting, and their Pods are deleted,
    while the Driver container stays around and its Pod moves to the Completed state:

    NAME                              READY   STATUS      RESTARTS   AGE
    spark-test-1595818015819-driver   0/1     Completed   0          32s
    

    If the Job fails, the Pod shows an Error status instead

    Submitting a Job from inside a container

    Define a Deployment; note that serviceAccountName references the spark ServiceAccount created earlier:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: spark-client
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: spark
          component: client
      template:
        metadata:
          labels:
            app: spark
            component: client
        spec:
          containers:
          - name: sparkclient
            image: spark-py:2.4.6
            workingDir: /opt/spark
            command: ["/bin/bash", "-c", "while true;do echo hello;sleep 6000;done"]
          serviceAccountName: spark
    

    Deploy it:

    sudo kubectl create -f client-deployment.yaml
    

    Find the pod and open a shell in it:

    sudo kubectl exec -t -i spark-client-6479b76776-l5bzw /bin/bash
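    The pod name suffix above is generated, so as a convenience (assuming the kubectl context points at this cluster) you can select the pod by its labels instead of copying the name by hand:

```shell
# Wait for the client pod to become ready, then exec into it by label.
sudo kubectl wait --for=condition=ready pod -l app=spark,component=client --timeout=120s
POD=$(sudo kubectl get pod -l app=spark,component=client -o jsonpath='{.items[0].metadata.name}')
sudo kubectl exec -t -i "$POD" /bin/bash
```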
    

    The env command shows that the Kubernetes API Server address is defined inside the container:

    KUBERNETES_SERVICE_HOST=10.96.0.1
    KUBERNETES_SERVICE_PORT_HTTPS=443
    

    The container also carries a token and certificate that can be used to access the API Server:

    TOKEN=$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)
    
    curl --cacert /var/run/secrets/kubernetes.io/serviceaccount/ca.crt \
         -H "Authorization: Bearer $TOKEN" \
         -s https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS}/api/v1/namespaces/default/pods
    

    However, submitting a Job via spark-submit from inside the container failed: no permission to get configMaps. Apparently the permissions required differ from submitting on the host.
    Change the spark Role to allow operations on all resources, then apply it again:

    - apiGroups: [""]
      resources: ["*"]
      verbs: ["*"]
    

    Resubmit the Job; this time it starts and runs successfully:

    # the second wordcount.py is passed as the argument
    bin/spark-submit \
        --master k8s://https://${KUBERNETES_SERVICE_HOST}:${KUBERNETES_SERVICE_PORT_HTTPS} \
        --deploy-mode cluster \
        --name spark-test \
        --conf spark.executor.instances=3 \
        --conf spark.kubernetes.authenticate.driver.serviceAccountName=spark \
        --conf spark.kubernetes.container.image=spark-py:2.4.6 \
        /opt/spark/examples/src/main/python/wordcount.py \
        /opt/spark/examples/src/main/python/wordcount.py
    

     Reposted from: https://www.cnblogs.com/moonlight-lin/p/13296909.html

  • Source: https://www.cnblogs.com/javalinux/p/15068926.html