
    Design and core code of a simple distributed task scheduling system based on Python and ZooKeeper

     

    by:授客 QQ:1033553122

     

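    Test environment

    Functional requirements

    Implementation approach

    Code practice (implementation of key technical points)

    Code module organization

    Configuration files

    MyTCPServer.py

    MyTCPClient.py

    appClient.py

    loadAgent.py

    Run results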

     

     

     

    Test environment

    Win7 64-bit

     

    Linux 64-bit

     

    Python 3.3.4

     

    kazoo-2.6.1-py2.py3-none-any.whl(windows)

    kazoo-2.6.1.tar.gz (linux)

    https://pypi.org/project/kazoo/#files

     

    zookeeper-3.4.13.tar.gz

    Download links:

    http://zookeeper.apache.org/releases.html#download

    https://www.apache.org/dyn/closer.cgi/zookeeper/

    https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/

     

     

    Functional requirements

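    Register each load host as a ZooKeeper node. Other application modules query ZooKeeper for the node information (server IP, port number, and the server's task execution status) and, based on that status, choose a server that is not running the specified task to execute it.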

     

    This post is a technical pre-study for these requirements and implements the core code.
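    As a rough illustration of the selection step, the sketch below assumes each child node under the parent path holds the JSON payload written by loadAgent.py ({"host": ..., "port": ..., "status": ...}). pick_idle_agent is a hypothetical helper, not part of the original code; in a real application module the dictionary would be filled with zk_client.get_children() and zk_client.get(). The second host address in the example is invented.

```python
import json


def pick_idle_agent(nodes):
    """Return (host, port) of the first agent whose status is 'idle', else None.

    `nodes` maps child node names (e.g. 'loaderAgent102027165') to the raw
    bytes stored in each node, i.e. what zk_client.get(path)[0] returns.
    """
    for name in sorted(nodes):  # sorted so the choice is deterministic
        try:
            info = json.loads(nodes[name].decode("utf-8"))
        except ValueError:  # covers both bad UTF-8 and malformed JSON
            continue  # skip nodes whose payload cannot be parsed
        if isinstance(info, dict) and info.get("status") == "idle":
            return info["host"], int(info["port"])
    return None


# Hypothetical snapshot of /rootNode's children (the second host is invented)
example_nodes = {
    "loaderAgent102027165": b'{"host":"10.202.7.165", "port":8000, "status":"busy"}',
    "loaderAgent102027166": b'{"host":"10.202.7.166", "port":8000, "status":"idle"}',
}
print(pick_idle_agent(example_nodes))  # ('10.202.7.166', 8000)
```

    Agents reporting "busy" or "unknown" are skipped. Note that two application modules can read the same "idle" status at the same moment, so this check alone does not stop both of them from choosing the same agent.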

     

     

    Implementation approach

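    When a load server starts, it initializes a ZooKeeper client, creates a TCP server, and registers its node information with the ZooKeeper server (the TCP server's IP and port plus the load server's task execution status). It then periodically checks its task execution status by testing whether a process with a given name exists. Other application modules obtain the node information from ZooKeeper and send commands to the load server over a TCP socket; the load server handles each command accordingly.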
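    The node naming and payload described above can be captured in two small helpers. This is a sketch that mirrors what loadAgent.py does (node name = "loaderAgent" + the IP address with the dots removed, JSON payload with host, port and status); the function names are illustrative. Building the payload with json.dumps rather than string formatting guarantees well-formed JSON.

```python
import json


def agent_node_path(parent_path, host):
    """Node path for one load agent: parent + 'loaderAgent' + IP without dots."""
    return "%s/loaderAgent%s" % (parent_path.rstrip("/"), host.replace(".", ""))


def agent_node_data(host, port, status):
    """Node payload as UTF-8 encoded JSON bytes (ZooKeeper stores raw bytes)."""
    return json.dumps({"host": host, "port": port, "status": status}).encode("utf-8")


print(agent_node_path("/rootNode", "10.202.7.165"))  # /rootNode/loaderAgent102027165
print(agent_node_data("10.202.7.165", 8000, "idle"))
```

    One design caveat: dropping the dots is not collision-free, since 1.21.1.1 and 12.1.1.1 both become loaderAgent12111. Replacing the dots with another character such as "_" would keep node names unique.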

     

     

    Code practice (implementation of key technical points)

    Code module organization

     

     

    Configuration files

    conf/agent.conf

    [AGENT]

    interval = 5

    proc = sftp-server

     

     

    [README]

    interval = how often the server node information is updated (unit: seconds)

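    proc = name of the process to check (the program looks up this process name to decide whether the load program is still running, and thus what the server's status is)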

     

    conf/tcpserver.conf

    [TCPSERVER]

    host=10.202.7.165

    port = 8000

     

    [README]

    host = TCP server host address

    port = TCP server listening port

     

    conf/zookeeper.conf

    [ZOOKEEPER]

    hosts = 10.118.52.26:2181

    nodeParentPath=/rootNode

     

    [README]

    hosts = ZooKeeper address; for a cluster with several addresses, separate them with ASCII (English) commas

    nodeParentPath = parent path under which the load agent nodes are created
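    The scripts read these files with the standard configparser module. The sketch below is a hypothetical load_settings helper that parses the three files from strings (so it runs without the conf/ directory) and, since the README asks for ASCII commas, also accepts full-width commas in hosts. With real files, ConfigParser.read(path, encoding='utf-8-sig') can be used instead, as the original scripts do.

```python
import configparser


def _parse(text):
    """Parse one config file's contents."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return parser


def load_settings(zk_text, tcp_text, agent_text):
    """Parse zookeeper.conf, tcpserver.conf and agent.conf contents into one dict."""
    zk, tcp, agent = _parse(zk_text), _parse(tcp_text), _parse(agent_text)
    # The README asks for ASCII commas; accept U+FF0C (full-width comma) too
    hosts = zk.get("ZOOKEEPER", "hosts").replace("\uff0c", ",")
    return {
        "zk_hosts": ",".join(h.strip() for h in hosts.split(",") if h.strip()),
        # option names are case-insensitive in configparser
        "node_parent_path": zk.get("ZOOKEEPER", "nodeParentPath").strip(),
        "tcp_host": tcp.get("TCPSERVER", "host").strip(),
        "tcp_port": tcp.getint("TCPSERVER", "port"),
        "interval": agent.getint("AGENT", "interval"),
        "proc": agent.get("AGENT", "proc").strip(),
    }


settings = load_settings(
    "[ZOOKEEPER]\nhosts = 10.118.52.26:2181\nnodeParentPath=/rootNode\n",
    "[TCPSERVER]\nhost=10.202.7.165\nport = 8000\n",
    "[AGENT]\ninterval = 5\nproc = sftp-server\n",
)
print(settings)
```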

     

    MyTCPServer.py

     

    #!/usr/bin/env python3
    # -*- encoding:utf-8 -*-

    __author__ = 'shouke'

    import sys
    import socketserver

    from log import logger


    class MyTCPHandler(socketserver.BaseRequestHandler):
        """
        The RequestHandler class for our server.

        It is instantiated once per connection to the server, and must
        override the handle() method to implement communication to the
        client.
        """

        def handle(self):
            while True:
                # self.request is the TCP socket connected to the client
                raw = self.request.recv(1024)
                if not raw:
                    # recv() returns b'' once the client has closed the connection;
                    # without this check the loop would spin forever
                    logger.info('client[host:%s port:%s] disconnected' % self.client_address[:2])
                    break
                self.data = raw.decode('utf-8').strip()
                logger.info('receive data from client[host:%s port:%s]:%s' % (self.client_address[0], self.client_address[1], self.data))
                if self.data == 'bye':
                    self.request.sendall(bytes('bye', encoding='utf-8'))
                    break  # socketserver closes the socket after handle() returns
                else:
                    self.request.sendall(self.data.upper().encode('utf-8'))


    class MyTCPServer:
        def __init__(self, host, port):
            try:
                self.host = host
                self.port = port

                # Create the server, binding to self.host on port self.port.
                # ThreadingTCPServer serves each connection in its own thread; a plain
                # TCPServer would serve only one client at a time, because handle()
                # keeps looping until that client sends 'bye'
                self.server = socketserver.ThreadingTCPServer((self.host, self.port), MyTCPHandler)
            except Exception as e:
                logger.error('Failed to initialize TCPServer: %s' % e)
                sys.exit(1)

        def start(self):
            # Activate the server; this will keep running until you interrupt the program with Ctrl-C
            self.server.serve_forever()
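    MyTCPHandler reads with recv(1024), but TCP is a byte stream, so one recv() may return half a message or two messages at once. A common remedy is to frame messages, for example one command per line. The sketch below assumes newline-delimited framing (not part of the original protocol) and uses socketserver.StreamRequestHandler together with a hypothetical handle_command dispatcher, the place where the load agent would handle each command differently; the "status" command and its fixed reply are placeholders.

```python
import socket
import socketserver
import threading


def handle_command(command):
    """Hypothetical dispatcher: map one command line to one reply line."""
    if command == "status":
        return "idle"  # placeholder; a real agent would run its process check here
    return command.upper()  # default: same upper-case echo as MyTCPHandler


class LineHandler(socketserver.StreamRequestHandler):
    """Newline-framed protocol: one command per line, 'bye' ends the session."""

    def handle(self):
        for raw in self.rfile:  # yields one line at a time until the client closes
            command = raw.decode("utf-8").strip()
            if command == "bye":
                self.wfile.write(b"bye\n")
                break
            self.wfile.write((handle_command(command) + "\n").encode("utf-8"))


def send_lines(host, port, lines):
    """Client side: send each line and read exactly one reply line for it."""
    replies = []
    with socket.create_connection((host, port), timeout=5) as sock:
        with sock.makefile("rb") as reader:
            for line in lines:
                sock.sendall((line + "\n").encode("utf-8"))
                replies.append(reader.readline().decode("utf-8").strip())
    return replies


def demo():
    """Run the server on a free local port (port 0) and talk to it once."""
    server = socketserver.ThreadingTCPServer(("127.0.0.1", 0), LineHandler)
    threading.Thread(target=server.serve_forever, daemon=True).start()
    try:
        host, port = server.server_address[:2]
        return send_lines(host, port, ["hello world", "status", "bye"])
    finally:
        server.shutdown()
        server.server_close()


if __name__ == "__main__":
    print(demo())  # ['HELLO WORLD', 'idle', 'bye']
```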

     

    MyTCPClient.py

     

    #!/usr/bin/env python3
    # -*- encoding:utf-8 -*-

    __author__ = 'shouke'

    import socket
    import configparser
    import time

    from log import logger


    if __name__ == '__main__':
        sock = None  # stays None if we fail before the socket is created
        try:
            config_parser = configparser.ConfigParser()
            config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
            host = config_parser.get('TCPSERVER', 'host')
            port = int(config_parser.get('TCPSERVER', 'port'))

            # Create a socket (SOCK_STREAM means a TCP socket)
            sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

            # Connect to server and send data
            sock.connect((host, port))

            i = 0
            while i < 10000:
                if i == 1000:
                    sock.sendall(bytes('bye ', "utf-8"))  # the server strips whitespace, so it sees 'bye'
                else:
                    sock.sendall(bytes('hello world with tcp ', "utf-8"))

                # Receive data from the server. TCP is a byte stream, so one recv() is not
                # guaranteed to return exactly one reply; acceptable for this demo with
                # short messages and a 5-second pause between them
                received = str(sock.recv(1024), "utf-8")
                logger.info('receive data from server:%s' % received)
                if received in ('bye', ''):  # '' means the server closed the connection
                    break

                time.sleep(5)
                i += 1
        except Exception as e:
            logger.error('Program error: %s' % e)
        finally:
            if sock is not None:
                sock.close()

     

     

    appClient.py

     

    #!/usr/bin/env python3
    # -*- encoding:utf-8 -*-

    __author__ = 'shouke'

    import time
    from log import logger

    from kazoo.client import KazooClient
    from kazoo.client import KazooState

    def my_listener(state):
        # Kazoo expects connection state listeners not to block
        if state == KazooState.LOST:
            logger.info('LOST')
            # Register somewhere that the session was lost
        elif state == KazooState.SUSPENDED:
            logger.info('SUSPENDED')
            # Handle being disconnected from Zookeeper
        else:
            logger.info('CONNECTED')
            # Handle being connected/reconnected to Zookeeper

    def my_event_listener(event):
        # A watch passed to get_children() fires only once; the next call re-registers it
        logger.info(event)


    zk_client = KazooClient(hosts='10.118.52.26:2181')
    zk_client.add_listener(my_listener)
    zk_client.start()

    node_path = '/rootNode'
    sub_node = 'loaderAgent102027165'  # loadAgent.py names nodes 'loaderAgent' + host IP without dots
    children = zk_client.get_children(node_path, watch=my_event_listener)
    logger.info('there are %s children with names %s' % (len(children), children))


    @zk_client.ChildrenWatch(node_path)
    def watch_children(children):
        logger.info("Children are now: %s" % children)


    @zk_client.DataWatch("%s/%s" % (node_path, sub_node))
    def watch_node(data, stat):
        """Watch whether the node's data changes"""
        if stat:  # stat is None while the node does not exist
            logger.info('Version:%s, data:%s' % (stat.version, data))

    i = 0
    while i < 1000:
        time.sleep(5)
        children = zk_client.get_children(node_path, watch=my_event_listener)
        logger.info('there are %s children with names %s' % (len(children), children))
        i += 1

    zk_client.stop()
    zk_client.close()
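    kazoo's ChildrenWatch calls the decorated function with the full current list of children each time it changes, not with what changed. If an application module wants to react to load agents joining or leaving, it can diff successive lists. ChildrenTracker below is a hypothetical helper for that; the kazoo wiring is shown only as a comment because it needs a running ZooKeeper server.

```python
class ChildrenTracker(object):
    """Turn successive full child lists into (joined, left) differences."""

    def __init__(self):
        self._known = set()

    def update(self, children):
        current = set(children)
        joined = sorted(current - self._known)
        left = sorted(self._known - current)
        self._known = current
        return joined, left


# Possible wiring with kazoo (not executed here):
#
# tracker = ChildrenTracker()
#
# @zk_client.ChildrenWatch(node_path)
# def watch_children(children):
#     joined, left = tracker.update(children)
#     for name in joined:
#         logger.info('agent joined: %s' % name)
#     for name in left:
#         logger.info('agent left: %s' % name)

tracker = ChildrenTracker()
print(tracker.update(["loaderAgent102027165"]))  # (['loaderAgent102027165'], [])
print(tracker.update([]))  # ([], ['loaderAgent102027165'])
```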

     

     

     

     

     

    loadAgent.py

    #!/usr/bin/env python3
    # -*- encoding:utf-8 -*-

    __author__ = 'shouke'

    import sys
    import time
    import threading
    import configparser
    import json
    import subprocess

    from kazoo.client import KazooClient
    from kazoo.client import KazooState
    from log import logger

    # The file is MyTCPServer.py; module names are case-sensitive on Linux
    from MyTCPServer import MyTCPServer

    # Global variables
    zk_conn_stat = 0  # ZooKeeper connection state: 1-LOST 2-SUSPENDED 3-CONNECTED/RECONNECTED
    registry_status = 0  # registration state of this node in ZooKeeper: 0-not registered/registering, 1-registered


    def get_node_path():
        '''Path of this load agent's node, e.g. /rootNode/loaderAgent102027165'''
        return '%s/loaderAgent%s' % (node_parent_path, host.replace('.', ''))


    def get_node_data(status):
        '''Node payload; json.dumps guarantees well-formed JSON'''
        return json.dumps({'host': host, 'port': port, 'status': status}).encode('utf-8')


    def wait_until_connected():
        '''Block until the listener reports CONNECTED, sleeping instead of busy-waiting'''
        while zk_conn_stat != 3:
            time.sleep(0.5)


    def restart_zk_client():
        '''Restart the ZooKeeper session'''
        try:
            zk_client.restart()
            registry_zookeeper()
        except Exception as e:
            logger.error('Exception while restarting the ZooKeeper client: %s' % e)


    def zk_conn_listener(state):
        '''ZooKeeper connection state listener'''
        global zk_conn_stat
        global registry_status
        if state == KazooState.LOST:
            logger.warning('zookeeper connection lost')
            zk_conn_stat = 1
            registry_status = 0  # reset: the ephemeral node disappears with the session
            # Register somewhere that the session was lost.
            # Restart in a separate thread because listeners must not block kazoo
            thread = threading.Thread(target=restart_zk_client)
            thread.start()
        elif state == KazooState.SUSPENDED:
            logger.warning('zookeeper connection disconnected')
            zk_conn_stat = 2
            # Handle being disconnected from Zookeeper
        else:
            zk_conn_stat = 3
            logger.info('zookeeper connection connected/reconnected')
            # Handle being connected/reconnected to Zookeeper


    def registry_zookeeper():
        '''Register this node's information with ZooKeeper'''
        global registry_status
        try:
            wait_until_connected()  # do not register before the client is connected

            logger.info('Registering the load agent with ZooKeeper...')
            zk_client.ensure_path(node_parent_path)

            if not zk_client.exists(get_node_path()):
                # ephemeral=True: ZooKeeper deletes the node when this session ends
                zk_client.create(get_node_path(), get_node_data('idle'), ephemeral=True, sequence=False)

            # children = zk_client.get_children(node_parent_path)
            # logger.info('there are %s children with names: %s' % (len(children), children))
            # for child in children:
            #     logger.info(child)
            #     data, stat = zk_client.get('%s/%s' % (node_parent_path, child))
            #     logger.info(data)
            registry_status = 1  # registration complete
            logger.info('Load agent registered with ZooKeeper')
            return True
        except Exception as e:
            logger.error('Failed to register the load agent with ZooKeeper: %s' % e)
            return False


    def start_tcpserver(tcpserver):
        '''Start the TCP server'''
        tcpserver.start()


    def get_server_status(proc_name):
        '''Get the server status from the given process name'''
        with subprocess.Popen('ps -e | grep "%s"' % proc_name, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, universal_newlines=True) as proc:
            try:
                outs, errs = proc.communicate(timeout=30)
                outs = outs.strip()
                if outs.find(proc_name) != -1:
                    # logger.info('Got load agent status: %s' % outs)
                    server_status = 'busy'
                elif outs == '':
                    # logger.info('Got load agent status')
                    server_status = 'idle'  # grep found no matching process
                else:
                    logger.error('Failed to get load agent status: %s' % errs)
                    server_status = 'unknown'
            except Exception as e:
                proc.kill()
                proc.communicate()  # reap the killed process
                logger.error('Failed to get load agent status: %s' % e)
                server_status = 'unknown'
        return server_status


    def update_server_status(interval, proc_name):
        '''Periodically check and update the server status: if a process with the given name
        exists, the server is occupied and its status is marked busy; otherwise it is marked idle.
        If checking for the process fails, the status is marked unknown.'''
        while True:
            start = time.monotonic()  # monotonic clock: unaffected by system clock changes

            # Until the client is connected and the node is registered, wait and retry.
            # Sleeping keeps the loop from spinning at 100% CPU
            if zk_conn_stat != 3 or registry_status != 1:
                time.sleep(1)
                continue

            server_status = get_server_status(proc_name)
            # Why this check is needed: ZooKeeper deletes ephemeral nodes with a delay. If the
            # client is closed and then quickly restarted and re-registered, the ephemeral node
            # created before the restart may not have been deleted yet when registration finishes.
            # Because the new node's path is identical to the old one, ZooKeeper's subsequent
            # deletion also removes the node registered after the restart
            try:
                if zk_client.exists(get_node_path()):
                    zk_client.set(get_node_path(), get_node_data(server_status))
                else:
                    registry_zookeeper()
            except Exception as e:
                # e.g. the node vanished between exists() and set(), or the connection dropped;
                # log it and retry in the next round instead of stopping the agent
                logger.error('Failed to update load agent status: %s' % e)

            elapsed = time.monotonic() - start
            if elapsed < interval:
                time.sleep(interval - elapsed)


    if __name__ == '__main__':
        logger.info('Starting the agent...')

        try:
            logger.info('Reading the ZooKeeper configuration...')
            config_parser = configparser.ConfigParser()
            config_parser.read('./conf/zookeeper.conf', encoding='utf-8-sig')
            # Also accept U+FF0C (full-width comma) between ZooKeeper addresses
            zk_hosts = config_parser.get('ZOOKEEPER', 'hosts').replace('\uff0c', ',').strip()
            node_parent_path = config_parser.get('ZOOKEEPER', 'nodeParentPath').strip()

            logger.info('Building and starting the ZooKeeper client...')
            zk_client = KazooClient(hosts=zk_hosts)
            zk_client.add_listener(zk_conn_listener)
            zk_client.start()
        except Exception as e:
            logger.error('Failed to initialize the ZooKeeper client: %s' % e)
            sys.exit(1)

        try:
            config_parser.clear()
            config_parser.read('./conf/tcpserver.conf', encoding='utf-8-sig')
            host = config_parser.get('TCPSERVER', 'host')
            port = int(config_parser.get('TCPSERVER', 'port'))
            tcp_server = MyTCPServer(host, port)
            # daemon=True so that serve_forever() does not keep the process alive after sys.exit()
            thread = threading.Thread(target=start_tcpserver, args=(tcp_server,), daemon=True)
            thread.start()
        except Exception as e:
            logger.error('Failed to start TCPServer: %s; check whether conf/tcpserver.conf is correct' % e)
            sys.exit(1)

        try:
            # Register with ZooKeeper
            registry_zookeeper()

            config_parser.clear()
            config_parser.read('./conf/agent.conf', encoding='utf-8-sig')
            interval = int(config_parser.get('AGENT', 'interval'))
            proc = config_parser.get('AGENT', 'proc').strip()

            # Periodically update the node's busy/idle status
            update_server_status(interval, proc)
        except Exception as e:
            logger.error('zk_client failed: %s; check whether conf/agent.conf is correct' % e)
            sys.exit(1)
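    get_server_status() runs `ps -e | grep "name"` through a shell, which is a substring match: a process called, say, sftp-server2 would also make the agent report busy, and the configured name is pasted into a shell command. A sketch of an alternative, assuming a ps that supports `-o comm=` (as procps on Linux does), runs ps without a shell and compares whole command names. The function names are illustrative. On Linux the command name is truncated to 15 characters, so longer process names would need a different check.

```python
import subprocess


def status_from_comm_names(comm_output, proc_name):
    """'busy' if proc_name equals one of the command names (one per line), else 'idle'."""
    names = {line.strip() for line in comm_output.splitlines()}
    return "busy" if proc_name in names else "idle"


def get_server_status_exact(proc_name):
    """Run `ps -e -o comm=` without a shell; return 'unknown' if ps fails or times out."""
    try:
        result = subprocess.run(
            ["ps", "-e", "-o", "comm="],  # comm= prints command names with no header
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
            timeout=30,
            check=True,
        )
    except (OSError, subprocess.SubprocessError):
        # OSError: ps not found; SubprocessError: non-zero exit status or timeout
        return "unknown"
    return status_from_comm_names(result.stdout, proc_name)


print(status_from_comm_names("bash\nsftp-server\nsshd\n", "sftp-server"))  # busy
print(status_from_comm_names("bash\nsftp-server2\n", "sftp-server"))  # idle
```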

     

     

     

    Run results

     

    Original post: https://www.cnblogs.com/shouke/p/10582572.html