zoukankan      html  css  js  c++  java
  • 线程queue、线程进程池,协程

    线程queue

    import queue
    q = queue.Queue() #先进先出
    q = queue.LifoQueue() #先进后出
    t = queue.PriorityQueue() #优先级取数据,通常这个元组的第一个值是int类型
    q.put('123')
    q.put('qweqwe')
    print(q.get())
    print(q.get())
    t.put('100', 'tank')
    t.put('10', 'nick')
    t.put('1', 'jason')
    print(t.get())
    print(t.get())
    print(t.get())
    q.task_done()
    q.task_done()
    q.join()
    

    线程定时器

    from threading import Thread, Timer
    import time
    
    def task():
        print('线程执行了')
        time.sleep(2)
        print('线程结束了')
    
    t = Timer(3, task) #过了3秒后开启了一个线程
    t.start()
    

    多线程实现socket服务端

    #服务端
    
    from threading import Thread
    import socket
    
    def talk(conn):
        while True:
            try:
                info = conn.recv(1024)
                if len(info) == 0: break
                print(str(info, encoding = 'utf8'))
                conn.send(info.upper())
            except ConnectionResetError:
                print('客户端关闭了一个连接')
                break
        conn.close()
    
    def server_demo():
    	server = socket.socket(
            socket.AF_INET, socket.SOCK_STREAM
        )
        server.bind(('127.0.0.1', 8001))
        server.listen(4)
        while True:
            conn,addr = server.accept()
            print(conn, addr)
            t = Thread(target=talk)
            t.start()
    if __name__ == '__main__':
        server_demo()
        
    
        
    #客户端
    from threading import Thread,currentThread
    import socket
    
    def client_demo():
        client = socket.socket()
    	client.connect(('127.0.0.1', 8001))
        while True:
            msg = f'{currentThread().name}'
            client.send(bytes(msg, encoding = 'utf8'))
            info = client.recv(1024)
            print(str(info, encoding = 'utf8'))
        client.close()
    
    if __name__ == '__main__':
    	for i in range(10):
    		t = Thread(target = client_demo)
    		t.start()
    	
            
    

    多线程中的Queue队列中join()与task_done()

    '''
    Queue.task_done():在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号。
    Queue.join():实际上意味着等到队列为空,再执行别的操作
    
    如果线程里每从队列里取一次,但没有执行task_done(),则join无法判断队列中到底有没有结束。
    可以理解为,每task_done一次,就从队列里删掉一个元素,这样在最后Join的时候根据队列长度是否为0来判断队列是否结束,从而执行主线程
    '''
    import queue
    q = queue.Queue()
    q.put('123')
    q.put('qwe')
    q.task_done()
    q.task_done()
    q.join() #这样的话,程序不会被挂起,但是如果只有一个task_done(),则会被挂起
    
    

    线程池和进程池

    进程池和线程池:

    池的功能限制进程数或线程数.

    什么时候限制?

    当并发的任务数量远远大于计算机所能承受的范围时,即无法 一次性开启过多的任务数量,我就应该考虑去限制进程数或线程数 ,从而保证服务器不崩。

    ## 线程池和进程池
    
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from threading import currentThread
    from multiprocessing import current_process
    import time
    
    def task(i):
        print(f'{currentThread().name} 正在执行任务 {i}')
        time.sleep(2)
        print('
    
    
    ') 
        return i**2 #每次线/进程执行完都会返回一个值
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5) #设置进程池的大小,每次只允许有5个进程同时运行
        # pool = ThreadPoolExecutor(5)
        fu_list = [] #列表用于保存任务对象
        for i in range(15): #一共有15个任务
            future = pool.submit(task, i) #把任务提交给进程执行
            fu_list.append(future) #把任务对象加到列表中
        pool.shutdown() #关闭线程池的入口
        for fu in fu_list: #任务执行完接收返回值
            print(fu.result())
            
    
    #回调函数
    from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
    from threading import currentThread
    from multiprocessing import current_process
    import time
    
    def task(i):
        print(f'{currentThread().name} 正在执行任务 {i}')
        time.sleep(2)
        return i**2
    
    def fun(future):
        print(future.result())
    if __name__ == '__main__':
        pool = ProcessPoolExecutor(5)
        # pool = ThreadPoolExecutor(5)
        for i in range(15):
            future = pool.submit(task, i)
    		future.add_done_callback(fun)
            '''
            回调函数:
            	为当前任务绑定了一个函数,在当前
            任务执行结束的时候会触发这个函数,会把future对象作为参数传给函数
            ,这个称为回调函数,处理完了回来就调用这个函数
            '''
    

    协程(待补充)

    协程

    '''
    python的线程用的是操作系统原生的线程
    
    协程:单线程下实现并发
    	
    	并发:切换加保存状态
    	
    	多线程:主要由操作系统帮忙实现,遇到io操作或者执行时间过
    长就会切换。
    	
    	什么样的协程是有意义的:
    		遇到io切换的时候才有意义
    		具体:
    			协程概念本质是程序员抽象出来的,操作系统根本不知道协程的存在,一个线程遇到io,该线程内部把CPU切到别的任务上了,操作系统就发现不了,这样实现了单线程下效率最高
    '''
    
    '''
    优点:
    	自己控制切换要比操作系统切换快的多
    
    缺点:
    	对比多线程
    	自己检测所有io, 但凡有一个阻塞整体都跟着阻塞
    	对比多进程
    	无法利用多核优势
    	
    为什么要有协程(遇到io切换)?
    	自己控制切换要比操作系统切换快的多,降低了单个线程的io时间
    '''
    
  • 相关阅读:
    数据仓库基础(十四)缓慢变化维
    数据仓库基础(十三)Informatica workflow
    数据仓库基础(十二)Informatica组件(2)
    数据仓库基础(十一)Informatica小技巧(2)
    数据仓库基础(十)组件1
    数据仓库基础(九)Informatica小技巧(1)
    数据仓库基础(八)Informatica 小例子
    数据仓库基础(七)Informatica PowerCenter介绍
    数据仓库基础(六)数据的ETL
    数据仓库基础(五)数据仓库系统应用实例
  • 原文地址:https://www.cnblogs.com/michealjy/p/11553146.html
Copyright © 2011-2022 走看看