zoukankan      html  css  js  c++  java
  • 并发编程——多线程(4)

    1.线程理论

    • 线程是CPU的执行单位
    • 多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内又多条流水线,都共用一个车间的资源。例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    2.线程与进程的区别

    • 同一个进程内的多个线程共享该进程内的地址资源
    • 创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)

    3.开启线程的两种方式

    • 方式一:函数
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 import os
       5 def sayhi(name):
       6     time.sleep(2)
       7     print("%s say hello"%name)
       8 if __name__ == "__main__":
       9     t = Thread(target=sayhi,args=('egon',))
      10     t.start()
      11     print("")
      方式一
    • 方式二:类
    •  1 class SayHi(Thread):
       2     def __init__(self,name):
       3         super().__init__()
       4         self.name = name
       5     def run(self):
       6         print("%s say hello" % self.name)
       7         print("线程pid:",os.getpid())
       8 if __name__ =="__main__":
       9     t1 = SayHi("egon")
      10     t2 = SayHi("alex")
      11     t1.start()
      12     t2.start()
      13     print("主进程pid:",os.getpid())
      方式二

    4.多线程与多进程的区别

    •  开启速度:线程快于进程
    • pid:同一进程下不同线程的pid是相同的,进程之间的pid是不同的
    • 内存:进程之间内存地址空间是隔离的,而同一进程内开启的多个线程是共享该进程内存地址空间的

     5.Thread对象的其他属性或方法

    • Thread实例对象的方法
      • isAlive():返回线程是否活动的
      • getName():返回线程名
      • setName():设置线程名
    • threading模块提供的一些方法
      • threading.currentThread():返回当前的线程变量
      • threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前后终止后的线程
      • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 import threading
       5 import os
       6 
       7 def sayhi(name):
       8     time.sleep(2)
       9     print("%s say hello"%name)
      10     print(threading.current_thread().getName())#Thread-1
      11 if __name__ == "__main__":
      12     t = Thread(target=sayhi,args=('egon',))
      13     t.start()
      14     print(threading.current_thread().getName())#MainThread
      15     print(threading.current_thread())#<_MainThread(MainThread, started 8308)>
      16     print(threading.enumerate())#[<_MainThread(MainThread, started 9072)>, <Thread(Thread-1, started 7052)>]
      17     print(threading.active_count())#2
      18     print(t.is_alive())#True
      19     t.join()
      20     print('主线程/主进程')
      21     print(t.is_alive())#False
      View Code

    5.守护线程

    • 无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
      • 强调:运行完毕并非终止运行
      • 对于主进程来说,运行完毕指的是主进程代码运行完毕
      • 对于主线程来说,运行完毕指的是主线程所在进程内所有非守护线程统统运行完毕,主线程才算运行完毕
    • 守护线程:在同一进程内,其他非守护线程运行完毕后才算运行完毕,此时守护线程被回收
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 
       5 def walk():
       6     print("start123")
       7     time.sleep(1)
       8     print("end123")
       9 def run():
      10     print("start456")
      11     time.sleep(3)
      12     print("end456")
      13 if __name__ == "__main__":
      14     t1 = Thread(target=walk)
      15     t2 = Thread(target=run)
      16     t1.daemon = True
      17     t1.start()
      18     t2.start()
      19     print("")
      20 
      21 
      22 #start123
      23 #start456
      24 #
      25 #end123
      26 #end456
      View Code
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 
       5 def walk():
       6     print("start123")
       7     time.sleep(3)
       8     print("end123")
       9 def run():
      10     print("start456")
      11     time.sleep(1)
      12     print("end456")
      13 if __name__ == "__main__":
      14     t1 = Thread(target=walk)
      15     t2 = Thread(target=run)
      16     t1.daemon = True
      17     t1.start()
      18     t2.start()
      19     print("")
      20 #start123
      21 #start456
      22 #
      23 #end456
      View Code

    6.GIL全局解释器锁

    • 本质上也是互斥锁
    • 保护不同的数据应该加不同的锁,GIL是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据);lock是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自己加锁定义
    • 有了GIL的存在,同一时刻同一进程中只能有一个线程被执行
    • GIL与多线程
      • ·对于计算来说,CPU越多越好,但是对于I/O来说,再多的CPU也没用
      • 对于单核计算机来说,常采用开启一个进程,多个线程的方案
      • 对于多核计算机来说,如果任务是计算密集型,多核意味着并行计算,多进程方案更优;如果任务是I/O密集型,则多线程方案更优
      • 多线程用于IO密集型,如socket,爬虫,web
      • 多进程用于计算密集型,如金融分析
      •  1 #-*- coding:utf-8 -*-
         2 from multiprocessing import Process
         3 from threading import Thread
         4 import os
         5 import time
         6 def work():
         7     time.sleep(2)
         8     print("===>")
         9 if __name__ == "__main__":
        10     l_p = []
        11     print(os.cpu_count())
        12     start = time.time()
        13     for i in range(400):
        14         # p = Process(target=work)#耗时时间长
        15         p = Thread(target=work)#耗时时间短
        16         l_p.append(p)
        17         p.start()
        18     for p in l_p:
        19         p.join()
        20     stop = time.time()
        21     print("run time is %s"%(stop-start))
        I/O密集型
         1 #-*- coding:utf-8 -*-
         2 from multiprocessing import Process
         3 from threading import Thread
         4 import os
         5 import time
         6 def work():
         7     res = 0
         8     for i in range(100000000):
         9         res*=1
        10 if __name__ == "__main__":
        11     l_p = []
        12     print(os.cpu_count())
        13     start = time.time()
        14     for i in range(4):
        15         # p = Process(target=work)#耗时时间短
        16         p = Thread(target=work)#耗时时间长
        17         l_p.append(p)
        18         p.start()
        19     for p in l_p:
        20         p.join()
        21     stop = time.time()
        22     print("run time is %s"%(stop-start))
        计算密集型

    7.死锁现象与递归锁RLOCK

    • 死锁现象
      •  是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
      •  1 # -*- coding:utf-8 -*-
         2 from threading import Thread, Lock
         3 import time
         4 
         5 mutexA = Lock()
         6 mutexB = Lock()
         7 
         8 
         9 class MyThread(Thread):
        10     def run(self):
        11         self.func1()
        12         self.func2()
        13 
        14     def func1(self):
        15         mutexA.acquire()
        16         print('33[41m%s 拿到A锁33[0m' % self.name)
        17         mutexB.acquire()
        18         print('33[42m%s 拿到B锁33[0m' % self.name)
        19         mutexB.release()
        20         mutexA.release()
        21     def func2(self):
        22         mutexB.acquire()
        23         print('33[43m%s 拿到B锁33[0m' % self.name)
        24         time.sleep(2)
        25         mutexA.acquire()
        26         print('33[44m%s 拿到A锁33[0m' % self.name)
        27         mutexA.release()
        28         mutexB.release()
        29 if __name__ == "__main__":
        30     for i in range(10):
        31         t = MyThread()
        32         t.start()
        死锁
    • 递归锁
      • 递归锁RLOCK用于解决死锁现象
      • 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,
      • 二者的区别
        • 递归锁可以连续acquire多次,
        • 互斥锁只能acquire一次
      •  1 # -*- coding:utf-8 -*-
         2 from threading import Thread, Lock, RLock
         3 import time
         4 
         5 mutexA = mutexB = RLock()
         6 
         7 
         8 class MyThread(Thread):
         9     def run(self):
        10         self.func1()
        11         self.func2()
        12 
        13     def func1(self):
        14         mutexA.acquire()
        15         print('33[41m%s 拿到A锁33[0m' % self.name)
        16         mutexB.acquire()
        17         print('33[42m%s 拿到B锁33[0m' % self.name)
        18         mutexB.release()
        19         mutexA.release()
        20 
        21     def func2(self):
        22         mutexB.acquire()
        23         print('33[43m%s 拿到B锁33[0m' % self.name)
        24         time.sleep(2)
        25         mutexA.acquire()
        26         print('33[44m%s 拿到A锁33[0m' % self.name)
        27         mutexA.release()
        28         mutexB.release()
        29 
        30 
        31 if __name__ == "__main__":
        32     for i in range(10):
        33         t = MyThread()
        34         t.start()
        递归锁

    8.信号量

    • 信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread,Semaphore
       3 import threading
       4 import time
       5 def func():
       6     sm.acquire()
       7     print('%s get sm' % threading.current_thread().getName())
       8     time.sleep(3)
       9     sm.release()
      10 if __name__ == "__main__":
      11     sm = Semaphore(5)
      12     for i in range(23):
      13         t = Thread(target=func)
      14         t.start()
      信号量
    • 1 Semaphore管理一个内置的计数器,
      2 每当调用acquire()时内置计数器-13 调用release() 时内置计数器+14 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    9.event

    • 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
    • 1 from threading import Event
      2 
      3 event.isSet():返回event的状态值;
      4 
      5 event.wait():如果 event.isSet()==False将阻塞线程;
      6 
      7 event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
      8 
      9 event.clear():恢复event的状态值为False。
       1 def conn_mysql():
       2     count = 1
       3     while not event.is_set():
       4         if count > 3:
       5             print("%s try too many times " % threading.currentThread().getName())
       6             return
       7         print('<%s>第%s次尝试链接'%(threading.current_thread().getName(),count))
       8         event.wait(0.5)
       9         count += 1
      10     print('<%s>链接成功' % threading.current_thread().getName())
      11 
      12 def check_mysql():
      13     print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
      14     time.sleep(random.randint(2,4))
      15     event.set()
      16 
      17 if __name__ == "__main__":
      18     event = Event()
      19     conn1 = Thread(target=conn_mysql)
      20     conn2 = Thread(target=conn_mysql)
      21     conn3 = Thread(target=conn_mysql)
      22     check = Thread(target=check_mysql)
      23 
      24     conn1.start()
      25     conn2.start()
      26     conn3.start()
      27     check.start()
      event

    10.定时器

    • 指定n秒后执行某操作
    • def hello():
          print("hello,world!")
      
      t = Timer(2,hello)
      t.start() # after 2 seconds, "hello, world" will be printed
    •  1 # -*- coding:utf-8 -*-
       2 from threading import Timer
       3 import random
       4 
       5 
       6 class Code(object):
       7     def __init__(self):
       8         self.make_cache()
       9 
      10     def make_cache(self, interval=5):
      11         self.cache = self.make_code()
      12         print(self.cache)
      13         self.t = Timer(interval, self.make_cache)
      14         self.t.start()
      15 
      16     def make_code(self, n=4):
      17         res = ""
      18         for i in range(n):
      19             s1 = str(random.randint(0, 9))
      20             s2 = chr(random.randint(65, 90))
      21             res += random.choice([s1, s2])
      22         return res
      23 
      24     def check(self):
      25         while True:
      26             code = input("请输入验证码>>:").strip()
      27             if code.upper() == self.cache:
      28                 print("验证码输入正确")
      29                 self.t.cancel()
      30                 break
      31 
      32 if __name__ == "__main__":
      33     obj = Code()
      34     obj.check()
      定时器

    .11.线程queue

    •  1 #-*- coding:utf-8 -*-
       2 import queue
       3 q = queue.Queue(3)
       4 q.put(1)
       5 q.put(2)
       6 q.put(3)
       7 # q.put(4)#阻塞
       8 # q.put(4,block=False)#抛出异常
       9 # q.put(4,block=True,timeout=3)  #3s阻塞后,抛出异常
      10 
      11 q.get()
      12 q.get()
      13 q.get()
      14 # q.get()#阻塞
      15 # q.get(block=False)#抛出异常
      16 # q.get_nowait()#抛出异常,和上一条等价
      17 q.get(block=True,timeout=3)#3s阻塞后,抛出异常
      先进先出
    • 1 #堆栈,先进后出
      2 import queue
      3 q = queue.LifoQueue()
      4 q.put('first')
      5 q.put('second')
      6 q.put('third')
      7 print(q.get())#third
      8 print(q.get())#second
      9 print(q.get())#first
      堆栈,先进后出
    •  1 #优先级队列
       2 import queue
       3 q = queue.PriorityQueue()
       4 #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
       5 q.put((20,'a'))
       6 q.put((10,'b'))
       7 q.put((30,'c'))
       8 
       9 print(q.get())
      10 print(q.get())
      11 print(q.get())
      12 
      13 # 结果(数字越小优先级越高,优先级高的优先出队):
      14 # (10, 'b')
      15 # (20, 'a')
      16 # (30, 'c')
      优先级队列
    • 多线程套接字通信
    •  1 #-*- coding:utf-8 -*-
       2 import socket
       3 from threading import Thread
       4 from concurrent.futures import ThreadPoolExecutor
       5 
       6 def communicate(conn):
       7     while True:
       8         try:
       9             data = conn.recv(1024)
      10             if not data:break
      11             conn.send(data.upper())
      12         except ConnectionResetError:
      13             break
      14     conn.close()
      15 
      16 
      17 def server(server_ip,port):
      18     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      19     server.bind((server_ip, port))
      20     server.listen(5)
      21     while True:
      22         conn, client_addr = server.accept()
      23         # t = Thread(target=communicate,args=(conn,))
      24         # t.start()
      25         pool.submit(communicate,conn)
      26     server.close()
      27 
      28 if __name__ == "__main__":
      29     pool = ThreadPoolExecutor(2)
      30     server_ip = socket.gethostbyname(socket.gethostname())
      31     port = 8080
      32     server(server_ip, port)
      服务端
       1 #-*- coding:utf-8 -*-
       2 import socket
       3 server_ip = socket.gethostbyname(socket.gethostname())
       4 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
       5 client.connect((server_ip,8080))
       6 while True:
       7     msg = input(">>:").strip()
       8     if not msg:continue
       9     client.send(msg.encode("utf-8"))
      10     data = client.recv(1024)
      11     print(data.decode("utf-8"))
      12 client.close()
      客户端

    12.线程池与进程池

    • concurrent.futures模块提供了高度封装的异步调用接口
      ThreadPoolExecutor:线程池,提供异步调用
      ProcessPoolExecutor: 进程池,提供异步调用
      Both implement the same interface, which is defined by the abstract Executor class.
      1、submit(fn, *args, **kwargs)
      异步提交任务
      
      2、map(func, *iterables, timeout=None, chunksize=1) 
      取代for循环submit的操作
      
      3、shutdown(wait=True) 
      相当于进程池的pool.close()+pool.join()操作
      wait=True,等待池内所有任务执行完毕回收完资源后才继续
      wait=False,立即返回,并不会等待池内的任务执行完毕
      但不管wait参数为何值,整个程序都会等到所有任务执行完毕
      submit和map必须在shutdown之前
      
      4、result(timeout=None)
      取得结果
      
      5、add_done_callback(fn)
      回调函数
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread,currentThread
       3 from multiprocessing import Process
       4 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
       5 import os
       6 import time
       7 import random
       8 def task(name):
       9     print("name:%s  pid:%s"%(name,os.getpid()))
      10     # print("name:%s  pid:%s"%(name,currentThread().getName()))
      11     time.sleep(random.randint(1,3))
      12 if __name__ == "__main__":
      13     pool = ProcessPoolExecutor(5)
      14     # pool = ThreadPoolExecutor(5)
      15     for i in range(10):
      16         pool.submit(task,i)
      17     pool.shutdown(wait=True)
      18     print("")
      View Code
    • map函数
    •  1 # -*- coding:utf-8 -*-
       2 import os
       3 import random
       4 import time
       5 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
       6 
       7 
       8 def task(n):
       9     print('%s is running' % os.getpid())
      10     time.sleep(random.randint(1, 3))
      11     return n ** 2
      12 
      13 
      14 if __name__ == "__main__":
      15     executor = ProcessPoolExecutor(max_workers=3)
      16     # for i in range(11):
      17     #     executor.submit(task,i)
      18     executor.map(task,range(1,12))#map取代了for+submit
      map函数
    • 回调函数
      • 可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
    •  1 #-*- coding:utf-8 -*-
       2 import requests
       3 import time
       4 from concurrent.futures import ThreadPoolExecutor
       5 def get(url):
       6     response = requests.get(url)
       7     time.sleep(3)
       8     return {"url":url,"content":response.text}
       9 def parse(res):
      10     res = res.result()
      11     print("%s parse res is %s"%(res["url"],len(res["content"])))
      12 if __name__=="__main__":
      13     urls = [
      14         "http://www.woshipm.com/rp/415309.html",
      15         "https://www.python.org",
      16         "http://blog.csdn.net/shanzhizi/article/details/50903748",
      17     ]
      18     pool = ThreadPoolExecutor(2)
      19     for url in urls:
      20         pool.submit(get,url).add_done_callback(parse)
      21     pool.shutdown()
      22     print("")
      回调函数
  • 相关阅读:
    Java中的HashMap
    单机百万连接调优和Netty应用级别调优
    简单排序(冒泡排序,插入排序,选择排序)
    使用AC自动机解决文章匹配多个候选词问题
    树状数组解决数组单点更新后快速查询区间和的问题
    LeetCode 763. Partition Labels
    LeetCode 435. Non-overlapping Intervals
    线段树
    无序数组求第K大的数
    KMP算法解决字符串匹配问题
  • 原文地址:https://www.cnblogs.com/GraceZ/p/8429029.html
Copyright © 2011-2022 走看看