zoukankan      html  css  js  c++  java
  • python线程

    为什么由线程?1,进程的创建,撤销,切换开销比较大。2,由于对称多处理及(SMP)即多cpu的出现,进程并行开销过大。线程,轻量级进程,一个基本的cpu执行单元,程序执行过程中的最小单元,由线程id,程序计数器,寄存器集合,和堆栈共同组成。线程可以与同属于一个进程的其他线程共享进程拥有的资源。线程是属于进程的,线程运行在进程空间内,同一进程所产生的线程共享同一内存空间,当进程退出时该进程所产生的数据被强制删除并退出。在pycharm上运行一段代码并不会开一个进程,所以有时需要守护线程

    线程是轻量级进程,一个标准的线程有线程id,当前指令,寄存器集合和堆栈组成,线程是进程的实体,是被系统独立调度和分派的基本单位,线程自己不拥有系统资源,只拥有在运行中必不可少的资源,但它可以与同属有一个进程的其他线程共享进程所拥有的全部资源。

    使用threading模块创建线程

    #coding=utf-8
    import threading
    import time
    
    def saySorry():
        print("亲爱的,我错了,我能吃饭了吗?")
        time.sleep(1)
    
    if __name__ == "__main__":
        for i in range(5):
            t = threading.Thread(target=saySorry)
            t.start() #启动线程,即让线程开始执行
    

      多线程并发操作,当调用start()方法后才真正创建线程并执行,每一个线程都有一个唯一的标识符用来区分程序的主次关系,程序主入口被称为主线程,使用threading.Thread()创建的都是子线程,线程数量等于:主线程+子线程数

    主线程默认会等待所有子线程结束后再结束

    len(threading.enumerate())  查看线程数量

    同时开启几个线程,是没有执行顺序的

    t1 = threading.Thread(target=)

    t1.setDaemon(True)  设置守护线程

    多线程共享全局变量,如果多个线程同时更改同一个全局变量,会造成错误的结果,需要通过t1.join() 让其他线程等待t1线程执行结束再执行,也就相当于没有并发而是并行

    互斥锁

    murtes = threading.Lock()  使用Lock模块创建线程锁对象

    mutes.acquire   锁定同一个进程下不同线程的共享资源,其他线程不能更改此资源

    mutes.release()  释放锁

    确保了某段关键代码只能由一个线程从头到尾完整地执行

    死锁

    在线程共享多个资源 时,如果两个线程分别占有一部分资源并同时等待对方的资源,就会造成死锁。

    线程的建立

    import threading
    import time
    import os
    '''
    theading模块建立在_thread模块基础上,thread模块以低级原始的方式处理和控制线程,threading模块对
    thread模块进行二次封装,提供了更方便的api处理线程
    本代码创建了20个’前台‘线程,然后控制器交给cpu,通过指定算法调度,分片执行指令,
    '''
    
    def work(num):
        print('in work{}'.format(num))
        time.sleep(2)
    
    
    if __name__ == '__main__':
    
        for i in range(20):
            t = threading.Thread(target=work, args=(i,), name='t.{}'.format(i))  # args = 后面必须是元组, name参数设置线程名
            t.setDaemon(True)  # 在start方法之前设置默认为False,即前台线程,子线程结束后主线程再结束;为True时设置为守护线程,主线程结束,则无论子线程是否执行完毕都结束,即后台线程
    
            t.start()
            # t.setName('tt.i')  # 设置线程名
            # print(t.getName())  # 获取线程名
            t.name = 'ttt.{}'.format(i)  # t.name设置线程名
            print(t.name)  # t.name 获取线程名
            # print(t.is_alive())  # 判断线程是否是激活状态
            # print(t.isAlive())  # 也是判断线程是否是激活状态 后面的括号不能忘!!
            print(t.isDaemon())  # 判断是否为守护线程
            print(t.ident)  # 获取线程的标识符,在本脚本多线程运行的时候,140310960539392线程标示符不同,进程号相同
            # t.join()  # 逐个执行每个线程,执行完毕后在继续向下执行,此方法让多线程变的毫无意义,线程标识符相同,进程号相同
            print(os.getpid())  # 获取进程号,在本脚本多线程,单线程运行时进程号都相同
            # t.run() :线程被cpu调度后自动执行线程对象的run方法
    
    
    # import threading
    # import time
    #
    #
    # def worker(num):
    #     """
    #     thread worker function
    #     :return:
    #     """
    #     time.sleep(1)
    #     print("The num is  %d" % num)
    #     return
    #
    #
    # for i in range(20):
    #     t = threading.Thread(target=worker, args=(i,),name ='t. % d' % i)
    #     t.start()
    

      

    线程锁

    import threading
    import time
    '''
    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。
    为了保证数据的准确性,引入了锁的概念。所以,可能出现如下问题:
    例:假设列表A的所有元素就为0,当一个线程从前向后打印列表的所有元素,
    另外一个线程则从后向前修改列表的元素为1,那么输出的时候,列表的元素就会一部分为0,
    一部分为1,这就导致了数据的不一致。锁的出现解决了这个问题。
    '''
    
    
    globals_num = 0
    
    lock = threading.RLock()
    
    
    def Func():
        lock.acquire()  # 获得锁 
        global globals_num
        globals_num += 1
        time.sleep(1)
        print(globals_num)
        lock.release()  # 释放锁
    
    
    for i in range(10):  # 连续创建了十个进程,由于Func方法内有锁,多线程只能一个一个运行
        t = threading.Thread(target=Func)
        t.start()
    

      

    线程锁Lock和RLock区别

    import threading
    import time
    
    '''
    由于线程之间是进行随机调度,并且每个线程可能只执行n条执行之后,CPU接着执行其他线程。为了保证数据的准确性,引入了锁的概念。
    RLock允许在同一线程中被多次acquire。而Lock却不允许这种情况。
     如果使用RLock,那么acquire和release必须成对出现,即调用了n次acquire,
     必须调用n次的release才能真正释放所占用的琐。
    '''
    # lock = threading.Lock()
    # lock.acquire()
    # lock.acquire()  # 代码在此行产生了死锁
    # lock.release()
    # lock.release()
    
    # rLock = threading.RLock()
    # rLock.acquire()
    # rLock.acquire()
    # rLock.release()
    # rLock.release()
    
    
    
    
    
    
    
    '''下面的代码都计算错误了'''
    
    
    num = 0
    
    
    def work(lock):
        global num
        num += 1
        time.sleep(0.5)
        # print(num)
    
    
    def work1(lock):
        time.sleep(0.5)
        global num
        num += 1
    
    
    if __name__ == '__main__':
        lock = threading.Lock()
        for i in range(10000):
            t = threading.Thread(target=work, args=(lock,))
            t1 = threading.Thread(target=work1, args=(lock,))
    
            t.start()
            t1.start()
    
        print(num)
    
    
    
    import threading
    import time
    
    '''
    两个线程同时连续修改同一个进程中的同一个全局变量一百万次,运行结束后全局变量被计算错误。
    可以使用threading.Lock,acquire上锁,release解锁
    '''
    
    
    def work1(num):
        for i in range(num):
            global count
            count += 1
        print('in work1', count)
    
    
    def work2(num):
        for i in range(num):
            global count
            count += 1
        print('in work2', count)
    
    
    if __name__ == '__main__':
        count = 0
        num = 10000
        t1 = threading.Thread(target=work1, args=(num,))
        t2 = threading.Thread(target=work2, args=(num,))
        t1.start()
        t2.start()
    
        while len(threading.enumerate()) != 1:
            time.sleep(0.5)
    
        print(count)
    

      

    线程Event

    import threading
    
    '''
    python线程的事件用于主线程控制其他线程的执行,事件主要提供了三个方法 set、wait、clear。
    事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
    clear:将“Flag”设置为False
    set:将“Flag”设置为True
    Event.isSet() :判断标识位是否为Ture。
    当线程执行的时候,如果flag为False,则线程会阻塞,当flag为True的时候,线程不会阻塞。它提供了本地和远程的并发性。
    '''
    
    
    def do(event):
        print('do start')
        event.wait()
        print('do over')
    
    
    if __name__ == '__main__':
        event_obj = threading.Event()
        for i in range(10):
            t1 = threading.Thread(target=do, args=(event_obj,))
            t1.start()
        event_obj.clear()
        in_data = input('true?')
        if in_data == 'true':
            event_obj.set()
    

      

    自己做一个线程池

    # 简单往队列中传输线程数
    import threading
    import time
    import queue
    
    class Threadingpool():
        def __init__(self,max_num = 10):
            self.queue = queue.Queue(max_num)
            for i in range(max_num):
                self.queue.put(threading.Thread)
    
        def getthreading(self):
            return self.queue.get()
    
        def addthreading(self):
            self.queue.put(threading.Thread)
    
    
    def func(p,i):
        time.sleep(1)
        print(i)
        p.addthreading()
    
    
    if __name__ == "__main__":
        p = Threadingpool()
        for i in range(20):
            thread = p.getthreading()
            t = thread(target = func, args = (p,i))
            t.start()
    

      

    #往队列中无限添加任务
    import queue
    import threading
    import contextlib
    import time
    
    StopEvent = object()
    
    
    class ThreadPool(object):
    
        def __init__(self, max_num):
            self.q = queue.Queue()
            self.max_num = max_num
    
            self.terminal = False
            self.generate_list = []
            self.free_list = []
    
        def run(self, func, args, callback=None):
            """
            线程池执行一个任务
            :param func: 任务函数
            :param args: 任务函数所需参数
            :param callback: 任务执行失败或成功后执行的回调函数,回调函数有两个参数1、任务函数执行状态;2、任务函数返回值(默认为None,即:不执行回调函数)
            :return: 如果线程池已经终止,则返回True否则None
            """
    
            if len(self.free_list) == 0 and len(self.generate_list) < self.max_num:
                self.generate_thread()
            w = (func, args, callback,)
            self.q.put(w)
    
        def generate_thread(self):
            """
            创建一个线程
            """
            t = threading.Thread(target=self.call)
            t.start()
    
        def call(self):
            """
            循环去获取任务函数并执行任务函数
            """
            current_thread = threading.currentThread
            self.generate_list.append(current_thread)
    
            event = self.q.get()  # 获取线程
            while event != StopEvent:   # 判断获取的线程数不等于全局变量
    
                func, arguments, callback = event   # 拆分元祖,获得执行函数,参数,回调函数
                try:
                    result = func(*arguments)   # 执行函数
                    status = True
                except Exception as e:    # 函数执行失败
                    status = False
                    result = e
    
                if callback is not None:
                    try:
                        callback(status, result)
                    except Exception as e:
                        pass
    
                # self.free_list.append(current_thread)
                # event = self.q.get()
                # self.free_list.remove(current_thread)
                with self.work_state():
                    event = self.q.get()
            else:
                self.generate_list.remove(current_thread)
    
        def close(self):
            """
            关闭线程,给传输全局非元祖的变量来进行关闭
            :return:
            """
            for i in range(len(self.generate_list)):
                self.q.put(StopEvent)
    
        def terminate(self):
            """
            突然关闭线程
            :return:
            """
            self.terminal = True
            while self.generate_list:
                self.q.put(StopEvent)
            self.q.empty()
    
        @contextlib.contextmanager
        def work_state(self):
            self.free_list.append(threading.currentThread)
            try:
                yield
            finally:
                self.free_list.remove(threading.currentThread)
    
    
    
    
    def work(i):
        print(i)
        return i +1 # 返回给回调函数
    
    def callback(ret):
        print(ret)
    
    pool = ThreadPool(10)
    for item in range(50):
        pool.run(func=work, args=(item,),callback=callback)
    
    pool.terminate()
    # pool.close()
    
    

      

  • 相关阅读:
    fastapi+vue搭建免费代理IP网站部署至heroku
    如何选择免费代理ip,需要注意哪些指标值和基本参数
    如何部署MongoDB并开启远程访问Docker版
    Linux设置Frps Frpc服务开启启动
    Docker搭建VS Code Server ,设置访问密码随时随地写代码
    薅羊毛须及时 多平台微信线报提醒脚本
    python+selenium实现百度关键词搜索自动化操作
    用python selenium 单窗口单IP刷网站流量脚本
    杂记 内容会在留言中补充
    c#杂记
  • 原文地址:https://www.cnblogs.com/serpent/p/9352025.html
Copyright © 2011-2022 走看看