zoukankan      html  css  js  c++  java
  • python-线程进程与队列

    线程,有时被称为轻量级进程,是程序执行流的最小单元
    线程是程序中一个单一的顺序控制流程。进程内一个相对独立的、可调度的执行单元,是系统独立调度和分派CPU的基本单位指进行中的程序的调度单位。在单个程序中同时运行多个线程完成不同的工作,称为多线程。
    python中多个cpu无法同时处理一个进程或其子进程,多个cpu可以同时处理多个线程
    1
    import time 2 def f1(arg): 3 time.sleep(1) 4 print(arg) 5 import threading 6 t = threading.Thread(target = f1,args = (123,)) 7 #t.setDaemon(True)#表示主线程不等子线程 8 t.start()#不代表当前线程会立即被执行 9 t.join(2) #表示主线程到此等待。。。直到子线程执行完成#参数,表示住线程再次最多等待n秒 10 11 print('end') 12 #一秒后显示 13 >>>123 14 >>>end 15 16 import time 17 def f1(arg): 18 time.sleep(1) 19 print(arg) 20 import threading 21 t = threading.Thread(target = f1,args = (123,)) 22 t.setDaemon(True)#表示主线程不等子线程 23 t.start()#不代表当前线程会立即被执行 24 #t.join(2) #表示主线程到此等待。。。直到子线程执行完成#参数,表示住线程再次最多等待n秒 25 26 print('end') 27 #立即显示 28 >>>end
    #我们可以写个类继承threading模块的Thead类并加入自定义的构造方法,
    #用来添加新功能
    class MyThread(threading.Thread):
        def __init__(self,func,args):
            self.func =func
            self.args = args
            #继承父类的构造方法
            super(MyThread,self).__init__()
    
        def run(self):
            self.func(self.args)
    
    def f2(arg):
        print(arg)
    
    obj = MyThread(f2,123)
    obj.start()
    >>>123

    队列

     

     1 #Python中,队列是线程间最常用的交换数据的形式。Queue模块是提供队列操作的模块,不同的队列应用在不同的场景中
     2 #queue.Queue先进先出队列
     3 #queue.LifoQueue,后进先出队列
     4 #queue.PriorityQueue,优先级队列
     5 #queue.deque,双向队列
     6 
     7 import queue
     8 q = queue.Queue(3)#参数为队列中最大个数
     9 print(q.empty())#判断是否为空
    10 >>>True
    11 print(q.full())#判断队列是否已满
    12 >>>False
    13 q.put(11)
    14 >>>传入元素
    15 q.put(22)
    16 >>>传入元素
    17 #q.put(33,block=False)#最大值为2传入第三个的时候默认阻塞
    18 print(q.qsize())#真实队列剩余个数
    19 >>>2
    20 print(q.maxsize)#最大个数
    21 >>>3
    22 print(q.get())#取值当队列中取完之后继续取得时候阻塞
    23 >>>11
    24 q.task_done()#任务完成
    25 print(q.get())#取值
    26 >>>22
    27 q.task_done()#任务完成,用于释放队列
    28 q.join()#不带这个的时候程序完成后释放队列,加上的时候阻塞
     1 #先进后出队列
     2 q = queue.LifoQueue()
     3 q.put(123)
     4 q.put(456)
     5 print(q.get())
     6 #>>>456
     7 
     8 #优先级队列
     9 q = queue.PriorityQueue()
    10 q.put((1,'alex1'))
    11 q.put((1,'alex2'))
    12 q.put((3,'alex3'))
    13 print(q.get())
    14 >>>(1, 'alex1')
    15 
    16 
    17 #双向队列
    18 q= queue.deque()
    19 q.append(123)
    20 q.append(333)
    21 q.appendleft(456)
    22 #从左侧插入队列
    23 print(q.pop())#从右侧取值
    24 print(q.popleft())#从左侧取值
    25 >>>333
    26 >>>456

    生产者消费者模型

    #生产者消费者模型
    import queue
    import threading
    import time
    
    #创建队列
    q = queue.Queue(50)
    
    #定义消费者
    def productor(arg):
        '''
        买票
        :param arg:
        :return:
        '''
        while True:
            q.put(str(arg) + '号产生订单')#提交到队列
    
    #创建300个线程发送请求
    for i in range(300):#300个线程同时提交订单相当于300个人同时提交订单
        t = threading.Thread(target= productor,args= (i,))
        t.start()
    
    #定义生产者
    def consumer(arg):
        '''
        服务器后台
        :param arg:
        :return:
        '''
        while True:
            print(str(arg) + '处理了'+q.get())#进程从队列中取订单进行处理
    
    #3个线程同时工作
    for j in range(3):
        t = threading.Thread(target=consumer,args=(j,))
        t.start()

    线程锁 

     1 #线程锁
     2 import threading
     3 import time
     4 
     5 NUM = 10
     6 #线程锁线程执行进程通过的接口,用来限制多个线程同时修改一个数据
     7 def func(l):
     8     global NUM
     9     #上锁
    10     l.acquire()
    11     NUM -=1
    12     time.sleep(2)
    13     print(NUM)
    14     #开锁
    15     l.release()
    16 #单层锁
    17 lock = threading.Lock()
    18 #多层锁
    19 #lock = threading.RLock()
    20 
    21 for i in range(30):
    22     t  = threading.Thread(target=func,args = (lock,))
    23     t.start()

    信号量

     1 #设置可通过线程个数
     2 import threading
     3 import time
     4 NUM =10
     5 def func(i,l):
     6     global NUM
     7     #上锁
     8     l.acquire()
     9     NUM -=1
    10     time.sleep(2)
    11     print(NUM,i)
    12     #开锁
    13     l.release()
    14 
    15 #调用信号量设置每次多少个线程处理进程
    16 lock = threading.BoundedSemaphore(5)
    17 
    18 for i in range(30):
    19     t = threading.Thread(target= func,args=(i,lock,))
    20     t.start()
     1 #event相当于红绿灯,通过一个标识来批量管理线程
     2 import threading
     3 
     4 def func(i,e):
     5     print(i)
     6     e.wait()#检测时什么灯,如果是红灯,停,绿灯,行
     7     print(i+100)
     8 
     9 event = threading.Event()
    10 
    11 for i in range(10):
    12     t = threading.Thread(target= func,args = (i,event,))
    13     t.start()
    14 
    15 event.clear()#设置成红灯
    16 inp = input('>>>')
    17 if inp == '1':
    18     event.set()#设置成绿灯
     1 #根据条件限定线程的执行
     2 #!/usr/bin/env python
     3 # -*- coding:utf-8 -*-
     4 #设置条件设置线程数第一种方式
     5 import threading
     6 def func(i,con):
     7     print(i)
     8     con.acquire()
     9     con.wait()
    10     print(i+100)
    11     con.release()
    12 
    13 c = threading.Condition()
    14 for i in range(10):
    15     t = threading.Thread(target=func,args=(i,c))
    16     t.start()
    17 
    18 while True:
    19     inp = input('>>>')
    20     if inp == 'q':
    21         break
    22     c.acquire()
    23     c.notify(int(inp))#根据输入设置通过几个线程数
    24     c.release()
    25 
    26 #第二种
    27 import threading
    28 
    29 def condition():
    30     ret =False
    31     r = input('>>>')
    32     if r == 'true':
    33         ret =True
    34     else:
    35         ret = False
    36     return ret
    37 
    38 def func(i,con):
    39     print(i)
    40     con.acquire()
    41     # 设置condition函数为条件返回true继续运行,条件不成立则不执行此线程
    42     con.wait_for(condition)
    43     print(i+100)
    44     con.release()
    45 
    46 c = threading.Condition()
    47 for i in range(10):
    48     t = threading.Thread(target=func,args=(i,c))
    49     t.start()

    Timer

    1 #定时器
    2 form threading import Time
    3 
    4 def hello():
    5     print('hello')
    6 t = Timer(1,hello)#一秒后执行
    7 t.start()

    线程池

     1 import queue
     2 import threading
     3 import time
     4 
     5 class ThreadPool:
     6     def __init__(self,maxsize):
     7         self.maxsize = maxsize
     8         self._q = queue.Queue(maxsize)#创建队列
     9         for i in range(maxsize):
    10             self._q.put(threading.Thread)#将创建线程的类放入队列
    11 
    12     def get_thread(self):
    13         return self._q.get()#获取队列的值
    14 
    15     def add_thread(self):
    16         self._q.put(threading.Thread)
    17 
    18 pool = ThreadPool(5)#设置线程池
    19 def task(arg,p):
    20     print(arg)
    21     time.sleep(1)
    22     p.add_thread()#添加新的线程
    23 
    24 for i in range(100):
    25     t = pool.get_thread()#当获取5次后,阻塞在此
    26     obj = t(target = task,args = (i,pool,))#创建线程调用函数task
    27     obj.start()
      1 #第二种创建线程池方式
      2 #!/usr/bin/env python
      3 # -*- coding:utf-8 -*-
      4 
      5 
      6 import queue
      7 import threading
      8 import contextlib
      9 import time
     10 
     11 StopEvent = object()
     12 class Pool:
     13     def __init__(self,max_num,max_task_num=None):
     14         if max_task_num:
     15             self.q=queue.Queue(max_task_num)#创建队列并指定接受任务最大数
     16         else:
     17             self.q = queue.Queue()#不指定参数
     18         self.max_num = max_num#最多有多少个线程
     19         self.cancel = False
     20         self.terminal = False
     21         self.generate_list = []#已创建线程
     22         self.free_list = []#空闲线程
     23 
     24     def run(self,func,args,callback = None):#接收参数
     25         if self.cancel:
     26             return
     27         if len(self.free_list) ==0 and len(self.generate_list) < self.max_num:#当没有空闲线程并且,已创建的线程没有达到最大值
     28             self.generate_thread() #创建新线程调generte_thread函数
     29         w = (func,args,callback,)#将参数传入队列中
     30         self.q.put(w)#将参数作为元组传入队列中
     31 
     32     def generate_thread(self):
     33         '''
     34         创建线程
     35         :return:
     36         '''
     37         t = threading.Thread(target=self.call)#执行call函数
     38         t.start()
     39     def call(self):
     40         '''
     41         让线程执行任务
     42         :return:
     43         '''
     44         current_thread = threading.currentThread#获取当前线程数
     45         self.generate_list.append(current_thread)#传入已创建线程列表中
     46         event =self.q.get()#获取任务
     47         while event != StopEvent:#如果任务不为空
     48             func,args,callback = event#将传过来的参数赋值给event
     49             try:
     50                 result = func(args)#执行action(i)
     51                 success = True#任务执行成功
     52             except Exception as e:
     53                 success = False#action任务执行失败
     54             if callback is not None:
     55                 try:
     56                     callback(success,result)
     57                 except Exception as c:
     58                     pass
     59             #event.self.q.get()#继续去任务,当存在任务则执行action不存在则删除当前进程
     60                 with self.worker_state(self.free_list,current_thread):#任务执行完成后设置该线程为空闲
     61                     if self.terminal:#如果是空闲的
     62                         event = StopEvent
     63                     else:
     64                         event = self.q.get()#如果不是空闲的,则去取任务
     65         else:
     66             self.generate_list.remove(current_thread)#如果任务为空则删除当前线程
     67 
     68     def close(self):
     69         '''
     70         执行完所有任务后,所有线程停止
     71         :return:
     72         '''
     73         self.cancel = True
     74         full_size = len(self.generate_list)#统计线程个数
     75         while full_size:#根据线程个数传入对应个数的False标志
     76             self.q.put(StopEvent)
     77             full_size -=1
     78 
     79     def terminate(self):
     80         """
     81         无论是否还有任务,终止线程
     82         """
     83         self.terminal = True
     84 
     85         while self.generate_list:
     86             self.q.put(StopEvent)
     87         self.q.empty()
     88 
     89 
     90     @contextlib.contextmanager
     91     def worker_state(self, state_list, worker_thread):
     92         """
     93         用于记录线程中正在等待的线程数
     94         """
     95         state_list.append(worker_thread)
     96         try:
     97             yield
     98         finally:
     99             state_list.remove(worker_thread)
    100 pool = Pool(5)
    101 
    102 def action():
    103     pass
    104 def callback(i):
    105     print(i)
    106 for i in range(300):
    107     ret = pool.run(action,(i,),callback)#将函数i的值与callback函数传入类中
  • 相关阅读:
    python 执行sql得到字典格式数据
    python爬虫 url链接编码成gbk2312格式
    windows环境下elasticsearch安装教程(单节点)
    python SQLServer 存储图片
    爬虫的本质是和分布式爬虫的关系
    requests form data 请求 爬虫
    mysql 删除 binlog 日志文件
    查看mysql数据表的大小
    xshell 连接报错 Disconnected from remote host
    centos 7.3 安装 mysqldb 报错 EnvironmentError: mysql_config not found ERROR: Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
  • 原文地址:https://www.cnblogs.com/liguangxu/p/5686558.html
Copyright © 2011-2022 走看看