zoukankan      html  css  js  c++  java
  • python基础-UDP、进程、进程池、paramike模块

    1 基于UDP套接字
    1.1 介绍
      udp是无连接的,是数据报协议,先启动哪端都不会报错
      udp服务端

    import socket
    
    sk = socket() #创建一个服务器的套接字
    sk.bind() #绑定服务器套接字
    while True: #服务器无限循环
    cs = sk.recvfrom()/sk.sendto() # 对话(接收与发送)
    sk.close() # 关闭服务器套接字

      udp客户端

    import socket
    client = socket() # 创建客户套接字
    while True: # 通讯循环
    client.sendto()/client.recvfrom() # 对话(发送/接收)
    client.close() # 关闭客户套接字

    1.2 基本实例
    1.2.1 服务端

    import socket
    
    udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_server.bind(('127.0.0.1', 9999))
    
    while True:
    data,client_addr = udp_server.recvfrom(512)
    print(data, client_addr)
    udp_server.sendto(data.upper(), client_addr)

    1.2.2 客户端

    import socket
    
    udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    while True:
    msg = input('>>').strip()
    udp_client.sendto(msg.encode('utf-8'), ('127.0.0.1',9999))
    data,server_addr = udp_client.recvfrom(512)
    print(data.decode('utf-8'))

    1.3 udp不会粘包
      udp是基于数据报协议,发送一份信息,有完整的报头的主题,不会像tcp那样基于数据流的,没有开头、没有结尾;而udp有开头(报头),也有结尾,所以不会出现像tcp那样粘包现象。
    1.3.1 服务端

    import socket
    
    udp_server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udp_server.bind(('127.0.0.1',9999))
    
    info1,client_addr = udp_server.recvfrom(1)
    print('info1', info1)
    info2,client_addr = udp_server.recvfrom(512)
    print('info2', info2)

    1.3.2 客户端

    import socket
    
    udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    udp_client.sendto('welcome'.encode('utf-8'), ('127.0.0.1',9999))
    udp_client.sendto('beijing'.encode('utf-8'), ('127.0.0.1',9999))

    1.4 udp并发
    1.4.1 服务端

    import socketserver
    class MyUDPhandler(socketserver.BaseRequestHandler):
    def handle(self):
    print(self.request)
    self.request[1].sendto(self.request[0].upper(), self.client_address)
    
    if __name__ == '__main__':
    udp_server = socketserver.ThreadingUDPServer(('127.0.0.1',8080), MyUDPhandler)
    udp_server.serve_forever()

    1.4.2 客户端

    import socket
    
    udp_client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    
    while True:
    info = input('>>').strip()
    udp_client.sendto(info.encode('utf-8'), ('127.0.0.1',9999))
    body,server_addr = udp_client.recvfrom(512)
    print(body.decode('utf-8'))

    2 进程
    2.1 介绍
      进程:正在运行的一个过程或者任务,是对正在运行程序的一个抽象。

    2.2 开启进程
    示例1

    from multiprocessing import Process
    import time
    def my_run(info):
    print('task <%s> is running' %info)
    time.sleep(0.5)
    print('task <%s> is done' % info)
    
    if __name__ == '__main__':
    process1 = Process(target = my_run, args=('mary',))
    process2 = Process(target = my_run, args=('jack',))
    process1.start()
    process2.start()

    示例2

    from multiprocessing import Process
    import time
    class MyMulProcess(Process):
    def __init__(self,name):
    super().__init__()
    self.name = name
    
    def my_run(self):
    print('task <%s> is runing' % self.name)
    time.sleep(0.5)
    print('task <%s> is done' % self.name)
    
    if __name__ == '__main__':
    process = MyMulProcess('jack')
    process.my_run()
    process.start()

    2.3 并发通信
    2.3.1 服务端

    from multiprocessing import Pool
    import os
    import socket
    tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # 解决Address already in use
    tcp_server.bind(('127.0.0.1',9999))
    tcp_server.listen(5)
    def work(conn, addr):
    print(os.getpid())
    print(addr)
    while True:
    try:
    data = conn.recv(1024)
    if not data:break
    conn.send(data.upper())
    except Exception:
    break
    conn.close()
    
    if __name__ == '__main__':
    pool = Pool(4)
    while True:
    conn,addr = tcp_server.accept()
    pool.apply_async(work, args = (conn, addr))
    tcp_server.close()

    2.3.2 客户端

    import socket
    
    tcp_client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    tcp_client.connect(('127.0.0.1',9999))
    
    while True:
    info = input('>>').strip()
    if not info:continue
    tcp_client.send(info.encode('utf-8'))
    data = tcp_client.recv(1024)
    print(data.decode('utf-8'))
    
    tcp_client.close()

    2.4 join方法
      主进程,等待子进程运行完,才执行下面内容;
      p.join只能join住start开启的进程,而不能join住run开启的进程
      主进程,等待p1执行结束,才执行主进程下面的内容

    from multiprocessing import Process
    import time
    
    def work(name):
    print('task <%s> is runing' %name)
    time.sleep(0.5)
    print('task <%s> is done' % name)
    
    if __name__ == '__main__':
    process1 = Process(target = work, args=('jack',))
    process2 = Process(target = work, args=('mary',))
    
    process_list = [process1, process2]
    for process in process_list:
    process.start()
    
    for process in process_list:
    process.join()

    2.6 守护进程
      主进程代码运行完毕,守护进程就会结束
      主进程创建守护进程
      1.守护进程会在主进程代码执行结束后就终止
      2.守护进程内无法再开启子进程,否则抛出异常。

      守护进程,守护者主进程,主进程结束,守护进程随即结束;主进程代码结束后,守护进程随之结束

    from multiprocessing import Process
    import time
    def work(name):
    print('task <%s> is runing' %name)
    time.sleep(0.5)
    print('task <%s> is done' % name)
    
    if __name__ == '__main__':
    p1=Process(target=work,args=('jack',))
    p1.daemon = True # 必须在进程开启之前,设置为守护进程
    p1.start()

      重复守护进程概念,守护进程什么时间结束;在主进程代码结束,就会结束

    from multiprocessing import Process
    import time
    def foo():
    print("from foo start")
    time.sleep(0.5)
    print("from foo end")
    
    def bar():
    print("from bar start")
    time.sleep(0.8)
    print("from bar end")
    if __name__ == '__main__':
    process1 = Process(target = foo)
    process2 = Process(target = bar)
    
    process1.daemon = True
    process1.start()
    process2.start()
    print("主进程")
    
    #打印该行则主进程代码结束,则守护进程process1应该被终止,
    # 可能会有process1任务执行的打印信息from foo start,
    # 因为主进程打印主进程时,process1也执行了,但是随即被终止

    2.7 进程同步锁
      核心点:保证一个进程用完一个终端,再交个另一个终端使用,独享终端,保证有序;

    2.7.1 基本用法
      加锁,变为串行,保证数据不会错乱;效率与错乱之间做出取舍

    from multiprocessing import Process,Lock
    import time
    def work(name, mutex):
    mutex.acquire()
    print('task <%s> is runing' %name)
    time.sleep(0.5)
    print('task <%s> is done' % name)
    mutex.release()
    
    if __name__ == '__main__':
    mutex = Lock()
    process1 = Process(target = work, args = ('jack', mutex))
    process2 = Process(target = work, args = ('mary', mutex))
    process1.start()
    process2.start()

    2.7.2 模拟购票
      模拟购票,查询票的余额,不要考虑先后顺序;而到真正购票环境,需要保证一张票不被多次购买,需要加锁。

    import json,time,os
    from multiprocessing import Process,Lock
    def search():
    dic = json.load(open('ticket.txt'))
    print('33[32m[%s] 看到剩余票数<%s>33[0m' %(os.getpid(), dic['count']))
    def get_ticket():
    dic = json.load(open('ticket.txt'))
    time.sleep(0.5) # 模拟读数据库的网络延迟
    if dic['count'] > 0:
    dic['count'] -= 1
    time.sleep(0.5) # 模拟写数据库的网络延迟
    json.dump(dic,open('ticket.txt','w'))
    print('33[31m%s 购票成功33[0m' %os.getpid())
    def work(mutex):
    search()
    mutex.acquire()
    get_ticket()
    mutex.release()
    if __name__ == '__main__':
    mutex = Lock()
    for index in range(10):
    process = Process(target = work, args = (mutex,))
    process.start()

    2.7.3 共享数据通信
      基于共享内存方式,进行数据通信,需要考虑锁的形式。

    from multiprocessing import Process,Manager,Lock
    
    def work(dic, mutex):
    with mutex:
    dic['count'] -= 1
    
    if __name__ == '__main__':
    mutex = Lock()
    manager = Manager()
    dic = manager.dict({'count':100})
    p_list = []
    for i in range(100):
    process = Process(target = work, args = (dic, mutex))
    p_list.append(process)
    process.start()
    
    for process in p_list:
    process.join()
    print(dic)

    2.7.3 进程间通信
      进程间的通信,有很多方式,例如:管道、共享数据、消息队列等;推荐的方式是:通过消息队列的方式进行通信。
      Queue,常用方法:put、get;队列就是管道加锁,进行实现的

    from multiprocessing import Queue
    
    queue = Queue(3) # 队列的最大长度为3
    
    queue.put('first')
    queue.put('second')
    queue.put('third')
    queue.put('fourth') # 超过队列长度,满了会卡着
    
    print(queue.get())
    print(queue.get())
    print(queue.get())
    print(queue.get()) # 队列空了,一直卡着,等待队里有值,进行获取
    
    # 了解知识点
    queue = Queue(3)
    queue.put('first',block = False) # 队列满了,不进行锁住,会抛异常
    queue.put('second', block = False)
    queue.put('third', block = False)
    queue.put_nowait('fourth') # 等价queue.put('fourth', block = False)
    queue.put('fourth',timeout = 3) # 默认等待3秒钟,指定超时时间

    3 生产者、消费者
      应该具有两类模型,生产者和消费者
    3.1 基本版本的生产者消费者

    from multiprocessing import Process,Queue
    import time,os
    def producer(q,name):
    for i in range(5):
    time.sleep(0.5)
    res='%s%s' %(name,i)
    q.put(res)
    print('33[42m<%s> 制造 [%s]33[0m' %(os.getpid(),res))
    
    def consumer(q):
    while True:
    res=q.get()
    if res is None:break
    time.sleep(0.8)
    print('33[31m<%s> 购买 [%s]33[0m' % (os.getpid(), res))
    
    if __name__ == '__main__':
    queue = Queue()
    # 生产者
    producer1 = Process(target = producer, args = (queue, '自行车'))
    producer2 = Process(target = producer, args = (queue, '汽车'))
    producer3 = Process(target = producer, args = (queue, '飞机'))
    
    # 消费者
    consumer1 = Process(target = consumer, args = (queue,))
    consumer2 = Process(target = consumer, args = (queue,))
    
    producer1.start()
    producer2.start()
    producer3.start()
    consumer1.start()
    consumer2.start()
    
    producer1.join()
    producer2.join()
    producer3.join()
    queue.put(None) # 利用None通知消费者,东西已经生产完了
    queue.put(None) # 有几个消费者,就要通知几次

    3.2 JoinableQueue改进生产者消费者模型
      生产者等待消费者把队列的内容全部去空,就结束生产过程;消费等待主进程结束,也就随之结束
      主进程等待生产者结束,才执行剩余代码;需要消费者利用守护进程,随者主进程结束也就结束。
      逻辑性比较强,利用JoinableQueue和守护进程和join。

    from multiprocessing import Process,JoinableQueue
    import time,os
    def producer(queue, name):
    for i in range(5):
    time.sleep(0.5)
    res = '%s%s' %(name,i)
    queue.put(res)
    print('33[42m<%s> 制造 [%s]33[0m' %(os.getpid(),res))
    queue.join() # 生产者等待queue里面的所有内容都被卖掉了,就结束了
    
    def consumer(queue):
    while True:
    res = queue.get()
    if res is None:break
    time.sleep(0.8)
    print('33[31m<%s> 购买 [%s]33[0m' % (os.getpid(), res))
    queue.task_done() # 确定购买了生产者的一个内容,通知生产者
    # 通知queue,已经取走一个东西
    
    if __name__ == '__main__':
    queue = JoinableQueue()
    # 生产者
    producer1 = Process(target = producer, args = (queue, '自行车'))
    producer2 = Process(target = producer, args = (queue, '汽车'))
    producer3 = Process(target = producer, args = (queue, '飞机'))
    
    # 消费者
    consumer1 = Process(target = consumer, args = (queue,))
    consumer2 = Process(target = consumer, args = (queue,))
    
    consumer1.daemon = True # 消费者利用守护进程,随着主进程结束也就结束
    consumer2.daemon = True
    producer1.start()
    producer2.start()
    producer3.start()
    consumer1.start()
    consumer2.start()
    
    producer1.join()

    4 进程池
      默认开启进程池的个数,是cpu核数的个数
    4.1 同步提交任务
      同步提交任务,损失效率,保证数据有序和安全

    from multiprocessing import Pool
    import os,time
    
    def task(n):
    print('task <%s> is runing' %os.getpid())
    time.sleep(0.5)
    return n**3
    
    if __name__ == '__main__':
    print(os.cpu_count())
    p = Pool(4)
    for index in range(10):
    res = p.apply(task, args = (index,))
    print(res)

    4.2 异步提交任务
      等待进程池中的所有任务结束,就可以拿到运行结果
      p.close(),禁止向进程池中提交新任务

    from multiprocessing import Pool
    import os,time
    
    def task(n):
    print('task <%s> is runing' %os.getpid())
    time.sleep(0.5)
    return n**3
    
    if __name__ == '__main__':
    print(os.cpu_count())
    pool = Pool(4)
    res_list = []
    for index in range(10):
    res = pool.apply_async(task, args = (index,)) # 只负责向队列仍任务,不等任务结束
    res_list.append(res)
    
    for res in res_list:
    print(res.get())
    
    pool.close()
    pool.join()

    4.3 进程池控制并发
      利用进程池,控制并发的进程数量
    4.3.1 服务端

    from multiprocessing import Pool
    import os
    import socket
    
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    server.bind(('127.0.0.1',9999))
    server.listen(5)
    def work(conn, addr):
    print(os.getpid())
    print(addr)
    while True:
    try:
    data=conn.recv(1024)
    if not data:break
    conn.send(data.upper())
    except Exception:
    break
    conn.close()
    
    if __name__ == '__main__':
    pool = Pool(4)
    while True:
    conn,addr = server.accept()
    pool.apply_async(work, args = (conn, addr))
    server.close()

    4.3.2 客户端

    import socket
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1',9999))
    
    while True:
    info = input('>>').strip()
    if not info:continue
    client.send(info.encode('utf-8'))
    data = client.recv(1024)
    print(data.decode('utf-8'))
    
    client.close()

    4.4 回调函数
      让下载的函数进行并发,解析的函数进行串行执行(利用回调函数进行执行)
      耗时时间长的利用进程池进行并发处理,利用异步进行处理

    import requests # pip3 install requests
    import os,time
    from multiprocessing import Pool
    def get_info(url):
    print('<%s> get :%s' %(os.getpid(),url))
    respone = requests.get(url)
    if respone.status_code == 200:
    return {'url':url, 'text':respone.text}
    
    def parse_page(dic):
    print('<%s> parse :%s' %(os.getpid(),dic['url']))
    time.sleep(0.5)
    parse_res='url:<%s> size:[%s]
    ' %(dic['url'],len(dic['text'])) # 模拟解析网页内容
    with open('reptile.txt','a') as f:
    f.write(parse_res)
    
    if __name__ == '__main__':
    pool = Pool(4)
    urls = [
    'https://www.baidu.com',
    'https://www.python.org',
    'https://www.openstack.org',
    'https://help.github.com',
    'https://www.sina.com.cn',
    ]
    
    for url in urls:
    pool.apply_async(get_info, args = (url,), callback = parse_page)
    # 利用回调函数,通知主进程,调用parse_page,需要执行parse_page函数了
    # 把get_page执行结果,传递给parse_page作为参数进行传递
    
    pool.close()
    pool.join()

    5 paramike模块
    5.1 介绍
      paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。

    5.2 基于密码连接

    import paramiko
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='127.0.0.1', port=22, username='root', password='xxx')
    
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()

    5.3 基于秘钥链接
      客户端文件名:id_rsa
      服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)

    import paramiko
    
    private_key = paramiko.RSAKey.from_private_key_file('id_rsa')
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='127.0.0.1', port=22, username='root', pkey=private_key)
    
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df -h')
    # 获取命令结果h
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()
    import paramiko
    from io import StringIO
    key_str = """-----BEGIN RSA PRIVATE KEY-----
    MIIEpQIBAAKCAQEAl6OGU27q3YV1LMdAZH02PWFtPJaxQ68zbWIBmgP0NfnVQz30
    Bvhe+rZCFEQCwLoeld4jj7vA+Vk8nKPoBhX1WVvq9MfSF+YRXeu1+XAXvL1pH+Q9
    6gJ1l4NmwHq8DVgUCx3TsAdK0wOqEIUDSOu+2/nECK1lmKN1Cf5Va20r+B0CDjkT
    RRA6C9MIb2WWwf4EiEWjBl+lnZ0vDXoB38oZQR71tdLsvrVumUDzyGvF79jWW3O6
    9gc4F2ivoKVbXswyJfq1bVpH1N7aM6DzUiELv1mFsyIVveydMVzFIrDbUI8gCErx
    NhI6esUbhNwfd3NfBOTzEQxBd5V3/a6rkC5MpwIDAQABAoIBAQCD9cJHiRbKgAFg
    XmUjDfPNpqMxPtI0XJscbVWHejljX26/fYKHLk05ULJggG8E2PMU6KN5yaI9W/Lr
    PZgE88b3ZI4rRlkGgyhJ234Y+/ssPIjnP/DBXDKJD8izaBuOYT/QDLzTSwVKbL3q
    clZRdxY4yDpYcs0e7+BCOhqLyg2hdAYA8Z4VOOs4SQqRW6k9K+oXeNMhc4htozOf
    tVsSM3XkFZ4hpug34S89+FKEwZ00RTJEEkK8IjBfLJ5w+RfLFXth8hTVMbeLhcv+
    RqDYdUwq/JAXCrri0687hCwi5J06xTrY7BwgoKJznxlpz6tiyEPNrnqZ1vAayWqS
    G/x+R/cBAoGBAMj49kKmEpKZesbbSD9KJvD1YnkIGwEheLvNZzRtx+2cwejHHWDZ
    F0i+KzDTt+7QZ2zb+ABOgK1sq8Xhfn40M90xqixbqp0UzaFyMFnmiB2iyZ56I3Fi
    Kqeerr+f3i/djewNhMJZZZhO/n+YxhCpQUBotepQ3tGA/G6vvkSSMe73AoGBAMEo
    i9LaSDyJxk51mW8OmgiIyJ1376LKu3sHlUkn+Ca5dm2/iYNIuN5dC0YTPjo1A5It
    jZTid5VBEb4OMOpKYygR4S9euAxv22Uxib1xGZLdJHKblwizdJnBazsqDQW6mPfN
    o/BADQl/+h3pPpOWoIxSxi07xYq+gAToW2tc6TPRAoGBAKhGHRwtJbvuGqlKjjHA
    Ct8S94LT0JifyBGnqNRzX0WLTal0nxqqax6TbGKTw5yIjzDM9dh74q5TIXismFdf
    qlV48j31+uNPueWGUQnVRv9ZgGvbZLXZNlHnQfZdC5MUdXLC1vhMFg7zhZCdAKqO
    rX4arsclM4xD7hlXuX580qZ9AoGALs5te43LnWfhdxfGM4Q9TT4gJxBuMGuiHMEM
    quqVloSwrw2P/BE+QxwW5Ec7eA1qrRx+x4pNYgyfiQeVUODvwED86WaxgMoGRzJG
    53IluVH/SApuAfzCj5OwMWkSOMYr1TiutkQ/JIMvj9n6gPcqNnbEcSefyew5x3aq
    2IxuMlECgYEAtVuORz9C7WnJIVW6HwNiS4OBs7becOgXDHAOw0hnu+3mxVm/NIkX
    yeGK7rP1KKbS4I3pbG+H0nWAQkfQtW6nQjU5nvoCTX8Yyk6ZNC0mhGNTqKRqpjI/
    eXe8nUib71izC6g6kJY66+BTg2SCBsHUAqAB7L4gvFHGt8sq46TQ3jw=
    -----END RSA PRIVATE KEY-----"""
    
    private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
    transport = paramiko.Transport(('127.0.0.1', 22))
    transport.connect(username='root', pkey=private_key)
    
    ssh = paramiko.SSHClient()
    ssh._transport = transport
    
    stdin, stdout, stderr = ssh.exec_command('df')
    result = stdout.read()
    print(result.decode('utf-8'))
    transport.close()
    
    print(result)
    View Code
  • 相关阅读:
    JQuery有几种选择器?
    Java有哪些基本数据类型
    您认为做好测试用例设计工作的关键是什么?
    final关键字
    目前主要的测试用例设计方法是什么?
    举例说明同步和异步。
    maven配置多个镜像
    参数Parameters、变量Variables
    jsp文件导包
    动态横切
  • 原文地址:https://www.cnblogs.com/goodshipeng/p/7455078.html
Copyright © 2011-2022 走看看