我重新思考了分布式服务的分工与合作,梳理出分布式系统中的三个角色,并据此重写了上一篇的代码。
众所周知,分布式系统中一般有三个角色:master、worker 和 client。
1.master
master(主服务器)监视新的 worker 和 task,并将 task 分配给可用的 worker;若某个 worker 丢失,则把曾分配给它的 task 重新分配给其他可用的 worker。当然,master 自身也要保证高可用。
2.worker
worker在系统中进行注册,以确保主服务器可以分配任务给自己,然后监视新任务,有任务分配给自己就开始执行。
3.client
客户端创建新任务并提交给系统,然后监控对应task的状态
详细逻辑如下(用 zkCli 命令演示):
#1. 多个 worker 竞选 master,竞选成功者成为 master 并创建 /workers、/tasks、/assign;竞选失败的 worker 监控 /master,随时准备重新竞选 master
create -e /rpc/master "host:port"
create /rpc/workers ""
create /rpc/tasks ""
create /rpc/assign ""
ls -w /rpc/master
stat -w /rpc/master
#2. worker 在 zk 注册自己:在 /workers 下创建一个代表自己的临时子节点,再创建属于自己的分配空间,并监控 master 可能分配给自己的任务
create -e /rpc/workers/worker1.example.com "worker1.example.com:2224"
create /rpc/assign/worker1.example.com ""
ls -w /rpc/assign/worker1.example.com
#3. client 在 /tasks 下创建有序的 task- 子节点,并监控该 task 的状态
create -s /rpc/tasks/task- "cmd"
ls -w /rpc/tasks/task-0000000000
#4. master 查看新 task,获取可用 worker 节点,先给 task 打上 schedule 状态,再将任务分配给 master 之外的 worker
ls -w /rpc/workers
ls -w /rpc/tasks
create /rpc/tasks/task-0000000000/status "schedule"
create /rpc/assign/worker1.example.com/task-0000000000 ""
#5. worker 监控分配给自己的 task,发现有分配给自己的 task 时执行,执行完修改 task 状态
ls /rpc/assign/worker1.example.com
set /rpc/tasks/task-0000000000/status "done"
#6. client 收到通知,获取 task 的状态
get /rpc/tasks/task-0000000000/status
get /rpc/tasks/task-0000000000
下面看python代码
zk_master.py
# -*- coding:utf-8 -*-
# @Time    : 2020-07-22 14:12
# @Author  : wangbin
"""Master role of the ZooKeeper task-dispatch demo.

Several master processes may run at once; they race to create the ephemeral
/rpc/master znode and only the winner assigns tasks.

Usage: python zk_master.py <host> <port>
"""
import json
import random
import socket
import sys
import threading
import time

from kazoo.client import KazooClient
from kazoo.exceptions import KazooException, NoNodeError, NodeExistsError


