zoukankan      html  css  js  c++  java
  • day33 网络编程之线程,并发以及selectors模块io多路复用

     io多路复用

     selectors模块

    概要:

     1 并发编程需要掌握的知识点:
     2     开启进程/线程
     3     生产者消费者模型!!!
     4     GIL全局解释器锁(进程与线程的区别和应用场景)
     5     进程池线程池
     6 
     7     IO模型(理论)
     8 
     9 
    10 1 多线程
    11     线程的概念?
    12         一个进程内默认就会有一个控制线程,该控制线程可以执行代码从而创建新的线程
    13         该控制线程的执行周期就代表改进程的执行周期
    14     线程VS进程
    15         1、线程的创建开销小于进程,创建速度快
    16         2、同一进程下的多个线程共享该进程的地址空间
    17     GIL全局解释器锁
    18     线程池
    View Code

    开启线程的两种方式:

    from threading import Thread
    from multiprocessing import Process
    import time,os
    
    def task():
        print('%s is running' % os.getpid())
        time.sleep(5)
        print('%s is done' % os.getpid())
    
    class Mythread(Thread):
        def __init__(self, name):
            super().__init__()
            self.name=name
    
        def run(self):
            print('%s is running' % os.getpid())
            time.sleep(5)
            print('%s is done' % os.getpid())
    
    
    
    if __name__ == '__main__':
        # t=Thread(target=task,)
        # t=Process(target=task,)
        t=Mythread('xxxxx')
        t.start()
    
        print('')
        '''
        1、一个进程内不开子进程也不开“子线程”:主线程结束,该进程就结束
        
        2、当一个进程内开启子进程时:
            主线程结束,主进程要等,等所有子进程运行完毕,给儿子收尸
        
        3、当一个进程内开启多个线程时:
            主线程结束并不意味着进程结束,
            进程的结束指的是该进程内所有的线程都运行完毕,才应该回收进程
        '''
    View Code

    这里需要注意一下,在线程里不存在僵尸线程和孤儿线程的概念.

    进程和线程的区别:

    #瞅一眼:PPID,PID
    from threading import Thread
    from multiprocessing import Process
    import time,os
    
    
    def task():
        print('partent:%s self:%s' %(os.getppid(),os.getpid()))
        time.sleep(5)
    
    
    
    
    if __name__ == '__main__':
        t=Thread(target=task,)
        # t=Process(target=task,)
        t.start()
    
        print('',os.getppid(),os.getpid())
    
    
    
    
    #进程直接内存空间隔离
    from threading import Thread
    from multiprocessing import Process
    import time,os
    
    n=100
    def task():
        global n
        n=0
    
    
    
    if __name__ == '__main__':
        t=Process(target=task,)
        t.start()
        t.join()
    
        print('',n)
    
    
    #线程之间内存空间共享
    from threading import Thread
    import time,os
    
    n=100
    def task():
        global n
        n=0
    
    if __name__ == '__main__':
        t=Thread(target=task,)
        t.start()
        t.join()
    
        print('',n)
    View Code

    线程的其他属性以及方法:

    from threading import Thread,current_thread,enumerate,active_count
    import time,os
    
    def task():
        print('%s is running' %current_thread().getName())
        time.sleep(5)
        print('%s is done' %current_thread().getName())
    
    
    if __name__ == '__main__':
        # t=Thread(target=task,name='xxxx')  # # 这里有其他的参数就接着往后传即可,没有就只写一个函数名
        t=Thread(target=task)
        t.start()
        # print(t.name)
    
        #查看当前活着的线程
        print(enumerate()[0].getName())  # # MainThread  print(enumerate()[1].getName())------Thread-1
        # 这里有两个进程所以索引值就是只有0,1,超出部分会报错,out of range
        print(active_count())
        print('',current_thread().getName())  # # 主 MainThread
    View Code

    线程池:

    from concurrent.futures import ProcessPoolExecutor,ThreadPoolExecutor
    from threading import current_thread
    import time,random
    def task(n):
        print('%s is running' %current_thread().getName())
        time.sleep(random.randint(1,3))
        return n**2
    
    if __name__ == '__main__':
        # t=ProcessPoolExecutor() #默认是cpu的核数
        # import os
        # print(os.cpu_count())
    
        t=ThreadPoolExecutor(3) #默认是cpu的核数*5
        objs=[]
        for i in range(10):
            obj=t.submit(task,i)
            objs.append(obj)
    
        t.shutdown(wait=True)
        for obj in objs:
            print(obj.result())
        print('',current_thread().getName())
    View Code

    这里有线程池的作业,我一并粘过来

    # 默写的内容是爬虫线程池的回调机制的代码
    # socket 套接字用线程池的方式去写,然后再用多线程的方式去写
    # 文本处理工具的三个任务先把一个套接字写出来然后看看怎么改成线程的概念
    '''
    以下则是开启多线程的方式:
    
    from socket import *
    from multiprocessing import Process
    from threading import Thread
    ser = socket(AF_INET, SOCK_DGRAM)
    ser.bind(('127.0.0.1', 3020))
    ser.listen(2)
    def connunicate(conn):
        while True:
            data = conn.recv()
            conn.send(data.upper())
    
    if __name__ == '__main__':
        while True:
            conn, addr = ser.accept()
    
        t = Thread(target=connunicate, args=conn)
        t.start()
    '''
    
    
    '''
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import current_thread
    import os
    def task(n):
        print('%s is running' % current_thread().getName())
        
    if __name__ == '__main__':
        t = ThreadProcessExecutor(12)
        # t = ProcessPoolExecutor(4)
        objs = []
        for i in range(20):
            obj = t.submit(task, i)
            objs.append(obj)
        
        t.shutdown(wait=True)    
        for obj in objs:
            print(obj.result())
        print('主', os.getpid(), current_thread().getName())
    '''
    
    
    
    
    # 线程池的版本要理解后背下来!*****
    # 开启线程池的方式:  这里是中级版本,已经搞定,功能都实现了,想要的结果都得到了!
    from socket import *
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    from threading import current_thread
    import os
    
    server = socket(AF_INET, SOCK_STREAM)
    server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
    server.bind(('127.0.0.1', 5038))
    server.listen(3)
    
    
    def connunicate(conn,num):
        while True:
            data = conn.recv(1024)
            conn.send(data.upper())
            print(os.getpid())
        conn.close()
    
    
    if __name__ == '__main__':
        t = ThreadPoolExecutor(12)
        objs = []
        while True:
            conn, addr = server.accept()
            tt = t.submit(connunicate, conn, os.getpid())
            objs.append(tt)
        # t.shutdown(wait=True)
        for ob in objs:
            print(ob.result())
    
        print('', os.getpid())
    
    
    
    
    
    
    # # 客户端版本如下:
    # from socket import *
    # import os
    # client=socket(AF_INET,SOCK_STREAM)
    # client.connect(('127.0.0.1', 5038))
    #
    # while True:
    #     msg=input('>>: ').strip()
    #     if not msg:continue
    #
    #     client.send(msg.encode('utf-8'))
    #     print(os.getpid())
    #     data=client.recv(1024)
    #     print(data.decode('utf-8'))
    View Code

    以上的有需要补充的,那个文本操作没有写出来,把老师博客里面的代码粘过来,

    from threading import Thread
    msg_l=[]
    format_l=[]
    def talk():
        while True:
            msg=input('>>: ').strip()
            if not msg:continue
            msg_l.append(msg)
    
    def format_msg():
        while True:
            if msg_l:
                res=msg_l.pop()
                format_l.append(res.upper())
    
    def save():
        while True:
            if format_l:
                with open('db.txt','a',encoding='utf-8') as f:
                    res=format_l.pop()
                    f.write('%s
    ' %res)
    
    if __name__ == '__main__':
        t1=Thread(target=talk)
        t2=Thread(target=format_msg)
        t3=Thread(target=save)
        t1.start()
        t2.start()
        t3.start()
    View Code

    其实也好理解,就是自己没有多想一想,过早的放弃了.

    异步概念补充(含回调函数):

    # #pip install requests
    # import requests
    # from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    # from threading import current_thread
    # import time
    # import os
    #
    # def get(url):
    #     print('%s GET %s' %(os.getpid(),url))
    #     response=requests.get(url)
    #     time.sleep(3)
    #     if response.status_code == 200:
    #         return {'url':url,'text':response.text}
    #
    # def parse(obj):
    #     res=obj.result()
    #     print('[%s] <%s> (%s)' % (os.getpid(), res['url'],len(res['text'])))
    #
    # if __name__ == '__main__':
    #     urls = [
    #         'https://www.python.org',
    #         'https://www.baidu.com',
    #         'https://www.jd.com',
    #         'https://www.tmall.com',
    #     ]
    #     # t=ThreadPoolExecutor(2)
    #     t=ProcessPoolExecutor(2)
    #     for url in urls:
    #         t.submit(get,url).add_done_callback(parse)
    #     t.shutdown(wait=True)
    #
    #     print('主',os.getpid())
    
        # '''
        # 异步调用:
        #     提交完任务(为该任务绑定一个回调函数),不用再原地等任务执行完毕拿到结果,可以直接提交下一个任务
        #     一个任务一旦执行完毕就会自动触发回调函数的运行
        #
        # 回调函数的参数是单一的:
        #     回调函数的参数就是它所绑定任务的返回值
        #
        # '''
    
    
    #pip install requests
    
    import requests
    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    from threading import current_thread
    import time
    import os
    
    def get(url):
        print('%s GET %s' %(current_thread().getName(),url))
        response=requests.get(url)
        time.sleep(3)
        if response.status_code == 200:
            return {'url':url,'text':response.text}
    
    def parse(obj):
        res=obj.result()
        print('[%s] <%s> (%s)' % (current_thread().getName(), res['url'],len(res['text'])))
    
    if __name__ == '__main__':
        urls = [
            'https://www.python.org',
            'https://www.baidu.com',
            'https://www.jd.com',
            'https://www.tmall.com',
        ]
        t=ThreadPoolExecutor(2)
        for url in urls:
            t.submit(get,url).add_done_callback(parse)
        t.shutdown(wait=True)
    
        print('',os.getpid())
    View Code

    这里的异步概念是,默写的内容,已经默出来过的.

  • 相关阅读:
    【新梦想Java开发公开课】
    接口自动化框架httprunner(三)--断言
    接口自动化框架httprunner(二)--变量空间(context)作用域
    接口自动化框架httprunner(一)--安装及简单使用
    python+locust性能测试(四)之分布式运行
    安装Resharper 10.0.X 出现 The CrossAppDomainPointer is NULL 错误解决方案之一
    给DNN装SkinTuner扩展时出现Could not load file or assembly 'System.Data.SQLite
    STA和MTA线程模式的区别
    jQuery插件,迅速的实现拖拽功能
    query插件(三):Colorbox–内容播放插件
  • 原文地址:https://www.cnblogs.com/2012-dream/p/7979495.html
Copyright © 2011-2022 走看看