  • Dolphin Scheduler: Adding a Stop Function for Spark Task Instances

    Having the api module depend directly on the server module is a bad practice, so treat what follows purely as entertainment.

    ------------

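    The operation column of the DS (DolphinScheduler) task instance panel only offers log viewing. When you need to stop a Spark task instance and YARN does not run on the same server as the worker, stopping the workflow does not stop the task instance along with it. So I decided to add a stop function specifically for Spark tasks.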

    The basic result is as follows:

    Frontend

    First, add a stop button to src/js/conf/home/pages/projects/pages/taskInstance/_source/list.vue:

    <td>
        <x-button
            v-show="item.taskType==='SPARK'"
            :disabled="item.state !== 'RUNNING_EXEUTION'"
            type="error"
            shape="circle"
            size="xsmall"
            data-toggle="tooltip"
            :title="$t('Stop')"
            icon="ans-icon-stop"
            @click="_stopTask(item)">
        </x-button>
    </td>

    The _stopTask method is implemented as follows:

    _stopTask(item) {
            let prom = new Promise((resolve, reject) => {
              // send the stop request for this task instance
              io.post(`projects/${projectName}/task-instance/kill`, {
                host: item.host,
                taskInstanceId: item.id
              }, res => {
                resolve(res)
              }).catch(e => {
                reject(e)
              })
            })
            prom.then(() => {
              console.log("killed task, will update task list")
              // Refresh the task list. A killed task does not switch to FAILED right away, so an
              // immediate refresh may not show it as stopped yet; refreshing once more usually does.
              new Promise((resolve, reject) => {
                io.get(`projects/${projectName}/task-instance/list-paging`, this.searchParams, res => {
                  resolve(res.data)
                }).catch(e => {
                  reject(e)
                })
              }).then(res => {
                // update the rendered list instead of mutating the taskInstanceList prop
                this.list = res.totalList
              }, err => {
                console.log("get task list failed " + err)
              })
            }, err => {
              console.log("kill task failed " + err)
            })
          }

    The complete code is as follows:

    /*
    * Licensed to the Apache Software Foundation (ASF) under one or more
    * contributor license agreements.  See the NOTICE file distributed with
    * this work for additional information regarding copyright ownership.
    * The ASF licenses this file to You under the Apache License, Version 2.0
    * (the "License"); you may not use this file except in compliance with
    * the License.  You may obtain a copy of the License at
    *
    *    http://www.apache.org/licenses/LICENSE-2.0
    *
    * Unless required by applicable law or agreed to in writing, software
    * distributed under the License is distributed on an "AS IS" BASIS,
    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    * See the License for the specific language governing permissions and
    * limitations under the License.
    */
    <template>
      <div class="list-model">
        <div class="table-box">
          <table class="fixed">
            <tr>
              <th scope="col">
                <span>{{$t('#')}}</span>
              </th>
              <th scope="col">
                <span>{{$t('Name')}}</span>
              </th>
              <th scope="col">
                <span>{{$t('Process Instance')}}</span>
              </th>
              <th scope="col" width="70">
                <span>{{$t('Executor')}}</span>
              </th>
              <th scope="col" width="90">
                <span>{{$t('Node Type')}}</span>
              </th>
              <th scope="col" width="40">
                <span>{{$t('State')}}</span>
              </th>
              <th scope="col" width="140">
                <span>{{$t('Submit Time')}}</span>
              </th>
              <th scope="col" width="140">
                <span>{{$t('Start Time')}}</span>
              </th>
              <th scope="col" width="140">
                <span>{{$t('End Time')}}</span>
              </th>
              <th scope="col" width="110">
                <span>{{$t('host')}}</span>
              </th>
              <th scope="col" width="74">
                <span>{{$t('Duration')}}(s)</span>
              </th>
              <th scope="col" width="84">
                <span>{{$t('Retry Count')}}</span>
              </th>
              <th scope="col" width="50">
                <span>{{$t('Operation')}}</span>
              </th>
            </tr>
            <tr v-for="(item, $index) in list" :key="item.id">
              <td>
                <span>{{parseInt(pageNo === 1 ? ($index + 1) : (($index + 1) + (pageSize * (pageNo - 1))))}}</span>
              </td>
              <td>
                <span class="ellipsis" :title="item.name">{{item.name}}</span>
              </td>
              <td><a href="javascript:" class="links" @click="_go(item)"><span
                class="ellipsis">{{item.processInstanceName}}</span></a></td>
              <td>
                <span v-if="item.executorName">{{item.executorName}}</span>
                <span v-else>-</span>
              </td>
              <td><span>{{item.taskType}}</span></td>
              <td><span v-html="_rtState(item.state)" style="cursor: pointer;"></span></td>
              <td>
                <span v-if="item.submitTime">{{item.submitTime | formatDate}}</span>
                <span v-else>-</span>
              </td>
              <td>
                <span v-if="item.startTime">{{item.startTime | formatDate}}</span>
                <span v-else>-</span>
              </td>
              <td>
                <span v-if="item.endTime">{{item.endTime | formatDate}}</span>
                <span v-else>-</span>
              </td>
              <td><span>{{item.host || '-'}}</span></td>
              <td><span>{{item.duration}}</span></td>
              <td><span>{{item.retryTimes}}</span></td>
              <td>
                <x-button
                  type="info"
                  shape="circle"
                  size="xsmall"
                  data-toggle="tooltip"
                  :title="$t('View log')"
                  icon="ans-icon-log"
                  @click="_refreshLog(item)">
                </x-button>
              </td>
              <td>
                <x-button
                  v-show="item.taskType==='SPARK'"
                  :disabled="item.state !== 'RUNNING_EXEUTION'"
                  type="error"
                  shape="circle"
                  size="xsmall"
                  data-toggle="tooltip"
                  :title="$t('Stop')"
                  icon="ans-icon-stop"
                  @click="_stopTask(item)">
                </x-button>
              </td>
            </tr>
          </table>
        </div>
      </div>
    </template>
    <script>
      import Permissions from '@/module/permissions'
      import mLog from '@/conf/home/pages/dag/_source/formModel/log'
      import {tasksState} from '@/conf/home/pages/dag/_source/config'
      import io from '@/module/io'
      import localStore from '@/module/util/localStorage'
    
      // Get the name of the item currently clicked
      let projectName = localStore.getItem('projectName')
    
      export default {
        name: 'list',
        data() {
          return {
            list: [],
            isAuth: Permissions.getAuth(),
            backfillItem: {},
            searchParams: {
              // page size
              pageSize: 10,
              // page index
              pageNo: 1,
              // Query name
              searchVal: '',
              // Process instance id
              processInstanceId: '',
              // host
              host: '',
              // state
              stateType: '',
              // start date
              startDate: '',
              // end date
              endDate: '',
              // Executor name
              executorName: ''
            }
          }
        },
        props: {
          taskInstanceList: Array,
          pageNo: Number,
          pageSize: Number
        },
        methods: {
          _rtState(code) {
            let o = tasksState[code]
            return `<em class="${o.icoUnicode} ${o.isSpin ? 'as as-spin' : ''}" style="color:${o.color}" data-toggle="tooltip" data-container="body" title="${o.desc}"></em>`
          },
          _stopTask(item) {
            let prom = new Promise((resolve, reject) => {
              // send the stop request for this task instance
              io.post(`projects/${projectName}/task-instance/kill`, {
                host: item.host,
                taskInstanceId: item.id
              }, res => {
                resolve(res)
              }).catch(e => {
                reject(e)
              })
            })
            prom.then(() => {
              console.log("killed task, will update task list")
              // Refresh the task list. A killed task does not switch to FAILED right away, so an
              // immediate refresh may not show it as stopped yet; refreshing once more usually does.
              new Promise((resolve, reject) => {
                io.get(`projects/${projectName}/task-instance/list-paging`, this.searchParams, res => {
                  resolve(res.data)
                }).catch(e => {
                  reject(e)
                })
              }).then(res => {
                // update the rendered list instead of mutating the taskInstanceList prop
                this.list = res.totalList
              }, err => {
                console.log("get task list failed " + err)
              })
            }, err => {
              console.log("kill task failed " + err)
            })
          },
          _refreshLog(item) {
            let self = this
            let instance = this.$modal.dialog({
              closable: false,
              showMask: true,
              escClose: true,
              className: 'v-modal-custom',
              transitionName: 'opacityp',
              render(h) {
                return h(mLog, {
                  on: {
                    ok() {
                    },
                    close() {
                      instance.remove()
                    }
                  },
                  props: {
                    self: self,
                    source: 'list',
                    logId: item.id
                  }
                })
              }
            })
          },
          _go(item) {
            this.$router.push({path: `/projects/instance/list/${item.processInstanceId}`})
          },
        },
        watch: {
          taskInstanceList(a) {
            this.list = []
            setTimeout(() => {
              this.list = a
            })
          }
        },
        created() {
        },
        mounted() {
          this.list = this.taskInstanceList
        },
        components: {}
      }
    </script>

    Backend

    The changes are mainly in the api module.

    Add the task stop endpoint to dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/controller/TaskInstanceController.java:

    @PostMapping("/kill")
    @ResponseStatus(HttpStatus.OK)
    public Result kill(@RequestParam("host") String host, @RequestParam("taskInstanceId") Integer taskId) {
        logger.info("start kill task");
        Map<String, Object> result = taskInstanceService.killTask(host, taskId);
        Status status = (Status)result.get(Constants.STATUS);
        if (status == Status.SUCCESS) {
            return success(status.getMsg());
        } else {
            Integer code = status.getCode();
            String msg = (String)result.get(Constants.MSG);
            return error(code, msg);
        }
    }
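    The endpoint expects two request parameters, host and taskInstanceId. As a rough sketch of what a client call could look like, the Java 11+ code below builds (but does not send) such a POST request. The base URL http://localhost:12345/dolphinscheduler (assumed default API port and context path), the project name "demo", the host value and the class name are all assumptions for illustration, and authentication (session cookie or token) is left out.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class KillRequestSketch {

    // Form-encode the two parameters the kill endpoint reads with @RequestParam
    static String formBody(String host, int taskInstanceId) {
        return "host=" + URLEncoder.encode(host, StandardCharsets.UTF_8)
                + "&taskInstanceId=" + taskInstanceId;
    }

    // Build (without sending) the POST request; baseUrl is a hypothetical API root and
    // projectName is assumed to contain only URL-safe characters
    static HttpRequest buildKillRequest(String baseUrl, String projectName, String host, int taskInstanceId) {
        String path = "/projects/" + projectName + "/task-instance/kill";
        return HttpRequest.newBuilder(URI.create(baseUrl + path))
                .header("Content-Type", "application/x-www-form-urlencoded")
                .POST(HttpRequest.BodyPublishers.ofString(formBody(host, taskInstanceId)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest req = buildKillRequest("http://localhost:12345/dolphinscheduler", "demo", "192.168.1.10", 42);
        System.out.println(req.method() + " " + req.uri());
        System.out.println(formBody("192.168.1.10", 42));
    }
}
```

    Sending it would be one HttpClient.send call; it is kept separate here so the request shape can be checked without a running server.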
    The killTask method is implemented in dolphinscheduler-api/src/main/java/org/apache/dolphinscheduler/api/service/TaskInstanceService.java:
    public Map<String, Object> killTask(String host, Integer taskInstanceId) {
            Map<String, Object> result = new HashMap<>(5);
            TaskInstance taskInstance = processDao.getTaskInstanceDetailByTaskId(taskInstanceId);
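            if (taskInstance == null) {
                logger.error("cannot find the task to kill:" + taskInstanceId);
                result.put(Constants.STATUS, Status.TASK_INSTANCE_NOT_FOUND);
                // the controller reads MSG for error responses, so set it here as well
                result.put(Constants.MSG, "task instance not found: " + taskInstanceId);
                return result;
            }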
    
            if (!taskInstance.getTaskType().toLowerCase().equals(TaskType.SPARK.getDescp())) {
                logger.error("cannot kill the task type: " + taskInstance.getTaskType());
                result.put(Constants.STATUS, Status.REQUEST_PARAMS_NOT_VALID_ERROR);
                result.put(Constants.MSG, "not support kill the task type " + taskInstance.getTaskType());
                return result;
            }
    
            try {
                ProcessUtils.killSparkTask(taskInstance);
            } catch (Exception e) {
                logger.error("kill spark task failed : " + e.getMessage(), e);
                result.put(Constants.STATUS, Status.TASK_KILL_ERROR);
                result.put(Constants.MSG, "kill spark task failed");
                return result;
            }
    
            result.put(Constants.STATUS, Status.SUCCESS);
            result.put(Constants.MSG, "kill task success");
            return result;
        }
    Status.TASK_KILL_ERROR is a new entry; it does not exist in the original code.
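    The post does not show the new entry itself. Assuming Status follows a code-plus-message enum shape (its getCode() and getMsg() methods are used in the controller above), a self-contained sketch of adding TASK_KILL_ERROR could look like this. The class name, codes and messages are placeholders, not the project's real values.

```java
import java.util.HashSet;
import java.util.Set;

public class StatusSketch {

    // Hypothetical stand-in for the project's Status enum; codes and messages are placeholders
    enum Status {
        SUCCESS(0, "success"),
        REQUEST_PARAMS_NOT_VALID_ERROR(10001, "request parameter is not valid"),
        TASK_INSTANCE_NOT_FOUND(10008, "task instance not found"),
        // newly added entry for a failed kill; it needs a code no other entry uses
        TASK_KILL_ERROR(10999, "kill task instance error");

        private final int code;
        private final String msg;

        Status(int code, String msg) {
            this.code = code;
            this.msg = msg;
        }

        int getCode() { return code; }
        String getMsg() { return msg; }
    }

    // Duplicate codes would make error responses ambiguous, so check that each code is unique
    static boolean codesAreUnique() {
        Set<Integer> seen = new HashSet<>();
        for (Status s : Status.values()) {
            if (!seen.add(s.getCode())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Status s = Status.TASK_KILL_ERROR;
        System.out.println(s.getCode() + " " + s.getMsg() + " unique=" + codesAreUnique());
    }
}
```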

    ProcessUtils.killSparkTask is implemented as follows:
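    public static void killSparkTask(TaskInstance taskInstance) {
            // Body for the YARN ResourceManager REST call that moves the application to KILLED
            String state = "{\"state\": \"KILLED\"}";
            String appid = ProcessUtils.getYarnAppId(taskInstance);
            if (StringUtils.isEmpty(appid)) {
                // getYarnAppId returns null when no application id is found in the task log
                throw new RuntimeException("cannot find yarn application id of task instance " + taskInstance.getId());
            }
            // presumably a format string such as http://<rm-host>:8088/ws/v1/cluster/apps/%s
            String appAddress = HadoopUtils.getAppAddress();

            String url = String.format(appAddress, appid) + "/state";
            logger.info("kill yarn task request url is " + url);

            // any exception propagates to killTask, which reports TASK_KILL_ERROR
            String response = HttpUtils.put(url, state);
            logger.info("kill yarn task response: " + response);
        }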
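    The kill itself is a single PUT against the ResourceManager's application state endpoint, /ws/v1/cluster/apps/{appid}/state, with the body {"state": "KILLED"}. The sketch below isolates just the URL and body construction so it can be checked without a cluster; the class name, the template value and the application id are assumptions for illustration, with the template assumed to be what HadoopUtils.getAppAddress() returns.

```java
public class YarnKillSketch {

    // Request body accepted by the YARN RM application state REST API
    static final String KILL_BODY = "{\"state\": \"KILLED\"}";

    // appAddressTemplate is assumed to look like http://<rm-host>:8088/ws/v1/cluster/apps/%s
    static String killUrl(String appAddressTemplate, String appId) {
        if (appId == null || appId.isEmpty()) {
            // without an application id, String.format would silently produce ".../apps/null/state"
            throw new IllegalArgumentException("no YARN application id, nothing to kill");
        }
        return String.format(appAddressTemplate, appId) + "/state";
    }

    public static void main(String[] args) {
        System.out.println(killUrl("http://rm-host:8088/ws/v1/cluster/apps/%s", "application_1585000000000_0001"));
        System.out.println(KILL_BODY);
    }
}
```

    Failing fast on a missing id is the main design point: it turns an otherwise confusing 404 from YARN into a clear error in the DolphinScheduler log.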

    ProcessUtils.getYarnAppId(taskInstance) was extracted from existing code:

    private static String getYarnAppId(TaskInstance taskInstance) {
            String appId = null;
            try {
                // short pause carried over from the original code
                Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                // read the task log from the worker's log server over RPC
                int port = PropertyUtils.getInt(Constants.LOGGER_SERVER_RPC_PORT);
                LogClient logClient = new LogClient(taskInstance.getHost(), port);

                String log = logClient.viewLog(taskInstance.getLogPath());
                if (StringUtils.isNotEmpty(log)) {
                    // collect every YARN application id printed in the log
                    List<String> appIds = LoggerUtils.getAppIds(log, logger);
                    String workerDir = taskInstance.getExecutePath();
                    if (StringUtils.isEmpty(workerDir)) {
                        logger.error("task instance work dir is empty");
                        throw new RuntimeException("task instance work dir is empty");
                    }
                    // the last id in the log is taken as the application to kill
                    if (appIds.size() > 0) {
                        appId = appIds.get(appIds.size() - 1);
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            return appId;
        }
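    LoggerUtils.getAppIds is not shown in the post. The sketch below illustrates the same idea, assuming YARN application ids are matched with a pattern like application_\d+_\d+ (ids have the form application_<clusterTimestamp>_<sequence>); the class name and sample log lines are hypothetical, and this is not the project's own implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class AppIdSketch {

    // YARN application ids look like application_<clusterTimestamp>_<sequence>
    private static final Pattern APP_ID = Pattern.compile("application_\\d+_\\d+");

    // Collect every application id mentioned in the log text, in order of appearance
    static List<String> appIds(String log) {
        List<String> ids = new ArrayList<>();
        Matcher m = APP_ID.matcher(log);
        while (m.find()) {
            ids.add(m.group());
        }
        return ids;
    }

    // Same choice as getYarnAppId: the last id in the log is the one to kill
    static String lastAppId(String log) {
        List<String> ids = appIds(log);
        return ids.isEmpty() ? null : ids.get(ids.size() - 1);
    }

    public static void main(String[] args) {
        String log = "INFO Submitted application application_1585000000000_0001\n"
                + "INFO Application report for application_1585000000000_0002\n";
        System.out.println(appIds(log));
        System.out.println(lastAppId(log));
    }
}
```

    Taking the last match is a heuristic: if a Spark job submits more than one application, only the most recent one is killed.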
    HttpUtils.put(url, state) is implemented as follows:
    public static String put(String url, String json) {
            String responseContent = null;
            // try-with-resources closes the client even when the request fails
            try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
                // create http PUT request with a JSON body (org.apache.http.entity.ContentType)
                HttpPut httpPut = new HttpPut(url);
                httpPut.setEntity(new StringEntity(json, ContentType.APPLICATION_JSON));

                // execute, then read the body; the response is closed automatically
                try (CloseableHttpResponse response = httpclient.execute(httpPut)) {
                    if (response.getEntity() != null) {
                        responseContent = EntityUtils.toString(response.getEntity(), "UTF-8");
                    }
                }
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }

            return responseContent;
        }
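    For comparison, on Java 11 or later the same PUT can be made with the JDK's built-in java.net.http.HttpClient instead of Apache HttpClient. This is a swapped-in alternative, not what the post uses; the class name, the local echo server and the URL path exist only for the demo. Like the original, it returns null when the call fails.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class JdkHttpPut {

    // HTTP/1.1 keeps the request plain (no h2c upgrade attempt)
    private static final HttpClient CLIENT =
            HttpClient.newBuilder().version(HttpClient.Version.HTTP_1_1).build();

    // PUT a JSON body and return the response body, or null if the call fails
    static String put(String url, String json) {
        HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json;charset=UTF-8")
                .PUT(HttpRequest.BodyPublishers.ofString(json, StandardCharsets.UTF_8))
                .build();
        try {
            return CLIENT.send(request, HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8)).body();
        } catch (IOException e) {
            return null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt flag for callers
            return null;
        }
    }

    // Tiny local server that echoes "<method> <body>", used only to demonstrate put()
    static HttpServer startEchoServer() {
        try {
            HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
            server.createContext("/", exchange -> {
                String body = new String(exchange.getRequestBody().readAllBytes(), StandardCharsets.UTF_8);
                byte[] reply = (exchange.getRequestMethod() + " " + body).getBytes(StandardCharsets.UTF_8);
                exchange.sendResponseHeaders(200, reply.length);
                try (OutputStream os = exchange.getResponseBody()) {
                    os.write(reply);
                }
            });
            server.start();
            return server;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        HttpServer server = startEchoServer();
        try {
            String url = "http://127.0.0.1:" + server.getAddress().getPort() + "/ws/v1/cluster/apps/x/state";
            System.out.println(put(url, "{\"state\": \"KILLED\"}"));
        } finally {
            server.stop(0);
        }
    }
}
```

    Using the JDK client removes the Apache HttpClient dependency from this code path, at the cost of requiring Java 11.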
    
    
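    Overall, the implementation is not hard and mostly reuses existing code; the main effort is understanding how the original code fits together beforehand.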

  • Original article: https://www.cnblogs.com/144823836yj/p/12566995.html