zoukankan      html  css  js  c++  java
  • 第十章:Python の 网络编程基础(二)

    本課主題

    • Python中的作用域补充
    • socketserver 源码
    • 线程的介绍和操作实战
    • 进程的介绍和操作实战
    • 协程的介绍和操作实战
    • 本周作业

    Python中的作用域补充

    1. Python世界里沒有块级作用域的在 Java/C+ 世界里没法打印 name。
      # 在 Java/C+ 世界里没法打印 name
      # 但在 Python/ JavaScript 世界里可以打印 name
      >>> if 1 == 1:
      ...     name = 'Janice'
      ... 
      >>> print(name)
      Janice
      Python中无块级作用域(小知识点一)
      >>> for i in range(10):
      ...    name = i
      ... 
      >>> print(name)
      9
      Python中无块级作用域(小知识点二)
    2. 但在 Python/ JavaScript 世界里可以打印 name。在 Python 中是以函数作为作用域。
      >>> del name
      >>> def func():
      ...     name = 'Janice'
      ... 
      >>> print(name)
      Traceback (most recent call last):
        File "<stdin>", line 1, in <module>
      NameError: name 'name' is not defined
      Python 中是以函数作为作用域(小知识点三)
    3. Python是有作用域链条,对于作用域来说,在函数没有执行之前,它的作用域已经确定啦,作用域链也已经确定啦
      >>> name = 'alex'
      
      # 这是 f1()是作用域,它是一个内部的作用域
      >>> def f1():
      ...     print(name)
      ... 
      
      # 这是 f2()是作用域
      >>> def f2():
      ...     name = 'eric'
      ...     f1()
      ... 
      
      >>> f2()
      alex
      Python是有作用域链条(小知识点四)
      >>> name = 'alex'
      
      # 这是 f1()是作用域,它是一个内部的作用域
      >>> def f1():
      ...     print(name)
      ... 
      
      # 这是 f2()是作用域
      >>> def f2():
      ...     name = 'eric'
      ...     return f1
      ... 
      
      >>> ret = f2()
      >>> ret() # 这相当于运行 f1()
      alex
      Python是有作用域链条(小知识点五)
    4. For 循环,然后把每一个元素都加1,最后生成一个列表
      # 它会执行一个 For 循环,然后把每一个元素都加1,最后生成一个列表
      
      li = [x+1 for x in range(10) if x > 6]
      print(li)
      Python lambda + for 循环(小知识点六)
      >>> li2 = [lambda :x for x in range(10)]
      >>> ret = li2[0]()
      >>> print(ret)
      9
      Python lambda + for 循环(小知识点七)
      >>> li = []
      >>> for i in range(10):
      ...     def f1(x=i):
      ...        return x
      ...     
      ...     li.append(f1)
      ... 
      >>> print(li[0]())
      0
      >>> print(li[1]())
      1
      >>> print(li[2]())
      2
      Python lambda + for 循环(小知识点八)

        

    socketserver 源码

    图片来源在此

    IO操作就是输入输出,其实它不会一直占用 CPU 的,这个IO多路复用目的是在管理IO操作,用来监听socket 对象的内部是否变化了,如果有一个机制可以同时监听多个客户端的连接,就可以实现接受多连接,IO多路复用主要是用 select, poll 和 epoll 来实现这个功能。

    Socket内部什么时候会有变化?

    当连接或者是收发消息的时候,socket 内部会产生变化,当客户端连接服务器端时,服务器端接收的一刻 e.g. conn, addrs = sk.accept( )。它的 socket 对象就会产生变化,如果服务器端的socket对象发生变化,代表有新连接进来了,然后会创建一个新的 socket 对象。

    select 模块 

    调用 select 模块中的方法来实现IO多路复用

    rlist, wlist, e = select.select(inputs,outputs,[],1)
    

    xxxxxx

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Author: Janice Cheng
    
    import socket
    import select
    
    #select 中有一个功能就是用来监听socket 对象的内部是否变化了
    
    sk = socket.socket()
    sk.bind(("127.0.0.1",8088,))
    sk.listen(5)
    
    # 这个 input 可以是 sk,或者是接受 message
    inputs = [sk,]
    outputs = []
    message = {}
    #message = {
    #   Janice: [message1, message2]
    #}
    
    while True:
        # 监听 sk(服务器端)对象,如果sk发生变化,表示有客户端来连接了,此时 rlist值为[sk]
        # 监听 conn 对象,如果conn发生变化,表示客户端有新消息发送过来了,此时 rlist值为[客户端]
    
        # 第一个参数: select会监听sk,判断是否有新连接,有的话就会新增到 rlist,它获取的就是 socket 的对象列表
        # 第二个参数: wlist 有所有给我发过消息的人
        # 第三个参数: 是一个错误列表
        # 第四个参数: 是超时时间
    
        rlist, wlist, elist = select.select(inputs,outputs,[],1)
    
        print(len(inputs),len(rlist), len(wlist), len(outputs))
    
        for r in rlist:
    
            if r == sk: # 因为只有 sk 才有 sk.accept()方法
                # 新客户端来连接
                conn, addr = r.accept() # 接受一个客户端的连接
                #conn是什么? 其实也是一个 socket 对象
                inputs.append(conn) #添加到 inputs 那个列表中 [sk,sk1]
                message[conn] = []
                conn.sendall(bytes('Hello client-side', encoding='utf-8'))
            else:
                # 接受消息
                # 不是 sk 而且能加入 rlist,就表示有人给我发消息
                print("=========")
    
                try:
                    ret = r.recv(1024)
                    # r.sendall(ret)
                    if not ret:
                        raise Exception("断开连接")
                    else:
                        outputs.append(r)
                        message[r].append(ret)
    
                except Exception as e:
    
                    inputs.remove(r)
                    del message[r]
    
    
        # wlist 有所有给我发过消息的人
        for w in wlist:
            msg = message[w].pop()
            resp = bytes("response: ",encoding = 'utf-8') + msg
    
            w.sendall(resp)
            outputs.remove(w)
    
    
    sk.close()
    IO多路复用(服务器端)
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Author: Janice Cheng
    
    
    import socket
    
    sk=socket.socket()
    sk.connect(("127.0.0.1",8088,))
    
    data = sk.recv(1024)
    print(data.decode())
    
    while True:
    
        inp = input(">>> ")
    
        if inp == 'q': break
    
        sk.sendall(bytes(inp,encoding='utf-8'))
        print(sk.recv(1024))
    
    sk.close()
    IO多路复用(客户端)

    用到了IO多路复用:监听 socket 内部是否变化,在它连接 conn.accept( ) 或者是收发消息 conn.sendall( )/ conn.recv(1024) 的时候,内部会产生变化

    多线程、多进程、协程

    rlist 中获取的就是 socket的对象列表

    线程的介绍和操作实战

    什么是多线程

    线程就是程序执行时的基本单位,我们平常写的一些脚本一般都是单线程单进程的应用程序,一个应用程序其实可以创建多条线程,以達到提高程序运行的并发度,就可以有更高的效率。在 Python 世界裡有一個叫全区解释器锁 GIL,如果你要占用 CPU 的话,默应每次只能用一个线程去处理。

    什么情况下用多线程,和多进程会发挥最大的效果?

    一个应用程序可以有多线程和多进程,目的在于让CPU 能充份地运用,在Python 里有一个叫GIL,全区解释器锁,如果不用CPU 的话,在Python 就可以实现并发执行,因为IO 操作不占用CPU,一般用多线程;对于计算性操作一些需要占用CPU的,一般会使用多进程来提高并发 

    创建线程有两种方法:

    • 创建 threading.Thread( ) 方法
      t=threading.Thread(target=f1, args=(123,))
      t.start()
      创建线程(方法一)
    • 自定义 MyThread,继承者threading.Thread( )
      class MyThread(threading.Thread):
      
          def __init__(self, target, args):
              self.target = target
              self.args = args
              super(MyThread,self).__init__()
      
          def run(self):
              self.target(self.args)
      
      
      def f2(args):
          print(args)
      
      
      obj = MyThread(target=f2, args=(123,))
      obj.start()
      创建线程(方法二)

    线程其他方法:

    创建了 t = threading.Thread( )对象之后,可以使用一些方法根据你的逻辑,设计线程的调度。

    1. t.setDaemon( ):
    2. t.join(n):
    import time
    
    
    def f1():
        time.sleep(2)
        print('f1')
    
    
    import threading
    t = threading.Thread(target=f1) # 创建子线程
    t.setDaemon(True) # True 表示主线程不等子线程,直接运行主线程的程序完毕就终止
    t.start()
    
    t.join() # 它会先等子线程执行完毕,再运下它下面的代码,表不主线程到此等待,直到子线程执行完毕
    t.join(2) # 参数表主线程在此最多等待n秒
    
    print('end')
    线程(其他小知识)

    线程锁

    什么是线程锁,线程锁就是锁定程序,当它被处理的时候,去确保只有一个线程在运理程序,这是用来确保数据一致性。有什麼作用呢?

    import threading
    import time
    
    NUM = 10
    
    def func():
        global NUM
        NUM -= 1
    
        time.sleep(1)
        print(NUM)
    
    for i in range(10):
        t = threading.Thread(target=func)
        t.start()
    没有线程锁的程序

    线程锁有以下几种:

    1. threading.Lock( ): 同一时刻只有一个线程来操作,它只能有一把锁。
      import threading
      import time
      
      NUM = 10
      
      def func(l):
          global NUM
          l.acquire() # 上锁
          NUM -= 1
          time.sleep(1)
          print(NUM)
          l.release() # 开锁
      
      lock = threading.Lock() # 只能锁一次
      
      for i in range(10):
          t = threading.Thread(target=func, args=(lock,))
          t.start()
      线程锁 threading.Lock()
    2. threading.RLock( ):可以允许多层锁的嵌套。
      import threading
      import time
      
      NUM = 10
      
      def func(l):
          global NUM
          l.acquire() # 上锁
          NUM -= 1
          l.acquire()  # 上锁
      
          l.release() # 开锁
          time.sleep(1)
          print(NUM)
          l.release() # 开锁
      
      lock = threading.RLock() # 多层锁的嵌套
      
      for i in range(10):
          t = threading.Thread(target=func, args=(lock,))
          t.start()
      线程锁 threading.RLock()
    3. threading.BoundedSemaphore(n):信号量,允许一定数目(n)的线程同时执行
      import threading
      import time
      
      NUM = 10
      
      def func(i, l):
          global NUM
          l.acquire() # 上锁
          NUM -= 1
          time.sleep(1)
          print(NUM, i)
          l.release() # 开锁
      
      lock = threading.BoundedSemaphore(2) # 每次只放 X 個
      
      for i in range(10):
          t = threading.Thread(target=func, args=(i, lock,))
          t.start()
      线程锁 threading.BoundedSemaphore(n)
    4. threading.Event( ):事件,批量将所有线程都挡住,这里要注意3个方法:event.wait( )、event.clear( ) 和 event.set( )
      event.wait( ) #检查当前是什么灯,默应是红灯
      event.clear() #主动设置成红灯
      event.set() #主动设置成绿灯
      import threading
      
      def func(i,e):
          print(i)
          e.wait() # 检查当前是什么颜色的灯: 如果是红灯,停; 如果是绿灯,行。默应Flag是False 表示是 红灯
          print(i + 100)
      
      event = threading.Event()
      
      for i in range(10):
          t = threading.Thread(target=func, args=(i,event))
          t.start()
      
      
      event.clear() # 设置成红灯
      
      inp = input(">>> ")
      if inp == '1':
          event.set() # 设置成绿灯
      线程锁 threading.Event()
    5. threading.Condition( ):满足自定义条件后,可以放出一条线程。
      import threading
      
      def func(i, con):
          print(i)
          con.acquire()  #上条件锁
          con.wait()
          print(i+100)
          con.release() #开锁
      
      c = threading.Condition()
      
      for i in range(10):
          t = threading.Thread(target=func, args=(i,c))
          t.start()
      
      while True:
          inp = input(">>> ")
      
          if inp == 'q':
              break
          else:
              c.acquire() #上锁
              c.notify(int(inp)) # 放出多少数据
              c.release() #开锁
      线程锁 threading.Condition( )方法一
      import threading
      
      
      def condition():
          ret = False
          r = input(">>> ")
      
          if r:
              ret = 'True'
          else:
              ret = 'False'
      
          return ret
      
      
      def func(i, con):
          print(i)
          con.acquire()  #上锁
          con.wait_for(condition)
          print(i+100)
          con.release() #开锁
      
      c = threading.Condition()
      
      
      for i in range(10):
          t = threading.Thread(target=func, args=(i,c))
          t.start()
      线程锁 threading.Condition( )方法二
    6. threading.Timer( )
      import threading
      
      def hello():
          print("hello world")
      
      t = threading.Timer(1, hello)
      t.start() # 一秒之后 hello world 就会打印出来
      线程锁 threading.Timer( ) 

    自定义线程池

    线程其实不是愈多愈好,必需跟据系统的 CPU 的个数来定的。线程池的概念是需要维护一个池,可以允许特定人数的人来连接,如果已经到达线程池的上限的话,其他的运接就必需等待着,等到有空闲的线程才可以连接,就像排队一样。

    什么是线程池,又有什么用呢?

    自定义线程池有以下几个元素:

    • 一个容器
    • 取一个少一个
    • 无线程时必须等待
    • 线程执行完毕,交还线程
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Author: Janice Cheng
    
    import queue
    import threading
    import time
    
    
    class ThreadPool:
    
        def __init__(self, maxsize):
            self.maxsize = maxsize
            self._q = queue.Queue(maxsize)
    
            for i in range(maxsize):
                self._q.put(threading.Thread) # 添加threading.Thread的类到消息对列中
    
        def get_thread(self):
            return self._q.get() # 获取一个类
    
        def add_thread(self):
            self._q.put(threading.Thread) # 新增一个threading.Thread
    
    pool = ThreadPool(5)
    # 添加5个 threading.Thread 的类
    # [threading.Thread,threading.Thread,threading.Thread,threading.Thread,threading.Thread]
    
    def task(arg,p):
        print(arg)
        time.sleep(1)
        p.add_thread()
    
    # 假设有100个任务
    for i in range(100):
        # threading.Thread 类
        t = pool.get_thread()
    
        obj = t(target=task, args=(i,pool,)) #threading.Thread(target=func, args=(i,c))
        obj.start()
    自定义线程池(低级版本)
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()
    
    
    class ThreadPool(object):
    
        def __init__(self, max_num, max_task_num = None):
            if max_task_num:
                self.q = queue.Queue(max_task_num)
            else:
                self.q = queue.Queue()
            self.max_num = max_num
            self.cancel = False
            self.terminal = False
            self.generate_list = []
            self.free_list = []
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
            if self.cancel:
                return
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            w = (func, args, callback,)
            self.q.put(w)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread()
            self.generate_list.append(current_thread)
    
            event = self.q.get()
            while event != StopEvent:
    
                func, arguments, callback = event
                try:
                    result = func(*arguments)
                    success = True
                except Exception as e:
                    success = False
                    result = None
    
                if callback is not None:
                    try:
                        callback(success, result)
                    except Exception as e:
                        pass
    
                with self.worker_state(self.free_list, current_thread):
                    if self.terminal:
                        event = StopEvent
                    else:
                        event = self.q.get()
            else:
    
                self.generate_list.remove(current_thread)
    
        def close(self):
            """
            执行完所有的任务后,所有线程停止
            """
            self.cancel = True
            full_size = len(self.generate_list)
            while full_size:
                self.q.put(StopEvent)
                full_size -= 1
    
        def terminate(self):
            """
            无论是否还有任务,终止线程
            """
            self.terminal = True
    
            while self.generate_list:
                self.q.put(StopEvent)
    
            self.q.queue.clear()
    
        @contextlib.contextmanager
        def worker_state(self, state_list, worker_thread):
            """
            用于记录线程中正在等待的线程数
            """
            state_list.append(worker_thread)
            try:
                yield
            finally:
                state_list.remove(worker_thread)
    
    
    
    # How to use
    
    
    pool = ThreadPool(5)
    
    def callback(status, result):
        # status, execute action status
        # result, execute action return value
        pass
    
    
    def action(i):
        print(i)
    
    for i in range(30):
        ret = pool.run(action, (i,), callback)
    
    time.sleep(5)
    print(len(pool.generate_list), len(pool.free_list))
    print(len(pool.generate_list), len(pool.free_list))
    # pool.close()
    # pool.terminate()
    自定义线程池(武Sir的高级版本)

      

    进程的介绍和操作实战

    GIL 的存在使得 Python 中的多线程无法充分利用多核的优势来提高性能,因而提出了多进程来解决这个问题

    每个进程里都有自己的内存空间,而且数据默应是不会共享的

    基本使用

    进程锁

    进程锁跟线程锁也是一样的

    from multiprocessing import Process
    from multiprocessing import Array
    from multiprocessing import RLock,Lock,Event,Condition
    
    import time
    
    def foo(i,lis, lc):
        lc.acquire()
        lis[0] = lis[0] - 1
        time.sleep(1)
        print('say hi',lis[0])
        lc.release()
    
    if __name__=='__main__':
    
        li = Array('i',10)
        li[0] = 10
        lock = RLock()
        for i in range(10):
            p = Process(target=foo, args=(i,li,lock,))
            p.start()
    进程锁

    默应数据不共享

    如何让进程之间的数据可以共享?

    1. 对列的方式:queues.Queue
    2. 数组的方式:Array
      from multiprocessing import Process
      from multiprocessing import queues
      import multiprocessing
      from multiprocessing import Array
      
      
      def foo(i,arg):
          arg[i] = i + 100
          for item in arg:
              print(item)
          print("=============")
      
      if __name__=='__main__':
      
          li = Array('i',10)
      
          for i in range(10):
              p = Process(target=foo, args=(i,li,))
              #p.daemon=True
              p.start()
              #p.join()
      数组的方式
    3. 创建对象的方式:Manager( ) 

    进程池

    线程的生命周期分为 5 个状态:创建、就绪、运行、阻塞和终止、自线程创建到终止,线程便不断在就绪、运行和阻塞这三个状态之间转换直至销毁。而真正占有 CPU 的只有创建、运行和销毁这3个状态。一个线程的运行时间可以分为3部分

    • 线程的启动时间 (Ts)
    • 线程体的运行的时间 (Tr)
    • 线程的销毁时间 (Td)

    在多线程处理的场境下,如果线程不能被重用,就意味著每次创建都需要经过启动、运行和销毁这3个过程,这必然会增加系统的相应时间,降底效率。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Author: Janice Cheng
    
    from multiprocessing import Pool
    import time
    
    
    def f1(arg):
        time.sleep(1)
        print(arg)
    
    
    if __name__=='__main__':
        pool = Pool(5) # 最多有5个进程的进程池
    
        for i in range(30): # 创建30个任务
            # pool.apply(func=f1, args=(i,)) # 到进程池拿一个进程来进行穿形的操作处理数据
            pool.apply_async(func=f1, args=(i,))
    
    
        # pool.close() # 必须等待所有任务执行完毕才会终止程序。
        time.sleep(1)
        pool.terminate() # 立即终止程序,程序一遇到 terminate()就会立即终止程序。
        # pool.join() # 状态必须是CLOSE, TERMINATE,如果不是就会报错。assert self._state in (CLOSE, TERMINATE)
    
        print('end')
    进程池 Pool

    协程的介绍和操作实战

    协程的原理是利用一个线程,分解一个线程成为多个微线程,这是程序级别做的,它更适合于 IO操作。

    1. greenlet
      from greenlet import greenlet
      # 通过 greenlet 可以控制一下线程,让它先执行一个再执行下一个,
      
      def test1():
          print('------12-------')
          gr2.switch() # 转换并执行一下对象 gr2
          print('------34-------')
          gr2.switch() # 转换并执行一下对象 gr2
      
      
      def test2():
          print('------56-------')
          gr1.switch() # 转换并执行一下对象 gr1
          print('------78-------')
      
      
      
      gr1=greenlet(test1)
      gr2=greenlet(test2)
      
      gr1.switch() # 转换并执行一下对象 gr1
      
      """
      ------12-------
      ------56-------
      ------34-------
      ------78-------
      """
      greenlet例子
    2. gevent
      import gevent
      
      def foo():
          print("Running foo")
          gevent.sleep(0)
          print("Explicit context switch to foo again!")
      
      
      def bar():
          print("Running bar")
          gevent.sleep(0)
          print("Explicit context switch back to bar")
      
      gevent.joinall([
          gevent.spawn(foo),
          gevent.spawn(bar)
      ])
      
      """
      Running foo
      Running bar
      Explicit context switch to foo again!
      Explicit context switch back to bar
      """
      gevent例子
    3. xxxxx

    本周作业

    day10作业

    參考資料

    [1] 银角大王:Python之路【第六篇】:socket

    [2] 银角大王:Python之路【第七篇】:线程、进程和协程

    [3] 银角大王:Python之路 线程池参考例子

    [4] 金角大王:

    [5] Python的GIL是什么鬼,多线程性能究竟如何

    [6] Parallelism in one line

  • 相关阅读:
    Codeforces714C【映射】
    Codeforces712C【贪心】
    Codeforces712B【= =】
    lightoj1259 【素数预处理】
    Codeforces482B【线段树构造】
    51nod 1348【next_permutation】
    hdoj5289【RMQ+二分】【未完待续】
    hdoj5875【二分+RMQ】
    RMQ算法
    AtCoder Regular Contest 061 DSnuke's Coloring
  • 原文地址:https://www.cnblogs.com/jcchoiling/p/5937538.html
Copyright © 2011-2022 走看看