zoukankan      html  css  js  c++  java
  • python_day09 多进程 多线程 协程 paramiko模块

    多进程
    多线程
    协程
    paramiko模块

    1、基于UDP的套接字

    UDP是面向数据报的,不是面向连接的

    from socket import *
    
    udp_server=socket(AF_INET,SOCK_DGRAM)
    udp_server.bind(('127.0.0.1',8080))
    
    while True:
        data,client_addr=udp_server.recvfrom(1024)
        print(data,client_addr)
        udp_server.sendto(data.upper(),client_addr)
    UDP服务端
    from socket import *
    
    udp_client=socket(AF_INET,SOCK_DGRAM)
    
    while True:
        msg=input('>>: ').strip()
        udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
        data,server_addr=udp_client.recvfrom(1024)
        print(data.decode('utf-8'))
    UDP客户端

    基于UDP的套接字不会发生粘包现象

    from socket import *
    udp_server=socket(AF_INET,SOCK_DGRAM)
    udp_server.bind(('127.0.0.1',8080))
    data1,client_addr=udp_server.recvfrom(3)
    print('data1',data1)
    data2,client_addr=udp_server.recvfrom(1024)
    print('data2',data2)
    UDP服务端
    from socket import *
    udp_client=socket(AF_INET,SOCK_DGRAM)
    udp_client.sendto('hello'.encode('utf-8'),('127.0.0.1',8080))
    udp_client.sendto('world'.encode('utf-8'),('127.0.0.1',8080))
    UDP客户端

    并发的UDP套接字

    #UDP服务端
    import
    socketserver class MyUDPhandler(socketserver.BaseRequestHandler): def handle(self): print(self.request) self.request[1].sendto(self.request[0].upper(),self.client_address) if __name__ == '__main__': s=socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler) s.serve_forever()
    #UDP客户端
    from socket import *
    udp_client=socket(AF_INET,SOCK_DGRAM)
    while True:
        msg=input('>>: ').strip()
        udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
        data,server_addr=udp_client.recvfrom(1024)
        print(data.decode('utf-8'))

    2、进程理论知识

    进程是对正在运行程序的一个抽象。
    #一 操作系统的作用:
        1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口
        2:管理、调度进程,并且将多个进程对硬件的竞争变得有序
    进程与程序的区别:
    程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。
    注:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。
    同步执行:一个进程在执行某个任务时,另外一个进程必须等待其执行完毕,才能继续执行
    异步执行:一个进程在执行某个任务时,另外一个进程无需等待其执行完毕,就可以继续执行,当有消息返回时,系统会通知后者进行处理,这样可以提高执行效率
    #开启进程的方式一
    from multiprocessing import Process
    import time
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(0.5)
        print('task <%s> is done' % name)
    if __name__ == '__main__':      #windows系统开启子进程一定要写在main函数下。
        # Process(target=work,kwargs={'name':'egon'})
        p1=Process(target=work,args=('egon',))  #一定要加,表示此为元组
        p2=Process(target=work,args=('alex',))
        p1.start()
        p2.start()
        print('')
    #join方法 待子进程运行完后主进程开始运行
    from multiprocessing import Process
    import time
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(0.5)
        print('task <%s> is done' % name)
    if __name__ == '__main__':
        p1=Process(target=work,args=('egon',))
        p2=Process(target=work,args=('alex',))
        p3=Process(target=work,args=('yuanhao',))
        p_l = [p1, p2, p3]
        for p in p_l:
            p.start()
        for p in p_l:
            p.join()
        # p1.join() #主进程等,等待p1运行结束
        # p2.join() #主进程等,等待p2运行结束
        # p3.join() #主进程等,等待p3运行结束
        print('')
    #开启子进程的方式二
    from multiprocessing import Process
    import time
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name=name
        def run(self):
            print('task <%s> is runing' % self.name)
            time.sleep(0.5)
            print('task <%s> is done' % self.name)
    if __name__ == '__main__':
        p=MyProcess('egon')
        p.start()
        print('')
    #并发的套接字通讯
    #服务端
    from multiprocessing import Process
    from socket import *
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    def talK(conn,addr):
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except Exception:
                break
        conn.close()
    if __name__ == '__main__':
        while True:
            conn,addr=s.accept()
            p=Process(target=talK,args=(conn,addr))   #链接循环使用开启通讯循环子进程方式
            p.start()
        s.close()
    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8080))
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))
    c.close()
    #Process对象的其他方法和属性
    from multiprocessing import Process
    import time,os
    def work():
        print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid()))
        time.sleep(1)
        print('parent:%s task <%s> is done'  %(os.getppid(),os.getpid()))
    if __name__ == '__main__':
    p1=Process(target=work,args=('egon',),name='123123')  #指定进程名p1.start()
    p1.terminate() #强制中止进程 如果p1有子进程,会出现僵尸进程
    p1.is_alive()    #True os过了一段时间才能回收p1进程
    p1.name #进程名
    p1.pid     #进程号
    os.getpid() #当前进程的pid
    os.getppid  #父进程的pid
    windows cmd tasklist|findstr python(pycharm)

     守护进程daemon

    #守护进程daemon
    #其一:守护进程会在主进程代码执行结束后就终止
    #其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
    from multiprocessing import Process import time def work(name): print('task <%s> is runing' %name) time.sleep(0.5) print('task <%s> is done' % name) if __name__ == '__main__': p1=Process(target=work,args=('egon',)) p1.daemon = True #子进程start之前必须要要指定daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行 p1.start() print('') #主进程代码运行完毕,守护进程就会结束 from multiprocessing import Process import time def foo(): print(123) time.sleep(1) print("end123") def bar(): print(456) time.sleep(3) print("end456") if __name__ == '__main__': p1=Process(target=foo) p2=Process(target=bar) p1.daemon=True p1.start() p2.start() print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止

    同步锁mutex

    #竞争带来的结果就是错乱,如何控制,就是加锁处理
    #
    同步锁mutex from multiprocessing import Process,Lock import time def work(name,mutex): mutex.acquire() #加锁 print('task <%s> is runing' %name) time.sleep(2) print('task <%s> is done' % name) mutex.release() #解锁 if __name__ == '__main__': mutex=Lock() p1=Process(target=work,args=('egon',mutex)) #参数要指定mutex p2=Process(target=work,args=('alex',mutex)) p1.start() p2.start() print('')
    加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    虽然可以用文件共享数据实现进程间通信,但问题是:
    1.效率低
    2.需要自己加锁处理

    模拟抢票

    #db.txt {"count": 1}  #序列化需要用双引号
    import json
    import os
    import time
    from multiprocessing import Process,Lock
    def search():
        dic=json.load(open('db.txt'))
        print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(),dic['count']))
    def get_ticket():
        dic = json.load(open('db.txt'))
        time.sleep(0.5) #模拟读数据库的网络延迟
        if dic['count'] > 0:
            dic['count']-=1
            time.sleep(0.5)  # 模拟写数据库的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('33[31m%s 购票成功33[0m' %os.getpid())
    def task(mutex):
        search()
        mutex.acquire()
        get_ticket()
        mutex.release()
    if __name__ == '__main__':
        mutex=Lock()
        for i in range(10):
            p=Process(target=task,args=(mutex,))
            p.start()

    共享数据

    from multiprocessing import Process,Manager,Lock
    def task(dic,mutex):
        with mutex:
            dic['count']-=1
    if __name__ == '__main__':
        mutex=Lock()
        m=Manager()
        dic=m.dict({'count':100})
        p_l=[]
        for i in range(10):
            p=Process(target=task,args=(dic,mutex))
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
        print(dic)

    队列

    #为此mutiprocessing模块为我们提供了基于消息的IPC通信机制:队列和管道。
    1 队列和管道都是将数据存放于内存中
    2 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
    from
    multiprocessing import Queue q=Queue(3) q.put('first') q.put('second') q.put('third') # q.put('fourth') #满了会一直卡在这 print(q.get()) print(q.get()) print(q.get()) # print(q.get()) #空的话会一直卡在这 #了解 q=Queue(3) q.put('first',block=False) q.put('second',block=False) q.put('third',block=False) # q.put_nowait('fourth') == #q.put('fourth',block=False) #满了不会卡在这,会抛出一个异常 q.put('fourth',timeout=3) #指定超时时间

    生产者消费者模型

    from multiprocessing import Process,Queue
    import time,os
    def producer(q,name):
        for i in range(3):
            time.sleep(1)
            res='%s%s' %(name,i)
            q.put(res)
            print('33[45m<%s> 生产了 [%s]33[0m' %(os.getpid(),res))
    def consumer(q):
        while True:
            res=q.get()
            if res is None:break
            time.sleep(1.5)
            print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
    if __name__ == '__main__':
        q=Queue()
        #生产者们:即厨师们
        p1=Process(target=producer,args=(q,'包子'))
        p2=Process(target=producer,args=(q,'饺子'))
        p3=Process(target=producer,args=(q,'馄饨'))
        #消费者们:即吃货们
        c1=Process(target=consumer,args=(q,))
        c2=Process(target=consumer,args=(q,))
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()
        p2.join()
        p3.join()   #待生产者们运行完毕
        q.put(None) #队列中放入结束指定符,几个消费者就放入几个None
        q.put(None)
        print('')

    Joinable生产者消费者模型

    #Joinablequeue
    #消费者发消息给生产者
    from multiprocessing import Process, JoinableQueue
    import time, os
    def producer(q, name):
        for i in range(3):
            time.sleep(1)
            res = '%s%s' % (name, i)
            q.put(res)
            print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
        q.join()     #待进程运行完毕
    def consumer(q):
        while True:
            res = q.get()
            time.sleep(1.5)
            print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
            q.task_done()    #消费者发消息给生产者
    if __name__ == '__main__':
        q = JoinableQueue()
        # 生产者们:即厨师们
        p1 = Process(target=producer, args=(q, '包子'))
        p2 = Process(target=producer, args=(q, '饺子'))
        p3 = Process(target=producer, args=(q, '馄饨'))
        # 消费者们:即吃货们
        c1 = Process(target=consumer, args=(q,))
        c2 = Process(target=consumer, args=(q,))
        c1.daemon=True  #消费者为守护进程   
        c2.daemon=True 
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()
        print('')

     进程池

    创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程
    #
    进程池Pool from multiprocessing import Pool import os,time def work(n): print('task <%s> is runing' %os.getpid()) time.sleep(2) return n**2 if __name__ == '__main__': # print(os.cpu_count()) #CPU个数获得方式 p=Pool(4) #进程个数设置为CPU个数 #要创建的进程数,如果省略,将默认使用cpu_count()的值 res_l=[] for i in range(10): res=p.apply_async(work,args=(i,)) #异步方式提交任务 res_l.append(res)
    #异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了
    p.close() #不允许再给进程池加任务 join前必须要执行close,否则程序有问题 p.join() #主进程等待进程池中任务执行结束 for res in res_l: print(res.get()) #从进程池中获得任务执行的结果 ##使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get

    p.apply(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
    p.apply_async(func [, args [, kwargs]]):在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
       
    p.close():关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
    P.jion():等待所有工作进程退出。此方法只能在close()或teminate()之后调用

    进程池之回调函数

    需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
    我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
    #回调函数callback=
    #进程池异步方式提交任务,进程池结果使用回调函数处理任务执行结果
    import requests #pip3 install requests
    import os,time
    from multiprocessing import Pool
    def get_page(url):
        print('<%s> get :%s' %(os.getpid(),url))
        respone = requests.get(url)
        if respone.status_code == 200:
            return {'url':url,'text':respone.text}
    def parse_page(dic):
        print('<%s> parse :%s' %(os.getpid(),dic['url']))
        time.sleep(0.5)
        res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
        with open('db.txt','a') as f:
            f.write(res)
    if __name__ == '__main__':
        p=Pool(4)
        urls = [
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
            'http://www.baidu.com',
        ]
        for url in urls:
            p.apply_async(get_page,args=(url,),callback=parse_page)
        p.close()
        p.join()
        print('主进程pid:',os.getpid())

    进程池控制并发的套接字通信

    #服务端
    from multiprocessing import Pool
    import os
    from socket import *
    s=socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    def talK(conn,addr):
        print(os.getpid())
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except Exception:
                break
        conn.close()
    if __name__ == '__main__':
        p=Pool(4)
        while True:
            conn,addr=s.accept()
            p.apply_async(talK,args=(conn,addr))
        s.close()
    #客户端
    from socket import *
    c=socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8080))
    while True:
        msg=input('>>: ').strip()
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data=c.recv(1024)
        print(data.decode('utf-8'))
    c.close()

    paramiko模块

    
    
    paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作
    pip3 install paramiko #在python3中
    #用户名密码方式远程连接服务器执行命令获取结果
    import paramiko
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='120.92.84.249', port=22, username='root', password='123QWEasd')
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()
    
    #公私钥方式远程连接服务器执行命令获取结果
    # import paramiko
    private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()
    
    #paramiko利用sftp上传下载文件
    import paramiko
    transport = paramiko.Transport(('120.92.84.249', 22))
    transport.connect(username='root', password='123QWEasd')
    sftp = paramiko.SFTPClient.from_transport(transport)
    # 将location.py 上传至服务器 /tmp/test.py
    sftp.put('id_rsa', '/tmp/test.rsa')
    # 将remove_path 下载到本地 local_path
    # sftp.get('remove_path', 'local_path')
    transport.close()
  • 相关阅读:
    Apicloud_(模板)登陆注册功能模板
    Apicloud_(接口验证)用户注册头部信息X-APICloud-AppKey生成
    Apicloud_(项目)网上书城02_后端数据获取
    Apicloud_(项目)网上书城01_前端搭建
    R_Studio(时序)Apriori算法寻找频繁项集的方法
    R_Studio(聚类)针对iris数据比较几种聚类方法优劣
    R_Studio(神经网络)BP神经网络算法预测销量的高低
    JavaWeb_(SSH论坛)_七、辅助模块
    JavaWeb_(SSH论坛)_六、点赞模块
    JavaWeb_(SSH论坛)_五、帖子模块
  • 原文地址:https://www.cnblogs.com/liweijing/p/7443031.html
Copyright © 2011-2022 走看看