zoukankan      html  css  js  c++  java
  • 进程与线程

    1、进程

    进程:正在进行的一个过程或者说一个任务。而负责执行任务则是cpu。

    要以一个整体的形式暴露给操作系统管理,里面包含了对各种资源的调用,内存的管理,网络接口的调用等;对各种资源的管理集合,就可以称为进程

    1.1 multiprocessing模块(多进程

    from multiprocessing import Process
    import time
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(2)
        print('task <%s> is done' % name)
    
    if __name__ == '__main__':
        # Process(target=work,kwargs={'name':'jun'})
        p1=Process(target=work,args=('jun',))
        p2=Process(target=work,args=('xun',))
        p1.start()
        p2.start()
        print('主')
    

    1.2 join

    join等待线程执行完后,其他线程再继续执行(串行)

    from multiprocessing import Process
    import time
    def work(name):
        print('task <%s> is runing' %name)
        time.sleep(3)
        print('task <%s> is done' % name)
    
    if __name__ == '__main__':
        p1=Process(target=work,args=('egon',))
        p2=Process(target=work,args=('alex',))
        p3=Process(target=work,args=('yuanhao',))
    
        # p1.start()
        # p2.start()
        # p3.start()
        #
        # p1.join() #主进程等,等待p1运行结束
        # p2.join() #主进程等,等待p2运行结束
        # p3.join() #主进程等,等待p3运行结束
    
        p_l = [p1, p2, p3]
        for p in p_l:
            p.start()
    
        for p in p_l:
            p.join()
    
        print('主')
    
        # p_l = [p1, p2, p3]
        # for p in p_l:
        #     p.start()
        #     p.join()
        # p1.start()
        # p1.join()
        # p2.start()
        # p2.join()
        # p3.start()
        # p3.join()
    
    
        # print('主')
    

    1.3  Process对象的其他方法或属性

     创建进程的类

    Process([group [, target [, name [, args [, kwargs]]]]]),由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
    
    强调:
    1. 需要使用关键字的方式来指定参数
    2. args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
    

     参数介绍: 

    group参数未使用,值始终为None
    
    target表示调用对象,即子进程要执行的任务
    
    args表示调用对象的位置参数元组,args=(1,2,'egon',)
    
    kwargs表示调用对象的字典,kwargs={'name':'egon','age':18}
    
    name为子进程的名称
    

    方法介绍:

    p.start():启动进程,并调用该子进程中的p.run() 
    p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法  
    
    p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
    p.is_alive():如果p仍然运行,返回True
    
    p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
    

     属性介绍:

    p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
    
    p.name:进程的名称
    
    p.pid:进程的pid
    
    p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
    
    p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
    

      

    实战

    from multiprocessing import Process
    import time,os
    def work():
        print('parent:%s task <%s> is runing' %(os.getppid(),os.getpid()))
        time.sleep(1000)
        print('parent:%s task <%s> is done'  %(os.getppid(),os.getpid()))
    
    
    if __name__ == '__main__':
        p1=Process(target=work)
        p1.start()
    
        # p1.terminate()
        # time.sleep(3)
        # print(p1.is_alive())
        # print(p1.name)
        # print(p1.pid)
        print('主',os.getpid(),os.getppid())
        time.sleep(10000)
    

    1.4 守护进程

    主进程创建守护进程

      其一:守护进程会在主进程代码执行结束后就终止

      其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

    注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    # from multiprocessing import Process
    # import time
    # def work(name):
    #     print('task <%s> is runing' %name)
    #     time.sleep(2)
    #     print('task <%s> is done' % name)
    #
    # if __name__ == '__main__':
    #     p1=Process(target=work,args=('egon',))
    #     p1.daemon = True
    #     p1.start()
    #
    #     print('主')
    
    
    
    #主进程代码运行完毕,守护进程就会结束
    from multiprocessing import Process
    import time
    def foo():
        print(123)
        time.sleep(1)
        print("end123")
    
    def bar():
        print(456)
        time.sleep(3)
        print("end456")
    if __name__ == '__main__':
    
        p1=Process(target=foo)
        p2=Process(target=bar)
    
        p1.daemon=True
        p1.start()
        p2.start()
        print("main-------") #打印该行则主进程代码结束,则守护进程p1应该被终止,可能会有p1任务执行的打印信息123,因为主进程打印main----时,p1也执行了,但是随即被终止
    

    1.5  进程同步(锁)

    进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件,或同一个打印终端,是没有问题的,

    竞争带来的结果就是错乱,如何控制,就是加锁处理

    part1:多个进程共享同一打印终端

    # from multiprocessing import Process,Lock
    # import time
    # def work(name,mutex):
    #     mutex.acquire()
    #     print('task <%s> is runing' %name)
    #     time.sleep(2)
    #     print('task <%s> is done' % name)
    #     mutex.release()
    #
    # if __name__ == '__main__':
    #     mutex=Lock()
    #     p1=Process(target=work,args=('egon',mutex))
    #     p2=Process(target=work,args=('alex',mutex))
    #     p1.start()
    #     p2.start()
    #     print('主')
    

    1.6  paramiko模块

    1、介绍

    paramiko是一个用于做远程控制的模块,使用该模块可以对远程服务器进行命令或文件操作,值得一说的是,fabric和ansible内部的远程管理就是使用的paramiko来现实。
    

    2、 下载安装

    pip3 install paramiko #在python3中

    ##python2中
    pycrypto,由于 paramiko 模块内部依赖pycrypto,所以先下载安装pycrypto #在python2中
    pip3 install pycrypto
    pip3 install paramiko
    注:如果在安装pycrypto2.0.1时发生如下错误
            command 'gcc' failed with exit status 1...
    可能是缺少python-dev安装包导致
    如果gcc没有安装,请事先安装gcc
    
    在python2中
    

    3. 使用  

    SSHClient

    用于连接远程服务器并执行基本命令

    基于用户名密码连接:

    import paramiko
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='192.168.1.172', port=22, username='root', password='xxx')
    
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()
    

    基于公钥密钥连接:  

    客户端文件名:id_rsa

    服务端必须有文件名:authorized_keys(在用ssh-keygen时,必须制作一个authorized_keys,可以用ssh-copy-id来制作)

    import paramiko
    
    private_key = paramiko.RSAKey.from_private_key_file('/tmp/id_rsa')
    
    # 创建SSH对象
    ssh = paramiko.SSHClient()
    # 允许连接不在know_hosts文件中的主机
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    # 连接服务器
    ssh.connect(hostname='192.168.1.172', port=22, username='root', pkey=private_key)
    
    # 执行命令
    stdin, stdout, stderr = ssh.exec_command('df')
    # 获取命令结果
    result = stdout.read()
    print(result.decode('utf-8'))
    # 关闭连接
    ssh.close()
    

    SFTPClient

    用于连接远程服务器并执行上传下载

    基于用户名密码上传下载

    import paramiko
     
    transport = paramiko.Transport(('192.168.1.172',22))
    transport.connect(username='root',password='xxx')
     
    sftp = paramiko.SFTPClient.from_transport(transport)
    # 将location.py 上传至服务器 /tmp/test.py
    sftp.put('/tmp/id_rsa', '/etc/test.rsa')
    # 将remove_path 下载到本地 local_path
    sftp.get('remove_path', 'local_path')
     
    transport.close()
    

    2、线程 

    2.1什么是线程 

    在传统操作系统中,每个进程有一个地址空间,而且默认就有一个控制线程

      线程顾名思义,就是一条流水线工作的过程,一条流水线必须属于一个车间,一个车间的工作过程是一个进程

          车间负责把资源整合到一起,是一个资源单位,而一个车间内至少有一个流水线

          流水线的工作需要电源,电源就相当于cpu

      所以,进程只是用来把资源集中到一起(进程只是一个资源单位,或者说资源集合),而线程才是cpu上的执行单位。

      多线程(即多个控制线程)的概念是,在一个进程中存在多个控制线程,多个控制线程共享该进程的地址空间,相当于一个车间内有多条流水线,都共用一个车间的资源。

          例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    2.2 threading模块

    线程创建有2种方式:如下

    直接调用

    import  threading,time
     
    def run(n):
        print("test...",n)
        time.sleep(2)
     
    if __name__ == '__main__':
         
        t1 = threading.Thread(target=run,args=("t1",))
        t2 = threading.Thread(target=run,args=("t2",))
         
        # 两个同时执行,然后等待两秒程序结束
        t1.start()
        t2.start()
     
    # 程序输出
    # test... t1
    # test... t2
    

    继承式调用  

    import threading,time
     
    class MyThread(threading.Thread):
        def __init__(self,num):
           # threading.Thread.__init__(self)
            super(MyThread,self).__init__()
            self.num =num
     
        def run(self):#定义每个线程要运行的函数
            print("running on number:%s" %self.num)
            time.sleep(2)
     
     
    if __name__ == '__main__':
        # 两个同时执行,然后等待两秒程序结束
        t1 = MyThread(1)
        t2 = MyThread(2)
        t1.start()
        t2.start()
     
    # 程序输出
    # running on number:1
    # running on number:2
    

    2.3 join  

    join等待线程执行完后,其他线程再继续执行(串行)

    import  threading,time
     
    def run(n,sleep_time):
        print("test...",n)
        time.sleep(sleep_time)
        print("test...done", n)
    if __name__ == '__main__':
     
        t1 = threading.Thread(target=run,args=("t1",2))
        t2 = threading.Thread(target=run,args=("t2",3))
     
        # 两个同时执行,然后等待t1执行完成后,主线程和子线程再开始执行
        t1.start()
        t2.start()
        t1.join()   # 等待t1
     
        print("main thread")
     
    # 程序输出
    # test... t1
    # test... t2
    # test...done t1
    # main thread
    # test...done t2
    

    2.4  线程的互斥锁  

    # from threading import Thread,Lock
    # import time
    # n=100
    # def work():
    #     global n
    #     mutex.acquire()
    #     temp=n
    #     time.sleep(0.1)
    #     n=temp-1
    #     mutex.release()
    #
    # if __name__ == '__main__':
    #     mutex=Lock()
    #     l=[]
    #     start=time.time()
    #     for i in range(100):
    #         t=Thread(target=work)
    #         l.append(t)
    #         t.start()
    #
    #     for t in l:
    #         t.join()
    #     print('run time:%s value:%s' %(time.time()-start,n))
    

    2.5 互斥锁与join的区别  

    # from threading import Thread,Lock
    # import time
    # n=100
    # def work():
    #     time.sleep(0.05)
    #     global n
    #     temp=n
    #     time.sleep(0.1)
    #     n=temp-1
    #
    #
    # if __name__ == '__main__':
    #     start=time.time()
    #     for i in range(100):
    #         t=Thread(target=work)
    #         t.start()
    #         t.join()
    #
    #     print('run time:%s value:%s' %(time.time()-start,n))
    
    
    #互斥锁
    from threading import Thread,Lock
    import time
    n=100
    def work():
        time.sleep(0.05)
        global n
        mutex.acquire()
        temp=n
        time.sleep(0.1)
        n=temp-1
        mutex.release()
    
    if __name__ == '__main__':
        mutex=Lock()
        l=[]
        start=time.time()
        for i in range(100):
            t=Thread(target=work)
            l.append(t)
            t.start()
    
        for t in l:
            t.join()
        print('run time:%s value:%s' %(time.time()-start,n))
    

    2.6 GIL与多线程性能讨论  

    #多进程:
    #优点:可以利用多核优势
    #缺点:开销大
    
    
    #多线程:
    #优点:开销小
    #缺点:不能利用多核优势
    
    # from threading import Thread
    # from multiprocessing import Process
    # import time
    # #计算密集型
    # def work():
    #     res=1
    #     for i in range(100000000):
    #         res+=i
    #
    # if __name__ == '__main__':
    #     p_l=[]
    #     start=time.time()
    #     for i in range(4):
    #         # p=Process(target=work) #6.7473859786987305
    #         p=Thread(target=work) #24.466399431228638
    #         p_l.append(p)
    #         p.start()
    #     for p in p_l:
    #         p.join()
    #
    #     print(time.time()-start)
    
    
    from threading import Thread
    from multiprocessing import Process
    import time
    #IO密集型
    def work():
        time.sleep(2)
    
    if __name__ == '__main__':
        p_l=[]
        start=time.time()
        for i in range(400):
            # p=Process(target=work) #12.104692220687866
            p=Thread(target=work) #2.038116455078125
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
    
        print(time.time()-start)
    

    2.7 死锁与递归锁

    #多进程:
    #优点:可以利用多核优势
    #缺点:开销大
    
    
    #多线程:
    #优点:开销小
    #缺点:不能利用多核优势
    
    # from threading import Thread
    # from multiprocessing import Process
    # import time
    # #计算密集型
    # def work():
    #     res=1
    #     for i in range(100000000):
    #         res+=i
    #
    # if __name__ == '__main__':
    #     p_l=[]
    #     start=time.time()
    #     for i in range(4):
    #         # p=Process(target=work) #6.7473859786987305
    #         p=Thread(target=work) #24.466399431228638
    #         p_l.append(p)
    #         p.start()
    #     for p in p_l:
    #         p.join()
    #
    #     print(time.time()-start)
    
    
    from threading import Thread
    from multiprocessing import Process
    import time
    #IO密集型
    def work():
        time.sleep(2)
    
    if __name__ == '__main__':
        p_l=[]
        start=time.time()
        for i in range(400):
            # p=Process(target=work) #12.104692220687866
            p=Thread(target=work) #2.038116455078125
            p_l.append(p)
            p.start()
        for p in p_l:
            p.join()
    
        print(time.time()-start)
    

    2.8 信号量  

    from threading import Thread,current_thread,Semaphore
    import time,random
    
    sm=Semaphore(5)
    def work():
        sm.acquire()
        print('%s 上厕所' %current_thread().getName())
        time.sleep(random.randint(1,3))
        sm.release()
    
    if __name__ == '__main__':
        for i in range(20):
            t=Thread(target=work)
            t.start()
    

    2.9 事件Event  

    通过Event来实现两个或多个线程间的交互,下面是一个红绿灯的例子,即起动一个线程做交通指挥灯,生成几个线程做车辆,车辆行驶按红灯停,绿灯行的规则。  

    import threading,time
     
    def light():
        count = 0
        while True:
            if count < 10:      #红灯
                print("33[41;1m红灯33[0m",10-count)
            elif count >= 10 and count < 30:    #绿灯
                event.set() # 设置标志位
                print("33[42;1m绿灯33[0m",30-count)
            else:
                event.clear() #把标志位清空
                count = 0
            time.sleep(1)
            count +=1
     
    def car(n):
        while True:
            if event.is_set():
                print("33[32;0m[%s]在路上飞奔.....33[0m"%n)
            else:
                print("33[31;0m[%s]等红灯等的花都谢了.....33[0m" % n)
            time.sleep(1)
     
    if __name__ == "__main__":
        event = threading.Event()
        light = threading.Thread(target=light)
        light.start()
        car = threading.Thread(target=car,args=("tesla",))
        car.start()
    
    from threading import Thread,current_thread,Event
    import time
    event=Event()
    
    def conn_mysql():
        count=1
        while not event.is_set():
            if count > 3:
                raise ConnectionError('链接失败')
            print('%s 等待第%s次链接mysql' %(current_thread().getName(),count))
            event.wait(0.5)
            count+=1
    
        print('%s 链接ok' % current_thread().getName())
    
    
    def check_mysql():
        print('%s 正在检查mysql状态' %current_thread().getName())
        time.sleep(1)
        event.set()
    
    
    if __name__ == '__main__':
        t1=Thread(target=conn_mysql)
        t2=Thread(target=conn_mysql)
        check=Thread(target=check_mysql)
    
        t1.start()
        t2.start()
        check.start()
    

    2.10 定时器  

    from threading import Timer
    
    
    def hello(n):
        print("hello, world",n)
    
    
    t = Timer(3, hello,args=(11,))
    t.start()  # after 1 seconds, "hello, world" will be printed
    

    2.11 线程queue

    import queue
    
    # q=queue.Queue(3) #队列:先进先出
    # q.put(1)
    # q.put(2)
    # q.put(3)
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    
    
    # q=queue.LifoQueue(3) #堆栈:后进先出
    # q.put(1)
    # q.put(2)
    # q.put(3)
    #
    # print(q.get())
    # print(q.get())
    # print(q.get())
    
    
    q=queue.PriorityQueue(3) #数字越小优先级越高
    q.put((10,'data1'))
    q.put((11,'data2'))
    q.put((9,'data3'))
    
    print(q.get())
    print(q.get())
    print(q.get())
    

    2.12  进程池与线程池

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    import os,time,random
    def work(n):
        print('%s is running' %os.getpid())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
        p=ProcessPoolExecutor()
        # objs=[]
        # for i in range(10):
        #     obj=p.submit(work,i)
        #     objs.append(obj)
        # p.shutdown()
        # for obj in objs:
        #     print(obj.result())
    
    
    
        obj=p.map(work,range(10))
        p.shutdown()
        print(list(obj))
    
    
    
    
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # from threading import current_thread
    # import os,time,random
    # def work(n):
    #     print('%s is running' %current_thread().getName())
    #     time.sleep(random.randint(1,3))
    #     return n**2
    #
    # if __name__ == '__main__':
    #     p=ThreadPoolExecutor()
    #     objs=[]
    #     for i in range(21):
    #         obj=p.submit(work,i)
    #         objs.append(obj)
    #     p.shutdown()
    #     for obj in objs:
    #         print(obj.result())
    
    
    
    #进程池
    # import requests #pip3 install requests
    # import os,time
    # from multiprocessing import Pool
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # def get_page(url):
    #     print('<%s> get :%s' %(os.getpid(),url))
    #     respone = requests.get(url)
    #     if respone.status_code == 200:
    #         return {'url':url,'text':respone.text}
    #
    # def parse_page(obj):
    #     dic=obj.result()
    #     print('<%s> parse :%s' %(os.getpid(),dic['url']))
    #     time.sleep(0.5)
    #     res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    #     with open('db.txt','a') as f:
    #         f.write(res)
    #
    #
    # if __name__ == '__main__':
    #
    #     # p=Pool(4)
    #     p=ProcessPoolExecutor()
    #     urls = [
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #     ]
    #
    #
    #     for url in urls:
    #         # p.apply_async(get_page,args=(url,),callback=parse_page)
    #         p.submit(get_page,url).add_done_callback(parse_page)
    #
    #     p.shutdown()
    #     print('主进程pid:',os.getpid())
    
    
    #线程池
    # import requests #pip3 install requests
    # import os,time,threading
    # from multiprocessing import Pool
    # from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    # def get_page(url):
    #     print('<%s> get :%s' %(threading.current_thread().getName(),url))
    #     respone = requests.get(url)
    #     if respone.status_code == 200:
    #         return {'url':url,'text':respone.text}
    #
    # def parse_page(obj):
    #     dic=obj.result()
    #     print('<%s> parse :%s' %(threading.current_thread().getName(),dic['url']))
    #     time.sleep(0.5)
    #     res='url:%s size:%s
    ' %(dic['url'],len(dic['text'])) #模拟解析网页内容
    #     with open('db.txt','a') as f:
    #         f.write(res)
    #
    #
    # if __name__ == '__main__':
    #
    #     # p=Pool(4)
    #     p=ThreadPoolExecutor(3)
    #     urls = [
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #         'http://www.baidu.com',
    #     ]
    #
    #
    #     for url in urls:
    #         # p.apply_async(get_page,args=(url,),callback=parse_page)
    #         p.submit(get_page,url).add_done_callback(parse_page)
    #
    #     p.shutdown()
    #     print('主进程pid:',os.getpid())

      

  • 相关阅读:
    IP,子网,子网掩码,网关,DNS到底都是啥(二)
    (能被11整除的数的特征)The shortest problem --hdu
    (匹配 匈牙利)棋盘游戏 -- Hdu --1281
    (匹配)Courses -- hdu --1083
    (匹配 最小路径覆盖)Air Raid --hdu --1151
    (匹配)The Accomodation of Students --HDU --2444
    (匹配)Fire Net --hdu --1045
    (二分匹配 模板)过山车 -- hdu --2063
    (连通图 ) Redundant Paths --POJ --3177
    (二分匹配 模板 KM)奔小康赚大钱--hdu--2255
  • 原文地址:https://www.cnblogs.com/junxun/p/7459243.html
Copyright © 2011-2022 走看看