class ZKMaster(object):
    """Elect a master via /rpc/master and dispatch /rpc/tasks to /rpc/workers.

    Task state is kept in /rpc/tasks/<task>/status: absent means not yet
    scheduled, b'schedule' means handed to a worker (written by assign()),
    b'done' means finished (written by the worker).

    NOTE(review): tasks assigned to a worker that later disappears are NOT
    reassigned here -- their status stays b'schedule'.
    """

    def __init__(self, host, port, hosts='127.0.0.1:2181'):
        """Connect to ZooKeeper.

        :param host: address this master advertises in /rpc/master
        :param port: port this master advertises in /rpc/master
        :param hosts: ZooKeeper connection string
        """
        self.host = host
        self.port = port
        self.zk = KazooClient(hosts=hosts)
        self.zk.start()
        # Decoded /rpc/workers/<name> payloads ({'host': ..., 'port': ...}).
        self._workers = []
        # Set once this process owns /rpc/master; do() waits on it.
        self._elected = threading.Event()

    def register_zk(self, event=None):
        """Try to become master by creating the ephemeral /rpc/master node.

        If another process already holds it, watch /rpc/master. kazoo calls
        this method again (with ``event``) on the next change, so a standby
        retries the election once the current master's session ends.

        :return: True if this process holds /rpc/master after the call
        """
        self.zk.ensure_path('/rpc')
        value = json.dumps({'host': self.host, 'port': self.port})
        try:
            self.zk.create('/rpc/master', value.encode(), ephemeral=True)
        except NodeExistsError as e:
            print(e)
            if self.zk.exists('/rpc/master', watch=self.register_zk) is None:
                # The holder vanished between create() and exists(): retry now
                # instead of waiting for someone else to recreate the node.
                return self.register_zk()
            return False
        # The winner creates the shared directories so that get_children()
        # and worker/client writes never hit a missing parent node.
        for path in ('/rpc/workers', '/rpc/tasks', '/rpc/assign'):
            self.zk.ensure_path(path)
        self._elected.set()
        return True

    def do(self):
        """Block until elected, then act as master forever.

        Children watches on /rpc/workers and /rpc/tasks drive most of the work.
        The loop also re-scans /rpc/tasks periodically, because a worker
        writing b'done' changes a grandchild of /rpc/tasks, which does not
        fire the children watch.
        """
        # A standby must not assign tasks, otherwise two masters would race.
        while not self._elected.wait(3):
            print('standby, waiting to become master')
        self.get_works()
        self.watch_tasks()
        while True:
            print(self._workers)
            try:
                self.watch_tasks()
            except KazooException as e:
                # A transient ZooKeeper error must not kill the master loop.
                print(e)
            time.sleep(3)

    def get_works(self, event=None):
        """Rebuild self._workers from /rpc/workers and re-arm the children watch."""
        names = self.zk.get_children('/rpc/workers', watch=self.get_works)
        workers = []
        for name in names:
            try:
                data, _stat = self.zk.get('/rpc/workers/' + name)
            except NoNodeError:
                continue  # the worker left between get_children() and get()
            if data:
                workers.append(json.loads(data.decode()))
        # Publish in one assignment so watch_tasks() never sees a half-built list.
        self._workers = workers

    def watch_tasks(self, event=None):
        """Assign tasks that have no status yet and delete tasks marked done."""
        tasks = self.zk.get_children('/rpc/tasks', watch=self.watch_tasks)
        for task in tasks:
            try:
                status, _stat = self.zk.get('/rpc/tasks/' + task + '/status')
            except NoNodeError:
                # No status node: the task has never been scheduled.
                workers = self._workers
                if not workers:
                    print('no worker available for task=' + task)
                    continue
                worker = random.choice(workers)
                self.assign(worker['host'], task)
                continue
            if status == b'done':
                # Removes the status child and the task node itself.
                self.zk.delete('/rpc/tasks/' + task, recursive=True)
                print('delete done task=' + task)

    def assign(self, worker, task):
        """Hand ``task`` to ``worker`` (a worker host name).

        Creating the status node first doubles as a claim. If a concurrent
        watch_tasks() pass already created it, or the task is already gone,
        this call does nothing.
        """
        try:
            self.zk.create('/rpc/tasks/' + task + '/status', b'schedule')
        except (NodeExistsError, NoNodeError):
            return
        print('task=%s schedule to %s' % (task, worker))
        # makepath: a worker registers under /rpc/workers before it creates its
        # /rpc/assign/<host> directory, so that directory may not exist yet.
        self.zk.create('/rpc/assign/{worker}/{task}'.format(worker=worker, task=task),
                       value=b'', makepath=True)


if __name__ == '__main__':
    host = sys.argv[1]
    port = sys.argv[2]
    m = ZKMaster(host, int(port))
    m.register_zk()
    m.do()
zk_worker.py
# -*- coding:utf-8 -*-
# @Time    : 2020-07-20 17:50
# @Author  : wangbin
"""Worker role of the ZooKeeper task-dispatch demo.

Usage: python zk_worker.py <host> <port>
"""
import json
import socket
import sys
import threading
import time

from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError, NodeExistsError


