zoukankan      html  css  js  c++  java
  • springboot集成调用Azkaban

    springboot集成调用Azkaban

     

    一、 说明

      1.Azkaban是由Linkedin公司推出的一个批量工作流任务调度器,主要用于在一个工作流内以一个特定的顺序运行一组工作和流程,它的配置是通过简单的key:value对的方式,通过配置中的dependencies 来设置依赖关系,这个依赖关系必须是无环的,否则会被视为无效的工作流。Azkaban使用job配置文件建立任务之间的依赖关系,并提供一个易于使用的web用户界面维护和跟踪你的工作流。

            2.springboot版本:2.0.5  azkaban版本:3.59.0

    二、maven依赖(注:下文代码还引用了 commons-lang3、commons-io、commons-collections4 和 guava,如项目中尚未引入,需自行添加对应依赖)

            <dependency>
                <groupId>com.google.code.gson</groupId>
                <artifactId>gson</artifactId>
                <version>2.8.5</version>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-pool2</artifactId>
                <version>2.5.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpcore</artifactId>
                <version>4.4.9</version>
            </dependency>
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>2.9.7</version>
            </dependency>
    View Code

    三、代码

      1.azkaban配置文件(注意:需要在启动类上通过 @PropertySource 注解引入该配置文件,否则下面的 @Value 无法读取到这些属性)

    monitor.azkaban.username=azkaban
    monitor.azkaban.password=azkaban
    monitor.azkaban.url=http://192.168.11.12:8081
    monitor.azkaban.connectTimeout=60000
    monitor.azkaban.readTimeout=120000
    azkaban.properties

      2.azkaban配置实体类

    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.http.client.SimpleClientHttpRequestFactory;
    import org.springframework.web.client.RestTemplate;
    
    /**
     * Spring configuration that binds the {@code monitor.azkaban.*} properties and
     * publishes the {@link RestTemplate} used to talk to Azkaban.
     */
    @Configuration
    public class AzkabanConfig {

        /** Login user, injected from {@code monitor.azkaban.username}. */
        @Value("${monitor.azkaban.username}")
        private String azkUsername;

        /** Login password, injected from {@code monitor.azkaban.password}. */
        @Value("${monitor.azkaban.password}")
        private String azkPassword;

        /** Base URL of the Azkaban web server, injected from {@code monitor.azkaban.url}. */
        @Value("${monitor.azkaban.url}")
        private String azkUrl;

        /** Value handed to {@code SimpleClientHttpRequestFactory#setConnectTimeout}. */
        @Value("${monitor.azkaban.connectTimeout}")
        private int connectTimeout;

        /** Value handed to {@code SimpleClientHttpRequestFactory#setReadTimeout}. */
        @Value("${monitor.azkaban.readTimeout}")
        private int readTimeout;

        /**
         * Builds the {@link RestTemplate} bean on top of a
         * {@link SimpleClientHttpRequestFactory} carrying the configured timeouts.
         *
         * @return a new {@code RestTemplate} using the configured connect/read timeouts
         */
        @Bean
        public RestTemplate getRestTemplate() {
            final SimpleClientHttpRequestFactory factory = new SimpleClientHttpRequestFactory();
            factory.setConnectTimeout(connectTimeout);
            factory.setReadTimeout(readTimeout);
            return new RestTemplate(factory);
        }

        // ---- plain accessors ------------------------------------------------

        public String getAzkUsername() { return azkUsername; }

        public void setAzkUsername(String azkUsername) { this.azkUsername = azkUsername; }

        public String getAzkPassword() { return azkPassword; }

        public void setAzkPassword(String azkPassword) { this.azkPassword = azkPassword; }

        public String getAzkUrl() { return azkUrl; }

        public void setAzkUrl(String azkUrl) { this.azkUrl = azkUrl; }

    }
    AzkabanConfig

      3.HttpClient配置SSL绕过https证书 

    import java.security.KeyManagementException;
    import java.security.NoSuchAlgorithmException;
    import java.security.cert.X509Certificate;
    
    import javax.net.ssl.HttpsURLConnection;
    import javax.net.ssl.SSLContext;
    import javax.net.ssl.TrustManager;
    import javax.net.ssl.X509TrustManager;
    
    /**
     * Turns HTTPS certificate validation off and back on for
     * {@link HttpsURLConnection} by swapping the JVM-wide default SSL socket factory.
     *
     * <p><b>Warning:</b> {@link #turnOffSslChecking()} makes every subsequent
     * {@code HttpsURLConnection} in the JVM accept any certificate. Use it only against
     * trusted test endpoints and call {@link #turnOnSslChecking()} afterwards.
     */
    public final class SSLUtil {

        /** Protocol name passed to {@link SSLContext#getInstance(String)}. */
        private static final String PROTOCOL = "TLS";

        /** Trust manager that accepts every client and server certificate chain. */
        private static final TrustManager[] UNQUESTIONING_TRUST_MANAGER = new TrustManager[] { new X509TrustManager() {

            @Override
            public X509Certificate[] getAcceptedIssuers() {
                // The X509TrustManager contract asks for a non-null (possibly empty) array.
                return new X509Certificate[0];
            }

            @Override
            public void checkClientTrusted(X509Certificate[] certs, String authType) {
                // Intentionally empty: every client chain is accepted.
            }

            @Override
            public void checkServerTrusted(X509Certificate[] certs, String authType) {
                // Intentionally empty: every server chain is accepted.
            }
        } };

        private SSLUtil() {
        }

        /**
         * Installs a default socket factory whose trust manager accepts any certificate.
         *
         * @throws NoSuchAlgorithmException if no provider supports {@link #PROTOCOL}
         * @throws KeyManagementException   if the SSL context cannot be initialised
         */
        public static void turnOffSslChecking() throws NoSuchAlgorithmException, KeyManagementException {
            final SSLContext sc = SSLContext.getInstance(PROTOCOL);
            sc.init(null, UNQUESTIONING_TRUST_MANAGER, null);
            HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
        }

        /**
         * Re-installs a default socket factory backed by the platform's default trust
         * managers, undoing {@link #turnOffSslChecking()}.
         *
         * @throws KeyManagementException   if the SSL context cannot be initialised
         * @throws NoSuchAlgorithmException if no provider supports {@link #PROTOCOL}
         */
        public static void turnOnSslChecking() throws KeyManagementException, NoSuchAlgorithmException {
            final SSLContext sc = SSLContext.getInstance(PROTOCOL);
            // Passing null trust managers makes the context fall back to the installed defaults.
            sc.init(null, null, null);
            // The original built the context but never installed it, so checking stayed off.
            HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
        }
    }
    SSLUtil

      4.常量类

    /**
     * Shared constants for the Azkaban integration.
     */
    public interface SysContants {

        /** Azkaban success status. */
        public static final String AZK_SUCCESS = "success";

    }
    SysContants

      5.根据azkaban返回数据定制实体类(注释少抱歉)

    import java.util.List;
    
    /**
     * Mutable data holder for one node of an Azkaban execution, modelled on the
     * data Azkaban returns. Every property is nullable and stored without validation.
     */
    public class ExecNode {

        private String nestedId;
        /** Ids listed under the node's {@code in} property. */
        private List<String> in;
        private String status;
        private String id;
        private String type;
        private Long updateTime;
        private Long startTime;
        private Long endTime;
        private Long attempt;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getNestedId() { return nestedId; }

        public void setNestedId(String nestedId) { this.nestedId = nestedId; }

        public List<String> getIn() { return in; }

        public void setIn(List<String> in) { this.in = in; }

        public String getStatus() { return status; }

        public void setStatus(String status) { this.status = status; }

        public String getId() { return id; }

        public void setId(String id) { this.id = id; }

        public String getType() { return type; }

        public void setType(String type) { this.type = type; }

        public Long getUpdateTime() { return updateTime; }

        public void setUpdateTime(Long updateTime) { this.updateTime = updateTime; }

        public Long getStartTime() { return startTime; }

        public void setStartTime(Long startTime) { this.startTime = startTime; }

        public Long getEndTime() { return endTime; }

        public void setEndTime(Long endTime) { this.endTime = endTime; }

        public Long getAttempt() { return attempt; }

        public void setAttempt(Long attempt) { this.attempt = attempt; }
    }
    ExecNode
    import java.util.List;
    
    /**
     * Mutable data holder describing one execution node, with the time fields kept
     * as strings plus log text and elapsed time. All properties are nullable and
     * stored without validation.
     */
    public class ExecNodeBean {

        private String nestedId;
        private List<String> dependencies;
        private String status;
        private String jobId;
        private String type;
        private String updateTime;
        private String startTime;
        private String endTime;
        private Long attempt;
        private String logs;
        private Long elapsed;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getNestedId() { return nestedId; }

        public void setNestedId(String nestedId) { this.nestedId = nestedId; }

        public List<String> getDependencies() { return dependencies; }

        public void setDependencies(List<String> dependencies) { this.dependencies = dependencies; }

        public String getStatus() { return status; }

        public void setStatus(String status) { this.status = status; }

        public String getJobId() { return jobId; }

        public void setJobId(String jobId) { this.jobId = jobId; }

        public String getType() { return type; }

        public void setType(String type) { this.type = type; }

        public String getUpdateTime() { return updateTime; }

        public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }

        public String getStartTime() { return startTime; }

        public void setStartTime(String startTime) { this.startTime = startTime; }

        public String getEndTime() { return endTime; }

        public void setEndTime(String endTime) { this.endTime = endTime; }

        public Long getAttempt() { return attempt; }

        public void setAttempt(Long attempt) { this.attempt = attempt; }

        public String getLogs() { return logs; }

        public void setLogs(String logs) { this.logs = logs; }

        public Long getElapsed() { return elapsed; }

        public void setElapsed(Long elapsed) { this.elapsed = elapsed; }

    }
    ExecNodeBean
    /**
     * Mutable data holder for one flow execution entry, modelled on the data Azkaban
     * returns. All properties are nullable and stored without validation.
     */
    public class Execution {

        private String submitUser;
        private String flowId;
        private String status;
        private Long submitTime;
        private Long startTime;
        private Long endTime;
        private Long projectId;
        private Long execId;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getSubmitUser() { return submitUser; }

        public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

        public String getFlowId() { return flowId; }

        public void setFlowId(String flowId) { this.flowId = flowId; }

        public String getStatus() { return status; }

        public void setStatus(String status) { this.status = status; }

        public Long getSubmitTime() { return submitTime; }

        public void setSubmitTime(Long submitTime) { this.submitTime = submitTime; }

        public Long getStartTime() { return startTime; }

        public void setStartTime(Long startTime) { this.startTime = startTime; }

        public Long getEndTime() { return endTime; }

        public void setEndTime(Long endTime) { this.endTime = endTime; }

        public Long getProjectId() { return projectId; }

        public void setProjectId(Long projectId) { this.projectId = projectId; }

        public Long getExecId() { return execId; }

        public void setExecId(Long execId) { this.execId = execId; }

    // Closing brace of the class: it was missing in the original listing, which therefore did not compile.
    }
    Execution
    import java.util.List;
    
    /**
     * Mutable data holder for the details of one flow execution, including its
     * {@link ExecNode} children, modelled on the data Azkaban returns. All properties
     * are nullable and stored without validation.
     */
    public class ExecutionInfo {

        private String project;
        private String type;
        private Long updateTime;
        private Long attempt;
        private Long execid;
        private Long submitTime;
        private Long startTime;
        private Long endTime;
        private Long projectId;
        private String nestedId;
        private String submitUser;
        private String id;
        private String flowId;
        private String flow;
        private String status;
        /** Nodes that belong to this execution. */
        private List<ExecNode> nodes;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getProject() { return project; }

        public void setProject(String project) { this.project = project; }

        public String getType() { return type; }

        public void setType(String type) { this.type = type; }

        public Long getUpdateTime() { return updateTime; }

        public void setUpdateTime(Long updateTime) { this.updateTime = updateTime; }

        public Long getAttempt() { return attempt; }

        public void setAttempt(Long attempt) { this.attempt = attempt; }

        public Long getExecid() { return execid; }

        public void setExecid(Long execid) { this.execid = execid; }

        public Long getSubmitTime() { return submitTime; }

        public void setSubmitTime(Long submitTime) { this.submitTime = submitTime; }

        public Long getStartTime() { return startTime; }

        public void setStartTime(Long startTime) { this.startTime = startTime; }

        public Long getEndTime() { return endTime; }

        public void setEndTime(Long endTime) { this.endTime = endTime; }

        public Long getProjectId() { return projectId; }

        public void setProjectId(Long projectId) { this.projectId = projectId; }

        public String getNestedId() { return nestedId; }

        public void setNestedId(String nestedId) { this.nestedId = nestedId; }

        public String getSubmitUser() { return submitUser; }

        public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

        public String getId() { return id; }

        public void setId(String id) { this.id = id; }

        public String getFlowId() { return flowId; }

        public void setFlowId(String flowId) { this.flowId = flowId; }

        public String getFlow() { return flow; }

        public void setFlow(String flow) { this.flow = flow; }

        public String getStatus() { return status; }

        public void setStatus(String status) { this.status = status; }

        public List<ExecNode> getNodes() { return nodes; }

        public void setNodes(List<ExecNode> nodes) { this.nodes = nodes; }

    }
    ExecutionInfo
    import java.util.List;
    
    /**
     * Mutable data holder describing one flow execution, with the time fields kept
     * as strings plus the flow log, elapsed time and {@link ExecNodeBean} children.
     * All properties are nullable and stored without validation.
     */
    public class ExecutionInfoBean {

        private String project;
        private String type;
        private String updateTime;
        private Long attempt;
        private Long execid;
        private String submitTime;
        private String startTime;
        private String endTime;
        private Long projectId;
        private String nestedId;
        private String submitUser;
        private String jobId;
        private String flowId;
        private String flow;
        private String status;
        private String flowLog;
        private Long elapsed;
        /** Node beans that belong to this execution. */
        private List<ExecNodeBean> nodes;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getProject() { return project; }

        public void setProject(String project) { this.project = project; }

        public String getType() { return type; }

        public void setType(String type) { this.type = type; }

        public String getUpdateTime() { return updateTime; }

        public void setUpdateTime(String updateTime) { this.updateTime = updateTime; }

        public Long getAttempt() { return attempt; }

        public void setAttempt(Long attempt) { this.attempt = attempt; }

        public Long getExecid() { return execid; }

        public void setExecid(Long execid) { this.execid = execid; }

        public String getSubmitTime() { return submitTime; }

        public void setSubmitTime(String submitTime) { this.submitTime = submitTime; }

        public String getStartTime() { return startTime; }

        public void setStartTime(String startTime) { this.startTime = startTime; }

        public String getEndTime() { return endTime; }

        public void setEndTime(String endTime) { this.endTime = endTime; }

        public Long getProjectId() { return projectId; }

        public void setProjectId(Long projectId) { this.projectId = projectId; }

        public String getNestedId() { return nestedId; }

        public void setNestedId(String nestedId) { this.nestedId = nestedId; }

        public String getSubmitUser() { return submitUser; }

        public void setSubmitUser(String submitUser) { this.submitUser = submitUser; }

        public String getJobId() { return jobId; }

        public void setJobId(String jobId) { this.jobId = jobId; }

        public String getFlowId() { return flowId; }

        public void setFlowId(String flowId) { this.flowId = flowId; }

        public String getFlow() { return flow; }

        public void setFlow(String flow) { this.flow = flow; }

        public String getStatus() { return status; }

        public void setStatus(String status) { this.status = status; }

        public String getFlowLog() { return flowLog; }

        public void setFlowLog(String flowLog) { this.flowLog = flowLog; }

        public Long getElapsed() { return elapsed; }

        public void setElapsed(Long elapsed) { this.elapsed = elapsed; }

        public List<ExecNodeBean> getNodes() { return nodes; }

        public void setNodes(List<ExecNodeBean> nodes) { this.nodes = nodes; }

    }
    ExecutionInfoBean
    import java.util.List;
    
    /**
     * Mutable data holder for a list of {@link Execution} entries of one flow,
     * together with the {@code total}/{@code from}/{@code length} values that
     * accompany it. All properties are nullable and stored without validation.
     */
    public class FlowExecution {

        private String project;
        private String flow;
        private Long total;
        private Long length;
        private Long from;
        private Long projectId;
        /** Execution entries carried by this holder. */
        private List<Execution> executions;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getProject() { return project; }

        public void setProject(String project) { this.project = project; }

        public String getFlow() { return flow; }

        public void setFlow(String flow) { this.flow = flow; }

        public Long getTotal() { return total; }

        public void setTotal(Long total) { this.total = total; }

        public Long getLength() { return length; }

        public void setLength(Long length) { this.length = length; }

        public Long getFrom() { return from; }

        public void setFrom(Long from) { this.from = from; }

        public Long getProjectId() { return projectId; }

        public void setProjectId(Long projectId) { this.projectId = projectId; }

        public List<Execution> getExecutions() { return executions; }

        public void setExecutions(List<Execution> executions) { this.executions = executions; }

    }
    FlowExecution
    /**
     * Mutable data holder for one task execution record; extends {@code PageEntity}
     * (declared elsewhere in the project). All properties are nullable and stored
     * without validation.
     */
    public class GovernTaskRecordBean extends PageEntity {

        private static final long serialVersionUID = 1L;

        private String createTime;
        private String status;
        private String owner;
        private String startTime;
        private String endTime;
        private String flowId;
        private Long projectId;
        private Long execId;
        private String projectPath;
        private Long elapsed;

        // ---- accessors (simple get/set pairs, no extra logic) -----------------

        public String getCreateTime() { return createTime; }

        public void setCreateTime(String createTime) { this.createTime = createTime; }

        public String getStatus() { return status; }

        public void setStatus(String status) { this.status = status; }

        public String getOwner() { return owner; }

        public void setOwner(String owner) { this.owner = owner; }

        public String getStartTime() { return startTime; }

        public void setStartTime(String startTime) { this.startTime = startTime; }

        public String getEndTime() { return endTime; }

        public void setEndTime(String endTime) { this.endTime = endTime; }

        public String getFlowId() { return flowId; }

        public void setFlowId(String flowId) { this.flowId = flowId; }

        public Long getProjectId() { return projectId; }

        public void setProjectId(Long projectId) { this.projectId = projectId; }

        public Long getExecId() { return execId; }

        public void setExecId(Long execId) { this.execId = execId; }

        public String getProjectPath() { return projectPath; }

        public void setProjectPath(String projectPath) { this.projectPath = projectPath; }

        public Long getElapsed() { return elapsed; }

        public void setElapsed(Long elapsed) { this.elapsed = elapsed; }

    }
    GovernTaskRecordBean

      6.调度执行状态枚举类

    /**
     * Execution states of a scheduled flow or job; each constant carries a status
     * code and a human-readable description.
     */
    public enum ScheduleStatus {

        READY("READY", "就绪"),
        SUCCEEDED("SUCCEEDED", "成功"),
        KILLING("KILLING", "停止中"),
        KILLED("KILLED", "已中断"),
        FAILED("FAILED", "失败"),
        SKIPPED("SKIPPED", "跳过"),
        DISABLED("DISABLED", "停用"),
        QUEUED("QUEUED", "等待中"),
        CANCELLED("CANCELLED", "取消执行"),
        RUNNING("RUNNING", "运行中"),
        PAUSED("PAUSED", "暂停");

        /** Status code. */
        private String code;

        /** Status description. */
        private String desc;

        /**
         * @param code status code
         * @param desc status description
         */
        ScheduleStatus(String code, String desc) {
            this.code = code;
            this.desc = desc;
        }

        public String getCode() { return code; }

        public void setCode(String code) { this.code = code; }

        public String getDesc() { return desc; }

        public void setDesc(String desc) { this.desc = desc; }

    }
    ScheduleStatus

      7.AzkabanService接口类

    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.Calendar;
    import java.util.Date;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.commons.collections4.CollectionUtils;
    import org.apache.commons.io.IOUtils;
    import org.apache.commons.lang3.StringUtils;
    import org.joda.time.DateTime;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.io.FileSystemResource;
    import org.springframework.http.HttpEntity;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpMethod;
    import org.springframework.http.ResponseEntity;
    import org.springframework.stereotype.Service;
    import org.springframework.util.LinkedMultiValueMap;
    import org.springframework.util.MultiValueMap;
    import org.springframework.web.client.RestTemplate;
    
    import com.google.common.collect.Lists;
    import com.google.gson.Gson;
    import com.google.gson.JsonElement;
    import com.google.gson.JsonObject;
    
    import org.apache.http.HttpStatus;
    
    /**
     * azkaban接口
     * @author hao
     *
     */
    @Service
    public class AzkabanService {
        
        private static final Logger logger = LoggerFactory.getLogger(AzkabanService.class);
        private static final String CONTENT_TYPE = "application/x-www-form-urlencoded; charset=utf-8";
        private static final String X_REQUESTED_WITH = "XMLHttpRequest";
        private static final DateTimeFormatter formatterTime = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss");
        
        @Autowired
        private RestTemplate restTemplate;
        
        @Autowired
        private AzkabanConfig azkabanConfig;
        
        /**
         * Logs in to Azkaban and returns the session id.
         *
         * <p>Sends a form-encoded POST with {@code action=login} and the configured
         * username and password to the configured Azkaban URL.
         *
         * @return the {@code session.id} value of the login response
         * @throws Exception if the response is empty, its {@code status} is not the
         *                   success value, or it carries no {@code session.id}; SSL
         *                   set-up failures are propagated as well
         */
        public String login() throws Exception {
            // NOTE(review): this switches off HTTPS certificate validation JVM-wide on every call.
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", CONTENT_TYPE);
            hs.add("X-Requested-With", X_REQUESTED_WITH);
            LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
            linkedMultiValueMap.add("action", "login");
            linkedMultiValueMap.add("username", azkabanConfig.getAzkUsername());
            linkedMultiValueMap.add("password", azkabanConfig.getAzkPassword());

            HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
            String result = restTemplate.postForObject(azkabanConfig.getAzkUrl(), httpEntity, String.class);

            // Parse once. A failed login may omit "status"/"session.id"; dereferencing them
            // unchecked used to raise a NullPointerException instead of the intended error.
            JsonObject json = result == null ? null : new Gson().fromJson(result, JsonObject.class);
            JsonElement status = json == null ? null : json.get("status");
            JsonElement sessionId = json == null ? null : json.get("session.id");
            boolean succeeded = status != null && status.isJsonPrimitive()
                    && SysContants.AZK_SUCCESS.equals(status.getAsString());
            if (!succeeded || sessionId == null || !sessionId.isJsonPrimitive()) {
                logger.error("Azkaban登录失败!返回错误信息:"+result);
                throw new Exception("Azkaban登录失败!");
            }
            return sessionId.getAsString();
        }
    
        /**
         * Uploads a zip file to an Azkaban project.
         *
         * <p>Posts {@code ajax=upload} with a fresh session id, the project name and the
         * file to {@code <azkUrl>/manager}.
         *
         * @param projectName project name
         * @param file        zip file to upload
         * @return the {@code projectId} from the response, or {@code null} when the
         *         response carries no non-empty {@code projectId}
         * @throws Exception if login fails or SSL set-up fails
         */
        public String uploadZip(String projectName, File file) throws Exception {
            SSLUtil.turnOffSslChecking();
            FileSystemResource resource = new FileSystemResource(file);
            LinkedMultiValueMap<String, Object> linkedMultiValueMap = new LinkedMultiValueMap<String, Object>();
            linkedMultiValueMap.add("session.id", login());
            linkedMultiValueMap.add("ajax", "upload");
            linkedMultiValueMap.add("project", projectName);
            linkedMultiValueMap.add("file", resource);
            String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", linkedMultiValueMap,
                    String.class);

            // Parse once and tolerate a missing "projectId", which previously caused a
            // NullPointerException before the failure branch could run.
            JsonObject json = result == null ? null : new Gson().fromJson(result, JsonObject.class);
            JsonElement projectIdElement = json == null ? null : json.get("projectId");
            String projectId = projectIdElement != null && projectIdElement.isJsonPrimitive()
                    ? projectIdElement.getAsString()
                    : null;
            if (StringUtils.isEmpty(projectId)) {
                // Placeholders added: without them SLF4J silently dropped both arguments.
                logger.error("上传文件至Azkaban失败:{}:{}", projectName, file.getPath());
                logger.error("Azkaban上传文件失败!返回错误信息:"+result);
                return null;
            }

            return projectId;
        }
        
        /**
         * Creates an Azkaban project.
         *
         * <p>Sends a form-encoded POST with {@code action=create} to
         * {@code <azkUrl>/manager}. A response whose message is
         * {@code "Project already exists."} is treated as success too.
         *
         * @param projectName project name
         * @param description project description
         * @return {@code true} if the project was created or already exists,
         *         {@code false} otherwise
         * @throws Exception if login fails or SSL set-up fails
         */
        public boolean createProject(String projectName, String description) throws Exception {
            SSLUtil.turnOffSslChecking();
            HttpHeaders hs = new HttpHeaders();
            hs.add("Content-Type", CONTENT_TYPE);
            hs.add("X-Requested-With", X_REQUESTED_WITH);
            LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
            linkedMultiValueMap.add("session.id", login());
            linkedMultiValueMap.add("action", "create");
            linkedMultiValueMap.add("name", projectName);
            linkedMultiValueMap.add("description", description);

            HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
            // Log only the non-sensitive fields: dumping the whole entity wrote the session id to the log.
            logger.info("Azkaban请求信息:action=create, name={}, description={}", projectName, description);
            String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/manager", httpEntity, String.class);
            logger.info("Azkaban返回创建Project信息:" + result);

            // Parse once; absent fields are treated as "no match" instead of raising a NullPointerException.
            JsonObject json = result == null ? null : new Gson().fromJson(result, JsonObject.class);
            JsonElement statusElement = json == null ? null : json.get("status");
            JsonElement messageElement = json == null ? null : json.get("message");
            String status = statusElement != null && statusElement.isJsonPrimitive()
                    ? statusElement.getAsString() : null;
            String message = messageElement != null && messageElement.isJsonPrimitive()
                    ? messageElement.getAsString() : null;

            // Both "created" and "already exists" count as success.
            if (SysContants.AZK_SUCCESS.equals(status) || "Project already exists.".equals(message)) {
                return true;
            }
            // Placeholder added: without it SLF4J silently dropped the project name.
            logger.error("创建Azkaban Project失败:{}", projectName);
            logger.error("创建Azkaban Project失败!返回错误信息:"+result);
            return false;
        }
        
        /**
         * Deletes an Azkaban project.
         *
         * <p>Issues a GET to {@code <azkUrl>/manager} with {@code delete=true}, a fresh
         * session id and the project name.
         *
         * @param projectName project name
         * @throws Exception if the response status code is not 200, if login fails, or
         *                   if SSL set-up fails
         */
        public void deleteProject(String projectName) throws Exception {
            SSLUtil.turnOffSslChecking();

            final HttpHeaders headers = new HttpHeaders();
            headers.add("Content-Type", CONTENT_TYPE);
            headers.add("X-Requested-With", X_REQUESTED_WITH);
            headers.add("Accept", "text/plain;charset=utf-8");

            // Values substituted into the {id} and {project} placeholders of the URL template.
            final Map<String, String> uriVariables = new HashMap<>();
            uriVariables.put("id", login());
            uriVariables.put("project", projectName);

            final String urlTemplate = azkabanConfig.getAzkUrl()
                    + "/manager?session.id={id}&delete=true&project={project}";
            final HttpEntity<String> request = new HttpEntity<String>(headers);
            final ResponseEntity<String> response =
                    restTemplate.exchange(urlTemplate, HttpMethod.GET, request, String.class, uriVariables);

            if (response.getStatusCodeValue() == HttpStatus.SC_OK) {
                return;
            }
            logger.error("删除Azkaban Project失败!返回错误信息:"+response);
            throw new Exception("删除Azkaban Project失败");
        }
        
            /**
             * Fetches the ids of all flows of a project.
             *
             * <p>Issues a GET to {@code <azkUrl>/manager} with
             * {@code ajax=fetchprojectflows} and collects the {@code flowId} of every
             * entry in the response's {@code flows} array.
             *
             * @param projectName project name
             * @return the flow ids, or {@code null} when the request does not return
             *         status 200 or the response has no {@code flows} array
             * @throws Exception if login fails or SSL set-up fails
             */
            public List<String> fetchFlowsProject(String projectName) throws Exception {
                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", CONTENT_TYPE);
                hs.add("X-Requested-With", X_REQUESTED_WITH);
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();

                map.put("id", login());
                map.put("project", projectName);

                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
                        new HttpEntity<String>(hs), String.class, map);
                if (exchange == null||HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                    logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
                    logger.error("Azkaban获取一个项目的所有流信息失败:!返回错误信息:"+exchange);
                    return null;
                }

                // An empty body or a missing / non-array "flows" now takes the logged
                // null-return path instead of throwing.
                String body = exchange.getBody();
                JsonObject json = body == null ? null : new Gson().fromJson(body, JsonObject.class);
                JsonElement obj = json == null ? null : json.get("flows");
                if (obj == null || !obj.isJsonArray()) {
                    // The format has two placeholders; the second argument was missing before.
                    logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName, body);
                    return null;
                }
                List<String> flows = new ArrayList<String>();
                for (JsonElement jobj : obj.getAsJsonArray()) {
                    flows.add(jobj.getAsJsonObject().get("flowId").getAsString());
                }
                return flows;
            }
        
            /**
             * 获取一个流的所有作业
             * @param projectName 项目名
             * @param flowId 流id
             * @return 
             * @throws Exception
             */
            public String fetchJobsFlow(String projectName, String flowId) throws Exception {
                // Purpose: call the "fetchflowgraph" AJAX endpoint for the given project and flow.
                // Returns null when the response object is null; throws when the status is not 200.
                SSLUtil.turnOffSslChecking();

                final HttpHeaders headers = new HttpHeaders();
                headers.add("Content-Type", CONTENT_TYPE);
                headers.add("X-Requested-With", X_REQUESTED_WITH);
                headers.add("Accept", "text/plain;charset=utf-8");

                final Map<String, String> uriVariables = new HashMap<>();
                uriVariables.put("id", login());
                uriVariables.put("project", projectName);
                uriVariables.put("flow", flowId);

                final String graphUrl = azkabanConfig.getAzkUrl()
                        + "/manager?&session.id={id}&project={project}&flow={flow}&ajax=fetchflowgraph";
                final ResponseEntity<String> response = restTemplate.exchange(
                        graphUrl, HttpMethod.GET, new HttpEntity<String>(headers), String.class, uriVariables);

                if (response == null) {
                    logger.error("Azkaban获取一个项目的所有流信息失败:" + projectName);
                    return null;
                }
                logger.debug("Azkaban获取一个项目的所有流信息:" + response);
                if (response.getStatusCodeValue() != HttpStatus.SC_OK) {
                    throw new Exception("Azkaban获取一个项目的所有流信息失败");
                }
                // NOTE(review): this returns the ResponseEntity's toString() (status, body and headers),
                // not just the body — confirm that callers really expect that format.
                return response.toString();
            }
        
            /**
             * Flow 获取执行的project 列表
             * azkaban api 获取流的执行
             * @param projectName 项目名
             * @param flowId 流id
             * @param start
             * @param length
             * @return
             * @throws Exception
             */
             public FlowExecution fetchFlowExecutions(String projectName, String flowId, String start,String length) throws Exception {
                 // Purpose: call the "fetchFlowExecutions" AJAX endpoint and deserialize the JSON body
                 // into a FlowExecution. Returns null when the HTTP status is not 200.
                 SSLUtil.turnOffSslChecking();

                 final HttpHeaders headers = new HttpHeaders();
                 headers.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                 headers.add("X-Requested-With", "XMLHttpRequest");
                 headers.add("Accept", "text/plain;charset=utf-8");

                 final Map<String, String> uriVariables = new HashMap<>();
                 uriVariables.put("id", login());
                 uriVariables.put("project", projectName);
                 uriVariables.put("flow", flowId);
                 // String.valueOf keeps the original behavior of sending the text "null" for a null argument.
                 uriVariables.put("start", String.valueOf(start));
                 uriVariables.put("length", String.valueOf(length));

                 final String executionsUrl = azkabanConfig.getAzkUrl()
                         + "/manager?session.id={id}&ajax=fetchFlowExecutions&project={project}&flow={flow}&start={start}&length={length}";
                 final ResponseEntity<String> response = restTemplate.exchange(
                         executionsUrl, HttpMethod.GET, new HttpEntity<String>(headers), String.class, uriVariables);

                 final boolean ok = response != null && response.getStatusCodeValue() == HttpStatus.SC_OK;
                 if (!ok) {
                     logger.error("Azkaban获取一个项目运行记录信息失败:{}:{}", projectName, flowId);
                     return null;
                 }
                 final Gson gson = new Gson();
                 return gson.fromJson(response.getBody(), FlowExecution.class);
             }
         
         /**
          * Flow 获取正在执行的流id
          * @param projectName
          * @param flowId
          * @return
          * @throws Exception
          */
             public String getRunning(String projectName, String flowId) throws Exception {
                 // Purpose: call the "getRunning" AJAX endpoint for a project/flow and hand back
                 // the raw response body without inspecting the HTTP status.
                 SSLUtil.turnOffSslChecking();

                 final HttpHeaders headers = new HttpHeaders();
                 headers.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                 headers.add("X-Requested-With", "XMLHttpRequest");
                 headers.add("Accept", "text/plain;charset=utf-8");

                 final Map<String, String> uriVariables = new HashMap<>();
                 uriVariables.put("id", login());
                 uriVariables.put("project", projectName);
                 uriVariables.put("flow", flowId);

                 final String runningUrl = azkabanConfig.getAzkUrl()
                         + "/executor?session.id={id}&ajax=getRunning&project={project}&flow={flow}";
                 final HttpEntity<String> request = new HttpEntity<String>(headers);
                 final ResponseEntity<String> response =
                         restTemplate.exchange(runningUrl, HttpMethod.GET, request, String.class, uriVariables);
                 return response.getBody();
             }
             
             /**
             * Execute a Flow 执行一个流 还有很多其他参数 具体参考azkabanConfig.getAzkUrl()
             * 
             * @throws KeyManagementException
             * @throws NoSuchAlgorithmException
             */
            public String executeFlow(String projectName, String flowId) throws Exception {
                // Purpose: call the "executeFlow" AJAX endpoint and return the "execid" field of the
                // JSON response as a string. Returns null when the status is not 200 or "execid" is absent.
                SSLUtil.turnOffSslChecking();

                final HttpHeaders headers = new HttpHeaders();
                headers.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                headers.add("X-Requested-With", "XMLHttpRequest");
                headers.add("Accept", "text/plain;charset=utf-8");

                final Map<String, String> uriVariables = new HashMap<>();
                uriVariables.put("id", login());
                uriVariables.put("project", projectName);
                uriVariables.put("flow", flowId);

                final String executeUrl = azkabanConfig.getAzkUrl()
                        + "/executor?session.id={id}&ajax=executeFlow&project={project}&flow={flow}";
                final ResponseEntity<String> response = restTemplate.exchange(
                        executeUrl, HttpMethod.GET, new HttpEntity<String>(headers), String.class, uriVariables);

                final boolean ok = response != null && response.getStatusCodeValue() == HttpStatus.SC_OK;
                if (!ok) {
                    logger.error("执行一个流请求失败:{}:{}", projectName, flowId);
                    return null;
                }

                final JsonObject payload = new Gson().fromJson(response.getBody(), JsonObject.class);
                final JsonElement execId = payload.get("execid");
                if (execId != null) {
                    return execId.getAsString();
                }
                logger.error("执行一个流失败:{}:{}", projectName, flowId);
                return null;
            }
            
            /**
             * Cancel a Flow Execution 取消流程执行
             * azkaban api 取消流程执行
             * @throws KeyManagementException
             * @throws NoSuchAlgorithmException
             */
            public void cancelEXEaFlow(String projectName,String start,String size) throws Exception {
                // Purpose: for every flow of the project, look at the executions in the requested
                // page (start/size) and send a "cancelFlow" request for each one whose status is RUNNING.
                // Throws when a cancel request fails, when Azkaban reports an "error" field, or when
                // no RUNNING execution was found at all.
                int cancelledCount = 0;

                // fetchFlowsProject returns null on failure; treat that as "no flows" instead of an NPE.
                List<String> flows = fetchFlowsProject(projectName);
                if (flows != null) {
                    for (String flow : flows) {
                        FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
                        if (fe == null) {
                            continue;
                        }
                        List<Execution> executions = fe.getExecutions();
                        if (executions == null) {
                            continue;
                        }
                        for (Execution execution : executions) {
                            boolean running = null != execution && null != execution.getExecId()
                                    && "RUNNING".equals(execution.getStatus());
                            if (!running) {
                                continue;
                            }

                            SSLUtil.turnOffSslChecking();
                            HttpHeaders hs = new HttpHeaders();
                            hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                            hs.add("X-Requested-With", "XMLHttpRequest");
                            hs.add("Accept", "text/plain;charset=utf-8");

                            Map<String, String> map = new HashMap<>();
                            map.put("id", login());
                            map.put("execid", String.valueOf(execution.getExecId()));
                            ResponseEntity<String> exchange = restTemplate.exchange(
                                    azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=cancelFlow&execid={execid}", HttpMethod.GET,
                                    new HttpEntity<String>(hs), String.class, map);

                            // Validate the response before touching its body (the original printed the
                            // body to stdout ahead of the null check).
                            if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                                logger.error("取消执行调度失败,请求azkaban接口异常:"+exchange);
                                throw new Exception("取消执行调度失败,请求azkaban接口异常:"+exchange);
                            }
                            logger.debug("Azkaban cancelFlow response: {}", exchange.getBody());

                            JsonObject body = new Gson().fromJson(exchange.getBody(), JsonObject.class);
                            // An "error" field in the JSON means Azkaban refused the cancellation.
                            if (body != null && body.get("error") != null) {
                                throw new Exception("取消执行调度失败,请刷新列表获取最新调度状态!");
                            }
                            cancelledCount++;
                        }
                    }
                }

                if (cancelledCount == 0) {
                    throw new Exception("该调度不是运行中状态,请刷新列表获取最新状态!");
                }
            }
            
            /**
             * 根据时间 创建调度任务
             * 
             * @param projectId
             * @param projectName
             * @param flow
             * @param flowName
             * @param recurring,是否循环,on循环
             * @param period,循环频率:M:Months,w:Weeks,d:Days,h:Hours,m:Minutes,s:Seconds;如60s,支持分钟的倍数
             * @param date,开始时间
             * @return 返回scheduleId
             * @throws Exception
             */
            public String scheduleEXEaFlow(String projectId, String projectName, String flow, String flowName, String recurring,
                    String period, Date date) throws Exception {
                // Purpose: POST a "scheduleFlow" request to /schedule and return the "scheduleId"
                // from the JSON response. Throws when the response is missing, its "status" is not
                // SysContants.AZK_SUCCESS, or "scheduleId" is absent or negative.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", CONTENT_TYPE);
                hs.add("X-Requested-With", X_REQUESTED_WITH);
                LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
                linkedMultiValueMap.add("session.id", login());
                linkedMultiValueMap.add("ajax", "scheduleFlow");
                linkedMultiValueMap.add("projectName", projectName);
                linkedMultiValueMap.add("projectId", projectId);
                linkedMultiValueMap.add("flow", flow);
                linkedMultiValueMap.add("flowName", flowName);
                linkedMultiValueMap.add("is_recurring", recurring);
                linkedMultiValueMap.add("period", period);
                // Adds the scheduleTime / scheduleDate form fields derived from date.
                scheduleTimeInit(linkedMultiValueMap, date);

                HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
                String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);

                logger.info("Azkaban返回根据时间创建定时任务信息:" + result);

                // Parse once and check each field defensively: an error response may carry
                // neither "status" nor "scheduleId", which previously caused an NPE.
                JsonObject json = new Gson().fromJson(result, JsonObject.class);
                JsonElement status = json == null ? null : json.get("status");
                JsonElement scheduleId = json == null ? null : json.get("scheduleId");

                boolean statusOk = status != null && status.isJsonPrimitive()
                        && SysContants.AZK_SUCCESS.equals(status.getAsString());
                boolean idOk = scheduleId != null && scheduleId.isJsonPrimitive() && scheduleId.getAsInt() >= 0;
                if (!statusOk || !idOk) {
                    logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
                    throw new Exception("根据时间创建定时任务失败");
                }

                return scheduleId.getAsString();
            }
    
            /**
             * 根据cron表达式 创建调度任务
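             * @param projectName name of the project that owns the flow
             * @param cron cron expression, sent as the "cronExpression" form field
             * @param flowName name of the flow, sent as the "flow" form field
             * @return the scheduleId from the response, or null when the status is not success
             * @throws Exception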
             */
            public String scheduleByCronEXEaFlow(String projectName, String cron, String flowName)
                    throws Exception {
                // Purpose: POST a "scheduleCronFlow" request to /schedule and return the "scheduleId"
                // from the JSON response, or null when the response does not report success.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", CONTENT_TYPE);
                hs.add("X-Requested-With", X_REQUESTED_WITH);
                LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
                linkedMultiValueMap.add("session.id", login());
                linkedMultiValueMap.add("ajax", "scheduleCronFlow");
                linkedMultiValueMap.add("projectName", projectName);
                linkedMultiValueMap.add("cronExpression", cron);
                linkedMultiValueMap.add("flow", flowName);

                HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
                String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);

                // Parse once; a response without "status" or "scheduleId" is treated as a failure
                // (previously it raised an NPE).
                JsonObject json = new Gson().fromJson(result, JsonObject.class);
                JsonElement status = json == null ? null : json.get("status");
                JsonElement scheduleId = json == null ? null : json.get("scheduleId");

                boolean statusOk = status != null && status.isJsonPrimitive()
                        && SysContants.AZK_SUCCESS.equals(status.getAsString());
                if (!statusOk || scheduleId == null || !scheduleId.isJsonPrimitive()) {
                    logger.error("Azkaban返回根据时间创建定时任务信息失败:!返回错误信息:"+result);
                    return null;
                }

                return scheduleId.getAsString();
            }
    
            /**
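             * Removes the schedule identified by scheduleId by posting action=removeSched to /schedule.
             * @param scheduleId id of the schedule to remove
             * @return true when the request is reported as successful, false otherwise (including on any exception)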
             */
            public boolean unscheduleFlow(String scheduleId) {
                // Purpose: POST action=removeSched to /schedule for the given scheduleId.
                // Returns true only when the JSON response's "status" equals SysContants.AZK_SUCCESS;
                // any failure (blank response, other status, exception) yields false.
                try {
                    SSLUtil.turnOffSslChecking();
                    HttpHeaders hs = new HttpHeaders();
                    hs.add("Content-Type", CONTENT_TYPE);
                    hs.add("X-Requested-With", X_REQUESTED_WITH);
                    LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
                    linkedMultiValueMap.add("session.id", login());
                    linkedMultiValueMap.add("action", "removeSched");
                    linkedMultiValueMap.add("scheduleId", scheduleId);

                    HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
                    String result = restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity,
                            String.class);
                    if (StringUtils.isBlank(result)) {
                        return false;
                    }

                    JsonObject json = new Gson().fromJson(result, JsonObject.class);
                    JsonElement status = json == null ? null : json.get("status");
                    // Use getAsString(): String.valueOf(JsonElement) returns the JSON text including
                    // the surrounding quotes, which can never equal the unquoted constant that the
                    // sibling schedule methods compare against.
                    boolean succeeded = status != null && status.isJsonPrimitive()
                            && SysContants.AZK_SUCCESS.equals(status.getAsString());
                    if (!succeeded) {
                        logger.error("取消流调度信息失败:{}", scheduleId);
                        logger.error("Azkaban取消流调度信息失败失败:!返回错误信息:"+result);
                        return false;
                    }
                } catch (Exception e) {
                    // Keep the best-effort contract (return false) but do not lose the cause.
                    logger.error("取消流调度信息失败:{}", scheduleId, e);
                    return false;
                }
                return true;
            }
    
            /**
             * 下载Azkaban压缩文件
             * @param projectName 项目名称
             * @param zipPath 文件路径
             * @throws Exception 文件异常
             */
            public void downLoadZip(String projectName, String zipPath) throws Exception {
                // Purpose: download the project archive from /manager?download=true and write it to zipPath.
                // Failures are logged and swallowed, as in the original; callers get no exception.
                // NOTE(review): projectName is concatenated into the URL unencoded — confirm callers
                // only pass URL-safe names.
                HttpURLConnection conn = null;
                try {
                    URL url = new URL(azkabanConfig.getAzkUrl() + "/manager?session.id=" + login() + "&download=true&project="
                            + projectName);
                    conn = (HttpURLConnection) url.openConnection();
                    conn.setConnectTimeout(3 * 1000);

                    // try-with-resources closes both streams (the input stream used to leak) and
                    // flushes the buffered output on close. The input stream is opened first, so no
                    // file is created when the connection itself fails.
                    try (InputStream inputStream = conn.getInputStream();
                            OutputStream output = new BufferedOutputStream(new FileOutputStream(new File(zipPath)))) {
                        // Copy in chunks instead of loading the whole archive into memory.
                        byte[] buffer = new byte[8192];
                        int read;
                        while ((read = inputStream.read(buffer)) != -1) {
                            output.write(buffer, 0, read);
                        }
                    }
                } catch (Exception e) {
                    logger.error("下载Azkaban压缩文件异常:" + e.getMessage(), e);
                } finally {
                    if (conn != null) {
                        conn.disconnect();
                    }
                }
            }
    
    
            /**
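             * Looks up the schedule of a flow by project id and flow id.
             * @param projectId id of the project
             * @param flowId id of the flow
             * @return the scheduleId of the flow's schedule, or null when it cannot be obtained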
             */
            public String fetchSchedule(String projectId, String flowId) {
                // Purpose: call the "fetchSchedule" AJAX endpoint and return the "scheduleId" nested in
                // the response's "schedule" object. Returns null when the status is not 200, when no
                // "schedule" field is present, or when any exception occurs.
                try {
                    SSLUtil.turnOffSslChecking();
                    HttpHeaders hs = new HttpHeaders();
                    hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                    hs.add("X-Requested-With", "XMLHttpRequest");
                    hs.add("Accept", "text/plain;charset=utf-8");

                    Map<String, String> map = new HashMap<>();
                    map.put("id", login());
                    map.put("projectId", projectId);
                    map.put("flowId", flowId);
                    ResponseEntity<String> exchange = restTemplate.exchange(
                            azkabanConfig.getAzkUrl()
                                    + "/schedule?session.id={id}&ajax=fetchSchedule&projectId={projectId}&flowId={flowId}",
                            HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
                    if (HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                        logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
                        return null;
                    }
                    JsonObject body = new Gson().fromJson(exchange.getBody(), JsonObject.class);
                    JsonElement obj = body == null ? null : body.get("schedule");
                    if (obj == null) {
                        logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId);
                        return null;
                    }
                    return obj.getAsJsonObject().get("scheduleId").getAsString();
                } catch (Exception e) {
                    // Pass the exception as the trailing argument so the cause and stack trace are logged.
                    logger.error("获取一个调度器job的信息失败:{}:{}", projectId, flowId, e);
                }
                return null;
            }
    
            /**
             * SLA 设置调度任务 执行的时候 或者执行成功失败等等的规则匹配 发邮件或者...
             * @return
             * @throws Exception
             */
            public String setSla() throws Exception {
                // Purpose: POST a "setSla" request to /schedule and return the raw response body.
                // NOTE(review): scheduleId, slaEmails and the settings[] rules are hard-coded sample
                // values; the no-argument signature leaves no way to supply real ones.
                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                LinkedMultiValueMap<String, String> linkedMultiValueMap = new LinkedMultiValueMap<String, String>();
                // Use a session id from login() like the other methods; the previous hard-coded
                // session id belonged to a past session.
                linkedMultiValueMap.add("session.id", login());
                linkedMultiValueMap.add("ajax", "setSla");
                linkedMultiValueMap.add("scheduleId", "6");
                linkedMultiValueMap.add("slaEmails", "771177@qq.com");
                linkedMultiValueMap.add("settings[0]", "begin,SUCCESS,5:00,true,false");
                linkedMultiValueMap.add("settings[1]", "exe,SUCCESS,5:00,true,false");
                linkedMultiValueMap.add("settings[2]", "end,SUCCESS,5:00,true,false");

                HttpEntity<MultiValueMap<String, String>> httpEntity = new HttpEntity<>(linkedMultiValueMap, hs);
                return restTemplate.postForObject(azkabanConfig.getAzkUrl() + "/schedule", httpEntity, String.class);
            }
    
            /**
             * SLA 获取调度的规则配置
             * @throws Exception
             */
            public void slaInfo() throws Exception {
                // Purpose: call the "slaInfo" AJAX endpoint and log the raw response body.
                // NOTE(review): scheduleId "6" is a hard-coded sample value.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();
                // Session id now comes from login() instead of a hard-coded id from a past session.
                map.put("id", login());
                map.put("scheduleId", "6");
                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/schedule?session.id={id}&ajax=slaInfo&scheduleId={scheduleId}", HttpMethod.GET,
                        new HttpEntity<String>(hs), String.class, map);
                // Route output through the class logger rather than stdout.
                logger.info("Azkaban slaInfo response: {}", exchange.getBody());
            }
    
            /**
             * Execution 暂停一个执行流
             * @throws Exception
             */
            public void pauseFlow() throws Exception {
                // Purpose: call the "pauseFlow" AJAX endpoint and log the raw response body.
                // NOTE(review): execid "12" is a hard-coded sample value.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();
                // Session id now comes from login() instead of a hard-coded id from a past session.
                map.put("id", login());
                map.put("execid", "12");
                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=pauseFlow&execid={execid}", HttpMethod.GET,
                        new HttpEntity<String>(hs), String.class, map);
                // Route output through the class logger rather than stdout.
                logger.info("Azkaban pauseFlow response: {}", exchange.getBody());
            }
    
            /**
             * Flow Execution 重新执行一个执行流
             * @throws Exception
             */
            public void resumeFlow() throws Exception {
                // Purpose: call the "resumeFlow" AJAX endpoint and log the raw response body.
                // NOTE(review): execid "11" is a hard-coded sample value.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();
                // Session id now comes from login() instead of a hard-coded id from a past session.
                map.put("id", login());
                map.put("execid", "11");
                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=resumeFlow&execid={execid}", HttpMethod.GET,
                        new HttpEntity<String>(hs), String.class, map);
                // Route output through the class logger rather than stdout.
                logger.info("Azkaban resumeFlow response: {}", exchange.getBody());
            }
    
            /**
             *  获取一个执行流的详细信息 这个流的每个节点的信息 成功或者失败等等
             * @param execid 执行id
             * @return
             * @throws Exception
             */
            public String fetchexecflow(String execid) throws Exception {
                // Purpose: call the "fetchexecflow" AJAX endpoint for one execution id and return the
                // raw response body, or null when the HTTP status is not 200.
                SSLUtil.turnOffSslChecking();

                final HttpHeaders headers = new HttpHeaders();
                headers.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                headers.add("X-Requested-With", "XMLHttpRequest");
                headers.add("Accept", "text/plain;charset=utf-8");

                final Map<String, String> uriVariables = new HashMap<>();
                uriVariables.put("id", login());
                uriVariables.put("execid", execid);

                final String detailUrl = azkabanConfig.getAzkUrl()
                        + "/executor?session.id={id}&ajax=fetchexecflow&execid={execid}";
                final ResponseEntity<String> response = restTemplate.exchange(
                        detailUrl, HttpMethod.GET, new HttpEntity<String>(headers), String.class, uriVariables);

                final boolean ok = response != null && response.getStatusCodeValue() == HttpStatus.SC_OK;
                if (ok) {
                    return response.getBody();
                }
                logger.error("获取一个执行流的详细信息失败:" + execid);
                return null;
            }
    
            /**
             * 获取一个执行流的日志
             * @param execid 执行编号
             * @param jobId job编号
             * @param offset
             * @param length
             * @return
             * @throws Exception
             */
            public String fetchExecJobLogs(String execid,String jobId,String offset,String length) throws Exception {
                // Purpose: call the "fetchExecJobLogs" AJAX endpoint and return the "data" field of the
                // JSON response as a string. Returns null when the status is not 200 or "data" is absent.
                SSLUtil.turnOffSslChecking();

                final HttpHeaders headers = new HttpHeaders();
                headers.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                headers.add("X-Requested-With", "XMLHttpRequest");
                headers.add("Accept", "text/plain;charset=utf-8");

                final Map<String, String> uriVariables = new HashMap<>();
                uriVariables.put("id", login());
                uriVariables.put("execid", execid);
                uriVariables.put("jobId", jobId);
                uriVariables.put("offset", offset);
                uriVariables.put("length", length);

                final String logsUrl = azkabanConfig.getAzkUrl()
                        + "/executor?session.id={id}&ajax=fetchExecJobLogs&execid={execid}&jobId={jobId}&offset={offset}&length={length}";
                final ResponseEntity<String> response = restTemplate.exchange(
                        logsUrl, HttpMethod.GET, new HttpEntity<String>(headers), String.class, uriVariables);

                final boolean ok = response != null && response.getStatusCodeValue() == HttpStatus.SC_OK;
                if (!ok) {
                    logger.error("获取一个执行流的详细信息失败:{}:{}", execid,jobId);
                    return null;
                }

                final JsonObject payload = new Gson().fromJson(response.getBody(), JsonObject.class);
                final JsonElement data = payload.get("data");
                if (data != null) {
                    return data.getAsString();
                }
                logger.error("获取一个执行流的详细信息为空:{}:{}", execid,jobId);
                return null;
            }
            
            /**
             * 获取一个执行流的日志概要
             * @param execid
             * @param offset
             * @param length
             * @return
             * @throws Exception
             */
            public String fetchExecFlowLogs(String execid,String offset,String length) throws Exception {
                // Purpose: call the "fetchExecFlowLogs" AJAX endpoint and return the "data" field of the
                // JSON response as a string. Returns null when the status is not 200 or "data" is absent.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();
                map.put("id", login());
                map.put("execid", execid);
                map.put("offset", offset);
                map.put("length", length);
                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchExecFlowLogs&execid={execid}&offset={offset}&length={length}",
                        HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
                if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                    logger.error("获取一个执行流的日志概要信息失败:{}", execid);
                    return null;
                }

                // Gson returns null for an empty body, so guard before reading "data".
                JsonObject body = new Gson().fromJson(exchange.getBody(), JsonObject.class);
                JsonElement obj = body == null ? null : body.get("data");
                if (obj == null) {
                    // One placeholder for one argument (the original format had a dangling "{}").
                    logger.error("获取一个执行流的日志概要信息为空:{}", execid);
                    return null;
                }
                return obj.getAsString();
            }
            
            /**
             * 获取执行流的信息状态
             * @throws Exception
             */
            public void fetchexecflowupdate() throws Exception {
                // Purpose: call the "fetchexecflowupdate" AJAX endpoint and log the raw response body.
                // NOTE(review): execid "11" and lastUpdateTime "-1" are hard-coded sample values.

                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", "application/x-www-form-urlencoded; charset=utf-8");
                hs.add("X-Requested-With", "XMLHttpRequest");
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();
                // Session id now comes from login() instead of a hard-coded id from a past session.
                map.put("id", login());
                map.put("execid", "11");
                map.put("lastUpdateTime", "-1");
                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/executor?session.id={id}&ajax=fetchexecflowupdate&execid={execid}&lastUpdateTime={lastUpdateTime}",
                        HttpMethod.GET, new HttpEntity<String>(hs), String.class, map);
                // Route output through the class logger rather than stdout.
                logger.info("Azkaban fetchexecflowupdate response: {}", exchange.getBody());
            }
            
            private void scheduleTimeInit(LinkedMultiValueMap<String, String> linkedMultiValueMap, Date date) {
                // Purpose: derive the "scheduleTime" and "scheduleDate" form fields from date
                // (interpreted in the default Calendar) and add them to the given form map.
                final Calendar cal = Calendar.getInstance();
                cal.setTime(date);

                // scheduleTime is "<hourOfDay>,<minute>,<am|pm>,<zone>"; the hour is the 24-hour value.
                final int hourOfDay = cal.get(Calendar.HOUR_OF_DAY);
                final int minuteOfHour = cal.get(Calendar.MINUTE);
                final String meridiemAndZone;
                if (hourOfDay > 11) {
                    meridiemAndZone = ",pm,PDT";
                } else {
                    meridiemAndZone = ",am,EDT";
                }
                final String scheduleTime = hourOfDay + "," + minuteOfHour + meridiemAndZone;

                // scheduleDate is "<month>/<day>/<year>"; Calendar.MONTH is zero-based, hence the +1.
                final int monthOfYear = cal.get(Calendar.MONTH) + 1;
                final int dayOfMonth = cal.get(Calendar.DATE);
                final int fullYear = cal.get(Calendar.YEAR);
                final String scheduleDate = monthOfYear + "/" + dayOfMonth + "/" + fullYear;

                linkedMultiValueMap.add("scheduleTime", scheduleTime);
                linkedMultiValueMap.add("scheduleDate", scheduleDate);
            }
    
            /**
             * 获取azkaban调度  调度概况  (分页查询)
             * @param projectName  工程名称
             * @param start  分页参数
             * @param size   分页参数
             * @return
             * @throws Exception
             * @throws Exception
             */
            public List<GovernTaskRecordBean> getAzkabanExcutions(String projectName,String start,String size) throws Exception {
                // Purpose: for every flow of the project, fetch one page (start/size) of executions and
                // convert each into a GovernTaskRecordBean. Returns an empty list when the project's
                // flows cannot be fetched.

                List<GovernTaskRecordBean> excInfoList = Lists.newArrayList();
                // fetchFlowsProject returns null on failure; iterating it directly used to throw an NPE.
                List<String> flows = fetchFlowsProject(projectName);
                if (flows == null) {
                    return excInfoList;
                }
                for (String flow : flows) {
                    FlowExecution fe = fetchFlowExecutions(projectName, flow, start, size);
                    if (fe == null) {
                        continue;
                    }
                    List<Execution> executions = fe.getExecutions();
                    if (executions == null) {
                        continue;
                    }
                    for (Execution execution : executions) {
                        if (execution == null) {
                            continue;
                        }
                        GovernTaskRecordBean eInfo = new GovernTaskRecordBean();
                        eInfo.setCreateTime(new DateTime(execution.getSubmitTime()).toString(formatterTime));
                        if (execution.getEndTime() > 0) {
                            eInfo.setEndTime(new DateTime(execution.getEndTime()).toString(formatterTime));
                            eInfo.setElapsed((execution.getEndTime() - execution.getStartTime()) / 1000);
                        } else {
                            // No end time yet: elapsed is measured up to now (times are divided by
                            // 1000, i.e. milliseconds to seconds).
                            eInfo.setElapsed((DateTime.now().getMillis() - execution.getStartTime()) / 1000);
                        }
                        eInfo.setExecId(execution.getExecId());
                        eInfo.setFlowId(execution.getFlowId());
                        eInfo.setOwner(execution.getSubmitUser());
                        eInfo.setProjectId(execution.getProjectId());
                        eInfo.setProjectPath(projectName);
                        eInfo.setStartTime(new DateTime(execution.getStartTime()).toString(formatterTime));

                        // Only these four statuses are mapped; any other (or missing) status
                        // leaves the bean's status unset.
                        String status = execution.getStatus();
                        if (status == null) {
                            // nothing to map
                        } else if (status.equalsIgnoreCase(ScheduleStatus.FAILED.getDesc())) {
                            eInfo.setStatus(ScheduleStatus.FAILED.getCode());
                        } else if (status.equalsIgnoreCase(ScheduleStatus.CANCELLED.getDesc())) {
                            eInfo.setStatus(ScheduleStatus.CANCELLED.getCode());
                        } else if (status.equalsIgnoreCase(ScheduleStatus.KILLED.getDesc())) {
                            eInfo.setStatus(ScheduleStatus.KILLED.getCode());
                        } else if (status.equalsIgnoreCase(ScheduleStatus.SUCCEEDED.getDesc())) {
                            eInfo.setStatus(ScheduleStatus.SUCCEEDED.getCode());
                        }
                        excInfoList.add(eInfo);
                    }
                }
                return excInfoList;
            }
    
            /**
             * Builds the detail view of one Azkaban flow execution, including flow and job logs.
             *
             * <p>The execution is fetched with {@code fetchexecflow} and parsed into an
             * {@link ExecutionInfo}. Flow-level fields are copied into an {@link ExecutionInfoBean},
             * the flow log is loaded with {@code fetchExecFlowLogs}, and every node is converted into an
             * {@link ExecNodeBean} whose log is loaded with {@code fetchExecJobLogs}.
             *
             * @param excuteId execution id of the flow run
             * @param start    paging argument, forwarded unchanged to the log-fetching calls
             * @param size     paging argument, forwarded unchanged to the log-fetching calls
             * @return the populated bean, or {@code null} when the execution has no nodes
             * @throws CommonException when the execution cannot be fetched or parsed
             * @throws Exception       propagated from the underlying Azkaban calls
             */
            public ExecutionInfoBean getAzkabanExcutionDetails(String excuteId, String start, String size) throws Exception {
                String result = fetchexecflow(excuteId);
                if (StringUtils.isBlank(result)) {
                    throw new CommonException("查询任务流的执行详情失败!");
                }
                ExecutionInfo ei = new Gson().fromJson(result, ExecutionInfo.class);
                if (ei == null) {
                    throw new CommonException("查询任务流的执行详情失败!");
                }
                List<ExecNode> nodes = ei.getNodes();
                if (nodes == null || nodes.isEmpty()) {
                    return null;
                }

                // ---- flow-level fields ----
                ExecutionInfoBean eib = new ExecutionInfoBean();
                eib.setAttempt(ei.getAttempt());
                eib.setStartTime(new DateTime(ei.getStartTime()).toString(formatterTime));
                if (ei.getEndTime() > 0) {
                    eib.setEndTime(new DateTime(ei.getEndTime()).toString(formatterTime));
                    eib.setElapsed((ei.getEndTime() - ei.getStartTime()) / 1000);
                } else {
                    // No end time yet: report the elapsed seconds up to now.
                    eib.setElapsed((DateTime.now().getMillis() - ei.getStartTime()) / 1000);
                }
                eib.setExecid(ei.getExecid());
                eib.setFlow(ei.getFlow());
                eib.setFlowId(ei.getFlowId());
                eib.setJobId(ei.getId());
                eib.setNestedId(ei.getNestedId());
                eib.setProject(ei.getProject());
                eib.setProjectId(ei.getProjectId());
                String stats = ScheduleStatus.getDescByCode(ei.getStatus());
                if (StringUtils.isNotBlank(stats)) {
                    eib.setStatus(stats);
                }
                eib.setSubmitTime(new DateTime(ei.getSubmitTime()).toString(formatterTime));
                eib.setSubmitUser(ei.getSubmitUser());
                eib.setType(ei.getType());
                eib.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));

                String flowLog = fetchExecFlowLogs(excuteId, start, size);
                eib.setFlowLog(flowLog);

                // ---- per-node fields ----
                List<ExecNodeBean> nodeBeanList = Lists.newArrayList();
                for (ExecNode node : nodes) {
                    ExecNodeBean ebn = new ExecNodeBean();
                    ebn.setAttempt(node.getAttempt());
                    ebn.setJobId(node.getId());
                    ebn.setDependencies(node.getIn());
                    ebn.setNestedId(node.getNestedId());
                    // Use the node's own start time (previously the flow's start time was copied here,
                    // which disagreed with the node-based elapsed calculation below).
                    ebn.setStartTime(new DateTime(node.getStartTime()).toString(formatterTime));
                    if (node.getEndTime() > 0) {
                        ebn.setEndTime(new DateTime(node.getEndTime()).toString(formatterTime));
                        ebn.setElapsed((node.getEndTime() - node.getStartTime()) / 1000);
                    } else {
                        ebn.setElapsed((DateTime.now().getMillis() - node.getStartTime()) / 1000);
                    }
                    String stats2 = ScheduleStatus.getDescByCode(node.getStatus());
                    if (StringUtils.isNotBlank(stats2)) {
                        ebn.setStatus(stats2);
                    }
                    ebn.setType(node.getType());
                    // NOTE(review): this is the flow-level update time; if ExecNode exposes its own
                    // update time it should probably be used here instead - confirm against ExecNode.
                    ebn.setUpdateTime(new DateTime(ei.getUpdateTime()).toString(formatterTime));
                    String logs = fetchExecJobLogs(excuteId, ebn.getJobId(), start, size);
                    ebn.setLogs(logs);
                    nodeBeanList.add(ebn);
                }
                eib.setNodes(nodeBeanList);
                return eib;
            }
    
            /**
             * Looks up the Azkaban {@code projectId} of a project by name.
             *
             * <p>Issues a GET against {@code /manager} with {@code ajax=fetchprojectflows}, using the
             * session id returned by {@code login()}, and reads the {@code projectId} member of the
             * JSON response.
             *
             * @param projectName Azkaban project name
             * @return the project id as a string (may be blank if Azkaban returned a blank value), or
             *         {@code null} when the request fails or the response carries no {@code projectId}
             * @throws Exception propagated from login, the HTTP call, or JSON parsing
             */
            public String fetchProjectId(String projectName) throws Exception {
                // NOTE(review): presumably disables TLS certificate validation for this call - confirm
                // that this is acceptable for the target environment.
                SSLUtil.turnOffSslChecking();
                HttpHeaders hs = new HttpHeaders();
                hs.add("Content-Type", CONTENT_TYPE);
                hs.add("X-Requested-With", X_REQUESTED_WITH);
                hs.add("Accept", "text/plain;charset=utf-8");

                Map<String, String> map = new HashMap<>();

                map.put("id", login());
                map.put("project", projectName);

                ResponseEntity<String> exchange = restTemplate.exchange(
                        azkabanConfig.getAzkUrl() + "/manager?&session.id={id}&project={project}&ajax=fetchprojectflows", HttpMethod.GET,
                        new HttpEntity<String>(hs), String.class, map);
                if (exchange == null || HttpStatus.SC_OK != exchange.getStatusCodeValue()) {
                    logger.error("Azkaban获取一个项目的所有流信息失败:{}", projectName);
                    return null;
                }
                // fromJson yields null for a null/empty body, so guard before reading the member.
                JsonObject json = new Gson().fromJson(exchange.getBody(), JsonObject.class);
                JsonElement obj = (json == null) ? null : json.get("projectId");
                if (obj == null || obj.isJsonNull()) {
                    // Both placeholders are now filled: project name and the raw response body.
                    logger.error("Azkaban获取一个项目的所有流信息失败:{}:{}", projectName, exchange.getBody());
                    return null;
                }
                String projectId = obj.getAsString();
                if (StringUtils.isBlank(projectId)) {
                    logger.error("获取Azkaban  projectId 异常");
                }
                return projectId;
            }
    
            /**
             * Returns the status of the first execution listed for the first flow of the project
             * that has any executions.
             *
             * <p>Flows are visited in the order returned by {@code fetchFlowsProject}; for each one up to
             * 1000 executions are requested starting at offset 0. Presumably Azkaban lists the most
             * recent execution first, which is what makes this the "last" status - verify against the
             * Azkaban API.
             *
             * @param projectName Azkaban project name
             * @return the raw status string of that execution, or {@code null} when the project has no
             *         flows or none of its flows has an execution
             * @throws Exception propagated from the underlying Azkaban calls
             */
            public String getLastScheduleStatus(String projectName) throws Exception {
                List<String> flowNames = fetchFlowsProject(projectName);
                boolean hasFlows = CollectionUtils.isNotEmpty(flowNames);
                if (!hasFlows) {
                    return null;
                }
                for (String flowName : flowNames) {
                    FlowExecution page = fetchFlowExecutions(projectName, flowName, "0", "1000");
                    List<Execution> history = (page == null) ? null : page.getExecutions();
                    if (history != null && !history.isEmpty()) {
                        // The first flow with any history decides the result.
                        return history.get(0).getStatus();
                    }
                }
                return null;
            }
        
        
    
    }
    AzkabanService

      8.调用demo

        /**
         * Triggers an immediate run of a flow through {@code azkabanService.executeFlow}.
         *
         * @param projectName Azkaban project name
         * @param flow        id of the flow to run
         * @throws CommonException wrapping any exception raised while starting the flow
         */
        private void excuteFlowImmediately(String projectName, String flow) throws CommonException {
            try {
                azkabanService.executeFlow(projectName, flow);
            } catch (Exception cause) {
                // Surface every failure as the project's own exception type, keeping the cause.
                throw new CommonException("调度初始化完毕,立即执行任务异常", cause);
            }
        }
    demo

    四、注意

      1.如遇报错情况,请关注azkaban相关log日志。

      2.如果pom文件中的jar包依赖不全,请在评论区留言。

     

  • 相关阅读:
    zookeeper实现主从选举
    基于Zookeeper+MHA的mysql高可用架构设计
    MHA实现MySQL主从自动在线切换功能
    redis 主从复制
    虚拟iP
    nginx 主从
    代码的快速操作
    网页延时加载
    ajax循环数据
    js实现网页中打印、刷新、关闭、前进、后退、返回等操作
  • 原文地址:https://www.cnblogs.com/weitaming/p/9964044.html
Copyright © 2011-2022 走看看