今天在学习到廖老师Python教程的分布式进程时,遇到了一个错误:_pickle.PicklingError: Can't pickle <function <lambda> at 0x000001710FDC2EA0>: attribute lookup <lambda> on __main__ failed(pickle模块不能序列化lambda函数)
代码如下:
1 #!/usr/bin/env python 2 # _*_ coding:utf-8 _*_ 3 4 import random, queue 5 from multiprocessing.managers import BaseManager 6 7 # 发送任务的队列 8 task_queue = queue.Queue() 9 # 接收结果的队列 10 result_queue = queue.Queue() 11 # 从BaseManager继承的QueueManager 12 13 14 class QueueManager(BaseManager): 15 pass 16 17 18 # 吧两个Queue都注册到网络上,callable参数关联了Queue对象 19 QueueManager.register('get_task_queue', callable=lambda: task_queue) 20 QueueManager.register('get_result_queue', callable=lambda: result_queue) 21 22 # 绑定端口5000,设置验证码abc 23 manager = QueueManager(address=('', 5000), authkey=b'abc') 24 25 # 启动queue 26 manager.start() 27 28 # 获得通过网络访问的Queue对象 29 task = manager.get_task_queue() 30 result = manager.get_result_queue() 31 32 # 放几个任务 33 for i in range(10): 34 n = random.randint(0, 1000) 35 print('添加任务 %d' % n) 36 task.put(n) 37 38 # 从result队列读取结果 39 print('尝试获取结果') 40 for i in range(10): 41 r = result.get(timeout=10) 42 print('结果是:%s' % r) 43 44 # 关闭 45 manager.shutdown() 46 print('master exit')
在教程中我记得有关pickle的事儿(有印象,看来思想还在线上,哈哈),翻了一下,看到:
原来是系统问题造成的,那么,如何解决呢?在教程中我也看到,遇到这样的情况,需要我们自己定义函数,实现序列化。
所以对代码稍加修改,定义两个函数return_task_queue和return_result_queue实现序列化:
1 #!/usr/bin/env python 2 # _*_ coding:utf-8 _*_ 3 4 import random, queue 5 from multiprocessing.managers import BaseManager 6 7 # 发送任务的队列 8 task_queue = queue.Queue() 9 # 接收结果的队列 10 result_queue = queue.Queue() 11 12 13 def return_task_queue(): 14 global task_queue 15 return task_queue 16 17 18 def return_result_queue(): 19 global result_queue 20 return result_queue 21 22 23 # 从BaseManager继承的QueueManager 24 25 26 class QueueManager(BaseManager): 27 pass 28 29 30 if __name__ == '__main__': 31 # 吧两个Queue都注册到网络上,callable参数关联了Queue对象 32 QueueManager.register('get_task_queue', callable=return_task_queue) 33 QueueManager.register('get_result_queue', callable=return_result_queue) 34 35 # 绑定端口5000,设置验证码abc 36 manager = QueueManager(address=('127.0.0.1', 5000), authkey=b'abc') 37 38 # 启动queue 39 manager.start() 40 41 # 获得通过网络访问的Queue对象 42 task = manager.get_task_queue() 43 result = manager.get_result_queue() 44 45 # 放几个任务 46 for i in range(10): 47 n = random.randint(0, 1000) 48 print('添加任务 %d' % n) 49 task.put(n) 50 51 # 从result队列读取结果 52 print('尝试获取结果') 53 for i in range(10): 54 r = result.get(timeout=10) 55 print('结果是:%s' % r) 56 57 # 关闭 58 manager.shutdown() 59 print('master exit')
运行结果:
欢迎各位来和我一起分享技术,交流学习心得