zoukankan      html  css  js  c++  java
  • luigi学习5-task详解

    task是代码执行的地方。task通过target互相依赖。

    下面是一个典型的task的大纲视图。

    一、Task.requires

    requires方法用来指定本task的依赖的其他task对象,依赖的task对象甚至可以是同一个class的对象,下面是一个例子:

    def requires(self):
        return OtherTask(self.date), DailyReport(self.date - datetime.timedelta(1))

    上述的DailyReport task依赖两个task,其中一个是同类型的。从这里也可以看出requires方法可以返回多个依赖的task对象,这些对象可以封装在一个dict、list或者tuple中。

    二、requiring another task

    注意requires不能返回一个target对象,如果你的task依赖一个简单的target对象,那么你要为这个target对象生成一个task class。例如下面这个例子:

    class LogFiles(luigi.ExternalTask):
        def output(self):
            return luigi.contrib.hdfs.HdfsTarget('/log')

    这样做也方面你使用参数来控制:

    class LogFiles(luigi.ExternalTask):
        date = luigi.DateParameter()
        def output(self):
            return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/log/%Y-%m-%d'))

    三、Task.output

    output方法可以返回一个或者多个target对象。和requires方法一样,你可以使用容器来随意包装。

    但是我们非常希望每一个task只会返回一个target对象在output方法中。如果对个对象被返回,那么你的task就必须保证每一个target都是原子被创建的。

    当然如果不关注原子性,那么返回多个target对象也是安全的。

    例子:

    class DailyReport(luigi.Task):
        date = luigi.DateParameter()
        def output(self):
            return luigi.contrib.hdfs.HdfsTarget(self.date.strftime('/reports/%Y-%m-%d'))
        # ...

    四、Task.run

    run方法是包含实际运行的代码。当你同时使用了Task.requires和Task.run那么luigi会把这个分成两个阶段。

    首先luigi需要计算出task之间的依赖关系,然后依次执行。input方法是一个很好的辅助方法,他对应着依赖对象的output方法。

    下面是一个例子:

    class GenerateWords(luigi.Task):
    
        def output(self):
            return luigi.LocalTarget('words.txt')
    
        def run(self):
    
            # write a dummy list of words to output file
            words = [
                    'apple',
                    'banana',
                    'grapefruit'
                    ]
    
            with self.output().open('w') as f:
                for word in words:
                    f.write('{word}
    '.format(word=word))
    
    
    class CountLetters(luigi.Task):
    
        def requires(self):
            return GenerateWords()
    
        def output(self):
            return luigi.LocalTarget('letter_counts.txt')
    
        def run(self):
    
            # read in file as list
            with self.input().open('r') as infile:
                words = infile.read().splitlines()
    
            # write each word to output file with its corresponding letter count
            with self.output().open('w') as outfile:
                for word in words:
                    outfile.write(
                            '{word} | {letter_count}
    '.format(
                                word=word,
                                letter_count=len(word)
                                )
                            )

    五、task.input

    input方法保证了task.requires返回的对应的target对象。task.requires返回的任何东西都会被转换,包括list,dict等等。这是非常有用的,当你task有多个依赖的时候。下面是一个例子:

    class TaskWithManyInputs(luigi.Task):
        def requires(self):
            return {'a': TaskA(), 'b': [TaskB(i) for i in xrange(100)]}
    
        def run(self):
            f = self.input()['a'].open('r')
            g = [y.open('r') for y in self.input()['b']]

    六、Dynamic dependencies

    有时可能会发生这样的情况,在运行之前你不能确切的知道本task依赖于哪一个task对象。在这种情况下,luigi提供了一种机制来指定动态依赖。

    如果你在task.run方法中yield了另一个task对象,那么当前的task会被挂起并且这个被yield的task会运行。你也可以yield一系列的task。

    例子:

    class MyTask(luigi.Task):
        def run(self):
            other_target = yield OtherTask()
    
            # dynamic dependencies resolve into targets
            f = other_target.open('r')

    这种机制和task.requires只能二中选一。但是这也带了很多的限制,你必须保证你的task.run方法是幂等的。

    七、task status tracking

    对于长时间运行的作业,你可以通过命令行或者日志或者中央调度器的GUI界面来看到任务的进度信息。

    你可以再task.run方法中指定一个额外的监控系统。你可以如下这么设置:

    class MyTask(luigi.Task):
        def run(self):
            # set a tracking url
            self.set_tracking_url("http://...")
    
            # set status messages during the workload
            for i in range(100):
                # do some hard work here
                if i % 10 == 0:
                    self.set_status_message("Progress: %d / 100" % i)

    八、events and callbacks

    luigi有一个内置的event系统允许你注册回调函数给event。

    你可以同时使用预定义的event和你自定义的event。

    每一个event handle都是与一个task class相关的,它也只能被这个class或者其subclass来触发。

    这允许你轻松的订阅event从一个特殊的类,比如hadoop jobs

    @luigi.Task.event_handler(luigi.Event.SUCCESS)
    def celebrate_success(task):
        """Will be called directly after a successful execution
           of `run` on any Task subclass (i.e. all luigi Tasks)
        """
        ...
    
    @luigi.contrib.hadoop.JobTask.event_handler(luigi.Event.FAILURE)
    def mourn_failure(task, exception):
        """Will be called directly after a failed execution
           of `run` on any JobTask subclass
        """
        ...
    
    luigi.run()

    九、运行hadoop job

    你可以这么直接运行一个hadoop job,而不是用luigi

    MyJobTask('abc', 123).run()

    你也可以直接使用HdfsTarget class

    t = luigi.contrib.hdfs.target.HdfsTarget('/tmp/test.gz', format=format.Gzip)
    f = t.open('w')
    # ...
    f.close() # needed

    十、task priority

    luigi调度下一个作业运行时根据优先级的。默认情况下是随意选择执行的,这个适合大多数的场景。

    如果你想人为的控制执行顺序,那么可以设置task的priority:

    # A static priority value as a class constant:
    class MyTask(luigi.Task):
        priority = 100
        # ...
    
    # A dynamic priority value with a "@property" decorated method:
    class OtherTask(luigi.Task):
        @property
        def priority(self):
            if self.date > some_threshold:
                return 80
            else:
                return 40
        # ...

    优先级的值越高越优先执行。优先级没有一个确切的范文,你可以随意指定一个int或者float的值作为优先级。默认值是0。

    注意:优先级是需要考虑依赖的,依赖没有执行,优先级最高也没什么用。

    十一、instance caching

    luigi提供了一个元类逻辑,如果

    DailyReport(datetime.date(2012, 5, 10))

    被实例化了两次,其实在luigi中是同一个对象。

  • 相关阅读:
    【5min+】 秋名山的竞速。 ValueTask 和 Task
    int16、int32、int64的范围
    C#实现的一些常见时间格式
    C# WPF抽屉效果实现(C# WPF Material Design UI: Navigation Drawer & PopUp Menu)
    如何为.NETCore安装汉化包智能感知
    .NetCore学习笔记:三、基于AspectCore的AOP事务管理
    C#设计模式学习笔记:简单工厂模式(工厂方法模式前奏篇)
    .net core3.0 webapi搭建(一)
    [Abp vNext 源码分析]
    WPF 控件功能重写(ComboBox回车搜索)
  • 原文地址:https://www.cnblogs.com/dongdone/p/5710510.html
Copyright © 2011-2022 走看看