zoukankan      html  css  js  c++  java
  • Python_生产者消费者模型、管道、数据共享、进程池

    1、生产者消费者模型

       生产者 —— 生产数据的人

      消费者 —— 消费数据的人

      生产者消费者模型:供销数据不平衡的现象。

     1 import time
     2 import random
     3 from multiprocessing import Process, Queue
     4 
     5 def consumer(q):
     6     while True:
     7         obj = q.get()
     8         print(f'消费了一个数据{obj}')
     9         time.sleep(random.randint(1, 3))
    10 
    11 if __name__ == "__main__":
    12     q = Queue()
    13     Process(target=consumer, args=(q,)).start()
    14     for i in range(10):
    15         time.sleep(random.randint(1, 5))
    16         q.put(f"food{i}")
    17         print(f'生产了一个数据food{i}')
    生产者消费者速度不一致
    D:Python36python.exe E:/Python/草稿纸.py
    生产了一个数据food0
    消费了一个数据food0
    生产了一个数据food1
    消费了一个数据food1
    生产了一个数据food2
    消费了一个数据food2
    生产了一个数据food3
    消费了一个数据food3
    生产了一个数据food4
    消费了一个数据food4
    生产了一个数据food5
    消费了一个数据food5
    生产了一个数据food6
    生产了一个数据food7
    消费了一个数据food6
    生产了一个数据food8
    生产了一个数据food9
    消费了一个数据food7
    消费了一个数据food8
    消费了一个数据food9
    结果(阻塞)
     1 import time
     2 import random
     3 from multiprocessing import Process, Queue
     4 
     5 def consumer(name, q):
     6     while True:
     7         obj = q.get()
     8         print(f'{name}吃了一个{obj}')
     9         time.sleep(random.randint(1, 3))
    10 
    11 def producer(name, food, q):
    12     for i in range(10):
    13         time.sleep(random.randint(1, 5))
    14         q.put(f'{name}生产的{food}{i}')
    15         print(f'{name}生产了一个数据{food}{i}')
    16 
    17 if __name__ == '__main__':
    18     q = Queue()
    19     Process(target=consumer, args=('alex', q)).start()
    20     Process(target=producer, args=('yuan', '泔水', q)).start()
    21     Process(target=producer, args=('egon', '骨头', q)).start()
    供大于求
    D:Python36python.exe E:/Python/草稿纸.py
    egon生产了一个数据骨头0
    alex吃了一个egon生产的骨头0
    yuan生产了一个数据泔水0
    egon生产了一个数据骨头1
    alex吃了一个yuan生产的泔水0
    alex吃了一个egon生产的骨头1
    yuan生产了一个数据泔水1
    egon生产了一个数据骨头2
    alex吃了一个yuan生产的泔水1
    egon生产了一个数据骨头3
    yuan生产了一个数据泔水2
    alex吃了一个egon生产的骨头2
    alex吃了一个egon生产的骨头3
    egon生产了一个数据骨头4
    alex吃了一个yuan生产的泔水2
    yuan生产了一个数据泔水3
    alex吃了一个egon生产的骨头4
    alex吃了一个yuan生产的泔水3
    yuan生产了一个数据泔水4
    egon生产了一个数据骨头5
    yuan生产了一个数据泔水5
    alex吃了一个yuan生产的泔水4
    yuan生产了一个数据泔水6
    egon生产了一个数据骨头6
    alex吃了一个egon生产的骨头5
    yuan生产了一个数据泔水7
    alex吃了一个yuan生产的泔水5
    yuan生产了一个数据泔水8
    alex吃了一个yuan生产的泔水6
    egon生产了一个数据骨头7
    alex吃了一个egon生产的骨头6
    yuan生产了一个数据泔水9
    alex吃了一个yuan生产的泔水7
    egon生产了一个数据骨头8
    alex吃了一个yuan生产的泔水8
    egon生产了一个数据骨头9
    alex吃了一个egon生产的骨头7
    alex吃了一个yuan生产的泔水9
    alex吃了一个egon生产的骨头8
    alex吃了一个egon生产的骨头9
    结果(consumer结束不了)

      

     1 import time
     2 import random
     3 from multiprocessing import Process, Queue
     4 
     5 
     6 def consumer(name, q):
     7     while True:
     8         obj = q.get()  # 阻塞
     9         if obj is None:
    10             break
    11         print(f'{name}吃了一个{obj}')
    12         time.sleep(random.randint(1, 3))
    13 
    14 
    15 def producer(name, food, q):
    16     for i in range(10):
    17         time.sleep(random.randint(1, 5))
    18         q.put(f'{name}生产的{food}{i}')
    19         print(f'{name}生产了一个{food}{i}')
    20 
    21 
    22 if __name__ == '__main__':
    23     q = Queue()
    24     Process(target=consumer, args=('alex', q)).start()
    25     Process(target=consumer, args=('wusir', q)).start()
    26     p1 = Process(target=producer, args=('yuan', '泔水', q))
    27     p1.start()
    28     p2 = Process(target=producer, args=('egon', '骨头', q))
    29     p2.start()
    30     p1.join()
    31     p2.join()
    32     q.put(None)
    33     q.put(None)
    数据被消费完就结束进程
    D:Python36python.exe E:/Python/草稿纸.py
    egon生产了一个骨头0
    alex吃了一个egon生产的骨头0
    yuan生产了一个泔水0
    wusir吃了一个yuan生产的泔水0
    yuan生产了一个泔水1
    alex吃了一个yuan生产的泔水1
    egon生产了一个骨头1
    yuan生产了一个泔水2
    alex吃了一个egon生产的骨头1
    wusir吃了一个yuan生产的泔水2
    egon生产了一个骨头2
    alex吃了一个egon生产的骨头2
    yuan生产了一个泔水3
    wusir吃了一个yuan生产的泔水3
    egon生产了一个骨头3
    wusir吃了一个egon生产的骨头3
    yuan生产了一个泔水4
    wusir吃了一个yuan生产的泔水4
    egon生产了一个骨头4
    alex吃了一个egon生产的骨头4
    yuan生产了一个泔水5
    wusir吃了一个yuan生产的泔水5
    egon生产了一个骨头5
    alex吃了一个egon生产的骨头5
    egon生产了一个骨头6
    wusir吃了一个egon生产的骨头6
    yuan生产了一个泔水6
    alex吃了一个yuan生产的泔水6
    yuan生产了一个泔水7
    alex吃了一个yuan生产的泔水7
    egon生产了一个骨头7
    wusir吃了一个egon生产的骨头7
    yuan生产了一个泔水8
    alex吃了一个yuan生产的泔水8
    yuan生产了一个泔水9
    wusir吃了一个yuan生产的泔水9
    egon生产了一个骨头8
    alex吃了一个egon生产的骨头8
    egon生产了一个骨头9
    wusir吃了一个egon生产的骨头9
    
    Process finished with exit code 0
    结果

      

      q.join()  对这个队列进行阻塞,这个队列中的所有值被取走,且执行了task_done.

     1 import time
     2 from multiprocessing import Process, JoinableQueue
     3 
     4 def consumer(q):
     5     while True:
     6         print(q.get())
     7         time.sleep(0.3)
     8         q.task_done()   # 通知队列一个数据已经被处理完了
     9 
    10 if __name__ == "__main__":
    11     q = JoinableQueue()
    12     c = Process(target=consumer, args=(q,))
    13     c.daemon = True
    14     c.start()
    15     for i in range(10):
    16         q.put(i)    # 10个数据
    17     q.join()    # join表示所有的数据都被取走且被处理完才结束阻塞
    18     print('所有数据都被处理完了')
    JoinableQueue
    D:Python36python.exe E:/Python/草稿纸.py
    0
    1
    2
    3
    4
    5
    6
    7
    8
    9
    所有数据都被处理完了
    
    Process finished with exit code 0
    结果

      

     1 import time
     2 import random
     3 from multiprocessing import Process, JoinableQueue
     4 
     5 def consumer(name, q):
     6     while True:
     7         obj = q.get()   # 阻塞
     8         print(f'{name}吃了一个{obj}')
     9         time.sleep(random.randint(1, 3))
    10         q.task_done()
    11 
    12 def producer(name, food, q):
    13     for i in range(10):
    14         time.sleep(random.randint(1, 5))
    15         q.put(f'{name}生产的{food}{i}')
    16         print(f'{name}生产了一个{food}{i}')
    17 
    18 if __name__ == "__main__":
    19     q = JoinableQueue()
    20     c1 = Process(target=consumer, args=('alex', q))
    21     c2 = Process(target=consumer, args=('wusir', q))
    22     c1.daemon = True
    23     c2.daemon = True
    24     p1 = Process(target=producer, args=('yuan', '泔水', q))
    25     p2 = Process(target=producer, args=('egon', '骨头', q))
    26     c1.start()
    27     c2.start()
    28     p1.start()
    29     p2.start()
    30     p1.join()
    31     p2.join()
    32     q.join()
    协调生产者与消费者之间的供给关系
    D:Python36python.exe E:/Python/草稿纸.py
    egon生产了一个骨头0
    alex吃了一个egon生产的骨头0
    yuan生产了一个泔水0
    wusir吃了一个yuan生产的泔水0
    egon生产了一个骨头1
    wusir吃了一个egon生产的骨头1
    egon生产了一个骨头2
    alex吃了一个egon生产的骨头2
    yuan生产了一个泔水1
    wusir吃了一个yuan生产的泔水1
    egon生产了一个骨头3
    alex吃了一个egon生产的骨头3
    yuan生产了一个泔水2
    wusir吃了一个yuan生产的泔水2
    egon生产了一个骨头4
    alex吃了一个egon生产的骨头4
    yuan生产了一个泔水3
    wusir吃了一个yuan生产的泔水3
    egon生产了一个骨头5
    alex吃了一个egon生产的骨头5
    yuan生产了一个泔水4
    wusir吃了一个yuan生产的泔水4
    yuan生产了一个泔水5
    egon生产了一个骨头6
    alex吃了一个yuan生产的泔水5
    wusir吃了一个egon生产的骨头6
    egon生产了一个骨头7
    alex吃了一个egon生产的骨头7
    yuan生产了一个泔水6
    wusir吃了一个yuan生产的泔水6
    yuan生产了一个泔水7
    alex吃了一个yuan生产的泔水7
    egon生产了一个骨头8
    wusir吃了一个egon生产的骨头8
    yuan生产了一个泔水8
    alex吃了一个yuan生产的泔水8
    egon生产了一个骨头9
    wusir吃了一个egon生产的骨头9
    yuan生产了一个泔水9
    alex吃了一个yuan生产的泔水9
    
    Process finished with exit code 0
    结果

      

      队列:

        维护了一个先进先出的顺序.

        且保证了数据在进程之间的安全.

    2、管道

       管道在数据管理上是不安全的

      队列的实现机制就是 管道 + 锁

    1 from multiprocessing import Pipe, Process
    2 
    3 # 左发右
    4 lp, rp = Pipe()
    5 lp.send([1, 2, 3])
    6 print(rp.recv())
    7 # 右发左
    8 rp.send('aa')
    9 print(lp.recv())
    管道通信
    D:Python36python.exe E:/Python/草稿纸.py
    [1, 2, 3]
    aa
    
    Process finished with exit code 0
    结果
     1 from multiprocessing import Pipe, Process
     2 
     3 def consumer(lp, rp):
     4     lp.close()
     5     while True:
     6         try:
     7             print(rp.recv())
     8         except EOFError:
     9             break
    10 
    11 if __name__ == '__main__':
    12     lp, rp = Pipe()
    13     Process(target=consumer, args=(lp, rp)).start()
    14     Process(target=consumer, args=(lp, rp)).start()
    15     Process(target=consumer, args=(lp, rp)).start()
    16     Process(target=consumer, args=(lp, rp)).start()
    17     Process(target=consumer, args=(lp, rp)).start()
    18     rp.close()
    19     for i in range(500):
    20         lp.send(f'food{i}')
    21     lp.close()
    示例
    D:Python36python.exe E:/Python/草稿纸.py
    food0
    food1
    food2
    food3
    food4
    food5
    food6
    food7
    food8
    food9
    food10
    food11
    food12
    food13
    food14
    food15
    food16
    food17
    food18
    food19
    food20
    food21
    food22
    food23
    food24
    food25
    food26
    food27
    food28
    food29
    food30
    food31
    food32
    food33
    food34
    food35
    food36
    food37
    food38
    food39
    food40
    food41
    food42
    food43
    food44
    food45
    food46
    food47
    food48
    food49
    food50
    food51
    food52
    food53
    food54
    food55
    food56
    food57
    food58
    food59
    food60
    food61
    food62
    food63
    food64
    food65
    food66
    food67
    food68
    food69
    food70
    food71
    food72
    food73
    food74
    food75
    food76
    food77
    food78
    food79
    food80
    food81
    food82
    food83
    food84
    food85
    food86
    food87
    food88
    food89
    food90
    food91
    food92
    food93
    food94
    food95
    food96
    food97
    food98
    food99
    food100
    food101
    food102
    food103
    food104
    food105
    food106
    food107
    food108
    food109
    food110
    food111
    food112
    food113
    food114
    food115
    food116
    food117
    food118
    food119
    food120
    food121
    food122
    food123
    food124
    food125
    food126
    food127
    food128
    food129
    food130
    food131
    food132
    food133
    food134
    food135
    food136
    food137
    food138
    food139
    food140
    food141
    food142
    food143
    food144
    food145
    food146
    food147
    food148
    food149
    food150
    food151
    food152
    food153
    food154
    food155
    food156
    food157
    food158
    food159
    food160
    food161
    food162
    food163
    food164
    food165
    food166
    food167
    food168
    food169
    food170
    food171
    food172
    food173
    food174
    food175
    food176
    food177
    food178
    food179
    food180
    food181
    food182
    food183
    food184
    food185
    food186
    food187
    food188
    food189
    food190
    food191
    food192
    food193
    food194
    food195
    food196
    food197
    food198
    food199
    food200
    food201
    food202
    food203
    food204
    food205
    food206
    food207
    food208
    food209
    food210
    food211
    food212
    food213
    food214
    food215
    food216
    food217
    food218
    food219
    food220
    food221
    food222
    food223
    food224
    food225
    food226
    food227
    food228
    food229
    food230
    food231
    food232
    food233
    food234
    food235
    food236
    food237
    food238
    food239
    food240
    food241
    food242
    food243
    food244
    food245
    food246
    food247
    food248
    food249
    food250
    food251
    food252
    food253
    food254
    food255
    food256
    food257
    food258
    food259
    food260
    food261
    food262
    food263
    food264
    food265
    food266
    food267
    food268
    food269
    food270
    food271
    food272
    food273
    food274
    food275
    food276
    food277
    food278
    food279
    food280
    food281
    food282
    food283
    food284
    food285
    food286
    food287
    food288
    food289
    food290
    food291
    food292
    food293
    food294
    food295
    food296
    food297
    food298
    food299
    food300
    food301
    food302
    food303
    food304
    food305
    food306
    food307
    food308
    food309
    food310
    food311
    food312
    food313
    food314
    food315
    food316
    food317
    food318
    food319
    food320
    food321
    food322
    food323
    food324
    food325
    food326
    food327
    food328
    food329
    food330
    food331
    food332
    food333
    food334
    food335
    food336
    food337
    food338
    food339
    food340
    food341
    food342
    food343
    food344
    food345
    food346
    food347
    food348
    food349
    food350
    food351
    food352
    food353
    food354
    food355
    food356
    food357
    food358
    food359
    food360
    food361
    food362
    food363
    food364
    food365
    food366
    food367
    food368
    food369
    food370
    food371
    food372
    food373
    food374
    food375
    food376
    food377
    food378
    food379
    food380
    food381
    food382
    food383
    food384
    food385
    food386
    food387
    food388
    food389
    food390
    food391
    food392
    food393
    food394
    food395
    food396
    food397
    food398
    food399
    food400
    food401
    food402
    food403
    food404
    food405
    food406
    food407
    food408
    food409
    food410
    food411
    food412
    food413
    food414
    food415
    food416
    food417
    food418
    food419
    food420
    food421
    food422
    food423
    food424
    food425
    food426
    food427
    food428
    food429
    food430
    food431
    food432
    food433
    food434
    food435
    food436
    food437
    food438
    food439
    food440
    food441
    food442
    food443
    food444
    food445
    food446
    food447
    food448
    food449
    food450
    food451
    food452
    food453
    food454
    food455
    food456
    food457
    food458
    food459
    food460
    food461
    food462
    food463
    food464
    food465
    food466
    food467
    food468
    food469
    food470
    food471
    food472
    food474
    food475
    food473
    food476
    food477
    food478
    food479
    food481
    food482
    food483
    food484
    food485
    food486
    food487
    food488
    food489
    food490
    food491
    food492
    food493
    food494
    food495
    food496
    food497
    food498
    food499
    food480
    
    Process finished with exit code 0
    结果

    3、数据共享

       普通模式:

     1 from multiprocessing import Manager, Process, Lock
     2 
     3 
     4 def work(d, lock):
     5     with lock:  # 不加锁而操作共享的数据,肯定会出现数据错乱
     6         d['count'] -= 1
     7 
     8 
     9 if __name__ == '__main__':
    10     lock = Lock()
    11     m = Manager()
    12     dic = m.dict({'count': 100})
    13     p_l = []
    14     for i in range(100):
    15         p = Process(target=work, args=(dic, lock))
    16         p_l.append(p)
    17         p.start()
    18     for p in p_l:
    19         p.join()
    20     print(dic)
    示例
    D:Python36python.exe E:/Python/草稿纸.py
    {'count': 0}
    
    Process finished with exit code 0
    结果

      上下文管理模式:

     1 from multiprocessing import Manager, Process, Lock
     2 
     3 
     4 def work(d, lock):
     5     with lock:  # 不加锁而操作共享的数据,肯定会出现数据错乱
     6         d['count'] -= 1
     7 
     8 
     9 if __name__ == '__main__':
    10     lock = Lock()
    11     with Manager() as m:
    12         dic = m.dict({'count': 100})
    13         p_l = []
    14         for i in range(100):
    15             p = Process(target=work, args=(dic, lock))
    16             p_l.append(p)
    17             p.start()
    18         for p in p_l:
    19             p.join()
    20         print(dic)
    示例
    D:Python36python.exe E:/Python/草稿纸.py
    {'count': 0}
    
    Process finished with exit code 0
    结果

    4、进程池

      多进程和进程池的对比:

        对于纯计算型的代码,使用进程池更好 ——真理

        对于高IO的代码,直接使用多进程更好 —— 相对论

      结论:进程池比起多进程来说,节省了开启进程回收进程的时间,给操作系统调度进程降低了难度

      使用进程池提交任务:

        apply  # 同步提交任务,没有多进程的优势

        apply_async  # 异步提交任务,常用,可以通过get方法获取返回值

        close  # 关闭进程池,阻止往池中添加新的任务

        join  # join依赖close,一个进程必须先close再join

      map  # 接收一个任务函数,和一个iterable。节省了for循环和close、join,是一种简便的写法。

      apply_async和map相比,操作复杂,但是可以通过get方法获取返回值。

     1 import os
     2 def wahaha(num):
     3     print('',os.getpid())
     4     return num ** num
     5 
     6 def call(argv): # 回调函数用的是主进程的资源
     7     print(os.getpid())
     8     print(argv)
     9 
    10 if __name__ == '__main__':
    11     print('', os.getpid())
    12     p = Pool(5)
    13     p.apply_async(func=wahaha, args=(50,), callback=call)
    14     p.close()
    15     p.join()
    进程池
    D:Python36python.exe E:/Python/草稿纸.py
    主 1363613636
    13636
    8881784197001252323389053344726562500000000000000000000000000000000000000000000000000
    
    Process finished with exit code 0
    结果
  • 相关阅读:
    jstl插件使用
    IDEA配置tomcat
    Spring框架
    2020/7/17 JAVA模拟斗地主发牌洗牌
    2020/7/15 JAVA之Map接口
    2020/7/14 Java之增强for循环、泛型、List接口、Set接口
    2020/7/13 集合之ArrayList集合、Collection接口、Iterator迭代器
    2020/7/13 常用API之基本类型包装类、System类、Math类、Arrays类、大数据运算
    2020/7/11 日期相关类
    2020/7/8 JAVA总结之:匿名对象/内部类/包的声明与访问/访问修饰符/代码块
  • 原文地址:https://www.cnblogs.com/ZN-225/p/9179112.html
Copyright © 2011-2022 走看看