zoukankan      html  css  js  c++  java
  • python3 进程和线程(一)

    进程和线程

    1. 进程:内存独立,线程共享同一进程的内存,一个进程就像一个应用程序,例如qq和word,这属于两个进程,
    2. 进程是资源的集合,线程是执行单位
    3. 进程之间不能直接互相访问,同一进程中的程可以互相通信
    4. 创建新进程消耗系统资源,线程非常轻量,只保存线程运行时的必要数据,如上下文、程序堆栈信息
    5. 同一进程里的线程可以相互控制,父进程可以控制子进程
     1 import threading
     2 import time
     3 
     4 def sayhi(num):
     5     print('num:',num)
     6     time.sleep(3)
     7 
     8 a = threading.Thread(target=sayhi,args=(1,))
     9 b = threading.Thread(target=sayhi,args=(2,))
    10 now1 = time.time()
    11 print(now1)
    12 a.start()
    13 b.start()
    14 now2 = time.time()
    15 print(now2)
    16 print(threading.active_count()) 
    17 # 包含主线程,总共3个
    18 print(a.getName())
    19 print(b.getName())
    20 
    21 class MyThread(threading.Thread):
    22     def __init__(self,n):
    23         threading.Thread.__init__(self)
    24         self.n = n
    25 
    26         def run(self):
    27             print('running on thread $s'%self.n)
    28             time.sleep(3)
    29 now3 = time.time()
    30 print(now3)
    31 t1 = MyThread(1)
    32 t2 = MyThread(2)
    33 t1.start()
    34 t2.start()
    35 print(t1.getName())
    36 print(t2.getName())
    37 now4 = time.time()
    38 print(now4)
    39 
    40 thread_list = []
    41 for i in range(10):
    42     s1 = threading.Thread(target=sayhi,args=(i,))
    43     s1.start()
    44     thread_list.append(s1)
    45 now5 = time.time()
    46 print(now5)
    47 for r in thread_list:
    48     r.join() # s1.wait()
    49 print('--work done--')
    50 now6 = time.time()
    51 print(now6)
    52 print('primary'.center(20,'-'))
    53 
    54 
    55 for ii in range(10):
    56     s2 = threading.Thread(target=sayhi)
    57     s2.setDaemon(s2)
    58     s2.start()
    View Code

    一、线程的基本使用 threading模块

    threading 模块建立在 _thread 模块之上。

    thread 模块以低级、原始的方式来处理和控制线程,

    而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

     1 import threading
     2 import time
     3 
     4 def task(arg):
     5     time.sleep(arg)
     6     print(arg)
     7 
     8 for i in range(30):
     9     # 创建线程,args必须是可迭代的对象
    10     t = threading.Thread(target=task,args=[i,])
    11 
    12     # 主线程终止,不等待子线程
    13     # t.setDaemon(True)
    14     # 等待子线程
    15     # t.setDaemon(False)
    16 
    17     # 开始工作,等待cpu使用
    18     t.start()
    19 
    20     # 变成串行
    21     # t.join() #一直等
    22     # t.join(1) # 超时,等待最大时间
    23 print('end')
    基本用法

    使用线程中的run方法

     1 import threading
     2 import time
     3 class MyThread(threading.Thread): #继承threading方法
     4     def __init__(self,func,*args,**kwargs):
     5         super(MyThread,self).__init__(*args,**kwargs)
     6         self.func = func
     7 
     8     def run(self): # 线程中的run方法
     9         self.func()
    10 
    11 def task():
    12     time.sleep(1)
    13     print('is a test')
    14 
    15 obj = MyThread(func=task)
    16 obj.start()
    View Code

    Thread方法说明

    t.start() : 激活线程,
    
    t.getName() : 获取线程的名称
    
    t.setName() : 设置线程的名称 
    
    t.name : 获取或设置线程的名称
    
    t.is_alive() : 判断线程是否为激活状态
    
    t.isAlive() :判断线程是否为激活状态
    
    t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    
    t.isDaemon() : 判断是否为守护线程
    
    t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
    
    t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
    
    t.run() :线程被cpu调度后自动执行线程对象的run方法
    Thread使用方法

    二、线程锁threading.RLock和threading.Lock

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念

    假如现在有10个人要上厕所,钥匙在我手里,这个时候A进来,他把门关上了,别人就进不去,只有等A完事后其他人才能进来,这个门就是控制线程的那把锁

    锁的基本使用:

    1 lock=threading.Lock() 
    2 lock.acquire() 
    3 lock.release()
    lock用法

    Rlock和Lock的区别:

      RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐

     1 # 只能一个人使用锁
     2 # 创建锁
     3 lock = threading.Lock()
     4 # 递归锁,多把锁
     5 # lock = threading.RLock()
     6 
     7 def task(arg):
     8     time.sleep(2)
     9     print('arg:',arg)
    10     # 申请使用锁,其他人等
    11     lock.acquire()
    12     # lock.acquire() # Rlock可使用多把锁
    13 
    14     global v
    15     v -= 1
    16     print(v)
    17 
    18     # 释放锁
    19     lock.release()
    20     # lock.release() # Rlock可解锁
    21 
    22 for i in range(2):
    23     t = threading.Thread(target=task,args=(i,))
    24     t.start()
    Lock和Rlock

    如果厕所有3个坑,同时允许3个人上,使用 BoundedSemaphore

     1 # 多个人同时使用锁
     2 # 信号链,定义3个人同时使用
     3 lock = threading.BoundedSemaphore(3)
     4 def task(arg):
     5     lock.acquire()
     6     time.sleep(1)
     7     global v
     8     v -= 1
     9     print(v)
    10     lock.release()
    11 for i in range(10):
    12     t = threading.Thread(target=task,args=(i,))
    13     t.start()
    BoundedSemaphore

    三、事件Event

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True
    • Event.isSet() :判断标识位是否为Ture
     1 # 事件 event
     2 lock = threading.Event()
     3 def task(arg):
     4     time.sleep(1)
     5     # 锁住所有的线程
     6     lock.wait()
     7     print(arg)
     8 for i in range(10):
     9     t = threading.Thread(target=task,args=(i,))
    10     t.start()
    11 while 1:
    12     value = input('>>:').strip()
    13     if value == '1':
    14         lock.set() # 打开锁,执行上面的print
    15         # lock.clear() # 再锁上
    event

    四、条件Condition

    condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。

    acquire() 和 release() 会调用与锁相关联的相应的方法。

    其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

    • wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。

    如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

     1 lock = threading.Condition()
     2 
     3 def task(arg):
     4     time.sleep(1)
     5     # 锁住所有线程
     6     lock.acquire()
     7     lock.wait()
     8     print('线程:',arg)
     9     lock.release()
    10 for i in range(10):
    11     t = threading.Thread(target=task,args=(i,))
    12     t.start()
    13 while 1:
    14     value = input('>>>:').strip()
    15     lock.acquire()
    16     lock.notify(int(value))
    17     lock.release()
    18 
    19 >>>:1
    20 线程: 3
    21 >>>:2
    22 线程: 4
    23 线程: 2
    24 >>>:3
    25 线程: 0
    26 线程: 7
    27 线程: 1
    28 >>>:4
    29 线程: 8
    30 线程: 9
    31 线程: 6
    32 线程: 5
    33 >>>:5
    threading.Condition

    五、queue模块

    Queue 就是对队列,它是线程安全的

    举例来说,我们去麦当劳吃饭。饭店里面有厨师职位,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。形成管道样,厨师做好饭通过前台传送给顾客,所谓单向队列

     1 import queue
     2  
     3 q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
     4  
     5 q.join()    # 等到队列为kong的时候,在执行别的操作
     6 q.qsize()   # 返回队列的大小 (不可靠)
     7 q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
     8 q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
     9 q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
    10                          为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
    11                           如果队列无法给出放入item的位置,则引发 queue.Full 异常
    12 q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
    13                       若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
    14 q.put_nowait(item) #   等效于 put(item,block=False)
    15 q.get_nowait() #    等效于 get(item,block=False)
    queue

    六、线程池

     1 from concurrent.futures import ThreadPoolExecutor as TPE
     2 
     3 def task(arg):
     4     time.sleep(0.5)
     5     print('Thread:',arg)
     6 
     7 pool = TPE(5) # 线程池里放5个线程
     8 
     9 for i in range(100):
    10     # 去连接池中获取连接
    11     pool.submit(task,i)
    线程池
     1 import requests
     2 def task(url):
     3     response = requests.get(url)
     4     print('得到结果:',url,len(response.content))
     5 
     6 pool = TPE(2)
     7 url_list = [
     8     'http://www.oldboyedu.com',
     9     'http://www.baidu.com',
    10     'http://www.sohu.com',
    11 ]
    12 
    13 for url in url_list:
    14     print('开始请求',url)
    15     # 去连接池获取连接
    16     pool.submit(task,url)
    线程池2

    七、回调函数

    线程池定义为一个变量,使用变量.add_done_callback(函数名称)进行函数回调

     1 import requests
     2 
     3 def txt(future):
     4     download_response = future.result()
     5     print('得到结果:', url, len(download_response.content))
     6 
     7 def download(url):
     8     response = requests.get(url)
     9     # print('得到结果:',url,len(response.content))
    10     return response
    11 pool = TPE(2)
    12 url_list = [
    13     'http://www.oldboyedu.com',
    14     'http://www.baidu.com',
    15     'http://www.sohu.com',
    16 ]
    17 
    18 for url in url_list:
    19     print('开始请求',url)
    20     # 去连接池获取连接
    21     future = pool.submit(download,url)
    22     # 一旦download函数return,开始执行txt函数
    23     future.add_done_callback(txt)
    回调函数

    八、进程

    multiprocessing是python的多进程管理包,和threading.Thread类似。

     1 from multiprocessing import Process
     2 def task(arg):
     3     time.sleep(arg)
     4     print(arg)
     5 
     6 if __name__ == '__main__':
     7     for i in range(10):
     8         p = Process(target=task,args=(i,))
     9         # 守护true,不执行子进程
    10         # p.daemon = True
    11         # false执行子进程,默认
    12         # p.daemon = False
    13         p.start()
    14         # p.join()
    15         p.join(1)
    16     print('主进程中的主线程...')
    Process

    九、multiprocessing,Array,Value

    数据可以用Value或Array存储在一个共享内存地图里,如下:

     1 # 进程数据共享
     2 from multiprocessing import Process,Array
     3 def task(num,li):
     4     li[num] = num
     5     print(list(li))
     6 
     7 if __name__ == '__main__':
     8     L = Array('i',10) # 数据类型,长度
     9     for i in range(10):
    10         p = Process(target=task,args=(i,L))
    11         p.start()
    Array
    from multiprocessing import Array, Value, Process
    
    
    def func(a, b):
        a.value = 6.66666
        for i in range(len(b)):
            b[i] = -b[i]
    
    
    if __name__ == "__main__":
        num = Value('d', 0.0)
        arr = Array('i', range(11))
    
        c = Process(target=func, args=(num, arr))
        d = Process(target=func, args=(num, arr))
        c.start()
        d.start()
        c.join()
        d.join()
    
        print(num.value)
        for i in arr:
            print(i)
    Value Array

    十、进程池

    和线程池差不多

     1 from concurrent.futures import ProcessPoolExecutor as PPE
     2 
     3 
     4 #基本用法
     5 def task(arg):
     6     time.sleep(1)
     7     print(arg)
     8 
     9 pool = PPE(5)
    10 for i in range(10):
    11     pool.submit(task,i)
    12 
    13 
    14 # 进程池回调
    15 def call(arg):
    16     data = arg.result()
    17     print(data)
    18 
    19 def task(arg):
    20     print(arg)
    21     return arg+100
    22 
    23 pool = PPE(5)
    24 for i in range(10):
    25     obj = pool.submit(task,i)
    26     obj.add_done_callback(call)
    进程池和回调
  • 相关阅读:
    BZOJ 1977: [BeiJing2010组队]次小生成树 Tree( MST + 树链剖分 + RMQ )
    BZOJ 2134: 单选错位( 期望 )
    BZOJ 1030: [JSOI2007]文本生成器( AC自动机 + dp )
    BZOJ 2599: [IOI2011]Race( 点分治 )
    BZOJ 3238: [Ahoi2013]差异( 后缀数组 + 单调栈 )
    ZOJ3732 Graph Reconstruction Havel-Hakimi定理
    HDU5653 Bomber Man wants to bomb an Array 简单DP
    HDU 5651 xiaoxin juju needs help 水题一发
    HDU 5652 India and China Origins 并查集
    HDU4725 The Shortest Path in Nya Graph dij
  • 原文地址:https://www.cnblogs.com/xp1005/p/6565625.html
Copyright © 2011-2022 走看看