zoukankan      html  css  js  c++  java
  • 单线程service服务

    1. 用 @Service 注解修饰类,同时让该类继承 Thread 类

    /**
     * Spring-managed scheduler that runs as a single dedicated thread.
     *
     * <p>While {@code Stopper.isRunning()} is true, the thread repeatedly takes the
     * ZooKeeper mutex, fetches at most one {@code Command} from the process service,
     * turns it into a {@code ResourceInstance} and submits a {@code MasterExecThread}
     * for it to {@link #masterExecService}.
     */
    @Service
    public class MasterSchedulerService extends Thread {

        /**
         * logger of MasterSchedulerService
         */
        private static final Logger logger = LoggerFactory.getLogger(MasterSchedulerService.class);

        @Autowired
        private ServerConfig serverConfig;

        /**
         * dolphinscheduler database interface
         */
        @Autowired
        private ProcessService processService;

        /**
         * zookeeper master client
         */
        @Autowired
        private ZKClient zkClient;

        /**
         * master exec service; created in {@link #init()} once injection has completed
         */
        private ThreadPoolExecutor masterExecService;

        @Autowired
        private FamaVmMapper vmMapper;

        /**
         * Creates the worker pool after dependency injection.
         *
         * <p>This is a {@code @PostConstruct} callback rather than a constructor because
         * it reads the injected {@code serverConfig} to size the pool.
         */
        @PostConstruct
        public void init() {
            // NOTE(review): the cast assumes the helper returns a ThreadPoolExecutor — confirm in ThreadUtils.
            this.masterExecService = (ThreadPoolExecutor) ThreadUtils.newDaemonFixedThreadExecutor(
                    "Server-Thread", serverConfig.getMasterExecThreads());
        }

        /**
         * Names the thread and starts it.
         */
        @Override
        public synchronized void start() {
            super.setName("MasterSchedulerService");
            super.start();
        }

        /**
         * Shuts down the worker pool and waits up to five seconds for submitted work to finish.
         *
         * <p>If the wait is interrupted, the interrupt status is restored and the method
         * continues as if the pool had not terminated in time.
         */
        public void close() {
            masterExecService.shutdown();
            boolean terminated = false;
            try {
                terminated = masterExecService.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException ignore) {
                Thread.currentThread().interrupt();
            }
            if (!terminated) {
                logger.warn("masterExecService shutdown without terminated, increase await time");
            }
            logger.info("master schedule service stopped...");
        }

        /**
         * Scheduler loop: runs until {@code Stopper} reports stopped or the thread is interrupted.
         *
         * <p>Each iteration schedules one command if the ZooKeeper client is started;
         * otherwise it sleeps for {@code Constants.SLEEP_TIME_MILLIS} so the loop does not
         * spin at full CPU while ZooKeeper is unavailable.
         */
        @Override
        public void run() {
            logger.info("master scheduler started");
            while (Stopper.isRunning()) {
                try {
                    if (zkClient.getZkClient().getState() == CuratorFrameworkState.STARTED) {
                        scheduleProcess();
                    } else {
                        // ZooKeeper client not started yet (or already closed): back off instead of busy-waiting
                        Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag and leave the loop; otherwise every later sleep would
                    // throw immediately and the loop would spin on error logging.
                    Thread.currentThread().interrupt();
                    logger.warn("master scheduler thread interrupted, exiting", e);
                    break;
                } catch (Exception e) {
                    logger.error("master scheduler thread error", e);
                }
            }
        }

        /**
         * Takes the ZooKeeper mutex, handles at most one pending command, and releases the mutex.
         *
         * <p>When no command is pending the method sleeps for
         * {@code Constants.SLEEP_TIME_MILLIS} while still holding the mutex, exactly as the
         * original implementation did.
         *
         * @throws Exception if acquiring the mutex, querying the command, or sleeping fails;
         *                   failures while handling a found command are logged and not rethrown
         */
        private void scheduleProcess() throws Exception {
            InterProcessMutex mutex = null;
            try {
                mutex = zkClient.blockAcquireMutex();

                // make sure to scan and delete command  table in one transaction
                Command command = processService.findOneCommand();
                if (command == null) {
                    // indicate that no command, sleep before polling again
                    Thread.sleep(Constants.SLEEP_TIME_MILLIS);
                    return;
                }

                logger.info("find one command: {}", JsonUtils.toJSONString(command));
                try {
                    String address = getLocalAddress();
                    ResourceInstance processInstance = processService.handleCommand(address, command);
                    logger.info("processInstance: {}", JsonUtils.toJSONString(processInstance));
                    if (processInstance != null) {
                        MapperContext context = new MapperContext();
                        context.vmMapper = vmMapper;
                        MasterExecThread execThread = new MasterExecThread(
                                processInstance, processService, serverConfig, context);
                        masterExecService.execute(execThread);
                    }
                } catch (Exception e) {
                    // A failing command must not take down the scheduler loop.
                    logger.error("Command consumer error ", e);
                    // NOTE(review): moving the failed command to the error table is disabled here:
                    // processService.moveToErrorCommand(command, e.toString());
                }
            } finally {
                // mutex is still null here if blockAcquireMutex() threw
                zkClient.releaseMutex(mutex);
            }
        }

        /**
         * @return the address string {@code NetUtils.getAddr} builds for the configured listen port
         */
        private String getLocalAddress() {
            return NetUtils.getAddr(serverConfig.getListenPort());
        }
    }
    

    2. 启动时,直接调用该服务的 start() 方法

    /**
     * Application entry point and lifecycle owner of the scheduler server.
     *
     * <p>After Spring has injected the collaborators, {@link #run()} starts the ZooKeeper
     * client, the scheduler thread and Quartz, and registers a JVM shutdown hook that
     * calls {@link #close(String)}.
     */
    @ComponentScan(value = {"fama.cost"})
    public class FamaServerApplication implements IStoppable {

        private static final Logger logger = LoggerFactory.getLogger(FamaServerApplication.class);

        @Autowired
        private MasterSchedulerService schedulerService;

        @Autowired
        private SpringApplicationContext springApplicationContext;

        @Autowired
        private BeanContext beanContext;

        @Autowired
        private ZKClient zkClient;

        /**
         * Boots the Spring context without a web environment and logs the process id.
         *
         * @param args command-line arguments forwarded to Spring
         */
        public static void main(String[] args) {
            Thread.currentThread().setName("Schedule-Server");
            new SpringApplicationBuilder(FamaServerApplication.class).web(WebApplicationType.NONE).run(args);
            int currentProcessPid = getProcessID();
            logger.info("Current Process Pid : {}", currentProcessPid);
        }

        /**
         * Starts the ZooKeeper client, the scheduler thread and Quartz, then registers the shutdown hook.
         *
         * <p>A Quartz start failure is logged and answered with a Quartz shutdown attempt;
         * it does not stop the rest of the server from running.
         */
        @PostConstruct
        public void run() {
            this.zkClient.start(this);
            this.schedulerService.start();
            try {
                logger.info("start Quartz server...");
                QuartzExecutors.getInstance().start();
            } catch (Exception e) {
                try {
                    QuartzExecutors.getInstance().shutdown();
                } catch (SchedulerException e1) {
                    logger.error("QuartzExecutors shutdown failed : {}", e1.getMessage(), e1);
                }
                logger.error("start Quartz failed", e);
            }

            // Stop the server on JVM exit unless it has already been stopped.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                if (Stopper.isRunning()) {
                    close("shutdownHook");
                }
            }));
        }

        /**
         * Stops the server: raises the stop signal, waits three seconds, then closes the
         * scheduler service, the ZooKeeper client and Quartz, in that order.
         *
         * <p>Does nothing if {@code Stopper} already reports stopped. An interrupt received
         * during the three-second wait does not cut the shutdown sequence short; the
         * interrupt status is restored once the sequence has finished.
         *
         * @param cause human-readable reason for the shutdown, used only for logging
         */
        public void close(String cause) {
            boolean interrupted = false;
            try {
                //execute only once
                if (Stopper.isStopped()) {
                    return;
                }

                logger.info("master server is stopping ..., cause : {}", cause);

                // set stop signal is true
                Stopper.stop();

                try {
                    //thread sleep 3 seconds for thread quietly stop
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    // Remember the interrupt; restoring it now would make later blocking calls fail fast.
                    interrupted = true;
                    logger.warn("thread sleep exception ", e);
                }

                //close
                this.schedulerService.close();
                this.zkClient.close();

                //close quartz
                try {
                    QuartzExecutors.getInstance().shutdown();
                    logger.info("Quartz service stopped");
                } catch (Exception e) {
                    // pass the throwable as the last argument so the stack trace is logged too
                    logger.warn("Quartz service stopped exception:{}", e.getMessage(), e);
                }
            } catch (Exception e) {
                logger.error("master server stop exception ", e);
            } finally {
                if (interrupted) {
                    Thread.currentThread().interrupt();
                }
            }
        }

        /**
         * {@code IStoppable} callback; delegates to {@link #close(String)}.
         *
         * @param cause reason for the stop request
         */
        @Override
        public void stop(String cause) {
            close(cause);
        }

        /**
         * Reads the process id from the runtime MX bean name, taking the part before {@code '@'}.
         *
         * @return the parsed process id, or {@code 0} if the name cannot be read or parsed
         */
        public static final int getProcessID() {
            try {
                RuntimeMXBean runtimeMXBean = ManagementFactory.getRuntimeMXBean();
                return Integer.parseInt(runtimeMXBean.getName().split("@")[0]);
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
                return 0;
            }
        }
    }
    
  • 相关阅读:
    总结@ 在C# 中的用法
    如何在多线程中调用winform窗体控件
    jQuery对象和DOM对象原来不一样啊
    以编程方式使用 Word 中的内置对话框
    C#中Application.DoEvents()的作用
    本地设置正常,放服务器上就报 System.Security系统找不到指定的文件解决方法
    复制选中的listbox内容
    将一列数据拼接成一个字符串
    服务器不能复制粘贴问题处理
    获取Token不完整问题
  • 原文地址:https://www.cnblogs.com/PythonOrg/p/14592874.html
Copyright © 2011-2022 走看看