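  • 20. Decorators

    The Decorator Pattern

    The decorator pattern is a pattern described in the Design Patterns book. It is a way of apparently modifying an object's behavior by enclosing it inside a decorating object with a similar interface.

    This is not to be confused with Python decorators, which are a language feature for dynamically modifying a function or class.

    This is an example of using the decorator pattern in Python.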
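    The original listing is not reproduced on this page. As a minimal sketch of the idea (not the original code), the hypothetical classes below wrap a grid of random integers in two decorating objects that share its get(x, y) interface and clamp each value to the range 100 to 200. All class names, the seed, and the value range are assumptions made for illustration.

```python
import random


class RandomGrid:
    """Hypothetical base object: a grid of random integers."""

    def __init__(self, width, height, seed=0):
        rng = random.Random(seed)
        self.width = width
        self.height = height
        self._cells = [[rng.randint(0, 300) for _ in range(width)]
                       for _ in range(height)]

    def get(self, x, y):
        return self._cells[y][x]


class GridDecorator:
    """Encloses any object that offers the same get(x, y) interface."""

    def __init__(self, inner):
        self.inner = inner
        self.width = inner.width
        self.height = inner.height

    def get(self, x, y):
        return self.inner.get(x, y)


class LowerBound(GridDecorator):
    """Raises every value below `floor` up to `floor`."""

    def __init__(self, inner, floor):
        super().__init__(inner)
        self.floor = floor

    def get(self, x, y):
        return max(self.floor, self.inner.get(x, y))


class UpperBound(GridDecorator):
    """Lowers every value above `ceiling` down to `ceiling`."""

    def __init__(self, inner, ceiling):
        super().__init__(inner)
        self.ceiling = ceiling

    def get(self, x, y):
        return min(self.ceiling, self.inner.get(x, y))


# Decorators nest: each layer only sees the interface of the layer inside it.
grid = UpperBound(LowerBound(RandomGrid(10, 10), 100), 200)
for y in range(grid.height):
    print(" ".join(str(grid.get(x, y)) for x in range(grid.width)))
```

    The caller only ever talks to the outermost object, so bounds can be added or removed without touching RandomGrid.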


    The output looks something like this:

    100 100 100 100 181 161 125 100 200 100
    200 100 100 200 100 200 200 184 162 100
    155 100 200 100 200 200 100 200 143 100
    100 200 144 200 101 143 114 200 166 136
    100 147 200 200 100 100 200 141 172 100
    144 161 100 200 200 200 190 125 100 177
    150 200 100 175 111 195 193 128 100 100
    100 200 100 200 200 129 159 105 112 100
    100 101 200 200 100 100 200 100 101 120
    180 200 100 100 198 151 100 195 131 100

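    So, a decorator is:

    An object that surrounds other objects which share a similar interface. The decorating object appears to mask, modify, or annotate the enclosed object.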

     

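    Python Decorator Library

    This page is meant to be a central repository of decorator code pieces, whether useful or not. It is NOT a page to discuss decorator syntax!

    Creating Well-Behaved Decorators

    This is only one recipe. Others include inheriting from a standard decorator, using the functools @wraps decorator, and a decorator factory function that even preserves signature information.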
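    The recipe's own listing is not reproduced on this page. As a minimal sketch of one of the alternatives mentioned, the functools.wraps approach, the hypothetical decorator below keeps the decorated function's name and docstring intact. The names logged, add, and the calls attribute are assumptions made for illustration.

```python
import functools


def logged(func):
    """Hypothetical decorator that records each call in wrapper.calls."""

    @functools.wraps(func)  # copies __name__, __doc__, __module__ and sets __wrapped__
    def wrapper(*args, **kwargs):
        wrapper.calls.append((args, kwargs))
        return func(*args, **kwargs)

    wrapper.calls = []
    return wrapper


@logged
def add(a, b):
    """Return a + b."""
    return a + b


result = add(1, b=2)
print(add.__name__, add.__doc__, result)
```

    Without functools.wraps, add.__name__ would report "wrapper" and the docstring would be lost, which confuses help() and debugging output.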


    Property Definition

    These decorators provide a readable way to define properties:


    Here's a way that doesn't require any new decorators:
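    The original listing is not reproduced on this page. One way to define a property without writing any new decorator, sketched here under the assumption that the built-in property type and its setter method are acceptable, looks like this. The Angle class is hypothetical.

```python
class Angle:
    """Hypothetical class whose value is always normalized to [0, 360)."""

    def __init__(self, degrees=0):
        self._degrees = degrees % 360

    @property
    def degrees(self):
        """The angle in degrees."""
        return self._degrees

    @degrees.setter
    def degrees(self, value):
        # the setter is attached to the property object created above
        self._degrees = value % 360


a = Angle(370)
print(a.degrees)
a.degrees = 725
print(a.degrees)
```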


    Yet another property decorator:


    Memoize

    Here's a memoizing class.

    #!/usr/bin/env python
    # coding: utf-8
    
    import functools
    
    class memoized(object):
       '''Decorator. Caches a function's return value each time it is called.
       If called later with the same arguments, the cached value is returned
       (not reevaluated).
       '''
       def __init__(self, func):
          self.func = func
          self.cache = {}
       def __call__(self, *args):
          try:
             return self.cache[args]
          except KeyError:
             # first call with these arguments: compute and store the result
             value = self.func(*args)
             self.cache[args] = value
             return value
          except TypeError:
             # uncacheable. a list, for instance.
             # better to not cache than blow up.
             return self.func(*args)
       def __repr__(self):
          '''Return the function's docstring.'''
          return self.func.__doc__
       def __get__(self, obj, objtype):
          '''Support instance methods.'''
          return functools.partial(self.__call__, obj)
    
    @memoized
    def fibonacci(n):
       "Return the nth fibonacci number."
       if n in (0, 1):
          return n
       return fibonacci(n-1) + fibonacci(n-2)
    
    print(fibonacci(12))

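    Alternate memoize as nested functions

    Here's a memoizing function that works on functions, methods, or classes, and exposes the cache publicly.

    import functools

    # note that this decorator ignores **kwargs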
    def memoize(obj):
        cache = obj.cache = {}
    
        @functools.wraps(obj)
        def memoizer(*args, **kwargs):
            if args not in cache:
                cache[args] = obj(*args, **kwargs)
            return cache[args]
        return memoizer

    Here's a modified version that also respects kwargs.

    def memoize(obj):
        cache = obj.cache = {}
    
        @functools.wraps(obj)
        def memoizer(*args, **kwargs):
            key = str(args) + str(kwargs)
            if key not in cache:
                cache[key] = obj(*args, **kwargs)
            return cache[key]
        return memoizer
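    A string key built from str(args) + str(kwargs) treats f(a=1, b=2) and f(b=2, a=1) as different calls on Python versions where keyword arguments keep their call order, and it relies on every argument having a distinctive string form. A possible variation, sketched here under the assumption that all arguments are hashable, builds a tuple key with the keyword items sorted. The scale function and the calls list are hypothetical.

```python
import functools


def memoize(func):
    cache = func.cache = {}

    @functools.wraps(func)
    def memoizer(*args, **kwargs):
        # Sorting the items makes the key independent of keyword order.
        key = (args, tuple(sorted(kwargs.items())))
        if key not in cache:
            cache[key] = func(*args, **kwargs)
        return cache[key]

    return memoizer


calls = []  # records every real invocation of the wrapped function


@memoize
def scale(x, factor=1, offset=0):
    calls.append((x, factor, offset))
    return x * factor + offset


first = scale(2, factor=3, offset=1)
second = scale(2, offset=1, factor=3)  # same call, keywords reordered
print(first, second, len(calls))
```

    Unhashable arguments such as lists raise TypeError with this key, so the string-based key remains the more permissive choice.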

    Alternate memoize as dict subclass

    This is an idea that interests me, but it only seems to work on functions:

    class memoize(dict):
        def __init__(self, func):
            self.func = func
    
        def __call__(self, *args):
            return self[args]
    
        def __missing__(self, key):
            result = self[key] = self.func(*key)
            return result
    
    #
    # Sample use
    #
    
    >>> @memoize
    ... def foo(a, b):
    ...     return a * b
    >>> foo(2, 4)
    8
    >>> foo
    {(2, 4): 8}
    >>> foo('hi', 3)
    'hihihi'
    >>> foo
    {(2, 4): 8, ('hi', 3): 'hihihi'}

    Alternate memoize that stores cache between executions

    Additional information and documentation for this decorator is available on Github.

    import pickle
    import collections
    import functools
    import inspect
    import os.path
    import re
    import unicodedata
    
    class Memorize(object):
        '''
        A function decorated with @Memorize caches its return
        value every time it is called. If the function is called
        later with the same arguments, the cached value is
        returned (the function is not reevaluated). The cache is
        stored as a .cache file in the current directory for reuse
        in future executions. If the Python file containing the
        decorated function has been updated since the last run,
        the current cache is deleted and a new cache is created
        (in case the behavior of the function has changed).
        '''
        def __init__(self, func):
            self.func = func
            self.set_parent_file() # Sets self.parent_filepath and self.parent_filename
            self.__name__ = self.func.__name__
            self.set_cache_filename()
            if self.cache_exists():
                self.read_cache() # Sets self.timestamp and self.cache
                if not self.is_safe_cache():
                    self.cache = {}
            else:
                self.cache = {}
    
        def __call__(self, *args):
            try:
                hash(args)
            except TypeError:
                # unhashable arguments (a list, for instance) cannot be cached
                return self.func(*args)
            if args in self.cache:
                return self.cache[args]
            else:
                value = self.func(*args)
                self.cache[args] = value
                self.save_cache()
                return value
    
        def set_parent_file(self):
            """
            Sets self.parent_file to the absolute path of the
            file containing the memoized function.
            """
            rel_parent_file = inspect.stack()[-1].filename
            self.parent_filepath = os.path.abspath(rel_parent_file)
            self.parent_filename = _filename_from_path(rel_parent_file)
    
        def set_cache_filename(self):
            """
            Sets self.cache_filename to an os-compliant
            version of "file_function.cache"
            """
            filename = _slugify(self.parent_filename.replace('.py', ''))
            funcname = _slugify(self.__name__)
            self.cache_filename = filename+'_'+funcname+'.cache'
    
        def get_last_update(self):
            """
            Returns the time that the parent file was last
            updated.
            """
            last_update = os.path.getmtime(self.parent_filepath)
            return last_update
    
        def is_safe_cache(self):
            """
            Returns True if the file containing the memoized
            function has not been updated since the cache was
            last saved.
            """
            if self.get_last_update() > self.timestamp:
                return False
            return True
    
        def read_cache(self):
            """
            Read a pickled dictionary into self.timestamp and
            self.cache. See self.save_cache.
            """
            with open(self.cache_filename, 'rb') as f:
                data = pickle.loads(f.read())
                self.timestamp = data['timestamp']
                self.cache = data['cache']
    
        def save_cache(self):
            """
            Pickle the file's timestamp and the function's cache
            in a dictionary object.
            """
            with open(self.cache_filename, 'wb+') as f:
                out = dict()
                out['timestamp'] = self.get_last_update()
                out['cache'] = self.cache
                f.write(pickle.dumps(out))
    
        def cache_exists(self):
            '''
            Returns True if a matching cache exists in the current directory.
            '''
            if os.path.isfile(self.cache_filename):
                return True
            return False
    
        def __repr__(self):
            """ Return the function's docstring. """
            return self.func.__doc__
    
        def __get__(self, obj, objtype):
            """ Support instance methods. """
            return functools.partial(self.__call__, obj)
    
    def _slugify(value):
        """
        Normalizes string, converts to lowercase, removes
        non-alpha characters, and converts spaces to
        hyphens. From
        http://stackoverflow.com/questions/295135/turn-a-string-into-a-valid-filename-in-python
        """
        value = unicodedata.normalize('NFKD', value).encode('ascii', 'ignore')
        value = re.sub(r'[^\w\s-]', '', value.decode('utf-8', 'ignore'))
        value = value.strip().lower()
        value = re.sub(r'[-\s]+', '-', value)
        return value
    
    def _filename_from_path(filepath):
        # os.path.basename also handles Windows path separators
        return os.path.basename(filepath)

    Cached Properties

    #
    # © 2011 Christopher Arndt, MIT License
    #
    
    import time
    
    class cached_property(object):
        '''Decorator for read-only properties evaluated only once within TTL period.
    
        It can be used to create a cached property like this::
    
            import random
    
            # the class containing the property must be a new-style class
            class MyClass(object):
                # create property whose value is cached for ten minutes
                @cached_property(ttl=600)
                def randint(self):
                    # will only be evaluated every 10 min. at maximum.
                    return random.randint(0, 100)
    
        The value is cached  in the '_cache' attribute of the object instance that
        has the property getter method wrapped by this decorator. The '_cache'
        attribute value is a dictionary which has a key for every property of the
        object which is wrapped by this decorator. Each entry in the cache is
        created only when the property is accessed for the first time and is a
        two-element tuple with the last computed property value and the last time
        it was updated in seconds since the epoch.
    
        The default time-to-live (TTL) is 300 seconds (5 minutes). Set the TTL to
        zero for the cached value to never expire.
    
        To expire a cached property value manually just do::
    
            del instance._cache[<property name>]
    
        '''
        def __init__(self, ttl=300):
            self.ttl = ttl
    
        def __call__(self, fget, doc=None):
            self.fget = fget
            self.__doc__ = doc or fget.__doc__
            self.__name__ = fget.__name__
            self.__module__ = fget.__module__
            return self
    
        def __get__(self, inst, owner):
            now = time.time()
            try:
                value, last_update = inst._cache[self.__name__]
                if self.ttl > 0 and now - last_update > self.ttl:
                    raise AttributeError
            except (KeyError, AttributeError):
                value = self.fget(inst)
                try:
                    cache = inst._cache
                except AttributeError:
                    cache = inst._cache = {}
                cache[self.__name__] = (value, now)
            return value
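    For the simpler case where no TTL is needed, the standard library (Python 3.8 and later) provides functools.cached_property. The sketch below shows that alternative, not the recipe above. The Dataset class and its computations counter are hypothetical.

```python
import functools


class Dataset:
    """Hypothetical class with one expensive derived value."""

    def __init__(self, values):
        self.values = list(values)
        self.computations = 0

    @functools.cached_property
    def total(self):
        self.computations += 1  # counts how often the getter really runs
        return sum(self.values)


d = Dataset([1, 2, 3])
first = d.total
second = d.total   # served from the instance's __dict__
del d.total        # drop the cached value; the next access recomputes
third = d.total
print(first, second, third, d.computations)
```

    Deleting the attribute plays the role of the manual expiry shown in the docstring above (del instance._cache[...]).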

    Retry

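    Call a function which returns True/False to indicate success or failure. On failure, wait, and try the function again. On repeated failures, wait longer between each successive attempt. If the decorator runs out of attempts, it gives up and returns False, but you could just as easily raise some exception.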

    import time
    import math
    
    # Retry decorator with exponential backoff
    def retry(tries, delay=3, backoff=2):
      '''Retries a function or method until it returns True.
    
      delay sets the initial delay in seconds, and backoff sets the factor by which
      the delay should lengthen after each failure. backoff must be greater than 1,
      or else it isn't really a backoff. tries must be at least 0, and delay
      greater than 0.'''
    
      if backoff <= 1:
        raise ValueError("backoff must be greater than 1")
    
      tries = math.floor(tries)
      if tries < 0:
        raise ValueError("tries must be 0 or greater")
    
      if delay <= 0:
        raise ValueError("delay must be greater than 0")
    
      def deco_retry(f):
        def f_retry(*args, **kwargs):
          mtries, mdelay = tries, delay # make mutable
    
          rv = f(*args, **kwargs) # first attempt
          while mtries > 0:
            if rv is True: # Done on success
              return True
    
            mtries -= 1      # consume an attempt
            time.sleep(mdelay) # wait...
            mdelay *= backoff  # make future wait longer
    
            rv = f(*args, **kwargs) # Try again
    
          return False # Ran out of tries :-(
    
        return f_retry # true decorator -> decorated function
      return deco_retry  # @retry(arg[, ...]) -> true decorator
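    The exception-raising variant mentioned in the description can be sketched as follows. This is an illustration rather than part of the recipe: the name retry_on_exception, its exceptions parameter, and the flaky function are assumptions. As above, tries counts the retries made after the first attempt.

```python
import functools
import time


def retry_on_exception(tries, delay=3, backoff=2, exceptions=(Exception,)):
    """Retry while the call raises one of `exceptions`, then re-raise."""

    def deco_retry(f):
        @functools.wraps(f)
        def f_retry(*args, **kwargs):
            mdelay = delay
            for attempt in range(tries + 1):  # first attempt plus `tries` retries
                try:
                    return f(*args, **kwargs)
                except exceptions:
                    if attempt == tries:
                        raise                 # out of attempts: propagate the last error
                    time.sleep(mdelay)        # wait...
                    mdelay *= backoff         # ...and make the next wait longer
        return f_retry
    return deco_retry


attempts = []  # records each real invocation


@retry_on_exception(tries=3, delay=0.01, backoff=2, exceptions=(ValueError,))
def flaky():
    attempts.append(1)
    if len(attempts) < 3:
        raise ValueError("not yet")
    return "ok"


outcome = flaky()
print(outcome, len(attempts))
```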

    Pseudo-currying

    (FYI: you can use functools.partial() to emulate currying, which works even for keyword arguments.)

    class curried(object):
      '''
      Decorator that returns a function that keeps returning functions
      until all arguments are supplied; then the original function is
      evaluated.
      '''
    
      def __init__(self, func, *a):
        self.func = func
        self.args = a
    
      def __call__(self, *a):
        args = self.args + a
        if len(args) < self.func.__code__.co_argcount:
          return curried(self.func, *args)
        else:
          return self.func(*args)
    
    
    @curried
    def add(a, b):
        return a + b
    
    add1 = add(1)
    
    print(add1(2))
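    For comparison, the functools.partial() approach mentioned at the top of this section might look like the sketch below. The power function and the two derived names are hypothetical.

```python
import functools


def power(base, exponent):
    return base ** exponent


square = functools.partial(power, exponent=2)  # fix a keyword argument
two_to_the = functools.partial(power, 2)       # fix the first positional argument

print(square(5), two_to_the(10))
```

    Unlike the curried class, partial() does not keep returning functions until every argument is supplied; it fixes the given arguments once and calls the target on the next invocation.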

    Creating decorator with optional arguments

    import functools, inspect
    
    def decorator(func):
        ''' Allow to use decorator either with arguments or not. '''
    
        def isFuncArg(*args, **kw):
            return len(args) == 1 and len(kw) == 0 and (
                inspect.isfunction(args[0]) or isinstance(args[0], type))
    
        if isinstance(func, type):
            def class_wrapper(*args, **kw):
                if isFuncArg(*args, **kw):
                    return func()(*args, **kw) # create class before usage
                return func(*args, **kw)
            class_wrapper.__name__ = func.__name__
            class_wrapper.__module__ = func.__module__
            return class_wrapper
    
        @functools.wraps(func)
        def func_wrapper(*args, **kw):
            if isFuncArg(*args, **kw):
                return func(*args, **kw)
    
            def functor(userFunc):
                return func(userFunc, *args, **kw)
    
            return functor
    
        return func_wrapper

    Example:

    @decorator
    def apply(func, *args, **kw):
        return func(*args, **kw)
    
    @decorator
    class apply:
        def __init__(self, *args, **kw):
            self.args = args
            self.kw   = kw
    
        def __call__(self, func):
            return func(*self.args, **self.kw)
    
    #
    # Usage in both cases:
    #
    @apply
    def test():
        return 'test'
    
    assert test == 'test'
    
    @apply(2, 3)
    def test(a, b):
        return a + b
    
    assert test == 5

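    Note: there is only one drawback: the wrapper checks whether it was given a single function or class as its only argument. To avoid wrong behavior you can use keyword arguments instead of positional ones, e.g.: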

    @decorator
    def my_property(getter, *, setter=None, deleter=None, doc=None):
        return property(getter, setter, deleter, doc)

    Controllable DIY debug

    import sys
    
    WHAT_TO_DEBUG = set(['io', 'core'])  # change to what you need
    
    class debug:
        '''Decorator which helps to control what aspects of a program to debug
        on per-function basis. Aspects are provided as list of arguments.
        It DOESN'T slowdown functions which aren't supposed to be debugged.
        '''
        def __init__(self, aspects=None):
            self.aspects = set(aspects or ())  # tolerate the default of None
    
        def __call__(self, f):
            if self.aspects & WHAT_TO_DEBUG:
                def newf(*args, **kwds):
                    print(f.__name__, args, kwds, file=sys.stderr)
                    f_result = f(*args, **kwds)
                    print(f.__name__, "returned", f_result, file=sys.stderr)
                    return f_result
                newf.__doc__ = f.__doc__
                return newf
            else:
                return f
    
    @debug(['io'])
    def prn(x):
        print(x)
    
    @debug(['core'])
    def mult(x, y):
        return x * y
    
    prn(mult(2, 2))

    Easy adding methods to a class instance

    class Foo:
        def __init__(self):
            self.x = 42
    
    foo = Foo()
    
    def addto(instance):
        def decorator(f):
            import types
            # bind f to this one instance only; the class itself is unchanged
            f = types.MethodType(f, instance)
            setattr(instance, f.__name__, f)
            return f
        return decorator
    
    @addto(foo)
    def print_x(self):
        print(self.x)
    
    # foo.print_x() would print "42"

    Counting function calls

    class countcalls(object):
       "Decorator that keeps track of the number of times a function is called."
    
       __instances = {}
    
       def __init__(self, f):
          self.__f = f
          self.__numcalls = 0
          countcalls.__instances[f] = self
    
       def __call__(self, *args, **kwargs):
          self.__numcalls += 1
          return self.__f(*args, **kwargs)
    
       @staticmethod
       def count(f):
          "Return the number of times the function f was called."
          return countcalls.__instances[f].__numcalls
    
       @staticmethod
       def counts():
          "Return a dict of {function: # of calls} for all registered functions."
          return dict([(f, countcalls.count(f)) for f in countcalls.__instances])

    Alternate Counting function calls

    class countcalls(object):
       "Decorator that keeps track of the number of times a function is called."
    
       __instances = {}
    
       def __init__(self, f):
          self.__f = f
          self.__numcalls = 0
          countcalls.__instances[f] = self
    
       def __call__(self, *args, **kwargs):
          self.__numcalls += 1
          return self.__f(*args, **kwargs)
    
       def count(self):
          "Return the number of times the function f was called."
          return countcalls.__instances[self.__f].__numcalls
    
       @staticmethod
       def counts():
          "Return a dict of {function: # of calls} for all registered functions."
          return dict([(f.__name__, countcalls.__instances[f].__numcalls) for f in countcalls.__instances])
    
    #example
    
    @countcalls
    def f():
       print('f called')
    
    @countcalls
    def g():
       print('g called')
    
    f()
    f()
    f()
    print(f.count()) # prints 3
    print(countcalls.counts()) # same as f.counts() or g.counts()
    g()
    print(g.count()) # prints 1

    Generating Deprecation Warnings

    import warnings
    
    def deprecated(func):
        '''This is a decorator which can be used to mark functions
        as deprecated. It will result in a warning being emitted
        when the function is used.'''
        def new_func(*args, **kwargs):
            warnings.warn("Call to deprecated function {}.".format(func.__name__),
                          category=DeprecationWarning)
            return func(*args, **kwargs)
        new_func.__name__ = func.__name__
        new_func.__doc__ = func.__doc__
        new_func.__dict__.update(func.__dict__)
        return new_func
    
    # === Examples of use ===
    
    @deprecated
    def some_old_function(x,y):
        return x + y
    
    class SomeClass:
        @deprecated
        def some_old_method(self, x,y):
            return x + y

    Smart deprecation warnings (with valid filenames, line numbers, etc.)

    import warnings
    import functools
    
    
    def deprecated(func):
        '''This is a decorator which can be used to mark functions
        as deprecated. It will result in a warning being emitted
        when the function is used.'''
    
        @functools.wraps(func)
        def new_func(*args, **kwargs):
            warnings.warn_explicit(
                "Call to deprecated function {}.".format(func.__name__),
                category=DeprecationWarning,
                filename=func.__code__.co_filename,
                lineno=func.__code__.co_firstlineno + 1
            )
            return func(*args, **kwargs)
        return new_func
    
    
    ## Usage examples ##
    @deprecated
    def my_func():
        pass
    
    @other_decorators_must_be_upper
    @deprecated
    def my_func():
        pass
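    The recipe above attributes the warning to the line where the deprecated function is defined. If the call site is more useful, one alternative (a sketch, not part of the recipe) is to pass stacklevel=2 to warnings.warn. The example also shows how the warning can be captured for inspection. The function old_add is hypothetical.

```python
import functools
import warnings


def deprecated(func):
    """Mark a function as deprecated; the warning points at the caller."""

    @functools.wraps(func)
    def new_func(*args, **kwargs):
        warnings.warn(
            "Call to deprecated function {}.".format(func.__name__),
            category=DeprecationWarning,
            stacklevel=2,  # report the caller's file and line, not this wrapper's
        )
        return func(*args, **kwargs)
    return new_func


@deprecated
def old_add(x, y):
    return x + y


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")  # make sure the warning is not filtered out
    result = old_add(1, 2)

print(result, caught[0].category.__name__, caught[0].message)
```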

    Ignoring deprecation warnings

    import warnings
    
    def ignore_deprecation_warnings(func):
        '''This is a decorator which can be used to ignore deprecation warnings
        occurring in a function.'''
        def new_func(*args, **kwargs):
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                return func(*args, **kwargs)
        new_func.__name__ = func.__name__
        new_func.__doc__ = func.__doc__
        new_func.__dict__.update(func.__dict__)
        return new_func
    
    # === Examples of use ===
    
    @ignore_deprecation_warnings
    def some_function_raising_deprecation_warning():
        warnings.warn("This is a deprecation warning.",
                      category=DeprecationWarning)
    
    class SomeClass:
        @ignore_deprecation_warnings
        def some_method_raising_deprecation_warning(self):
            warnings.warn("This is a deprecation warning.",
                          category=DeprecationWarning)

    Enable/Disable Decorators

    def unchanged(func):
        "This decorator doesn't add any behavior"
        return func
    
    def disabled(func):
        "This decorator disables the provided function, and does nothing"
        def empty_func(*args,**kargs):
            pass
        return empty_func
    
    # define this as equivalent to unchanged, for nice symmetry with disabled
    enabled = unchanged
    
    #
    # Sample use
    #
    
    GLOBAL_ENABLE_FLAG = True
    
    state = enabled if GLOBAL_ENABLE_FLAG else disabled
    @state
    def special_function_foo():
        print("function was enabled")

    Easy Dump of Function Arguments

    def dump_args(func):
        "This decorator dumps out the arguments passed to a function before calling it"
        argnames = func.__code__.co_varnames[:func.__code__.co_argcount]
        fname = func.__name__
    
        def echo_func(*args, **kwargs):
            # pair positional values with their names, then append keyword arguments
            print(fname, ":", ', '.join(
                '%s=%r' % entry
                for entry in list(zip(argnames, args)) + list(kwargs.items())))
            return func(*args, **kwargs)
    
        return echo_func
    
    @dump_args
    def f1(a, b, c):
        print(a + b + c)
    
    f1(1, 2, 3)
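    The recipe reads argument names from the code object, so default values that the caller leaves out are not shown. A possible variation, sketched here with inspect.signature from the standard library, binds the call to the signature first. The last_call attribute is an assumption added so the formatted line can be inspected.

```python
import functools
import inspect


def dump_args(func):
    """Print every parameter, including defaults, before calling func."""
    sig = inspect.signature(func)

    @functools.wraps(func)
    def echo_func(*args, **kwargs):
        bound = sig.bind(*args, **kwargs)
        bound.apply_defaults()  # include parameters the caller left out
        echo_func.last_call = "{}: {}".format(
            func.__name__,
            ", ".join("{}={!r}".format(name, value)
                      for name, value in bound.arguments.items()))
        print(echo_func.last_call)
        return func(*args, **kwargs)

    return echo_func


@dump_args
def f1(a, b, c=3):
    return a + b + c


total = f1(1, b=2)
```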

    Pre-/Post-Conditions

    '''
    Provide pre-/postconditions as function decorators.
    
    Example usage:
    
      >>> def in_ge20(inval):
      ...    assert inval >= 20, 'Input value < 20'
      ...
      >>> def out_lt30(retval, inval):
      ...    assert retval < 30, 'Return value >= 30'
      ...
      >>> @precondition(in_ge20)
      ... @postcondition(out_lt30)
      ... def inc(value):
      ...   return value + 1
      ...
      >>> inc(5)
      Traceback (most recent call last):
        ...
      AssertionError: Input value < 20
      >>> inc(29)
      Traceback (most recent call last):
        ...
      AssertionError: Return value >= 30
      >>> inc(20)
      21
    
    You can define as many pre-/postconditions for a function as you
    like. It is also possible to specify both types of conditions at once:
    
      >>> @conditions(in_ge20, out_lt30)
      ... def add1(value):
      ...   return value + 1
      ...
      >>> add1(5)
      Traceback (most recent call last):
        ...
      AssertionError: Input value < 20
    
    An interesting feature is the ability to prevent the creation of
    pre-/postconditions at function definition time. This makes it
    possible to use conditions for debugging and then switch them off for
    distribution.
    
      >>> debug = False
      >>> @precondition(in_ge20, debug)
      ... def dec(value):
      ...   return value - 1
      ...
      >>> dec(5)
      4
    '''
    
    __all__ = ['precondition', 'postcondition', 'conditions']
    
    DEFAULT_ON = True
    
    def precondition(precondition, use_conditions=DEFAULT_ON):
        return conditions(precondition, None, use_conditions)
    
    def postcondition(postcondition, use_conditions=DEFAULT_ON):
        return conditions(None, postcondition, use_conditions)
    
    class conditions(object):
        __slots__ = ('__precondition', '__postcondition')
    
        def __init__(self, pre, post, use_conditions=DEFAULT_ON):
            if not use_conditions:
                pre, post = None, None
    
            self.__precondition  = pre
            self.__postcondition = post
    
        def __call__(self, function):
            # combine recursive wrappers (@precondition + @postcondition == @conditions)
            pres  = set((self.__precondition,))
            posts = set((self.__postcondition,))
    
            # unwrap function, collect distinct pre-/post conditions
            while type(function) is FunctionWrapper:
                pres.add(function._pre)
                posts.add(function._post)
                function = function._func
    
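            # filter out None conditions and build pairs of pre- and postconditions;
            # zip_longest pads the shorter side with None, as map(None, ...) did in Python 2
            from itertools import zip_longest
            conditions = list(zip_longest(filter(None, pres), filter(None, posts)))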
    
            # add a wrapper for each pair (note that 'conditions' may be empty)
            for pre, post in conditions:
                function = FunctionWrapper(pre, post, function)
    
            return function
    
    class FunctionWrapper(object):
        def __init__(self, precondition, postcondition, function):
            self._pre  = precondition
            self._post = postcondition
            self._func = function
    
        def __call__(self, *args, **kwargs):
            precondition  = self._pre
            postcondition = self._post
    
            if precondition:
                precondition(*args, **kwargs)
            result = self._func(*args, **kwargs)
            if postcondition:
                postcondition(result, *args, **kwargs)
            return result
    
    def __test():
        import doctest
        doctest.testmod()
    
    if __name__ == "__main__":
        __test()

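    Profiling/Coverage Analysis

    The code and examples are a bit longish, so I'll include a link instead: https://mg.pov.lt/blog/profiling.html

    Line Tracing Individual Functions

    I cobbled this together from the trace module. It allows you to decorate individual functions so their lines are traced. I think it works out to be a slightly smaller hammer than running the trace module and trying to pare back what it traces using exclusions.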

    import sys
    import os
    import linecache
    
    def trace(f):
        def globaltrace(frame, why, arg):
            if why == "call":
                return localtrace
            return None
    
        def localtrace(frame, why, arg):
            if why == "line":
                # record the file name and line number of every trace
                filename = frame.f_code.co_filename
                lineno = frame.f_lineno
    
                bname = os.path.basename(filename)
                # getline() keeps the trailing newline, so suppress print's own
                print("{}({}): {}".format(bname,
                                          lineno,
                                          linecache.getline(filename, lineno)),
                      end='')
            return localtrace
    
        def _f(*args, **kwds):
            sys.settrace(globaltrace)
            result = f(*args, **kwds)
            sys.settrace(None)
            return result
    
        return _f

    Synchronization

    Synchronize two (or more) functions on a given lock.

    def synchronized(lock):
        '''Synchronization decorator.'''
    
        def wrap(f):
            def new_function(*args, **kw):
                lock.acquire()
                try:
                    return f(*args, **kw)
                finally:
                    lock.release()
            return new_function
        return wrap
    
    # Example usage:
    
    from threading import Lock
    my_lock = Lock()
    
    @synchronized(my_lock)
    def critical1(*args):
        # Interesting stuff goes here.
        pass
    
    @synchronized(my_lock)
    def critical2(*args):
        # Other interesting stuff goes here.
        pass
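    Since lock objects are context managers, the acquire/try/finally sequence can also be written with a with statement. The sketch below is a variation on the recipe, with functools.wraps added; the counter, the increment function, and the thread counts are assumptions made for illustration.

```python
import functools
import threading


def synchronized(lock):
    """Synchronization decorator using the lock as a context manager."""

    def wrap(f):
        @functools.wraps(f)
        def new_function(*args, **kw):
            with lock:  # acquired here, released on exit even if f raises
                return f(*args, **kw)
        return new_function
    return wrap


counter_lock = threading.Lock()
counter = {"value": 0}


@synchronized(counter_lock)
def increment(n):
    for _ in range(n):
        counter["value"] += 1


threads = [threading.Thread(target=increment, args=(10000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter["value"])
```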

    Type Enforcement (accepts/returns)

    Provides various degrees of type enforcement for function parameters and return values.

    '''
    One of three degrees of enforcement may be specified by passing
    the 'debug' keyword argument to the decorator:
        0 -- NONE:   No type-checking. Decorators disabled.
        1 -- MEDIUM: Print warning message to stderr. (Default)
        2 -- STRONG: Raise TypeError with message.
    If 'debug' is not passed to the decorator, the default level is used.
    
    Example usage:
        >>> NONE, MEDIUM, STRONG = 0, 1, 2
        >>>
        >>> @accepts(int, int, int)
        ... @returns(float)
        ... def average(x, y, z):
        ...     return (x + y + z) / 2
        ...
        >>> average(5.5, 10, 15.0)
        TypeWarning:  'average' method accepts (int, int, int), but was given
        (float, int, float)
        15.25
        >>> average(5, 10, 15)
        TypeWarning:  'average' method returns (float), but result is (int)
        15
    
    Needed to cast params as floats in function def (or simply divide by 2.0).
    
        >>> TYPE_CHECK = STRONG
        >>> @accepts(int, debug=TYPE_CHECK)
        ... @returns(int, debug=TYPE_CHECK)
        ... def fib(n):
        ...     if n in (0, 1): return n
        ...     return fib(n-1) + fib(n-2)
        ...
        >>> fib(5.3)
        Traceback (most recent call last):
          ...
        TypeError: 'fib' method accepts (int), but was given (float)
    
    '''
    import sys
    
    def accepts(*types, **kw):
        '''Function decorator. Checks decorated function's arguments are
        of the expected types.
    
        Parameters:
        types -- The expected types of the inputs to the decorated function.
                 Must specify type for each parameter.
        kw    -- Optional specification of 'debug' level (this is the only valid
                 keyword argument, no other should be given).
                 debug = ( 0 | 1 | 2 )
    
        '''
        # 'debug' is the only valid keyword argument
        unexpected = set(kw) - {'debug'}
        if unexpected:
            raise KeyError(', '.join(sorted(unexpected))
                           + " is not a valid keyword argument")
        # default level: MEDIUM
        debug = kw.get('debug', 1)
    
        def decorator(f):
            def newf(*args):
                if debug == 0:
                    return f(*args)
                assert len(args) == len(types)
                argtypes = tuple(map(type, args))
                if argtypes != types:
                    msg = info(f.__name__, types, argtypes, 0)
                    if debug == 1:
                        print('TypeWarning: ', msg, file=sys.stderr)
                    elif debug == 2:
                        raise TypeError(msg)
                return f(*args)
            newf.__name__ = f.__name__
            return newf
        return decorator
    
    
    def returns(ret_type, **kw):
        '''Function decorator. Checks decorated function's return value
        is of the expected type.
    
        Parameters:
        ret_type -- The expected type of the decorated function's return value.
                    Must specify type for each parameter.
        kw       -- Optional specification of 'debug' level (this is the only valid
                    keyword argument, no other should be given).
                    debug=(0 | 1 | 2)
        '''
        # 'debug' is the only valid keyword argument
        unexpected = set(kw) - {'debug'}
        if unexpected:
            raise KeyError(', '.join(sorted(unexpected))
                           + " is not a valid keyword argument")
        # default level: MEDIUM
        debug = kw.get('debug', 1)
    
        def decorator(f):
            def newf(*args):
                result = f(*args)
                if debug == 0:
                    return result
                res_type = type(result)
                if res_type != ret_type:
                    msg = info(f.__name__, (ret_type,), (res_type,), 1)
                    if debug == 1:
                        print('TypeWarning: ', msg, file=sys.stderr)
                    elif debug == 2:
                        raise TypeError(msg)
                return result
            newf.__name__ = f.__name__
            return newf
        return decorator
    
    def info(fname, expected, actual, flag):
        '''Convenience function returns nicely formatted error/warning msg.'''
        # Turn "<type 'int'>" into "int" for each type in the sequence.
        format = lambda types: ', '.join([str(t).split("'")[1] for t in types])
        expected, actual = format(expected), format(actual)
        # The parentheses let the expression span several lines.
        msg = ("'{}' method ".format(fname)
               + ("accepts", "returns")[flag] + " ({}), but ".format(expected)
               + ("was given", "result is")[flag] + " ({})".format(actual))
        return msg
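
    As a rough illustration of the debug levels described in the docstrings above, here is a minimal sketch of a return-type check written for Python 3 (the listing above is Python 2). It is not the original recipe: passing debug as an ordinary keyword parameter, the ValueError for bad levels, and the sample functions strict_half, floor_half and unchecked_half are all assumptions made for this example.

```python
import functools
import sys


def returns(ret_type, debug=1):
    """Check that the decorated function returns exactly ret_type.

    debug=0 skips the check, 1 prints a warning to stderr, 2 raises TypeError.
    """
    if debug not in (0, 1, 2):
        raise ValueError("debug must be 0, 1 or 2")

    def decorator(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            result = func(*args, **kwargs)
            # Exact type comparison, mirroring the recipe above.
            if debug and type(result) is not ret_type:
                msg = "'{}' method returns ({}), but result is ({})".format(
                    func.__name__, ret_type.__name__, type(result).__name__)
                if debug == 2:
                    raise TypeError(msg)
                print("TypeWarning: ", msg, file=sys.stderr)
            return result
        return wrapper
    return decorator


@returns(int, debug=2)
def strict_half(n):
    return n / 2    # "/" always produces a float in Python 3


@returns(int, debug=2)
def floor_half(n):
    return n // 2   # floor division of ints stays an int


@returns(int, debug=0)
def unchecked_half(n):
    return n / 2    # wrong type, but checking is switched off
```

    With these definitions, floor_half passes the check, unchecked_half is never checked, and strict_half raises TypeError because its result is a float.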

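    CGI Method Wrapper

    Handles the HTML boilerplate at the top and bottom of pages returned from CGI methods. It works with the cgi module. Your request handlers can now output just the interesting HTML and let the decorator deal with all the top and bottom clutter.

    (Note: the exception handler swallows every exception, which is no great loss in CGI because the program runs in a separate subprocess. At least here the exception contents are written to the output page.)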

    class CGImethod(object):
        def __init__(self, title):
            self.title = title
    
        def __call__(self, fn):
            def wrapped_fn(*args):
                print "Content-Type: text/html\n\n"
                print "<HTML>"
                print "<HEAD><TITLE>{}</TITLE></HEAD>".format(self.title)
                print "<BODY>"
                try:
                    fn(*args)
                except Exception, e:
                    print
                    print e
                print
                print "</BODY></HTML>"
    
            return wrapped_fn
    
    @CGImethod("Hello with Decorator")
    def say_hello():
        print '<h1>Hello from CGI-Land</h1>'
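
    For readers on Python 3, the same idea might look roughly like the sketch below. It is an adaptation rather than the original recipe: the class name CGIMethod, the HTML escaping via html.escape, the failing handler broken, and the render helper that captures stdout are all assumptions added for illustration.

```python
import contextlib
import functools
import html
import io


class CGIMethod(object):
    """Wrap a handler so it only has to print the page body."""

    def __init__(self, title):
        self.title = title

    def __call__(self, fn):
        @functools.wraps(fn)
        def wrapped_fn(*args, **kwargs):
            print("Content-Type: text/html\n")   # header followed by a blank line
            print("<HTML>")
            print("<HEAD><TITLE>{}</TITLE></HEAD>".format(html.escape(self.title)))
            print("<BODY>")
            try:
                fn(*args, **kwargs)
            except Exception as exc:
                # Show the error on the page, escaped so it cannot inject markup.
                print()
                print(html.escape(str(exc)))
            print()
            print("</BODY></HTML>")
        return wrapped_fn


@CGIMethod("Hello with Decorator")
def say_hello():
    print("<h1>Hello from CGI-Land</h1>")


@CGIMethod("Broken page")
def broken():
    raise ValueError("bad <input>")


def render(handler):
    """Run a handler and return everything it printed (for demonstration only)."""
    buffer = io.StringIO()
    with contextlib.redirect_stdout(buffer):
        handler()
    return buffer.getvalue()


hello_page = render(say_hello)
broken_page = render(broken)
```

    Capturing the output with render is only a convenience for inspecting the result; under a real CGI server the handler would simply print to stdout.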

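    State Machine Implementation

    A much improved version of the decorators for implementing state machines is too long to include here; see: https://wiki.python.org/moin/State%20Machine%20via%20Decorators

    This example uses decorators to facilitate the implementation of a state machine in Python. Decorators are used to specify which methods are the event handlers of the class. In this example, actions are associated with the transitions, but with a little thought it is possible to associate actions with states instead.

    The example defines a class, MyMachine. Multiple instances of the class may be created, and each instance maintains its own state. A class may also have more than one state variable; here I have used gstate and tstate.

    The code in the imported statedefn file gets a bit hairy, but you may not need to delve into it.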

    # State Machine example Program
    
    from statedefn import *
    
    class MyMachine(object):
    
        # Create Statedefn object for each state you need to keep track of.
        # the name passed to the constructor becomes a StateVar member of the current class.
        # i.e. if my_obj is a MyMachine object, my_obj.gstate maintains the current gstate
        gstate = StateTable("gstate")
        tstate = StateTable("turtle")
    
        def __init__(self, name):
            # must call init method of class's StateTable object. to initialize state variable
            self.gstate.initialize(self)
            self.tstate.initialize(self)
            self.mname = name
            self.a_count = 0
            self.b_count = 0
            self.c_count = 0
    
        # Decorate the Event Handler virtual functions -note gstate parameter
        @event_handler(gstate)
        def event_a(self): pass
    
        @event_handler(gstate)
        def event_b(self): pass
    
        @event_handler(gstate)
        def event_c(self, val): pass
    
        @event_handler(tstate)
        def toggle(self): pass
    
    
        # define methods to handle events.
        def _event_a_hdlr1(self):
            print "State 1, event A"
            self.a_count += 1
        def _event_b_hdlr1(self):
            print "State 1, event B"
            self.b_count += 1
        def _event_c_hdlr1(self, val):
            print "State 1, event C"
            self.c_count += 3*val
    
        def _event_a_hdlr2(self):
            print "State 2, event A"
            self.a_count += 10
            # here we brute force the tstate to on, leave & enter functions called if state changes.
            # turtle is object's state variable for tstate, comes from constructor argument
            self.turtle.set_state(self, self._t_on)
        def _event_b_hdlr2(self):
            print "State 2, event B"
            self.b_count += 10
        def _event_c_hdlr2(self, val):
            print "State 2, event C"
            self.c_count += 2*val
    
        def _event_a_hdlr3(self):
            self.a_count += 100
            print "State 3, event A"
        def _event_b_hdlr3(self):
            print "State 3, event B"
            self.b_count += 100
            # we decide here we want to go to state 2, overrides spec in state table below.
            # transition to next_state is made after the method exits.
            self.gstate.next_state = self._state2
        def _event_c_hdlr3(self, val):
            print "State 3, event C"
            self.c_count += 5*val
    
        # Associate the handlers with a state. The first argument is the name of the state.
        # The second argument is a list of methods, one for each event_handler decorated
        # function of gstate. The order of methods in the list corresponds to the order in
        # which the event handlers were declared. The third argument is a list of the next
        # states, one per event (None means the event causes no transition).
        # The first state created becomes the initial state.
        _state1 = gstate.state("One",  (_event_a_hdlr1, _event_b_hdlr1, _event_c_hdlr1),
                                          ("Two", "Three", None))
        _state2 = gstate.state("Two",  (_event_a_hdlr2, _event_b_hdlr2, _event_c_hdlr2),
                                         ("Three",        None,          "One"))
        _state3 = gstate.state("Three",(_event_a_hdlr3, _event_b_hdlr3, _event_c_hdlr3),
                                     (None,         "One",         "Two"))
    
    
        # Declare a function that will be called when entering a new gstate.
        # Can also declare a leave function using @on_leave_function(gstate)
        @on_enter_function(gstate)
        def _enter_gstate(self):
            print "entering state ", self.gstate.name() , "of ", self.mname
        @on_leave_function(tstate)
        def _leave_tstate(self):
            print "leaving state ", self.turtle.name() , "of ", self.mname
    
    
        def _toggle_on(self):
            print "Toggle On"
    
        def _toggle_off(self):
            print "Toggle Off"
    
        _t_off = tstate.state("Off", [_toggle_on],
                             ["On"])
        _t_on =  tstate.state("On", [_toggle_off],
                              ["Off"])
    
    
    def main():
        big_machine = MyMachine("big")
        lil_machine = MyMachine("lil")
    
        big_machine.event_a()
        lil_machine.event_a()
        big_machine.event_a()
        lil_machine.event_a()
        big_machine.event_b()
        lil_machine.event_b()
        big_machine.event_c(4)
        lil_machine.event_c(2)
        big_machine.event_c(1)
        lil_machine.event_c(3)
        big_machine.event_b()
        lil_machine.event_b()
        big_machine.event_a()
        lil_machine.event_a()
        big_machine.event_a()
    
        big_machine.toggle()
        big_machine.toggle()
        big_machine.toggle()
    
        lil_machine.event_a()
        big_machine.event_b()
        lil_machine.event_b()
        big_machine.event_c(3)
        big_machine.event_a()
        lil_machine.event_c(2)
        lil_machine.event_a()
        big_machine.event_b()
        lil_machine.event_b()
        big_machine.event_c(7)
        lil_machine.event_c(1)
    
        print "Event A count ", big_machine.a_count
        print "Event B count ", big_machine.b_count
        print "Event C count ", big_machine.c_count
        print "LilMachine C count ", lil_machine.c_count
    
    main()

    And now the imported statedefn.py:

    #
    # Support for State Machines.  ref - Design Patterns by GoF
    #  Many of the methods in these classes get called behind the scenes.
    #
    #  Notable exceptions are methods of the StateVar class.
    #
    #  See example programs for how this module is intended to be used.
    #
    class StateMachineError(Exception):
        '''Raised when a state table is defined inconsistently.'''
    
    class StateVar(object):
        def __init__(self, initial_state):
            self._current_state = initial_state
            self.next_state = initial_state            # publicly settable in an event handling routine.
    
        def set_state(self, owner, new_state):
            '''
            Forces a state change to new_state
            '''
            self.next_state = new_state
            self.__to_next_state(owner)
    
        def __to_next_state(self, owner):
            '''
            The low-level state change function which calls leave state & enter state functions as
            needed.
    
            LeaveState and EnterState functions are called as needed when state transitions.
            '''
            if self.next_state is not self._current_state:
                if hasattr(self._current_state, "leave"):
                    self._current_state.leave(owner)
                elif hasattr(self, "leave"):
                    self.leave(owner)
                self._current_state =  self.next_state
                if hasattr(self._current_state, "enter"):
                    self._current_state.enter(owner)
                elif hasattr(self, "enter"):
                    self.enter(owner)
    
        def __fctn(self, func_name):
            '''
            Returns the owning class's method for handling an event for the current state.
            This method not for public consumption.
            '''
            vf = self._current_state.get_fe(func_name)
            return vf
    
        def name(self):
            '''
            Returns the current state name.
            '''
            return self._current_state.name
    
    class STState(object):
        def __init__(self, state_name):
            self.name = state_name
            self.fctn_dict = {}
    
        def set_events(self, event_list, event_hdlr_list, next_states):
            # Build {event name: [handler, next state]}. An explicit loop does not
            # depend on map() being evaluated eagerly.
            if not next_states:
                next_states = [None] * len(event_list)
            for event, method, next_state in zip(event_list, event_hdlr_list, next_states):
                self.fctn_dict[event] = [method, next_state]
    
        def get_fe(self, fctn_name):
            return self.fctn_dict[fctn_name]
    
        def map_next_states(self, state_dict):
            ''' Changes second dict value from name of state to actual state.'''
            for de in self.fctn_dict.values():
                next_state_name = de[1]
                if next_state_name:
                    if next_state_name in state_dict:
                        de[1] = state_dict[next_state_name]
                    else:
                        raise StateMachineError('Invalid Name for next state: {}'.format(next_state_name))
    
    
    class StateTable(object):
        '''
        Magical class to define a state machine, with the help of several decorator functions
        which follow.
        '''
        def __init__(self, declname):
            self.machine_var = declname
            self._initial_state = None
            self._state_list = {}
            self._event_list = []
            self.need_initialize = 1
    
        def initialize(self, parent):
            '''
            Initializes the parent class's state variable for this StateTable class.
            Must call this method in the parent' object's __init__ method.  You can have
            Multiple state machines within a parent class. Call this method for each
            '''
            statevar= StateVar(self._initial_state)
            setattr(parent, self.machine_var, statevar)
            if hasattr(self, "enter"):
                statevar.enter = self.enter
            if hasattr(self, "leave"):
                statevar.leave = self.leave
            #Magic happens here - in the 'next state' table, translate names into state objects.
            if  self.need_initialize:
                for xstate in list(self._state_list.values()):
                    xstate.map_next_states(self._state_list)
                self.need_initialize = 0
    
        def def_state(self, event_hdlr_list, name):
            '''
            This is used to define a state. the event handler list is a list of functions that
            are called for corresponding events. name is the name of the state.
            '''
            state_table_row = STState(name)
            if len(event_hdlr_list) != len(self._event_list):
                raise StateMachineError('Mismatch between number of event handlers and the methods specified for the state.')
    
            state_table_row.set_events(self._event_list, event_hdlr_list, None)
    
            if self._initial_state is None:
                self._initial_state = state_table_row
            self._state_list[name] = state_table_row
            return state_table_row
    
        def state(self, name, event_hdlr_list, next_states):
            state_table_row = STState(name)
            if len(event_hdlr_list) != len(self._event_list):
                raise StateMachineError('Mismatch between number of event handlers and the methods specified for the state.')
            if next_states is not None and len(next_states) != len(self._event_list):
                raise StateMachineError('Mismatch between number of event handlers and the next states specified for the state.')
    
            state_table_row.set_events(self._event_list, event_hdlr_list, next_states)
    
            if self._initial_state is None:
                self._initial_state = state_table_row
            self._state_list[name] = state_table_row
            return state_table_row
    
        def __add_ev_hdlr(self, func_name):
            '''
            Informs the class of an event handler to be added. We just need the name here. The
            function name will later be associated with one of the functions in a list when a state is defined.
            '''
            self._event_list.append(func_name)
    
    # Decorator functions ...
    def event_handler(state_class):
        '''
        Declare a method that handles a type of event.
        '''
        def wrapper(func):
            state_class._StateTable__add_ev_hdlr(func.__name__)
            def obj_call(self, *args, **keywords):
                state_var = getattr(self, state_class.machine_var)
                funky, next_state = state_var._StateVar__fctn(func.__name__)
                if next_state is not None:
                    state_var.next_state = next_state
                rv = funky(self, *args, **keywords)
                state_var._StateVar__to_next_state(self)
                return rv
            return obj_call
        return wrapper
    
    def on_enter_function(state_class):
        '''
        Declare that this method should be called whenever a new state is entered.
        '''
        def wrapper(func):
            state_class.enter = func
            return func
        return wrapper
    
    def on_leave_function(state_class):
        '''
        Declares that this method should be called whenever leaving a state.
        '''
        def wrapper(func):
            state_class.leave = func
            return func
        return wrapper
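
    The core mechanism, stripped of enter/leave hooks and multiple state variables, can be sketched in a few lines of Python 3. This is a simplified illustration and not the statedefn module: the event decorator, the TABLE layout keyed by (state, event name), and the Toggle class are all hypothetical names chosen for this example.

```python
import functools


def event(table):
    """Turn a method into an event whose behaviour depends on self.state.

    table maps (current state, event name) to (handler method name, next state).
    """
    def decorator(method):
        name = method.__name__

        @functools.wraps(method)
        def dispatch(self, *args, **kwargs):
            handler_name, next_state = table[(self.state, name)]
            result = getattr(self, handler_name)(*args, **kwargs)
            self.state = next_state   # transition after the handler has run
            return result
        return dispatch
    return decorator


class Toggle(object):
    TABLE = {
        ("Off", "toggle"): ("_turn_on", "On"),
        ("On", "toggle"): ("_turn_off", "Off"),
    }

    def __init__(self):
        self.state = "Off"   # each instance keeps its own state
        self.log = []

    @event(TABLE)
    def toggle(self):
        pass   # never executed; the table decides which handler runs

    def _turn_on(self):
        self.log.append("Toggle On")

    def _turn_off(self):
        self.log.append("Toggle Off")


a, b = Toggle(), Toggle()
a.toggle()
a.toggle()
a.toggle()
b.toggle()
```

    As in the full example, the decorated method body is only a placeholder, and two instances of the same class move through their states independently.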

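    Different Decorator Forms

    There are operational differences between:

    Decorator with no arguments
    
    Decorator with arguments
    
    Decorator with wrapped class instance awareness

    For example: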

    from sys import stdout,stderr
    from pdb import set_trace as bp
    
    class DecoTrace(object):
        '''
        Decorator class with no arguments
    
        This can only be used for functions or methods where the instance
        is not necessary
    
        '''
    
        def __init__(self, f):
            self.f = f
    
        def _showargs(self, *fargs, **kw):
            print >> stderr, 'T: enter {} with args={}, kw={}'.format(self.f.__name__, str(fargs), str(kw))
    
        def _aftercall(self, status):
            print >> stderr, 'T: exit {} with status={}'.format(self.f.__name__, str(status))
    
        def __call__(self, *fargs, **kw):
            '''Pass *just* function arguments to wrapped function.'''
            self._showargs(*fargs, **kw)
            ret=self.f(*fargs, **kw)
            self._aftercall(ret)
            return ret
    
        def __repr__(self):
            return self.f.__name__
    
    
    class DecoTraceWithArgs(object):
        '''decorator class with ARGUMENTS
    
           This can be used for unbound functions and methods.  If this wraps a
           class instance, then extract it and pass to the wrapped method as the
           first arg.
        '''
    
        def __init__(self, *dec_args, **dec_kw):
            '''The decorator arguments are passed here.  Save them for runtime.'''
            self.dec_args = dec_args
            self.dec_kw = dec_kw
    
            self.label = dec_kw.get('label', 'T')
            self.fid = dec_kw.get('stream', stderr)
    
        def _showargs(self, *fargs, **kw):
    
            print >> self.fid, \
                  '{}: enter {} with args={}, kw={}'.format(self.label, self.f.__name__, str(fargs), str(kw))
            print >> self.fid, \
                  '{}:   passing decorator args={}, kw={}'.format(self.label, str(self.dec_args), str(self.dec_kw))
    
        def _aftercall(self, status):
            print >> self.fid, '{}: exit {} with status={}'.format(self.label, self.f.__name__, str(status))
        def _showinstance(self, instance):
            print >> self.fid, '{}: instance={}'.format(self.label, instance)
    
        def __call__(self, f):
            def wrapper(*fargs, **kw):
                '''
                  Combine decorator arguments and function arguments and pass to wrapped
                  class instance-aware function/method.
    
                  Note: the first argument cannot be "self" because we get a parse error
                  "takes at least 1 argument" unless the instance is actually included in
                  the argument list, which is redundant.  If this wraps a class instance,
                  the "self" will be the first argument.
                '''
    
                self._showargs(*fargs, **kw)
    
                # merge decorator keywords into the kw argument list
                kw.update(self.dec_kw)
    
                # Does this wrap a class instance?
                if fargs and getattr(fargs[0], '__class__', None):
    
                    # pull out the instance and combine function and
                    # decorator args
                    instance, fargs = fargs[0], fargs[1:]+self.dec_args
                    self._showinstance(instance)
    
                    # call the method
                    ret=f(instance, *fargs, **kw)
                else:
                    # just send in the given args and kw
                    ret=f(*(fargs + self.dec_args), **kw)
    
                self._aftercall(ret)
                return ret
    
            # Save wrapped function reference
            self.f = f
            wrapper.__name__ = f.__name__
            wrapper.__dict__.update(f.__dict__)
            wrapper.__doc__ = f.__doc__
            return wrapper
    
    
    @DecoTrace
    def FirstBruce(*fargs, **kwargs):
        'Simple function using simple decorator.'
        if fargs and fargs[0]:
            print fargs[0]
    
    @DecoTraceWithArgs(name="Second Bruce", standardline="G'day, Bruce!")
    def SecondBruce(*fargs, **kwargs):
        'Simple function using decorator with arguments.'
        print '{}:'.format(kwargs.get('name', 'Unknown Bruce'))
    
        if fargs and fargs[0]:
            print fargs[0]
        else:
            print kwargs.get('standardline', None)
    
    class Bruce(object):
        'Simple class.'
    
        def __init__(self, id):
            self.id = id
    
        def __str__(self):
            return self.id
    
        def __repr__(self):
            return 'Bruce'
    
        @DecoTraceWithArgs(label="Trace a class", standardline="How are yer Bruce?", stream=stdout)
        def talk(self, *fargs, **kwargs):
            'Simple function using decorator with arguments.'
    
            print '{}:'.format(self)
            if fargs and fargs[0]:
                print fargs[0]
            else:
                print kwargs.get('standardline', None)
    
    ThirdBruce = Bruce('Third Bruce')
    
    SecondBruce()
    FirstBruce("First Bruce: Oh, Hello Bruce!")
    ThirdBruce.talk()
    FirstBruce("First Bruce: Bit crook, Bruce.")
    SecondBruce("Where's Bruce?")
    FirstBruce("First Bruce: He's not here, Bruce")
    ThirdBruce.talk("Blimey, s'hot in here, Bruce.")
    FirstBruce("First Bruce: S'hot enough to boil a monkey's bum!")
    SecondBruce("That's a strange expression, Bruce.")
    FirstBruce("First Bruce: Well Bruce, I heard the Prime Minister use it. S'hot enough to boil a monkey's bum in 'ere, your Majesty,' he said and she smiled quietly to herself.")
    ThirdBruce.talk("She's a good Sheila, Bruce and not at all stuck up.")
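
    Condensed to its essentials, the difference between the three forms lies in which object receives the function and where the instance shows up. The following Python 3 sketch is an illustration only; Trace, TraceWithLabel, the calls list and the sample functions are hypothetical names, and the calls are recorded in a list rather than printed.

```python
import functools

calls = []   # what each decorator observed: (label, function name, positional args)


class Trace(object):
    """Form 1, no arguments: __init__ receives the function itself."""

    def __init__(self, func):
        self.func = func

    def __call__(self, *args, **kwargs):
        calls.append(("Trace", self.func.__name__, args))
        return self.func(*args, **kwargs)


class TraceWithLabel(object):
    """Form 2, with arguments: __init__ gets the arguments, __call__ gets the function."""

    def __init__(self, label):
        self.label = label

    def __call__(self, func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            calls.append((self.label, func.__name__, args))
            return func(*args, **kwargs)
        return wrapper


@Trace
def first(line):
    return line.upper()


@TraceWithLabel("T2")
def second(line):
    return line.lower()


class Bruce(object):
    # Form 3: because wrapper is a plain function, Python binds it like any
    # method, so the instance arrives as the first positional argument.
    @TraceWithLabel("T3")
    def talk(self, line):
        return "Bruce: " + line


bruce = Bruce()
results = [first("G'day"), second("G'DAY"), bruce.talk("hi")]
```

    Note that a callable object like Trace is not bound when used on a method, since it defines no __get__; that is why the first form only suits functions where the instance is not needed.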

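    Unimplemented Function Replacement

    Allows you to test unimplemented code in a development environment by passing the default return value as an argument to the decorator (or you can leave the argument off, in which case None is returned).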

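    # Decorator factory; also works when applied bare, as @unimplemented
    def unimplemented(defaultval):
        if type(defaultval) == type(unimplemented):
            # Applied without arguments: defaultval is the decorated function
            # itself, so replace it with a stub that accepts anything and returns None.
            return lambda *args, **kwargs: None
        else:
            # Actual decorator
            def unimp_wrapper(func):
                # What we replace the function with
                def wrapper(*args, **kwargs):
                    return defaultval
                return wrapper
            return unimp_wrapper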
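
    A usage sketch may make the two calling styles clearer. This Python 3 variant follows the same idea but is not the original code: it uses types.FunctionType and functools.wraps, and send_report and fetch_rows are hypothetical placeholders for functions that have not been written yet.

```python
import functools
import types


def unimplemented(defaultval):
    if isinstance(defaultval, types.FunctionType):
        # Applied bare (@unimplemented): defaultval is the decorated function.
        func = defaultval

        @functools.wraps(func)
        def stub(*args, **kwargs):
            return None
        return stub

    # Applied with a value (@unimplemented(value)): return the real decorator.
    def decorator(func):
        @functools.wraps(func)
        def stub(*args, **kwargs):
            return defaultval
        return stub
    return decorator


@unimplemented
def send_report(recipient):
    raise NotImplementedError("never reached while the stub is in place")


@unimplemented([])
def fetch_rows(query, limit=10):
    raise NotImplementedError("never reached while the stub is in place")
```

    One limitation of this design is worth knowing: a default value that is itself a plain function cannot be told apart from the bare-decorator case.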

    Redirecting stdout Printing to Python Standard Logging

    import logging
    import sys
    
    class LogPrinter:
        '''LogPrinter class which serves to emulate a file object and logs
           whatever it gets sent to a Logger object at the INFO level.'''
        def __init__(self):
            '''Grabs the specific logger to use for logprinting.'''
            self.ilogger = logging.getLogger('logprinter')
            il = self.ilogger
            logging.basicConfig()
            il.setLevel(logging.INFO)
    
        def write(self, text):
            '''Logs written output to a specific logger'''
            self.ilogger.info(text)
    
    def logprintinfo(func):
        '''Wraps a method so that any calls made to print get logged instead'''
        def pwrapper(*arg, **kwargs):
            stdobak = sys.stdout
            lpinstance = LogPrinter()
            sys.stdout = lpinstance
            try:
                return func(*arg, **kwargs)
            finally:
                sys.stdout = stdobak
        return pwrapper
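
    In Python 3 the swap of sys.stdout can be delegated to contextlib.redirect_stdout. The sketch below is one possible adaptation, not the original recipe: LineLogger, log_prints and ListHandler are hypothetical names, and buffering the text until a newline arrives is a design choice added here so that each printed line becomes one log record.

```python
import contextlib
import functools
import logging


class LineLogger(object):
    """File-like object that logs each complete line it receives."""

    def __init__(self, logger, level=logging.INFO):
        self.logger = logger
        self.level = level
        self._pending = ""

    def write(self, text):
        # print() may deliver a line in several chunks, so buffer until "\n".
        self._pending += text
        while "\n" in self._pending:
            line, self._pending = self._pending.split("\n", 1)
            if line:
                self.logger.log(self.level, line)
        return len(text)

    def flush(self):
        if self._pending:
            self.logger.log(self.level, self._pending)
            self._pending = ""


def log_prints(logger):
    """Decorator factory: send everything the function prints to logger."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            writer = LineLogger(logger)
            with contextlib.redirect_stdout(writer):
                try:
                    return func(*args, **kwargs)
                finally:
                    writer.flush()   # log any text left without a newline
        return wrapper
    return decorator


class ListHandler(logging.Handler):
    """Collects messages in a list so the result can be inspected."""

    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


demo_logger = logging.getLogger("logprinter.demo")
demo_logger.setLevel(logging.INFO)
demo_logger.propagate = False
handler = ListHandler()
demo_logger.addHandler(handler)


@log_prints(demo_logger)
def greet(name):
    print("hello", name)
    print("no newline", end="")
    return len(name)


length = greet("world")
```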

    Access Control

    This example prevents users from reaching places they are not authorised to access.

    class LoginCheck:
        '''
        This class checks whether a user
        has logged in properly via
        the global "check_function". If so,
        the requested routine is called.
        Otherwise, an alternative page is
        displayed via the global "alt_function"
        '''
        def __init__(self, f):
            self._f = f
    
        def __call__(self, *args):
            status = check_function()
            if status == 1:
                return self._f(*args)
            else:
                return alt_function()
    
    def check_function():
        return test
    
    def alt_function():
        return 'Sorry - this is the forced behaviour'
    
    @LoginCheck
    def display_members_page():
        print 'This is the members page'

    For example:

    test = 0
    display_members_page()
    # Returns "Sorry - this is the forced behaviour"
    
    test = 1
    display_members_page()
    # Prints "This is the members page"
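
    The class above looks up its check and fallback through module-level globals. A variant that receives them as decorator arguments is sketched below in Python 3; login_required, the session dictionary and the page functions are hypothetical names used only for this illustration.

```python
import functools


def login_required(check, fallback):
    """check() decides whether access is allowed; fallback() runs when it is not."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if check():
                return func(*args, **kwargs)
            return fallback()
        return wrapper
    return decorator


session = {"logged_in": False}   # stand-in for real session state


def is_logged_in():
    return session["logged_in"]


def denied_page():
    return "Sorry - this is the forced behaviour"


@login_required(is_logged_in, denied_page)
def members_page():
    return "This is the members page"


before = members_page()          # not logged in yet
session["logged_in"] = True
after = members_page()           # the check is evaluated again on every call
```

    Passing the two functions explicitly makes it possible to protect different pages with different checks, at the cost of a slightly longer decorator line.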

    Event Raising and Handling

    See the code and examples here: https://pypi.python.org/pypi/Decovent

    Singleton

    import functools
    
    def singleton(cls):
        ''' Use class as singleton. '''
    
        cls.__new_original__ = cls.__new__
    
        @functools.wraps(cls.__new__)
        def singleton_new(cls, *args, **kw):
            it =  cls.__dict__.get('__it__')
            if it is not None:
                return it
    
            cls.__it__ = it = cls.__new_original__(cls, *args, **kw)
            it.__init_original__(*args, **kw)
            return it
    
        cls.__new__ = singleton_new
        cls.__init_original__ = cls.__init__
        cls.__init__ = object.__init__
    
        return cls
    
    #
    # Sample use:
    #
    
    @singleton
    class Foo(object):
        def __new__(cls):
            cls.x = 10
            return object.__new__(cls)
    
        def __init__(self):
            assert self.x == 10
            self.x = 15
    
    assert Foo().x == 15
    Foo().x = 20
    assert Foo().x == 20
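
    A much simpler, and more limited, way to get singleton behaviour is to replace the class with a factory function that caches the first instance. The Python 3 sketch below uses that swapped-in technique instead of patching __new__ as the recipe above does; Config is a hypothetical example class.

```python
def singleton(cls):
    """Replace cls with a factory that builds one instance and then reuses it."""
    instances = {}

    def get_instance(*args, **kwargs):
        if cls not in instances:
            # Arguments are only used the first time; later calls ignore them.
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    return get_instance


@singleton
class Config(object):
    def __init__(self, name):
        self.name = name


first = Config("primary")
second = Config("ignored")   # returns the existing instance
```

    The trade-off is that Config now names a function rather than a class, so isinstance checks and subclassing no longer work; the __new__-based recipe keeps the class intact.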

    Asynchronous Call

    from Queue import Queue
    from threading import Thread
    
    class asynchronous(object):
        def __init__(self, func):
            self.func = func
    
        def __call__(self, *args, **kwargs):
            return self.func(*args, **kwargs)
    
        def start(self, *args, **kwargs):
            # Each call gets its own queue, so overlapping calls cannot
            # deliver a result to the wrong Result object.
            queue = Queue()
    
            def threaded():
                queue.put(self.func(*args, **kwargs))
    
            thread = Thread(target=threaded)
            thread.start()
            return asynchronous.Result(queue, thread)
    
        class NotYetDoneException(Exception):
            def __init__(self, message):
                self.message = message
    
        class Result(object):
            def __init__(self, queue, thread):
                self.queue = queue
                self.thread = thread
    
            def is_done(self):
                return not self.thread.is_alive()
    
            def get_result(self):
                if not self.is_done():
                    raise asynchronous.NotYetDoneException('the call has not yet completed its task')
    
                if not hasattr(self, 'result'):
                    self.result = self.queue.get()
    
                return self.result
    
    if __name__ == '__main__':
        # sample usage
        import time
    
        @asynchronous
        def long_process(num):
            time.sleep(10)
            return num * num
    
        result = long_process.start(12)
    
        for i in range(20):
            print i
            time.sleep(1)
    
            if result.is_done():
                print "result {0}".format(result.get_result())
    
    
        result2 = long_process.start(13)
    
        try:
            print "result2 {0}".format(result2.get_result())
    
        except asynchronous.NotYetDoneException as ex:
            print ex.message
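
    On Python 3 the standard library's concurrent.futures already provides the thread, the result holder and the "not done yet" handling, so a comparable decorator can be quite short. This is a sketch under that assumption, not a port of the class above; the shared _executor, its pool size of 4, and the square function are choices made for the example.

```python
import functools
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=4)   # shared pool; size chosen arbitrarily


def asynchronous(func):
    """Keep func callable as usual and add func.start(...) returning a Future."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    def start(*args, **kwargs):
        return _executor.submit(func, *args, **kwargs)

    wrapper.start = start
    return wrapper


@asynchronous
def square(num):
    return num * num


future = square.start(12)           # runs in a worker thread
result = future.result(timeout=5)   # blocks until the value is available
```

    A Future also re-raises any exception from the worker when result() is called, which the queue-based class above does not handle.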

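    Class Method Decorator Using the Instance

    When a class method is decorated, the decorator receives a function that is not yet bound to an instance.

    The decorator cannot do anything with the instance that invokes the method unless it is actually a descriptor.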

    from functools import wraps
    
    def decorate(f):
        '''
        Class method decorator specific to the instance.
    
        It uses a descriptor to delay the definition of the
        method wrapper.
        '''
        class descript(object):
            def __init__(self, f):
                self.f = f
    
            def __get__(self, instance, klass):
                if instance is None:
                    # Class method was requested
                    return self.make_unbound(klass)
                return self.make_bound(instance)
    
            def make_unbound(self, klass):
                @wraps(self.f)
                def wrapper(*args, **kwargs):
                    '''This documentation will vanish :)'''
                    raise TypeError(
                        'unbound method {}() must be called with {} instance '
                        'as first argument (got nothing instead)'.format(
                            self.f.__name__,
                            klass.__name__)
                    )
                return wrapper
    
            def make_bound(self, instance):
                @wraps(self.f)
                def wrapper(*args, **kwargs):
                    '''This documentation will disappear :)'''
                    print "Called the decorated method {} of {}".format(self.f.__name__, instance)
                    return self.f(instance, *args, **kwargs)
                # This instance does not need the descriptor anymore,
                # let it find the wrapper directly next time:
                setattr(instance, self.f.__name__, wrapper)
                return wrapper
    
        return descript(f)
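
    To see the descriptor trick at work, here is a compact Python 3 sketch with a usage example. It follows the same idea but is not the code above: instance_aware, Greeter and the calls list are hypothetical, and on class access it simply returns the descriptor instead of building an error-raising wrapper.

```python
import functools


class instance_aware(object):
    """Non-data descriptor: builds the wrapper once an instance asks for the method."""

    def __init__(self, func):
        self.func = func

    def __get__(self, instance, owner):
        if instance is None:
            return self   # accessed on the class; there is nothing to bind to

        @functools.wraps(self.func)
        def wrapper(*args, **kwargs):
            instance.calls.append(self.func.__name__)   # per-instance behaviour
            return self.func(instance, *args, **kwargs)

        # Cache the wrapper on the instance; later lookups find it in the
        # instance dictionary and bypass the descriptor.
        setattr(instance, self.func.__name__, wrapper)
        return wrapper


class Greeter(object):
    def __init__(self, name):
        self.name = name
        self.calls = []

    @instance_aware
    def greet(self, other):
        return "{} greets {}".format(self.name, other)


alice = Greeter("Alice")
bob = Greeter("Bob")
message = alice.greet("Carol")
```

    Caching works because the descriptor defines only __get__: for such non-data descriptors, an entry in the instance dictionary takes precedence on the next lookup.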
    
    

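    Another Retrying Decorator

    Here is another decorator for making a function retry a certain number of times. This decorator is superior IMHO because it should work with any old function that raises an exception on failure.

    Features:

    Works with any function that signals failure by raising an exception (i.e. just about any function)
    
    Supports retry delay and backoff
    
    The user can specify which exceptions are caught for retrying. For example, networking code might be expected to raise a socket error when communication is difficult, while any other exception probably indicates a bug in the code.
    
    Hook for custom logging

    Example: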

    #
    # Copyright 2012 by Jeff Laughlin Consulting LLC
    #
    # Permission is hereby granted, free of charge, to any person obtaining a copy
    # of this software and associated documentation files (the "Software"), to deal
    # in the Software without restriction, including without limitation the rights
    # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
    # copies of the Software, and to permit persons to whom the Software is
    # furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included in
    # all copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
    # SOFTWARE.
    
    
    import sys
    from time import sleep
    
    
    def example_exc_handler(tries_remaining, exception, delay):
        """Example exception handler; prints a warning to stderr.
    
        tries_remaining: The number of tries remaining.
        exception: The exception instance which was raised.
        delay: The number of seconds to sleep before the next try.
        """
        print >> sys.stderr, "Caught '%s', %d tries remaining, sleeping for %s seconds" % (exception, tries_remaining, delay)
    
    
    def retries(max_tries, delay=1, backoff=2, exceptions=(Exception,), hook=None):
        """Function decorator implementing retrying logic.
    
        delay: Sleep this many seconds after the first failure
        backoff: Multiply delay by this factor after each failure
        exceptions: A tuple of exception classes; default (Exception,)
        hook: A function with the signature myhook(tries_remaining, exception, delay);
              default None
    
        The decorator will call the function up to max_tries times if it raises
        an exception.
    
        By default it catches instances of the Exception class and subclasses.
        This will recover after all but the most fatal errors. You may specify a
        custom tuple of exception classes with the 'exceptions' argument; the
        function will only be retried if it raises one of the specified
        exceptions.
    
        Additionally you may specify a hook function which will be called prior
        to retrying with the number of remaining tries, the exception instance,
        and the upcoming delay in seconds; see given example. This is primarily
        intended to give the opportunity to log the failure. Hook is not called
        after failure if no retries remain.
        """
        def dec(func):
            def f2(*args, **kwargs):
                mydelay = delay
                # Count down: max_tries - 1, ..., 1, 0 tries remaining.
                for tries_remaining in reversed(range(max_tries)):
                    try:
                        return func(*args, **kwargs)
                    except exceptions as e:
                        if tries_remaining > 0:
                            if hook is not None:
                                hook(tries_remaining, e, mydelay)
                            sleep(mydelay)
                            mydelay = mydelay * backoff
                        else:
                            raise
            return f2
        return dec
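
    A minimal usage sketch for the retry logic above, assuming Python 3. The block restates the decorator in compact form (with functools.wraps added) so that it runs on its own; flaky, record, calls and hook_log are hypothetical names used only for illustration.

```python
import functools
import time


def retries(max_tries, delay=1, backoff=2, exceptions=(Exception,), hook=None):
    """Retry the decorated function up to max_tries times with growing delays."""
    def dec(func):
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):
            current_delay = delay
            for tries_remaining in reversed(range(max_tries)):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if tries_remaining == 0:
                        raise  # out of retries: propagate the last exception
                    if hook is not None:
                        hook(tries_remaining, e, current_delay)
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return dec


# Hypothetical demo: fail twice, then succeed on the third attempt.
calls = []      # one entry per attempt
hook_log = []   # (tries_remaining, delay) pairs seen by the hook


def record(tries_remaining, exception, delay):
    hook_log.append((tries_remaining, delay))


@retries(max_tries=3, delay=0.01, backoff=2, exceptions=(ValueError,), hook=record)
def flaky():
    calls.append(1)
    if len(calls) < 3:
        raise ValueError("not yet")
    return "ok"


result = flaky()
```

    The hook is called only before a retry, so with three tries it runs at most twice, and the delay it receives doubles between calls.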

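    Logging decorator with a specified logger (or the default)

    This decorator logs the entry and exit points of your function with the specified logger; if none is given, it defaults to a logger named after your function's module.

    In its current form it logs at the logging.INFO level, but it can easily be customized to use any level. The same goes for the entry and exit messages.

    Example 1 (the decorator):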

    import functools, logging
    
    
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)
    
    class log_with(object):
        '''Logging decorator that allows you to log with a
        specific logger.
        '''
        # Customize these messages
        ENTRY_MESSAGE = 'Entering {}'
        EXIT_MESSAGE = 'Exiting {}'
    
        def __init__(self, logger=None):
            self.logger = logger
    
        def __call__(self, func):
            '''Returns a wrapper that wraps func.
            The wrapper will log the entry and exit points of the function
            with logging.INFO level.
            '''
            # set logger if it was not set earlier
            if not self.logger:
                logging.basicConfig()
                self.logger = logging.getLogger(func.__module__)
    
            @functools.wraps(func)
            def wrapper(*args, **kwds):
                self.logger.info(self.ENTRY_MESSAGE.format(func.__name__))  # logging level .info(). Set to .debug() if you want to
                f_result = func(*args, **kwds)
                self.logger.info(self.EXIT_MESSAGE.format(func.__name__))   # logging level .info(). Set to .debug() if you want to
                return f_result
            return wrapper

    Example 2 (sample use):

    # Sample use and output:
    
    if __name__ == '__main__':
        logging.basicConfig()
        log = logging.getLogger('custom_log')
        log.setLevel(logging.DEBUG)
        log.info('ciao')
    
        @log_with(log)     # user specified logger
        def foo():
            print('this is foo')
        foo()
    
        @log_with()        # using default logger
        def foo2():
            print('this is foo2')
        foo2()

    Example 3 (output):

    # output
    >>> ================================ RESTART ================================
    >>>
    INFO:custom_log:ciao
    INFO:custom_log:Entering foo # uses the correct logger
    this is foo
    INFO:custom_log:Exiting foo
    INFO:__main__:Entering foo2  # uses the correct logger
    this is foo2
    INFO:__main__:Exiting foo2
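
    The text above notes that the logging level is easy to customize. One possible way to do that is sketched below as a function-based variant with a hypothetical level parameter; log_calls and ListHandler are illustrative names that are not part of the original recipe, and the try/finally is an added design choice so that the exit message is also logged when the function raises.

```python
import functools
import logging


def log_calls(logger=None, level=logging.INFO):
    """Log entry and exit of the decorated function at the given level."""
    def decorator(func):
        # Fall back to a logger named after the function's module.
        target = logger if logger is not None else logging.getLogger(func.__module__)

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            target.log(level, "Entering %s", func.__name__)
            try:
                return func(*args, **kwargs)
            finally:
                # Runs on normal return and when func raises.
                target.log(level, "Exiting %s", func.__name__)
        return wrapper
    return decorator


class ListHandler(logging.Handler):
    """Hypothetical helper that keeps (level name, message) pairs in a list."""
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append((record.levelname, record.getMessage()))


demo_log = logging.getLogger("demo_log")
demo_log.setLevel(logging.DEBUG)
demo_log.propagate = False      # keep the demo output out of the root logger
handler = ListHandler()
demo_log.addHandler(handler)


@log_calls(demo_log, level=logging.DEBUG)
def add(a, b):
    return a + b


total = add(2, 3)
```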

    Lazy Thunkify

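    This decorator makes any function, instead of running its code directly, start a thread that runs the code and return a thunk (a function of no arguments) that waits for the function to complete and returns its value (or raises its exception).

    This is useful if you have computation A, which takes x seconds, followed by computation B, which takes y seconds and uses A's result. Instead of x + y seconds you only need max(x, y) seconds.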

    import threading, sys, functools, traceback
    
    def lazy_thunkify(f):
        """Make a function immediately return a function of no args which, when called,
        waits for the result, which will start being processed in another thread."""
    
        @functools.wraps(f)
        def lazy_thunked(*args, **kwargs):
            wait_event = threading.Event()
    
            result = [None]
            exc = [False, None]
    
            def worker_func():
                try:
                    func_result = f(*args, **kwargs)
                    result[0] = func_result
                except Exception:
                    exc[0] = True
                    exc[1] = sys.exc_info()
                    print("Lazy thunk has thrown an exception (will be raised on thunk()):\n%s" % (
                        traceback.format_exc()))
                finally:
                    wait_event.set()
    
            def thunk():
                wait_event.wait()
                if exc[0]:
                    # Re-raise the worker's exception with its original traceback
                    raise exc[1][1].with_traceback(exc[1][2])
    
                return result[0]
    
            threading.Thread(target=worker_func).start()
    
            return thunk
    
        return lazy_thunked

    For example:

    import time
    
    @lazy_thunkify
    def slow_double(i):
        print("Multiplying...")
        time.sleep(5)
        print("Done multiplying!")
        return i*2
    
    
    def maybe_multiply(x):
        double_thunk = slow_double(x)
        print("Thinking...")
        time.sleep(3)
        time.sleep(3)
        time.sleep(1)
        if x == 3:
            print("Using it!")
            res = double_thunk()
        else:
            print("Not using it.")
            res = None
        return res
    
    # both take 7 seconds
    maybe_multiply(10)
    maybe_multiply(3)
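
    The same idea can also be sketched with the standard library's concurrent.futures module in place of the hand-written Event and result list. This is an alternative technique, not the recipe above: the shared _pool and its size of 4 are assumptions, and the short sleeps stand in for real work.

```python
import concurrent.futures
import functools
import time

# One shared pool for all thunked calls; the size is an arbitrary choice.
_pool = concurrent.futures.ThreadPoolExecutor(max_workers=4)


def lazy_thunkify(f):
    """Start f in a worker thread and return a thunk that waits for its result."""
    @functools.wraps(f)
    def lazy_thunked(*args, **kwargs):
        future = _pool.submit(f, *args, **kwargs)
        # future.result blocks until f finishes, then returns its value
        # or re-raises the exception that f raised.
        return future.result
    return lazy_thunked


@lazy_thunkify
def slow_double(i):
    time.sleep(0.2)
    return i * 2


@lazy_thunkify
def broken():
    raise ValueError("boom")


thunk = slow_double(21)   # returns immediately; the work runs in the background
time.sleep(0.2)           # stand-in for other work done in the meantime
answer = thunk()          # waits only for whatever time is still left

try:
    broken()()            # the worker's exception surfaces when the thunk is called
    error = None
except ValueError as e:
    error = str(e)
```

    Unlike the thread-per-call version, a pool caps the number of concurrent workers, so calls beyond that limit queue up instead of starting at once.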

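    Aggregative decorators for generator functions

    This could be a whole family of decorators. The aim is to apply an aggregation function to the iterated results of a generator function.

    Two interesting aggregators could be sum and average: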

    import functools as ft
    import operator as op
    
    def summed(f):
      return lambda *xs : sum(f(*xs))
    
    def averaged(f):
      def aux(acc, x):
        return (acc[0] + x, acc[1] + 1)
    
      def out(*xs):
        s, n = ft.reduce(aux, f(*xs), (0, 0))
        return s / n if n > 0 else 0
    
      return out

    Examples for the two proposed decorators:

    @averaged
    def producer2():
        yield 10
        yield 5
        yield 2.5
        yield 7.5
    
    assert producer2() == (10 + 5 + 2.5 + 7.5) / 4
    
    @summed
    def producer1():
        yield 10
        yield 5
        yield 2.5
        yield 7.5
    
    assert producer1() == (10 + 5 + 2.5 + 7.5)
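
    Since the text describes this as a whole family of decorators, one way to cover the family is a small factory that accepts any aggregation function. The sketch below is an assumption about how that could look; aggregated and the three sample generators are hypothetical names, not part of the original recipe.

```python
import functools
import statistics


def aggregated(aggregator):
    """Build a decorator that passes the generator's items to aggregator."""
    def decorator(gen_func):
        @functools.wraps(gen_func)
        def wrapper(*args, **kwargs):
            return aggregator(gen_func(*args, **kwargs))
        return wrapper
    return decorator


@aggregated(max)
def largest():
    yield 10
    yield 5
    yield 2.5
    yield 7.5


@aggregated(statistics.mean)
def mean_of(values):
    for v in values:
        yield v


@aggregated(sorted)
def ordered():
    yield 3
    yield 1
    yield 2


assert largest() == 10
assert mean_of([1, 2, 3, 6]) == 3
assert ordered() == [1, 2, 3]
```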

    Function Timeout

    import signal
    import functools
    
    class TimeoutError(Exception): pass
    
    def timeout(seconds, error_message = 'Function call timed out'):
        def decorated(func):
            def _handle_timeout(signum, frame):
                raise TimeoutError(error_message)
    
            def wrapper(*args, **kwargs):
                signal.signal(signal.SIGALRM, _handle_timeout)
                signal.alarm(seconds)
                try:
                    result = func(*args, **kwargs)
                finally:
                    signal.alarm(0)
                return result
    
            return functools.wraps(func)(wrapper)
    
        return decorated

    For example:

    import time
    
    @timeout(1, 'Function slow; aborted')
    def slow_function():
        time.sleep(5)

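    Collect the data difference caused by the decorated function

    It calls a user function to collect some data before and after the decorated function runs. To calculate the difference, it calls a user-supplied difference calculator function.

    Example: checking the page count of a print job. Get the total number of printed pages from the printer before and after printing, then calculate the difference to obtain the number of pages printed by the decorated function.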

    import inspect
    # Only needed if you want to key the result by the name of the decorator instead of
    # the difference calculator. In that case, if the function is decorated more than
    # once, the collected difference will be overwritten.
    
    import time
    # For demo purposes only: the difference is generated from the time
    
    from functools import wraps
    
    
    def collect_data_and_calculate_difference(data_collector, difference_calculator):
        """Returns difference of data collected before and after the decorated function,
        plus the original return value of the decorated function. Return type: dict.
        Keys:
            - function name of the decorated function
            - name of the difference calculator function
        Values:
            - the original return value of decorated function
            - difference calculated by difference_calculator functions
        Parameters: functions to collect data, and create difference from collected data
    
        Created: 2017
        Author: George Fischhof
        """
    
        current_decorator_function_name = inspect.currentframe().f_code.co_name
        # Just in case you want to use it
    
        def function_wrapper_because_of_parameters(decorated_function):
            difference_calculator_name = difference_calculator.__name__
            decorated_function_name = decorated_function.__name__
    
            i_am_the_first_decorator = not hasattr(decorated_function, '__wrapped__')
    
            @wraps(decorated_function)
            def wrapper(*args, **kwargs) -> dict:
                result_dict = dict()
    
                before = data_collector()
                original_result = decorated_function(*args, **kwargs)
                after = data_collector()
    
                my_collection = difference_calculator(before=before, after=after)
    
                i_am_not_first_decorator_but_first_is_similar_to_me = (
                    not i_am_the_first_decorator
                    and isinstance(original_result, dict)
                    and (decorated_function_name in original_result)
                )
    
                if i_am_not_first_decorator_but_first_is_similar_to_me:
                    original_result[difference_calculator_name] = my_collection
                    return original_result
                else:
                    result_dict[decorated_function_name] = original_result
                    result_dict[difference_calculator_name] = my_collection
                    return result_dict
    
            return wrapper
        return function_wrapper_because_of_parameters
    
    
    # Usage
    
    
    def collect_data_or_data_series_a():
        time.sleep(0.5)
        return time.time()
    
    
    def collect_data_or_data_series_b():
        time.sleep(0.5)
        return time.time()
    
    
    def calculate_difference_on_data_series_a(before, after):
        return after - before
    
    
    def calculate_difference_on_data_series_b(before, after):
        return after - before
    
    
    @collect_data_and_calculate_difference(
        data_collector=collect_data_or_data_series_a,
        difference_calculator=calculate_difference_on_data_series_a)
    @collect_data_and_calculate_difference(
        data_collector=collect_data_or_data_series_b,
        difference_calculator=calculate_difference_on_data_series_b)
    def do_something_that_changes_the_collected_data():
        return 'result of decorated function...'
    
    
    print(do_something_that_changes_the_collected_data())
    # result dict:
    # {'calculate_difference_on_data_series_a': 1.5010299682617188,
    # 'do_something_that_changes_the_collected_data': 'result of decorated function...',
    # 'calculate_difference_on_data_series_b': 0.5001623630523682}
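
    The print-job scenario described above can be sketched as follows. To keep the block short and self-contained it uses a simplified stand-in for the decorator that returns a (result, difference) tuple instead of a dict; FakePrinter, with_difference and print_report are hypothetical names, and the three-page report is an assumption.

```python
import functools


class FakePrinter:
    """Hypothetical printer that only keeps a lifetime page counter."""
    def __init__(self):
        self.total_pages = 0

    def print_pages(self, count):
        self.total_pages += count


def with_difference(data_collector, difference_calculator):
    """Simplified stand-in: return (result, difference) instead of a dict."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            before = data_collector()
            result = func(*args, **kwargs)
            after = data_collector()
            return result, difference_calculator(before=before, after=after)
        return wrapper
    return decorator


printer = FakePrinter()
printer.print_pages(120)  # pages printed by earlier jobs


@with_difference(
    data_collector=lambda: printer.total_pages,
    difference_calculator=lambda before, after: after - before)
def print_report(copies):
    printer.print_pages(3 * copies)  # assume a three-page report
    return "done"


status, pages = print_report(copies=2)
```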

    References:

    https://wiki.python.org/moin/DecoratorPattern

    https://wiki.python.org/moin/PythonDecoratorLibrary

    https://wiki.python.org/moin/PythonDecoratorLibrary?action=AttachFile&do=view&target=memoize.py

    https://wiki.python.org/moin/PythonDecoratorProposals

    https://www.python.org/dev/peps/pep-0318/#syntax-alternatives

    Creating a singleton in Python:

    https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python

    The decorator module:

    https://pypi.python.org/pypi/decorator

    https://docs.python.org/dev/library/functools.html

  • Original article: https://www.cnblogs.com/zhongguiyao/p/11048483.html