zoukankan      html  css  js  c++  java
  • 网络编程 多线程/socketserver模块/ threading.local

    线程

    线程:能被操作系统调度的最小的执行单位

    多线程:在1个进程中存在多个线程。

    进程只是用来把资源集中在一起,而线程才是cpu上的执行单位。

    每个进程都会默认有一个控制线程也叫作主线程。

    进程之间是竞争关系,线程之间是协作关系。

    线程和进程之间的区别?

    1.线程时间开销小,不需要申请内存空间,创建速度快。进程需要申请内存空间,创建速度慢。

    2.同一进程下的多个线程,共享该进程的地址空间。即线程之前的数据是共享的.

    3.改变主进程 ,无法影响子进程,改变了主线程,影响其他线程。原因(该控制线程可以执行代码从而创建新的线程,该主线程的运行周期代表了进程的运行周期)

    多线程的出现的背景

    在单线程下如果有两个函数,这两个函数会顺序执行,如果其中一个函数需要访问互联网资源,另一个函数就会卡在原地等待互联网资源的返回,这时,cpu也会闲置下来,这就造成了cpu的浪费。多线程的目的:为了充分利用CPU资源,让cpu的闲置时间变的更少,这时候就出现了多线程

    开启多线程

    如何开启多线程?有两种方法。

    方法一:创建一个threading.Thread对象,在它的初始化函数(__init__)中将可调用对象作为参数传入。(这个方法是重点)

    方法二:通过继承Thread类,重写它的run方法;(这个方法作为了解)

    方法一:

    from threading import Thread, current_thread
    import time
    
    
    def task():
        print("%s is runing" % current_thread().ident)  # 查看线程id
        time.sleep(2)
        print("%s is done" % current_thread().ident)
    
    
    if __name__ == "__main__":
        t = Thread(target=task)
        t.start()
        print(t.ident)  # 查看线程id
        print("")  # 主线程运行周期代表进程的运行周期,他不能先死,他要等子线程运行完和进程意义不同

    结果:

    123145421389824 is runing
    123145421389824123145421389824 is done

     

    方法二:

    from threading import Thread current_thread
    import os
    
    class My_task(Thread):
        def __init__(self,name):
            super(My_task,self).__init__()
            self.name=name
        def run(self):
            print("%s is runing" % current_thread().ident)
            time.sleep(2)
            print("%s is done"%current_thread().ident)
    
    
    if __name__ == '__main__':
        t=My_task("小红")
        t.start()
        print(t.name)

    结果:

    24156 is runing
    小红
    24156 is done

    线程需要知道的

     1.子进程不能修改主进程的变量,子线程能修改主线程的变量因为线程之间不是隔离的.

    范例:

    n=1
    def t():
        global n
        n=15
    if __name__ == '__main__':
            t=Thread(target=t)
            t.start()
            print(n)

    结果:

    15

    2.线程只能自己执行完,不能用terminal强制结束

    3.  主线程运行周期代表进程的运行周期,他不能先死,如果主线程死了,主进程也会结束.那么子进程也会结束.所以主线程会等子线程结束再结束.

    线程主要掌握的方法有3个 :

    start(), # 开启线程,异步非阻塞
    
    join() # 等待当前线程执行完 同步阻塞
    
    from threading import current_thread  中
        有个叫current_thread .getName()这个可以看线程的名字,
        有个叫current_thread.ident可以看线程id
    from threading import enumerate   enumerate() # 查看当前活着的线程
    from threading import  active_count    active_count # 查看当前活着线程个数

    线程锁

    多线程会出现线程不安全的现象,看例子

    作用实现线程安全

    from threading import Thread
    
    n = 0
    
    
    def add():
        for k in range(500000): # 这里数值越大越会出现线程不安全
            global n
            n += 1
    
    
    def cut():
        for k in range(500000):
            global n
            n -= 1
    
    
    t_l = []
    
    for i in range(2):  # 开四个线程,两个加,两个减,开的线程越多,越会出现数据不安全
        t1 = Thread(target=add)
        t1.start()
        t2 = Thread(target=cut)
        t2.start()
        t_l.append(t1)
        t_l.append(t2)
    
    for t in t_l:
        t.join()
    print(n)  # 结果应该为0

    结果:

    425558

    线程不安全的原因 

    Python 代码先被编译为字节码后,再由Python虚拟机来执行字节码, Python的字节码是一种类似汇编指令的中间语言, 一个Python语句会对应若干字节码指令,虚拟机一条一条执行字节码指令, 从而完成程序执行。
    Python dis 模块支持对Python代码进行反汇编, 生成字节码指令。

    import dis
    
    n = 0
    def func():
        global n
        n += 1
    
    dis.dis(func)

    结果:

    10           0 LOAD_GLOBAL              0 (n)        # 加载全局变量
                  2 LOAD_CONST               1 (1)         # 加载全局变量的值
                  4 INPLACE_ADD                              # 就地计算加法
                  6 STORE_GLOBAL             0 (n)       # 把计算的值存到全局变量中
                  8 LOAD_CONST               0 (None)   
                 10 RETURN_VALUE

    # 10就是执行的代码行数

    根据dis的这个例子,我们假设开启了两个func线程,这时候其中一个线程得到了gil锁,执行,将要执行到store_GLOBAL中时,GIL突然被释放了,这时候第一个线程中的全局n并没有赋值还是0,此时线程2获得了gill锁,愉快的计算完了,并把全局n改成1,接着线程1又获得了gill锁,继续用自己之前计算好的值n=1执行store_global,把n设为1,此时可以发现两次加法运算全局n竟然变成1而不是2,这就是线程不安全

    在工作中我们怎么避免线程不安全?

    1.尽量不要使用全局变量

    2.尽量不要修改静态变量

    3.queque和logging模块绝对是线程安全的

    4.list中的append方法是线程安全的

    5.出现+= -= 使用

    解决方法

    from threading import Thread, Lock
    
    n = 0
    
    
    def add(lock):
        for k in range(500000):  # 这里数值越大越会出现线程不安全
            global n
            with lock:  # 只需要这一行加锁就可以了
                n += 1
    
    
    def cut(lock):
        for k in range(500000):
            global n
            with lock:  # 只需要这一行加锁就可以了
                n -= 1
    
    
    t_l = []
    mutex = Lock()
    for i in range(2):  # 开四个线程,两个加,两个减,开的线程越多,越会出现数据不安全
        t1 = Thread(target=add, args=(mutex,))
        t1.start()
        t2 = Thread(target=cut, args=(mutex,))
        t2.start()
        t_l.append(t1)
        t_l.append(t2)
    
    for t in t_l:
        t.join()
    print(n)  # 结果应该为0

    结果:

     0

    线程间的安全容器queue

    thread中并没有queue这个方法,我们使用的是Python自带的模块queue,来实现队列

    import queue
    
    q = queue.Queue()  # 可以设置队列大小
    q.put(0)  # 放数据
    q.get()  # 取数据 这个方法不好,以为当队列中没有数据的时候他会卡住,我们一般用q.get_nowait()
    try:
        q.get_nowait()  # 获取数据,没有数据会报异常,然后我们处理异常,这样就不会卡住
    except queue.Empty:
        pass

    栈LifoQueue

    queue中还有个栈的方法  这个是线程安全的

    from queue import LifoQueue
    l = LifoQueue() #后进先出
    l.put()# 放数据
    l.get() #取数据

    优先级队列PriorityQueue

    根据优先级大小出队列
    from queue import PriorityQueue
    
    l = PriorityQueue()  # 根据优先级大小出队列
    l.put((1, "A"))  # 放数据,第一个值就是排序依据根据ascii来作为优先级大小
    l.put((2, "G"))  # 放数据
    l.put((3, "F"))  # 放数据
    print(l.get())  # 取数据
    print(l.get())  # 取数据
    print(l.get())  # 取数据

    结果:

    (1, 'A')
    (2, 'G')
    (3, 'F')

     线程池

    出现背景:

      1.开多线程是为了并发,通常有几个cpu核心就开几个进程,但是进程开多了会影响效率,主要体现在切换的开销,所以引入进程池限制同时开进程的数量。

      2.如果有几个任务开几个线程的话,每个线程运行完都需要归还内存,归还内存的时候还需要开销,而线程池开好了,一直就开着不会关闭,没有归还开销

    线程池:如果你不指定默认是CPU的个数*5,进程池如果你不指定默认是CPU的个数。

    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time ,random
    def task(n):
        print("%s is running"%current_thread().getName())
        time.sleep(random.randint(1,3))
        return n**2
    if __name__ == '__main__':
        t=ThreadPoolExecutor(3) # 先开线程池,有任务后直接
        objs=[]
        for i in range(10):  
            obj=t.submit(task,i) # 注意这里传递参数的方式和thread不同,这里还可以使用关键字传参
            objs.append(obj)  #这里为啥不直接那值,如果直接拿值的话就变成串行了,所以这里用了一个列表
        t.shutdown(wait=True) # 相当于join,等待所有的值都计算完再拿结果,如果你不使用这个,for循环会得到一个结果那一个结果
        for obj in objs:
            print(obj.result()) #获得返回值
        print("",current_thread().getName())

    结果:

    ThreadPoolExecutor-0_0 is running
    ThreadPoolExecutor-0_1 is running
    ThreadPoolExecutor-0_2 is running
    ThreadPoolExecutor-0_1 is running
    ThreadPoolExecutor-0_2 is running
    ThreadPoolExecutor-0_0 is running
    ThreadPoolExecutor-0_2 is running
    ThreadPoolExecutor-0_0 is running
    ThreadPoolExecutor-0_1 is running
    ThreadPoolExecutor-0_0 is running
    0
    1
    4
    9
    16
    25
    36
    49
    64
    81
    主 MainThread

    绑定回调函数

    异步调用:提交完任务(为该任务绑定一个回调函数),不用在原地等任务执行完拿结果,可以直接提交下一个任务。一个任务一旦完成后就会立即触发回调函数的运行,然后就拿这个结果去执行下一个方法。

    add_done_callback(print_value)就是一个典型的异步非阻塞模型

    回调函数的参数是唯一的就是它绑定的返回值。

    rom concurrent.futures import ThreadPoolExecutor
    from threading import current_thread
    import time, random
    
    
    def task(n):
        # print("线程%s is running 值n%s" % (current_thread().getName(),n))
        time.sleep(random.randint(1, 3))
        print("线程%s is end 值n%s" % (current_thread().getName(), n))
        return n ** 2
    
    
    def print_value(obj):
        print(obj.result())
    
    
    if __name__ == '__main__':
        t = ThreadPoolExecutor(3)  # 先开线程池,有任务后直接
        for i in range(10):
            obj = t.submit(task, i)  # 注意这里传递参数的方式和thread不同,这里还可以使用关键字传参
            obj.add_done_callback(print_value)  # 回调函数谁先执行完,谁立即去执行打印操作
        t.shutdown(wait=True)  # 相当于join
    
        print("", current_thread().getName())

    结果:

    线程ThreadPoolExecutor-0_1 is end 值n1
    线程ThreadPoolExecutor-0_2 is end 值n2
    1
    4
    线程ThreadPoolExecutor-0_0 is end 值n0
    0
    线程ThreadPoolExecutor-0_2 is end 值n4
    16
    线程ThreadPoolExecutor-0_0 is end 值n5
    25
    线程ThreadPoolExecutor-0_2 is end 值n6
    36
    线程ThreadPoolExecutor-0_1 is end 值n3
    9
    线程ThreadPoolExecutor-0_0 is end 值n7
    49
    线程ThreadPoolExecutor-0_2 is end 值n8
    64
    线程ThreadPoolExecutor-0_1 is end 值n9
    81
    主 MainThread

    concurrent中的map函数

    今天在项目中用到了concurrent中的map函数,作用为把可迭代中的每一个元素分别传入到函数中,进行执行

    from concurrent.futures import ThreadPoolExecutor
    import time
    
    
    def get_html(times):
        """
        模拟获取页面响应
        :param times:
        :return:
        """
        time.sleep(times)
        print("获得页面{}的响应".format(times))
        return times
    
    executor = ThreadPoolExecutor(max_workers=2)# 开启两个线程线程
    urls = [3,2,1]
    for data in executor.map(get_html,urls):
        print("get 页面{}的数据".format(data))
    获得页面2的响应
    获得页面3的响应
    get 页面3的数据
    get 页面2的数据
    获得页面1的响应
    get 页面1的数据

    利用多线程来重写套接字

    服务端:

    from threading import Thread
    import socket
    import socketserver
    
    server=socket.socket(socket.AF_INET,socket.SOCK_STREAM)
    server.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    server.bind(("192.168.12.193",8085))
    server.listen(5)
    print("starting")
    def talk(conn,addr):
        """
        接收数据发送数据函数
        :param conn:
        :param addr:
        :return:
        """
        while True:
            try:
                data=conn.recv(1024)
                if not data:break
                conn.send(data.upper())
            except ConnectionResetError:
                break
        conn.close()
    while True:
        conn,addr=server.accept()
        t=Thread(target=talk,args=(conn,addr))
        t.start()
    
    server.close()

    客户端:

    import socket
    custom=socket.socket()
    custom.connect(("127.0.0.1",8080))
    while True:
        msg=input(">>>>").strip()
        if not msg:
            continue
        custom.send(msg.encode("utf-8"))
        data=custom.recv(1024)
        print(data.decode("utf-8"))
    custom.close()

    守护进程和守护线程

    守护进程:当主进程运行完后,不管子进程运没运行完,子进程都将立即被终止.不管有没有其他的子进程

    应用:如果有多个Python项目都在运行,我们如果写一个脚本来监控这些项目,可以用守护进程的方式,zaxbit

    不用守护进程的情况下:

    from multiprocessing import Process
    import os,time
    
    def task():
        time.sleep(2)
    
        print("这是进程%s"%1)
    
    
    if __name__ == '__main__':
    
        t=Process(target=task)
    
        t.start()
        print('主进程')

    结果:

    主进程
    这是进程1

    使用守护进程后:

    from multiprocessing import Process
    import os,time
    
    def task():
        time.sleep(2)
        print("这是进程%s"%1)
    
    if __name__ == '__main__':
        t=Process(target=task)
        t.daemon=True #注意守护进程必须在p.start()之前设置,开启守护进程不能再开启子进程
        t.start()
        print('主进程')

    结果:

    主进程

    注意:

    1. 注意守护进程必须在p.start()之前设置,
    2. 开启的守护进程不能再开启守护进程的子进程 因为进程之间是相互独立的.如果开启再开启子进程后,当守护进程停止后,没有人回收子进程.
      1.  

    守护进程死掉的时间: 当主进程代码执行完后,守护进程就会死掉

    守护线程

    注意: 

    1. 守护线程必须在p.start()之前设置
    2. 开启的守护线程可以再开启守护进程的子进程,因为在同一个进程中,如果主线程死掉,子线程也会死掉
    3. 当主进程结束了,当还有其他线程执行时,守护线程会继续守护其他的线程.

    列如:

    from threading import Thread
    import os,time
    
    def task():
        time.sleep(2)
        print("这是线程%s"%1)
        tt=Thread(target=time.time()) #开启子线程
        print(time.time())
        tt.start()
    
    if __name__ == '__main__':
        t=Thread(target=task)
        t.daemon=True #守护进程必须在p.start()之前设置,守护进程不能开启子进程
        t.start()
        print('主线程')

    结果:

    主线程

    守护线程死掉的时间:当进程內非守护线程都运行完后,进程就会死掉,守护线程才死掉.这也就是守护线程和守护进程的区别

    from threading import Thread
    import os,time
    
    def task():
        time.sleep(2)
        print("这是线程%s"%1)
    
    def task1():
        time.sleep(3)
        print("这是线程%s" % 2)
    if __name__ == '__main__':
        t1=Thread(target=task)
        t2=Thread(target=task1)
        t1.daemon=True #守护进程必须在p.start()之前设置,守护进程不能开启子进程
        t1.start()
        t2.start()
        print('主线程)

    结果:

    主线程
    这是线程1
    这是线程2

     threading.local

    我们知道线程之间的数据是共享的,但是有没有一种技术可以让线程内的数据不共享.恰好python 为我们提供了threading.local可以实现该功能.

    import threading
    
    local=threading.local()  #实例化一个全局对象
    local.val='main-Thread' #在该线程下给local对象添加对象属性
    
    def process_student():
        print('%s (in%s)内存地址为%s'%(local.val,threading.current_thread().getName(),id(local.val)))
    def process_thread(name):
        local.val=name
        process_student()
    
    if __name__ == '__main__':
        #开启两个子线程
        t1 = threading.Thread(target=process_thread, args=('one',), name="Thread-A")
        t2 = threading.Thread(target=process_thread, args=('two',), name="Thread-B")
        t1.start()
        t2.start()
        t1.join()
        t2.join()
        print('主线程的值为%s,内存地址为%s'%(local.val,id(local.val)))#打印当前现成的额值

    结果:

    one (inThread-A)内存地址为1545252686736
    two (inThread-B)内存地址为1545252717264
    主线程的值为main-Thread,内存地址为1545252704688

    从结果上看来,两个子线程并没有把local.val中的值覆盖,而是自己重新开辟了一块内存空间,来存放数据,这样就把线程之间的数据給隔离开来了.

    作用: 用于保存和隔离线程之间的数据.

    应用:

    这个东西可以用在那些地方呢,比如下载,现在都是多线程下载了,就像酷狗那样,可以同时下载很多首歌曲,那么

    就可以利用这个方法来保存每个下载线程的数据,比如下载进度,下载速度之类的

    所以  如果你在开发多线程应用的时候  需要每个线程保存一个单独的数据供当前线程操作,可以考虑使用这个方法,简单有效

    threading.event

    转载于:https://support.i-search.com.cn/article/1565161802455

    有时候,我们线程 A 和 B 运行中,需要等待某个条件 (某件事情发生),才能继续运行下面的代码。事件还没发生前,线程阻塞住,直到事件的发生。 
    Python 中 threading.Event 就可以实现线程间的事件通知。

    import threading
    import time, random
    
    
    def say(e):
        time.sleep(random.random())
        print('%s准备说话中。。。。' % threading.currentThread().name)
        e.wait() # 等待别人下达执行命令
        time.sleep(random.random())
        print('%s->可以开始说话了' % (threading.currentThread().name))
    
    
    def specified_say(e):
        time.sleep(5)
        print(">>>允许线程说话")
        e.set() #允许执行say函数
    
    
    e = threading.Event()
    t1 = threading.Thread(target=say, args=(e,), name='线程1')
    t2 = threading.Thread(target=specified_say, args=(e,), name='线程2')
    t1.start()
    t2.start()
    
    time.sleep(10)
    e.set()

    结果:

    线程1准备说话中。。。。
    >>>允许线程说话
    线程1->可以开始说话了

    socketserver模块

    优秀的博客https://www.cnblogs.com/MnCu8261/p/5546823.html

    python共提供了两个socket模块

    一个是:socket

    另一个是socketserver 模块   简化了网络服务器的开发,解决了io问题。

    SocketServer模块简化了编写网络服务程序的任务。同时SocketServer模块也 是Python标准库中很多服务器框架的基础。

    这个模块提供了多进程和多线程的接口,但是多进程不能在Windows系统下用

    服务端:

    import socketserver
    class
    MyTCPhandler(socketserver.BaseRequestHandler): def handle(self):#重写handle方法 conn=self.request #这步相当于conn addr=self.client_address #这步相当于addr print(conn,addr) while True: try: data=conn.recv(1024) if not data:break conn.send(data.upper()) except ConnectionResetError: break conn.close() if __name__ == '__main__': server=socketserver.ThreadingTCPServer(("127.0.0.1",8080),MyTCPhandler) #这一步就是做了,建立连接,开线程的工作
    注意在Windows系统上不能用socketserver来开线程,在其他系统上可以用
    socketserver.ForkingTCPServer(("127.0.0.1",8080),MyTCPhandler)来创建
       server.allow_reuse_address=True #是否允许地址的重复利用,默认为false
       server.serve_forever()#一直运行

    客户端:

    import  socket
    
    client=socket.socket()
    client.connect(('127.0.0.1',8080))
    
    while True:
        date = input('>>>>')
        client.send(date.encode())
        data = client.recv(1024)
        print('>>>', data.decode())
    
    client.close()

    子线程和子进程以及主进程的关系 

    做完一个真正的关于多进程和多线程的项目后才真正的理解了,子线程和主进程,和子进程的存活关系

    结论:

    1.在主进程中开子线程,主进程会等子线程执行完,再结束。

    2.在子进程中开子线程,子进程不会等子线程执行完,它就会结束,子线程也立即结束,

    例子1.在主进程中开子线程

    from threading import Thread
    import os, time
    
    
    def task():
        time.sleep(2)
        while True:
            print("这是线程%s" % 1)
            time.sleep(0.5)
    
    
    def task1():
        time.sleep(3)
        print("这是线程%s" % 2)
    
    
    if __name__ == '__main__':
    
        t1 = Thread(target=task)
        t2 = Thread(target=task1)
        # t1.daemon = True  # 守护进程必须在p.start()之前设置,守护进程不能开启子进程
        t1.start()
        t2.start()
        print('主线程')

    结果:

    主线程
    这是线程1
    这是线程1
    这是线程2
    这是线程1
    这是线程1
    这是线程1
    这是线程1
    这是线程1
    这是线程1
    这是线程1
    这是线程1
    这是线程1
    这是线程1

    例子2.在子进程中开子线程

    from multiprocessing import Process
    from threading import Thread
    import os, time
    
    
    class Task(Thread):
        def run(self):
            time.sleep(2)
            while True:
                print("这是线程%s" % 1)
                time.sleep(0.5)
    
    
    class Task1(Thread):
        def run(self):
            time.sleep(3)
            print("这是线程%s" % 2)
    
    
    class ProcessTest(Process):
        def run(self):
            print("子进程")
            t1 = Task()
            t1.start()
            t2 = Task1()
            t2.start()
    
    
    if __name__ == '__main__':
        p = ProcessTest()
        p.start()
        print('主线程')

    结果:

    主线程
    子进程
  • 相关阅读:
    AngularJS总结
    网页的颜色表示方法
    计算机中的字符编码
    计算机中的进制
    常用的HTML 标签二
    常用的HTML标签
    常用的字符实体标记
    一个请求的访问流程
    http请求访问过程
    codeforces 269C Flawed Flow(网络流)
  • 原文地址:https://www.cnblogs.com/sticker0726/p/7943412.html
Copyright © 2011-2022 走看看