zoukankan      html  css  js  c++  java
  • Day9 进程同步锁 进程队列 进程池 生产消费模型 进程池 paramike模块

    进程同步锁:

    当运行程序的时候,有可能你的程序同时开多个进程,开进程的时候会将多个执行结果打印出来,这样的话打印的信息都是错乱的,怎么保证打印信息是有序的呢?

    其实也就是相当于让进程独享资源。

     1 from multiprocessing import Process,Lock    #引用函数
     2 import time
     3 def work(name,mutex):    
     4     mutex.acquire()    #在这里加入锁
     5     print('task <%s> is runing' %name)
     6     time.sleep(2)
     7     print('task <%s> is done' % name)
     8     mutex.release()    #加完锁以后必须需要解锁
     9 
    10 if __name__ == '__main__':
    11     mutex=Lock()
    12     p1=Process(target=work,args=('egon',mutex))
    13     p2=Process(target=work,args=('alex',mutex))
    14     p1.start()
    15     p2.start()
    16     print('')

    比如说模拟抢票的功能:

    要先写一个文本   ("count":1)   就记个数就行

     1 import json
     2 import os
     3 import time
     4 from multiprocessing import Process,Lock
     5 def search():
     6     dic=json.load(open('db.txt'))
     7     print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(),dic['count']))
     8 def get_ticket():
     9     dic = json.load(open('db.txt'))
    10     time.sleep(0.5) #模拟读数据库的网络延迟
    11     if dic['count'] > 0:
    12         dic['count']-=1
    13         time.sleep(0.5)  # 模拟写数据库的网络延迟
    14         json.dump(dic,open('db.txt','w'))
    15         print('33[31m%s 购票成功33[0m' %os.getpid())
    16 def task(mutex):
    17     search()
    18     mutex.acquire()
    19     get_ticket()
    20     mutex.release()
    21 if __name__ == '__main__':
    22     mutex=Lock()
    23     for i in range(10):
    24         p=Process(target=task,args=(mutex,))
    25         p.start()

    进程队列:

    共享内存的方式:

     1 from multiprocessing import Process,Manager,Lock     #Manager共享内存函数
     2 
     3 def task(dic,mutex):
     4     with mutex:
     5         dic['count']-=1
     6 
     7 if __name__ == '__main__':
     8     mutex=Lock()
     9     m=Manager()
    10     dic=m.dict({'count':100})
    11     p_l=[]
    12     for i in range(100):
    13         p=Process(target=task,args=(dic,mutex))
    14         p_l.append(p)
    15         p.start()
    16 
    17     for p in p_l:
    18         p.join()
    19     print(dic)


    队列:

    进程彼此之间隔离,要实现进程间通信

     1 from multiprocessing import Queue    #引用函数
     2 q=Queue(3)      #意味着你队列长队最大为三
     3
     4 q.put('first')
     5 q.put('second')
     6 q.put('third')
     7 # q.put('fourth')    #满了的话会一直卡住
     8 
     9 print(q.get())
    10 print(q.get())
    11 print(q.get())
    12 print(q.get())
    13 
    14 #了解
    15 # q=Queue(3)
    16 # 
    17 # q.put('first',block=False)
    18 # q.put('second',block=False)
    19 # q.put('third',block=False)    #这样的话队列满了就会抛出异常,不会卡在这里
    20 # # q.put_nowait('fourth') 
    21 #q.put('fourth',block=False)
    22 # q.put('fourth',timeout=3) #指定抛出时间,如果3秒后队列还是满的抛出异常

    生产者消费者模型:

    正常情况下,一般都是生产者预先生产出商品,然后等着消费者来买。

    (实际情况可能是有多个生产者,多个消费者)

    from multiprocessing import Process, JoinableQueue
    import time, os
    
    
    def producer(q, name):
        for i in range(3):
            time.sleep(1)
            res = '%s%s' % (name, i)
            q.put(res)
            print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
        q.join()  #对应下边的task_done
    
    def consumer(q):
        while True:
            res = q.get()
            time.sleep(1.5)
            print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
            q.task_done()    #代表我这个进程我已经取走了,发给生产者,对应生产者要有个jion()
    
    if __name__ == '__main__':
        q = JoinableQueue()
    
        # 生产者们:即厨师们
        p1 = Process(target=producer, args=(q, '包子'))
        p2 = Process(target=producer, args=(q, '饺子'))
        p3 = Process(target=producer, args=(q, '馄饨'))
    
        # 消费者们:即吃货们
        c1 = Process(target=consumer, args=(q,))
        c2 = Process(target=consumer, args=(q,))
    
        c1.daemon=True    #如果消费者不取完的话,程序无法结束
        c2.daemon=True   #这里主进程运行完,子进程要结束掉
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
    
        p1.join()
    
        print('')

    进程池:

     1 from multiprocessing import Pool
     2 import os,time
     3 
     4 def work(n):
     5     print('task <%s> is runing' %os.getpid())
     6     time.sleep(2)
     7     return n**2
     8 if __name__ == '__main__':
     9     # print(os.cpu_count())
    10     p=Pool(4)    #定义进程池的大小,如果不定义一般是内核数
    11     # for i in range(10):
    12     #     res=p.apply(work,args=(i,))    #向进程池里边添加进程,同步提交
    13     #     print(res)
    14 
    15     res_l=[]
    16     for i in range(10):
    17         res=p.apply_async(work,args=(i,))    #向进程池异步提交,只管扔进去,不管是否执行完成
    18         res_l.append(res)
    19 
    20     p.close()
    21     p.join()
    22     #
    23     # for res in res_l:
    24     #     print(res.get())

    进程池之回调函数:

    需要回调函数的场景,进程池中任何一个任务一旦处理完成,就立即告诉主进程,我好了,你可以处理我的结果了,主进程调用一个函数去处理该结果,该函数即回调函数。

     1 import requests #pip3 install requests
     2 import os,time
     3 from multiprocessing import Pool
     4 def get_page(url):
     5     print('<%s> get :%s' %(os.getpid(),url))
     6     respone = requests.get(url)
     7     if respone.status_code == 200:
     8         return {'url':url,'text':respone.text}
     9 
    10 def parse_page(dic):
    11     print('<%s> parse :%s' %(os.getpid(),dic['url']))
    12     time.sleep(0.5)
    13     res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    14     with open('db.txt','a') as f:
    15         f.write(res)
    16 
    17 
    18 if __name__ == '__main__':
    19     p=Pool(4)
    20     urls = [
    21         'http://www.baidu.com',
    22         'http://www.baidu.com',
    23         'http://www.baidu.com',
    24         'http://www.baidu.com',
    25         'http://www.baidu.com',
    26         'http://www.baidu.com',
    27         'http://www.baidu.com',
    28     ]
    29 
    30 
    31     for url in urls:
    32         p.apply_async(get_page,args=(url,),callback=parse_page)
    33 
    34 
    35     p.close()
    36     p.join()
    37     print('主进程pid:',os.getpid())

    paramike模块:

    paramike模块是一个用作做远程控制的模块,使用该模块可以对远程服务器进行命令或者文件操作。值得一提的是,fabric和ansible内部的远程管理就是使用的paramike模块。

    #需要下载安装

    #pip3 install paramiko

    远程连接功能:

     1 import paramiko
     2 
     3 # 创建SSH对象
     4 ssh = paramiko.SSHClient()
     5 # 允许连接不在know_hosts文件中的主机
     6 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     7 # 连接服务器
     8 ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
     9 
    10 # 执行命令
    11 stdin, stdout, stderr = ssh.exec_command('df')
    12 # 获取命令结果
    13 result = stdout.read()
    14 print(result.decode('utf-8'))
    15 # 关闭连接
    16 ssh.close()

    这是基于账号密码来访问客户端的。

    另外一种方式是基于秘钥的,

    现在服务端制作一个秘钥,ssh-keygen制作秘钥,sz可以下载到window桌面。

    客户端要用,肯定要基于服务端有认证文件,利用 ssh-copy-id -i 用户@ip

     1 import paramiko
     2 
     3 private_key = paramiko.RSAKey.from_private_key_file('id_rsa')   #秘钥的文件位置
     4 
     5 # 创建SSH对象
     6 ssh = paramiko.SSHClient()
     7 # 允许连接不在know_hosts文件中的主机
     8 ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
     9 # 连接服务器
    10 ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
    11 
    12 # 执行命令
    13 stdin, stdout, stderr = ssh.exec_command('df')
    14 # 获取命令结果
    15 result = stdout.read()
    16 print(result.decode('utf-8'))
    17 # 关闭连接
    18 ssh.close()

     上传下载:

     1 import paramiko
     2 
     3 transport = paramiko.Transport(('120.92.84.249', 22))
     4 transport.connect(username='root', password='123QWEasd')
     5 
     6 sftp = paramiko.SFTPClient.from_transport(transport)
     7 # 将location.py 上传至服务器 /tmp/test.py
     8 sftp.put('id_rsa', '/tmp/test.rsa')
     9 # 将remove_path 下载到本地 local_path
    10 # sftp.get('remove_path', 'local_path')
    11 
    12 transport.close()
  • 相关阅读:
    深度学习100问
    BAT机器学习面试1000题系列
    深度学习项目——基于卷积神经网络(CNN)的人脸在线识别系统
    深入理解卷积层
    AI大道理头尾标识
    git-svn Manual Page
    收藏夹
    C语言 #、##、#@在#define中的用法
    ubuntu 编译安装自己的git-svn
    ALSA参考文档
  • 原文地址:https://www.cnblogs.com/sexiaoshuai/p/7452241.html
Copyright © 2011-2022 走看看