zoukankan      html  css  js  c++  java
  • day24-20180526笔记

    笔记:python3 复习Celery异步分布式

    一、复习Celery异步分布式

    Celery模块调用

    既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。
    如果要说celery的分布式应用的话,我觉得就要提到celery的消息路由机制,就要提一下AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去

    多进程

    多worker,多队列

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/05/27 09:10
    # @Author  : yangyuanqiang
    # @File    : demon3.py
    
    from celery import Celery
    
    app = Celery()
    app.config_from_object("celeryconfig")
    
    @app.task
    def taskA(x, y):
        return x*y
    
    @app.task
    def taskB(x, y, z):
        return x+y+z
    
    @app.task
    def add(x, y):
        return x+y
    上面的代码中,首先定义了一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。接下来看看celeryconfig.py
    注意:代码的缩进格式:
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/05/27 09:30
    # @Author  : yangyuanqiang
    # @File    : celeryconfig.py
    from kombu import Queue, Exchange
    
    BROKER_URL = "redis://192.168.3.11:6379/1"
    CELERY_RESULT_BACKEND = "redis://192.168.3.11:6379/2"
    
    
    CELERY_QUEUES = {
        Queue("default",Exchange("default"),routing_key="default"),
        Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
        Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
    }
    
    CELERY_ROUTES = {
        "demon3.taskA":{"queue":"for_task_A","routing_key":"for_task_A"},
        "demon3.taskB":{"queue":"for_task_B","routing_key":"for_task_B"}
    }
    配置文件一般单独写在一个文件中。

    启动一个worker来指定taskA
    ivandeMacBook-Pro:celery ivan$ celery -A demon3 worker -l info -n workerA.%h -Q for_task_A

    启动一个worker来指定taskB
    ivandeMacBook-Pro:celery ivan$ celery -A demon3 worker -l info -n workerB.%h -Q for_task_B

    执行以下程序

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/05/27 10:05
    # @Author  : yangyuanqiang
    # @File    : test.py
    import time
    
    from demon3 import *
    
    re1 = taskA.delay(10, 20)
    
    re2 = taskB.delay(100, 200, 300)
    
    re3 = add.delay(1000, 2000)
    
    time.sleep(2)
    print(re1.result)
    print(re2.result)
    print(re3.status)
    print(re3.result)

    以上实例输出的结果

    200
    600
    PENDING
    None

    tasks_A输出的结果

    [2018-05-27 10:11:08,855: INFO/MainProcess] Received task: demon3.taskA[f245bc49-7db7-48cf-b1e0-2631f4ea35b9]  
    [2018-05-27 10:11:08,866: INFO/ForkPoolWorker-2] Task demon3.taskA[f245bc49-7db7-48cf-b1e0-2631f4ea35b9] succeeded in 0.008108521999929508s: 200

    tasks_B输出的结果

    [2018-05-27 10:11:08,858: INFO/MainProcess] Received task: demon3.taskB[1edbc326-0949-4d1b-b45f-d7b15cdf17a5]  
    [2018-05-27 10:11:08,868: INFO/ForkPoolWorker-2] Task demon3.taskB[1edbc326-0949-4d1b-b45f-d7b15cdf17a5] succeeded in 0.006413651000002574s: 600
    我们看到add的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。
    ivandeMacBook-Pro:celery ivan$ celery -A demon3 worker -l info -n worker.%h -Q celery

    执行test.py输出的结果

    200
    600
    SUCCESS
    3000
    Celery与定时任务
    在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。
    下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/05/27 09:30
    # @Author  : yangyuanqiang
    # @File    : celeryconfig.py
    from kombu import Queue, Exchange
    
    BROKER_URL = "redis://192.168.3.11:6379/1"
    CELERY_RESULT_BACKEND = "redis://192.168.3.11:6379/2"
    
    
    CELERY_QUEUES = {
        Queue("default",Exchange("default"),routing_key="default"),
        Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),
        Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B")
    }
    
    CELERY_ROUTES = {
        "demon3.taskA":{"queue":"for_task_A","routing_key":"for_task_A"},
        "demon3.taskB":{"queue":"for_task_B","routing_key":"for_task_B"}
    }
    
    #在celeryconfig.py中添加以下代码:
    CELERY_TIMEZONE = 'UTC'
    CELERYBEAT_SCHEDULE = {
        'taskA_schedule' : {
            'task':'demon3.taskA',
            'schedule':2,
            'args':(5,6)
        },
        'taskB_scheduler' : {
            'task':"demon3.taskB",
            "schedule":10,
            "args":(10,20,30)
        },
        'add_schedule': {
            "task":"demon3.add",
            "schedule":5,
            "args":(1,2)
        }
    }

    Celery启动定时任务

    ivandeMacBook-Pro:celery ivan$ celery -A demon3 beat

    Celery启动定时任务

    这样taskA每20秒执行一次taskA.delay(5, 6)
    taskB每200秒执行一次taskB.delay(10, 20, 30)
    Celery每10秒执行一次add.delay(1, 2)

     习题一:不断记录服务器输入日志

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/5/26 20:26
    # @Author  : lingxiangxiang
    # @File    : utils.py
    import codecs
    from threading import Thread, Lock
    
    import os
    
    
    class TraceLog(Thread):
        def __init__(self, logName):
            super(TraceLog, self).__init__()
            self.logName = logName
            self.lock = Lock()
            self.contexts = []
            self.isFile()
    
        def isFile(self):
            if not os.path.exists(self.logName):
                with codecs.open(self.logName, 'w') as f:
                    f.write("this log name is: {0}
    ".format(self.logName))
                    f.write("start log
    ")
    
        def write(self, context):
            self.contexts.append(context)
    
        def run(self):
            while 1:
                self.lock.acquire()
                if len(self.contexts) !=0:
                    with codecs.open(self.logName, "a") as f:
                        for context in self.contexts:
                            f.write(context)
                    del self.contexts[:]#注意不能忘记清空
                self.lock.release()
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2018/5/26 20:19
    # @Author  : lingxiangxiang
    # @File    : main.py
    '''不断记录服务端输入的日志
    实现>> 和>
    '''
    import time
    
    import sys
    
    from day24.test.utils import TraceLog
    
    
    class Server(object):
        def printLog(self):
            print("start server
    ")
            for i in range(100):
                print(i)
                time.sleep(0.1)
            print("end server
    ")
    
    
    
    if __name__ == '__main__':
        traceLog = TraceLog("main.log")
        traceLog.start()
        sys.stdout = traceLog
        sys.stderr = traceLog
        server = Server()
        server.printLog()
    
    # 每当调用print的时候,底层就是在代用sys.stdout.write(str)
    # sys.stdout.write() = traceLog.write()

    以上实例输出的结果,在当前目录下生成一个main.log文件

    this log name is: main.log
    start log
    start server
    
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    end server

     习题二:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2017/12/3 21:55
    # @Author  : lingxiangxiang
    # @File    : utils.py
    import codecs
    import threading
    
    import os
    
    
    class WriteLog(threading.Thread):
        def __init__(self, logName):
            super(WriteLog,  self).__init__()
            self.logName = logName
            self.lock = threading.Lock()    #课上这里写是Lock没有写()
            self.contexts = []
            self.mkfile()
    
        def mkfile(self):
            if not os.path.exists(self.logName):
                with codecs.open(self.logName, 'w') as f:
                    f.write("This file is log for {0}
    ".format(self.logName))
    
        def write(self, context):
            self.contexts.append(context)
    
        def run(self):
            while 1:
                self.lock.acquire()
                if len(self.contexts) != 0:
                    with codecs.open(self.logName, "a") as f:
                        for context in self.contexts:
                            f.write(context)
                    del self.contexts[:]
                self.lock.release()
    
    
    
    
    # class A(threading.Thread):
    #     def __init__(self):
    #         super(A, self).__init__()
    #         self.contexts = []
    #         self.lock = threading.Lock()
    #     def run(self):
    #         while 1:
    #             self.lock.acquire()
    #             if len(self.contexts) !=0:
    #                 with open('main.log', 'a') as f:
    #                     for context in self.contexts:
    #                         f.write(context)
    #                 del self.contexts[:]
    #             self.lock.release()
    #
    #     def write(self, context):
    #         self.contexts.append(context)
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # @Time    : 2017/12/3 21:42
    # @Author  : lingxiangxiang
    # @File    : main.py
    import time
    
    from utils import WriteLog
    import sys
    reload(sys)
    sys.setdefaultencoding("utf-8")
    
    def Xh():
        for i in xrange(1, 100):
            print(i)
            # sys.stdout.write(str(i))
            time.sleep(0.1)
    
    
    
    
    
    def main():
        writeLog = WriteLog("main.log")
        writeLog.start()
        sys.stdout = writeLog
        sys.stderr = writeLog
        Xh()
    
    if __name__ == '__main__':
        main()

    以上实例输出的结果

    This file is log for main.log
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
  • 相关阅读:
    高精度计算
    c++ sort
    算法分类小结
    二叉树层序遍历
    clion windows c++环境配置 mingw
    kafka,filebeat 配置
    centos7 源码安装mysql5.7
    如何优雅的生成及遍历python嵌套字典
    Java Swing 绝对布局管理方法,null布局【图】
    python3.4+pymssql 中文乱码问题解决
  • 原文地址:https://www.cnblogs.com/ivan-yang/p/9095255.html
Copyright © 2011-2022 走看看