zoukankan      html  css  js  c++  java
  • 第九篇:网络编程补充与进程

    本篇内容

    1. udp协议套接字
    2. 开启进程的方式
    3. 多进程实现并发的套接字通信
    4. join方法
    5. 守护进程
    6. 同步锁
    7. 进程队列
    8. 生产者消费者模型
    9. 进程池
    10. paramiko模块

    一、 udp协议套接字

    1.TCP和UDP在传输层区别:
    UDP是无连接不可靠的数据报协议。TCP提供面向连接的可靠字节流。

    2.使用UDP常见应用:
    DNS(域名系统),NFS(网络文件系统),SNMP(简单网络管理协议)。

    3.代码应用:

    服务端:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    import socketserver
    class MyUDPhandler(socketserver.BaseRequestHandler):
        def handle(self):
            print(self.request)
            self.request[1].sendto(self.request[0].upper(),self.client_address)
    
    if __name__ == '__main__':
        s = socketserver.ThreadingUDPServer(('127.0.0.1',8080),MyUDPhandler)
        s.serve_forever()
    

     客户端:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from socket import *
    
    udp_client = socket(AF_INET,SOCK_DGRAM)
    
    while True:
        msg=input('>>: ').strip()
        udp_client.sendto(msg.encode('utf-8'),('127.0.0.1',8080))
        data,server_addr = udp_client.recvfrom(1024)
        print(data.decode('utf-8'))
    

     注意:但这种方式并不能控制客户端的并发数量,并发数量达到一定数量后,服务端会down掉,解决办法后续会提供。

    二、开启进程的方式

    开启进程的方式分为两种:

    (1)利用模块开启进程:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process
    import time
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(2)
        print('task <%s> is done' % name)
    
    if __name__ == '__main__':
        p1 = Process(target=work,args=('xiaolan',))
        p2 = Process(target=work,args=('xiaohong',))
        p1.start()
        p2.start()
        print('主程序')
    

     (2)利用类开启进程:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self,name):
            super().__init__()
            self.name = name
    
        def run(self):
            print('task <%s> is runing' % self.name)
            time.sleep(2)
            print('task <%s> is done' % self.name)
    
    if __name__ == '__main__':
        p = MyProcess('xiaolan')
        p.start()
        print('主程序')
    

    三、多进程实现并发的套接字通信

    基于刚刚学习的开启进程的方式,咱们就用进程的方式来开启一个网络通信。

    服务端:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process
    from socket import *
    
    s = socket(AF_INET,SOCK_STREAM)
    s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    s.bind(('127.0.0.1',8080))
    s.listen(5)
    
    def talK(conn,addr):
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except Exception:
                break
        conn.close()
    
    if __name__ == '__main__':
        while True:
            conn,addr = s.accept()
            p=Process(target=talK,args=(conn,addr))
            p.start()
    
        s.close()
    

     客户端:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from socket import *
    
    c = socket(AF_INET,SOCK_STREAM)
    c.connect(('127.0.0.1',8080))
    
    while True:
        msg = input('>>: ').strip()
        if not msg:continue
        c.send(msg.encode('utf-8'))
        data = c.recv(1024)
        print(data.decode('utf-8'))
    
    c.close()
    

    四、join方法

    1.定义:

    (1)join方法的作用是阻塞主进程(挡住,无法执行join以后的语句),专注执行多进程。

    (2)多进程多join的情况下,依次执行各进程的join方法,前头一个结束了才能执行后面一个。

    (3)无参数,则等待到该进程结束,才开始执行下一个进程的join。

     2.代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process
    import time
    
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(3)
        print('task <%s> is done' % name)
    
    if __name__ == '__main__':
        p1 = Process(target=work,args=('xiaolan',))
        p2 = Process(target=work,args=('xiaohong',))
        p3 = Process(target=work,args=('xiaolv',))
    
        p_list = [p1, p2, p3]
        for p in p_list:
            p.start()
        for p in p_list:
            p.join()
        print('主进程')
    

    五、守护进程

    1.定义:

    (1)守护进程是主程序创建的。

    (2)守护进程会在主进程代码执行结束后就终止。

    (3)守护进程内无法再开启子进程,否则抛出异常:

    AssertionError: daemonic processes are not allowed to have children。

    2.代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process
    import time
    
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(2)
        print('task <%s> is done' % name)
    
    if __name__ == '__main__':
        p1 = Process(target=work,args=('xiaolan',))
        p1.daemon = True
        p1.start()
        print('主程序')
    

    六、同步锁

    1.定义:

    通常被用来实现共享资源的同步访问,为每一个共享资源创建一个Lock对象当你需要访问该资源时,调用qcuqire方法来获取锁对象(如果其他线程已经获得该锁,则当前线程需等待期被释放),待资源访问完后,在调用release方法释放锁。

    2.代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process,Lock
    import time
    def work(name,mutex):
        mutex.acquire()
        print('task <%s> is runing' %name)
        time.sleep(2)
        print('task <%s> is done' % name)
        mutex.release()
    
    if __name__ == '__main__':
        mutex = Lock()
        p1 = Process(target=work,args=('xiaolan',mutex))
        p2 = Process(target=work,args=('xiaohong',mutex))
        p1.start()
        p2.start()
        print('主程序')
    

     3.代码应用:

    模拟抢票过程

    python代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    import json
    import os
    import time
    from multiprocessing import Process,Lock
    
    def search():
        dic = json.load(open('db.txt'))
        print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(),dic['count']))
    
    def get_ticket():
        dic = json.load(open('db.txt'))
        time.sleep(0.5) #模拟读数据库的网络延迟
        if dic['count'] > 0:
            dic['count'] -= 1
            time.sleep(0.5)  # 模拟写数据库的网络延迟
            json.dump(dic,open('db.txt','w'))
            print('33[31m%s 购票成功33[0m' %os.getpid())
    
    def task(mutex):
        search()
        mutex.acquire()
        get_ticket()
        mutex.release()
    
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(10):
            p = Process(target=task,args=(mutex,))
            p.start()
    

     db.txt文件:

    {"count": 0}
    

     4.缺点:

    (1)运行效率低

    (2)需要自己加锁处理,操作繁琐

    七、进程队列

    1.定义:

    (1)Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    (2)maxsize是队列中允许最大项数,省略则无大小限制。

    2.代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Queue
    
    q = Queue(3)
    
    q.put('first')
    q.put('second')
    q.put('third')
    
    print(q.get())
    print(q.get())
    print(q.get())
    

    八、生产者消费者模型

    1.定义:

    在工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类、函数、线程、进程等)。产生数据的模块,就形象地称为生产者;而处理数据的模块,就称为消费者。在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模式。

    2.优点:

    (1)解耦。

    (2)支持并发。

    (3)支持忙闲不均。

    3.代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Process, JoinableQueue
    import time, os
    
    def producer(q, name):
        for i in range(3):
            time.sleep(1)
            res = '%s%s' % (name, i)
            q.put(res)
            print('33[45m<%s> 生产了 [%s]33[0m' % (os.getpid(), res))
        q.join()
    
    def consumer(q):
        while True:
            res = q.get()
            time.sleep(1.5)
            print('33[34m<%s> 吃了 [%s]33[0m' % (os.getpid(), res))
            q.task_done()
    
    if __name__ == '__main__':
        q = JoinableQueue()
        p1 = Process(target=producer, args=(q, '红烧肉'))
        p2 = Process(target=producer, args=(q, '鱼香肉丝'))
        p3 = Process(target=producer, args=(q, '锅包肉'))
        c1 = Process(target=consumer, args=(q,))
        c2 = Process(target=consumer, args=(q,))
        c1.daemon = True
        c2.daemon = True
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
        p1.join()
        print('主程序')
    

    九、进程池

    1.定义:

    Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。

    2.代码:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    from multiprocessing import Pool
    import os
    import time
    
    def work(n):
        print('task <%s> is runing' % os.getpid())
        time.sleep(2)
        return n**2
    
    if __name__ == '__main__':
        p = Pool(4)
        res_l = []
        for i in range(10):
            res = p.apply_async(work,args=(i,))
            res_l.append(res)
        p.close()
        p.join()
    

     3.进程池之回调函数:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    import requests
    import os,time
    from multiprocessing import Pool
    
    def get_page(url):
        print('<%s> get :%s' % (os.getpid(), url))
        respone = requests.get(url)
        if respone.status_code == 200:
            return {'url': url,'text': respone.text}
    
    def parse_page(dic):
        print('<%s> parse :%s' % (os.getpid(), dic['url']))
        time.sleep(0.5)
        res = 'url:%s size:%s
    ' % (dic['url'], len(dic['text']))
        with open('db.txt', 'a') as f:
            f.write(res)
    
    if __name__ == '__main__':
        p = Pool(4)
        urls = [
            'https://www.baidu.com',
            'https://www.qq.com',
            'https://www.163.com',
            'https://www.sina.com',
            'https://www.jd.com',
            'https://www.taobao.com',
            'https://www.sohu.com',
        ]
    
        for url in urls:
            p.apply_async(get_page, args=(url,), callback=parse_page)
        p.close()
        p.join()
        print('主进程pid:', os.getpid())
    

    十、paramiko模块

    1.定义:

    paramiko是用python语言写的一个模块,遵循SSH2协议,支持以加密和认证的方式,进行远程服务器的连接。

    由于使用的是python这样的能够跨平台运行的语言,所以所有python支持的平台,如Linux, Solaris, BSD, MacOS X, Windows等,paramiko都可以支持,因此,如果需要使用SSH从一个平台连接到另外一个平台,进行一系列的操作时,paramiko是最佳工具之一。

    2.安装:

    由于paramiko是第三方模块,所以是需要我们单独安装的。

    pip3 install paramiko
    

     3.代码:

    (1)使用密码连接的方式:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    import paramiko
    
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname='192.168.0.1', port=22, username='root', password='root123456')
    
    stdin, stdout, stderr = ssh.exec_command('df -h')
    result = stdout.read()
    print(result.decode('utf-8'))
    ssh.close()
    

     (2)使用秘钥连接的方式:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    import paramiko
    
    private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(hostname='192.168.0.1', port=22, username='root', pkey=private_key)
    
    stdin, stdout, stderr = ssh.exec_command('df')
    result = stdout.read()
    print(result.decode('utf-8'))
    ssh.close()
    

     (3)上传或下载文件:

    #!/usr/binl/env python
    #encoding: utf-8
    #author: YangLei
    
    import paramiko
    
    transport = paramiko.Transport(('192.168.0.1', 22))
    transport.connect(username='root', password='root123456')
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    sftp.put('test.txt', '/tmp/test.txt')
    sftp.get('/tmp/test.txt', 'test.txt')
    transport.close()
    
  • 相关阅读:
    单例模式
    SRM147 DIV2 950
    SRM147 DIV2 600
    SRM147 DIV2 250
    SRM147 DIV1 1000
    Python 实现字符串反转的9种方法
    ubtuntu redis 集群部署/搭建(官方原始方案)
    Python2 ValueError: chr() arg not in range(256) 解决办法?
    python 字典操作中has_key() 和 in 那个使用更加pythonic?
    Python库 使用filetype精确判断文件类型
  • 原文地址:https://www.cnblogs.com/00doudou00/p/7456658.html
Copyright © 2011-2022 走看看