zoukankan      html  css  js  c++  java
  • python3 进程和线程(一)

    进程和线程

    1. 进程:内存独立,线程共享同一进程的内存,一个进程就像一个应用程序,例如qq和word,这属于两个进程,
    2. 进程是资源的集合,线程是执行单位
    3. 进程之间不能直接互相访问,同一进程中的程可以互相通信
    4. 创建新进程消耗系统资源,线程非常轻量,只保存线程运行时的必要数据,如上下文、程序堆栈信息
    5. 同一进程里的线程可以相互控制,父进程可以控制子进程
     1 import threading
     2 import time
     3 
     4 def sayhi(num):
     5     print('num:',num)
     6     time.sleep(3)
     7 
     8 a = threading.Thread(target=sayhi,args=(1,))
     9 b = threading.Thread(target=sayhi,args=(2,))
    10 now1 = time.time()
    11 print(now1)
    12 a.start()
    13 b.start()
    14 now2 = time.time()
    15 print(now2)
    16 print(threading.active_count()) 
    17 # 包含主线程,总共3个
    18 print(a.getName())
    19 print(b.getName())
    20 
    21 class MyThread(threading.Thread):
    22     def __init__(self,n):
    23         threading.Thread.__init__(self)
    24         self.n = n
    25 
    26         def run(self):
    27             print('running on thread $s'%self.n)
    28             time.sleep(3)
    29 now3 = time.time()
    30 print(now3)
    31 t1 = MyThread(1)
    32 t2 = MyThread(2)
    33 t1.start()
    34 t2.start()
    35 print(t1.getName())
    36 print(t2.getName())
    37 now4 = time.time()
    38 print(now4)
    39 
    40 thread_list = []
    41 for i in range(10):
    42     s1 = threading.Thread(target=sayhi,args=(i,))
    43     s1.start()
    44     thread_list.append(s1)
    45 now5 = time.time()
    46 print(now5)
    47 for r in thread_list:
    48     r.join() # s1.wait()
    49 print('--work done--')
    50 now6 = time.time()
    51 print(now6)
    52 print('primary'.center(20,'-'))
    53 
    54 
    55 for ii in range(10):
    56     s2 = threading.Thread(target=sayhi)
    57     s2.setDaemon(s2)
    58     s2.start()
    View Code

    一、线程的基本使用 threading模块

    threading 模块建立在 _thread 模块之上。

    thread 模块以低级、原始的方式来处理和控制线程,

    而 threading 模块通过对 thread 进行二次封装,提供了更方便的 api 来处理线程。

     1 import threading
     2 import time
     3 
     4 def task(arg):
     5     time.sleep(arg)
     6     print(arg)
     7 
     8 for i in range(30):
     9     # 创建线程,args必须是可迭代的对象
    10     t = threading.Thread(target=task,args=[i,])
    11 
    12     # 主线程终止,不等待子线程
    13     # t.setDaemon(True)
    14     # 等待子线程
    15     # t.setDaemon(False)
    16 
    17     # 开始工作,等待cpu使用
    18     t.start()
    19 
    20     # 变成串行
    21     # t.join() #一直等
    22     # t.join(1) # 超时,等待最大时间
    23 print('end')
    基本用法

    使用线程中的run方法

     1 import threading
     2 import time
     3 class MyThread(threading.Thread): #继承threading方法
     4     def __init__(self,func,*args,**kwargs):
     5         super(MyThread,self).__init__(*args,**kwargs)
     6         self.func = func
     7 
     8     def run(self): # 线程中的run方法
     9         self.func()
    10 
    11 def task():
    12     time.sleep(1)
    13     print('is a test')
    14 
    15 obj = MyThread(func=task)
    16 obj.start()
    View Code

    Thread方法说明

    t.start() : 激活线程,
    
    t.getName() : 获取线程的名称
    
    t.setName() : 设置线程的名称 
    
    t.name : 获取或设置线程的名称
    
    t.is_alive() : 判断线程是否为激活状态
    
    t.isAlive() :判断线程是否为激活状态
    
    t.setDaemon() 设置为后台线程或前台线程(默认:False);通过一个布尔值设置线程是否为守护线程,必须在执行start()方法之后才可以使用。如果是后台线程,主线程执行过程中,后台线程也在进行,主线程执行完毕后,后台线程不论成功与否,均停止;如果是前台线程,主线程执行过程中,前台线程也在进行,主线程执行完毕后,等待前台线程也执行完成后,程序停止
    
    t.isDaemon() : 判断是否为守护线程
    
    t.ident :获取线程的标识符。线程标识符是一个非零整数,只有在调用了start()方法之后该属性才有效,否则它只返回None。
    
    t.join() :逐个执行每个线程,执行完毕后继续往下执行,该方法使得多线程变得无意义
    
    t.run() :线程被cpu调度后自动执行线程对象的run方法
    Thread使用方法

    二、线程锁threading.RLock和threading.Lock

    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念

    假如现在有10个人要上厕所,钥匙在我手里,这个时候A进来,他把门关上了,别人就进不去,只有等A完事后其他人才能进来,这个门就是控制线程的那把锁

    锁的基本使用:

    1 lock=threading.Lock() 
    2 lock.acquire() 
    3 lock.release()
    lock用法

    Rlock和Lock的区别:

      RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。 如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,必须调用n次的release才能真正释放所占用的琐

     1 # 只能一个人使用锁
     2 # 创建锁
     3 lock = threading.Lock()
     4 # 递归锁,多把锁
     5 # lock = threading.RLock()
     6 
     7 def task(arg):
     8     time.sleep(2)
     9     print('arg:',arg)
    10     # 申请使用锁,其他人等
    11     lock.acquire()
    12     # lock.acquire() # Rlock可使用多把锁
    13 
    14     global v
    15     v -= 1
    16     print(v)
    17 
    18     # 释放锁
    19     lock.release()
    20     # lock.release() # Rlock可解锁
    21 
    22 for i in range(2):
    23     t = threading.Thread(target=task,args=(i,))
    24     t.start()
    Lock和Rlock

    如果厕所有3个坑,同时允许3个人上,使用 BoundedSemaphore

     1 # 多个人同时使用锁
     2 # 信号链,定义3个人同时使用
     3 lock = threading.BoundedSemaphore(3)
     4 def task(arg):
     5     lock.acquire()
     6     time.sleep(1)
     7     global v
     8     v -= 1
     9     print(v)
    10     lock.release()
    11 for i in range(10):
    12     t = threading.Thread(target=task,args=(i,))
    13     t.start()
    BoundedSemaphore

    三、事件Event

    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。

    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。

    • clear:将“Flag”设置为False
    • set:将“Flag”设置为True
    • Event.isSet() :判断标识位是否为Ture
     1 # 事件 event
     2 lock = threading.Event()
     3 def task(arg):
     4     time.sleep(1)
     5     # 锁住所有的线程
     6     lock.wait()
     7     print(arg)
     8 for i in range(10):
     9     t = threading.Thread(target=task,args=(i,))
    10     t.start()
    11 while 1:
    12     value = input('>>:').strip()
    13     if value == '1':
    14         lock.set() # 打开锁,执行上面的print
    15         # lock.clear() # 再锁上
    event

    四、条件Condition

    condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。

    acquire() 和 release() 会调用与锁相关联的相应的方法。

    其他和锁关联的方法必须被调用,wait()方法会释放锁,当另外一个线程使用 notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回,

    • wait(timeout=None) : 等待通知,或者等到设定的超时时间。当调用这wait()方法时,如果调用它的线程没有得到锁,那么会抛出一个RuntimeError 异常。 wati()释放锁以后,在被调用相同条件的另一个进程用notify() or notify_all() 叫醒之前 会一直阻塞。wait() 还可以指定一个超时时间。

    如果有等待的线程,notify()方法会唤醒一个在等待conditon变量的线程。notify_all() 则会唤醒所有在等待conditon变量的线程。

     1 lock = threading.Condition()
     2 
     3 def task(arg):
     4     time.sleep(1)
     5     # 锁住所有线程
     6     lock.acquire()
     7     lock.wait()
     8     print('线程:',arg)
     9     lock.release()
    10 for i in range(10):
    11     t = threading.Thread(target=task,args=(i,))
    12     t.start()
    13 while 1:
    14     value = input('>>>:').strip()
    15     lock.acquire()
    16     lock.notify(int(value))
    17     lock.release()
    18 
    19 >>>:1
    20 线程: 3
    21 >>>:2
    22 线程: 4
    23 线程: 2
    24 >>>:3
    25 线程: 0
    26 线程: 7
    27 线程: 1
    28 >>>:4
    29 线程: 8
    30 线程: 9
    31 线程: 6
    32 线程: 5
    33 >>>:5
    threading.Condition

    五、queue模块

    Queue 就是对队列,它是线程安全的

    举例来说,我们去麦当劳吃饭。饭店里面有厨师职位,前台负责把厨房做好的饭卖给顾客,顾客则去前台领取做好的饭。这里的前台就相当于我们的队列。形成管道样,厨师做好饭通过前台传送给顾客,所谓单向队列

     1 import queue
     2  
     3 q = queue.Queue(maxsize=0)  # 构造一个先进显出队列,maxsize指定队列长度,为0 时,表示队列长度无限制。
     4  
     5 q.join()    # 等到队列为kong的时候,在执行别的操作
     6 q.qsize()   # 返回队列的大小 (不可靠)
     7 q.empty()   # 当队列为空的时候,返回True 否则返回False (不可靠)
     8 q.full()    # 当队列满的时候,返回True,否则返回False (不可靠)
     9 q.put(item, block=True, timeout=None) #  将item放入Queue尾部,item必须存在,可以参数block默认为True,表示当队列满时,会等待队列给出可用位置,
    10                          为False时为非阻塞,此时如果队列已满,会引发queue.Full 异常。 可选参数timeout,表示 会阻塞设置的时间,过后,
    11                           如果队列无法给出放入item的位置,则引发 queue.Full 异常
    12 q.get(block=True, timeout=None) #   移除并返回队列头部的一个值,可选参数block默认为True,表示获取值的时候,如果队列为空,则阻塞,为False时,不阻塞,
    13                       若此时队列为空,则引发 queue.Empty异常。 可选参数timeout,表示会阻塞设置的时候,过后,如果队列为空,则引发Empty异常。
    14 q.put_nowait(item) #   等效于 put(item,block=False)
    15 q.get_nowait() #    等效于 get(item,block=False)
    queue

    六、线程池

     1 from concurrent.futures import ThreadPoolExecutor as TPE
     2 
     3 def task(arg):
     4     time.sleep(0.5)
     5     print('Thread:',arg)
     6 
     7 pool = TPE(5) # 线程池里放5个线程
     8 
     9 for i in range(100):
    10     # 去连接池中获取连接
    11     pool.submit(task,i)
    线程池
     1 import requests
     2 def task(url):
     3     response = requests.get(url)
     4     print('得到结果:',url,len(response.content))
     5 
     6 pool = TPE(2)
     7 url_list = [
     8     'http://www.oldboyedu.com',
     9     'http://www.baidu.com',
    10     'http://www.sohu.com',
    11 ]
    12 
    13 for url in url_list:
    14     print('开始请求',url)
    15     # 去连接池获取连接
    16     pool.submit(task,url)
    线程池2

    七、回调函数

    线程池定义为一个变量,使用变量.add_done_callback(函数名称)进行函数回调

     1 import requests
     2 
     3 def txt(future):
     4     download_response = future.result()
     5     print('得到结果:', url, len(download_response.content))
     6 
     7 def download(url):
     8     response = requests.get(url)
     9     # print('得到结果:',url,len(response.content))
    10     return response
    11 pool = TPE(2)
    12 url_list = [
    13     'http://www.oldboyedu.com',
    14     'http://www.baidu.com',
    15     'http://www.sohu.com',
    16 ]
    17 
    18 for url in url_list:
    19     print('开始请求',url)
    20     # 去连接池获取连接
    21     future = pool.submit(download,url)
    22     # 一旦download函数return,开始执行txt函数
    23     future.add_done_callback(txt)
    回调函数

    八、进程

    multiprocessing是python的多进程管理包,和threading.Thread类似。

     1 from multiprocessing import Process
     2 def task(arg):
     3     time.sleep(arg)
     4     print(arg)
     5 
     6 if __name__ == '__main__':
     7     for i in range(10):
     8         p = Process(target=task,args=(i,))
     9         # 守护true,不执行子进程
    10         # p.daemon = True
    11         # false执行子进程,默认
    12         # p.daemon = False
    13         p.start()
    14         # p.join()
    15         p.join(1)
    16     print('主进程中的主线程...')
    Process

    九、multiprocessing,Array,Value

    数据可以用Value或Array存储在一个共享内存地图里,如下:

     1 # 进程数据共享
     2 from multiprocessing import Process,Array
     3 def task(num,li):
     4     li[num] = num
     5     print(list(li))
     6 
     7 if __name__ == '__main__':
     8     L = Array('i',10) # 数据类型,长度
     9     for i in range(10):
    10         p = Process(target=task,args=(i,L))
    11         p.start()
    Array
    from multiprocessing import Array, Value, Process
    
    
    def func(a, b):
        a.value = 6.66666
        for i in range(len(b)):
            b[i] = -b[i]
    
    
    if __name__ == "__main__":
        num = Value('d', 0.0)
        arr = Array('i', range(11))
    
        c = Process(target=func, args=(num, arr))
        d = Process(target=func, args=(num, arr))
        c.start()
        d.start()
        c.join()
        d.join()
    
        print(num.value)
        for i in arr:
            print(i)
    Value Array

    十、进程池

    和线程池差不多

     1 from concurrent.futures import ProcessPoolExecutor as PPE
     2 
     3 
     4 #基本用法
     5 def task(arg):
     6     time.sleep(1)
     7     print(arg)
     8 
     9 pool = PPE(5)
    10 for i in range(10):
    11     pool.submit(task,i)
    12 
    13 
    14 # 进程池回调
    15 def call(arg):
    16     data = arg.result()
    17     print(data)
    18 
    19 def task(arg):
    20     print(arg)
    21     return arg+100
    22 
    23 pool = PPE(5)
    24 for i in range(10):
    25     obj = pool.submit(task,i)
    26     obj.add_done_callback(call)
    进程池和回调
  • 相关阅读:
    Java 虚拟机部分面试题
    Java 多线程部分面试题
    Java IO部分面试题
    Java 集合框架部分面试题
    Java 面向对象面试题
    Java 多线程同步的五种方法
    Python操作redis
    Python操作mysql
    Python操作mongodb
    数据处理的全过程---(获取数据-清洗数据-数据建模-数据可视化)
  • 原文地址:https://www.cnblogs.com/xp1005/p/6565625.html
Copyright © 2011-2022 走看看