zoukankan      html  css  js  c++  java
  • python网络编程-2

    1.理解相关概念

    #浅显理解下
    
    对比cpu与io的差距如:io从硬盘读取一条数据9ms ,cpu在9ms可以做450万次指令 
    
    cpu切换上下文的方式:1.遇到io操作切换cpu 2.cpu时间片分配
    
    操作系统的调度算法:多级反馈队列的调度算法(队列一的任务在时间片内为执行完会被下放到队列二,队列二中在时间片内未执行完毕下放到队列三)
    
    
    进程概念
        操作系统资源分配调度的最小单位,进程间数据资源都是隔离的!!,由(程序+数据集+进程控制块三部分组成),那么就是说进程就是分配系统资源,标识任务.
        进程间可以切换合理利用cpu ,进程的状态--记录--切换称上下文切换(资源开销大)
        一个程序启动就会开启一个进程
        每条线程可以利用多核
        
    线程概念
        cpu执行的最小单位!!
        线程为了降低上下文切换,提高系统并发,突破进程仅能做一件任务的傻缺操作,进程作为容器,给线程提供资源,多线程资源共享上下文切换减少
        线程健壮性差,如果一个线程挂了,整个进程就被牵连了
    遇到IO操作实现并发效果
        
    协程概念(就是一条线程)
        协程就是一条线程基础上实现切换多任务
        不能利用多核,切换频率高,在多io操作时非常优势
        
        
    进程的三种队列
        就绪队列:这里面的任务等待cpu分配时间片
        运行队列:这里面的任务正在使用cpu
        阻塞队列:这里面的任务遇到阻塞操作(如:input)
        
    同步(针对函数任务调用方式)
      当a事件中调用了b事件,a事件一定会等待b事件有了结果再继续执行
    
    异步(针对函数任务调度方式)
      当a事件调用了b事件,a事件不用等待b事件结果直接继续执行
    
    阻塞(针对进程线程)
      线程在执行过程遇到input sleep conn.recv等会阻塞
    
    非阻塞(针对进程线程)
      线程在执行过程遇到input sleep conn.recv也不会阻塞

    并行
      需要多核或者多cpu,计算机在同一时间节点执行处理多个计算任务

    并发
      计算机在同一节点,只能计算一个任务,但是cpu快速切换在多个线程间貌似实现同时计算


    2.非阻塞的socket ,可以同时处理多个client的连接 ,避免阻塞在accpet的位置(大量的循环recv ,cpu消耗很大)

    import socket
    
    sk = socket.socket()
    sk.setblocking(False)  # 关闭阻塞,但是会循环不停的要求connect连接
    sk.bind(('127.0.0.1', 8000))
    sk.listen(5)
    conn_l = []  # 将所有的管道对象放入列表
    while True:
        try:
            conn, addr = sk.accept()
            conn_l.append(conn)
        except BlockingIOError as e:  # 捕获10035错误直接处理 ,可以使用select来完美处理
            for conn in conn_l:  # 列表的内容在不断的循环
                try:
                    msg = conn.recv(1024)  # 不间断的从各个管道拿数据
                    print(msg)
                    conn.send(b'recieved')
                except BlockingIOError as e1:  # 再次捕获10035错误直接处理
                    pass
    ##################################
    import socket

    client = socket.socket()
    client.connect(('127.0.0.1', 8000))

    while 1:
    cRequest = input("send: ")
    client.send(cRequest.encode('utf-8'))
    if cRequest == "quit":
    client.close()
    break

    3.创建进程

      默认情况下主进程执行完毕会关闭 ,子进程继续执行不受影响 ,程序继续运行, 子进程不能有交互终端命令

      僵尸进程-孤儿进程:父进程创建子进程来不及处理 ,就会产生出大量等待父进程处理的子进程 ,当父进程被杀死或退出时 ,他的所有子进程成为孤儿进程由int接管进程回收

      Procee对象的一些方法

        子进程对象.start()方法启动子进程

        子进程对象.join()方法可以让主进程阻塞等待子进程执行完毕

        子进程对象.daemon =True 可以设置守护进程(守护主进程的子进程) ,当主进程代码执行完毕子进程立即停止 ,守护进程不能有子进程

        子进程对象.p.is_alive() 返回bool值判断子进程是否还在执行  

    import time
    import os
    
    
    from multiprocessing import Process
    
    
    def func():
        time.sleep(2)
        print("子进程ID", os.getpid())
        print("子进程的父进程ID", os.getppid())
    
    
    
    if __name__ == '__main__':      #windows下必须使用该判断 ,由于Windows没有fork的原因 ,启动新的py进程导入模块在Process()的时候会无线执行!!!
        p = Process(target=func, )
        # p.daemon=True
        p.start()
    
        print(p.is_alive())
        # p.join()
        print("父进程ID", os.getpid())
        print("父进程的父进程ID", os.getppid())

    4.创建线程

      一个进程必有一个主线程 ,主线程执行完代码会等待所有子线程执行完毕后退出

      多线程之间可以完成一个目标 ,遇io切换cpu给其他线程 ,易于在多IO使用

      threading常用方法

        子线程.start()启动子线程

        子线程.join()阻塞主线程 ,必须等到该子线程结束才能继续走主线程代码

        子线程.setDaemon(), 设置子线程是守护进程当主线程停止后子线程也会终止 ,正常情况主线程会等子线程

    import threading  # 引用线程模块
    import time
    
    def countnum(n):
        print('running on number{}'.format(n))
        time.sleep(n)
    
    start_time1 = time.time()
    countnum(2)  # 程序io阻塞cpu挂住 ,如果此时cpu还能继续工作最好 ,不跟io一起阻塞
    countnum(1)  # 程序io阻塞cpu挂住
    print('花费了{}'.format(time.time() - start_time1))  # cpu花费0.01s完成 ,io阻塞了很久导致后面代码也执行较慢!
    
    t1 = threading.Thread(target=countnum, args=(3,))  # 实例化线程对象,绑定执行的任务
    t2 = threading.Thread(target=countnum, args=(2,))
    
    t1.start()  # t1线程遇io切换t2
    
    print('1')
    t1.join()  # 阻塞主线程,必须等t1执行完成后在执行后面代码
    t2.start()  # t2线程遇io切换下面代码
    print('2')
    
    t3 = threading.Thread(target=countnum, args=(6,))
    t3.setDaemon(True)  # 设置t3为守护线程,当主线程终止的时候,t3这个子线程立即终止
    t3.start()
    print('end')  # 打印end但实际程序还在运行线程中的代码

    ######################################join()的操作######################################
    import time
    import threading

    def countnum(n):
    print("running is {}".format(n))
    time.sleep(n)
    print("{} is ending".format(n))

    start = time.time()

    thread_list = [] # 线程对象列表

    for i in range(1, 6): # 5个线程 并发执行
    t = threading.Thread(target=countnum, args=(i,))
    thread_list.append(t)
    t.start()

    # 上面开始执行t1.start()t2.start()t3.start()t4.start()t5.start()t6.start()
    # 下面开始执行t1.join() ,t1执行完执行t2.join()...
    for x in thread_list:
    x.join()
    # thread_list[-1].join()
    print("程序执行时间{}".format(time.time() - start))

    5.进程池与线程池

      concurrent.futures加载模块

        ProcessPoolExecutor    进程池(计算密集操作)

        ThreadPoolExecutor     线程池(io多操作)

        父进程定义好task函数 ,直接submit提交到池中 ,池中很多的任务 ,同一时间只有指定个数的进程或线程

    from concurrent.futures import ProcessPoolExecutor
    import time
    
    def task(name):
        print('name', name)
        time.sleep(1)
    
    if __name__ == '__main__':
        start = time.time()
        p1 = ProcessPoolExecutor(2)
        for i in range(5):
            p1.submit(task, i)
        p1.shutdown(wait=True)      # 主进程是否对池中子进程join() ,否的就注释
        print('主线程')
        end = time.time()
        print(end - start)
    ###########################################################################
    from concurrent.futures import ThreadPoolExecutor
    import time
    
    def task(name):
        print('name', name)
        time.sleep(1)
    
    if __name__ == '__main__':
        start = time.time()
        p1 = ThreadPoolExecutor(2)
        for i in range(5):
            p1.submit(task, i)
        p1.shutdown(wait=True)      # 主线程是否对池中子线程join() ,否的就注释
        print('主线程')
        end = time.time()
        print(end - start)

     6.锁

      GIL全局解释器锁

        Cpython的解释器给python的每个进程一把锁 ,一个进程下的同一时间仅有一个线程可以获得GIL锁 ,获得锁的线程才能使用cpu ,所以python多线程不能利用多核优势 ,GIL锁出现保证了进程内共享数据安全 ,因为线程数据共共享,当各个线程访问数据资源时会出现竞争状态,造成数据混乱 ,理解为给解释器加了把互斥锁 ,当该线程遇到io会自动解锁

    ####python的GIL锁情况下,多线程执行计算密集型任务比串行还慢
    import
    threading import time start_time = time.time() def task_js(): return 2 ** 70000000 def Test2(): for i in range(5): threading.Thread(target=task_js, args=(1,)).start() # Test2() # 多线程执行并发执行计算密集任务6秒多 task_js() # 串行计算计算密集任务仅5.7秒 task_js() task_js() task_js() task_js() end_time = time.time() print('执行了', end_time - start_time)

      同步锁-互斥锁

        同步锁就是协同操作的的锁 ,当A线程与B线程与C线程协同操作 ,规定先后顺序 ,因为可能会依赖产物 .如A线程开始操作前B线程需要先执行 ,最后执行C线程

        互斥锁当多个线程访问同一个共享资源 ,拥有互斥锁的线程才能使用 ,当释放锁之后其他的线程才能获得锁继续访问共享数据

        锁方法      

          R = threading.Lock()  #创建锁对象

          R.acquire()      #锁定改时间仅允许一个线程操作

          R.relases()      #解锁

    ##模拟不加互斥锁 ,全局变量被混乱调用
    import
    threading import time totle = 100 def task(): global totle time.sleep(0.00001) # 模拟程序可能会出现的io,cpu会切换大家得到的totle可能就不对劲了本来应该是0!! a = totle - 1 totle = a for i in range(1, 101): threading.Thread(target=task, args=()).start() print(totle)
    ##加入互斥锁############################################################
    import threading
    import time

    totle = 100
    R = threading.Lock() # 创建一个互斥锁


    def task():
    R.acquire() # 锁住操作全局变量
    global totle
    time.sleep(0.00001) # 模拟程序可能会出现的io,cpu会切换大家得到的totle可能就不对劲了本来应该是0!!
    a = totle
    totle = a - 1
    R.release() # 解锁,其他进程可以操作全局变量


    thread_list = [] # 用于join
    for i in range(1, 101):
    t = threading.Thread(target=task, args=())
    t.start()
    thread_list.append(t)

    for i in thread_list: # 等待所有的子进程执行完毕 ,要不然print(totle)没有意义
    i.join()
    print(totle)

    7.队列

      可以用于生产者消费者模型 ,生产者仅产生数据 ,消费者处理数据 ,二者通过缓冲区解耦 ,缓冲区使用队列 ,生产消费二者解耦

      队列常用方法

        q = queue.Queue()     #创建队列对象

        q.put()          #放入数据

        q.get()          #拿出一条数据 ,queue队列就是先进先出

        q.empty()          #判断队列是否为空

    import time, random
    import queue, threading
    
    q = queue.Queue()  # 创建一个队列
    
    
    def Producer(name):  # 定义生产者操作动作
        while 1:
            a = random.randrange(1, 5)
            q.put(a)
            print('本次生产数据{}'.format(a))
            time.sleep(random.randrange(3))
    
    
    def Consumer(name):  # 定义消费者操作动作
        while 1:
            if not q.empty():  # 如果队列不为空
                print('{}开始消费数据'.format(name))
                data = q.get()  # 取出队列中数据
                time.sleep(random.randrange(2))  # 模拟消费数据时间
                print('{}本次消费了数据{}'.format(name, data))
            else:
                print('队列暂时无数据')
                time.sleep(random.randrange(3))  # 队列没数据就等等生产数据
    
    
    p1 = threading.Thread(target=Producer, args=('zookerper',))  # 开启一个线程定义一个zk开始生产数据
    c1 = threading.Thread(target=Consumer, args=('kafka1',))  # 在开一个定义一个kafka开始消费数据
    c2 = threading.Thread(target=Consumer, args=('kafka2',))  # 再开一个定义一个kafka开始消费数据
    p1.start()  # 生产者产生的数据与消费者消费的数据的时间完全随机
    c1.start()
    c2.start()

    8.io模型(https://www.cnblogs.com/Eva-J/articles/8324837.html)  (https://blog.csdn.net/sehanlingfeng/article/details/78920423)

      梳理linux下的network的IO模型 ,当IO出现会涉及两个对象 ,1.调用IO的进程或线程 2.内核 .当一个read磁盘数据开始会进行两个阶段 1.等待数据准备 2.将数据从内核拷贝到进程中

      阻塞IO(使用多线程多进程池来减少IO的影响)

        实际上除非特别指定,几乎所有的IO接口都是阻塞 ,在执行recv(1024)线程被阻塞 ,在这个期间线程不计算和响应请求,被挂起来

      非阻塞IO

        进程需要不断的轮询询问kernel是否准备好数据 ,这个过程循环耗费大量cpu

      IO多路复用

        也称为异步阻塞IO ,建立在内核提供的多路分离函数select基础上

        使用select可以避免非阻塞IO中不断轮询询问内核的过程 ,使用select函数对线程创建的所有socket的IO添加监视, 如果数据准备好select就会通知用户线程去拷贝数据

        实际上每个IO请求在select函数上是阻塞的 ,相对于阻塞IO(recvfrom阻塞)+多线程而言 ,IO多路复用优势在于处理超高并发, 最大的优势是可以在一个用户线程内同时处理多个socket的IO请求

     9.socketserver模块

    https://www.cnblogs.com/eric_yi/p/7701381.html

    import socketserver
    """
    根据继承顺序查看
    实例化socket对象ThreadingTCPServer类 ,RequestHandlerClass = Handler
    socket对象执行了server_forever()方法
    最后跳来跳去执行了finish_request()方法 ,self.RequestHandlerClass(request, client_address, self)实例化自己写的类
    执行了BaseRequestHandler的init方法 ,self.handle()又执行了自定义的方法开始处理数据
    """
    
    class Handler(socketserver.BaseRequestHandler):  # 定义类,必须继承BaseRquestHandler方法!
        def handle(self):  # 必须有handler方法
            print('New connection:', self.client_address)
            while 1:
                data = self.request.recv(1024)
                if not data: break
                print('client data', data)
                self.request.send(data)
    
    
    if __name__ == '__main__':
        server = socketserver.ThreadingTCPServer(('127.0.0.1', 8009), Handler)  # 实例化socket对象绑定bind ,自定义的类
        server.serve_forever()  # 事件监听调用handler方法

    10.socketserver模块实现ftp ,先简单实现一下

      ftp的功能:1.上传文件 2.下载文件 

    import socketserver
    import os
    import json
    
    
    class FtpServer(socketserver.BaseRequestHandler):
        def handle(self):
            head_info = json.loads(self.request.recv(1024).decode('utf-8'))
            print(head_info)
            if hasattr(self, head_info['type']):
                func = getattr(self, head_info['type'])
                func(head_info)
            else:
                print('type?')
    
        def get(self, head_info):
            filename = head_info['filename']
    
            if os.path.isfile(filename):
                filezize = str(os.path.getsize(filename))
                print(filezize)
                self.request.send(filezize.encode('utf-8'))
                with open(filename, mode='rb') as f1:
                    for i in f1:
                        self.request.send(i)
    
        def put(self, head_info):
            filename = head_info['filename']
            file_size = head_info['filesize']
    
            recv_size = 0
            with open(filename, mode='wb') as f1:
                while recv_size < file_size:
                    print(file_size, recv_size)
                    data = self.request.recv(1024)
                    f1.write(data)
                    recv_size += len(data)
    
    
    if __name__ == '__main__':
        server = socketserver.ThreadingTCPServer(('127.0.0.1', 8000), FtpServer)
        server.serve_forever()
    
    ########################################################################################################################################################
    
    
    
    import socket
    import os
    import json
    
    
    class FtpClient():
        def __init__(self):
            self.Client = socket.socket()
    
        def connect(self, Ip_Port):
            self.connect = self.Client.connect(Ip_Port)
    
        def put(self, cmd_list):
            if os.path.isfile(cmd_list[1]):
                file_dict = {
                    'type': 'put',
                    'filename': cmd_list[1],
                    'filesize': os.path.getsize(cmd_list[1])
                }
    
                self.Client.send(json.dumps(file_dict).encode('utf-8'))
                with open(cmd_list[1], mode='rb') as f1:
                    for i in f1:
                        self.Client.send(i)
                print('上传完成')
    
            else:
                print('上传的不是文件')
    
        def get(self, cmd_list):
            filename = cmd_list[1]
            file_dict = {
                'type': 'get',
                'filename': filename,
            }
    
            self.Client.send(json.dumps(file_dict).encode('utf-8'))
    
            filesize = int(self.Client.recv(1024).decode('utf-8'))
    
            recv_size = 0
            print(filesize)
            with open(filename, mode='wb') as f1:
                while 1:
                    data = self.Client.recv(1024)
                    f1.write(data)
                    recv_size += len(data)
    
                    if recv_size == filesize: break
    
        def Interface_cmd(self):
            cmd = input('>>>')
            cmd_list = cmd.split()
            if hasattr(self, cmd_list[0]):
                func = getattr(self, cmd_list[0])
                func(cmd_list)
    
    
    c1 = FtpClient()
    c1.connect(('127.0.0.1', 8000))
    c1.Interface_cmd()

         

      

  • 相关阅读:
    论球迷和程序员
    山哥,你是怎么提高设计能力的?
    一个想休息的线程:JVM到底是怎么处理锁的?怎么不让我阻塞呢?
    由“面经”引发的思考
    99%的创业公司都不值得加入
    大牛是怎么炼成的?
    RMQ问题 与众不同 尚未攻克
    YbtOj例题:二叉堆3 龙珠游戏
    离散化模板
    YbtOJ练习:广搜 3 追捕小狗
  • 原文地址:https://www.cnblogs.com/quguanwen/p/11227017.html
Copyright © 2011-2022 走看看