zoukankan      html  css  js  c++  java
  • 第十三章 Python并发编程

    并发编程之多进程

      python中如果想要充分的利用多核CPU的资源,大部分情况需要使用多进程,python提供了multiprocessing

      multiprocessing模块用来开启子进程,并在子进程中执行定制的任务

    #方式一
    from multiprocessing import Process
    import time
    
    def task(name):
        print('%s is running'%name)
        time.sleep(3)
        print('%s is done'%name)
    
    if __name__ == '__main__':
        p=Process(target=task,args=('alex',))
        p.start()
        print('')
    
    
    #方式二
    from multiprocessing import Process
    import time
    
    class MyProcess(Process):
        def __init__(self,name):
            super(MyProcess,self).__init__()
            self.name=name
    
        def run(self):
            print('%s is running'%self.name)
            time.sleep(3)
            print('%s is done'%self.name)
    
    if __name__ == '__main__':
        p=MyProcess('进程1')
        p.start()
        print('')

      进程之间的内存空间是隔离的

    from multiprocessing import Process
    
    n = 100
    def task():
        global n
        n = 0
    
    if __name__ == '__main__':
        p = Process(target=task)
        p.start()
        p.join()
        print(p.is_alive()) #查看进程是否完毕
        print('', n)      #进程之间的内存是互相隔离的

      Process对象的join()方法:主进程等待子进程结束

    from multiprocessing import Process
    import time
    import os
    
    def task(n):
        print('%s is running' % os.getpid())
        time.sleep(n)
        print('%s is done' % os.getpid())
    
    if __name__ == '__main__':
        start_time=time.time()
        p1 = Process(target=task, args=(1,))
        p2 = Process(target=task, args=(2,))
        p3 = Process(target=task, args=(3,))
        p_l = [p1, p2, p3]
        # p1.start()
        # p2.start()
        # p3.start()
        for p in p_l:
            p.start()
    
        # p1.join()
        # p2.join()
        # p3.join()
        for p in p_l:
            p.join()
    
        stop_time=time.time()
        print('',stop_time-start_time)
    join()方法

      Process对象的其他方法或属性

    from multiprocessing import Process
    import time
    import os
    
    def task(n):
        print('pid:%s ppid:%s' % (os.getpid(), os.getppid()))
        time.sleep(n)
    
    if __name__ == '__main__':
        p = Process(target=task,args=(15,),name='进程1')
        p.start()
        p.terminate()  #终止进程
        time.sleep(1)
        print(p.is_alive())  #查看进程是否完毕
        print('主...pid:%s ppid:%s' % (os.getpid(), os.getppid())) #主进程的ppid即为pycharm
        print(p.pid)  #查看进程pid
        print(p.name) #查看进程名
    View Code

      僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中,这种进程称之为僵尸进程

      孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将称为孤儿进程,孤儿进程将被init进程所收养,并由init进程对它们完成状态收集工作

      守护进程:守护进程会在主进程代码执行结束后就终止;守护进程内无法再开启子进程,否则抛出异常;进程之间是互相独立的,主进程代码运行结束,守护进程随即终止

    from multiprocessing import Process
    import time
    
    def task(name):
        print('%s is running' % name)
        time.sleep(3)
        print('%s is done' % name)
    
    if __name__ == '__main__':
        p = Process(target=task,args=('lary',))
        p.daemon = True #将子进程设置为守护进程
        p.start()
    
        print('')
    View Code

       进程同步:进程之间如果共享同一套资源会带来竞争,通过加锁处理来控制资源竞争,加锁可以保证多个进程修改同一块数据时,同一时间只有一个任务可以修改,即串行的修改,牺牲了速度却保证了数据安全

    #互斥锁
    from multiprocessing import Process,Lock
    import json
    import time
    import random
    import os
    
    def search():
        time.sleep(random.randint(1,3))
        dic=json.load(open('db.txt','r',encoding='utf-8'))
        print('%s 查看剩余票数%s'%(os.getpid(),dic["count"]))
    
    def get():
        dic=json.load(open('db.txt','r',encoding='utf-8'))
        if dic["count"] > 0:
            dic["count"]-=1
            time.sleep(random.randint(1,3))
            json.dump(dic,open('db.txt','w',encoding='utf-8'))
            print('%s 购票成功'%os.getpid())
    
    def task(mutex):
        search()
        mutex.acquire()     #加锁
        get()
        mutex.release()     #解锁
    
    if __name__ == '__main__':
        mutex=Lock()        #创建实例对象
        for i in range(10):
            p=Process(target=task,args=(mutex,))
            p.start()
    互斥锁

      队列:进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种模式:队列和管道

    #队列
    from multiprocessing import Queue
    
    q=Queue(3)
    q.put('first')
    q.put(2)
    q.put({"count":3})
    #q.put('fourth')
    #q.put('fourth',block=False)
    #q.put('fourth',block=True,timeout=3)
    
    print(q.get())
    print(q.get())
    print(q.get())
    #print(q.get())
    #print(q.get(block=False))
    print(q.get(block=True,timeout=3))

      生产者消费者模型

      生产者消费者模式是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取,阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力

    #生产者消费者模型
    from multiprocessing import Process,JoinableQueue
    import time
    import random
    
    def producer(name,food,q):
        for i in range(2):
            res='%s%s'%(food,i)
            time.sleep(random.randint(1,3))
            q.put(res)
            print('厨师[%s]生产了<%s>'%(name,res))
    
    def consumer(name,q):
        while True:
            res=q.get()
            #if res is None:break
            time.sleep(random.randint(1,3))
            print('吃货[%s]吃了<%s>'%(name,res))
            q.task_done()
    
    
    if __name__ == '__main__':
        q=JoinableQueue()
        p1 = Process(target=producer,args=('egon1','peach',q))
        p2 = Process(target=producer, args=('egon2', 'peach', q))
    
        c1 = Process(target=consumer,args=('lary1',q))
        c2 = Process(target=consumer, args=('lary2', q))
    
        c1.daemon=True
        c2.daemon=True
    
        p1.start()
        p2.start()
    
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
    
        q.join()
        print('')
    基于队列实现生产者消费者模型

    并发编程之多线程

      开启线程的两种方式

    from threading import Thread
    import time
    import random
    
    #开启线程方式一
    def eat(name):
        print('%s is eating'%name)
        time.sleep(random.randint(1,3))
        print('%s eating end'%name)
    
    if __name__ == '__main__':
        t1=Thread(target=eat,args=('lary',))
        t1.start()
        print('')
    
    #开启线程方式二
    class MyThread(Thread):
        def __init__(self,name):
            super().__init__()
            self.name=name
    
        def run(self):
            print('%s is eating' %self.name)
            time.sleep(random.randint(1,3))
            print('%s eating end'%self.name)
    
    if __name__ == '__main__':
        t1=MyThread('lary')
        t1.start()
        print('')
    from threading import Thread,current_thread
    from socket import *
    
    def communicate(conn):
        print('子线程:%s'%current_thread().getName())
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    
    def server(ip,port):
        print('主线程:%s'%current_thread().getName())
        server=socket(AF_INET,SOCK_STREAM)
        server.bind((ip,port))
        server.listen(5)
    
        while True:
            conn,addr=server.accept()
            print(addr)
            t=Thread(target=communicate,args=(conn,))
            t.start()
    
        server.close()
    
    if __name__ == '__main__':
        server('127.0.0.1',8081)
    多线程并发的服务端
    from socket import *
    
    client=socket(AF_INET,SOCK_STREAM)
    client.connect(('127.0.0.1',8081))
    
    while True:
        msg=input('>>:').strip()
        if not msg:continue
        client.send(msg.encode('utf-8'))
        data=client.recv(1024)
        print(data.decode('utf-8'))
    
    client.close()
    客户端

       线程相关的其他方法

    #Thread实例对象的方法
      # isAlive(): 返回线程是否活动的。
      # getName(): 返回线程名。
      # setName(): 设置线程名。
    
    #threading模块提供的一些方法:
      # threading.currentThread(): 返回当前的线程变量。
      # threading.enumerate(): 返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前和终止后的线程。
      # threading.activeCount(): 返回正在运行的线程数量,与len(threading.enumerate())有相同的结果。

      守护线程

    from threading import Thread
    import time
    
    def sayhi(name):
        time.sleep(2)
        print('%s say hello' % name)
    
    if __name__ == '__main__':
        t = Thread(target=sayhi, args=('egon',))
        t.setDaemon(True)  # 必须在t.start()之前设置
        t.start()
    
        print('主线程')
        print(t.is_alive())
  • 相关阅读:
    阿里云服务器Linux CentOS安装配置(五)jetty配置、部署
    阿里云服务器Linux CentOS安装配置(四)yum安装tomcat
    阿里云服务器Linux CentOS安装配置(三)yum安装mysql
    阿里云服务器Linux CentOS安装配置(二)yum安装svn
    【搭建git+maven+jenkins持续集成环境】[一] 搭建git服务器
    使用nginx的反向代理后play获取不到客户端的ip的问题
    MyBatis Generator配置文件翻译
    MyBatis -generator应用
    编程之术与道
    java.lang.Class cannot be cast to java.lang.reflect.ParameterizedType
  • 原文地址:https://www.cnblogs.com/iamluoli/p/8431796.html
Copyright © 2011-2022 走看看