zoukankan      html  css  js  c++  java
  • python开发【第4篇】【进程、线程、协程】

    一、进程与线程概述:

    1. 进程,是并发执行的程序在执行过程中分配和管理资源的基本单位,每一个进程都有一个自己的地址空 间。
    2. 线程,是进程的一部分,一个没有线程的进程可以被看作是单线程的。线程有时又被称为轻权进程或轻量级进程,也是 CPU 调度的一个基本单位。
    3. 联系:
      • 进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在;
      • 线程是进程的一部分,没有自己的地址空间,与进程内的其他线程一起共享分配给该进程的所有资源

      4.区别:

      • 每个独立的线程有一个程序运行的入口、顺序执行序列和程序的出口。线程不能够立执行,必须依存在应用程序中,由应用程序提供多个线程执行控制。 
      • 进程就是一个应用程序在处理机上的一次执行过程,它是一个动态的概念,而线程是进程中的一部分,进程包含多个线程在运行。

      5. 线程的执行特性:

      • 线程只有 3 个基本状态:就绪,执行,阻塞。
      • 线程存在 5 种基本操作来切换线程的状态:派生,阻塞,激活,调度,结束。

      6. 进程通信:

      • 单机系统中进程通信有 4 种形式:主从式,会话式,消息或邮箱机制,共享存储区方式。
      • 主从式典型例子:终端控制进程和终端进程。
      • 会话式典型例子:用户进程与磁盘管理进程之间的通信。

       7.多进程和多线程:

        为何需要多进程(或者多线程),为何需要并发?

        多线程/进程,就像一个快餐点的服务员,既要在前台接待客户点 餐,又要接电话送外卖,没有分身术肯定会忙得你焦头烂额的。

        多进程/线程技术是这么一种技术,让你可以像孙悟空一样分身,灵魂出窍,乐哉乐哉地轻松应付一切状 况。

        并发技术,就是可以让你在同一时间同时执行多条任务的技术。你的代码将不仅仅是从上到下,从左到右这样规规矩矩的一条线执行。

        你可以一条线在main函数里跟你的客户交流,另一条线,你早就把你外卖送到了其他客户的手里。

     二、Python-线程

      1.Threading模块                                                                

    用于提供线程相关的操作,线程是应用程序中工作的最小单元。

    “““
    创建了10个“前台”线程,然后控制器就交给了CPU,CPU根据指定算法进行调度,分片执行指令
    ”””
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import threading
    import time
      
    def show(arg):
        time.sleep(1)
        print 'thread'+str(arg)
      
    for i in range(10):
        t = threading.Thread(target=show, args=(i,))
        t.start()
      
    print 'main thread stop'

    更多Threading模块方法:

    • start                  线程准备就绪,等待CPU调度
    • setName                为线程设置名称
    • getName                获取线程名称
    • setDaemon            设置为后台线程或前台线程(默认)
                                      如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止
                                           如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    • join                        逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
    • run                        线程被cpu调度后自动执行线程对象的run方法
    线程自定义类:
    import threading
    import time
     
    class MyThread(threading.Thread):
        def __init__(self,num):
            threading.Thread.__init__(self)
            self.num = num
     
        def run(self):#定义每个线程要运行的函数
     
            print("running on number:%s" %self.num)
     
            time.sleep(3)
     
    if __name__ == '__main__':
     
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()

      2.线程锁(Lock、RLock)                                                          

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,当多个线程同时修改同一条数据时可能会出现脏数据,所以,出现了线程锁 - 同一时刻允许一个线程执行操作。

    • Lock对象【acquire、release方法】若1个线程连续2次进行acquire操作,那么忧郁第1次acquire后未release,第2次acquire将挂起线程,会导致Lock对象一直不会release,导致线程死
    • RLock对象【acquire、release方法】允许1个线程多次对其进行acquire操作(原因:内部通过1个counter变量维护线程acquire的次数),且每1次acquire操作必须有1个release操作与之对应,在所有的release操作完成后,别的线程才能申请该RLock对象
      #!/usr/bin/env python
      #coding:utf-8
         
      import threading
      import time
         
      gl_num = 0
         
      lock = threading.RLock()
         
      def Func():
          lock.acquire()
          global gl_num
          gl_num +=1
          time.sleep(1)
          print gl_num
          lock.release()
             
      for i in range(10):
          t = threading.Thread(target=Func)
          t.start()

      3.互斥锁【信号量:Semaphore】                                                         

    互斥锁 同时只允许一个线程更改数据,而Semaphore是同时允许一定数量的线程更改数据 ,比如厕所有3个坑,那最多只允许3个人上厕所,后面的人只能等里面有人出来了才能再进去。

    import threading,time
     
    def run(n):
        semaphore.acquire()
        time.sleep(1)
        print("run the thread: %s" %n)
        semaphore.release()
     
    if __name__ == '__main__':
     
        num= 0
        semaphore  = threading.BoundedSemaphore(5) #最多允许5个线程同时运行
        for i in range(20):
            t = threading.Thread(target=run,args=(i,))
            t.start()

      4.事件【event】                                                                

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,

    如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear  将“Flag”设置为False
    • set          将“Flag”设置为True
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
     
    import threading
     
    def do(event):
        print 'start'
        event.wait()
        print 'execute'
      
    event_obj = threading.Event()
    for i in range(10):
        t = threading.Thread(target=do, args=(event_obj,))
        t.start()
     
    event_obj.clear()
    inp = raw_input('input:')
    if inp == 'true':
        event_obj.set()

      5.条件(Condition)                                                              

    使得线程等待,只有满足某条件时,才释放n个线程

    import threading
     
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" %n)
        con.release()
     
    if __name__ == '__main__':
     
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
     
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire()
            con.notify(int(inp))
            con.release()

      6.定时器【Timer】                                                              

    定时器,指定n秒后执行某操作

    from threading import Timer
    
    def hello():
        print("hello, world")
     
    t = Timer(1, hello)
    t.start()  # after 1 seconds, "hello, world" will be printed

    三、Python-进程

      1.进程创建                                                                  

    from multiprocessing import Process
    import threading
    import time
      
    def foo(i):
        print 'say hi',i
      
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()

      2.进程数据共享                                                                

     进程各自持有一份数据,默认无法共享数据

    • 进程间默认无法数据共享
    #!/usr/bin/env python
    #coding:utf-8
     
    from multiprocessing import Process
    from multiprocessing import Manager
     
    import time
     
    li = []
     
    def foo(i):
        li.append(i)
        print 'say hi',li
      
    for i in range(10):
        p = Process(target=foo,args=(i,))
        p.start()
         
    print 'ending',li
    • 进程间数据共享实现方法
    #方法一,Array
    from multiprocessing import Process,Array
    temp = Array('i', [11,22,33,44])
     
    def Foo(i):
        temp[i] = 100+i
        for item in temp:
            print i,'----->',item
     
    for i in range(2):
        p = Process(target=Foo,args=(i,))
        p.start()
     
    #方法二:manage.dict()共享数据
    from multiprocessing import Process,Manager
     
    manage = Manager()
    dic = manage.dict()
     
    def Foo(i):
        dic[i] = 100+i
        print dic.values()
     
    for i in range(2):
        p = Process(target=Foo,args=(i,))
        p.start()
        p.join()
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from multiprocessing import Process, Array, RLock
    
    def Foo(lock,temp,i):
        """
        将第0个数加100
        """
        lock.acquire()
        temp[0] = 100+i
        for item in temp:
            print i,'----->',item
        lock.release()
    
    lock = RLock()
    temp = Array('i', [11, 22, 33, 44])
    
    for i in range(20):
        p = Process(target=Foo,args=(lock,temp,i,))
        p.start()
    进程锁实例:

      4.进程池【Pool】                                                              

    进程池内部维护一个进程序列,当使用时,则去进程池中获取一个进程,如果进程池序列中没有可供使用的进进程,那么程序就会等待,直到进程池中有可用进程为止。

    进程池中有两个方法:

    • apply
    • apply_async
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from  multiprocessing import Process,Pool
    import time
      
    def Foo(i):
        time.sleep(2)
        return i+100
      
    def Bar(arg):
        print arg
      
    pool = Pool(5)
    #print pool.apply(Foo,(1,))
    #print pool.apply_async(func =Foo, args=(1,)).get()
      
    for i in range(10):
        pool.apply_async(func=Foo, args=(i,),callback=Bar)
      
    print 'end'
    pool.close()
    pool.join()#进程池中进程执行完毕后再关闭,如果注释,那么程序直接关闭。

      5.进程间通信                                                                

      ①.Queue:可在多个进程间的数据传递(Put和Get两种方法)                                            

    • Put方法:插入数据到队列中(blocked,timeoutl两个可选参数,如果blocked为True(默认值),并且timeout为正值,
      • 该方法会阻塞timeout指定时间,直至队列有剩余空间,如果超时,会抛出Queue.Full异常,
      • 如果blocked为False,但Queue已满,会立即抛出Queue.Full异常
    • Get方法:从队列读取并删除一个元素(blcoked,timeout两个可选参数,如果blocked为True(默认值)并且timeout为正值
      • name在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,则分为两种情况:
      • 如果Queue有一个值可用,则立即返回该值;
      • 否则如果队列为空,则立即抛出Queue.Empty异常
    from multiprocessing import Process,Queue
    import os,time,random
    
    # 写数据进程执行的代码
    def proc_write(q,urls):
        print ('Process (%s) is writing...' % os.getpid())
        for url in urls:
            q.put(url)
            print('Put %s to queue...' % url)
            time.sleep(random.random() * 3)
    
    # 读数据进程执行的带啊
    def proc_read(q):
        print ('Process (%s) is reading...' % os.getpid())
        while True:
            url = q.get(True)
            print('Get %s from queue' % url)
                
    if __name__=='__main__':
        # 父进程创建Queue,并传给各个子进程
        q = Queue()
        proc_writer1 = Process(target=proc_write,args=(q,['url1','url2','url3']))
        proc_writer2 = Process(target=proc_write,args=(q,['url4','url5','url6']))
        proc_reader = Process(target=proc_read,args=(q,))
        
        # 启动子进程proc_writer,写入:
        proc_writer1.start()
        proc_writer2.start()
        
        # 启动子进程proc_reader,读取:
        proc_reader.start()
        
        # 等待proc_writer结束:
        proc_writer1.join()
        proc_writer2.join()
        
        #proc_reader进程是死循环,无法等待其结束,只能强行终止:
        proc_reader.terminate()

      ②.Pipe:用来在两个进程间进行通信,两个进程分别位于管道两端                                          

    • Pipe方法返回(conn1,conn2)代表一个管道的两个端
    • pipe方法有duplex参数:
      • 默认值为True,则该管道是全双工模式,即conn1、conn2均可收发
      • duplex为False,则conn1只负责接收消息,conn2只负责发送消息
    • send方法:发送消息方法
    • recv方法:接收消息方法
    • 全双工模式:调用conn1.send()方法发送消息,conn1.recv接收消息,若无消息可接收,recv方法会一直阻塞;若管道已被关闭,recv方法会报错
    import multiprocessing
    import random
    import os,random
    
    def proc_send(pipe,urls):
        for url in urls:
            print 'Process (%s) send: %s' %(os.getpid(),url)
            pipe.send(url)
            time.sleep(random.random())
            
    def proc_recv(pipe):
        while True:
            print 'Process (%s) rev:%s' %(os.getpid(),pipe.recv())
            time.sleep(random.random())
            
    if __name__=='__main__':
        pipe = multiprocessing.Pipe()
        p1 = multiprocessing.Process(target=proc_send,args=(pipe[0],['url_'+str(i) for i in range(10)]))
        p2 = multiprocessing.Process(target=proc_recv,args=(pipe[1],))
        p1.start()
        p2.start()
        p1.join()
        p2.join()

    四、Python-协程

    线程和进程的操作是由程序触发系统接口,最后的执行者是系统;协程的操作则是程序员。

    协程存在的意义:对于多线程应用,CPU通过切片的方式来切换线程间的执行,线程切换时需要耗时(保存状态,下次继续)。协程,则只使用一个线程,在一个线程中规定某个代码块执行顺序。

    协程的适用场景:当程序中存在大量不需要CPU的操作时(IO),适用于协程;

      1.greenlet模块                                                                

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
      
    from greenlet import greenlet
     
    def test1():
        print 12
        gr2.switch()
        print 34
        gr2.switch()
     
     
    def test2():
        print 56
        gr1.switch()
        print 78
     
    gr1 = greenlet(test1)
    gr2 = greenlet(test2)
    gr1.switch()

      2.gevent模块                                                                

    import gevent
     
    def foo():
        print('Running in foo')
        gevent.sleep(0)
        print('Explicit context switch to foo again')
     
    def bar():
        print('Explicit context to bar')
        gevent.sleep(0)
        print('Implicit context switch back to bar')
     
    gevent.joinall([
        gevent.spawn(foo),
        gevent.spawn(bar),
    ])
    from gevent import monkey; monkey.patch_all()
    import gevent
    import urllib2
    
    def f(url):
        print('GET: %s' % url)
        resp = urllib2.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])
    from gevent import monkey; monkey.patch_all()
    import gevent
    import urllib2
    
    def f(url):
        print('GET: %s' % url)
        resp = urllib2.urlopen(url)
        data = resp.read()
        print('%d bytes received from %s.' % (len(data), url))
    
    gevent.joinall([
            gevent.spawn(f, 'https://www.python.org/'),
            gevent.spawn(f, 'https://www.yahoo.com/'),
            gevent.spawn(f, 'https://github.com/'),
    ])
    遇到IO操作自动切换

    本节作业一

    题目:IO多路复用版FTP

    需求:

    1. 实现文件上传及下载功能
    2. 支持多连接并发传文件
    3. 使用select or selectors

    本节作业二

    题目:rpc命令端

    需求:

    1. 可以异步的执行多个命令
    2. 对多台机器

    >>:run "df -h" --hosts 192.168.3.55 10.4.3.4 
    task id: 45334
    >>: check_task 45334 
    >>:

    题目:简单主机批量管理工具

    需求:

    1. 主机分组
    2. 主机信息配置文件用configparser解析
    3. 可批量执行命令、发送文件,结果实时返回,执行格式如下 
      1. batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 
      2. batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 
    4. 主机用户名密码、端口可以不同
    5. 执行远程命令使用paramiko模块
    6. 批量命令需使用multiprocessing并发
  • 相关阅读:
    软工结对第一次作业
    16061023-软件工程第1次作业
    OO最后一次总结
    OO第三次博客作业
    OO第二次博客作业
    OO第一次博客
    提问回顾与个人总结
    软件工程第一次阅读作业
    test个人博客
    软件工程结对作业
  • 原文地址:https://www.cnblogs.com/loser1949/p/9249818.html
Copyright © 2011-2022 走看看