zoukankan      html  css  js  c++  java
  • python借助zookeeper实现分布式服务(一)

     借助zookeeper可以实现服务器的注册与发现,有需求的时候调用zookeeper来发现可用的服务器,将任务均匀分配到各个服务器上去.

    这样可以方便的随任务的繁重程度对服务器进行弹性扩容,客户端和服务端是非耦合的,也可以随时增加客户端.

    zk_server.py

    import threading
    import json
    import socket
    import sys
    from kazoo.client import KazooClient
    
    
    # TCP服务端绑定端口开启监听,同时将自己注册到zk
    class ZKServer(object):
        def __init__(self, host, port):
            self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.host = host
            self.port = port
            self.sock.bind((host, port))
            self.zk = None
    
        def serve(self):
            """
            开始服务,每次获取得到一个信息,都新建一个线程处理
            """
            self.sock.listen(128)
            self.register_zk()
            print("开始监听")
            while True:
                conn, addr = self.sock.accept()
                print("建立链接%s" % str(addr))
                t = threading.Thread(target=self.handle, args=(conn, addr))
                t.start()
    
        # 具体的处理逻辑,只要接收到数据就立即投入工作,下次没有数据本次链接结束
        def handle(self, conn, addr):
            while True:
                data=conn.recv(1024)
                if not data or data.decode('utf-8') == 'exit':
                    break
                print(data.decode('utf-8'))
            conn.close()
            print('My work is done!!!')
    
        # 将自己注册到zk,临时节点,所以连接不能中断
        def register_zk(self):
            """
            注册到zookeeper
            """
            self.zk = KazooClient(hosts='127.0.0.1:2181')
            self.zk.start()
            self.zk.ensure_path('/rpc')  # 创建根节点
            value = json.dumps({'host': self.host, 'port': self.port})
            # 创建服务子节点
            self.zk.create('/rpc/server', value.encode(), ephemeral=True, sequence=True)
    
    
    if __name__ == '__main__':
        if len(sys.argv) < 3:
            print("usage:python server.py [host] [port]")
            exit(1)
        host = sys.argv[1]
        port = sys.argv[2]
        server = ZKServer(host, int(port))
        server.serve()

    zk_client.py

    import random
    import sys
    import time
    import json
    import socket
    
    from kazoo.client import KazooClient
    
    
    # 客户端连接zk,并从zk获取可用的服务器列表
    class ZKClient(object):
        def __init__(self):
            self._zk = KazooClient(hosts='127.0.0.1:2181')
            self._zk.start()
            self._get_servers()
    
        def _get_servers(self, event=None):
            """
            从zookeeper获取服务器地址信息列表
            """
            servers = self._zk.get_children('/rpc', watch=self._get_servers)
            # print(servers)
            self._servers = []
            for server in servers:
                data = self._zk.get('/rpc/' + server)[0]
                if data:
                    addr = json.loads(data.decode())
                    self._servers.append(addr)
    
        def _get_server(self):
            """
            随机选出一个可用的服务器
            """
            return random.choice(self._servers)
    
        def get_connection(self):
            """
            提供一个可用的tcp连接
            """
            sock = None
            while True:
                server = self._get_server()
                print('server:%s' % server)
                try:
                    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    sock.connect((server['host'], server['port']))
                except ConnectionRefusedError:
                    time.sleep(1)
                    continue
                else:
                    break
            return sock
    
    
    if __name__ == '__main__':
        # 模拟多个客户端批量生成任务,推送给服务器执行
        client = ZKClient()
        for i in range(40):
            sock = client.get_connection()
            sock.send(bytes(str(i), encoding='utf8'))
            sock.close()
            time.sleep(1)
  • 相关阅读:
    window.clipboardData(转载)
    动态添加样式(转载)
    IE6 IE7 FF的CSS Hack总结(转载)
    [轉貼] linux解壓 tar 命令
    [轉]用 snprintf / asprintf 取代不安全的 sprintf
    寫一個函數計算當參數為 n(n很大) 時的值 12+34+56+7……+n
    [轉]vi 與 vim 的指令整理
    MySQL和php採用UTF8的方法
    [轉]printf 引數說明
    [C] warning: ISO C90 forbids mixed declarations and code
  • 原文地址:https://www.cnblogs.com/wangbin2188/p/13346079.html
Copyright © 2011-2022 走看看