  • Python basics: concurrent programming

    Starting concurrent processes:

    Method 1:

    import time
    from multiprocessing import Process
    
    
    def task(name):
        print('%s is running' % name)
        time.sleep(5)
        print('%s done' % name)
    
    
    if __name__ == '__main__':
        p1 = Process(target=task, args=('child process 1',))
        p1.start()  # start() returns at once, so 'main Process' is usually printed first
        print('main Process')

    Method 2:

    # -*- coding: utf-8 -*-
    import time
    from multiprocessing import Process
    
    
    class MyProcess(Process):
        def __init__(self, name):
            super(MyProcess, self).__init__()
            self.name = name
    
        def run(self):
            print('%s is running' % self.name)
            time.sleep(5)
            print('%s done' % self.name)
    
    
    if __name__ == '__main__':
        p = MyProcess('child process 1')
        p.start()  # start() runs MyProcess.run() in the new process
        print('this is main process')
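    In both methods, start() only launches the child and returns immediately, which is why the main process's message usually appears first. A minimal sketch of waiting for children with join() and then reading each child's exitcode (0 means it exited normally); the work function, the 0.2 s sleep and the worker names are illustrative assumptions, not part of the original post:

```python
import os
import time
from multiprocessing import Process


def work(seconds):
    # Illustrative task: runs in the child and reports its own PID and its parent's PID
    print('child %s (parent %s) sleeping %ss' % (os.getpid(), os.getppid(), seconds))
    time.sleep(seconds)


if __name__ == '__main__':
    procs = [Process(target=work, args=(0.2,), name='worker%d' % i) for i in range(3)]
    for p in procs:
        p.start()   # returns immediately; the child runs concurrently
    for p in procs:
        p.join()    # block until this child has exited
    exit_codes = [p.exitcode for p in procs]
    print('all children finished, exit codes:', exit_codes)
```

    Starting all children before joining any of them keeps them running in parallel; calling join() right after each start() would make them run one after another.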

    Socket communication serving several clients at once (multi-process approach)

    Server:

    import socket
    from multiprocessing import Process
    
    
    def talk(conn):
        # Runs in a child process: echo back whatever arrives, upper-cased
        while True:
            try:
                data = conn.recv(1024)
            except ConnectionResetError:  # the client disappeared abruptly
                break
            if not data:  # b'' means the client closed the connection cleanly
                break
            conn.sendall(data.upper())
        print('Client disconnected...')
        conn.close()
    
    
    def server_xxx(ip, port):
        total = 0
        server1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server1.bind((ip, port))
        server1.listen(2)
        print('Server started....')
        client_list = []
        while True:
            conn, addr = server1.accept()
            total += 1
            # Whenever a client connects, start a process for it and add the process to the list
            print('A client has connected....')
            p = Process(target=talk, name='p' + str(total), args=(conn,))
            p.start()
            conn.close()  # the child now has its own copy of the socket
            client_list.append(p)
    
            # Count how many child processes are still alive
            alive_num = sum(1 for pp in client_list if pp.is_alive())
            print('Total connections: %s, alive connections: %s' % (total, alive_num))
    
        server1.close()
    
    
    if __name__ == '__main__':
        server_xxx('localhost', 8080)

    Client:

    # -*- coding: utf-8 -*-
    import socket
    
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 8080))
    while True:
        cmd = input('>>:').strip()
        if cmd:
            client.send(cmd.encode('utf-8'))
            res = client.recv(1024)
            print(res.decode('utf-8'))
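    The echo loop in talk() can be tried without starting a real server. A minimal sketch, assuming a local socket.socketpair() (two already-connected sockets) stands in for the accepted connection and a thread stands in for the child process; it also exercises the empty-bytes check that tells the server the client has closed:

```python
import socket
import threading


def talk(conn):
    # Same echo logic as the server: upper-case and send back until the peer closes
    while True:
        try:
            data = conn.recv(1024)
        except ConnectionResetError:
            break
        if not data:  # b'' means the other side closed the connection
            break
        conn.sendall(data.upper())
    conn.close()


server_side, client_side = socket.socketpair()  # stand-in for accept() + connect()
worker = threading.Thread(target=talk, args=(server_side,))
worker.start()

client_side.sendall(b'hello')
reply = b''
while len(reply) < 5:  # a stream may split data, so read until all 5 bytes arrive
    chunk = client_side.recv(1024)
    if not chunk:
        break
    reply += chunk
client_side.close()  # makes recv() in talk() return b'', ending its loop
worker.join(timeout=5)
print(reply)  # b'HELLO'
```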

    Mutex (lock):

    # -*- coding: utf-8 -*-
    import time
    from multiprocessing import Process, Lock
    
    
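    def task(name, mutex):
        # Only one process can hold the lock at a time, so each process prints
        # its 1, 2, 3 without being interleaved with the other processes
        mutex.acquire()
        print('%s 1' % name)
        time.sleep(1)
        print('%s 2' % name)
        time.sleep(1)
        print('%s 3' % name)
        mutex.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()  # created once and passed to every child process
        for i in range(3):
            p = Process(target=task, name=str(i), args=('process %s' % i, mutex))
            p.start()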

    Simulating a ticket rush with a mutex

    import json
    import os
    import time
    from multiprocessing import Process, Lock
    
    # Create the "database" with 3 tickets only if it does not exist yet;
    # delete db.txt to reset the count before running the demo again
    if not os.path.exists('db.txt'):
        db_json = {'count': 3}
        with open('db.txt', 'w', encoding='utf-8') as f:
            json.dump(db_json, f)
    
    
    def search(name):
        time.sleep(1)
        with open('db.txt', 'r', encoding='utf-8') as f:
            dic = json.load(f)
        print('User [%s] sees %s ticket(s) left' % (name, dic['count']))
    
    
    def get(name):
        time.sleep(1)
        with open('db.txt', 'r', encoding='utf-8') as f:
            dic = json.load(f)
        if dic['count'] > 0:
            dic['count'] -= 1
            time.sleep(3)
            with open('db.txt', 'w', encoding='utf-8') as f:
                json.dump(dic, f)
            print('%s bought a ticket!' % name)
        else:
            print('%s failed to buy a ticket!' % name)
    
    
    # Function run by each child process
    def task(name, mutex):
        search(name)  # querying may run concurrently
        # The mutex makes get() run serially: buying a ticket modifies the "database",
        # so only one person at a time may do it, which keeps the data consistent
        mutex.acquire()
        get(name)
        mutex.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()
        for i in range(10):
            p = Process(target=task, args=('user%s' % i, mutex))
            p.start()

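    Producer-consumer model:

    1. There are two kinds of roles in the program:

    one kind produces data (the producers)
    one kind processes data (the consumers)
    

    2. The producer-consumer model is introduced to:

    balance the speed difference between producers and consumers
    decouple the parts of the program
    

    3. How to implement the producer-consumer model

    producer <---> queue <---> consumer
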
    import time
    from multiprocessing import Queue, Process
    
    
    def producer(q, p_name, food_name):
        for i in range(10):
            res = food_name + str(i)
            time.sleep(0.5)
            print('Producer [%s] produced [%s] #%s' % (p_name, food_name, i))
            q.put(res)
    
    
    def consumer(q, c_name):
        while True:
            res = q.get()
            if res is not None:
                time.sleep(1)
                print('Consumer [%s] ate [%s]' % (c_name, res))
            else:
                print('All data processed, exiting!')
                break
    
    
    if __name__ == '__main__':
        q = Queue()  # create a queue shared by all the processes
    
        # The producers produce data in parallel
        p1 = Process(target=producer, args=(q, 'No.1', 'apple',))
        p2 = Process(target=producer, args=(q, 'No.2', 'vegetable',))
        p3 = Process(target=producer, args=(q, 'No.3', 'milk',))
    
        # The ones who process the data
        c1 = Process(target=consumer, args=(q, 'foodie 1',))
        c2 = Process(target=consumer, args=(q, 'foodie 2',))
    
        p1.start()
        c1.start()
        p2.start()
        c2.start()
        p3.start()
    
        p1.join()
        p2.join()
        p3.join()
        # Only after all producers have finished, put one None per consumer as an end marker;
        # there are two consumers, so two Nones
        q.put(None)
        q.put(None)
    
        print('main process')

    Worth knowing: JoinableQueue

    import time
    from multiprocessing import JoinableQueue, Process
    
    
    def producer(q, p_name, food_name):
        for i in range(10):
            res = food_name + str(i)
            time.sleep(0.1)
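            print('Producer [%s] produced [%s] #%s' % (p_name, food_name, i))
            q.put(res)
        q.join()  # wait until every item put into the queue has been marked done
    
    
    def consumer(q, c_name):
        while True:
            res = q.get()
            if res is None: break
            time.sleep(1)
            print('Consumer [%s] ate [%s]' % (c_name, res))
            # task_done() marks one item as finished. Once all items are done, q.join() in the
            # producers returns; the main process waits for the producers, so all data is
            # guaranteed to be processed before the main process ends
            q.task_done()
    
    
    if __name__ == '__main__':
        q = JoinableQueue()  # create a queue
    
        # The producers produce data in parallel
        p1 = Process(target=producer, args=(q, 'No.1', 'apple',))
        p2 = Process(target=producer, args=(q, 'No.2', 'vegetable',))
        p3 = Process(target=producer, args=(q, 'No.3', 'milk',))
    
        # The ones who process the data
        c1 = Process(target=consumer, args=(q, 'foodie 1',))
        c2 = Process(target=consumer, args=(q, 'foodie 2',))
        # Daemon consumers are terminated when the main process exits,
        # so their endless loops do not keep the program alive
        c1.daemon = True
        c2.daemon = True
    
        p1.start()
        p2.start()
        p3.start()
        c1.start()
        c2.start()
    
        p1.join()
        p2.join()
        p3.join()
    
        print('main process')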
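    Inside a single process, the standard queue.Queue offers the same task_done()/join() protocol between threads. A minimal thread-based sketch (consumer, eaten and eaten_lock are illustrative names) in which q.join() returns only after every item has been marked done:

```python
import queue
import threading

q = queue.Queue()
eaten = []                     # illustrative: records what the consumers processed
eaten_lock = threading.Lock()  # guards the shared list


def consumer():
    while True:
        item = q.get()
        with eaten_lock:
            eaten.append(item)
        q.task_done()          # mark this item as finished


# Daemon threads loop forever and are discarded when the main thread exits
for _ in range(2):
    threading.Thread(target=consumer, daemon=True).start()

for i in range(10):
    q.put('apple%d' % i)

q.join()                       # returns once task_done() has been called for every put()
print(len(eaten))              # 10
```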

    Threads:

    Starting a thread, method 1:

    import time
    from threading import Thread
    
    
    def task(name):
        print('%s is running' % name)
        time.sleep(5)
        print('%s done' % name)
    
    
    if __name__ == '__main__':
        t1 = Thread(target=task, args=('thread 1',))
        t1.start()
        print('main thread')

    Starting a thread, method 2:

    # -*- coding: utf-8 -*-
    import time
    from threading import Thread
    
    
    class MyThread(Thread):
        def __init__(self, name):
            super(MyThread, self).__init__()
            self.name = name
    
        def run(self):
            print('%s is running' % self.name)
            time.sleep(5)
            print('%s done' % self.name)
    
    
    if __name__ == '__main__':
        t = MyThread('child thread 1')
        t.start()
        print('this is main thread')

    Other thread attributes:

    import threading
    import time
    from threading import Thread, current_thread
    
    
    def task():
        print('%s is running' % current_thread().name)
        time.sleep(2)
        print('%s is done' % current_thread().name)
    
    
    if __name__ == '__main__':
        t = Thread(target=task, name='child thread 1')
        t.start()
        # t.name = 'son thread 1'  # rename the thread (setName() is deprecated)
        # t.join()  # wait for the thread to finish
        # print(t.name)
        # current_thread().name = 'main thread'  # rename the main thread
        # print(t.is_alive())  # isAlive() was removed in Python 3.9
    
    
        # print('main thread', current_thread().name)
    
        # t.join()
        # print(threading.active_count())  # number of live threads
        print(threading.enumerate())  # list of all live Thread objects
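    The camelCase spellings currentThread(), getName() and setName() are deprecated since Python 3.10, and Thread.isAlive() was removed in Python 3.9. A minimal sketch of the current names: the name attribute, is_alive(), threading.active_count() and threading.current_thread() (the 0.2 s task is illustrative):

```python
import threading
import time


def task():
    time.sleep(0.2)  # illustrative work


t = threading.Thread(target=task, name='child thread 1')
t.start()
alive_before = t.is_alive()            # True: task() is still sleeping
live_count = threading.active_count()  # main thread + child (+ any other live threads)
print(t.name, alive_before, live_count)
t.join()                               # wait for the child to finish
alive_after = t.is_alive()             # False
print(alive_after, threading.current_thread().name)
```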

    Mutex for threads

    import time
    from threading import Thread, Lock
    
    n = 100
    
    
    # Threads share the process's memory, so several threads operating on the same data can corrupt it
    def task():
        global n
        mutex.acquire()
        temp = n
        time.sleep(0.1)
        n = temp - 1
        mutex.release()
    
    
    if __name__ == '__main__':
        mutex = Lock()  # create the lock; threads share the process's memory, so there is no need to pass it to them
        t_list = []
        for i in range(100):
            t = Thread(target=task)
            t_list.append(t)
            t.start()
    
        # Wait for all threads to finish
        for t in t_list:
            t.join()
    
        print('main', n)  # main 0

    Deadlock

    import time
    from threading import Thread, Lock
    
    mutexA = Lock()
    mutexB = Lock()
    
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('%s got lock A' % self.name)
            mutexB.acquire()
            print('%s got lock B' % self.name)
            mutexB.release()
            mutexA.release()
    
        def f2(self):
            mutexB.acquire()
            print('%s got lock B' % self.name)
            # While this thread sleeps holding B, another thread in f1 takes A and then waits for B;
            # this thread then waits for A, so both wait forever: a deadlock
            time.sleep(0.1)
            mutexA.acquire()
            print('%s got lock A' % self.name)
            mutexA.release()
            mutexB.release()
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start()
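    A common remedy, different from the RLock approach that follows, is to make every thread acquire the locks in one fixed order. A minimal sketch in which f2 also takes A before B, so no thread can hold B while waiting for A (run and finished are illustrative names):

```python
import threading
import time

mutexA = threading.Lock()
mutexB = threading.Lock()
finished = []  # illustrative: names of threads that completed


def f1():
    with mutexA:      # always A first...
        with mutexB:  # ...then B
            pass


def f2():
    with mutexA:      # same order as f1, so nobody holds B while waiting for A
        with mutexB:
            time.sleep(0.01)


def run(name):
    f1()
    f2()
    finished.append(name)


threads = [threading.Thread(target=run, args=('Thread-%d' % i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(len(finished))  # 10: every thread completed
```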

    Recursive lock (RLock):

    # RLock (recursive lock): the thread that owns it may acquire it several times in a row. Each acquire
    # increments an internal counter and each release decrements it; other threads can acquire it only
    # when the counter is back to 0
    import time
    from threading import Thread, RLock
    
    mutexB = mutexA = RLock()  # both names refer to one RLock, so the deadlock above cannot happen
    
    
    class MyThread(Thread):
        def run(self):
            self.f1()
            self.f2()
    
        def f1(self):
            mutexA.acquire()
            print('f1: %s got lock A' % self.name)
            mutexB.acquire()
            print('f1: %s got lock B' % self.name)
            mutexB.release()
            print('f1: %s released lock B' % self.name)
            mutexA.release()
            print('f1: %s released lock A' % self.name)
    
        def f2(self):
            mutexB.acquire()
            print('f2: %s got lock B' % self.name)
            time.sleep(3)
            mutexA.acquire()
            print('f2: %s got lock A' % self.name)
            mutexA.release()
            print('f2: %s released lock A' % self.name)
            mutexB.release()
            print('f2: %s released lock B' % self.name)
    
    
    if __name__ == '__main__':
        for i in range(10):
            t = MyThread()
            t.start()
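    To see the acquire counter described in the comment at the top of the RLock code, here is a minimal sketch that acquires an RLock twice in the main thread and probes it from a second thread with acquire(blocking=False); probe, try_from_other_thread and checks are illustrative names:

```python
import threading

rlock = threading.RLock()
checks = []  # illustrative: results of the probes from another thread


def try_from_other_thread():
    # acquire(blocking=False) returns False at once instead of waiting
    got = rlock.acquire(blocking=False)
    if got:
        rlock.release()
    checks.append(got)


def probe():
    t = threading.Thread(target=try_from_other_thread)
    t.start()
    t.join()


rlock.acquire()
rlock.acquire()  # the owning thread may acquire again: counter is now 2
rlock.release()  # counter back to 1, still owned by the main thread
probe()          # another thread fails to get it
rlock.release()  # counter 0: the lock is free
probe()          # now another thread gets it
print(checks)    # [False, True]
```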

    Semaphore:

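    A semaphore is also a lock, but it can be given a count, for example 5. With a mutex only one task at a time can hold the lock and run; with a semaphore of 5, up to five tasks can hold it and run at the same time. If a mutex is like housemates in a shared flat competing for its single toilet, a semaphore is like passers-by competing for a public restroom: it has several stalls, so several people can use it at once, but its capacity is fixed, and that capacity is the size of the semaphore.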

    import threading
    import time
    from threading import Thread, Semaphore
    
    
    def func():
        sm.acquire()
        print('%s got sm' % threading.current_thread().name)
        time.sleep(3)
        sm.release()
    
    
    if __name__ == '__main__':
        sm = Semaphore(5)  # at most 5 threads can hold it at the same time
        for i in range(23):
            t = Thread(target=func)
            t.start()
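    A minimal sketch to check the "five stalls" limit: each thread records how many threads are inside the semaphore at the same moment, and the peak never exceeds 5. The counters current and peak are illustrative additions, and the sleep is shortened to 0.05 s:

```python
import threading
import time

sm = threading.Semaphore(5)
state_lock = threading.Lock()
current = 0  # illustrative: threads currently holding the semaphore
peak = 0     # illustrative: highest value current ever reached


def func():
    global current, peak
    with sm:  # at most 5 threads get past this line at once
        with state_lock:
            current += 1
            peak = max(peak, current)
        time.sleep(0.05)
        with state_lock:
            current -= 1


threads = [threading.Thread(target=func) for _ in range(23)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(peak)  # never more than 5
```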

    Another way to acquire and release a lock (the with statement):

    import time
    from threading import Thread, Lock
    
    n = 100
    
    
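    # Threads share the process's memory, so several threads operating on the same data can corrupt it
    def task():
        global n
        with mutex:  # acquires the lock on entry and releases it on exit, even if an exception is raised
            temp = n
            print(n)
            time.sleep(0.1)
            n = temp - 1
    
    
    if __name__ == '__main__':
        mutex = Lock()  # create the lock; threads share the process's memory, so there is no need to pass it to them
        t_list = []
        for i in range(100):
            t = Thread(target=task)
            t_list.append(t)
            t.start()
    
        # Wait for all threads to finish
        for t in t_list:
            t.join()
    
        print('main', n)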

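    Event:

    A key property of threads is that each one runs independently and its state is unpredictable. If other threads in a program need to check the state of some thread to decide their next step, thread synchronization becomes very tricky. To solve this we can use the Event object from the threading library. An Event holds a signal flag that threads can set, which lets threads wait for some event to happen. Initially the flag is false. If a thread waits on an Event whose flag is false, it blocks until the flag becomes true. When a thread sets an Event's flag to true, it wakes up all threads waiting on that Event. If a thread waits on an Event whose flag is already true, it does not block and simply continues.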

    import threading
    import time
    from threading import Thread, Event
    
    
    # Event methods:
    #
    # event.is_set(): return the event's flag (isSet() is the old, deprecated spelling);
    #
    # event.wait(): block the thread while event.is_set() == False; an optional timeout in seconds limits the wait;
    #
    # event.set(): set the flag to True; all threads blocked on the event become ready and wait for the OS to schedule them;
    #
    # event.clear(): reset the flag to False.
    def conn_mysql():
        count = 1
        while not event.is_set():
            if count > 3:
                print('have tried too many times')
                return
            print('[%s] connection attempt %s' % (threading.current_thread().name, count))
            event.wait(3)  # wait at most 3 seconds for check_mysql() to set the event
            count += 1
        print('[%s] connected!' % threading.current_thread().name)
    
    
    def check_mysql():
        print('[%s] is checking mysql' % threading.current_thread().name)
        # 14 s is longer than 3 attempts x 3 s, so both connections give up;
        # lower it (e.g. to 5) to see the success path
        time.sleep(14)
        event.set()
    
    
    if __name__ == '__main__':
        event = Event()
        conn1 = Thread(target=conn_mysql)
        conn2 = Thread(target=conn_mysql)
        check = Thread(target=check_mysql)
    
        conn1.start()
        conn2.start()
        check.start()
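    conn_mysql() relies on the fact that wait(timeout) returns the flag's value: False means the timeout expired, True means set() was called. A minimal sketch of both outcomes (setter is an illustrative name):

```python
import threading
import time

event = threading.Event()

first = event.wait(0.1)  # nobody sets it: returns False after about 0.1 s


def setter():
    # illustrative helper: sets the event shortly after being started
    time.sleep(0.1)
    event.set()  # wakes every thread blocked in event.wait()


threading.Thread(target=setter).start()
second = event.wait(5)  # returns True as soon as setter() calls set()
event.clear()           # flag back to False
third = event.is_set()
print(first, second, third)  # False True False
```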

    Using a Timer to refresh a verification code every 20 seconds:

    import random
    from threading import Timer
    
    
    class Code(object):
        def __init__(self):
            self.make_cache()
    
        def make_cache(self, interval=20):
            self.cache = self.make_code()
            print(self.cache)
            self.t = Timer(interval, self.make_cache)
            self.t.start()
    
        def make_code(self, n=4):
            res = ''
            for i in range(n):
                s1 = str(random.randint(0, 9))
                s2 = chr(random.randint(65, 90))
                res += random.choice([s1, s2])
            return res
    
        def check(self):
            while True:
                code = input('Enter your verification code: ').strip()
                if code.upper() == self.cache:
                    print('Correct!')
                else:
                    print('Wrong!')
    
    
    if __name__ == '__main__':
        obj = Code()
        obj.check()
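    Each Timer is a thread that calls a function once after a delay; make_cache() schedules a new one on every call, so the refresh cycle never stops on its own. A minimal sketch of Timer.cancel(), which stops a timer that has not fired yet; using self.t.cancel() in the Code class to end the cycle is an assumption, not something the original shows:

```python
import threading

fired = []  # illustrative: records which timers ran

t1 = threading.Timer(0.1, fired.append, args=('t1',))  # calls fired.append('t1') after 0.1 s
t2 = threading.Timer(0.1, fired.append, args=('t2',))
t1.start()
t2.start()
t2.cancel()  # stops t2 because it has not fired yet
t1.join()
t2.join()
print(fired)  # ['t1']
```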

    Thread queues (the queue module)

    import queue
    
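    # First-in, first-out queue
    q = queue.Queue(3)
    q.put('asd')
    q.put('asd2')
    q.put('asd3')
    # q.put('asd4', block=True, timeout=5)  # blocks for up to 5 s, then raises queue.Full
    # q.put_nowait('asd4')  # same as q.put('asd4', block=False): raises queue.Full at once
    # get() works the same way:
    # print(q.get())
    # print(q.get(block=False))  # same as q.get_nowait(): raises queue.Empty if the queue is empty
    # print(q.get_nowait())
    
    
    # Last-in, first-out queue (a stack)
    q = queue.LifoQueue(3)
    q.put(1)
    q.put(2)
    print(q.get())  # 2
    print(q.get())  # 1
    
    # Priority queue: the smaller the number, the higher the priority
    q = queue.PriorityQueue(3)
    q.put((8, 'ssss'))
    q.put((4, '44444'))
    q.put((9, '9999'))
    print(q.get())  # (4, '44444')
    print(q.get())  # (8, 'ssss')
    print(q.get())  # (9, '9999')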
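    The non-blocking and timeout forms of put() and get() signal failure by raising queue.Full or queue.Empty rather than returning a value. A minimal sketch that triggers each case on a queue of size 2 (the events list is illustrative):

```python
import queue

events = []  # illustrative: records what happened, in order
q = queue.Queue(2)
q.put('a')
q.put('b')
try:
    q.put('c', block=False)  # same as q.put_nowait('c')
except queue.Full:
    events.append('full')
try:
    q.put('c', timeout=0.1)  # waits up to 0.1 s for free space, then gives up
except queue.Full:
    events.append('still full')

events.append(q.get())  # 'a' (FIFO)
events.append(q.get())  # 'b'
try:
    q.get_nowait()      # same as q.get(block=False)
except queue.Empty:
    events.append('empty')
print(events)  # ['full', 'still full', 'a', 'b', 'empty']
```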

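    Process pools and thread pools

    They are used the same way but suit different scenarios: thread pools suit I/O-bound applications such as sockets, web servers and crawlers; process pools suit CPU-bound work, using multiple cores to compute in parallel (for example, financial software). In the standard CPython build, the global interpreter lock (GIL) lets only one thread execute Python bytecode at a time, which is why CPU-bound work is spread across processes.

    The purpose of both kinds of pool is to cap the maximum number of processes (threads) running concurrently.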

    import os
    import random
    import time
    from concurrent.futures import ThreadPoolExecutor
    # from concurrent.futures import ProcessPoolExecutor  # same usage as the thread pool, different use cases
    from threading import current_thread
    
    
    def task():
        print('name:[%s],pid:[%s] is running ' % (current_thread().name, os.getpid()))
        time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(5)  # at most 5 worker threads
        for i in range(10):
            pool.submit(task)
    
        pool.shutdown(wait=True)  # close the pool to new tasks and wait for all submitted tasks to finish
    
        print('main')
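    submit() returns a Future; calling result() on it waits for that task and returns its value, and the executor can also be used as a context manager that calls shutdown(wait=True) on exit. A minimal sketch (square is an illustrative task):

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def square(n):
    # illustrative task
    time.sleep(0.01)
    return n * n


# Leaving the with block calls pool.shutdown(wait=True)
with ThreadPoolExecutor(max_workers=5) as pool:
    futures = [pool.submit(square, i) for i in range(10)]
    fourth = futures[3].result()  # blocks until that task is done
    finished = [f.result() for f in as_completed(futures)]  # in completion order

print(fourth, sorted(finished))  # 9 [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```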

    Using map instead of for + submit

    import os
    import random
    import time
    from concurrent.futures import ThreadPoolExecutor
    # from concurrent.futures import ProcessPoolExecutor  # same usage as the thread pool, different use cases
    from threading import current_thread
    
    
    def task(n):
        print('name:[%s],pid:[%s] is running task %s' % (current_thread().name, os.getpid(), n))
        time.sleep(random.randint(1, 3))
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(max_workers=5)
        # for i in range(10):
        #     pool.submit(task, i)
        pool.map(task, range(10))  # map replaces for + submit
        pool.shutdown(wait=True)  # close the pool to new tasks and wait for all submitted tasks to finish
    
        print('main')
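    Unlike the loop of submit() calls, map() returns an iterator over the tasks' return values, in the order of the inputs even when the tasks finish in a different order; an exception raised in a task is re-raised when its result is read from that iterator. A minimal sketch (slow_double and its random delays are illustrative):

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(n):
    # illustrative task with a random delay, so tasks finish out of order
    time.sleep(random.random() / 20)
    return n * 2


with ThreadPoolExecutor(max_workers=5) as pool:
    results = list(pool.map(slow_double, range(10)))

print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```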

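    Synchronous: after submitting a task, wait in place for its result, and only after getting the result move on to the next line of code. As a result, the program runs serially.

    Asynchronous: after submitting a task, do not wait for it to finish.

    This calls for an asynchronous callback. Take a crawler as an example: when crawling several websites at once, each page must first be downloaded and then parsed to extract what we want, so once a download (reading the page content) finishes, a callback runs the parsing step.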

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    
    import requests
    
    
    def get_page(url):
        print('Thread [%s] is downloading [%s]' % (current_thread().name, url))
        response = requests.get(url, timeout=10)
        if response.status_code == 200:
            return {'url': url, 'text': response.text}
        # any other status falls through and returns None
    
    
    def parse_page(res):
        res = res.result()  # the callback receives a Future; .result() gives get_page's return value
        if res is None:  # the download did not return status 200
            return
        print('Thread [%s] is parsing [%s]' % (current_thread().name, res['url']))
        parse_res = 'url[%s] size[%s]\n' % (res['url'], len(res['text']))
        with open('db.txt', 'a', encoding='utf-8') as f:
            f.write(parse_res)
    
    
    if __name__ == '__main__':
        urls = ['https://www.baidu.com',
                'https://www.python.org',
                'https://www.openstack.org',
                'https://help.github.com/',
                'http://www.sina.com.cn/'
                ]
        p = ThreadPoolExecutor(3)
    
        for url in urls:
            # parse_page is called with the Future once get_page(url) has finished
            p.submit(get_page, url).add_done_callback(parse_page)
        p.shutdown(wait=True)
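    To try the callback flow without network access, here is a minimal sketch in which a hypothetical fake_download() stands in for get_page(); as in the crawler, the callback receives the Future and calls result() on it:

```python
import threading
from concurrent.futures import ThreadPoolExecutor

parsed = []
parsed_lock = threading.Lock()


def fake_download(url):
    # Hypothetical stand-in for get_page(): no network, the "page text" is made up
    return {'url': url, 'text': 'x' * len(url)}


def parse_page(future):
    res = future.result()  # the callback gets the Future, not the return value
    with parsed_lock:
        parsed.append((res['url'], len(res['text'])))


urls = ['https://a.example', 'https://bb.example', 'https://ccc.example']
with ThreadPoolExecutor(3) as pool:  # shutdown(wait=True) on exit
    for url in urls:
        pool.submit(fake_download, url).add_done_callback(parse_page)

print(sorted(parsed))
```

    The callback runs in the worker thread that finished the task, or immediately in the submitting thread if the Future is already done when add_done_callback() is called.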

    Small exercise: serving sockets concurrently with a thread pool while capping the number of concurrent clients

    Server

    import socket
    # from threading import Thread
    from concurrent.futures import ThreadPoolExecutor
    
    
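    def talk(conn):
        while True:
            try:
                data = conn.recv(1024)
            except ConnectionResetError:  # the client disappeared abruptly
                break
            if not data:  # b'' means the client closed the connection cleanly
                break
            conn.sendall(data.upper())
        print('Client disconnected...')
        conn.close()
    
    
    def server_xxx(ip, port):
        server1 = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server1.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server1.bind((ip, port))
        server1.listen(2)
        print('Server started....')
        while True:
            conn, addr = server1.accept()
    
            print('A client has connected....')
            # Hand the connection to the pool; with 3 workers, a 4th client is accepted
            # but is not served until one of the first three disconnects
            pool.submit(talk, conn)
    
        server1.close()
    
    
    if __name__ == '__main__':
        pool = ThreadPoolExecutor(3)  # at most 3 clients are served concurrently
        server_xxx('localhost', 8080)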

    Client

    # -*- coding: utf-8 -*-
    import socket
    
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 8080))
    while True:
        cmd = input('>>:').strip()
        if cmd:
            client.send(cmd.encode('utf-8'))
            res = client.recv(1024)
            print(res.decode('utf-8'))

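    Coroutines: concurrency within a single thread (switch to another task whenever the current one blocks on I/O)

    The generator (yield) approach

    The greenlet and gevent modules; with gevent, calling monkey.patch_all() is enough to mark all I/O operations so that gevent can switch tasks on them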
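    The generator (yield) approach can be sketched in plain Python: the producer switches to the consumer with send() and gets control back at the consumer's next yield, all in one thread. This only switches where the code says so; detecting I/O and switching automatically is what gevent adds. The names consumer and producer are illustrative:

```python
def consumer():
    # A generator used as a coroutine: it pauses at yield and resumes when send() delivers a value
    total = 0
    while True:
        item = yield total  # hand back the running total, then wait for the next item
        total += item


def producer(c, items):
    next(c)                  # "prime" the generator: run it up to its first yield
    total = 0
    for item in items:
        total = c.send(item)  # switch to the consumer; control returns at its next yield
        print('sent %s, consumer total now %s' % (item, total))
    c.close()
    return total


print(producer(consumer(), [1, 2, 3]))  # 6
```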

    I/O models:

    (Introduction: http://www.cnblogs.com/linhaifeng/articles/7430066.html#_label4)

    In detail:

    https://www.luffycity.com/python-book/di-7-zhang-bing-fa-bian-cheng/75-iomo-xing/751-iomo-xing-jie-shao.html

    Short version:

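    So far all four IO models have been introduced. Now let us return to the questions raised at the beginning: what is the difference between blocking and non-blocking, and what is the difference between synchronous IO and asynchronous IO?
    The simpler one first: blocking vs non-blocking. The earlier sections already made the difference clear: calling blocking IO blocks the process until the operation completes, whereas non-blocking IO returns immediately while the kernel is still preparing the data.
    Before explaining the difference between synchronous IO and asynchronous IO, we need their definitions. The definition given by Stevens (actually the POSIX definition) is:
    A synchronous I/O operation causes the requesting process to be blocked until that I/O operation completes;
    An asynchronous I/O operation does not cause the requesting process to be blocked;
    The difference is that synchronous IO blocks the process while it performs the "IO operation". By this definition the four IO models fall into two groups:
    blocking IO, non-blocking IO and IO multiplexing, described earlier, all belong to synchronous IO, while asynchronous I/O forms the other group.

    Some may object that non-blocking IO is not blocked. There is a very "tricky" point here: the "IO operation" in the definition means the actual IO operation,
    that is, the recvfrom system call in the example. When non-blocking IO executes recvfrom and the kernel's data is not ready yet,
    the process is not blocked. But once the data in the kernel is ready, recvfrom copies it from the kernel into user memory, and the process is blocked
    for that period. Asynchronous IO is different: after the process starts the IO operation, it returns right away and pays no further attention until the kernel sends a signal
    telling the process that the IO is complete. During this whole procedure the process is never blocked.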
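    The blocking/non-blocking distinction and IO multiplexing can be observed with the standard socket and selectors modules. A minimal sketch, assuming a local socket.socketpair() stands in for a network connection: the non-blocking recv() returns at once with BlockingIOError while no data is ready, and the selector then waits until the socket is readable. Both steps are still synchronous IO in the sense above, since the final recv() copies the data while the process waits:

```python
import selectors
import socket

a, b = socket.socketpair()  # assumption: a local pair stands in for a real connection
b.setblocking(False)        # non-blocking: recv() never waits

try:
    b.recv(1024)            # no data yet: returns immediately by raising
    got_immediately = True
except BlockingIOError:
    got_immediately = False

sel = selectors.DefaultSelector()  # IO multiplexing (select/poll/epoll/kqueue underneath)
sel.register(b, selectors.EVENT_READ)
a.sendall(b'ping')
ready = sel.select(timeout=5)      # blocks here until b is readable (or the timeout expires)
data = b.recv(1024) if ready else b''  # the kernel-to-user copy happens here
print(got_immediately, data)       # False b'ping'

sel.close()
a.close()
b.close()
```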

  • Original article: https://www.cnblogs.com/Simonsun002/p/8168645.html