class ZKWorker(object):
    """Register under /rpc/workers and run tasks assigned in /rpc/assign/<host>."""

    def __init__(self, host, port, hosts='127.0.0.1:2181'):
        """Store settings; the ZooKeeper connection is opened by register_zk().

        :param host: worker name, used for /rpc/workers/<host> and /rpc/assign/<host>
        :param port: port advertised in /rpc/workers/<host>
        :param hosts: ZooKeeper connection string
        """
        self.host = host
        self.port = port
        self.hosts = hosts
        self.zk = None
        # wait_assign() runs from the main thread and from kazoo's watch
        # thread; serialize it so the same task is never executed twice.
        self._assign_lock = threading.Lock()

    def register_zk(self):
        """Connect and register this worker as the ephemeral /rpc/workers/<host> node.

        The node is ephemeral, so the connection must stay open for the
        registration to remain visible to the master.
        """
        self.zk = KazooClient(hosts=self.hosts)
        self.zk.start()
        self.zk.ensure_path('/rpc/workers')
        value = json.dumps({'host': self.host, 'port': self.port})
        self.zk.create('/rpc/workers/{name}'.format(name=self.host), value.encode(),
                       ephemeral=True)

    def wait_assign(self, event=None):
        """Ensure /rpc/assign/<host> exists, re-arm its children watch, run every assigned task.

        Also serves as that watch's callback (``event``).
        """
        assign_dir = '/rpc/assign/{name}'.format(name=self.host)
        with self._assign_lock:
            self.zk.ensure_path('/rpc/assign')
            try:
                self.zk.create(assign_dir, b'')
            except NodeExistsError as e:
                print(e)
            tasks = self.zk.get_children(assign_dir, watch=self.wait_assign)
            print(tasks)
            for task in tasks:
                self.do_task(task)

    def do_task(self, task):
        """Execute ``task`` (simulated by a 2s sleep), mark it done, drop the assignment."""
        print('ready to do ' + task)
        time.sleep(2)
        self.update_task_status(task)
        self.delete_assign(task)

    def update_task_status(self, task):
        """Write b'done' to /rpc/tasks/<task>/status so the master can clean the task up."""
        try:
            self.zk.set('/rpc/tasks/{task}/status'.format(task=task), b'done')
        except NoNodeError:
            # The task (or its status node) was already removed; nothing to report.
            print('task=%s no longer exists' % task)
            return
        print('task=%s is done! ' % task)

    def delete_assign(self, task):
        """Remove /rpc/assign/<host>/<task>; tolerate it already being gone."""
        try:
            self.zk.delete('/rpc/assign/{host}/{task}'.format(host=self.host, task=task))
        except NoNodeError:
            return
        print('delete done assign task=' + task)


if __name__ == '__main__':
    host = sys.argv[1]
    port = sys.argv[2]
    worker = ZKWorker(host, int(port))
    worker.register_zk()
    worker.wait_assign()
    while True:
        print('i am worker!wait for task')
        time.sleep(3)
zk_client.py
# -*- coding:utf-8 -*-
# @Time    : 2020-07-20 17:53
# @Author  : wangbin
"""Client role of the ZooKeeper task-dispatch demo: submits tasks."""
from kazoo.client import KazooClient


class ZKClient(object):
    """Connect to ZooKeeper and submit tasks as sequential /rpc/tasks/task- nodes."""

    def __init__(self, hosts='127.0.0.1:2181'):
        """:param hosts: ZooKeeper connection string"""
        self.zk = KazooClient(hosts=hosts)
        self.zk.start()

    def add_task(self, cmd=b'cmd'):
        """Create a sequential task node holding ``cmd``.

        :param cmd: task payload; a text string is UTF-8 encoded
        :return: the created path, e.g. '/rpc/tasks/task-0000000000'
        """
        if not isinstance(cmd, bytes):
            cmd = cmd.encode('utf-8')
        self.zk.ensure_path('/rpc/tasks')
        task = self.zk.create('/rpc/tasks/task-', value=cmd, sequence=True)
        print(task)
        return task


if __name__ == '__main__':
    client = ZKClient()
    try:
        client.add_task()
    finally:
        # Close the session explicitly instead of relying on process exit.
        client.zk.stop()
        client.zk.close()