  • Volcano Test Case Lab Notes (1) - Flink

    In the CNCF Community Bridge issue #1285, "Reading Material Update And Supplement", we need to provide test cases for the northbound frameworks supported by Volcano. These notes record how the experimental environment was set up and the pitfalls hit along the way.

    CCE Environment Deployment

    1. Deploy Kubernetes (remember to allocate a public IP)

    2. Install the Volcano add-on (Kubernetes v1.17 on Huawei Cloud does not support Volcano yet)

    3. Install and configure kubectl (for the Huawei Cloud experimental environment)

      (1) Download kubectl and the kubeconfig file. The kubectl binary is the executable at kubernetes/server/bin/kubectl inside the Kubernetes release package.

      (2) Log in to the client machine (a worker node of the cluster) and run the following commands:

      cd /home
      chmod +x kubectl
      mv -f kubectl /usr/local/bin
      mkdir -p $HOME/.kube
      mv -f kubeconfig.json $HOME/.kube/config
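
      To confirm that kubectl can reach the cluster (a quick sanity check, assuming the kubeconfig above is valid), run:

      kubectl cluster-info
      kubectl get nodes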
      
    4. Deploy the northbound frameworks (Kubeflow, Spark, MPI, GAS)

    5. Deploy the test cases

    6. Set schedulerName: volcano in the job spec

    7. Submit the job

    Check the job status:

    kubectl get vcjob job-1 -oyaml

    The key field is schedulerName: volcano.
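
    For reference, a minimal sketch of a Volcano Job manifest that sets this field (the task name and container image are illustrative):

    apiVersion: batch.volcano.sh/v1alpha1
    kind: Job
    metadata:
      name: job-1
    spec:
      schedulerName: volcano          # key field: hand scheduling to Volcano
      minAvailable: 1
      tasks:
      - replicas: 1
        name: task-1
        template:
          spec:
            restartPolicy: Never
            containers:
            - name: main
              image: busybox          # illustrative image
              command: ["sh", "-c", "sleep 60"]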

    Introduction to Flink

    Apache Flink is an open-source stream processing framework developed by the Apache Software Foundation. Its core is a distributed streaming dataflow engine written in Java and Scala. Flink executes arbitrary dataflow programs in a data-parallel and pipelined manner, and its pipelined runtime can run both batch and stream processing programs. The runtime also natively supports the execution of iterative algorithms.

    Prerequisites

    A CCE cluster must already be created, with at least one available node. The cluster nodes must have Elastic IPs (EIPs) bound, and the kubectl command-line tool must be installed and configured.

    Deployment Procedure

    Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/try-flink/local_installation.html

    1. Download

    Running Flink requires a Java 8 or Java 11 environment. Use the following command to check the Java version:

    java -version
    

    Download the binary release package, extract it, and change into the extracted directory:

    $ wget https://archive.apache.org/dist/flink/flink-1.12.2/flink-1.12.2-bin-scala_2.11.tgz
    $ tar -xzf flink-1.12.2-bin-scala_2.11.tgz
    $ cd flink-1.12.2
    
    2. Start a Cluster

    Run the startup script to start a local standalone Flink cluster:

    $ ./bin/start-cluster.sh
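
    Once the script reports that the cluster is up, the JobManager web UI and REST API should be reachable on port 8081 (the default rest.port); a quick check:

    $ curl http://localhost:8081/overview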
    
    3. Submit a Job

    Then submit a job and inspect its output with the following commands:

    $ ./bin/flink run examples/streaming/WordCount.jar
    $ tail log/flink-*-taskexecutor-*.out
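
    When the local test is finished, the cluster can be shut down with the companion script:

    $ ./bin/stop-cluster.sh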
    

    Reference: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/standalone/kubernetes.html

    1. Deploy the Components

    Deploying a Flink cluster requires creating two Deployments, one Service, and one ConfigMap. Volcano is used for scheduling.

    The content of flink-configuration-configmap.yaml is as follows:

    apiVersion: v1
    kind: ConfigMap
    metadata:
      name: flink-config
      labels:
        app: flink
    data:
      flink-conf.yaml: |+
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
        blob.server.port: 6124
        jobmanager.rpc.port: 6123
        taskmanager.rpc.port: 6122
        queryable-state.proxy.ports: 6125
        jobmanager.memory.process.size: 1600m
        taskmanager.memory.process.size: 1728m
        parallelism.default: 2
      log4j-console.properties: |+
        # This affects logging for both user code and Flink
        rootLogger.level = INFO
        rootLogger.appenderRef.console.ref = ConsoleAppender
        rootLogger.appenderRef.rolling.ref = RollingFileAppender
    
        # Uncomment this if you want to _only_ change Flink's logging
        #logger.flink.name = org.apache.flink
        #logger.flink.level = INFO
    
        # The following lines keep the log level of common libraries/connectors on
        # log level INFO. The root logger does not override this. You have to manually
        # change the log levels here.
        logger.akka.name = akka
        logger.akka.level = INFO
        logger.kafka.name= org.apache.kafka
        logger.kafka.level = INFO
        logger.hadoop.name = org.apache.hadoop
        logger.hadoop.level = INFO
        logger.zookeeper.name = org.apache.zookeeper
        logger.zookeeper.level = INFO
    
        # Log all infos to the console
        appender.console.name = ConsoleAppender
        appender.console.type = CONSOLE
        appender.console.layout.type = PatternLayout
        appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    
        # Log all infos in the given rolling file
        appender.rolling.name = RollingFileAppender
        appender.rolling.type = RollingFile
        appender.rolling.append = false
        appender.rolling.fileName = ${sys:log.file}
        appender.rolling.filePattern = ${sys:log.file}.%i
        appender.rolling.layout.type = PatternLayout
        appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
        appender.rolling.policies.type = Policies
        appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
        appender.rolling.policies.size.size=100MB
        appender.rolling.strategy.type = DefaultRolloverStrategy
        appender.rolling.strategy.max = 10
    
        # Suppress the irrelevant (wrong) warnings from the Netty channel handler
        logger.netty.name = org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline
        logger.netty.level = OFF
    

    The Service exposes the JobManager's ports (RPC, blob server, and the REST/web UI). The content of jobmanager-service.yaml is as follows:

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager
    spec:
      type: ClusterIP
      ports:
      - name: rpc
        port: 6123
      - name: blob-server
        port: 6124
      - name: webui
        port: 8081
      selector:
        app: flink
        component: jobmanager
    

    The content of jobmanager-session-deployment.yaml is as follows:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-jobmanager
    spec:
      replicas: 1
      selector:
        matchLabels:
          app: flink
          component: jobmanager
      template:
        metadata:
          labels:
            app: flink
            component: jobmanager
        spec:
          containers:
          - name: jobmanager
            image: flink:1.11.0-scala_2.11
            args: ["jobmanager"]
            ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
            livenessProbe:
              tcpSocket:
                port: 6123
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
    

    The content of taskmanager-session-deployment.yaml is as follows:

    apiVersion: apps/v1
    kind: Deployment
    metadata:
      name: flink-taskmanager
    spec:
      replicas: 2
      selector:
        matchLabels:
          app: flink
          component: taskmanager
      template:
        metadata:
          labels:
            app: flink
            component: taskmanager
        spec:
          containers:
          - name: taskmanager
            image: flink:1.11.0-scala_2.11
            args: ["taskmanager"]
            ports:
            - containerPort: 6122
              name: rpc
            - containerPort: 6125
              name: query-state
            livenessProbe:
              tcpSocket:
                port: 6122
              initialDelaySeconds: 30
              periodSeconds: 60
            volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/flink/conf/
            securityContext:
              runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
          volumes:
          - name: flink-config-volume
            configMap:
              name: flink-config
              items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
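
    Note that the two Deployment manifests above, taken from the upstream Flink documentation, do not set a scheduler, so their pods would be placed by the default Kubernetes scheduler. To have Volcano schedule them, as stated at the start of this subsection, a schedulerName field needs to be added to each pod template; a minimal sketch of the fragment to merge into .spec.template.spec of both Deployments:

    spec:
      template:
        spec:
          schedulerName: volcano   # let Volcano, not the default scheduler, place the Flink pods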
    

    Create the four YAML files above on a cluster node, then deploy them with the following commands:

    kubectl create -f flink-configuration-configmap.yaml
    kubectl create -f jobmanager-service.yaml
    kubectl create -f jobmanager-session-deployment.yaml
    kubectl create -f taskmanager-session-deployment.yaml
    

    After the resources are created, verify them:

    kubectl get cm | grep flink
    kubectl get svc | grep flink
    kubectl get pod -owide | grep flink
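
    Before publishing the service externally, the web UI can also be reached locally through a port-forward (assuming the Service above was created):

    kubectl port-forward svc/flink-jobmanager 8081:8081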
    
    2. Expose the Service Externally

    Reference: https://support.huaweicloud.com/bestpractice-cce/cce_bestpractice_0121.html

    After the Flink workloads are created, the service needs to be published so that it can be accessed from outside the cluster.

    • If testing on Huawei Cloud CCE, open the "Workloads - Deployments" page in the CCE console, select flink-jobmanager, and click "Access Mode".
    • Click "Add Service", choose NodePort access, and set the container port to 8081 (a kubectl-based alternative is sketched after this list).
    • Under "Network Management" in CCE you can see the Service just added; open the externally published link.
    • On the Flink Dashboard, click "Submit New Job" to submit a task. You can submit the official WordCount example, located at flink-1.12.2/examples/streaming/WordCount.jar.
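
    If you prefer kubectl to the CCE console, an equivalent NodePort Service can be created directly; a minimal sketch (the Service name is illustrative):

    apiVersion: v1
    kind: Service
    metadata:
      name: flink-jobmanager-web      # illustrative name
    spec:
      type: NodePort
      ports:
      - name: webui
        port: 8081
        targetPort: 8081
      selector:
        app: flink
        component: jobmanager

    After creating it with kubectl create -f, the assigned node port appears in kubectl get svc, and the dashboard is reachable at http://<node-EIP>:<nodePort>.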

    References:

    1. https://bbs.huaweicloud.com/blogs/104368
    2. https://support.huaweicloud.com/bestpractice-cce/cce_bestpractice_0075.html
    3. https://support.huaweicloud.com/bestpractice-cce/cce_bestpractice_0119.html
    4. https://support.huaweicloud.com/bestpractice-cce/cce_bestpractice_0121.html
    5. https://support.huaweicloud.com/bestpractice-cce/cce_bestpractice_0131.html