zoukankan      html  css  js  c++  java
  • 并发编程——多线程(4)

    1.线程理论

    • 线程是CPU的执行单位
    • 多线程(即多个控制线程)的概念是,在一个进程中存在多个线程,多个线程共享该进程的地址空间,相当于一个车间内又多条流水线,都共用一个车间的资源。例如,北京地铁与上海地铁是不同的进程,而北京地铁里的13号线是一个线程,北京地铁所有的线路共享北京地铁所有的资源,比如所有的乘客可以被所有线路拉。

    2.线程与进程的区别

    • 同一个进程内的多个线程共享该进程内的地址资源
    • 创建线程的开销要远小于创建进程的开销(创建一个进程,就是创建一个车间,涉及到申请空间,而且在该空间内建至少一条流水线,但创建线程,就只是在一个车间内造一条流水线,无需申请空间,所以创建开销小)

    3.开启线程的两种方式

    • 方式一:函数
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 import os
       5 def sayhi(name):
       6     time.sleep(2)
       7     print("%s say hello"%name)
       8 if __name__ == "__main__":
       9     t = Thread(target=sayhi,args=('egon',))
      10     t.start()
      11     print("")
      方式一
    • 方式二:类
    •  1 class SayHi(Thread):
       2     def __init__(self,name):
       3         super().__init__()
       4         self.name = name
       5     def run(self):
       6         print("%s say hello" % self.name)
       7         print("线程pid:",os.getpid())
       8 if __name__ =="__main__":
       9     t1 = SayHi("egon")
      10     t2 = SayHi("alex")
      11     t1.start()
      12     t2.start()
      13     print("主进程pid:",os.getpid())
      方式二

    4.多线程与多进程的区别

    •  开启速度:线程快于进程
    • pid:同一进程下不同线程的pid是相同的,进程之间的pid是不同的
    • 内存:进程之间内存地址空间是隔离的,而同一进程内开启的多个线程是共享该进程内存地址空间的

     5.Thread对象的其他属性或方法

    • Thread实例对象的方法
      • isAlive():返回线程是否活动的
      • getName():返回线程名
      • setName():设置线程名
    • threading模块提供的一些方法
      • threading.currentThread():返回当前的线程变量
      • threading.enumerate():返回一个包含正在运行的线程的list。正在运行指线程启动后、结束前,不包括启动前后终止后的线程
      • threading.activeCount():返回正在运行的线程数量,与len(threading.enumerate())有相同的结果
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 import threading
       5 import os
       6 
       7 def sayhi(name):
       8     time.sleep(2)
       9     print("%s say hello"%name)
      10     print(threading.current_thread().getName())#Thread-1
      11 if __name__ == "__main__":
      12     t = Thread(target=sayhi,args=('egon',))
      13     t.start()
      14     print(threading.current_thread().getName())#MainThread
      15     print(threading.current_thread())#<_MainThread(MainThread, started 8308)>
      16     print(threading.enumerate())#[<_MainThread(MainThread, started 9072)>, <Thread(Thread-1, started 7052)>]
      17     print(threading.active_count())#2
      18     print(t.is_alive())#True
      19     t.join()
      20     print('主线程/主进程')
      21     print(t.is_alive())#False
      View Code

    5.守护线程

    • 无论是进程还是线程,都遵循:守护xxx会等待主xxx运行完毕后被销毁
      • 强调:运行完毕并非终止运行
      • 对于主进程来说,运行完毕指的是主进程代码运行完毕
      • 对于主线程来说,运行完毕指的是主线程所在进程内所有非守护线程统统运行完毕,主线程才算运行完毕
    • 守护线程:在同一进程内,其他非守护线程运行完毕后才算运行完毕,此时守护线程被回收
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 
       5 def walk():
       6     print("start123")
       7     time.sleep(1)
       8     print("end123")
       9 def run():
      10     print("start456")
      11     time.sleep(3)
      12     print("end456")
      13 if __name__ == "__main__":
      14     t1 = Thread(target=walk)
      15     t2 = Thread(target=run)
      16     t1.daemon = True
      17     t1.start()
      18     t2.start()
      19     print("")
      20 
      21 
      22 #start123
      23 #start456
      24 #
      25 #end123
      26 #end456
      View Code
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread
       3 import time
       4 
       5 def walk():
       6     print("start123")
       7     time.sleep(3)
       8     print("end123")
       9 def run():
      10     print("start456")
      11     time.sleep(1)
      12     print("end456")
      13 if __name__ == "__main__":
      14     t1 = Thread(target=walk)
      15     t2 = Thread(target=run)
      16     t1.daemon = True
      17     t1.start()
      18     t2.start()
      19     print("")
      20 #start123
      21 #start456
      22 #
      23 #end456
      View Code

    6.GIL全局解释器锁

    • 本质上也是互斥锁
    • 保护不同的数据应该加不同的锁,GIL是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据);lock是保护用户自己开发的应用程序的数据,很明显GIL不负责这件事,只能用户自己加锁定义
    • 有了GIL的存在,同一时刻同一进程中只能有一个线程被执行
    • GIL与多线程
      • ·对于计算来说,CPU越多越好,但是对于I/O来说,再多的CPU也没用
      • 对于单核计算机来说,常采用开启一个进程,多个线程的方案
      • 对于多核计算机来说,如果任务是计算密集型,多核意味着并行计算,多进程方案更优;如果任务是I/O密集型,则多线程方案更优
      • 多线程用于IO密集型,如socket,爬虫,web
      • 多进程用于计算密集型,如金融分析
      •  1 #-*- coding:utf-8 -*-
         2 from multiprocessing import Process
         3 from threading import Thread
         4 import os
         5 import time
         6 def work():
         7     time.sleep(2)
         8     print("===>")
         9 if __name__ == "__main__":
        10     l_p = []
        11     print(os.cpu_count())
        12     start = time.time()
        13     for i in range(400):
        14         # p = Process(target=work)#耗时时间长
        15         p = Thread(target=work)#耗时时间短
        16         l_p.append(p)
        17         p.start()
        18     for p in l_p:
        19         p.join()
        20     stop = time.time()
        21     print("run time is %s"%(stop-start))
        I/O密集型
         1 #-*- coding:utf-8 -*-
         2 from multiprocessing import Process
         3 from threading import Thread
         4 import os
         5 import time
         6 def work():
         7     res = 0
         8     for i in range(100000000):
         9         res*=1
        10 if __name__ == "__main__":
        11     l_p = []
        12     print(os.cpu_count())
        13     start = time.time()
        14     for i in range(4):
        15         # p = Process(target=work)#耗时时间短
        16         p = Thread(target=work)#耗时时间长
        17         l_p.append(p)
        18         p.start()
        19     for p in l_p:
        20         p.join()
        21     stop = time.time()
        22     print("run time is %s"%(stop-start))
        计算密集型

    7.死锁现象与递归锁RLOCK

    • 死锁现象
      •  是指两个或两个以上的进程或线程在执行过程中,因争夺资源而造成的一种互相等待的现象,若无外力作用,它们都将无法推进下去。此时称系统处于死锁状态或系统产生了死锁,这些永远在互相等待的进程称为死锁进程,如下就是死锁
      •  1 # -*- coding:utf-8 -*-
         2 from threading import Thread, Lock
         3 import time
         4 
         5 mutexA = Lock()
         6 mutexB = Lock()
         7 
         8 
         9 class MyThread(Thread):
        10     def run(self):
        11         self.func1()
        12         self.func2()
        13 
        14     def func1(self):
        15         mutexA.acquire()
        16         print('33[41m%s 拿到A锁33[0m' % self.name)
        17         mutexB.acquire()
        18         print('33[42m%s 拿到B锁33[0m' % self.name)
        19         mutexB.release()
        20         mutexA.release()
        21     def func2(self):
        22         mutexB.acquire()
        23         print('33[43m%s 拿到B锁33[0m' % self.name)
        24         time.sleep(2)
        25         mutexA.acquire()
        26         print('33[44m%s 拿到A锁33[0m' % self.name)
        27         mutexA.release()
        28         mutexB.release()
        29 if __name__ == "__main__":
        30     for i in range(10):
        31         t = MyThread()
        32         t.start()
        死锁
    • 递归锁
      • 递归锁RLOCK用于解决死锁现象
      • 这个RLock内部维护着一个Lock和一个counter变量,counter记录了acquire的次数,从而使得资源可以被多次require。直到一个线程所有的acquire都被release,其他的线程才能获得资源。上面的例子如果使用RLock代替Lock,则不会发生死锁,
      • 二者的区别
        • 递归锁可以连续acquire多次,
        • 互斥锁只能acquire一次
      •  1 # -*- coding:utf-8 -*-
         2 from threading import Thread, Lock, RLock
         3 import time
         4 
         5 mutexA = mutexB = RLock()
         6 
         7 
         8 class MyThread(Thread):
         9     def run(self):
        10         self.func1()
        11         self.func2()
        12 
        13     def func1(self):
        14         mutexA.acquire()
        15         print('33[41m%s 拿到A锁33[0m' % self.name)
        16         mutexB.acquire()
        17         print('33[42m%s 拿到B锁33[0m' % self.name)
        18         mutexB.release()
        19         mutexA.release()
        20 
        21     def func2(self):
        22         mutexB.acquire()
        23         print('33[43m%s 拿到B锁33[0m' % self.name)
        24         time.sleep(2)
        25         mutexA.acquire()
        26         print('33[44m%s 拿到A锁33[0m' % self.name)
        27         mutexA.release()
        28         mutexB.release()
        29 
        30 
        31 if __name__ == "__main__":
        32     for i in range(10):
        33         t = MyThread()
        34         t.start()
        递归锁

    8.信号量

    • 信号量也是一把锁,可以指定信号量为5,对比互斥锁同一时间只能有一个任务抢到锁去执行,信号量同一时间可以有5个任务拿到锁去执行,如果说互斥锁是合租房屋的人去抢一个厕所,那么信号量就相当于一群路人争抢公共厕所,公共厕所有多个坑位,这意味着同一时间可以有多个人上公共厕所,但公共厕所容纳的人数是一定的,这便是信号量的大小
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread,Semaphore
       3 import threading
       4 import time
       5 def func():
       6     sm.acquire()
       7     print('%s get sm' % threading.current_thread().getName())
       8     time.sleep(3)
       9     sm.release()
      10 if __name__ == "__main__":
      11     sm = Semaphore(5)
      12     for i in range(23):
      13         t = Thread(target=func)
      14         t.start()
      信号量
    • 1 Semaphore管理一个内置的计数器,
      2 每当调用acquire()时内置计数器-13 调用release() 时内置计数器+14 计数器不能小于0;当计数器为0时,acquire()将阻塞线程直到其他线程调用release()。

    9.event

    • 线程的一个关键特性是每个线程都是独立运行且状态不可预测。如果程序中的其 他线程需要通过判断某个线程的状态来确定自己下一步的操作,这时线程同步问题就会变得非常棘手。为了解决这些问题,我们需要使用threading库中的Event对象。 对象包含一个可由线程设置的信号标志,它允许线程等待某些事件的发生。在 初始情况下,Event对象中的信号标志被设置为假。如果有线程等待一个Event对象, 而这个Event对象的标志为假,那么这个线程将会被一直阻塞直至该标志为真。一个线程如果将一个Event对象的信号标志设置为真,它将唤醒所有等待这个Event对象的线程。如果一个线程等待一个已经被设置为真的Event对象,那么它将忽略这个事件, 继续执行。
    • 1 from threading import Event
      2 
      3 event.isSet():返回event的状态值;
      4 
      5 event.wait():如果 event.isSet()==False将阻塞线程;
      6 
      7 event.set(): 设置event的状态值为True,所有阻塞池的线程激活进入就绪状态, 等待操作系统调度;
      8 
      9 event.clear():恢复event的状态值为False。
       1 def conn_mysql():
       2     count = 1
       3     while not event.is_set():
       4         if count > 3:
       5             print("%s try too many times " % threading.currentThread().getName())
       6             return
       7         print('<%s>第%s次尝试链接'%(threading.current_thread().getName(),count))
       8         event.wait(0.5)
       9         count += 1
      10     print('<%s>链接成功' % threading.current_thread().getName())
      11 
      12 def check_mysql():
      13     print('33[45m[%s]正在检查mysql33[0m' % threading.current_thread().getName())
      14     time.sleep(random.randint(2,4))
      15     event.set()
      16 
      17 if __name__ == "__main__":
      18     event = Event()
      19     conn1 = Thread(target=conn_mysql)
      20     conn2 = Thread(target=conn_mysql)
      21     conn3 = Thread(target=conn_mysql)
      22     check = Thread(target=check_mysql)
      23 
      24     conn1.start()
      25     conn2.start()
      26     conn3.start()
      27     check.start()
      event

    10.定时器

    • 指定n秒后执行某操作
    • def hello():
          print("hello,world!")
      
      t = Timer(2,hello)
      t.start() # after 2 seconds, "hello, world" will be printed
    •  1 # -*- coding:utf-8 -*-
       2 from threading import Timer
       3 import random
       4 
       5 
       6 class Code(object):
       7     def __init__(self):
       8         self.make_cache()
       9 
      10     def make_cache(self, interval=5):
      11         self.cache = self.make_code()
      12         print(self.cache)
      13         self.t = Timer(interval, self.make_cache)
      14         self.t.start()
      15 
      16     def make_code(self, n=4):
      17         res = ""
      18         for i in range(n):
      19             s1 = str(random.randint(0, 9))
      20             s2 = chr(random.randint(65, 90))
      21             res += random.choice([s1, s2])
      22         return res
      23 
      24     def check(self):
      25         while True:
      26             code = input("请输入验证码>>:").strip()
      27             if code.upper() == self.cache:
      28                 print("验证码输入正确")
      29                 self.t.cancel()
      30                 break
      31 
      32 if __name__ == "__main__":
      33     obj = Code()
      34     obj.check()
      定时器

    .11.线程queue

    •  1 #-*- coding:utf-8 -*-
       2 import queue
       3 q = queue.Queue(3)
       4 q.put(1)
       5 q.put(2)
       6 q.put(3)
       7 # q.put(4)#阻塞
       8 # q.put(4,block=False)#抛出异常
       9 # q.put(4,block=True,timeout=3)  #3s阻塞后,抛出异常
      10 
      11 q.get()
      12 q.get()
      13 q.get()
      14 # q.get()#阻塞
      15 # q.get(block=False)#抛出异常
      16 # q.get_nowait()#抛出异常,和上一条等价
      17 q.get(block=True,timeout=3)#3s阻塞后,抛出异常
      先进先出
    • 1 #堆栈,先进后出
      2 import queue
      3 q = queue.LifoQueue()
      4 q.put('first')
      5 q.put('second')
      6 q.put('third')
      7 print(q.get())#third
      8 print(q.get())#second
      9 print(q.get())#first
      堆栈,先进后出
    •  1 #优先级队列
       2 import queue
       3 q = queue.PriorityQueue()
       4 #put进入一个元组,元组的第一个元素是优先级(通常是数字,也可以是非数字之间的比较),数字越小优先级越高
       5 q.put((20,'a'))
       6 q.put((10,'b'))
       7 q.put((30,'c'))
       8 
       9 print(q.get())
      10 print(q.get())
      11 print(q.get())
      12 
      13 # 结果(数字越小优先级越高,优先级高的优先出队):
      14 # (10, 'b')
      15 # (20, 'a')
      16 # (30, 'c')
      优先级队列
    • 多线程套接字通信
    •  1 #-*- coding:utf-8 -*-
       2 import socket
       3 from threading import Thread
       4 from concurrent.futures import ThreadPoolExecutor
       5 
       6 def communicate(conn):
       7     while True:
       8         try:
       9             data = conn.recv(1024)
      10             if not data:break
      11             conn.send(data.upper())
      12         except ConnectionResetError:
      13             break
      14     conn.close()
      15 
      16 
      17 def server(server_ip,port):
      18     server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      19     server.bind((server_ip, port))
      20     server.listen(5)
      21     while True:
      22         conn, client_addr = server.accept()
      23         # t = Thread(target=communicate,args=(conn,))
      24         # t.start()
      25         pool.submit(communicate,conn)
      26     server.close()
      27 
      28 if __name__ == "__main__":
      29     pool = ThreadPoolExecutor(2)
      30     server_ip = socket.gethostbyname(socket.gethostname())
      31     port = 8080
      32     server(server_ip, port)
      服务端
       1 #-*- coding:utf-8 -*-
       2 import socket
       3 server_ip = socket.gethostbyname(socket.gethostname())
       4 client = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
       5 client.connect((server_ip,8080))
       6 while True:
       7     msg = input(">>:").strip()
       8     if not msg:continue
       9     client.send(msg.encode("utf-8"))
      10     data = client.recv(1024)
      11     print(data.decode("utf-8"))
      12 client.close()
      客户端

    12.线程池与进程池

    • concurrent.futures模块提供了高度封装的异步调用接口
      ThreadPoolExecutor:线程池,提供异步调用
      ProcessPoolExecutor: 进程池,提供异步调用
      Both implement the same interface, which is defined by the abstract Executor class.
      1、submit(fn, *args, **kwargs)
      异步提交任务
      
      2、map(func, *iterables, timeout=None, chunksize=1) 
      取代for循环submit的操作
      
      3、shutdown(wait=True) 
      相当于进程池的pool.close()+pool.join()操作
      wait=True,等待池内所有任务执行完毕回收完资源后才继续
      wait=False,立即返回,并不会等待池内的任务执行完毕
      但不管wait参数为何值,整个程序都会等到所有任务执行完毕
      submit和map必须在shutdown之前
      
      4、result(timeout=None)
      取得结果
      
      5、add_done_callback(fn)
      回调函数
    •  1 #-*- coding:utf-8 -*-
       2 from threading import Thread,currentThread
       3 from multiprocessing import Process
       4 from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
       5 import os
       6 import time
       7 import random
       8 def task(name):
       9     print("name:%s  pid:%s"%(name,os.getpid()))
      10     # print("name:%s  pid:%s"%(name,currentThread().getName()))
      11     time.sleep(random.randint(1,3))
      12 if __name__ == "__main__":
      13     pool = ProcessPoolExecutor(5)
      14     # pool = ThreadPoolExecutor(5)
      15     for i in range(10):
      16         pool.submit(task,i)
      17     pool.shutdown(wait=True)
      18     print("")
      View Code
    • map函数
    •  1 # -*- coding:utf-8 -*-
       2 import os
       3 import random
       4 import time
       5 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
       6 
       7 
       8 def task(n):
       9     print('%s is running' % os.getpid())
      10     time.sleep(random.randint(1, 3))
      11     return n ** 2
      12 
      13 
      14 if __name__ == "__main__":
      15     executor = ProcessPoolExecutor(max_workers=3)
      16     # for i in range(11):
      17     #     executor.submit(task,i)
      18     executor.map(task,range(1,12))#map取代了for+submit
      map函数
    • 回调函数
      • 可以为进程池或线程池内的每个进程或线程绑定一个函数,该函数在进程或线程的任务执行完毕后自动触发,并接收任务的返回值当作参数,该函数称为回调函数
    •  1 #-*- coding:utf-8 -*-
       2 import requests
       3 import time
       4 from concurrent.futures import ThreadPoolExecutor
       5 def get(url):
       6     response = requests.get(url)
       7     time.sleep(3)
       8     return {"url":url,"content":response.text}
       9 def parse(res):
      10     res = res.result()
      11     print("%s parse res is %s"%(res["url"],len(res["content"])))
      12 if __name__=="__main__":
      13     urls = [
      14         "http://www.woshipm.com/rp/415309.html",
      15         "https://www.python.org",
      16         "http://blog.csdn.net/shanzhizi/article/details/50903748",
      17     ]
      18     pool = ThreadPoolExecutor(2)
      19     for url in urls:
      20         pool.submit(get,url).add_done_callback(parse)
      21     pool.shutdown()
      22     print("")
      回调函数
  • 相关阅读:
    Linked List Cycle leetcode java (链表检测环)
    Remove Duplicates from Sorted List II leetcode java
    Remove Duplicates from Sorted List leetcode java
    Merge Two Sorted Lists leetcode java
    Swap Nodes in Pairs leetcode java
    Median of Two Sorted Array leetcode java
    阿里云最便宜的四种域名注册
    nohup和&后台运行,进程查看及终止
    ipv6转ipv4 NAT64与DNS64基本原理概述
    ros使用pppoe拨号获取ipv6,并且下发IPV6的dns到客户机win7
  • 原文地址:https://www.cnblogs.com/GraceZ/p/8429029.html
Copyright © 2011-2022 走看看