# coding: gbk
"""Master side of a small task queue shared through ``multiprocessing.managers``.

The script publishes a task queue and a result queue through a
``BaseManager`` listening on 127.0.0.1:5002, fills the task queue with the
integers ``0 .. task_number - 1`` and then waits until ``task_number``
results have been put on the result queue by whoever consumes the tasks.
"""
import queue
import time
import traceback
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager

# Number of tasks to dispatch.
task_number = 10

# Queues used to send tasks and to receive results; each holds at most
# ``task_number`` items.
task_queue = queue.Queue(task_number)
result_queue = queue.Queue(task_number)


def gettask():
    """Return the module-level task queue (registered below as 'get_task')."""
    return task_queue


def getresult():
    """Return the module-level result queue (registered below as 'get_result')."""
    return result_queue


def test():
    """Serve both queues, enqueue every task and print each result.

    Blocks until the result queue holds ``task_number`` items, polling once
    per second.  Each result is expected to be a 2-tuple of integers
    ``(task, runtime)`` because it is fed straight into a two-field ``%d``
    format string.  The manager is always shut down on the way out.
    """
    # On Windows the registered callable cannot be a lambda, so named
    # functions are defined first and registered here.
    BaseManager.register('get_task', callable=gettask)
    BaseManager.register('get_result', callable=getresult)

    # Bind the port and set the auth key.  On Windows the IP address has to
    # be given explicitly; on Linux an empty address defaults to the local
    # machine.
    manager = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
    manager.start()
    try:
        # Fetch the task and result queues through the manager (over the
        # network connection, not as direct references).
        task = manager.get_task()
        result = manager.get_result()

        # Enqueue the tasks.
        for i in range(task_number):
            print('Put task %d...' % i)
            task.put(i)

        # Check once per second whether every task has been completed.
        # Comparing against task_number (rather than result.full()) also
        # terminates when task_number is 0, where the queue is unbounded and
        # full() would never become true.
        while result.qsize() < task_number:
            print(task.qsize())
            time.sleep(1)

        for _ in range(result.qsize()):
            ans = result.get()
            print('task %d is finish , runtime:%d s' % ans)
    except Exception:
        # A bare ``except:`` would also swallow KeyboardInterrupt/SystemExit
        # and hide the cause; keep the original message and show the
        # traceback as well.
        print('Manager error')
        traceback.print_exc()
    finally:
        manager.shutdown()


if __name__ == '__main__':
    # Multiprocessing can misbehave on Windows; calling freeze_support()
    # mitigates that.
    freeze_support()
    test()

  # coding: gbk
"""Worker side of a small task queue shared through ``multiprocessing.managers``.

Connects to a ``BaseManager`` at 127.0.0.1:5002, takes integers from the
queue exposed as ``get_task``, sleeps a random number of seconds for each
one and puts ``(task, seconds_slept)`` on the queue exposed as
``get_result``.
"""
import queue
import random
import sys
import time
from multiprocessing.managers import BaseManager


def process_tasks(task, result, max_sleep=3):
    """Drain ``task`` and put one ``(n, sleeptime)`` tuple per item on ``result``.

    Args:
        task: queue-like object providing ``empty()``, ``qsize()`` and
            ``get(timeout=...)``.
        result: queue-like object providing ``put()``.
        max_sleep: inclusive upper bound, in whole seconds, of the random
            delay applied to each task.
    """
    while not task.empty():
        print(task.qsize())
        try:
            n = task.get(timeout=1)
        except queue.Empty:
            # Something else took the last item between the empty() check
            # and get(); there is nothing left to do, so stop cleanly
            # instead of dying with a traceback.
            break
        print('run task %d' % n)
        sleeptime = random.randint(0, max_sleep)
        time.sleep(sleeptime)
        result.put((n, sleeptime))


def main():
    """Connect to the manager and process tasks until the queue is empty.

    Exits the process with status 1 if the connection cannot be established.
    """
    BaseManager.register('get_task')
    BaseManager.register('get_result')
    conn = BaseManager(address=('127.0.0.1', 5002), authkey=b'123')
    try:
        conn.connect()
    except Exception:
        # Message text means "connection failed".  It is written with
        # escapes so the printed string does not depend on the coding
        # cookie matching the encoding the file is actually saved in.
        print('\u8fde\u63a5\u5931\u8d25')
        sys.exit(1)
    process_tasks(conn.get_task(), conn.get_result())


if __name__ == '__main__':
    main()

  1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20  Put task 0... Put task 1... Put task 2... Put task 3... Put task 4... Put task 5... Put task 6... Put task 7... Put task 8... Put task 9... task 0 is finish , runtime:3 s task 1 is finish , runtime:0 s task 2 is finish , runtime:2 s task 4 is finish , runtime:1 s task 3 is finish , runtime:3 s task 6 is finish , runtime:1 s task 7 is finish , runtime:0 s task 5 is finish , runtime:3 s task 8 is finish , runtime:2 s task 9 is finish , runtime:3 s 

  1 2 3 4 5 6 7 8 9 10 11 12  10 run task 0 9 run task 1 8 run task 2 6 run task 4 5 run task 5 1 run task 9 

 1 2 3 4 5 6 7 8  7 run task 3 4 run task 6 3 run task 7 2 run task 8 

Python中使用分布式进程计算
