zoukankan      html  css  js  c++  java
  • 谈谈在incubator-dolphinscheduler 中为啥不能及时看到python任务输出的print日志

    一、incubator-dolphinscheduler 中如何获取shell类型的节点或者python类型的节点任务的日志

    1、在org.apache.dolphinscheduler.server.worker.task.AbstractCommandExecutor 类中通过java.lang.ProcessBuilder 来将python 脚本生成命令进行执行,AbstractCommandExecutor中的部分源码如下:

    ......        
    } else {
                //init process builder
                ProcessBuilder processBuilder = new ProcessBuilder();
                // setting up a working directory
                processBuilder.directory(new File(taskExecutionContext.getExecutePath()));
                // merge error information to standard output stream
                processBuilder.redirectErrorStream(true);
    
                // setting up user to run commands
                command.add("sudo");
                command.add("-u");
                command.add(taskExecutionContext.getTenantCode());
                command.add(commandInterpreter());
                command.addAll(commandOptions());
                command.add(commandFile);
    
                // setting commands
                processBuilder.command(command);
                process = processBuilder.start();
            }
    ......

    2、通过process.getInputStream() 来获取命令终端输出的日志,部分源码如下:

     private void parseProcessOutput(Process process) {
            String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
            ExecutorService parseProcessOutputExecutorService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName);
            parseProcessOutputExecutorService.submit(new Runnable() {
                @Override
                public void run() {
                    BufferedReader inReader = null;
    
                    try {
                        inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
                        String line;
                        long lastFlushTime = System.currentTimeMillis();
                        logBuffer.add("welcome to use bigdata scheduling system...");
                        Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL * 2);
                        while ((line = inReader.readLine()) != null || logBuffer.size()>0) {
                            if(null != line){
                                logBuffer.add(line);
                            }
                            lastFlushTime = flush(lastFlushTime);
                        }
                        if (logBuffer.size() > 0) {
                            Thread.sleep(Constants.DEFAULT_LOG_FLUSH_INTERVAL * 2);
                            lastFlushTime = flush(lastFlushTime);
                        }
                    } catch (Exception e) {
                        logger.error(e.getMessage(), e);
                    } finally {
                        clear();
                        close(inReader);
                    }
                }
            });
            parseProcessOutputExecutorService.shutdown();
        }

    二、 Python 脚本中通过print()打印输出的日志为啥不能及时被incubator-dolphinscheduler获取到以及如何改进python脚本任务

    在python 脚本中,很多人习惯于用print()来输出日志,这本身也没啥问题,而且在python 3版本中,print()本身也是自动换行输出的,而dolphinscheduler 也是按行来读取process的输出的,按理应该是可以及时输出的。

    if __name__=='__main__':
    .........
        print(xxxxxxxx)
    .........
        print(xxxxxxxx)
    .........
        print(xxxxxxxx)

    在 Python 3中打印日志调用 print (obj) 的时候,事实上是调用了 sys.stdout.write(obj+' '),print ()将需要的打印内容打印到了控制台,然后追加了一个换行符,print() 会调用 sys.stdout 的 write() 方法。

    一行print("hello,world") 其实等价于执行sys.stdout.write('hello,world'+' '),看到这里是不是就容易理解了。因为这样会一直写如到了缓冲区,需要等到线程退出等情况下,缓冲区的内容才会被刷出,但是我们可以通过在脚本中强制调用sys.stdout.flush() 让其及时的刷出。

    三、 直接通过参数解决

     

     python中提供了-u 参数:force the stdout and stderr streams to be unbuffered;this option has no effect on stdin; also PYTHONUNBUFFERED=x  可以强制输出e stdout and stderr streams

    作者的原创文章,转载须注明出处。原创文章归作者所有,欢迎转载,但是保留版权。对于转载了博主的原创文章,不标注出处的,作者将依法追究版权,请尊重作者的成果。
  • 相关阅读:
    react使用 UEditor富文本编辑器
    ES6、ES7的新特性、基本使用以及 async/await的基本使用
    react 生命周期
    webpack 新创项目
    TMultipartFormData上传文件
    ffmpeg水印处理
    ffmpeg通过rtsp对摄像头摄像头抓图
    ffmpeg命令行截图
    ffmpeg保存为jpg文件
    ffmpeg打开视频文件
  • 原文地址:https://www.cnblogs.com/laoqing/p/14582947.html
Copyright © 2011-2022 走看看