zoukankan      html  css  js  c++  java
  • Python使用Redis实现一个简单作业调度系统

    Python使用Redis实现一个简单作业调度系统

    概述

    Redis作为内存数据库的一个典型代表,已经在非常多应用场景中被使用,这里仅就Redis的pub/sub功能来说说如何通过此功能来实现一个简单的作业调度系统。这里仅仅是想展现一个简单的想法,所以还是有非常多须要考虑的东西没有包含在这个样例中。比方错误处理,持久化等。

    以下是实现上的想法

    • MyMaster:集群的master节点程序。负责产生作业,派发作业和获取执行结果。
    • MySlave:集群的计算节点程序。每一个计算节点一个,负责获取作业并执行,并将结果发送会master节点。

    • channel CHANNEL_DISPATCH:每一个slave节点订阅一个channel。比方“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。
    • channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业执行结果,每一个slave负责将作业执行结果公布到此channel中。

    Master代码

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import time
    import threading
    import random
    import redis
    
    
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    REDIS_DB = 0
    CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
    CHANNEL_RESULT = 'CHANNEL_RESULT'
    
    
    class MyMaster():
        def __init__(self):
            pass
    
        def start(self):
            MyServerResultHandleThread().start()
            MyServerDispatchThread().start()
    
    
    class MyServerDispatchThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
    
        def run(self):
            r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
            for i in range(1, 100):
                channel = CHANNEL_DISPATCH + '_' + str(random.randint(1, 3))
                print("Dispatch job %s to %s" % (str(i), channel))
                ret = r.publish(channel, str(i))
                if ret == 0:
                    print("Dispatch job %s failed." % str(i))
                time.sleep(5)
    
    
    class MyServerResultHandleThread(threading.Thread):
        def __init__(self):
            threading.Thread.__init__(self)
    
        def run(self):
            r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
            p = r.pubsub()
            p.subscribe(CHANNEL_RESULT)
            for message in p.listen():
                if message['type'] != 'message':
                    continue
                print("Received finished job %s" % message['data'])
    
    
    if __name__ == "__main__":
        MyMaster().start()
        time.sleep(10000)
    

    说明

    • MyMaster类 - master主程序,用来启动dispatch和resulthandler的线程
    • MyServerDispatchThread类 - 派发作业线程。产生作业并派发到计算节点
    • MyServerResultHandleThread类 - 作业执行结果处理线程。从channel里获取作业结果并显示

    Slave代码

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from datetime import datetime
    import time
    import threading
    import random
    import redis
    
    
    REDIS_HOST = 'localhost'
    REDIS_PORT = 6379
    REDIS_DB = 0
    CHANNEL_DISPATCH = 'CHANNEL_DISPATCH'
    CHANNEL_RESULT = 'CHANNEL_RESULT'
    
    
    class MySlave():
        def __init__(self):
            pass
    
        def start(self):
            for i in range(1, 4):
                MyJobWorkerThread(CHANNEL_DISPATCH + '_' + str(i)).start()
    
    
    class MyJobWorkerThread(threading.Thread):
    
        def __init__(self, channel):
            threading.Thread.__init__(self)
            self.channel = channel
    
        def run(self):
            r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)
            p = r.pubsub()
            p.subscribe(self.channel)
            for message in p.listen():
                if message['type'] != 'message':
                    continue
                print("%s: Received dispatched job %s " % (self.channel, message['data']))
                print("%s: Run dispatched job %s " % (self.channel, message['data']))
                time.sleep(2)
                print("%s: Send finished job %s " % (self.channel, message['data']))
                ret = r.publish(CHANNEL_RESULT, message['data'])
                if ret == 0:
                    print("%s: Send finished job %s failed." % (self.channel, message['data']))
    
    
    if __name__ == "__main__":
        MySlave().start()
        time.sleep(10000)
    

    说明

    • MySlave类 - slave节点主程序,用来启动MyJobWorkerThread的线程
    • MyJobWorkerThread类 - 从channel里获取派发的作业并将执行结果发送回master

    測试

    • 首先执行MySlave来定义派发作业channel。
    • 然后执行MyMaster派发作业并显示执行结果。

    转载请以链接形式标明本文地址
    本文地址:http://blog.csdn.net/kongxx/article/details/50952090

  • 相关阅读:
    Java xml 操作(Dom4J修改xml   + xPath技术  + SAX解析 + XML约束)
    Git 命令 操作
    vim常用快捷键
    离线数据分析流程介绍
    WebPack 简单使用
    React Native之React速学教程(下)
    puppet 部署 horizon server 所需的参数和部署逻辑
    jsp出现getOutputStream() has already been called for this response异常的原因和解决方法
    12款优秀的 JavaScript 日历和时间选择控件
    mysql 去重
  • 原文地址:https://www.cnblogs.com/yjbjingcha/p/7157390.html
Copyright © 2011-2022 走看看