zoukankan      html  css  js  c++  java
  • 网络编程------线程

    概念:(一般用于处理高并发)

    有了进程为什么还要线程?

    进程只能在一个时间干一件事,如果相同时干两件事,进程就不行了

    进程在执行过程中如果遇到阻塞,例如输入,整个进程就会挂起,即使进程中有些工作不依赖输入的数据,也将无法执行.

    线程:

      线程:线程是计算机中被cpu调度的最小单位.

       线程:轻量级的进程/轻型进程

      线程本身创建出来就是为了解决并发问题的

      并且他整体效率比进程要高(相比于进程他省去了多数创建的时间和回收有时还有相互切换(轻量级))

       是进程的一部分,不能独立存在

    进程:(消耗资源)

      对操作系统压力大

      数据隔离

      可以在操作系统中独立存在

      计算机中资源分配的最小单位

    多进程多线程都可以实现并行:资源够用同时进行,简单来说,多个cpu可以同时执行一个进程多个线程

    全局解释器锁GiL:  Cpython解释器下

    python刚出来是单核的所以没有考虑到线程并行问题,随着以后cpu的的增多就有问题了,多个cpu(资源够用,会出现并行问题,那么当两个cpu同时执行一个线程时,就会出现数据混乱),所以 出现了全局解释器锁(锁线程的,结果看上去就又变成单核运算了,它是由解释器提供的)虽然慢了但是保障了数据的安全

    所以: 1 GIL是锁进程的

       2 这个锁是解释器提供的

    所以当遇到高计算型:

        多开线程或者换解释器

    线程常见的几个模块:

    thred(低级,不支持守护线程)

    threding(高级的,功能更强大)

    Queue :允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。

    threding模块

    multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性,因而不再详细介绍(https://docs.python.org/3/library/threading.html?highlight=threading#)

    线程的创建:

    方式一:
    def func(a): print(
    'zi线程',a) if __name__ == '__main__': # 当前文件下才执行下面的代码,不同py文件xia不会执行 t=Thread(target=func,args=('李四',)) t.start() print('主线程')

    #zi线程 李四
    #主线程
    方式二 : 面向对象型
    class
    Sayhi(Thread): def __init__(self,name): super().__init__() self.name=name def run(self): time.sleep(2) print(self.name) if __name__ == '__main__': t=Sayhi('haha') t.start() print('主线程') #主线程 #haha

    多线层与多进程:

    多进程: 开多个进程,每个进程都有不同的pid

    多线程:开启多个线程,每个的pid都和主进程都一样

        同一进程内各个线程共享进程的数据

    def work():
        print('hello',os.getpid())
    
    if __name__ == '__main__':
        #part1:在主进程下开启多个线程,每个线程都跟主进程的pid一样
        t1=Thread(target=work)
        t2=Thread(target=work)
        t1.start()
        t2.start()
        print('主线程/00主进程pid',os.getpid())
    
        #part2:开多个进程,每个进程都有不同的pid
        p1=Process(target=work)
        p2=Process(target=work)
        p1.start()
        p2.start()
        print('主线程/主进程pid',os.getpid())
    # n = 100
    # def func():
    #     global n
    #     n -= 1
    #
    # t = Thread(target=func)
    # t.start()
    # t.join()
    # print(n) # 随线一个线程的改动,整个进程中也改了

    多线程实现socket通信

    sever端
    # socket实现多线程
    import socket
    from threading import Thread
    
    
    def func(conn):
        while True:
            msg=conn.recv(1024).decode()
            conn.send(msg.upper().encode())
    sk=socket.socket()
    sk.bind(('192.168.11.114',9000))
    sk.listen()
    
    while True:
        conn,addr=sk.accept()
        Thread(target=func,args=(conn,)).start()
    
    Clinet端
    
    import socket
    
    sk=socket.socket()
    sk.connect(('192.168.11.114',9000))
    while True:
            sk.send(b'hello')
            print(sk.recv(1024).decode())

    threading中的其他功能:

    currentThread()# 查看线程号
    activeCount() :查看当前还有几个活跃的线程
    enumerate() 返回当前运行的线程list 里面是每个线程对象

    例子:

    import time
    from threading import Thread,currentThread,activeCount,enumerate
    class Mythread(Thread):
        def __init__(self,arg):
            super().__init__()
            self.arg = arg
        def run(self):
            time.sleep(1)
            print('in son',self.arg,currentThread())
    for i in range(10):
        t = Mythread(123)
        t.start()
        print(t.ident)
    print(activeCount())
    print(enumerate()) #   返回当前运行的线程list 里面是每个线程对象
    
    # activeCount() :查看当前还有几个活跃的线程
    # currentThread()# 查看线程号

    守护线程:(setDaemon(True)

    守护线程线程不会随着主线程的结束而结束,他会一直等着主进程结束然后继续等着其他子进程结束他才结束

    守护进程 只守护主进程的代码,主进程代码结束他就结束,并且是在主进程结束之前给自己分配个结束点,而结束

    进程 terminate 强制结束一个子进程
    线程 没有强制结束的方法
    线程结束: 线程内部代码结束执行完毕,就自动结束.

    例子:

     def func():
    #     while True:
    #         print('in func')
    #         time.sleep(0.5)
    #
    # def func2():
    #     print('start func2')
    #     time.sleep(10)
    #     print('end func2')
    #
    # Thread(target=func2).start()
    # t = Thread(target=func)
    # t.setDaemon(True)
    # t.start()
    # print('主线程')
    # time.sleep(2)
    # print('主线程结束')

    (2)

    锁与Gil:(全局解释器锁,自然情况下大约每700条指令会轮转一次,所以即使有GIL也是不安全的,当你有个很长的代码是有可能运行到一半就移交给另一CPU了 所以就不准了)

    总结来说:有了Gil换是会出现数据不安全的现象,所以还要用锁

    
    
    from threading import Thread,Lock
    n=100
    def func():
    global n
    tmp=n-1
    time.sleep(0.1) # 强行移交给另一个
    n=tmp
    if __name__ == '__main__':
    l=[]
    # lock=Lock()
    for i in range (100):
    t=Thread(target=func,)
    t.start()
    l.append(t)
    for t in l:t.join()
    print(n) # 99 # 如果没有sleep就是0

    即使就拿现在的代码来说你不加sleep他的运算结果也没错,但是那是因为你的代码不够长所以你要操作其中的
    数据时 一定要枷锁
    必加锁 这样就安全了

    import time
    from threading import Thread,Lock n = 100 def func(lock): global n # n -= 1 with lock: tmp = n-1 # n-=1 # time.sleep(0.1) n = tmp if __name__ == '__main__': l = [] lock = Lock() for i in range(100): t = Thread(target=func,args=(lock,)) t.start() l.append(t) for t in l:t.join() print(n)

    死锁与递归锁

    进程和线程都有死锁和递归锁

    死锁:两个或两个以上的进程或线程在执行过程中,因抢夺资源而造成的一种互相等待的现象,若无

    外力作用其,他们无法推进下去.此时系统产生了死锁

    死锁
    def eat1(name): noodle_lock.acquire() print(
    '%s拿到面条了'%name) fork_lock.acquire() print('%s拿到叉子了'%name) print('%s开始吃面'%name) time.sleep(0.2) fork_lock.release() print('%s放下叉子了' % name) noodle_lock.release() print('%s放下面了' % name) def eat2(name): fork_lock.acquire() print('%s拿到叉子了' % name) noodle_lock.acquire() print('%s拿到面条了' % name) print('%s开始吃面' % name) time.sleep(0.2) noodle_lock.release() print('%s放下面了' % name) fork_lock.release() print('%s放下叉子了' % name) Thread(target=eat1,args=('alex',)).start() Thread(target=eat2,args=('wusir',)).start() Thread(target=eat1,args=('太白',)).start() Thread(target=eat2,args=('宝元',)).start()

    解决方式:

    lock = Lock()
    # def eat1(name):
    #     lock.acquire()
    #     print('%s拿到面条了'%name)
    #     print('%s拿到叉子了'%name)
    #     print('%s开始吃面'%name)
    #     time.sleep(0.2)
    #     lock.release()
    #     print('%s放下叉子了' % name)
    #     print('%s放下面了' % name)
    #
    # def eat2(name):
    #     lock.acquire()
    #     print('%s拿到叉子了' % name)
    #     print('%s拿到面条了' % name)
    #     print('%s开始吃面' % name)
    #     time.sleep(0.2)
    #     lock.release()
    #     print('%s放下面了' % name)
    #     print('%s放下叉子了' % name)
    #
    # Thread(target=eat1,args=('alex',)).start()
    # Thread(target=eat2,args=('wusir',)).start()
    # Thread(target=eat1,args=('太白',)).start()
    # Thread(target=eat2,args=('宝元',)).start()


    互斥锁

    无论在相同的线程还是不同的线程,都只能连续acquire一次
    要想再acquire,必须先release

    递归锁:Rlock()(在同一线程中,可以无限的acquire,

    但是要想在其他线程中也acquire,
    必须现在自己的线程中添加和acquire次数相同的release

    )

    例子:

     rlock = RLock()
    # def func(num):
    #     rlock.acquire()
    #     print('aaaa',num)
    #     rlock.acquire()
    #     print('bbbb',num)
    #     rlock.release()
    #     rlock.release()
    #
    # Thread(target=func,args=(1,)).start()
    # Thread(target=func,args=(2,)).start()

    总结:想要不出问题就锁一次开一次

    信号量和事件:

    信号量:

    例子:

    import time
    from threading import Semaphore,Thread
    def func(name,sem):
        sem.acquire()
        print(name,'kaishi')
        time.sleep(1)
        print(name,'jieshu')
        sem.release()
    
    sem=Semaphore(2) # 两个两个创建
    for i in range(4):
        p=Thread(target=func,args=(i,sem))
        p.start()

    那他和进程池有什么区别:

    进程池:如果是相同代码,进程池是先先创建2个然后后面一直利用这两个,可以避免一次开很多进程.

    信号量:是一次性把4个都创建出来,然后两个两个放. 

    事件

    几个常用方法:

    event.isSet(): 返回event的状态.

    event.wait() :如果event的状态为Flase将阻塞

    event.clear(): 恢复event状态为Flase

    event.set():设置其状态为True,所有阻塞池的状态进入激活状态,等待操作系统调度.

    from threading import Event
    # 事件
    # wait() 阻塞 到事件内部标识为True就停止阻塞
    # 控制标识
        # set
        # clear
        # is_set
    
    # 连接数据库
    import time
    import random
    from threading import Thread,Event
    def connect_sql(e):
        count = 0
        while count < 3:
            e.wait(0.5) # 最多阻塞0.5秒 然后程序向下执行
            if e.is_set():
                print('连接数据库成功')
                break
            else:
                print('数据库未连接成功')
                count += 1
    
    def test(e):
        time.sleep(random.randint(0,3))
        e.set()
    
    e = Event()
    Thread(target=test,args=(e,)).start()
    Thread(target=connect_sql,args=(e,)).start()

    条件:

    Python提供的Condition对象提供了对复杂线程同步问题的支持。
    Condition被称为条件变量,除了提供与Lock类似的acquire和release方法外,
    还提供了wait和notify方法。线程首先acquire一个条件变量,然后判断一些条件。
    如果条件不满足则wait;如果条件满足,进行一些处理改变条件后,通过notify方法通知其他线程,
    其他处于wait状态的线程接到通知后会重新判断条件。不断的重复这一过程,从而解决复杂的同步问题。
    
    
    import threading
    
    def run(n):
        con.acquire()
        con.wait()
        print("run the thread: %s" % n)
        con.release()
    
    if __name__ == '__main__':
    
        con = threading.Condition()
        for i in range(10):
            t = threading.Thread(target=run, args=(i,))
            t.start()
    
        while True:
            inp = input('>>>')
            if inp == 'q':
                break
            con.acquire()
            con.notify(int(inp))
            con.release()
            print('****')
     设置某个条件
    # 如果满足这个条件 就可以释放线程
    # 监控测试我的网速
    # 20000个任务
    # 测试我的网速 /系统资源
    # 发现系统资源有空闲,我就放行一部分任务

    定时器: 指定n秒后执行某个操作  

    from threading import Timer
    from threading import Timer
    
    def func():
        print('执行我啦')
    
    t = Timer(3,func)
    # 现在这个时间点我不想让它执行,而是预估一下大概多久之后它执行比较合适
    t.start()
    print('主线程的逻辑')

    # 3秒以后执行 func

    线程队列: import queue

    
    
    import queue
    # 线程对列  线程之间数据安全
    
    p=queue.Queue(1) #设置队列大小
    # 普通队列
    p.get()# 获取
    p.put() # 放入
    p.get_nowait()
    p.put_nowait()
    # 连续p.put() 会让你的程序阻塞,他会一直等待有人拿他
    p.get_nowait()#如果他有数据我就取,如果没有不阻塞而是报错
    p.put(1)
    print(p.get(timeout=2))# 它里面可以放参数的 如果我等两秒没有拿到我就报错,但是一般不在程序中这么用,(死等)会影响效率
    
    
    
    # 栈 先进后出  使用队列是可以实现栈的需求的 更加完整的约束我数据进出的顺序
    lfq=queue.LifoQueue()
    lfq.put(1)
    lfq.put(2)
    lfq.put(3)
    print(lfq.get())
    print(lfq.get())
    print(lfq.get())
    # 堆:最小的在最上面
    
    # 优先级队列 根据第一个值的大小来排定优先级的
    # 数字越小,优先级越高(阿斯克码的值越小,优先级越高) 第一个是数字,第二个是码
    a=queue.PriorityQueue()
    a.put((10,'a'))
    a.put((1,'c'))
    print(a.get())
    print(a.get())
    
    

    线程池:  from concurrent.futures import ThreadPoolExecutor 线程池类

    from  concurrent.futures import ThreadPoolExecutor
    import time
    import random
    from threading import currentThread  # 可以查看线程号
    '''
    # 基本方法:
    submit(fn, *args, **kwargs) # 异步提交任务
    
    map(func, *iterables, timeout=None, chunksize=1) #取代for循环submit的操作
    
    shutdown(wait=True)  # 相当于进程池的pool.close()+pool.join()操作
    
    result(timeout=None)#取得结果
    
    add_done_callback(fn)#回调函数
    '''
    
    def func(num):
        # time.sleep(2)
        print('in%s func'% num,currentThread())
        # time.sleep(random.random())
    tp=ThreadPoolExecutor(5)
    for i in range(30):
        tp.submit(func,i)




  • 相关阅读:
    列表与字典的嵌套
    arduino开发ESP8266学习笔十-----ESP8266闪存文件系统
    arduino开发ESP8266学习笔记九---------ESP8266网络服务器(通过网页访问)
    互联网基础
    IC内部集成MOSFET的升压转换电路计算
    arduino开发ESP8266学习笔记八------(WIFI通信)STA模式、AP模式、混合模式
    arduino开发ESP8266学习笔记七--------EEPROM的使用
    arduino开发ESP8266学习笔记六——变量和函数
    arduino开发ESP8266学习笔记五——模拟输入
    arduino开发ESP8266学习笔记四-----舵机
  • 原文地址:https://www.cnblogs.com/systemsystem/p/10111818.html
Copyright © 2011-2022 走看看