zoukankan      html  css  js  c++  java
  • python自动化_day9_进程线程协程

    复习一下tcp的socket

     1 import  socket
     2 ###server
     3 sk = socket.socket()
     4 sk.bind(('127.0.0.1',8000))
     5 sk.listen()  #最大连接数
     6 
     7 conn,addr = sk.accept() #没执行一次 获取一个连接
     8 while True:
     9     conn.send(b'hellle')
    10     msg = conn.recv(1024)
    11     print(msg)
    12 conn.close()
    13 sk.close()
    14 ####client
    15 import socket
    16 sk = socket.socket()
    17 sk.connect(('127.0.0.1',8000))
    18 while True:
    19     msg = sk.recv(1024)
    20     print(msg)
    21     sk.send(b'helllo')
    22 sk.close()
    23 import socketserver
    24 #在socket基础上实现的并发
    25 class Myserver(socketserver.BaseRequestHandler):
    26     def handle(self):
    27         conn = self.request
    28         # 并发socket这conn不需要添加其他 最好是根据cilent端做回应或者做判断。
    29 myserver = socketserver.ThreadingTCPServer(('127.0.0.1',8000),Myserver)
    30 myserver.serve_forever()
    31 #socketserver所启动的服务端是不能有input操作
    32 #server端一般都是根据cilent端的要求去执行固定的代码

    并发编程:一个程序可以同时时刻做多件事情,可以解决程序中的IO操作影响程序效率的问题

    #计算机 - - 穿孔纸带
    #输入输出 -----大部分时间都不会占用cpu 且会降低程序的效率
    #时间消耗:发送请求,从网络上获取数据 占用大部分时间 怎么才可以充分利用这一段时间。

    #操作系统发展史:
    # 联机批处理系统 :作业的输入/输出由CPU来处理
    # 脱机批处理系统 :为克服与缓解:高速主机与慢速外设的矛盾,提高CPU的利用率,又引入了脱机批处理系统,即输入/输出脱离
    # 主机控制
    # 多道程序系统 :指允许多个程序同时进入内存并运行,同时把多个程序放入内存,并允许它们交替在CPU中运行,它们共享系统
    #中的各种硬、软件资源。当一道程序因I/O请求而暂停运行时,CPU便立即转去运行另一道程序
    #多道的前提就是能够记住a程序执行到哪个地方,操作系统负责调度作业
    # 多道批处理系统 :多道 成批
    # 分时系统 :把时间做成切片,每个程序运行一段时间
    #实时系统 :系统能够及时响应随机发生的外部事件,并在严格的时间范围内完成对该事件的处理
    # 实时系统可分成两类:
    #    (1)实时控制系统。当用于飞机飞行、导弹发射等的自动控制时,要求计算机能尽快处理测量系统测得的数据,及时
    # 地对飞机或导弹进行控制,或将有关信息通过显示终端提供给决策人员。当用于轧钢、石化等工业生产过程控制时,也要求计
    # 算机能及时处理由各类传感器送来的数据,然后控制相应的执行机构。
    #    (2)实时信息处理系统。当用于预定飞机票、查询有关航班、航线、票价等事宜时,或当用于银行系统、情报检索系统
    # 时,都要求计算机能对终端设备发来的服务请求及时予以正确的回答。此类对响应及时性的要求稍弱于第一类。
    #   实时操作系统的主要特点:
    #   (1)及时响应。每一个信息接收、分析处理和发送的过程必须在严格的时间限制内完成。
    #   (2)高可靠性。需采取冗余措施,双机系统前后台工作,也包括必要的保密措施等。
    # 分时——现在流行的PC,服务器都是采用这种运行模式,即把CPU的运行分成若干时间片分别处理不同、的运算请求 linux系统
    # 实时——一般用于单片机上、PLC等,比如电梯的上下控制中,对于按键等动作要求进行实时处理
    #通用操作系统:
    #  操作系统的三种基本类型:多道批处理系统、分时系统、实时系统。
    #   通用操作系统:具有多种类型操作特征的操作系统。可以同时兼有多道批处理、分时、实时处理的功能,或其中两种以上的功能。
    #   例如:实时处理+批处理=实时批处理系统。首先保证优先处理实时任务,插空进行批处理作业。常把实时任务称为前台作业,
    # 批作业称为后台作业。
    #   再如:分时处理+批处理=分时批处理系统。即:时间要求不强的作业放入“后台”(批处理)处理,需频繁交互的作业在“前
    # 台”(分时)处理,处理机优先运行“前台”作业。
    #   从上世纪60年代中期,国际上开始研制一些大型的通用操作系统。这些系统试图达到功能齐全、可适应各种应用范围和操作方
    # 式变化多端的环境的目标。但是,这些系统过于复杂和庞大,不仅付出了巨大的代价,且在解决其可靠性、可维护性和可理解性方面都
    # 遇到很大的困难。
    #   相比之下,UNIX操作系统却是一个例外。这是一个通用的多用户分时交互型的操作系统。它首先建立的是一个精干的核心,而
    # 其功能却足以与许多大型的操作系统相媲美,在核心层以外,可以支持庞大的软件系统。它很快得到应用和推广,并不断完善,对现代
    # 操作系统有着重大的影响。
    #   至此,操作系统的基本概念、功能、基本结构和组成都已形成并渐趋完善。

    #现代操作系统
    #基于多道批处理系统和分时系统
    #多个程序,作业在遇到IO操作的时候,操作系统会帮助你进行切换
    #让CPU的利用率得到最大的提高
    ############################进程################################
    #进程 :运行中的程序   操作系统中资源分配的最小单位
    #什么是程序
    #操作系统 只负责管理调度进程
    #每一个运行中的程序都需要有自己的内存,资源
    #都分配给进程 记录执行的状态 管理自己的内存资源

    #pythoon每一个运行中的程序都是一个进程
    #一个进程 就能做一件事
    #如果有多个进程 -----就可以完成多件事
    #启动多个进程就可以实现并发
    #如果用python来开启一个进程
     1 import time
     2 import os
     3 from multiprocessing import Process
     4 def func(num):
     5     print(num,os.getpid())
     6     time.sleep(100)
     7 if __name__ == '__main__':
     8     print(os.getpid())
     9     p = Process(target=func,args=(10,)) #创造一个进程
    10     p.start()  #开启一个进程
    11     print(os.getpid())
    #同步:先做什么后做什么
    #异步:一边做什么另一边做其他的
    #异步可以有效的提高程序的效率
    #进程与进程之间都是异步的
    #开启一个进程是有时间开销的
    #子进程 :
    #主进程 :运行的程序
    #父进程 :
    #关于print的顺序

    #什么是进程 : 运行中的程序 最小的资源分配单位
    #python中可以用代码启动一个进程 ---- 子进程
    #程序执行就会产生一个主进程
    #父子进程之间的数据不可以共享
    #父子进程之间的代码执行时异步的,各自执行各自的。
    #进程什么时候结束
    #主进程什么时候结束,主进程会等待子进程结束之后而结束
     1 n = 100
     2 def func():
     3     global n
     4     n = 0
     5     print('-----------')
     6     time.sleep(10)
     7 # func()
     8 if __name__ == '__main__':
     9     Process(target=func).start()
    10     time.sleep(1)
    11     print(n)
     1 #开启多个子进程  多进程
     2 def fun(n):
     3     time.sleep(1)
     4     print('-'*n)
     5 if __name__  == '__main__':
     6     l = []
     7     for i in range(10):
     8         p = Process(target=fun, args=(i,))
     9         p.start()
    10         l.append(p)
    11     for i in l:i.join()
    12     print('十条信息都发送完了')
    13     Process(target=fun,args=(1,)).start()
    14     Process(target=fun,args=(2,)).start()
    15     Process(target=fun,args=(3,)).start()
    16     Process(target=fun,args=(4,)).start()
    17     Process(target=fun,args=(5,)).start()
    18 def fun(n):
    19     time.sleep(1)
    20     print('-'*n)
    21 if __name__  == '__main__':
    22     # for i in range(10):
    23     p = Process(target=fun, args=(1,))
    24     p.start()
    25     print('子进程开始了。')
    26     p.join() #阻塞会等待子进程执行完毕之后再继续
    27     print('十条信息都发送完了')

    守护进程:守护进程也是一个子进程,当主进程的代码执行完毕之后自动结束的进程叫做守护进程

     1 def deamon_func():
     2     while True:
     3         print('我还活着')
     4         time.sleep(0.5)
     5 def wahaha():
     6     for i in range(10):
     7         time.sleep(1)
     8         print('---'*i)
     9 if __name__ == '__main__':
    10     p1 = Process(target=wahaha)
    11     p1.start()
    12     p = Process(target=deamon_func)
    13     p.daemon = True
    14     p.start()
    15     for i in range(3):
    16         print(i*'*')
    17         time.sleep(1)
    18     p1.join()
    #开启一个子进程 start
    #子进程和主进程是异步的
    #如果在主进程中药等待子进程结束之后在执行某段代码:join
    #如果有多个子进程 不能再start一个进程之后就立刻join,把所有的start放到一个列表里,等待所有的进程都start之后再逐一join
    #守护进程:当主进程的“代码”执行完毕之后自动结束的进程叫做守护进程
     1 ################################lock###################################
     2 # 多进程抢占输出资源
     3 import os
     4 import time
     5 import random
     6 from multiprocessing import Process,Lock
     7 def work(n,lock):
     8     lock.acquire()
     9     print('%s:%s is running' %(n,os.getpid()))
    10     time.sleep(random.random())
    11     print('%s:%s is done' %(n,os.getpid()))
    12     lock.release()
    13 if __name__ == '__main__':
    14     lock = Lock()
    15     for i in range(3):
    16         p = Process(target=work,args=(i,lock))
    17         p.start()
    18 #牺牲效率但是保证了数据的安全
     1 ################################3购票系统######################################
     2 #文件db的内容为:{"count":1}
     3 #注意一定要用双引号,不然json无法识别
     4 #并发运行,效率高,但竞争写同一文件,数据写入错乱
     5 from multiprocessing import Process,Lock
     6 import time,json,random
     7 def search():
     8     dic=json.load(open('db'))
     9     print('33[43m剩余票数%s33[0m' %dic['count'])
    10 
    11 def get(n):
    12     dic=json.load(open('db'))
    13     time.sleep(0.1) #模拟读数据的网络延迟
    14     if dic['count'] >0:
    15         dic['count']-=1
    16         time.sleep(0.2) #模拟写数据的网络延迟
    17         json.dump(dic,open('db','w'))
    18         print('33[43m购票成功33[0m')
    19 
    20 def task(n,lock):
    21     search()
    22     lock.acquire()
    23     get(n)
    24     lock.release()
    25 
    26 if __name__ == '__main__':
    27     lock = Lock()
    28     for i in range(10): #模拟并发100个客户端抢票
    29         p=Process(target=task,args=(i,lock))
    30         p.start()
     1 ####################################信号量########################################################
     2 #实际工作中很少用到
     3 # 信号量就是 锁 +  计数器
     4 from multiprocessing import Semaphore
     5 sem = Semaphore(5)
     6 sem.acquire()
     7 print(1)
     8 sem.acquire()
     9 print(2)
    10 sem.acquire()
    11 print(3)
    12 sem.acquire()
    13 print(4)
    14 sem.acquire()
    15 print(5)
    16 sem.acquire()
    17 print(6)
    18 from multiprocessing import Process,Semaphore
    19 import time,random
    20 
    21 def go_ktv(sem,user):
    22     sem.acquire()
    23     print('%s 占到一间ktv小屋' %user)
    24     time.sleep(random.randint(0,3)) #模拟每个人在ktv中待的时间不同
    25     sem.release()
    26 
    27 if __name__ == '__main__':
    28     sem=Semaphore(4)
    29     p_l=[]
    30     for i in range(13):
    31         p=Process(target=go_ktv,args=(sem,'user%s' %i,))
    32         p.start()
    33         p_l.append(p)
    34 
    35     for i in p_l:
    36         i.join()
    37     print('============》')
     1 ####################################################事件##############################
     2 #事件内部内置了一个标识符
     3 # wait 方法 如果这个标志是True 那么wait == pass
     4 # wait 方法 如果这个标志是FALSE 那么wait会阻塞 一直阻塞到标志从FALSE变成true
     5 #一个事件在创建之初,内部的标志默认是Fales
     6 # 需要执行set方法  false ---》 true
     7 # 需要执行clear方法  true ---》 false
     8 #############红绿灯模型####################
     9 from multiprocessing import Process, Event
    10 import time, random
    11 def car(e, n):
    12     while True:
    13         if not e.is_set():  # 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
    14             print('33[31m红灯亮33[0m,car%s等着' % n)
    15             e.wait()    # 阻塞,等待is_set()的值变成True,模拟信号灯为绿色
    16             print('33[32m车%s 看见绿灯亮了33[0m' % n)
    17             time.sleep(random.randint(3, 6))
    18             if not e.is_set():   #如果is_set()的值是Flase,也就是红灯,仍然回到while语句开始
    19                 continue
    20             print('车开远了,car', n)
    21             break
    22 def police_car(e, n):
    23     while True:
    24         if not e.is_set():# 进程刚开启,is_set()的值是Flase,模拟信号灯为红色
    25             print('33[31m红灯亮33[0m,car%s等着' % n)
    26             e.wait(0.1) # 阻塞,等待设置等待时间,等待0.1s之后没有等到绿灯就闯红灯走了
    27             if not e.is_set():
    28                 print('33[33m红灯,警车先走33[0m,car %s' % n)
    29             else:
    30                 print('33[33;46m绿灯,警车走33[0m,car %s' % n)
    31         break
    32 def traffic_lights(e, inverval):
    33     while True:
    34         time.sleep(inverval)
    35         if e.is_set():
    36             print('######', e.is_set())
    37             e.clear()  # ---->将is_set()的值设置为False
    38         else:
    39             e.set()    # ---->将is_set()的值设置为True
    40             print('***********',e.is_set())
    41 if __name__ == '__main__':
    42     e = Event()
    43     for i in range(10):
    44         p=Process(target=car,args=(e,i,))  # 创建是个进程控制10辆车
    45         p.start()
    46 
    47     for i in range(5):
    48         p = Process(target=police_car, args=(e, i,))  # 创建5个进程控制5辆警车
    49         p.start()
    50     t = Process(target=traffic_lights, args=(e, 10))  # 创建一个进程控制红绿灯
    51     t.start()
    52 
    53     print('============》')
     1 ####################################队列##########################
     2 from multiprocessing import Queue,Process
     3 def func(n,q):
     4     q.put(n*n)
     5 if __name__ == '__main__':
     6     q = Queue()
     7     p = Process(target=func,args=(10,q))
     8     p.start()
     9     print(q.get())
    10 #管道 + 锁 = 队列
    11 #管道也是一个可以实现进城之间通信的模型
    12 #管道没有锁,数据不安全
     1 ################################数据共享#####################################
     2 from multiprocessing import Manager,Process,Lock
     3 def work(d,lock):
     4     with lock: #不加锁而操作共享的数据,肯定会出现数据错乱
     5         d['count']-=1
     6 
     7 if __name__ == '__main__':
     8     lock=Lock()
     9     with Manager() as m:
    10         dic=m.dict({'count':100})
    11         p_l=[]
    12         for i in range(100):
    13             p=Process(target=work,args=(dic,lock))
    14             p_l.append(p)
    15             p.start()
    16         for p in p_l:
    17             p.join()
    18         print(dic)
    ###################################线程########################################
    #什么是进程  是计算机 资源分配的最小单位
    # 什么是线程
    #线程和进程的关系
        #每一个进程中都至少有一个线程,真正执行代码的
    #python中线程的特点
    #其他语言中线程的特点
    #线程是感受不到的,只有在运行多线程服务的时候才能感受到线程的存在
    #线程和进程的区别:同一个进程的每个线程之间数据共享,线程之间也是异步的。
                        #线程是轻量级的,创建一个线程的时间开销要远远的小于进程
                        #线程是cpu调度的最小单位
    import os
    import time
    from threading import Thread
    import time
    def fun(n):
        time.sleep(1)
        print(os.getpid())
        print('---')
    Thread(target=fun,args=(1,)).start()
    print(123*'*')
    print(os.getpid())
    #进程里至少有一个主线程负责执行代码
    #在主线程中可以再开启一个新的线程
    #那么在同一个进程中就有两个线程同时在工作了
    #线程才是cpu调度的最小单位
    #多个线程之间的数据是共享的
    n = 100
    def fun(i):
        global n
        time.sleep(1)
        n -= 1
        print(os.getpid(),'%sthred'%i)
    t_l = []
    for i in range(100):
        t = Thread(target=fun,args=(i,))
        t.start()
        t_l.append(t)
    for t in t_l:t.join()
    print(n)
    
    #GIL 锁 全局解释器锁
    #使用多线程处理高计算型场景  python并不占有事
    #在同一个进程中 同一时刻 只能有一个线程访问cpu
    #cpu主要是用来计算的
    #如果程序是高IO类型的涉及到比较多的网络请求 数据库请求 文件请求 使用多线程
    #celery
    #在同一个进程中 同一个时刻 只能有一个线程被CPU执行 导致高计算型代码 不适合用python多线程来解决
    #用多进程或者分布式来解决 高计算型代码
     1 ##############################守护线程########################################
     2 from threading import Thread
     3 import time
     4 def foo():
     5     print(123)
     6     time.sleep(1)
     7     print("end123")
     8 
     9 def bar():
    10     print(456)
    11     time.sleep(3)
    12     print("end456")
    13 
    14 t1=Thread(target=foo)
    15 t2=Thread(target=bar)
    16 
    17 t1.daemon=True
    18 t1.start()
    19 t2.start()
    20 print("main-------")
    21 #主线程结束之后守护线程同时结束
    22 #守护线程会等待主线程完全结束之后才结束
     1 #################################锁############################
     2 from threading import Thread,Lock
     3 import os,time
     4 def work():
     5     global n
     6     lock.acquire()
     7     temp = n
     8     time.sleep(0.1)
     9     n = temp-1
    10     lock.release()
    11     # n -= 1
    12 if __name__ == '__main__':
    13     lock=Lock()
    14     n=100
    15     l=[]
    16     for i in range(100):
    17         p=Thread(target=work)
    18         l.append(p)
    19         p.start()
    20     for p in l:
    21         p.join()
    22 #
    23 #     print(n) #结果肯定为0,由原来的并发执行变成串行,牺牲了执行效率保证了数据安全
    24 #当你的程序当中出现了赋值在计算的操作 取值计算再赋值的操作就会出现数据不安全 需要 -----锁
    25 #######死锁_____递归锁#############
    26 from threading import Lock as Lock
    27 import time
    28 mutexA=Lock()
    29 mutexA.acquire()
    30 mutexA.acquire()
    31 print(123)
    32 mutexA.release()
    33 mutexA.release()
    34 #普通的锁 在同一个线程中 只能acquire一次
    35 #所以当acquire两次的时候就容易死出现死锁 现象
    36 #出现了死锁现象可以使用递归锁解决
    37 #但是本质上死锁的出现是因为逻辑错误
    38 #因此我们更应该把注意力集中在解决逻辑错误
    39 #而不要出现错误的时候直接使用递归锁规避
     1 ###################################进程池 线程池#################################
     2 import time
     3 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
     4 def func(num):
     5     time.sleep(1)
     6     print(num)
     7 if __name__ == '__main__':
     8     t = ThreadPoolExecutor(20)
     9     for i in range(100):
    10         t.submit(func,i)
    11     t.shutdown()  #相当于 join
    12 ###################################线程池中map##################
    13 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    14 
    15 import os,time,random
    16 def task(n):
    17     print('%s is runing' %os.getpid())
    18     time.sleep(random.randint(1,3))
    19     return n**2
    20 if __name__ == '__main__':
    21     executor=ThreadPoolExecutor(max_workers=3)
    22     # for i in range(11):
    23     #     future=executor.submit(task,i)
    24     executor.map(task,range(1,12)) #map取代了for+submit
    25 #################################回调函数################################
    26 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    27 from multiprocessing import Pool
    28 import requests
    29 import json
    30 import os
    31 
    32 def get_page(url):
    33     print('<进程%s> get %s' %(os.getpid(),url))
    34     respone=requests.get(url)
    35     if respone.status_code == 200:
    36         return {'url':url,'text':respone.text}
    37 
    38 def parse_page(res):
    39     res=res.result()
    40     print('<进程%s> parse %s' %(os.getpid(),res['url']))
    41     parse_res='url:<%s> size:[%s]
    ' %(res['url'],len(res['text']))
    42     with open('db.txt','a') as f:
    43         f.write(parse_res)
    44 
    45 
    46 if __name__ == '__main__':
    47     urls=[
    48         'https://www.baidu.com',
    49         'https://www.python.org',
    50         'https://www.openstack.org',
    51         'https://help.github.com/',
    52         'http://www.sina.com.cn/'
    53     ]
    54 
    55     # p=Pool(3)
    56     # for url in urls:
    57     #     p.apply_async(get_page,args=(url,),callback=pasrse_page)
    58     # p.close()
    59     # p.join()
    60 
    61     p=ProcessPoolExecutor(3)
    62     for url in urls:
    63         p.submit(get_page,url).add_done_callback(parse_page) #parse_page拿到的是一个future对象obj,需要用obj.result()拿到结果
    64 #######################add_done_callback##########################################
    65 import time
    66 import random
    67 from concurrent.futures import ThreadPoolExecutor
    68 urls=[
    69         'https://www.baidu.com',
    70         'https://www.python.org',
    71         'https://www.openstack.org',
    72         'https://help.github.com/',
    73         'http://www.sina.com.cn/'
    74     ]
    75 def analies(content):
    76     print('分析网页')
    77     print(content.result())
    78 def get_url(url):
    79     time.sleep(random.uniform(1,3))
    80     return url*10
    81 t = ThreadPoolExecutor(3)
    82 for url in urls:
    83     t.submit(get_url,url).add_done_callback(analies)
    84 ###############################gevent###########################################
    85 #什么是协程
    86 #进程是计算机中资源分配的最小单位
    87 #线程是CPU调度的最小单位
    88 #协程是把一个线程拆分成几个
    89 #进程 线程 都是操作系统在调度
    90 #协程是程序调度  减轻了操作系统的负担,增强了用户对程序的可控性
  • 相关阅读:
    Oracle 分析函数(Analytic Functions) 说明
    Build Your Own Oracle RAC 10g Release 2 Cluster on Linux and FireWire
    Build Your Own Oracle RAC 10g Release 2 Cluster on Linux and FireWire
    ORACLE SEQUENCE 介绍
    RAC Ocfs2文件系统常见问题解决方法
    linux 下修改日期和时间
    寒假刷题之7——波纹
    iOS 游戏 Oh my fish! 切割痕迹实现
    ACM常识
    寒假刷题之6——迷宫
  • 原文地址:https://www.cnblogs.com/tewu/p/9243980.html
Copyright © 2011-2022 走看看