zoukankan      html  css  js  c++  java
  • day9-python并发编程之多进程多线程

    第1章 Cpython解释器下实现并发编程

    1.1 背景知识

    顾名思义,进程即正在执行的一个过程。进程是对正在运行程序的一个抽象。

    进程的概念起源于操作系统,是操作系统最核心的概念,也是操作系统提供的最古老也是最重要的抽象概念之一。操作系统的其他所有内容都是围绕进程的概念展开的。

    所以想要真正了解进程,必须事先了解操作系统,

    点击链接地址 http://www.cnblogs.com/linhaifeng/p/6295875.html

    PS:即使可以利用的cpu只有一个(早期的计算机确实如此),也能保证支持(伪)并发的能力。将一个单独的cpu变成多个虚拟的cpu(多道技术:时间多路复用和空间多路复用+硬件上支持隔离),没有进程的抽象,现代计算机将不复存在。

    1.2 必备的理论基础:

    #一 操作系统的作用:

        1:隐藏丑陋复杂的硬件接口,提供良好的抽象接口

        2:管理、调度进程,并且将多个进程对硬件的竞争变得有序

    #二 多道技术:

        1.产生背景:针对单核,实现并发

        ps:

        现在的主机一般是多核,那么每个核都会利用多道技术

        有4个cpu,运行于cpu1的某个程序遇到io阻塞,会等到io结束再重新调度,会被调度到4个

        cpu中的任意一个,具体由操作系统调度算法决定。

        2.空间上的复用:如内存中同时有多道程序

        3.时间上的复用:复用一个cpu的时间片

           强调:遇到io切,占用cpu时间过长也切,核心在于切之前将进程的状态保存下来,这样

                才能保证下次切换回来时,能基于上次切走的位置继续运行

    第2章 python并发编程之多进程理论篇

    2.1 什么是进程

    进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。

        举例(单核+多道,实现多个进程的并发执行):

        egon在一个时间段内有很多任务要做:python备课的任务,写书的任务,交女朋友的任务,王者荣耀上分的任务,  

        但egon同一时刻只能做一个任务(cpu同一时间只能干一个活),如何才能玩出多个任务并发执行的效果?

        egon备一会课,再去跟李杰的女朋友聊聊天,再去打一会王者荣耀....这就保证了每个任务都在进行中.

    2.2 进程与程序的区别

    程序仅仅只是一堆代码而已,而进程指的是程序的运行过程。

    举例:

    想象一位有一手好厨艺的计算机科学家egon正在为他的女儿元昊烘制生日蛋糕。

    他有做生日蛋糕的食谱,

    厨房里有所需的原料:面粉、鸡蛋、韭菜,蒜泥等。

    在这个比喻中:

        做蛋糕的食谱就是程序(即用适当形式描述的算法)

        计算机科学家就是处理器(cpu)

        而做蛋糕的各种原料就是输入数据。

       进程就是厨师阅读食谱、取来各种原料以及烘制蛋糕等一系列动作的总和

    现在假设计算机科学家egon的儿子alex哭着跑了进来,说:XXXXXXXXXXXXXX。

    科学家egon想了想,处理儿子alex蛰伤的任务比给女儿元昊做蛋糕的任务更重要,于是

    计算机科学家就记录下他照着食谱做到哪儿了(保存进程的当前状态),然后拿出一本急救手册,按照其中的指示处理蛰伤。这里,我们看到处理机从一个进程(做蛋糕)切换到另一个高优先级的进程(实施医疗救治),每个进程拥有各自的程序(食谱和急救手册)。当蜜蜂蛰伤处理完之后,这位计算机科学家又回来做蛋糕,从他

    离开时的那一步继续做下去。

    需要强调的是:同一个程序执行两次,那也是两个进程,比如打开暴风影音,虽然都是同一个软件,但是一个可以播放苍井空,一个可以播放饭岛爱。

    2.3 并发与并行

    无论是并行还是并发,在用户看来都是'同时'运行的,不管是进程还是线程,都只是一个任务而已,真是干活的是cpu,cpu来做这些任务,而一个cpu同一时刻只能执行一个任务

    2.3.1 并发:是伪并行,即看起来是同时运行。单个cpu+多道技术就可以实现并发,(并行也属于并发)

    ###################单cpu,多进程,并发举例一##################
    
    你是一个cpu,你同时谈了三个女朋友,每一个都可以是一个恋爱任务,你被这三个任务共享
    
    要玩出并发恋爱的效果,
    
    应该是你先跟女友1去看电影,看了一会说:不好,我要拉肚子,然后跑去跟第二个女友吃饭,吃了一会说:那啥,我
    
    去趟洗手间,然后跑去跟女友3开了个房
    单cpu,多进程,并发举例一
    ####################单cpu,多进程,并发举例二########################
    
    某天下午,egon,yuanhao,wupeiqi,alex约好了一起去嫖娼,但娼只有一个,cpu只有一个,但是却要‘同时’干
    
    四个任务(嫖出并发的效果),那就必须是干一会egon,再干一会yuanhao,再干一会wupeiqi,再干一会alex
    
    egon:花了200块钱,因为人美活好
    
    yuanhao:500块钱
    
    wupeiqi:100块钱,可能是不太行
    
    alex:没要钱,为啥???因为大家刚刚嫖的是他女朋友
    单cpu,多进程,并发举例二

    2.3.2  并行:同时运行,只有具备多个cpu才能实现并行

             单核下,可以利用多道技术,多个核,每个核也都可以利用多道技术(多道技术是针对单核而言的)

             有四个核,六个任务,这样同一时间有四个任务被执行,假设分别被分配给了cpu1,cpu2,cpu3,cpu4,

             一旦任务1遇到I/O就被迫中断执行,此时任务5就拿到cpu1的时间片去执行,这就是单核下的多道技术

             而一旦任务1的I/O结束了,操作系统会重新调用它(需知进程的调度、分配给哪个cpu运行,由操作系统说了算),可能被分配给四个cpu中的任意一个去执行

     

    所有现代计算机经常会在同一时间做很多件事,一个用户的PC(无论是单cpu还是多cpu),都可以同时运行多个任务(一个任务可以理解为一个进程)。

        启动一个进程来杀毒(360软件)

        启动一个进程来看电影(暴风影音)

        启动一个进程来聊天(腾讯QQ)

    所有的这些进程都需被管理,于是一个支持多进程的多道程序系统是至关重要的

    多道技术概念回顾:内存中同时存入多道(多个)程序,cpu从一个进程快速切换到另外一个,使每个进程各自运行几十或几百毫秒,这样,虽然在某一个瞬间,一个cpu只能执行一个任务,但在1秒内,cpu却可以运行多个进程,这就给人产生了并行的错觉,即伪并发,以此来区分多处理器操作系统的真正硬件并行(多个cpu共享同一个物理内存)

    2.4 同步异步and阻塞非阻塞(重点)

    2.4.1 同步

    #所谓同步,就是在发出一个功能调用时,在没有得到结果之前,该调用就不会返回。按照这个定义,其实绝大多数函数都是同步调用。但是一般而言,我们在说同步、异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务。
    
    #举例:
    
    #1. multiprocessing.Pool下的apply #发起同步调用后,就在原地等着任务结束,根本不考虑任务是在计算还是在io阻塞,总之就是一股脑地等任务结束
    
    #2. concurrent.futures.ProcessPoolExecutor().submit(func,).result()
    
    #3. concurrent.futures.ThreadPoolExecutor().submit(func,).result()

    2.4.2 异步

    #异步的概念和同步相对。当一个异步功能调用发出后,调用者不能立刻得到结果。当该异步功能完成后,通过状态、通知或回调来通知调用者。如果异步功能用状态来通知,那么调用者就需要每隔一定时间检查一次,效率就很低(有些初学多线程编程的人,总喜欢用一个循环去检查某个变量的值,这其实是一 种很严重的错误)。如果是使用通知的方式,效率则很高,因为异步功能几乎不需要做额外的操作。至于回调函数,其实和通知没太多区别。
    
    #举例:
    
    #1. multiprocessing.Pool().apply_async() #发起异步调用后,并不会等待任务结束才返回,相反,会立即获取一个临时结果(并不是最终的结果,可能是封装好的一个对象)。
    
    #2. concurrent.futures.ProcessPoolExecutor(3).submit(func,)
    
    #3. concurrent.futures.ThreadPoolExecutor(3).submit(func,)

    2.4.3 阻塞

    #阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
    
    #举例:
    
    #1. 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
    
    #2. 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。

    2.4.4 非阻塞

    #非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。

    2.4.5 小结

    #1. 同步与异步针对的是函数/任务的调用方式:同步就是当一个进程发起一个函数(任务)调用的时候,一直等到函数(任务)完成,而进程继续处于激活状态。而异步情况下是当一个进程发起一个函数(任务)调用的时候,不会等函数返回,而是继续往下执行当,函数返回的时候通过状态、通知、事件等方式通知进程任务完成。
    
     
    
    #2. 阻塞与非阻塞针对的是进程或线程:阻塞是当请求不能满足的时候就将进程挂起,而非阻塞则不会阻塞当前进程

    2.5 进程的创建(了解)

    但凡是硬件,都需要有操作系统去管理,只要有操作系统,就有进程的概念,就需要有创建进程的方式,一些操作系统只为一个应用程序设计,比如微波炉中的控制器,一旦启动微波炉,所有的进程都已经存在。

    2.5.1 对于通用系统

      而对于通用系统(跑很多应用程序),需要有系统运行过程中创建或撤销进程的能力,主要分为4种形式创建新的进程

    1. 系统初始化(查看进程linux中用ps命令,windows中用任务管理器,前台进程负责与用户交互,后台运行的进程与用户无关,运行在后台并且只在需要时才唤醒的进程,称为守护进程,如电子邮件、web页面、新闻、打印)

    2. 一个进程在运行过程中开启了子进程(如nginx开启多进程,os.fork,subprocess.Popen等)

      3. 用户的交互式请求,而创建一个新进程(如用户双击暴风影音)

      4. 一个批处理作业的初始化(只在大型机的批处理系统中应用)

    2.5.2 新进程的创建

    无论哪一种,新进程的创建都是由一个已经存在的进程执行了一个用于创建进程的系统调用而创建的:

      1. 在UNIX中该系统调用是:fork,fork会创建一个与父进程一模一样的副本,二者有相同的存储映像、同样的环境字符串和同样的打开文件(在shell解释器进程中,执行一个命令就会创建一个子进程)

      2. 在windows中该系统调用是:CreateProcess,CreateProcess既处理进程的创建,也负责把正确的程序装入新进程。

    2.5.3 关于创建的子进程,UNIX和windows

      关于创建的子进程,UNIX和windows

      1.相同的是:进程创建后,父进程和子进程有各自不同的地址空间(多道技术要求物理层面实现进程之间内存的隔离),任何一个进程的在其地址空间中的修改都不会影响到另外一个进程。

      2.不同的是:在UNIX中,子进程的初始地址空间是父进程的一个副本,提示:子进程和父进程是可以有只读的共享内存区的。但是对于windows系统来说,从一开始父进程与子进程的地址空间就是不同的。

    2.6 进程的终止(了解)

    1. 正常退出(自愿,如用户点击交互式页面的叉号,或程序执行完毕调用发起系统调用正常退出,在linux中用exit,在windows中用ExitProcess)

      2. 出错退出(自愿,python a.py中a.py不存在)

      3. 严重错误(非自愿,执行非法指令,如引用不存在的内存,1/0等,可以捕捉异常,try...except...)

      4. 被其他进程杀死(非自愿,如kill -9)

    2.7 进程的层次结构

    无论UNIX还是windows,进程只有一个父进程,不同的是:

      1. 在UNIX中所有的进程,都是以init进程为根,组成树形结构。父子进程共同组成一个进程组,这样,当从键盘发出一个信号时,该信号被送给当前与键盘相关的进程组中的所有成员。

      2. 在windows中,没有进程层次的概念,所有的进程都是地位相同的,唯一类似于进程层次的暗示,是在创建进程时,父进程得到一个特别的令牌(称为句柄),该句柄可以用来控制子进程,但是父进程有权把该句柄传给其他子进程,这样就没有层次了。

    2.8 进程的状态

    tail -f access.log |grep '404'

      执行程序tail,开启一个子进程,执行程序grep,开启另外一个子进程,两个进程之间基于管道'|'通讯,将tail的结果作为grep的输入。

      进程grep在等待输入(即I/O)时的状态称为阻塞,此时grep命令都无法运行

      其实在两种情况下会导致一个进程在逻辑上不能运行,

      1. 进程挂起是自身原因,遇到I/O阻塞,便要让出CPU让其他进程去执行,这样保证CPU一直在工作

      2. 与进程无关,是操作系统层面,可能会因为一个进程占用时间过多,或者优先级等原因,而调用其他的进程去使用CPU。

      因而一个进程由三种状态

     

    2.9 进程并发的实现(了解)

    进程并发的实现在于,硬件中断一个正在运行的进程,把此时进程运行的所有状态保存下来,为此,操作系统维护一张表格,即进程表(process table),每个进程占用一个进程表项(这些表项也称为进程控制块)

     

    该表存放了进程状态的重要信息:程序计数器、堆栈指针、内存分配状况、所有打开文件的状态、帐号和调度信息,以及其他在进程由运行态转为就绪态或阻塞态时,必须保存的信息,从而保证该进程在再次启动时,就像从未被中断过一样。

    第3章 python并发编程之多进程使用篇

    3.1 multiprocessing模块介绍

    python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。

        multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。

      multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。

        需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。

    3.2 Process类的介绍

    3.2.1 创建进程的类:

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    
    1. 需要使用关键字的方式来指定参数
    
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号

    3.2.2 参数介绍:

    group参数未使用,值始终为None
    
    target表示调用对象,即子进程要执行的任务
    
    args表示调用对象的位置参数元组,args=(1,2,'egon',)
    
    kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    
    name为子进程的名称

    3.2.3 方法介绍:

    p.start():启动进程,并调用该子进程中的p.run()
    
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法 
    
     
    
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    
    p.is_alive():如果p仍然运行,返回True
    
     
    
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程 

    3.2.4 属性介绍:

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    
    p.name:进程的名称
    
    p.pid:进程的pid
    
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)

    3.3 Process类的使用

    注意:在windows中Process()必须放到# if __name__ == '__main__':

    #######################详细解释#########################
    
    Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module.
    
    If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources).
    
    This is the reason for hiding calls to Process() inside
    
     
    
    if __name__ == "__main__"
    
    since statements inside this if-statement will not get called upon import.
    
    由于Windows没有fork,多处理模块启动一个新的Python进程并导入调用模块。
    
    如果在导入时调用Process(),那么这将启动无限继承的新进程(或直到机器耗尽资源)。
    
    这是隐藏对Process()内部调用的原,使用if __name__ == “__main __”,这个if语句中的语句将不会在导入时被调用。
    详细解释

    3.3.1 创建并开启子进程的两种方式博客实例

    #########################方法一###########################
    
    #开进程的方法一:
    
    import time
    
    import random
    
    from multiprocessing import Process
    
    def piao(name):
    
        print('%s piaoing' %name)
    
        time.sleep(random.randrange(1,5))
    
        print('%s piao end' %name)
    
     
    
     
    
     
    
    p1=Process(target=piao,args=('egon',)) #必须加,号
    
    p2=Process(target=piao,args=('alex',))
    
    p3=Process(target=piao,args=('wupeqi',))
    
    p4=Process(target=piao,args=('yuanhao',))
    
     
    
    p1.start()
    
    p2.start()
    
    p3.start()
    
    p4.start()
    
    print('主线程')
    方法一
    ############################方法二#########################
    
    #开进程的方法二:
    
    import time
    
    import random
    
    from multiprocessing import Process
    
     
    
     
    
    class Piao(Process):
    
        def __init__(self,name):
    
            super().__init__()
    
            self.name=name
    
        def run(self):
    
            print('%s piaoing' %self.name)
    
     
    
            time.sleep(random.randrange(1,5))
    
            print('%s piao end' %self.name)
    
     
    
    p1=Piao('egon')
    
    p2=Piao('alex')
    
    p3=Piao('wupeiqi')
    
    p4=Piao('yuanhao')
    
     
    
    p1.start() #start会自动调用run
    
    p2.start()
    
    p3.start()
    
    p4.start()
    
    print('主线程')
    方法二

    3.3.2 创建并开启子进程的两种方式课堂实例

    ###############方式一:####################
    
    from multiprocessing import Process
    
    import time
    
     
    
    def task(name):
    
        print('%s is running' %name)
    
        time.sleep(5)
    
        print('%s is done' %name)
    
     
    
     
    
    if __name__ == '__main__':
    
        p=Process(target=task,args=('alex',))
    
        p.start()
    
        print('')
    方式一
    ###############方式二:##################
    
    # from multiprocessing import Process
    
    # import time
    
    #
    
    # class MyProcess(Process):
    
    #     def __init__(self,name):
    
    #         super(MyProcess,self).__init__()
    
    #         self.name=name
    
    #
    
    #     def run(self):
    
    #         print('%s is running' %self.name)
    
    #         time.sleep(3)
    
    #         print('%s is done' %self.name)
    
    #
    
    # if __name__ == '__main__':
    
    #     p=MyProcess('进程1')
    
    #     p.start() #p.run()
    
    #     print('主')
    方式二

    3.3.3 进程之间的内存空间是隔离的

    ################实例一################
    
    from multiprocessing import Process
    
    n=100 #在windows系统中应该把全局变量定义在if __name__ == '__main__'之上就可以了
    
    def work():
    
        global n
    
        n=0
    
        print('子进程内: ',n)
    
     
    
     
    
    if __name__ == '__main__':
    
        p=Process(target=work)
    
        p.start()
    
        print('主进程内: ',n)
    实例一
    #################实例二##########################
    
    from multiprocessing import Process
    
    import time
    
     
    
    n=100
    
     
    
    def task():
    
        global n
    
        time.sleep(5)
    
        n=0
    
     
    
    if __name__ == '__main__':
    
        p=Process(target=task)
    
        p.start()
    
        # time.sleep(5)
    
        print(p.is_alive())
    
        p.join()
    
        print(p.is_alive())
    
        print('',n)
    实例二

    3.3.4 练习1:把上周所学的socket通信变成并发的形式

    ##############################server端###############################
    
    from socket import *
    
    from multiprocessing import Process
    
     
    
    server=socket(AF_INET,SOCK_STREAM)
    
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    
    server.bind(('127.0.0.1',8080))
    
    server.listen(5)
    
     
    
    def talk(conn,client_addr):
    
        while True:
    
            try:
    
                msg=conn.recv(1024)
    
                if not msg:break
    
                conn.send(msg.upper())
    
            except Exception:
    
                break
    
     
    
    if __name__ == '__main__': #windows下start进程一定要写到这下面
    
        while True:
    
            conn,client_addr=server.accept()
    
            p=Process(target=talk,args=(conn,client_addr))
    
            p.start()
    
     
    server端
    ###########################多个client端##########################
    
    from socket import *
    
     
    
    client=socket(AF_INET,SOCK_STREAM)
    
    client.connect(('127.0.0.1',8080))
    
     
    
     
    
    while True:
    
        msg=input('>>: ').strip()
    
        if not msg:continue
    
     
    
        client.send(msg.encode('utf-8'))
    
        msg=client.recv(1024)
    
        print(msg.decode('utf-8'))
    多个client端
    ##############这么实现有没有问题???#######################
    
    每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。
    
    解决方法:进程池

    3.4 Process对象的join方法

    3.4.1 博客实例

    ########################join:主进程等,等待子进程结束#########################
    
    from multiprocessing import Process
    
    import time
    
    import random
    
     
    
    class Piao(Process):
    
        def __init__(self,name):
    
            self.name=name
    
            super().__init__()
    
        def run(self):
    
            print('%s is piaoing' %self.name)
    
            time.sleep(random.randrange(1,3))
    
            print('%s is piao end' %self.name)
    
     
    
     
    
    p=Piao('egon')
    
    p.start()
    
    p.join(0.0001) #等待p停止,等0.0001秒就不再等了
    
    print('开始')
    
     
    join:主进程等,等待子进程结束
    #####################有了join,程序不就是串行了吗???################
    
    from multiprocessing import Process
    
    import time
    
    import random
    
    def piao(name):
    
        print('%s is piaoing' %name)
    
        time.sleep(random.randint(1,3))
    
        print('%s is piao end' %name)
    
     
    
    p1=Process(target=piao,args=('egon',))
    
    p2=Process(target=piao,args=('alex',))
    
    p3=Process(target=piao,args=('yuanhao',))
    
    p4=Process(target=piao,args=('wupeiqi',))
    
     
    
    p1.start()
    
    p2.start()
    
    p3.start()
    
    p4.start()
    
     
    
    #有的同学会有疑问:既然join是等待进程结束,那么我像下面这样写,进程不就又变成串行的了吗?
    
    #当然不是了,必须明确:p.join()是让谁等?
    
    #很明显p.join()是让主线程等待p的结束,卡住的是主线程而绝非进程p,
    
     
    
    #详细解析如下:
    
    #进程只要start就会在开始运行了,所以p1-p4.start()时,系统中已经有四个并发的进程了
    
    #而我们p1.join()是在等p1结束,没错p1只要不结束主线程就会一直卡在原地,这也是问题的关键
    
    #join是让主线程等,而p1-p4仍然是并发执行的,p1.join的时候,其余p2,p3,p4仍然在运行,等#p1.join结束,可能p2,p3,p4早已经结束了,这样p2.join,p3.join.p4.join直接通过检测,无需等待
    
    # 所以4个join花费的总时间仍然是耗费时间最长的那个进程运行的时间
    
    p1.join()
    
    p2.join()
    
    p3.join()
    
    p4.join()
    
     
    
    print('主线程')
    
     
    
     
    
    #上述启动进程与join进程可以简写为
    
    # p_l=[p1,p2,p3,p4]
    
    #
    
    # for p in p_l:
    
    #     p.start()
    
    #
    
    # for p in p_l:
    
    #     p.join()
    
     
    有了join,程序不就是串行了吗???

    3.4.2 课堂实例

    # from multiprocessing import Process
    
    # import time
    
    # import os
    
    #
    
    # def task(n):
    
    #     print('%s is runing' %os.getpid())
    
    #     time.sleep(n)
    
    #     print('%s is done' %os.getpid())
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    #
    
    #     start_time=time.time()
    
    #     p1=Process(target=task,args=(1,))
    
    #     p2=Process(target=task,args=(2,))
    
    #     p3=Process(target=task,args=(3,))
    
    #
    
    #     p_l=[p1,p2,p3]
    
    #     # p1.start()
    
    #     # p2.start()
    
    #     # p3.start()
    
    #     for p in p_l:
    
    #         p.start()
    
    #
    
    #     # p3.join()  #3
    
    #     # p1.join() #
    
    #     # p2.join() #
    
    #     for p in p_l:
    
    #         p.join()
    
    #     stop_time=time.time()
    
    #     print('主',(stop_time-start_time))
    
     
    
     
    
     
    
     
    
    # from multiprocessing import Process
    
    # import time
    
    # import os
    
    #
    
    # def task(n):
    
    #     print('%s is runing' %os.getpid())
    
    #     time.sleep(n)
    
    #     print('%s is done' %os.getpid())
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    #
    
    #     start_time=time.time()
    
    #     p1=Process(target=task,args=(1,))
    
    #     p2=Process(target=task,args=(2,))
    
    #     p3=Process(target=task,args=(3,))
    
    #
    
    #     p_l=[p1,p2,p3]
    
    #     # p1.start()
    
    #     # p2.start()
    
    #     # p3.start()
    
    #     for p in p_l:
    
    #         p.start()
    
    #
    
    #     # p3.join()  #3
    
    #     # p1.join() #
    
    #     # p2.join() #
    
    #     for p in p_l:
    
    #         p.join()
    
    #     stop_time=time.time()
    
    #     print('主',(stop_time-start_time))
    
     
    
     
    
    from multiprocessing import Process
    
    import time
    
    import os
    
     
    
    def task(n):
    
        print('%s is runing' %os.getpid())
    
        time.sleep(n)
    
        print('%s is done' %os.getpid())
    
     
    
     
    
    if __name__ == '__main__':
    
     
    
        start_time=time.time()
    
        p1=Process(target=task,args=(1,))
    
        p2=Process(target=task,args=(2,))
    
        p3=Process(target=task,args=(3,))
    
     
    
     
    
        p1.start()
    
        p1.join()
    
        p2.start()
    
        p2.join()
    
        p3.start()
    
        p3.join()
    
        stop_time=time.time()
    
        print('',(stop_time-start_time))
    View Code

    3.5 Process对象的其他方法或属性(了解)

    #######################terminate与is_alive#######################
    
    #进程对象的其他方法一:terminate,is_alive
    
    from multiprocessing import Process
    
    import time
    
    import random
    
     
    
    class Piao(Process):
    
        def __init__(self,name):
    
            self.name=name
    
            super().__init__()
    
     
    
        def run(self):
    
            print('%s is piaoing' %self.name)
    
            time.sleep(random.randrange(1,5))
    
            print('%s is piao end' %self.name)
    
     
    
     
    
    p1=Piao('egon1')
    
    p1.start()
    
     
    
    p1.terminate()#关闭进程,不会立即关闭,所以is_alive立刻查看的结果可能还是存活
    
    print(p1.is_alive()) #结果为True
    
     
    
    print('开始')
    
    print(p1.is_alive()) #结果为False
    terminate与is_alive
    ########################name与pid########################
    
    from multiprocessing import Process
    
    import time
    
    import random
    
    class Piao(Process):
    
        def __init__(self,name):
    
            # self.name=name
    
            # super().__init__() #Process的__init__方法会执行self.name=Piao-1,
    
            #                    #所以加到这里,会覆盖我们的self.name=name
    
     
    
            #为我们开启的进程设置名字的做法
    
            super().__init__()
    
            self.name=name
    
     
    
        def run(self):
    
            print('%s is piaoing' %self.name)
    
            time.sleep(random.randrange(1,3))
    
            print('%s is piao end' %self.name)
    
     
    
    p=Piao('egon')
    
    p.start()
    
    print('开始')
    
    print(p.pid) #查看pid
    
     
    
     
    
     
    
    from multiprocessing import Process
    
    import time
    
    import os
    
     
    
    def task(n):
    
        print('pid:%s ppid:%s' %(os.getpid(),os.getppid()))
    
        time.sleep(n)
    
     
    
     
    
    if __name__ == '__main__':
    
        p=Process(target=task,args=(15,),name='进程1')
    
        p.start()
    
        p.terminate()
    
        # time.sleep(1)
    
        print(p.is_alive())
    
        print('主pid:%s ppid:%s' %(os.getpid(),os.getppid()))
    
        # print(p.pid)
    
        p.name='xxxx'
    
        print(p.name)
    name与pid

    3.6 僵尸进程与孤儿进程(了解)

    3.6.1 僵尸进程(有害)

    参考博客:http://www.cnblogs.com/Anker/p/3271773.html
    
     
    
    一:僵尸进程(有害)
    
      僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。详解如下
    
     
    
    我们知道在unix/linux中,正常情况下子进程是通过父进程创建的,子进程在创建新的进程。子进程的结束和父进程的运行是一个异步过程,即父进程永远无法预测子进程到底什么时候结束,如果子进程一结束就立刻回收其全部资源,那么在父进程内将无法获取子进程的状态信息。
    
     
    
    因此,UNⅨ提供了一种机制可以保证父进程可以在任意时刻获取子进程结束时的状态信息:
    
    1、在每个进程退出的时候,内核释放该进程所有的资源,包括打开的文件,占用的内存等。但是仍然为其保留一定的信息(包括进程号the process ID,退出状态the termination status of the process,运行时间the amount of CPU time taken by the process等)
    
    2、直到父进程通过wait / waitpid来取时才释放. 但这样就导致了问题,如果进程不调用wait / waitpid的话,那么保留的那段信息就不会释放,其进程号就会一直被占用,但是系统所能使用的进程号是有限的,如果大量的产生僵死进程,将因为没有可用的进程号而导致系统不能产生新的进程. 此即为僵尸进程的危害,应当避免。
    
     
    
      任何一个子进程(init除外)在exit()之后,并非马上就消失掉,而是留下一个称为僵尸进程(Zombie)的数据结构,等待父进程处理。这是每个子进程在结束时都要经过的阶段。如果子进程在exit()之后,父进程没有来得及处理,这时用ps命令就能看到子进程的状态是“Z”。如果父进程能及时 处理,可能用ps命令就来不及看到子进程的僵尸状态,但这并不等于子进程不经过僵尸状态。  如果父进程在子进程结束之前退出,则子进程将由init接管。init将会以父进程的身份对僵尸状态的子进程进行处理。
    僵尸进程(有害)

    3.6.2 孤儿进程(无害)

    二:孤儿进程(无害)
    
     
    
      孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
    
     
    
      孤儿进程是没有父进程的进程,孤儿进程这个重任就落到了init进程身上,init进程就好像是一个民政局,专门负责处理孤儿进程的善后工作。每当出现一个孤儿进程的时候,内核就把孤 儿进程的父进程设置为init,而init进程会循环地wait()它的已经退出的子进程。这样,当一个孤儿进程凄凉地结束了其生命周期的时候,init进程就会代表党和政府出面处理它的一切善后工作。因此孤儿进程并不会有什么危害。
    
     
    
    我们来测试一下(创建完子进程后,主进程所在的这个脚本就退出了,当父进程先于子进程结束时,子进程会被init收养,成为孤儿进程,而非僵尸进程),文件内容
    
     
    
    import os
    
    import sys
    
    import time
    
     
    
    pid = os.getpid()
    
    ppid = os.getppid()
    
    print 'im father', 'pid', pid, 'ppid', ppid
    
    pid = os.fork()
    
    #执行pid=os.fork()则会生成一个子进程
    
    #返回值pid有两种值:
    
    #    如果返回的pid值为0,表示在子进程当中
    
    #    如果返回的pid值>0,表示在父进程当中
    
    if pid > 0:
    
        print 'father died..'
    
        sys.exit(0)
    
     
    
    # 保证主线程退出完毕
    
    time.sleep(1)
    
    print 'im child', os.getpid(), os.getppid()
    
     
    
    执行文件,输出结果:
    
    im father pid 32515 ppid 32015
    
    father died..
    
    im child 32516 1
    
     
    
    看,子进程已经被pid为1的init进程接收了,所以僵尸进程在这种情况下是不存在的,存在只有孤儿进程而已,孤儿进程声明周期结束自然会被init来销毁。
    
     
    孤儿进程(无害)

    3.6.3 僵尸进程危害场景

    三:僵尸进程危害场景:
    
     
    
      例如有个进程,它定期的产 生一个子进程,这个子进程需要做的事情很少,做完它该做的事情之后就退出了,因此这个子进程的生命周期很短,但是,父进程只管生成新的子进程,至于子进程 退出之后的事情,则一概不闻不问,这样,系统运行上一段时间之后,系统中就会存在很多的僵死进程,倘若用ps命令查看的话,就会看到很多状态为Z的进程。 严格地来说,僵死进程并不是问题的根源,罪魁祸首是产生出大量僵死进程的那个父进程。因此,当我们寻求如何消灭系统中大量的僵死进程时,答案就是把产生大 量僵死进程的那个元凶枪毙掉(也就是通过kill发送SIGTERM或者SIGKILL信号啦)。枪毙了元凶进程之后,它产生的僵死进程就变成了孤儿进 程,这些孤儿进程会被init进程接管,init进程会wait()这些孤儿进程,释放它们占用的系统进程表中的资源,这样,这些已经僵死的孤儿进程 就能瞑目而去了。
    
     
    僵尸进程危害场景

    3.6.4 测试

    四:测试
    
    #1、产生僵尸进程的程序test.py内容如下
    
     
    
    #coding:utf-8
    
    from multiprocessing import Process
    
    import time,os
    
     
    
    def run():
    
        print('',os.getpid())
    
     
    
    if __name__ == '__main__':
    
        p=Process(target=run)
    
        p.start()
    
       
    
        print('',os.getpid())
    
        time.sleep(1000)
    
     
    
     
    
    #2、在unix或linux系统上执行
    
    [root@vm172-31-0-19 ~]# python3  test.py &
    
    [1] 18652
    
    [root@vm172-31-0-19 ~]# 主 18652
    18653
    
     
    
    [root@vm172-31-0-19 ~]# ps aux |grep Z
    
    USER       PID %CPU %MEM    VSZ   RSS TTY      STAT START   TIME COMMAND
    
    root     18653  0.0  0.0      0     0 pts/0    Z    20:02   0:00 [python3] <defunct> #出现僵尸进程
    
    root     18656  0.0  0.0 112648   952 pts/0    S+   20:02   0:00 grep --color=auto Z
    
     
    
    [root@vm172-31-0-19 ~]# top #执行top命令发现1zombie
    
    top - 20:03:42 up 31 min,  3 users,  load average: 0.01, 0.06, 0.12
    
    Tasks:  93 total,   2 running,  90 sleeping,   0 stopped,   1 zombie
    
    %Cpu(s):  0.0 us,  0.3 sy,  0.0 ni, 99.7 id,  0.0 wa,  0.0 hi,  0.0 si,  0.0 st
    
    KiB Mem :  1016884 total,    97184 free,    70848 used,   848852 buff/cache
    
    KiB Swap:        0 total,        0 free,        0 used.   782540 avail Mem
    
     
    
      PID USER      PR  NI    VIRT    RES    SHR S %CPU %MEM     TIME+ COMMAND                                                                                                                                       
    
    root      20   0   29788   1256    988 S  0.3  0.1   0:01.50 elfin                                                                                                                      
    
     
    
     
    
    #3、
    
    等待父进程正常结束后会调用wait/waitpid去回收僵尸进程
    
    但如果父进程是一个死循环,永远不会结束,那么该僵尸进程就会一直存在,僵尸进程过多,就是有害的
    
    解决方法一:杀死父进程
    
    解决方法二:对开启的子进程应该记得使用join,join会回收僵尸进程
    
    参考python2源码注释
    
    class Process(object):
    
        def join(self, timeout=None):
    
            '''
    
            Wait until child process terminates
    
            '''
    
            assert self._parent_pid == os.getpid(), 'can only join a child process'
    
            assert self._popen is not None, 'can only join a started process'
    
            res = self._popen.wait(timeout)
    
            if res is not None:
    
                _current_process._children.discard(self)
    
     
    
    join方法中调用了wait,告诉系统释放僵尸进程。discard为从自己的children中剔除
    
     
    
    解决方法三:http://blog.csdn.net/u010571844/article/details/50419798
    
     
    测试

    3.7 守护进程

    主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    3.7.1 博客实例

    from multiprocessing import Process
    
    import time
    
    import random
    
     
    
    class Piao(Process):
    
        def __init__(self,name):
    
            self.name=name
    
            super().__init__()
    
        def run(self):
    
            print('%s is piaoing' %self.name)
    
            time.sleep(random.randrange(1,3))
    
            print('%s is piao end' %self.name)
    
     
    
     
    
    p=Piao('egon')
    
    p.daemon=True #一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
    
    p.start()
    
    print('')
    View Code
    ############迷惑人的例子#################
    
    #主进程代码运行完毕,守护进程就会结束
    
    from multiprocessing import Process
    
    from threading import Thread
    
    import time
    
    def foo():
    
        print(123)
    
        time.sleep(1)
    
        print("end123")
    
     
    
    def bar():
    
        print(456)
    
        time.sleep(3)
    
        print("end456")
    
     
    
     
    
    p1=Process(target=foo)
    
    p2=Process(target=bar)
    
     
    
    p1.daemon=True
    
    p1.start()
    
    p2.start()
    
    print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
    
     
    迷惑人的例子

    3.7.2 课堂讲解实例

    #守护进程:当子进程执行的任务在父进程代码运行完毕后就没有存在的必要了,那
    
    #该子进程就应该被设置为守护进程
    
    # from multiprocessing import Process
    
    # import time
    
    #
    
    # def task(name):
    
    #     p=Process(target=time.sleep,args=(6,))
    
    #     p.start()
    
    #     print('%s is running' %name)
    
    #     time.sleep(5)
    
    #     print('%s is done' %name)
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    #     p=Process(target=task,args=('alex',))
    
    #     p.daemon=True
    
    #     p.start()
    
    #     time.sleep(1)
    
    #     print('主')
    
     
    
     
    
    #主进程代码运行完毕,守护进程就会结束
    
    from multiprocessing import Process
    
    from threading import Thread
    
    import time
    
    def foo():
    
        print(123)
    
        time.sleep(1)
    
        print("end123")
    
     
    
    def bar():
    
        print(456)
    
        time.sleep(3)
    
        print("end456")
    
     
    
    if __name__ == '__main__':
    
     
    
        p1=Process(target=foo)
    
        p2=Process(target=bar)
    
     
    
        p1.daemon=True
    
        p1.start()
    
        p2.start()
    
        print("main-------")
    View Code

    3.8 进程同步(互斥锁)

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

    而共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理

    3.8.1 part1:多个进程共享同一打印终端

    ################并发运行,效率高,但竞争同一打印终端,带来了打印错乱#################
    
    from multiprocessing import Process
    
    import os,time
    
    def work():
    
        print('%s is running' %os.getpid())
    
        time.sleep(2)
    
        print('%s is done' %os.getpid())
    
     
    
    if __name__ == '__main__':
    
        for i in range(3):
    
            p=Process(target=work)
    
            p.start()
    并发运行,效率高,但竞争同一打印终端,带来了打印错乱
    ###########由并发变成了串行,牺牲了运行效率,但避免了竞争##############
    
    from multiprocessing import Process,Lock
    
    import os,time
    
    def work(lock):
    
        lock.acquire()
    
        print('%s is running' %os.getpid())
    
        time.sleep(2)
    
        print('%s is done' %os.getpid())
    
        lock.release()
    
    if __name__ == '__main__':
    
        lock=Lock()
    
        for i in range(3):
    
            p=Process(target=work,args=(lock,))
    
            p.start()
    由并发变成了串行,牺牲了运行效率,但避免了竞争

    3.8.2 part2:多个进程共享同一文件

    文件当数据库,模拟抢票

    #############并发运行,效率高,但竞争写同一文件,数据写入错乱#################
    
    #文件db的内容为:{"count":1}
    
    #注意一定要用双引号,不然json无法识别
    
    from multiprocessing import Process,Lock
    
    import time,json,random
    
    def search():
    
        dic=json.load(open('db.txt'))
    
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
     
    
    def get():
    
        dic=json.load(open('db.txt'))
    
        time.sleep(0.1) #模拟读数据的网络延迟
    
        if dic['count'] >0:
    
            dic['count']-=1
    
            time.sleep(0.2) #模拟写数据的网络延迟
    
            json.dump(dic,open('db.txt','w'))
    
            print('33[43m购票成功33[0m')
    
     
    
    def task(lock):
    
        search()
    
        get()
    
    if __name__ == '__main__':
    
        lock=Lock()
    
        for i in range(100): #模拟并发100个客户端抢票
    
            p=Process(target=task,args=(lock,))
    
            p.start()
    
     
    并发运行,效率高,但竞争写同一文件,数据写入错乱
    #########加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全###########
    
    #文件db的内容为:{"count":1}
    
    #注意一定要用双引号,不然json无法识别
    
    from multiprocessing import Process,Lock
    
    import time,json,random
    
    def search():
    
        dic=json.load(open('db.txt'))
    
        print('33[43m剩余票数%s33[0m' %dic['count'])
    
     
    
    def get():
    
        dic=json.load(open('db.txt'))
    
        time.sleep(0.1) #模拟读数据的网络延迟
    
        if dic['count'] >0:
    
            dic['count']-=1
    
            time.sleep(0.2) #模拟写数据的网络延迟
    
            json.dump(dic,open('db.txt','w'))
    
            print('33[43m购票成功33[0m')
    
     
    
    def task(lock):
    
        search()
    
        lock.acquire()
    
        get()
    
        lock.release()
    
    if __name__ == '__main__':
    
        lock=Lock()
    
        for i in range(100): #模拟并发100个客户端抢票
    
            p=Process(target=task,args=(lock,))
    
            p.start()
    加锁:购票行为由并发变成了串行,牺牲了运行效率,但保证了数据安全

    3.8.3 互斥锁课堂讲解实例

    ############db.txt############
    
    {"count": 1}
    from multiprocessing import Process,Lock
    
    import json
    
    import time
    
    import random
    
    import os
    
     
    
    def search():
    
        time.sleep(random.randint(1,3))
    
        dic=json.load(open('db.txt','r',encoding='utf-8'))
    
        print('%s 查看到剩余票数%s' %(os.getpid(),dic['count']))
    
     
    
    def get():
    
        dic=json.load(open('db.txt','r',encoding='utf-8'))
    
        if dic['count'] > 0:
    
            dic['count']-=1
    
            time.sleep(random.randint(1,3))
    
            json.dump(dic,open('db.txt','w',encoding='utf-8'))
    
            print('%s 购票成功' %os.getpid())
    
     
    
    def task(mutex):
    
        search()
    
        mutex.acquire()
    
        get()
    
        mutex.release()
    
     
    
    if __name__ == '__main__':
    
        mutex=Lock()
    
        for i in range(10):
    
            p=Process(target=task,args=(mutex,))
    
            p.start()
    
            # p.join()
    View Code

    3.8.4 总结:

    #加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    
    虽然可以用文件共享数据实现进程间通信,但问题是:
    
    1.效率低(共享数据基于文件,而文件是硬盘上的数据)
    
    2.需要自己加锁处理
    
     
    
     
    
     
    
    #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。
    
    队列和管道都是将数据存放于内存中
    
    队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来,
    
    我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
    
     
    View Code

    3.9 队列(推荐使用)

    进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的

    3.9.1 创建队列的类(底层就是以管道和锁定的方式实现):

    Queue([maxsize]):创建共享的进程队列,Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。

    3.9.2 参数介绍:

    maxsize是队列中允许最大项数,省略则无大小限制。

    3.9.3 方法介绍:

        主要方法:

    q.put方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
    
    q.get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
    
     
    
    q.get_nowait():同q.get(False)
    
    q.put_nowait():同q.put(False)
    
     
    
    q.empty():调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
    
    q.full():调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
    
    q.qsize():返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
    View Code

    3.9.4 其他方法(了解):

    q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
    
    q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
    
    q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为

    3.9.5 应用:

    '''
    
    multiprocessing模块支持进程间通信的两种主要形式:管道和队列
    
    都是基于消息传递实现的,但是队列接口
    
    '''
    
     
    
    from multiprocessing import Process,Queue
    
    import time
    
    q=Queue(3)
    
     
    
     
    
    #put ,get ,put_nowait,get_nowait,full,empty
    
    q.put(3)
    
    q.put(3)
    
    q.put(3)
    
    print(q.full()) #满了
    
     
    
    print(q.get())
    
    print(q.get())
    
    print(q.get())
    
    print(q.empty()) #空了
    View Code

    3.9.6 课堂实例

    from multiprocessing import Queue
    
     
    
    q=Queue(3)
    
     
    
    q.put('first')
    
    q.put(2)
    
    q.put({'count':3})
    
    # q.put('fourth',block=False) #q.put_nowait('fourth')
    
    # q.put('fourth',block=True,timeout=3)
    
     
    
    print(q.get())
    
    print(q.get())
    
    print(q.get())
    
    # print(q.get(block=False)) #q.get_nowait()
    
    print(q.get(block=True,timeout=3))
    View Code

    3.10 生产者消费者模型

    在并发编程中使用生产者和消费者模式能够解决绝大多数并发问题。该模式通过平衡生产线程和消费线程的工作能力来提高程序的整体处理数据的速度。

    3.10.1 为什么要使用生产者和消费者模式

    在线程世界里,生产者就是生产数据的线程,消费者就是消费数据的线程。在多线程开发当中,如果生产者处理速度很快,而消费者处理速度很慢,那么生产者就必须等待消费者处理完,才能继续生产数据。同样的道理,如果消费者的处理能力大于生产者,那么消费者就必须等待生产者。为了解决这个问题于是引入了生产者和消费者模式。

    3.10.2 什么是生产者消费者模式

    生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    3.10.3 基于队列实现生产者消费者模型

    from multiprocessing import Process,Queue
    
    import time,random,os
    
    def consumer(q):
    
        while True:
    
            res=q.get()
    
            time.sleep(random.randint(1,3))
    
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
     
    
    def producer(q):
    
        for i in range(10):
    
            time.sleep(random.randint(1,3))
    
            res='包子%s' %i
    
            q.put(res)
    
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
     
    
    if __name__ == '__main__':
    
        q=Queue()
    
        #生产者们:即厨师们
    
        p1=Process(target=producer,args=(q,))
    
     
    
        #消费者们:即吃货们
    
        c1=Process(target=consumer,args=(q,))
    
     
    
        #开始
    
        p1.start()
    
        c1.start()
    
        print('')
    View Code

    3.10.4 生产者消费者模型总结

    #生产者消费者模型总结
    
     
    
        #程序中有两类角色
    
            一类负责生产数据(生产者)
    
            一类负责处理数据(消费者)
    
           
    
        #引入生产者消费者模型为了解决的问题是:
    
            平衡生产者与消费者之间的工作能力,从而提高程序整体处理数据的速度
    
           
    
        #如何实现:
    
            生产者<-->队列<——>消费者
    
        #生产者消费者模型实现类程序的解耦和

    3.10.5 生产者消费者模型推算解决过程

    此时的问题是主进程永远不会结束,原因是:生产者p在生产完后就结束了,但是消费者c在取空了q之后,则一直处于死循环中且卡在q.get()这一步。

    解决方式无非是让生产者在生产完毕后,往队列中再发一个结束信号,这样消费者在接收到结束信号后就可以break出死循环

    ############生产者在生产完毕后发送结束信号None################
    
    from multiprocessing import Process,Queue
    
    import time,random,os
    
    def consumer(q):
    
        while True:
    
            res=q.get()
    
            if res is None:break #收到结束信号则结束
    
            time.sleep(random.randint(1,3))
    
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
     
    
    def producer(q):
    
        for i in range(10):
    
            time.sleep(random.randint(1,3))
    
            res='包子%s' %i
    
            q.put(res)
    
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
        q.put(None) #发送结束信号
    
    if __name__ == '__main__':
    
        q=Queue()
    
        #生产者们:即厨师们
    
        p1=Process(target=producer,args=(q,))
    
     
    
        #消费者们:即吃货们
    
        c1=Process(target=consumer,args=(q,))
    
     
    
        #开始
    
        p1.start()
    
        c1.start()
    
        print('')
    
     
    生产者在生产完毕后发送结束信号None

    注意:结束信号None,不一定要由生产者发,主进程里同样可以发,但主进程需要等生产者结束后才应该发送该信号

    ############主进程在生产者生产完毕后发送结束信号None##############
    
    from multiprocessing import Process,Queue
    
    import time,random,os
    
    def consumer(q):
    
        while True:
    
            res=q.get()
    
            if res is None:break #收到结束信号则结束
    
            time.sleep(random.randint(1,3))
    
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
     
    
    def producer(q):
    
        for i in range(2):
    
            time.sleep(random.randint(1,3))
    
            res='包子%s' %i
    
            q.put(res)
    
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
     
    
    if __name__ == '__main__':
    
        q=Queue()
    
        #生产者们:即厨师们
    
        p1=Process(target=producer,args=(q,))
    
     
    
        #消费者们:即吃货们
    
        c1=Process(target=consumer,args=(q,))
    
     
    
        #开始
    
        p1.start()
    
        c1.start()
    
     
    
        p1.join()
    
        q.put(None) #发送结束信号
    
        print('')
    
     
    进程在生产者生产完毕后发送结束信号None

    但上述解决方式,在有多个生产者和多个消费者时,我们则需要用一个很low的方式去解决

    ##############有几个消费者就需要发送几次结束信号:相当low################
    
    from multiprocessing import Process,Queue
    
    import time,random,os
    
    def consumer(q):
    
        while True:
    
            res=q.get()
    
            if res is None:break #收到结束信号则结束
    
            time.sleep(random.randint(1,3))
    
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
     
    
    def producer(name,q):
    
        for i in range(2):
    
            time.sleep(random.randint(1,3))
    
            res='%s%s' %(name,i)
    
            q.put(res)
    
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
     
    
     
    
     
    
    if __name__ == '__main__':
    
        q=Queue()
    
        #生产者们:即厨师们
    
        p1=Process(target=producer,args=('包子',q))
    
        p2=Process(target=producer,args=('骨头',q))
    
        p3=Process(target=producer,args=('泔水',q))
    
     
    
        #消费者们:即吃货们
    
        c1=Process(target=consumer,args=(q,))
    
        c2=Process(target=consumer,args=(q,))
    
     
    
        #开始
    
        p1.start()
    
        p2.start()
    
        p3.start()
    
        c1.start()
    
     
    
        p1.join() #必须保证生产者全部生产完毕,才应该发送结束信号
    
        p2.join()
    
        p3.join()
    
        q.put(None) #有几个消费者就应该发送几次结束信号None
    
        q.put(None) #发送结束信号
    
        print('')
    有几个消费者就需要发送几次结束信号:相当low

    其实我们的思路无非是发送结束信号而已,有另外一种队列提供了这种机制

    #JoinableQueue([maxsize]):这就像是一个Queue对象,但队列允许项目的使用者通知生成者项目已经被成功处理。通知进程是使用共享的信号和条件变量来实现的。
    
       #参数介绍:
    
        maxsize是队列中允许最大项数,省略则无大小限制。   
    
      #方法介绍:
    
        JoinableQueue的实例p除了与Queue对象相同的方法之外还具有:
    
        q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
    
        q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

    3.10.6 实例应用一

    from multiprocessing import Process,JoinableQueue
    
    import time,random,os
    
    def consumer(q):
    
        while True:
    
            res=q.get()
    
            time.sleep(random.randint(1,3))
    
            print('33[45m%s 吃 %s33[0m' %(os.getpid(),res))
    
     
    
            q.task_done() #向q.join()发送一次信号,证明一个数据已经被取走了
    
     
    
    def producer(name,q):
    
        for i in range(10):
    
            time.sleep(random.randint(1,3))
    
            res='%s%s' %(name,i)
    
            q.put(res)
    
            print('33[44m%s 生产了 %s33[0m' %(os.getpid(),res))
    
        q.join()
    
     
    
     
    
    if __name__ == '__main__':
    
        q=JoinableQueue()
    
        #生产者们:即厨师们
    
        p1=Process(target=producer,args=('包子',q))
    
        p2=Process(target=producer,args=('骨头',q))
    
        p3=Process(target=producer,args=('泔水',q))
    
     
    
        #消费者们:即吃货们
    
        c1=Process(target=consumer,args=(q,))
    
        c2=Process(target=consumer,args=(q,))
    
        c1.daemon=True
    
        c2.daemon=True
    
     
    
        #开始
    
        p_l=[p1,p2,p3,c1,c2]
    
        for p in p_l:
    
            p.start()
    
     
    
        p1.join()
    
        p2.join()
    
        p3.join()
    
        print('')
    
       
    
        #主进程等--->p1,p2,p3等---->c1,c2
    
        #p1,p2,p3结束了,证明c1,c2肯定全都收完了p1,p2,p3发到队列的数据
    
        #因而c1,c2也没有存在的价值了,应该随着主进程的结束而结束,所以设置成守护进程
    View Code

    3.10.7 实例应用二

    # from multiprocessing import Process,Queue
    
    # import time
    
    # import random
    
    #
    
    # def producer(name,food,q):
    
    #     for i in range(3):
    
    #         res='%s%s' %(food,i)
    
    #         time.sleep(random.randint(1,3))
    
    #         q.put(res)
    
    #         print('厨师[%s]生产了<%s>' %(name,res))
    
    #
    
    #
    
    # def consumer(name,q):
    
    #     while True:
    
    #         res=q.get()
    
    #         if res is None:break
    
    #         time.sleep(random.randint(1,3))
    
    #         print('吃货[%s]吃了<%s>' % (name, res))
    
    #
    
    # if __name__ == '__main__':
    
    #     #队列
    
    #     q=Queue()
    
    #     #生产者们
    
    #     p1=Process(target=producer,args=('egon1','泔水',q))
    
    #     p2=Process(target=producer,args=('egon2','骨头',q))
    
    #     #消费者们
    
    #     c1=Process(target=consumer,args=('管廷威',q))
    
    #     c2=Process(target=consumer,args=('oldboy',q))
    
    #     c3=Process(target=consumer,args=('oldgirl',q))
    
    #
    
    #
    
    #     p1.start()
    
    #     p2.start()
    
    #     c1.start()
    
    #     c2.start()
    
    #     c3.start()
    
    #
    
    #     p1.join()
    
    #     p2.join()
    
    #     q.put(None)
    
    #     q.put(None)
    
    #     q.put(None)
    
    #     print('主')
    View Code

    3.10.8 JoinableQueue与守护进程应用

    from multiprocessing import Process,JoinableQueue
    
    import time
    
    import random
    
     
    
    def producer(name,food,q):
    
        for i in range(3):
    
            res='%s%s' %(food,i)
    
            time.sleep(random.randint(1,3))
    
            q.put(res)
    
            print('厨师[%s]生产了<%s>' %(name,res))
    
     
    
     
    
    def consumer(name,q):
    
        while True:
    
            res=q.get()
    
            if res is None:break
    
            time.sleep(random.randint(1,3))
    
            print('吃货[%s]吃了<%s>' % (name, res))
    
            q.task_done()
    
     
    
    if __name__ == '__main__':
    
        #队列
    
        q=JoinableQueue()
    
        #生产者们
    
        p1=Process(target=producer,args=('egon1','泔水',q))
    
        p2=Process(target=producer,args=('egon2','骨头',q))
    
        #消费者们
    
        c1=Process(target=consumer,args=('管廷威',q))
    
        c2=Process(target=consumer,args=('oldboy',q))
    
        c3=Process(target=consumer,args=('oldgirl',q))
    
        c1.daemon=True
    
        c2.daemon=True
    
        c3.daemon=True
    
     
    
        p1.start()
    
        p2.start()
    
        c1.start()
    
        c2.start()
    
        c3.start()
    
     
    
        p1.join()
    
        p2.join()
    
        q.join()
    
        print('')
    View Code

    第4章 python并发编程之多线程理论篇

    4.1 什么是线程

    在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程

      线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程

          车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线

          流水线的工作需要电源,电源就相当于cpu

      所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位

      多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

          例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    4.2 线程的创建开销小

    4.2.1 创建进程的开销要远大于线程?

    如果我们的软件是一个工厂,该工厂有多条流水线,流水线工作需要电源,电源只有一个即cpu(单核cpu)

    一个车间就是一个进程,一个车间至少一条流水线(一个进程至少一个线程)

    创建一个进程,就是创建一个车间(申请空间,在该空间内建至少一条流水线)

    而建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小

    4.2.2 进程之间是竞争关系,线程之间是协作关系?

    车间直接是竞争/抢电源的关系,竞争(不同的进程直接是竞争关系,是不同的程序员写的程序运行的,迅雷抢占其他进程的网速,360把其他进程当做病毒干死)

    一个车间的不同流水线式协同工作的关系(同一个进程的线程之间是合作关系,是同一个程序写的程序内开启动,迅雷内的线程是合作关系,不会自己干自己)

    4.3 线程与进程的区别

    1、Threads share the address space of the process that created it; processes have their own address space.
    
    线程的地址空间共享,每个进程有自己的地址空间。
    
    2、Threads have direct access to the data segment of its process; processes have their own copy of the data segment of the parent process.
    
    一个进程中的线程直接接入他的进程的数据段,但是每个进程都有他们自己的从父进程拷贝过来的数据段
    
    3、Threads can directly communicate with other threads of its process; processes must use interprocess communication to communicate with sibling processes.
    
    一个进程内部的线程之间能够直接通信,进程之间必须使用进程间通信实现通信
    
    4、New threads are easily created; new processes require duplication of the parent process.
    
    新的线程很容易被创建,新的进程需要从父进程复制
    
    5、Threads can exercise considerable control over threads of the same process; processes can only exercise control over child processes.
    
    一个进程中的线程间能够有相当大的控制力度,进程仅仅只能控制他的子进程
    
    6、Changes to the main thread (cancellation, priority change, etc.) may affect the behavior of the other threads of the process; changes to the parent process does not affect child processes.
    
    改变主线程(删除,优先级改变等)可能影响这个进程中的其他线程;修改父进程不会影响子进程

    4.4 为何要用多线程

    多线程指的是,在一个进程中开启多个线程,简单的讲:如果多个任务共用一块地址空间,那么必须在一个进程内开启多个线程。详细的讲分为4点:

      1. 多线程共享一个进程的地址空间

          2. 线程比进程更轻量级,线程比进程更容易创建可撤销,在许多操作系统中,创建一个线程比创建一个进程要快10-100倍,在有大量线程需要动态和快速修改时,这一特性很有用

          3. 若多个线程都是cpu密集型的,那么并不能获得性能上的增强,但是如果存在大量的计算和大量的I/O处理,拥有多个线程允许这些活动彼此重叠运行,从而会加快程序执行的速度。

          4. 在多cpu系统中,为了最大限度的利用多核,可以开启多个线程,比开进程开销要小的多。(这一条并不适用于python)

    4.5 多线程的应用举例

     

    开启一个字处理软件进程,该进程肯定需要办不止一件事情,比如监听键盘输入,处理文字,定时自动将文字保存到硬盘,这三个任务操作的都是同一块数据,因而不能用多进程。只能在一个进程里并发地开启三个线程,如果是单线程,那就只能是,键盘输入时,不能处理文字和自动保存,自动保存时又不能输入和处理文字。

    第5章 python并发编程之多线程使用篇

    5.1 threading模块介绍

    multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍

    官网链接:https://docs.python.org/3/library/threading.html?highlight=threading#

    5.2 开启线程的两种方式

    5.2.1 方式一

    #方式一
    
    from threading import Thread
    
    import time
    
    def sayhi(name):
    
        time.sleep(2)
    
        print('%s say hello' %name)
    
     
    
    if __name__ == '__main__':
    
        t=Thread(target=sayhi,args=('egon',))
    
        t.start()
    
        print('主线程')
    方式一

    5.2.2 方式二

    #方式二
    
    from threading import Thread
    
    import time
    
    class Sayhi(Thread):
    
        def __init__(self,name):
    
            super().__init__()
    
            self.name=name
    
        def run(self):
    
            time.sleep(2)
    
            print('%s say hello' % self.name)
    
     
    
     
    
    if __name__ == '__main__':
    
        t = Sayhi('egon')
    
        t.start()
    
        print('主线程')
    方式二

     

    5.2.3 课堂讲解实例

    # from threading import Thread
    
    # import time
    
    # import random
    
    #
    
    # def piao(name):
    
    #     print('%s is piaoing' %name)
    
    #     time.sleep(random.randint(1,3))
    
    #     print('%s is piao end' %name)
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    #     t1=Thread(target=piao,args=('alex',))
    
    #     t1.start()
    
    #     print('主')
    
     
    
     
    
    from threading import Thread
    
    import time
    
    import random
    
     
    
    class MyThread(Thread):
    
        def __init__(self,name):
    
            super().__init__()
    
            self.name=name
    
     
    
        def run(self):
    
            print('%s is piaoing' %self.name)
    
            time.sleep(random.randint(1,3))
    
            print('%s is piao end' %self.name)
    
     
    
     
    
    if __name__ == '__main__':
    
        t1=MyThread('alex')
    
        t1.start()
    
        print('')
    View Code

    5.3 在一个进程下开启多个线程与在一个进程下开启多个子进程的区别

    5.3.1 谁的开启速度快

    from threading import Thread
    
    from multiprocessing import Process
    
    import os
    
     
    
    def work():
    
        print('hello')
    
     
    
    if __name__ == '__main__':
    
        #在主进程下开启线程
    
        t=Thread(target=work)
    
        t.start()
    
        print('主线程/主进程')
    
        '''
    
        打印结果:
    
        hello
    
        主线程/主进程
    
        '''
    
     
    
        #在主进程下开启子进程
    
        t=Process(target=work)
    
        t.start()
    
        print('主线程/主进程')
    
        '''
    
        打印结果:
    
        主线程/主进程
    
        hello
    
        '''
    View Code

    5.3.2 瞅一瞅pid

    from threading import Thread
    
    from multiprocessing import Process
    
    import os
    
     
    
    def work():
    
        print('hello',os.getpid())
    
     
    
    if __name__ == '__main__':
    
        #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
    
        t1=Thread(target=work)
    
        t2=Thread(target=work)
    
        t1.start()
    
        t2.start()
    
        print('主线程/主进程pid',os.getpid())
    
     
    
        #part2:开多个进程,每个进程都有不同的pid
    
        p1=Process(target=work)
    
        p2=Process(target=work)
    
        p1.start()
    
        p2.start()
    
        print('主线程/主进程pid',os.getpid())
    View Code

    5.3.3 同一进程内的线程共享该进程的数据?

    from  threading import Thread
    
    from multiprocessing import Process
    
    import os
    
    def work():
    
        global n
    
        n=0
    
     
    
    if __name__ == '__main__':
    
        # n=100
    
        # p=Process(target=work)
    
        # p.start()
    
        # p.join()
    
        # print('主',n) #毫无疑问子进程p已经将自己的全局的n改成了0,但改的仅仅是它自己的,查看父进程的n仍然为100
    
     
    
     
    
        n=1
    
        t=Thread(target=work)
    
        t.start()
    
        t.join()
    
        print('',n) #查看结果为0,因为同一进程内的线程之间共享进程内的数据
    View Code

    5.3.4 课堂讲解实例线程与进程的区别

    # from threading import Thread
    
    # import time
    
    # import random
    
    # import os
    
    #
    
    # def piao():
    
    #     print('%s is piaoing' %os.getpid())
    
    #     time.sleep(random.randint(1,3))
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    #     t1=Thread(target=piao,)
    
    #     t2=Thread(target=piao,)
    
    #     t3=Thread(target=piao,)
    
    #     t1.start()a
    
    #     t2.start()
    
    #     t3.start()
    
    #     print('主',os.getpid())
    
    #
    
     
    
     
    
    from threading import Thread
    
    import time
    
    import random
    
    import os
    
     
    
    n=100
    
    def piao():
    
        global n
    
        n=0
    
     
    
    if __name__ == '__main__':
    
        t1=Thread(target=piao,)
    
        t1.start()
    
        t1.join()
    
        print('',n)
    View Code

    5.4 练习

    5.4.1 练习一课堂讲解实例

    #####################服务端##################
    
    from threading import Thread,current_thread
    
    from socket import *
    
     
    
    def comunicate(conn):
    
        print('子线程:%s' %current_thread().getName())
    
        while True:
    
            try:
    
                data=conn.recv(1024)
    
                if not data:break
    
                conn.send(data.upper())
    
            except ConnectionResetError:
    
                break
    
        conn.close()
    
     
    
    def server(ip,port):
    
        print('主线程:%s' %current_thread().getName())
    
        server = socket(AF_INET, SOCK_STREAM)
    
        server.bind((ip,port))
    
        server.listen(5)
    
     
    
        while True:
    
            conn, addr = server.accept()
    
            print(addr)
    
            # comunicate(conn)
    
            t=Thread(target=comunicate,args=(conn,))
    
            t.start()
    
     
    
        server.close()
    
     
    
    if __name__ == '__main__':
    
        server('127.0.0.1', 8081)
    服务端
    ###################客户端######################
    
    from socket import *
    
     
    
    client=socket(AF_INET,SOCK_STREAM)
    
    client.connect(('127.0.0.1',8081))
    
     
    
    while True:
    
        msg=input('>>: ').strip()
    
        if not msg:continue
    
        client.send(msg.encode('utf-8'))
    
        data=client.recv(1024)
    
        print(data.decode('utf-8'))
    
     
    
    client.close()
    客户端

    5.4.2 练习二多线程并发的socket

    ###################多线程并发的socket服务端##################
    
    #_*_coding:utf-8_*_
    
    #!/usr/bin/env python
    
    import multiprocessing
    
    import threading
    
     
    
    import socket
    
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    
    s.bind(('127.0.0.1',8080))
    
    s.listen(5)
    
     
    
    def action(conn):
    
        while True:
    
            data=conn.recv(1024)
    
            print(data)
    
            conn.send(data.upper())
    
     
    
    if __name__ == '__main__':
    
     
    
        while True:
    
            conn,addr=s.accept()
    
     
    
     
    
            p=threading.Thread(target=action,args=(conn,))
    
            p.start()
    
     
    多线程并发的socket服务端
    ########################客户端######################
    
    #_*_coding:utf-8_*_
    
    #!/usr/bin/env python
    
     
    
     
    
    import socket
    
     
    
    s=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    
    s.connect(('127.0.0.1',8080))
    
     
    
    while True:
    
        msg=input('>>: ').strip()
    
        if not msg:continue
    
     
    
        s.send(msg.encode('utf-8'))
    
        data=s.recv(1024)
    
        print(data)
    客户端

    5.4.3 练习三

    练习三:三个任务,一个接收用户输入,一个将用户输入的内容格式化成大写,一个将格式化后的结果存入文件

    from threading import Thread
    
    msg_l=[]
    
    format_l=[]
    
    def talk():
    
        while True:
    
            msg=input('>>: ').strip()
    
            if not msg:continue
    
            msg_l.append(msg)
    
     
    
    def format_msg():
    
        while True:
    
            if msg_l:
    
                res=msg_l.pop()
    
                format_l.append(res.upper())
    
     
    
    def save():
    
        while True:
    
            if format_l:
    
                with open('db.txt','a',encoding='utf-8') as f:
    
                    res=format_l.pop()
    
                    f.write('%s
    ' %res)
    
     
    
    if __name__ == '__main__':
    
        t1=Thread(target=talk)
    
        t2=Thread(target=format_msg)
    
        t3=Thread(target=save)
    
        t1.start()
    
        t2.start()
    
        t3.start()
    View Code

    5.5 线程相关的其他方法

    5.5.1 Thread实例对象的方法

    Thread实例对象的方法
    
      # isAlive(): 返回线程是否活动的。
    
      # getName(): 返回线程名。
    
      # setName(): 设置线程名。
    
     
    
    threading模块提供的一些方法:
    
      # threading.currentThread(): 返回当前的线程变量。
    
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
    
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

    5.5.2 实例

    from threading import Thread
    
    import threading
    
    from multiprocessing import Process
    
    import os
    
     
    
    def work():
    
        import time
    
        time.sleep(3)
    
        print(threading.current_thread().getName())
    
     
    
     
    
    if __name__ == '__main__':
    
        #在主进程下开启线程
    
        t=Thread(target=work)
    
        t.start()
    
     
    
        print(threading.current_thread().getName())
    
        print(threading.current_thread()) #主线程
    
        print(threading.enumerate()) #连同主线程在内有两个运行的线程
    
        print(threading.active_count())
    
        print('主线程/主进程')
    
     
    
        '''
    
        打印结果:
    
        MainThread
    
        <_MainThread(MainThread, started 140735268892672)>
    
        [<_MainThread(MainThread, started 140735268892672)>, <Thread(Thread-1, started 123145307557888)>]
    
        主线程/主进程
    
        Thread-1
    
        '''
    
     
    
    主线程等待子线程结束
    
     
    
    from threading import Thread
    
    import time
    
    def sayhi(name):
    
        time.sleep(2)
    
        print('%s say hello' %name)
    
     
    
    if __name__ == '__main__':
    
        t=Thread(target=sayhi,args=('egon',))
    
        t.start()
    
        t.join()
    
        print('主线程')
    
        print(t.is_alive())
    
        '''
    
        egon say hello
    
        主线程
    
        False
    
        '''
    View Code

    5.6 守护线程

    5.6.1 描述说明

    无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁

    需要强调的是:运行完毕并非终止运行

    #1.对主进程来说,运行完毕指的是主进程代码运行完毕
    #2.对主线程来说,运行完毕指的是主线程所在的进程内所有非守护线程统统运行完毕,主线程才算运行完毕

    详细解释:

    #1 主进程在其代码结束后就已经算运行完毕了(守护进程在此时就被回收),然后主进程会一直等非守护的子进程都运行完毕后回收子进程的资源(否则会产生僵尸进程),才会结束,
    #2 主线程在其他非守护线程运行完毕后才算运行完毕(守护线程在此时就被回收)。因为主线程的结束意味着进程的结束,进程整体的资源都将被回收,而进程必须保证非守护线程都运行完毕后才能结束。

    5.6.2 博客实例

    from threading import Thread
    
    import time
    
    def sayhi(name):
    
        time.sleep(2)
    
        print('%s say hello' %name)
    
     
    
    if __name__ == '__main__':
    
        t=Thread(target=sayhi,args=('egon',))
    
        t.setDaemon(True) #必须在t.start()之前设置
    
        t.start()
    
     
    
        print('主线程')
    
        print(t.is_alive())
    
        '''
    
        主线程
    
        True
    
        '''
    View Code

    5.6.3 迷惑人的例子

    ########################迷惑人的例子#######################
    
    from threading import Thread
    
    import time
    
    def foo():
    
        print(123)
    
        time.sleep(1)
    
        print("end123")
    
     
    
    def bar():
    
        print(456)
    
        time.sleep(3)
    
        print("end456")
    
     
    
     
    
    t1=Thread(target=foo)
    
    t2=Thread(target=bar)
    
     
    
    t1.daemon=True
    
    t1.start()
    
    t2.start()
    
    print("main-------")
    
     
    迷惑人的例子

    5.6.4 课堂讲解实例

    # from threading import Thread
    
    # import time
    
    #
    
    # def sayhi(name):
    
    #     print('====>')
    
    #     time.sleep(2)
    
    #     print('%s say hello' %name)
    
    #
    
    # if __name__ == '__main__':
    
    #     t=Thread(target=sayhi,args=('egon',))
    
    #     # t.setDaemon(True)
    
    #     t.daemon=True
    
    #     t.start()
    
    #
    
    #     print('主线程')
    
    #
    
    from threading import Thread
    
    import time
    
    def foo():
    
        print(123)
    
        time.sleep(1)
    
        print("end123")
    
     
    
    def bar():
    
        print(456)
    
        time.sleep(3)
    
        print("end456")
    
     
    
    if __name__ == '__main__':
    
        t1=Thread(target=foo)
    
        t2=Thread(target=bar)
    
     
    
        t1.daemon=True
    
        t1.start()
    
        t2.start()
    
        print("main-------")
    View Code

    5.7 Python GIL(GIL解释器锁)

    5.7.1 介绍

    '''
    
    定义:
    
    In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple
    
    native threads from executing Python bytecodes at once. This lock is necessary mainly
    
    because CPython’s memory management is not thread-safe. (However, since the GIL
    
    exists, other features have grown to depend on the guarantees that it enforces.)
    
    '''
    
    结论:在Cpython解释器中,同一个进程下开启的多线程,同一时刻只能有一个线程执行,无法利用多核优势

    首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL。

    5.7.2 GIL介绍

    GIL本质就是一把互斥锁,既然是互斥锁,所有互斥锁的本质都一样,都是将并发运行变成串行,以此来控制同一时间内共享数据只能被一个任务所修改,进而保证数据安全。

    可以肯定的一点是:保护不同的数据的安全,就应该加不同的锁。

    要想了解GIL,首先确定一点:每次执行python程序,都会产生一个独立的进程。例如python test.py,python aaa.py,python bbb.py会产生3个不同的python进程

    ############验证python test.py只会产生一个进程################
    
    '''
    
    #验证python test.py只会产生一个进程
    
    #test.py内容
    
    import os,time
    
    print(os.getpid())
    
    time.sleep(1000)
    
    '''
    
    python3 test.py
    
    #在windows下
    
    tasklist |findstr python
    
    #在linux下
    
    ps aux |grep python
    验证python test.py只会产生一个进程

    在一个python的进程内,不仅有test.py的主线程或者由该主线程开启的其他线程,还有解释器开启的垃圾回收等解释器级别的线程,总之,所有线程都运行在这一个进程内,毫无疑问

    #1 所有数据都是共享的,这其中,代码作为一种数据也是被所有线程共享的(test.py的所有代码以及Cpython解释器的所有代码)
    例如:test.py定义一个函数work(代码内容如下图),在进程内所有线程都能访问到work的代码,于是我们可以开启三个线程然后target都指向该代码,能访问到意味着就是可以执行。
    #2 所有线程的任务,都需要将任务的代码当做参数传给解释器的代码去执行,即所有的线程要想运行自己的任务,首先需要解决的是能够访问到解释器的代码。

    综上:

    如果多个线程的target=work,那么执行流程是

    多个线程先访问到解释器的代码,即拿到执行权限,然后将target的代码交给解释器的代码去执行

    解释器的代码是所有线程共享的,所以垃圾回收线程也可能访问到解释器的代码而去执行,这就导致了一个问题:对于同一个数据100,可能线程1执行x=100的同时,而垃圾回收执行的是回收100的操作,解决这种问题没有什么高明的方法,就是加锁处理,如下图的GIL,保证python解释器同一时间只能执行一个任务的代码。

     

    5.7.3 GIL与Lock

    GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图

     

    5.7.4 GIL与多线程

    有了GIL的存在,同一时刻同一进程中只有一个线程被执行

    听到这里,有的同学立马质问:进程可以利用多核,但是开销大,而python的多线程开销小,但却无法利用多核优势,也就是说python没用了,php才是最牛逼的语言?

    别着急啊,老娘还没讲完呢。

    要解决这个问题,我们需要在几个点上达成一致:

    #1. cpu到底是用来做计算的,还是用来做I/O的?
    
    #2. 多cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能
    
    #3. 每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处

    一个工人相当于cpu,此时计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。

    如果你的工厂干的大多数任务都要有准备原材料的过程(I/O密集型),那么你有再多的工人,意义也不大,还不如一个人,在等材料的过程中让工人去干别的活,

    反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高

    结论:

      对计算来说,cpu越多越好,但是对于I/O来说,再多的cpu也没用

      当然对运行一个程序来说,随着cpu的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯I/O,所以我们只能相对的去看一个程序到底是计算密集型还是I/O密集型,从而进一步分析python的多线程到底有无用武之地

    #分析:
    
    我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
    
    方案一:开启四个进程
    
    方案二:一个进程下,开启四个线程
    
     
    
    #单核情况下,分析结果:
    
      如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
    
      如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
    
     
    
    #多核情况下,分析结果:
    
      如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
    
      如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
    
     
    
     
    
    #结论:现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换),但是,对于IO密集型的任务效率还是有显著提升的。
    View Code

    5.7.5 多线程性能测试

    #################计算密集型:多进程效率高#####################
    
    from multiprocessing import Process
    
    from threading import Thread
    
    import os,time
    
    def work():
    
        res=0
    
        for i in range(100000000):
    
            res*=i
    
     
    
     
    
    if __name__ == '__main__':
    
        l=[]
    
        print(os.cpu_count()) #本机为4核
    
        start=time.time()
    
        for i in range(4):
    
            p=Process(target=work) #耗时5s多
    
            p=Thread(target=work) #耗时18s多
    
            l.append(p)
    
            p.start()
    
        for p in l:
    
            p.join()
    
        stop=time.time()
    
        print('run time is %s' %(stop-start))
    计算密集型:多进程效率高
    ####################I/O密集型:多线程效率高###################
    
    from multiprocessing import Process
    
    from threading import Thread
    
    import threading
    
    import os,time
    
    def work():
    
        time.sleep(2)
    
        print('===>')
    
     
    
    if __name__ == '__main__':
    
        l=[]
    
        print(os.cpu_count()) #本机为4核
    
        start=time.time()
    
        for i in range(400):
    
            # p=Process(target=work) #耗时12s多,大部分时间耗费在创建进程上
    
            p=Thread(target=work) #耗时2s多
    
            l.append(p)
    
            p.start()
    
        for p in l:
    
            p.join()
    
        stop=time.time()
    
        print('run time is %s' %(stop-start))
    I/O密集型:多线程效率高

    5.7.6 课堂讲解实例

    # from threading import Thread,Lock
    
    # import time
    
    #
    
    # n=100
    
    #
    
    # def task():
    
    #     global n
    
    #     with mutex:
    
    #         temp=n
    
    #         time.sleep(0.1)
    
    #         n=temp-1
    
    #
    
    # if __name__ == '__main__':
    
    #     start_time=time.time()
    
    #     mutex=Lock()
    
    #     t_l=[]
    
    #     for i in range(100):
    
    #         t=Thread(target=task)
    
    #         t_l.append(t)
    
    #         t.start()
    
    #
    
    #     for t in t_l:
    
    #         t.join()
    
    #     stop_time=time.time()
    
    #     print('主',n)
    
    #     print('run time is %s' %(stop_time-start_time))
    View Code

    5.7.7 计算密集型与I/O密集型

    #计算密集型:开多进程
    
    # from multiprocessing import Process
    
    # from threading import Thread
    
    # import os,time
    
    # def work():
    
    #     res=0
    
    #     for i in range(100000000):
    
    #         res*=i
    
    #
    
    #
    
    # if __name__ == '__main__':
    
    #     l=[]
    
    #     start=time.time()
    
    #     for i in range(4):
    
    #         # p=Process(target=work) #5.826333284378052
    
    #         p=Thread(target=work) #run time is 19.91913938522339
    
    #         l.append(p)
    
    #         p.start()
    
    #     for p in l:
    
    #         p.join()
    
    #     stop=time.time()
    
    #     print('run time is %s' %(stop-start))
    
     
    计算密集型:开多进程
    #I/O密集型:多线程效率高
    
    # from multiprocessing import Process
    
    # from threading import Thread
    
    # import threading
    
    # import os,time
    
    # def work():
    
    #     time.sleep(2)
    
    #
    
    # if __name__ == '__main__':
    
    #     l=[]
    
    #     start=time.time()
    
    #     for i in range(400):
    
    #         # p=Process(target=work) # 12.465712785720825
    
    #         p=Thread(target=work) #2.037116765975952
    
    #         l.append(p)
    
    #         p.start()
    
    #     for p in l:
    
    #         p.join()
    
    #     stop=time.time()
    
    #     print('run time is %s' %(stop-start))
    
    #
    
     
    
     
    
     
    
    from threading import Thread,Lock
    
    import time
    
     
    
    n=100
    
     
    
    def task():
    
        global n
    
        mutex.acquire()
    
        temp=n
    
        time.sleep(0.1)
    
        n=temp-1
    
        mutex.release()
    
     
    
    if __name__ == '__main__':
    
        mutex=Lock()
    
        for i in range(3):
    
            t=Thread(target=task)
    
            t.start()
    I/O密集型:多线程效率高

    应用:

    多线程用于IO密集型,如socket,爬虫,web

    多进程用于计算密集型,如金融分析

    第6章 补充:paramiko模块

    6.1 介绍

    paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。

    6.2 下载安装

    pip3 install paramiko     #在python3中
    #####################在python2中###################
    
    pycrypto,由于 paramiko 模块内部依赖pycrypto,所以先下载安装pycrypto #在python2中
    
    pip3 install pycrypto
    
    pip3 install paramiko
    
    注:如果在安装pycrypto2.0.1时发生如下错误
    
            command 'gcc' failed with exit status 1...
    
    可能是缺少python-dev安装包导致
    
    如果gcc没有安装,请事先安装gcc
    
     
    在python2中

    6.3 使用

    6.3.1 SSHClient

    用于连接远程服务器并执行基本命令

    ###############基于用户名密码连接:#####################
    
    import paramiko
    
     
    
    # 创建SSH对象
    
    ssh = paramiko.SSHClient()
    
    # 允许连接不在know_hosts文件中的主机
    
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    # 连接服务器
    
    ssh.connect(hostname='120.92.84.249', port=22, username='root', password='xxx')
    
     
    
    # 执行命令
    
    stdin, stdout, stderr = ssh.exec_command('df')
    
    # 获取命令结果
    
    result = stdout.read()
    
    print(result.decode('utf-8'))
    
    # 关闭连接
    
    ssh.close()
    
     
    基于用户名密码连接:
    ##############SSHClient 封装 Transport###################
    
    import paramiko
    
     
    
    transport = paramiko.Transport(('120.92.84.249', 22))
    
    transport.connect(username='root', password='xxx')
    
     
    
    ssh = paramiko.SSHClient()
    
    ssh._transport = transport
    
     
    
    stdin, stdout, stderr = ssh.exec_command('df')
    
    res=stdout.read()
    
    print(res.decode('utf-8'))
    
     
    
    transport.close()
    
     
    SSHClient 封装

    ##########基于公钥密钥连接:###############

    客户端文件名:id_rsa

    服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)

    import paramiko
    
     
    
    private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
    
     
    
    # 创建SSH对象
    
    ssh = paramiko.SSHClient()
    
    # 允许连接不在know_hosts文件中的主机
    
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    
    # 连接服务器
    
    ssh.connect(hostname='120.92.84.249', port=22, username='root', pkey=private_key)
    
     
    
    # 执行命令
    
    stdin, stdout, stderr = ssh.exec_command('df')
    
    # 获取命令结果
    
    result = stdout.read()
    
    print(result.decode('utf-8'))
    
    # 关闭连接
    
    ssh.close()
    View Code
    ##################SSHClient 封装 Transport###################
    
    import paramiko
    
     
    
    private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
    
     
    
    transport = paramiko.Transport(('120.92.84.249', 22))
    
    transport.connect(username='root', pkey=private_key)
    
     
    
    ssh = paramiko.SSHClient()
    
    ssh._transport = transport
    
     
    
    stdin, stdout, stderr = ssh.exec_command('df')
    
    result=stdout.read()
    
    print(result.decode('utf-8'))
    
     
    
    transport.close()
    
     
    SSHClient 封装 Transport
    ##################基于私钥字符串进行连接####################
    
    import paramiko
    
    from io import StringIO
    
     
    
    key_str = """-----BEGIN RSA PRIVATE KEY-----
    
    MIIEoQIBAAKCAQEAsJmFLrSeCumJvga0Gl5O5wVOVwMIy2MpqIyQPi5J87dg89a4
    
    Da9fczJog7qoSbRwHFOQoCHNphSlp5KPhGsF6RJewkIw9H1UKV4dCOyl/4HOAkAD
    
    rKrsEDmrJ9JlzF2GTTZSnTgVQWcvBS2RKB4eM2R9aJ11xV6X2Hk4YDLTExIWeabb
    
    h2TUKw0iyjI8pRuYLKkF2X16u9TBwfOTroGYgiNFHQvhsQppbEbI49NF2XkCkFMi
    
    8/7tLjf95InE/VUUq56JqfzyHwdpHou+waXbwtvGgXN3sz+KkuEv6R2qDz06upZV
    
    FCZRRpDhzoR8Uh/UEzTGZb8z7FB6EJXUiXJikQIBIwKCAQBBmBuGYFf1bK+BGG7H
    
    9ySe81ecqVsJtx4aCFLVRGScWg4RbQKIvXs5an6XU/VdNGQnx0RYvBkvDvuzRRC8
    
    J8Bd4kB0CfTtGJuaVigKoQp02HEWx1HSa17+tlWD0c4KFBvwywi+DYQ83S64x8gz
    
    eOalX9bPFenqORPUD8R7gJeKvPVc6ZTPeorpuH7u9xayP0Eop8qKxZza9Xh3foVj
    
    Qo4IxoYnDN57CIRX5PFSlDDggpmr8FtRF4nAxmFq8LhSp05ivzX/Ku1SNHdaMWZO
    
    7va8tISXdLI5m0EGzoVoBvohIbwlxI6kfmamrh6Eas2Jnsc4CLzMsR4jBWt0LHLv
    
    /SLnAoGBANaEUf/Jptab9G/xD9W2tw/636i3gLpTPY9KPtCcAxqStNeT6RAWZ5HF
    
    lKJg+NKpu3pI45ldAwvts0i+aCZk2xakEWIZWqCmXm31JSPDQTaMGe7H0vOmUaxx
    
    ncdpBVdvhMbfFUgei15iKfuafgrKaS9oIkntXEgrC+3wBOI0Gbx3AoGBANLAGxAF
    
    TK7ydr+Q1+6/ujs6e8WsXt8HZMa/1khCVSbrf1MgACvZPSSSrDpVwaDTSjlRI4AL
    
    bb0l0RFU+/0caMiHilscuJdz9Fdd9Ux4pjROZa3TF5CFhvP7PsZAoxOo+yqJg4zr
    
    996GG/aAv4M8lQJ2rDFk/Dgn5y/AaAun1oM3AoGAGIQmoOPYjY4qkHNSRE9lYOl4
    
    pZFQilKn8x5tlC8WTC4GCgJGhX7nQ9wQ/J1eQ/YkDfmznH+ok6YjHkGlgLsRuXHW
    
    GdcDCwuzBUCWh76LHC1EytUCKnloa3qy8jfjWnMlHgrd3FtDILrC+C7p1Vj2FAvm
    
    qVz0moiTpioPL8twp9MCgYEAin49q3EyZFYwxwdpU7/SJuvq750oZq0WVriUINsi
    
    A6IR14oOvbqkhb94fhsY12ZGt/N9uosq22H+anms6CicoQicv4fnBHDFI3hCHE9I
    
    pgeh50GTJHUA6Xk34V2s/kp5KpThazv6qCw+QubkQExh660SEdSlvoCfPKMCi1EJ
    
    TukCgYAZKY1NZ2bjJyyO/dfNvMQ+etUL/9esi+40GUGyJ7SZcazrN9z+DO0yL39g
    
    7FT9NMIc2dsmNJQMaGBCDl0AjO1O3b/wqlrNvNBGkanxn2Htn5ajfo+LBU7yHAcV
    
    7w4X5HLarXiE1mj0LXFKJhdvFqU53KUQJXBqR6lsMqzsdPwLMJg==
    
    -----END RSA PRIVATE KEY-----"""
    
     
    
    private_key = paramiko.RSAKey(file_obj=StringIO(key_str))
    
    transport = paramiko.Transport(('120.92.84.249', 22))
    
    transport.connect(username='root', pkey=private_key)
    
     
    
    ssh = paramiko.SSHClient()
    
    ssh._transport = transport
    
     
    
    stdin, stdout, stderr = ssh.exec_command('df')
    
    result = stdout.read()
    
    print(result.decode('utf-8'))
    
    transport.close()
    
     
    
    print(result)
    基于私钥字符串进行连接

    6.3.2 SFTPClient

    用于连接远程服务器并执行上传下载

    ##############基于用户名密码上传下载################
    
    import paramiko
    
     
    
    transport = paramiko.Transport(('120.92.84.249',22))
    
    transport.connect(username='root',password='xxx')
    
     
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    
    # 将location.py 上传至服务器 /tmp/test.py
    
    sftp.put('/tmp/id_rsa', '/etc/test.rsa')
    
    # 将remove_path 下载到本地 local_path
    
    sftp.get('remove_path', 'local_path')
    
     
    
    transport.close()
    基于用户名密码上传下载
    ##############基于公钥密钥上传下载##################
    
    import paramiko
    
     
    
    private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
    
     
    
    transport = paramiko.Transport(('120.92.84.249', 22))
    
    transport.connect(username='root', pkey=private_key )
    
     
    
    sftp = paramiko.SFTPClient.from_transport(transport)
    
    # 将location.py 上传至服务器 /tmp/test.py
    
    sftp.put('/tmp/id_rsa', '/tmp/a.txt')
    
    # 将remove_path 下载到本地 local_path
    
    sftp.get('remove_path', 'local_path')
    
     
    
    transport.close()
    基于公钥密钥上传下载

    6.3.3 demo

    ###################Demo################
    
    #!/usr/bin/env python
    
    # -*- coding:utf-8 -*-
    
    import paramiko
    
    import uuid
    
     
    
    class Haproxy(object):
    
     
    
        def __init__(self):
    
            self.host = '172.16.103.191'
    
            self.port = 22
    
            self.username = 'root'
    
            self.pwd = '123'
    
            self.__k = None
    
     
    
        def create_file(self):
    
            file_name = str(uuid.uuid4())
    
            with open(file_name,'w') as f:
    
                f.write('sb')
    
            return file_name
    
     
    
        def run(self):
    
            self.connect()
    
            self.upload()
    
            self.rename()
    
            self.close()
    
     
    
        def connect(self):
    
            transport = paramiko.Transport((self.host,self.port))
    
            transport.connect(username=self.username,password=self.pwd)
    
            self.__transport = transport
    
     
    
        def close(self):
    
     
    
            self.__transport.close()
    
     
    
        def upload(self):
    
            # 连接,上传
    
            file_name = self.create_file()
    
     
    
            sftp = paramiko.SFTPClient.from_transport(self.__transport)
    
            # 将location.py 上传至服务器 /tmp/test.py
    
            sftp.put(file_name, '/home/root/tttttttttttt.py')
    
     
    
        def rename(self):
    
     
    
            ssh = paramiko.SSHClient()
    
            ssh._transport = self.__transport
    
            # 执行命令
    
            stdin, stdout, stderr = ssh.exec_command('mv /home/root/tttttttttttt.py /home/root/ooooooooo.py')
    
            # 获取命令结果
    
            result = stdout.read()
    
     
    
     
    
    ha = Haproxy()
    
    ha.run()
    
     
    Demo

    第7章 作业

    题目:简单主机批量管理工具

    需求:

    1. 主机分组
    2. 主机信息配置文件用configparser解析
    3. 可批量执行命令、发送文件,结果实时返回,执行格式如下

    batch_run  -h h1,h2,h3   -g web_clusters,db_servers    -cmd  "df -h" 

    batch_scp   -h h1,h2,h3   -g web_clusters,db_servers  -action put  -local test.py  -remote /tmp/ 

    1. 主机用户名密码、端口可以不同
    2. 执行远程命令使用paramiko模块
    3. 批量命令需使用threading模块并发
  • 相关阅读:
    Smartforms 设置纸张打印格式
    JVM的垃圾回收算法
    JVM垃圾回收器
    Java类加载过程
    Java内存模型
    计算机模型
    C#----对时间结构DateTime的使用(时间日期的使用)
    C#----我对坐标系的理解和图形转动
    C#----格式化字符串的操作
    其他----
  • 原文地址:https://www.cnblogs.com/maojiong/p/8423448.html
Copyright © 2011-2022 走看看