zoukankan      html  css  js  c++  java
  • Day9 进程同步锁 进程队列 进程池 生产消费模型 进程池 paramike模块

    进程同步锁:

    当运行程序的时候,有可能你的程序同时开多个进程,开进程的时候会将多个执行结果打印出来,这样的话打印的信息都是错乱的,怎么保证打印信息是有序的呢?

    其实也就是相当于让进程独享资源。

     1 from multiprocessing import Process,Lock    #引用函数
     2 import time
     3 def work(name,mutex):    
     4     mutex.acquire()    #在这里加入锁
     5     print('task <%s> is runing' %name)
     6     time.sleep(2)
     7     print('task <%s> is done' % name)
     8     mutex.release()    #加完锁以后必须需要解锁
     9 
    10 if __name__ == '__main__':
    11     mutex=Lock()
    12     p1=Process(target=work,args=('egon',mutex))
    13     p2=Process(target=work,args=('alex',mutex))
    14     p1.start()
    15     p2.start()
    16     print('')

    比如说模拟抢票的功能:

    要先写一个文本   ("count":1)   就记个数就行

     1 import json
     2 import os
     3 import time
     4 from multiprocessing import Process,Lock
     5 def search():
     6     dic=json.load(open('db.txt'))
     7     print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(),dic['count']))
     8 def get_ticket():
     9     dic = json.load(open('db.txt'))
    10     time.sleep(0.5) #模拟读数据库的网络延迟
    11     if dic['count'] > 0:
    12         dic['count']-=1
    13         time.sleep(0.5)  # 模拟写数据库的网络延迟
    14         json.dump(dic,open('db.txt','w'))
    15         print('33[31m%s 购票成功33[0m' %os.getpid())
    16 def task(mutex):
    17     search()
    18     mutex.acquire()
    19     get_ticket()
    20     mutex.release()
    21 if __name__ == '__main__':
    22     mutex=Lock()
    23     for i in range(10):
    24         p=Process(target=task,args=(mutex,))
    25         p.start()

    进程队列:

    共享内存的方式:

     1 from multiprocessing import Process,Manager,Lock     #Manager共享内存函数
     2 
     3 def task(dic,mutex):
     4     with mutex:
     5         dic['count']-=1
     6 
     7 if __name__ == '__main__':
     8     mutex=Lock()
     9     m=Manager()
    10     dic=m.dict({'count':100})
    11     p_l=[]
    12     for i in range(100):
    13         p=Process(target=task,args=(dic,mutex))
    14         p_l.append(p)
    15         p.start()
    16 
    17     for p in p_l:
    18         p.join()
    19     print(dic)


    队列:

    进程彼此之间隔离,要实现进程间通信

     1 from multiprocessing import Queue    #引用函数
     2 q=Queue(3)      #意味着你队列长队最大为三
     3
     4 q.put('first')
     5 q.put('second')
     6 q.put('third')
     7 # q.put('fourth')    #满了的话会一直卡住
     8 
     9 print(q.get())
    10 print(q.get())
    11 print(q.get())
    12 print(q.get())
    13 
    14 #了解
    15 # q=Queue(3)
    16 # 
    17 # q.put('first',block=False)
    18 # q.put('second',block=False)
    19 # q.put('third',block=False)    #这样的话队列满了就会抛出异常,不会卡在这里
    20 # # q.put_nowait('fourth') 
    21 #q.put('fourth',block=False)
    22 # q.put('fourth',timeout=3) #指定抛出时间,如果3秒后队列还是满的抛出异常

    生产者消费者模型:

    正常情况下,一般都是生产者预先生产出商品,然后等着消费者来买。

    (实际情况可能是有多个生产者,多个消费者)

    from multiprocessing import Process, JoinableQueue
    import time, os
    
    
    def producer(q, name):
        for i in range(3):
            time.sleep(1)
            res = '%s%s' % (name, i)
            q.put(res)
            print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
        q.join()  #对应下边的task_done
    
    def consumer(q):
        while True:
            res = q.get()
            time.sleep(1.5)
            print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
            q.task_done()    #代表我这个进程我已经取走了,发给生产者,对应生产者要有个jion()
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        # 生产者们:即厨师们
        p1 = Process(target=producer, args=(q, '包子'))
        p2 = Process(target=producer, args=(q, '饺子'))
        p3 = Process(target=producer, args=(q, '馄饨'))
    
        # 消费者们:即吃货们
        c1 = Process(target=consumer, args=(q,))
        c2 = Process(target=consumer, args=(q,))
    
        c1.daemon=True    #如果消费者不取完的话,程序无法结束
        c2.daemon=True   #这里主进程运行完,子进程要结束掉
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
    
        p1.join()
    
        print('')

    进程池:

     1 from multiprocessing import Pool
     2 import os,time
     3 
     4 def work(n):
     5     print('task <%s> is runing' %os.getpid())
     6     time.sleep(2)
     7     return n**2
     8 if __name__ == '__main__':
     9     # print(os.cpu_count())
    10     p=Pool(4)    #定义进程池的大小,如果不定义一般是内核数
    11     # for i in range(10):
    12     #     res=p.apply(work,args=(i,))    #向进程池里边添加进程,同步提交
    13     #     print(res)
    14 
    15     res_l=[]
    16     for i in range(10):
    17         res=p.apply_async(work,args=(i,))    #向进程池异步提交,只管扔进去,不管是否执行完成
    18         res_l.append(res)
    19 
    20     p.close()
    21     p.join()
    22     #
    23     # for res in res_l:
    24     #     print(res.get())

    进程池之回调函数:

    需要回调函数的场景,进程池中任何一个任务一旦处理完成,就立即告诉主进程,我好了,你可以处理我的结果了,主进程调用一个函数去处理该结果,该函数即回调函数。

     1 import requests #pip3 install requests
     2 import os,time
     3 from multiprocessing import Pool
     4 def get_page(url):
     5     print('<%s> get :%s' %(os.getpid(),url))
     6     respone = requests.get(url)
     7     if respone.status_code == 200:
     8         return {'url':url,'text':respone.text}
     9 
    10 def parse_page(dic):
    11     print('<%s> parse :%s' %(os.getpid(),dic['url']))
    12     time.sleep(0.5)
    13     res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    14     with open('db.txt','a') as f:
    15         f.write(res)
    16 
    17 
    18 if __name__ == '__main__':
    19     p=Pool(4)
    20     urls = [
    21         'http://www.baidu.com',
    22         'http://www.baidu.com',
    23         'http://www.baidu.com',
    24         'http://www.baidu.com',
    25         'http://www.baidu.com',
    26         'http://www.baidu.com',
    27         'http://www.baidu.com',
    28     ]
    29 
    30 
    31     for url in urls:
    32         p.apply_async(get_page,args=(url,),callback=parse_page)
    33 
    34 
    35     p.close()
    36     p.join()
    37     print('主进程pid:',os.getpid())

    paramike模块:

    paramike模块是一个用作做远程控制的模块,使用该模块可以对远程服务器进行命令或者文件操作。值得一提的是,fabric和ansible内部的远程管理就是使用的paramike模块。

    #需要下载安装

    #pip3 install paramiko

    远程连接功能:

     1 import paramiko
     2 
     3 # 创建SSH对象
     4 ssh = paramiko.SSHClient()
     5 # 允许连接不在know_hosts文件中的主机
     6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     7 # 连接服务器
     8 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
     9 
    10 # 执行命令
    11 stdin, stdout, stderr = ssh.exec_command('df')
    12 # 获取命令结果
    13 result = stdout.read()
    14 print(result.decode('utf-8'))
    15 # 关闭连接
    16 ssh.close()

    这是基于账号密码来访问客户端的。

    另外一种方式是基于秘钥的,

    现在服务端制作一个秘钥,ssh-keygen制作秘钥,sz可以下载到window桌面。

    客户端要用,肯定要基于服务端有认证文件,利用 ssh-copy-id -i 用户@ip

     1 import paramiko
     2 
     3 private_key = paramiko.RSAKey.from_private_key_file('id_rsa')   #秘钥的文件位置
     4 
     5 # 创建SSH对象
     6 ssh = paramiko.SSHClient()
     7 # 允许连接不在know_hosts文件中的主机
     8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     9 # 连接服务器
    10 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
    11 
    12 # 执行命令
    13 stdin, stdout, stderr = ssh.exec_command('df')
    14 # 获取命令结果
    15 result = stdout.read()
    16 print(result.decode('utf-8'))
    17 # 关闭连接
    18 ssh.close()

     上传下载:

     1 import paramiko
     2 
     3 transport = paramiko.Transport(('120.92.84.249', 22))
     4 transport.connect(username='root', password='123QWEasd')
     5 
     6 sftp = paramiko.SFTPClient.from_transport(transport)
     7 # 将location.py 上传至服务器 /tmp/test.py
     8 sftp.put('id_rsa', '/tmp/test.rsa')
     9 # 将remove_path 下载到本地 local_path
    10 # sftp.get('remove_path', 'local_path')
    11 
    12 transport.close()
  • 相关阅读:
    笨方法学python中执行argv提示ValueError: not enough values to unpack (expected 4, got 1)
    VMware workstation安装
    Redis bigkey分析
    MySQL drop table 影响及过程
    MySQL 大表硬连接删除
    ES elasticsearch 各种查询
    ES elasticsearch 各种聚合
    ES elasticsearch 聚合统计
    ES elasticsearch 实现 count单字段,分组取前多少位,以地理位置中心进行统计
    MySQL行溢出、varchar最多能存多少字符
  • 原文地址:https://www.cnblogs.com/sexiaoshuai/p/7452241.html
Copyright © 2011-2022 走看看