zoukankan      html  css  js  c++  java
  • 线程的Thread模块 同步控制:锁,事件,信号量,条件,定时器

    Thread 模块

    import time

    from threading import Thread

    from multiprocessing import Process

    #效率差别

    def func(a):

      a += 1

    if __name__ == "__main__":

      li = []

      start = time.time()

      for i in range(50):

        p = Thread(target=func)

        p.start()

        li.append(p)

      for i in li:

        i.join()

      pirnt(time.time()-start)

      start = time.time()

      li = []

      for i in range(50):

        t = Process(target=func)

        t.start()

        li.append(t)

      for i in li:

        i.join()

      print(time.time()-start)

    terminate(强制结束子进程) 在线程中没有

    线程之间的数据共享

    from threading import Thread

    n = 100

    def func():

      global n

      n -= 1

    li = []

    for i in range(100):

      t = Thread(target=func)

      t.start()

      li.append(t)

    for i in li:

      i.join()

    print(n)

    守护线程

    import time

    from threading import Thread

    def func():

      while 1:

        print(True)

        time.sleep(0.5)

    def inner():

      print("in  inner  start")

      time.sleep(3)

      print("in  inner  end")

    t1 = Thread(target=func)

    t1.setDaemon(True)

    t1.start()

    t2 = Thread(target=inner)

    t2.start()

    time.sleep(1)

    print("主进程")

    主线程如果结束了,那么整个进程就结束了,守护线程会等待主线程结束之后才结束.

    主进程 等待  守护进程  子进程  守护进程只守护主进程的代码就可以了

    守护线程不行,主线程结束了那么整个进程就结束了,所有的线程就都结束了

    例子 : 

    使用多线程实现tcp协议的socket server

    客户端:

    import socket

    sk = socket.socket()

    sk.connect_ex(("127.0.0.1",9090))

    while 1:

      sk.send(b"hello")

      ret = sk.recv(1024).decode("utf-8")

      print(ret)

    sk.close()

    服务器 :

    import socket

    from threading import Thread

    sk = socket.socket()

    sk.bind(("127.0.0.1",9090))

    sk.listen()

    def func(conn):

      while 1:

        ret = conn.recv(1024).decode("utf-8")

        print(ret)

        conn.send(b"world")

    if __name__ == "__main__":

      while 1:

        conn,addr = sk.accept()

        Thread(target=func,args=(conn,)).start()

    from threading import Thread,get_ident

    #开启线程的第二种方法和查看线程id

    class Mythread(Thread):

      def __init__(self,args):

        super().__init__()

        self.args = args

      def run(self):

        print("in my thread:",get_ident(),self.args)

    print("main",get_ident())

    t = Mythread("nana")

    t.start()

    线程中的方法

    import time

    from threading import Thread,get_ident,currentThread,enumerate,activeCount

    class Mythread(Thread):

      def __init__(self,args):

        super().__init__()

        self.args = args

      def run(self):

        time.sleep(0.1)

        print(currentThread())  #返回当前的线程变量

        print("in my thread:",get_ident(),self.args)

    print("main:",get_ident())

    t = Mythread("nana")

    print(t.is_alive())  #返回线程是否是活动的

    t.start()

    t.setname("nana")  #设置线程名称

    print(t.getname())  #获取线程名称

    print(activeCount())  #正在运行的线程的数量len(enumerate)  #返回一个包含正在运行的线程list

    print("t:",t)

    Thread类 : 开启线程 传参数 join

    和进程的差异 : 1,效率  2. 数据共享   3. 守护线程

    面向对象的方式开启线程 :

    thread对象的其他方法 : isAlive,setname,getname

    threading模块的方法 : currentTread,activeCount,encumerate

    GIL锁(全局解释器锁) : 锁线程, 同一个线程在同一时刻只能被一个CPU执行

    在多进程/线程同时访问一个数据的时候就会产生数据不安全的现象.

    多进程 访问文件

    多线程 : 同时访问一个数据

    GIL全局解释锁 : 在同一个进程里的每一个进程同一时间只能有一个线程访问CPU

    尽量不要设置全局变量

    只要在多线程/进程之间用到全局变量就加上锁

    from threading import Thread,Lock

    lock = Lock()  #互斥锁

    noodle = 100

    def func(name):

      global noodle

      lock.acquire()

      noodle -= 1

      lock.release()

      print("%s吃到面了"%name)

    if __name__ == "__main__":

      li = []

      for i in range(100):

        t = Thread(target=func)

        t.start()

        li.append(t)

      for i in li:

        i.join()

      print(noodle)

    科学家吃面问题(死锁现象)

    import time

    from threading import Thread,Lock

    noodle = Lock()

    fork = Lock()

    def eat1(name):

      noodle.acquire()

      print("%s拿到面了"%name)

      fork.acquire()

      print("%s拿到叉子了"%name)

      print("%s在吃面"%name)

      time.sleep(0.5)

      fork.release()

      noodle.release()

    def eat2(name):

      fork.acquire()

      print("%s拿到叉子了"%name)

      noodle.acquire()

      print("%s拿到面了"%name)

      print("%s 在吃面"%name)

      time.sleep(0.5)

      noodle.release()

      fork.release()

    li = ["alex","egon","wusir","taibai"]

    for i in li:

      Thread(target=eat1,args=(i,)).start()

      Thread(target=eat2,args=(i,)).start()

    递归锁 :

    from threading import RLock

    lock = RLock()

    lock.acquire()

    print(55555)

    lock.acquire()

    print(666666)

    lock.acquire()

    print(99999)

    递归锁解决死锁问题 :

    import time

    from threading import Thread,RLock

    lock = RLock()

    def eat1(name):

      lock.acquire()

      print("%s拿到面了"%name)

      lock.acquire()

      print("%s拿到了叉子了"%name)

      print("%s 在吃饭"%name)

      time.sleep(0.5)

      lock.release()

      lock.release()

    def eat2(name):

      lock.acquire()

      print("%s拿到叉子了"%name)

      lock.acquire()

      print("%s拿到面了"%name)

      time.sleep(0.5)
      lock.release()

      lock.release()

    li = ["alex","egon","wusir","taibai"]

    for i in li:

      Thread(target=eat1,args=(i,)).start()

      Thread(tarage=eat2,args=(i,)).start()

    互斥锁解决死锁问题 :

    import time

    from threading import Thred,Lock

    lock = Lock()

    def eat1(name):

      lock.acquire()
      print("%s拿到面了"%name)

      print("%s 拿到叉子了"%name)

      print("%s在吃面"%name)

      time.sleep(0.5)

      lock.release()

    def eat2(name):

      lock.acquire()

      print("%s拿到叉子了"%name)

      print("%s拿到面了"%name)

      print("%s 在吃面"%name)

      time.sleep(0.5)

      lock.release()

    li = ["alex","egon","wusir","taibai"]

    for i in li:

      Thread(target=eat1,args=(i,)).start()

      Thread(target=eat2,args=(i,)).start()

    死锁 :

    多把锁同时应用在多个线程中

    互斥锁和递归锁哪个好?

    递归锁 : 快速回复服务

    死锁问题的出现,是程序的设计或者逻辑的问题

    还应该进一步的排除和重构逻辑来保证使用互斥锁也不会发生死锁

    互斥锁和递归锁的区别 :

    互斥锁 : 就是在一个线程中不能连续多次acquire

    递归锁 : 可以在同一个线程中acquire任意次,注意acquire多少次就需要release多少次

    信号量 和 进程池

    信号量就是 : 锁 + 计数器

    import time

    from multiprocessing import Semaphore,Process,Pool

    def ktv(sem,i):

      sem.acquire()

      i += 1

      sem.release()

    def ktv1(i):

      i += 1

    if __name__ == "__main__":

      start = time.time()

      li = []

      sem  =Semaphore(5)

      for i in  range(100)

        p = Process(targe=ktv,args=(sem,i))

        p.start()

        li.append(p)

      for i in li:

        i.join()

      print(time.time()-start)

      start  = time.time()

      p = Pool(5)

      for i in range(100):

        p.apply_async(func=ktv1,args=(i,))

      p.close()

      p.join()

      print(time.time()-start)

    进程池和信号量:

    进程池 : 进程池里有几个进程就起几个,不管多少任务,池子里的进程个数是固定的,开启进程和关闭进程这些事都是需要固定的时间开销,就不产生额外的时间开销.且进程池中的进程数控制的好,那么操作系统的压力也小.

    信号量 :

    有多少个任务就起多少个进程/线程

    可以帮助减少操作系统切换负担

    但是并不能减少进/线程开启和关闭的时间

    数据库连接 :

    import time,random

    from threading import Thread,Event

    def check(e):

      print("正在检测两台机器之间的网络情况...")

      time.sleep(random.randint(1,3))

      e.set()

    def connect_db(e):

      e.wait()

      print("连接数据库..")

      print("连接数据库成功....")

     

    e = Event()

    Thread(target=check,args=(e,)).start()

    Thread(target=connect_db,args=(e,)).start()

    import time,random

    from threading import Thread,Event

    def check(e):

      print("正在检测两台机器之间的网络情况...")

      time.sleep(random.randint(0,2))

      e.set()

    def connect_db(e):

      n = 0

      while n<3:

        if e.is_set():

          break

        else:

          e.wait(0.5)

          n += 1

      if n == 3:

        raise TimeoutError

      print("连接数据库....")

      print("连接数据库成功...")

    e = Event()

    Thread(target=check,args=(e,)).start()

    Thread(target=connect_db,args=(e,)).start()

    条件 :

    from threading import Condition

    常用方法:

    acquire  ,  release  , wait  阻塞 ,  notify (让wait解除阻塞的工具)

    wait还是notify在执行这两个方法的前后,必须执行acquire和release

    from threading import Condition ,Thread

    def func(con,i):

      con.acquire()

      con.wait()

      print("thread:",i)

      con.release()

    con = Condition()

    for i in range(20):

      Thread(target=func,args=(con,i)).start()

    con.acquire()

    con.notify_all()  #帮助wait的子线程处理某个数据直到满足条件

    con.release()

    while 1:

      num = int(input("<<<"))

      con.acquire()

      con.notify(num)

      con.release()

    定时器 :

    from threading import Thread , Timer

    def func():

      print("我执行了")

    Timer(1,func).start()  #定时器

    创建线程的时候,就规定他多久之后去执行

  • 相关阅读:
    自动刷新页面
    超链接<A>链接到某个文档
    JS脚本的对话框
    表最后更新的标识列
    c#对象私有属性在重载Equals中的使用
    一个关于在Fedora下安装jdk的问题
    表格的行转列
    UNION和UNION ALL的区别
    关于使用存储过程的一些好处以及注意事项[转]
    SQL 外链接操作小结
  • 原文地址:https://www.cnblogs.com/fengkun125/p/9397923.html
Copyright © 2011-2022 走看看