zoukankan      html  css  js  c++  java
  • Python 网络编程与并发编程

    Python 网络编程与并发编程

    socket的基本使用

    TCP(Transmission Control Protocol)可靠的、面向连接的协议(eg:打电话)、传输效率低全双工通信(发送缓存&接收缓存)、面向字节流。使用TCP的应用:Web浏览器;电子邮件、文件传输程序。
    
    UDP(User Datagram Protocol)不可靠的、无连接的服务,传输效率高(发送前时延小),一对一、一对多、多对一、多对多、面向报文,尽最大努力服务,无拥塞控制。使用UDP的应用:域名系统 (DNS);视频流;IP语音(VoIP)。
    
    客户端和服务端在建立连接时: 三次握手
    客户端和服务端在断开连接时: 四次挥手
    SYN 创建连接
    ACK 确认响应
    FIN 断开连接
    
    tcp 协议: 打电话 , 必须先建立连接, 同一时刻只能和一个人聊天, 速度慢,但传输稳定
    udp 协议: 发短信 , 不需要确认连接, 只要知道ip和端口 , 就可以进行通讯,速度快,可以同时聊天,传输不稳定
    
    tcp:发送文件之类的(上传,下载文件,FTP,邮件SMTP)
    优点:传输完整稳定,不限制数据大小
    缺点: 速度慢 (一发一收都需要对方确认)
    
    udp:即时通讯类,比较追求速度(再线视频网站,qq,微信)
    优点:传输速度快,可以多人同时聊天
    缺点:不稳定,有时丢失数据
    

    TCP的基本语法

    server

    import socket
    
    # 创建一个socket对象
    sk = socket.socket()
    # 绑定ip和端口 (注册网络)
    sk.bind(  ("127.0.0.1",9000)  ) # 127.0.0.1 默认代表本机ip
    # 开启监听  (等待别人链接我)
    sk.listen()
    
    
    
    # 程序加阻塞,直到tcp协议的三次握手建立完毕,执行下面代码
    conn,addr = sk.accept()
    print(conn,addr)
    # 最大接受1024个字节 等待数据时,程序加了阻塞,收到数据之后,程序在向下执行
    msg = conn.recv(1024)
    print(msg.decode("utf-8"))
    
    # send 负责发送,只能发送二进制的字节流
    conn.send("早呀早".encode("utf-8"))
    # 关闭此次连接 [走四次挥手]
    conn.close()
    
    # 关闭socket对象 [退还占用的端口]
    sk.close()
    
    """
    < socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 34794)> 
    
    ('127.0.0.1', 34794)
    
    """
    

    client

    import socket
    
    # 差生一个socket对象
    sk = socket.socket()
    # 建立连接
    sk.connect( ("127.0.0.1",9000) )
    
    # 发送消息(发送的数据是二进制字节流)
    sk.send("早".encode("utf-8"))
    # 等待接收消息 (发送完之后,程序添加阻塞,最大字节数1024,等待接收)
    res = sk.recv(1024)
    print(res.decode("utf-8"))
    
    
    # 关闭连接
    sk.close()
    

    tcp循环发送消息

    server

    import socket
    
    # 在网络中注册该主机
    sk = socket.socket()
    sk.bind( ("127.0.0.1",9001) )
    sk.listen()
    
    
    # 准备接收数据
    while True:
    	# 重新建立新的三次握手
    	conn,addr = sk.accept() 
    	# 内循环不停的收发消息,按q退出循环
    	while True:		
    		msg = conn.recv(1024)
    		print(msg.decode("utf-8"))
    		message = input(">>>:")
    		conn.send(message.encode("utf-8"))
    		# 如果你输入一个q ,退出当前的连接
    		if message == "q":
    			break
    	# 执行四次挥手
    	conn.close()
    sk.close()
    

    client

    import socket
    sk = socket.socket()
    sk.connect( ("127.0.0.1",9001) )
    
    while True:
    	# 输入字符串
    	message = input(">>>:")
    	# 发送消息
    	sk.send(message.encode("utf-8"))
    	# 接收服务器的消息
    	res = sk.recv(1024)
    	# 如果接收到的是字节流q,断开连接,有请下一位观众
    	if res == b'q':
    		break
    	print(res.decode("utf-8"))
    sk.close()
    

    UDP的基本语法

    server

    # upd 协议:
    import socket
    # 创建socket对象
    sk = socket.socket(type=socket.SOCK_DGRAM)
    # 绑定地址
    sk.bind( ("127.0.0.1",9000) )
    
    
    # 对于udp来讲 第一个信息,必须先接受
    # 接受数据,返回客户端发出的消息,还有ip和端口组成的元组
    msg,cli_addr = sk.recvfrom(1024)
    print(msg.decode("utf-8"))
    
    message = "你来啦,脑弟"
    # 2个参数 第一个是消息, 第二个是地址(ip,端口)
    sk.sendto(message.encode("utf-8"),cli_addr)
    
    # 关闭udp链接
    sk.close()
    

    client

    # upd 协议:
    import socket
    sk = socket.socket(type=socket.SOCK_DGRAM)
    message = "我来了大哥"
    
    # 发送数据
    sk.sendto(message.encode("utf-8"),("127.0.0.1",9000))
    # 接受数据
    msg,ser_addr = sk.recvfrom(1024)
    print(msg.decode("utf-8"))
    
    # 关闭udp链接
    sk.close()
    

    udp循环发送消息

    server

    # 循环发送消息udp
    import socket
    sk = socket.socket(type=socket.SOCK_DGRAM)
    # 绑定地址
    sk.bind( ("127.0.0.1",9000) )
    
    while True:
    	msg,cli_addr = sk.recvfrom(1024)
    	print(cli_addr)
    	print(msg.decode("utf-8"))
    
    	message = input(">>>")
    	sk.sendto(message.encode("utf-8"),cli_addr)
    
    # 关闭udp链接
    sk.close()
    

    client

    import socket
    sk = socket.socket(type=socket.SOCK_DGRAM)
    
    while True:
    	# 请输入你要写入的数据
    	message = input(">>>")
    	sk.sendto(message.encode(),("127.0.0.1",9000))
    
    	# 接受数据
    	msg,ser_addr = sk.recvfrom(1024)
    	print(msg.decode())
    
    # 关闭udp连接
    sk.close()
    

    黏包

    ### 黏包 
    """
    # tcp协议在发送数据时,会出现黏包现象.	
        (1)数据粘包是因为在客户端/服务器端都会有一个数据缓冲区,
        缓冲区用来临时保存数据,为了保证能够完整的接收到数据,因此缓冲区都会设置的比较大。
        (2)在收发数据频繁时,由于tcp传输消息的无边界,不清楚应该截取多少长度
        导致客户端/服务器端,都有可能把多条数据当成是一条数据进行截取,造成黏包
    """
    
    
    ### 黏包出现的两种情况 
    """
    #黏包现象一:
    	在发送端,由于两个数据短,发送的时间隔较短,所以在发送端形成黏包
    #黏包现象二:
    	在接收端,由于两个数据几乎同时被发送到对方的缓存中,所有在接收端形成了黏包
    #总结:
        发送包时间间隔短 或者 接受不及时,就会黏包
        核心是因为tcp对数据无边界截取,不会按照发送的顺序判断
    """
    
    
    ### 黏包对比:tcp和udp
    """
    #tcp协议:
    优点:接收时数据之间无边界,有可能粘合几条数据成一条数据,造成黏包 
    缺点:不限制数据包的大小,稳定传输不丢包
    
    #udp协议:
    优点:接收时候数据之间有边界,传输速度快,不黏包
    缺点:限制数据包的大小(受带宽路由器等因素影响),传输不稳定,可能丢包
    
    #tcp和udp对于数据包来说都可以进行拆包和解包,理论上来讲,无论多大都能分次发送
    但是tcp一旦发送失败,对方无响应(对方无回执),tcp可以选择再发,直到对应响应完毕为止
    而udp一旦发送失败,是不会询问对方是否有响应的,如果数据量过大,易丢包
    """
    
    
    ### 解决黏包问题
    """
    #解决黏包场景:
    	应用场景在实时通讯时,需要阅读此次发的消息是什么
    #不需要解决黏包场景:
    	下载或者上传文件的时候,最后要把包都结合在一起,黏包无所谓.
    """
    

    黏包现象

    server

    # 服务端
    import socket
    import time
    # 把当前主机注册到网络
    sk = socket.socket()
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    conn.send("hello,".encode("utf-8"))
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    print(sk.recv(10))
    print(sk.recv(10))
    sk.send(b'hahaha')
    sk.close()
    

    黏包的解放方法

    方法一

    server

    # 服务端
    import socket
    import time
    # 把当前主机注册到网络
    sk = socket.socket()
    
    #在bind方法之前加上这句话,可以让一个端口重复使用
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    
    
    conn.send("6".encode("utf-8"))
    conn.send("hello,".encode("utf-8"))
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    
    # 是为了接受字符6  #sk.recv(1).decode("utf-8") 字符串
    n = int(sk.recv(1).decode("utf-8"))
    print(n,type(n))
    print(sk.recv(n))
    print(sk.recv(10))
    sk.send(b'hahaha')
    
    sk.close()
    

    方法二

    server

    # 服务端
    import socket
    import time
    # 把当前主机注册到网络
    sk = socket.socket()
    
    #在bind方法之前加上这句话,可以让一个端口重复使用
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    
    
    conn.send("00000120".encode("utf-8"))
    conn.send(("hello,"*20).encode("utf-8"))
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    
    n = int(sk.recv(8).decode("utf-8"))
    
    print(n,type(n))
    print(sk.recv(n))
    print(sk.recv(10))
    sk.send(b'hahaha')
    
    sk.close()
    

    方法三

    import struct
    # pack 把任意长度的数字转化成具有固定长度的4个字节值
    # unpack 将4个字节的值恢复成原有数据,返回一个元组
    # i => int  我要转化的这个数据类型是整型
    res = struct.pack("i",120000000)
    print(res,len(res))
    res = struct.pack("i",1300000000)
    print(res,len(res))
    
    
    res = struct.unpack("i",res)
    print(res)
    

    server

    # 服务端
    import socket
    import time
    import struct
    # 把当前主机注册到网络
    sk = socket.socket()
    
    #在bind方法之前加上这句话,可以让一个端口重复使用
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    
    inp = input(">>>msg:")
    msg = inp.encode("utf-8")
    """
    "abcd"
    "我是好人"
    """
    res = struct.pack("i",len(msg))
    # 发送信息长度 (字节流)
    conn.send(res)
    # 发送用户写的内容
    conn.send(msg)
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    import struct
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    
    # 是为了接受字符6  #sk.recv(1).decode("utf-8") 字符串
    # n = int(sk.recv(4).decode("utf-8"))
    n = sk.recv(4)
    # 把pack后的数据用unpack进行恢复
    n = struct.unpack("i",n)[0]
    print(n,type(n))
    print(sk.recv(n).decode("utf-8"))
    print(sk.recv(10))
    sk.send(b'hahaha')
    
    sk.close()
    

    python hashlib模块

    #hashlib 这个模块是一堆加密算法的集合体,哈希算法的加密方式不止一种
    https://www.cmd5.com/ md5解密
    # 应用场景:在需要效验功能时使用
        用户密码的 => 加密,解密
        相关效验的 => 加密,解密
    
    #哈希算法也叫摘要算法,相同的数据始终得到相同的输出,不同的数据得到不同的输出。
    #(1)哈希将不可变的任意长度的数据,变成具有固定长度的唯一值	
     (2)字典的键值对映射关系是通过哈希计算的,哈希存储的数据是散列(无序)
    
    import hashlib
    # md5
    
    # 创建一个md5算法的对象
    hs = hashlib.md5()
    # print(hs)
    # update 参数是二进制字节流
    hs.update("123".encode())
    # hexdigest 返回32位十六进制的字符串(固定长度)
    res = hs.hexdigest()
    print(res) #a6e612825ab0ec9b523f24182ea8f0d2
    
    # 加盐 key (一个神秘的key 钥匙) :增加密码的复杂度
    hs = hashlib.md5("opesnhelloworld".encode())
    hs.update("123".encode())
    res = hs.hexdigest()
    print(res)
    
    # 动态加盐 : 使用的加密关键字是用户名的切片
    username = input("请输入用户名:")
    hs = hashlib.md5(username[::2].encode())
    hs.update("123456".encode())
    res = hs.hexdigest()
    print(res)
    
    """
    # md5 加密效率快,通用 ,安全性差  # 32
    # sha 加密效率低,算法精密,安全性高
    """
    print("<=====sha系列====>")
    import hashlib
    hs = hashlib.sha1()
    hs.update("opesn123463922".encode("utf-8"))
    print(hs.hexdigest())
    print(len("9a6747fc6259aa374ab4e1bb03074b6ec672cf99")) # 40
    
    # sha1 加盐
    hs = hashlib.sha1("UIsdf234".encode())
    hs.update("123456".encode())
    print(hs.hexdigest())
    
    
    hs = hashlib.sha512()
    hs.update("1234567".encode())
    print(hs.hexdigest())
    #128 3c9909afec25354d551dae21590bb26e38d53f2173b8d3dc3eee4c047e7ab1c1eb8b85103e3be7ba613b31bb5c9c36214dc9f14a42fd7a2fdb84856bca5c44c2
    
    # sha512 加盐
    hs = hashlib.sha512("FZbaoyou".encode())
    hs.update("1234".encode())
    print(hs.hexdigest())
    
    """
    #hmac 和 hash算法非常类似,hmac输出的长度和
    原始哈希算法的长度一致的,hmac不容易破解
    """
    print("<======hmac=======>")
    import hmac
    import os
    
    key = b"opesn"
    # msg = b"123"
    
    # 随机返回长度为32位的二进制字节流
    msg = os.urandom(32)
    print(msg)
    # key msg 必须是二进制字节流
    hm = hmac.new(key,msg)
    print(hm.hexdigest())
    
    # 校验两个文件的内容是否一样?
    

    python 进程

    ### 进程的概念:(Process)
    
    """
    进程就是正在运行的程序,它是操作系统中,资源分配的最小单位.
    资源分配:(需要cpu来处理,需要内存来临时存储运行数据,都要浪费cpu和内存资源)
    进程号是进程的唯一标识
    
    同一个程序执行两次之后是两个进程
    进程和进程之间的关系: 数据彼此隔离,通过socket通信
    """
    
    ### 进程调度算法
    
    """
    # 先来先服务fcfs(first come first server):先来的先执行
    # 短作业优先算法:分配的cpu多,先把短的算完
    # 时间片轮转算法:每一个任务就执行一个时间片的时间.然后就执行其他的.
    # 多级反馈队列算法
    
    越是时间长的,cpu分配的资源越少,优先级靠后
    越是时间短的,cpu分配的资源越多
    """
    
    import os,time
    # linux # ps -aux | grep 49155 查看具体的某一个进程
    # 查看所有进程 ps -aux
    # 获取子进程或者父进程的id号
    # (1)获取主进程的id getpid process_id
    res = os.getpid()
    print(res)
    
    # (2)getppid 获取父进程id  parent_process_id
    res = os.getppid()
    print(res)
    
    # 启动一个进程
    import os 
    os.fork()
    print(os.getpid())
    
    
    from multiprocessing import Process
    # (1) 进程基本语法
    def func():
    	print("222子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    
    if __name__ == "__main__":
    	print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    	# 创建子进程(对象)
    	# 起一个子进程,p是一个进程对象,目的单独执行func这个函数
    	p = Process(target=func)
    	# 调用该子进程
    	p.start()
    
    # (2) 子进程函数,可以传入参数
    # 不带参数
    
    def func():
    	for i in range(1,6):
    		time.sleep(0.5)
    		print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    
    if __name__ == "__main__":
    	print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    	n = 5
    	p = Process(target=func)
    	p.start()
    	
    	for i in range(1,n+1):
    		time.sleep(0.3)
    		print("*"*i)
    
    
    # 带参数 谁快谁慢取决于cpu的调度策略
    # 子进程执行func
    def func(n):
    	for i in range(1,n+1):
    		time.sleep(0.5)
    		print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    
    if __name__ == "__main__":
    	print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    	n = 3
    	# 如果想要传入参数 用args 它必须是一个元组
    	p = Process(target=func,args=(n,))
    	p.start()
    	
    	# 下面代码是主进程执行的
    	for i in range(1,n+1):
    		time.sleep(0.3)
    		print("*"*i)
    
    
    # (3) 进程之间的数据彼此是隔离的
    """父进程和子进程他们的启动是异步的,操作系统负责启动各种进程的"""
    count = 100
    def func():
    	global count
    	count -= 1
    	print("子进程",count)
    
    if __name__ == "__main__":
    	p = Process(target=func)
    	p.start()
    	
    	time.sleep(1)
    	print("主进程",count)
    
    
    
    # (4) 多个进程并发执行
    '''存在并发就存在顺序的不确定性,因为有执行快慢问题,跟操作系统cpu调度策略有关'''
    def func(args):
    	print("子进程%d" % (args),os.getpid(),os.getppid())
    
    if __name__ == "__main__":
    	for i in range(10):
    		Process(target=func,args=(i,)).start()
    
            
    # (5) 子进程和父进程之间的关系
    """父进程执行完毕之后默认等待子进程结束。这是为了避免出现僵尸程序,方便进程上的管理"""
    # 一般来说,父进程结束,子进程应该结束。
    def func(args):
    	print("子进程%s:" % (args),os.getpid(),os.getppid())
    	time.sleep(1)
    	print("子进程end")
    	
    if __name__ == "__main__":
    	for i in range(10):
    		Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    	print("父进程*************")
    

    join

    import time
    import os
    from multiprocessing import Process
    
    # ### join 等待所有子进程执行完毕之后,主进程在向下执行
    # (1) 1个子进程通过join加阻塞,进行同步控制
    def func(index):
    	print("第一封邮件已经发送")
    	
    if __name__ == "__main__":
    	p = Process(target=func,args=(1,))
    	p.start()
    	p.join()
    	print("10封邮件已经发送")
    
    
    # (2) 多个子进程通过join加阻塞,进行同步控制
    def func(index):
    	time.sleep(0.3)
    	print("第%s封邮件已经发送" % (index))
    
    if __name__ == "__main__":
    	lst = []
    	for i in range(10):
    		p = Process(target=func,args=(i,))
    		p.start()
    		lst.append(p)
    	# 列表当中的每一个进程对象都执行p.join 直到所有进程执行完毕,最后执行主进程内容
    	for p in lst:
    		p.join()
    	print("第10封邮件发送完毕")
    
    
    # ###使用第二种方式创建进程
    # (1) 基本使用的语法 
    class MyProcess(Process):
    	# 必须使用run方法名
    	def run(self):
    		print("子进程: 参数:",os.getpid(),os.getppid())
    
    if __name__ == "__main__":
    	p = MyProcess()
    	p.start()
    	print("主进程:",os.getpid())
    
    		
    # (2) 给子进程传递参数
    class MyProcess(Process):
    
    	def __init__(self,arg):
    		# 调用一下父类的构造方法
    		super().__init__()
    		self.arg = arg
    
    	# 必须使用run方法名
    	def run(self):
    		print("子进程: 参数:",os.getpid(),os.getppid(),self.arg)
    
    if __name__ == "__main__":
    	# p = MyProcess("123")
    	# p.start()
    	# print("主进程:",os.getpid())
    	lst = []
    	for i in range(10):
    		p = MyProcess("参数%s" % i) # 子进程并发,是异步程序
    		p.start()
    		lst.append(p)
    		# p.join()
    	
    	for i in lst:
    		i.join() # join是阻塞为了让主进程同步
    	
    	print("主进程",os.getpid())
    

    守护进程

    #可以给子进程贴上守护进程的名字,该进程会随着主进程代码执行完毕而结束(为主进程守护)
    (1)守护进程会在主进程代码执行结束后就终止
    (2)守护进程内无法再开启子进程,否则抛出异常(了解)
    
    import time
    from multiprocessing import Process
    # (1) 守护进程随着主进程代码执行完毕而结束
    """
    正常情况下,主进程默认等待子进程全部结束而结束
    守护进程必须写在start之前进行设置
    """
    def func():
    	print("子进程start")
    	time.sleep(1)
    	print("子进程end")
    	
    if __name__ == "__main__":
    	p = Process(target=func)
    	p.daemon = True
    	p.start()
    	time.sleep(1.5)
    	print("主进程")
    
        
    # (2) 多个子进程的情况下
    '''
    当多个子进程并发执行时,默认主进程等待子进程
    守护进程 守护的是 主进程 , 主要主进程里面的代码执行完毕
    对应是守护进程的那个子进程立即终止,其他非守护进程继续
    守护进程是主进程的代码执行到最后一行了,即为结束。
    '''
    def func1():
    	count = 1
    	while True:
    		print("*" * count)
    		time.sleep(0.5)
    		count += 1
    
    def func2():
    	print("func2 start")
    	time.sleep(5)
    	print("func2 end")
    
    
    if __name__ == "__main__":
    	p1 = Process(target=func1)
    	p1.daemon = True
    	p1.start()
    	Process(target=func2).start()
    	# time.sleep(3)	
    	print("主进程")
        
    
    # (3) 守护进程的实际用途:报活
    '''
    如果最大的监控系统挂掉,就让其他服务器停掉
    就好比厂子到了,员工回家放假
    '''
    # 防止信息泄露,最大的监控系统挂掉,子进程停止发送相关数据.
    def alive():
    	while True:
    		print("1号主机发送日志消息, i am ok")
    		time.sleep(0.6)
    		
    def func():
    	# while True:
    	print("统计各个主机的系统日志,维护数据库管理")
    	time.sleep(1)
    
    if __name__ == "__main__":
    	p = Process(target = alive)
    	p.daemon = True
    	p.start()
    	p = Process(target=func)
    	p.start()
    	# 加上join之后,等待所有子进程执行完毕后,在运行主进程后面的代码,(除了守护进程)
    	p.join()
    	print("...")
    

    lock.acquire()# 上锁
    lock.release()# 解锁
    
    #同一时间允许一个进程上一把锁 就是Lock
    	加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    #同一时间允许多个进程上多把锁 就是[信号量Semaphore]
    	信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁	
    
    # 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性
    # 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
    
    # ### 锁
    # (1) 上锁与解锁
    '''
    上锁时其他进程不能更改同一份资源,解锁之后其他进行按照顺序同步上锁
    #同一时间允许一个进程上一把锁 就是Lock
    加锁可以保证多个进程修改同一块数据时,
    同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,
    但牺牲了速度却保证了数据安全。
    '''
    # 模拟抢票 
    ''''''
    import json
    import time
    from multiprocessing import Process,Lock
    
    def search(person):
    	with open("ticket",mode="r") as fp:
    		dic = json.load(fp)
    		# print(dic,type(dic))
    		print("%s 查询余票:%s" % (person,dic["count"]))
    
    def get_ticket(person):
    	with open("ticket",mode="r") as fp:
    		dic = json.load(fp)
    	time.sleep(0.5)
    	if dic["count"] > 0 :
    		print("%s买到了" % (person))
    		dic["count"] -= 1
    		# 重新更新数据库
    		with open("ticket","w") as fp:
    			json.dump(dic,fp)
    	else:
    		print("%s没买到这个票" % (person))
    
    # 作为统一的并发调用
    def ticket(person,lock):
    	#查询票数
    	search(person)
    	#上锁   到上锁的时候是同步程序
    	lock.acquire()# 上锁
    	get_ticket(person)
    	lock.release()# 解锁
    
    if __name__ == "__main__":
    	lock = Lock()
    	for i in range(10):
    		# 这里是异步并发
    		p = Process(target = ticket ,args = ("person%s" % i,lock))
    		p.start()
    
    
    # (2) 区分同步和异步
    import json
    import time
    from multiprocessing import Process,Lock
    def func(num,lock):
    	time.sleep(1)
    	print("异步程序",num)
    	lock.acquire()
    	time.sleep(0.5)
    	print("同步执行的程序",num)
    	lock.release()
    	
    if __name__ == "__main__":
    	lock = Lock()
    	for i in range(10):	
    		p = Process(target=func,args =(i,lock))
    		p.start()
    		
    """
    互斥锁:互斥锁就是进程的互相排斥,谁先抢到这个资源,
    谁就上锁改变资源的内容,为了保证数据的同步性
    注意:多个锁一起上,不开锁,会造成死锁,上锁和解锁是一对
    lock = Lock()
    lock.acquire()
    print(456)
    lock.release()
    
    # 上了两把锁,没遇到解锁,就卡在这形成死锁.
    lock.acquire()
    lock.acquire()
    lock.release()
    print(123)
    '''	
    

    信号量 Semaphore

    # ### Semaphore 信号量 其实就是锁  
    # 模拟ktv唱歌 假设一共10个人,同一时间允许4个人唱歌
    from multiprocessing import Process,Semaphore
    import random,time
    
    def ktv(person,sem):
    	sem.acquire()
    	print("%s进入了ktv" % (person))
    	time.sleep(random.randrange(6,8))
    	print("%s走出了ktv" % (person))
    	sem.release()
    if __name__ == "__main__":
    	# 最多允许4个进程同时上锁
    	sem = Semaphore(4)
    	for i in range(10):
    		p = Process(target=ktv,args=("person%s" % (i),sem))
    		p.start()
    

    事件

    # ### 事件(Event)
    # 阻塞事件 :
    	e = Event()生成事件对象e   
    	e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
        # 如果是True  就不加阻塞
        # 如果是False 那就加阻塞
    
    # 控制这个属性的值
        # set()方法     将这个属性的值改成True
        # clear()方法   将这个属性的值改成False
        # is_set()方法  判断当前的属性是否为True  (默认上来是False)
    
    from multiprocessing import Process,Event
    import time,random
    # (1) 基本用法
    e = Event()
    print(e.is_set())  # 默认False
    
    # 将这个属性的值改成True
    e.set()
    # 将这个属性的值改成False
    e.clear()
    
    print(1)
    e.wait()
    print(2)
    
    
    # 模拟红绿灯操作  假设有20辆小车全部通过
    def traffic_light(e):
    	# traffic_light 只做一件事就是红灯和绿灯的变色效果
    	print("红灯亮")
    	while True:
    		# 默认is_set 获取到的值是False
    		if e.is_set():
    			#红灯区间
    			time.sleep(1)
    			print("红灯亮")
    			e.clear() # False
    		else:
    			#绿灯区间
    			time.sleep(1)
    			print("绿灯亮")
    			e.set()   # True
    
    def car(e,i):
    	#e.is_set() 为False时成立
    	if not e.is_set():
    		print("car %s 在等待" % (i))
    		e.wait()
    	print("car %s 通过了" % i)
    
    if __name__ == "__main__":
    	e = Event()
    	p = Process(target=traffic_light,args = (e,))
    	p.start()
    	
    	for i in range(20):
    		time.sleep(random.randrange(0,2))
    		p = Process(target=car,args = (e,i))
    		p.start()
    
    
    # 优化:  假设有20辆小车全部通过后,红绿灯终止;
    def traffic_light(e):
    	# traffic_light 只做一件事就是红灯和绿灯的变色效果
    	print("红灯亮")
    	while True:
    		# 默认is_set 获取到的值是False
    		if e.is_set():
    			#红灯区间
    			time.sleep(1)
    			print("红灯亮")
    			e.clear() # False
    		else:
    			#绿灯区间
    			time.sleep(1)
    			print("绿灯亮")
    			e.set()   # True
    
    def car(e,i):
    	#e.is_set() 为False时成立
    	if not e.is_set():
    		print("car %s 在等待" % (i))
    		e.wait()
    	print("car %s 通过了" % i)
    
    if __name__ == "__main__":
    	e = Event()
    	lst = []
    	p = Process(target=traffic_light,args = (e,))
    	p.daemon = True
    	p.start()
    	
    	for i in range(20):
    		time.sleep(random.randrange(0,2))
    		p = Process(target=car,args = (e,i))
    		p.start()
    		lst.append(p)
    	for p in lst:
    		p.join()
    		
    	print("程序彻底跑完~")
    

    生产者与消费者

    # IPC Inter-Process Communication
    # 实现进程之间通信的两种机制:
        # 管道 Pipe
        # 队列 Queue
        
    # put() 存放
    # get() 获取
    # get_nowait() 拿不到报异常
    # put_nowait() 非阻塞版本的put
    q.empty()      检测是否为空  (了解)
    q.full() 	   检测是否已经存满 (了解)
    

    队列 Queue

    # ### 队列 Queue
    import queue
    from multiprocessing import Process,Queue
    '''
    队列特征:先进先出
    '''
    
    # (1) 基础语法
    q2 = Queue()
    # 把数据存放在队列当中
    q2.put(1)
    q2.put(2)
    
    # 获取队列当中的数据
    print(q2.get())
    print(q2.get())
    # 队列当中如果已经没有数据了,那会执行阻塞.
    print(q2.get())
    
    print(q.get_nowait())
    print(q.get_nowait())
    
    # 捕捉异常类错误
    # get_nowait linux 不兼容;只有windows好用
    try:
    	print(q.get_nowait())
    except queue.Empty:
    	print("empty")
    except:
    	print(123)
    
    # (2) 可以为queue 指定队列的长度
    q1 = Queue(3) # 最多放3个 ,超过最大长度,就执行阻塞
    q1.put(1)
    q1.put(2)
    q1.put(3)
    # q1.put(4)
    # q1.put_nowait(4) # 用put_nowait添加值超过队列长度直接报错.
    
    res = q1.full()
    print(res)
    q3 = Queue()
    res = q3.empty()
    print(res)
    
    # (3)  进程之间的通讯 
    def func(q):
    	print("子进程:",q.get())
    	q.put("abc")
    if __name__ == "__main__":
    	q4 = Queue()
    	p = Process(target=func,args=(q4,))
    	p.start()
    	q4.put("111222333")
    	p.join()
    	print("主进程:",q4.get())
    

    消费者模型

    '''
    # 爬虫例子:
    一个进程1号负责获取所有网页当中的内容
    一个进程2号负责提取网页当中的连接
    
    那么这个1号进程就可以看成一个生产者
    这个2号进程可以看成一个消费者
    
    有时可能生产者比消费者快,反之亦然
    所以在生产者和消费者之间加一个缓冲区(队列)
    
    生产者和消费者模型从程序上来讲就是数据的存入和获取的过程
    最为理想的生产者和消费者模型 , 两者之间的运行速度应该是同步的
    '''
    from multiprocessing import Process,Queue
    import time
    import random
    # 消费者方法 [负责取值]
    def consumer(q,name):
    	while True:
    		food = q.get()
    		if food is None:
    			break
    		time.sleep(random.uniform(0.5,1))
    		print("%s吃了一个%s" % (name,food))
    
    # 生产者方法 [负责存值]
    def producer(q,name,food):
    	for i in range(5):
    		time.sleep(random.uniform(0.3,0.8))
    		print("%s生产了%s" % (name,food))
    		q.put(food+str(i))
    
    if __name__ == "__main__":
        q = Queue()
        c1 = Process(target=consumer,args=(q,"张三"))
        c2 = Process(target=consumer,args=(q,"李四"))
        # c1.daemon = True
        c1.start()
        c2.start()
        p1 = Process(target=producer,args=(q,"王五","地瓜"))
        p2 = Process(target=producer,args=(q,"tony","黄瓜"))
        p1.start()
        p2.start()
        
        p1.join()
        p2.join()
        q.put(None)
        q.put(None)
    

    JoinableQueue

    # ### JoinableQueue
    '''
    put  存入
    get  获取
    task_done  通过队列其中一个数据被处理
    JoinableQueue 默认会计数,执行一次taskdone减少一次队列数
    JoinableQueue 里面有5个值,taskdone减少一个,减到0 , 队列里面的值为空
    q.join() 队列.join
    添加阻塞,直到放入队列的所有值都被处理掉的时候,撤掉阻塞  
    '''
    from multiprocessing import Process,JoinableQueue
    import random
    import time
    
    def consumer(q,name):
    	while True:
    		food = q.get()
    		time.sleep(random.uniform(0.5,1))
    		print("%s接受了 %s" % (name,food))
    		q.task_done()
    
    def producer(q,name,food):
    	for i in range(5):
    		time.sleep(random.uniform(0.3,0.8))
    		print("%s生产了 %s" % (name,food+str(i)))
    		q.put(food+str(i))
    
    if __name__ == "__main__":
    	jq = JoinableQueue()
    	c1 = Process(target=consumer,args=(jq,"张三"))
    	c1.daemon = True
    	c1.start()
    	
    	p1 = Process(target=producer,args= (jq,"李四","葡萄"))
    	p1.start()
    	
    	p1.join() # 生产者需要把所有数据放到队列当中
    	# p1.join()
    	#添加阻塞,直到放入队列的所有值都被处理掉的时候,撤掉阻塞
    	# 通过task_done 表达处理掉的意思
    	# join 和 task_done 需要配合使用;
    	jq.join()
    

    进程池

    # 进程池:
    # 开启过多的进程并不一定提高你的效率,
    # 如果cpu负载任务过多,平均单个任务执行的效率就会低,反而降低执行速度.
        1个人做4件事和4个人做4件事
        显然后者执行速度更快,
        前者是并发,后者是并行
        利用进程池,可以开启cpu的并行效果
    # apply       开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程
    # apply_async 开启进程,异步非阻塞,(主进程 和 子进程异步)
    
    
    # ### Manager dict list 能够实现进程之间的数据共享
    from multiprocessing import Process , Manager ,Lock
    
    def work(d,lock):
    	# 自动上锁和解锁
    	with lock:
    		d["count"] -= 1
    	"""
    	lock.acquire()
    	d["count"] -= 1
    	lock.release()
    	"""
    if __name__ == "__main__":
    	lock = Lock()
    	m = Manager()
    	dic = m.dict({"count":100})
    	lst = []
    	for i in range(100):
    		p  =Process(target=work,args = (dic,lock))
    		lst.append(p)
    		p.start()	
    	# 等待每一个进程执行完毕
    	for p in lst:
    		p.join()	
    	print(dic)
        
        
        
    # ### 进程池
    import os,time
    from multiprocessing import Pool,Process
    # (1) 比较pool 和Process 执行速度
    '''因为进程池可以实现并行的概念,比Process单核并发的速度要快'''
    def func(num):
    	print("发了第%s封邮件" % (num))
    	# time.sleep(1)
    	# print("end")
    	
    if __name__ == "__main__":
    	# print(os.cpu_count()) # 获取cpu核心数量
    	start = time.time()
    	p  = Pool(6)
    	for i in range(100):
    		# apply_async 用来开启子进程
    		p.apply_async(func,args=(i,))
    
    	# 关闭进程池,用户不能在向这个池中创建子进程
    	p.close() 
    	# 这里加阻塞,直到进程池中所有的子进程执行完毕
    	p.join()
    	end = time.time()
    	print(end-start)
    	
    	
    	# 单核Process处理100件任务
    	start = time.time()
    	lst = []
    	for i in range(100):
    		p = Process(target=func,args= ( i ,))
    		p.start()
    		lst.append(p)
    	for p in lst:
    		p.join()
    	end = time.time()
    	print(end-start)
    
    
    # (2) apply 开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程
    
    def task(num):
    	time.sleep(1)
    	print("%s: %s" % (num,os.getpid()))
    	return num ** 2
    	
    if __name__ == "__main__":
    	p = Pool(2)
    	for i in range(20):
    		# apply是同步阻塞,每个进程必须执行完,才能在开启进程.
    		res = p.apply(task,args=(i,))
    		print("-->",res)
    
    
    # (3) apply_async 异步非阻塞
    def task(num):
    	time.sleep(3)
    	print("%s: %s" % (num,os.getpid()))
    	return num ** 2
    
    if __name__ == "__main__":
    	p = Pool()
    	res_lst = []
    	for  i in range(20):
    		res = p.apply_async(task,args=(i,))
    		print(res)
    		res_lst.append(res)
    		# 返回的对象需要使用get方法获取到返回值
    	# print(len(res_lst))
    	for res in res_lst:
    		print(res.get())
    	
    	# close 和 join 一写写2个保持同步.
    	# p.close()
    	# p.join()
    
    # (4) 进程池.map (与高阶函数map使用方法一样,只不过这个是并发版本)
    def task(num):
    	time.sleep(1)
    	print("%s:%s"%(num,os.getpid()))
    	return num ** 2
    	
    if __name__ == "__main__":
    	p = Pool()
    	res = p.map(task,range(20)) #自动close和join
    	print(res)
    

    进程池并发socket

    server

    import socket
    from multiprocessing import Pool
    
    def talk(conn):
    	while True:
    		conn.send(b"hello")
    		print(conn.recv(1024))
    	conn.close()
    
    if __name__ =="__main__":
    	sk = socket.socket()
    	sk.bind( ("127.0.0.1",9000) )
    	sk.listen()
    	# Pool默认获取cpu_counter cpu最大核心数
    	p = Pool()
    	while True:
    		conn,addr = sk.accept()
    		p.apply_async(talk,args=(conn,))
    	sk.close()
    

    client

    import socket
    sk = socket.socket()
    sk.connect( ("127.0.0.1",9000) )
    
    while True:
    	print(sk.recv(1024))
    	sk.send(b"byebye")
    

    python 线程

    ### 线程
    
    """
    #进程是资源分配的最小单位
    #线程是计算机中调度的最小单位
    
    #线程的缘起
    资源分配需要分配内存空间,分配cpu:
    分配的内存空间存放着临时要处理的数据等,比如要执行的代码,数据
    而这些内存空间是有限的,不能无限分配
    目前来讲,普通机器,5万个并发程序已是上限.线程概念应用而生.
    
    #线程的特点
    线程是轻量级,干更多的活,同一个进程中的所有线程的资源是共享的.
    每一个进程中都有至少一条线程在工作
    """
    
    ### 线程的缺陷
    """
    #线程可以并发,但是不能并行(即可以1个cpu执行,不能多个cpu一起执行)
    #原因:
       python是解释型语言,执行一句编译一句,而不是一次性全部编译成功,不能提前规划,都是临时调度
       容易造成不同的cpu却反复执行同一个程序.所以加了一把锁叫GIL
       全局解释器锁(Cpython解释器特有) GIL锁:同一时间一个线程只能被一个cpu执行	
    	
    #想要并行的解决办法:
        (1)用多进程
        (2)换一个Pypy解释器
        
    #计算型程序会过度依赖cpu,但网页,爬虫,OA办公,这种交互型带有阻塞的程序里,速度影响无所谓
    """
    
    ### 线程相关函数
    """
    线程.is_alive()    检测线程是否仍然存在
    线程.setName()     设置线程名字
    线程.getName()     获取线程名字
    currentThread().ident 查看线程id号 
    enumerate()        返回目前正在运行的线程列表
    activeCount()      返回目前正在运行的线程数量
    """
    
    import os,time
    from threading import Thread
    from multiprocessing import Process
    # (1) 一个进程可以有多个线程
    def func(i):	
    	print("子线程:", i,os.getpid())
    for i in range(10):
    	t = Thread(target=func,args=(i,))
    	t.start()
    
    
    
    
    #(2) 并发多线程和多进程的速度对比 多线程更快
    def func(i):	
    	print("子线程:", i,os.getpid())
    	
    if __name__ == "__main__":
    	start = time.time()
    	lst = []
    	for i in range(100):
    		t = Thread(target=func,args=(i,))
    		t.start()
    		lst.append(t)
    	for t in lst:
    		t.join()
    	end = time.time() 
    	t1 = end-start
    
    		
    	start = time.time()
    	lst = []
    	for i in range(100):
    		p = Process(target=func,args=(i,))
    		p.start()
    		lst.append(p)
    	for p in lst:
    		p.join()
    	end = time.time()
    	t2 = end - start
    	print(t1,t2)
    
    
        
    # (3) 多个线程共享同一份进程资源
    num = 100
    lst = []
    def func():
    	global num 
    	num -= 1
    	
    for i in range(100):
    	t = Thread(target=func)
    	t.start()
    	lst.append(t)
    
    for t in lst:
    	t.join()
    print(num)
    
    
    
    # (4) 线程相关函数
    '''
    线程.is_alive()    检测线程是否仍然存在
    线程.setName()     设置线程名字
    线程.getName()     获取线程名字
    currentThread().ident 查看线程id号 
    enumerate()        返回目前正在运行的线程列表
    activeCount()      返回目前正在运行的线程数量
    '''
    def func():
    	time.sleep(0.01)
    t = Thread(target=func)
    t.start()
    print(t.is_alive())
    print(t.getName()) #Thread-1
    t.setName("t1")
    print(t.getName())
    time.sleep(3)
    print(t.is_alive()) # False
    
    
    # 1.currentThread().ident 查看线程id号 
    from threading import currentThread
    def func():
    	print("子线程:",currentThread().ident)
    
    print("主线程:",currentThread().ident)
    t = Thread(target=func)
    t.start()
    
    
    
    # enumerate()        返回目前正在运行的线程列表
    from threading import enumerate,currentThread
    def func(i):
    	print("子线程:",currentThread().ident)
    	time.sleep(3)
    	
    for i in range(10):
    	t = Thread(target=func,args=(i,))
    	t.start()
    print(enumerate())
    print(len(enumerate())) # 加上主线程一共11个
    """
    [
    <_MainThread(MainThread, started 74964)>,
     <Thread(Thread-11, started 78972)>
     ]
    """
        
     
     
    # activeCount()      返回目前正在运行的线程数量
    from threading import activeCount,currentThread
    def func(i):
    	print("子线程:",currentThread().ident)
    	time.sleep(2)
    	
    for i in range(10):
    	t = Thread(target=func,args=(i,))
    	t.start()
    time.sleep(3)
    print(activeCount()) # 加上主线程一共11个
    

    守护线程

    # ### 守护线程:等待其他所有子线程结束之后,自动结束
    from threading import Thread
    import time
    def func1():
    	while True:
    		time.sleep(0.5)
    		print(111)
    		
    		
    def func2():
    	print("func2 start->")
    	time.sleep(3)
    	print("func2 end->")
    
    t1 = Thread(target=func1)
    t2 = Thread(target=func2)
    # setDaemon 将普通线程变成守护线程
    t1.setDaemon(True)
    t1.start()
    t2.start()
    t2.join()
    time.sleep(5)
    print("主线程代码结束")
    

    线程数据安全

    from threading import Thread,Lock
    n = 0
    # ###线程之间如果共同更改同一个数据,需要加锁
    def func1(lock):
    	global n
    	for i in range(100000):
    		lock.acquire()
    		n-=1
    		lock.release()
    
    def func2(lock):
    	global n
    	for i in range(100000):
    		lock.acquire()
    		n+=1
    		lock.release()
    
    if __name__ == "__main__":
    	lock = Lock()
    	lst = []
    	for i in range(10):
    		t1 = Thread(target=func1,args=(lock,))
    		t2 = Thread(target=func2,args=(lock,))
    		t1.start()
    		t2.start()
    		lst.append(t1)
    		lst.append(t2)
    	for t in  lst:
    		t.join()
    
    	print(n)
    

    死锁,递归锁,互斥锁

    """
    加一把锁,就对应解一把锁.形成互斥锁.
    从语法上来说,锁可以互相嵌套,但不要使用,
    不要因为逻辑问题让上锁分成两次.导致死锁
    递归锁用于解决死锁,但只是一种应急的处理办法
    """
    
    # ### 死锁,递归锁,互斥锁
    from threading import Thread,Lock
    import time
    
    noodle_lock = Lock()
    kuaizi_lock = Lock()
    # (1)常见情况引发的死锁问题
    def eat1(name):
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    def eat2(name):
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    
    if __name__ == "__main__":
    	name_list = ["张楠","定海呀"]
    	name_list2 = ["晨露中","周金波"]
    	for name in name_list:
    		Thread(target=eat1,args= (name,) ).start()
    		# Thread(target=eat1,args= (name,) ).start()
    	
    	for name in name_list2:
    		Thread(target=eat2,args = (name,)).start()
    		# Thread(target=eat2,args = (name,)).start()
    
    
    
    # (2) 递归锁
    from threading import Thread,RLock
    '''
    	递归所专门用来解决死锁现象的
    	用于临时快速解决服务器崩溃现象,加上递归所应急
    '''
    rlock = RLock()
    def func(name):
    	rlock.acquire()
    	print(name,1)
    	rlock.acquire()
    	print(name,2)
    	rlock.acquire()
    	print(name,3)
    	rlock.release()
    	rlock.release()
    	rlock.release()
    	
    for i in range(10):
    	Thread(target=func,args=("name%s" % (i),) ).start()
    
    import time
    
    print("<==>")
    
    noodle_lock = kuaizi_lock = RLock()
    def eat1(name):
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    def eat2(name):
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    
    if __name__ == "__main__":
    	name_list = ["张楠","定海呀"]
    	name_list2 = ["晨露中","周金波"]
    	for name in name_list:
    		Thread(target=eat1,args= (name,) ).start()	
    	for name in name_list2:
    		Thread(target=eat2,args = (name,)).start()
    
    
    # (3) 互斥锁:
    '''
    	从语法上说,锁是允许互相嵌套的,但是不要使用
    	上一次锁,就对应解开一次,形成互斥锁
    	吃面和拿筷子是同时的
    	不要因为逻辑问题让锁分别上,容易造成死锁.	
    '''
    from threading import Thread,Lock
    mylock = Lock()
    def eat1(name):
    	mylock.acquire()
    	print("%s拿到筷子" % (name))
    	print("%s吃到面条" % (name))
    	time.sleep(0.5)
    	
    	print("%s放下筷子" % (name))
    	print("%s放下面条" % (name))
    	mylock.release()
    def eat2(name):
    	mylock.acquire()
    	print("%s拿到筷子" % (name))
    	print("%s吃到面条" % (name))
    	time.sleep(0.5)	
    	print("%s放下筷子" % (name))
    	print("%s放下面条" % (name))
    	mylock.release()
    if __name__ == "__main__":
    	name_list = ["张三","李四"]
    	name_list2 = ["tony","tom"]
    	for name in name_list:
    		Thread(target=eat1,args= (name,) ).start()	
    	for name in name_list2:
    		Thread(target=eat2,args = (name,)).start()
    

    线程中的信号量

    from threading import Semaphore,Thread
    import time
    
    def func(index,sem):
    	sem.acquire()
    	print(index)
    	time.sleep(1)
    	sem.release()
    
    
    if __name__ == "__main__":
    	sem = Semaphore(5)
    	for i in range(16):
    		Thread(target=func,args=(i,sem)).start()
    		
    '''
    进程而言:
    信号量是配合Process使用,可以限制并发的进程数
    在有了进程池之后,可以取代如上做法
    
    线程而言
    信号量是配合Thread使用的,可以限制并发的线程数
    线程池可以取代如上用法
    '''		
    

    事件

    # ### 模拟连接数据库
    from threading import Event,Thread
    import time,random
    '''
    # wait()   动态给程序加阻塞
    # set()    将内部属性改成True
    # clear()  将内部属性改成False
    # is_set() 判断当前属性(默认是False)
    '''
    '''
    e = Event()
    print(e.is_set())
    # timeout = 5 最多等待5秒
    e.wait(timeout = 5)
    print(e.is_set())
    '''
    
    def check(e):
    	print("开始检测数据库连接")
    	time.sleep(random.randrange(1,6))
    	e.set()
    
    def connect(e):
    	sign = False
    	for i in range(3):
    		e.wait(1)
    		if e.is_set():
    			sign = True
    			print("数据库连接成功")
    			break
    		else:
    			print("尝试连接数据库%s次失败" % (i+1))
    	
    	if sign == False:
    		# 主动抛出异常
    		raise TimeoutError
    
    e = Event()
    Thread(target=connect,args=(e,)).start()
    Thread(target=check,args=(e,)).start()
    

    条件

    # ### 条件
    '''
    wait    添加阻塞
    notify  允许几个被阻塞的线程放行
    
    #语法:wait 使用的前后需要上锁
    """
    acquire()
    wait
    wait下面写上相应逻辑代码
    release()
    """
    
    #语法: notify 使用前后加阻塞
    """
    acquire
    notify(自定义线程放行的数量,默认放行1个)
    release
    """
    '''
    from threading import Condition,Thread
    import time 
    
    def func(con,index):
    	print("%s在等待" % index)
    	con.acquire()
    	# 添加阻塞
    	con.wait()
    	print("%s do everyting" % index)
    	con.release()
    
    con = Condition()
    
    for i in range(10):
    	t = Thread(target=func,args = (con,i))
    	t.start()
    
    # 写法一
    con.acquire()
    con.notify(5) # 默认放行1个(不加参数)
    con.release()
    
    # 写法二
    count = 10
    while count > 0:
    	num = int(input(">>>\n"))
    	con.acquire()
    	# 放行线程 num 动态赋予的值
    	con.notify(num)
    	con.release()
    	count -= num
    

    定时器

    # ### 定时器
    from threading import Timer
    def func():
    	print("目前正在执行任务")
    	
    t = Timer(5,func)
    t.start()
    print("主线程")
    
    # 计划任务 crontab
    

    线程队列

    线程常用队列有 queue LifoQueue PriorityQueue
    queue 先进先出
    LifoQueue 后进先出
    PriorityQueue 按照优先级排序
    
    
    # ### 队列
    import queue
    '''
    put  往队列当中放值,超过队列长度,直接加阻塞
    get  如果获取不到加阻塞
    put_nowait 如果放入的超过了队列长度,直接报异常错误
    get_nowait 如果获取不到直接报异常错误
    '''
    # (1)queue  先进先出
    q = queue.Queue()
    q.put(1)
    q.put(2)
    print(q.get())
    # print(q.get())
    # print(q.get())
    print(q.get_nowait())
    
    
    q = queue.Queue(2)
    q.put(1)
    q.put(2)
    # q.put(3)
    q.put_nowait(3)
    
    
    
    # (2) lifoqueue 用栈的后进先出的顺序形成队列
    from queue import LifoQueue
    lq = LifoQueue()
    lq.put(1)
    lq.put(2)
    lq.put('aaa')
    print(lq.get())
    
    
    
    # (3) PriorityQueue 队列 按照优先级顺序获取
    from queue import PriorityQueue
    # 1.先按照数字进行排列,在按照ascii吗从小到大排序
    pq = PriorityQueue()
    pq.put( (13,'abc') )
    pq.put( (5,"ccc") )
    pq.put( (5,"bcc") )
    pq.put( (15,"bbb") )
    print(pq.get())
    print(pq.get())
    print(pq.get())
    print(pq.get())
    
    
    
    # 2. 单独一个元素的,只能是都放同一种类型
    # 如果是数字
    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put(15)
    pq.put(13)
    pq.put(3)
    # pq.put("3434") error
    print(pq.get())
    print(pq.get())
    print(pq.get())
    # print(pq.get())
    
    
    # 如果是字符串
    pq = PriorityQueue()
    pq.put("bbd1")
    pq.put("bbcb2")
    # pq.put(33344) error
    print(pq.get())
    print(pq.get())
    

    线程池和进程池 (改良版)

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,random
    def func(i):
    	print("Process",i)
    	time.sleep(1)
    	print("Process is end ")
    
    #(1) ProcessPoolExecutor 进程池的基本使用(改良版)
    if __name__ == "__main__":
    	# ProcessPoolExecutor <==> Pool
    	p = ProcessPoolExecutor(5)
    	# submit <==> apply_async
    	p.submit(func,1)
    	# shutdown <==> close + join 
    	p.shutdown()
    	print("主线程")
    
    
    
    #(2)  线程池
    from threading import current_thread as cthread
    
    def func(i):
    	# cthread().indent 线程号
    	print("thread",i,cthread().ident)
    	time.sleep(random.uniform(0.1,1))
    	print("thread %s end" % (i))
    
    tp = ThreadPoolExecutor(5)
    for i in range(20):
    	tp.submit(func,i)
    
    tp.shutdown()
    print("主线程")
    
    
    
    
    #(3) 返回值
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(random.uniform(0.1,1))
    	print("thread %s end" % (i))
    	return i * "*"
    
    tp = ThreadPoolExecutor(5)
    lst = []
    for i in range(20):
    	res = tp.submit(func,i)
    	# print(res)
    	lst.append(res)
    	
    for res in lst:
    	# result 和 get 使用方法一样的 result带有阻塞效果.
    	print(res.result())
    print("主线程")
    
    
    
    # (4) map 返回生成器
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(random.uniform(0.1,1))
    	print("thread %s end" % (i))
    	return i * "*"
    
    tp = ThreadPoolExecutor(5)
    res = tp.map(func,range(20))
    # for i in res:
    tp.shutdown()
    for i in res:
    	print(i)
    print(res)
    

    回调函数

    # ###  回调函数
    '''
    # 回调函数
        就是一个参数,将这个函数作为参数传到另一个函数里面.
        函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数
    '''
    # (1) 线程池的回调函数是 子线程完成的.
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread as cthread
    import time
    
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(1)
    	print("thread %s end" % (i))
    	return i * "*"
    
    def call_back(args):
    	print("call back : ",cthread().ident)
    	print(args)
    	print(args.result())
    
    
    tp = ThreadPoolExecutor(5)
    for i in range(20):
    	# 当执行完这个线程后 在直接执行call_back这个函数
    	tp.submit(func,i).add_done_callback(call_back)
    tp.shutdown()
    print("主线程",cthread().ident)
    
    
    
    
    # (2)进程池的回调函数有?来完成
    from concurrent.futures import ProcessPoolExecutor
    import os,time
    def func(i):
    	print('process',i,os.getpid())
    	time.sleep(1)
    	print('process %s end' % i)
    	
    if __name__ == "__main__":
    	p = ProcessPoolExecutor(5)
    	p.submit(func,1)
    	p.shutdown()
    	print("主线程",os.getpid())
    
    # 进程池的回调函数 , 是由主进程完成的
    from concurrent.futures import ProcessPoolExecutor
    import os ,time 
    def func(i):
    	print("process",i,os.getpid())
    	time.sleep(1)
    	print("process %s end" % (i))
    	return i * "*"
    
    def call_back(args):
    	print("call back : ",os.getpid())
    	print(args)
    	print(args.result())
    
    if __name__ == "__main__":
    	tp = ProcessPoolExecutor(5)
    	for i in range(20):
    		# 当执行完这个线程后 在直接执行call_back这个函数
    		tp.submit(func,i).add_done_callback(call_back)
    	# tp.shutdown()
    	print("主线程",os.getpid())
    

    协程

    # ### 协程
    
    def my_gen():
    	for i in range(10):
    		yield i
    
    for num in my_gen():
    	print(num)
    	
        
        
    # (1) 协程写生产者消费者模型
    # producer 生产者
    def producer():
    	for i in range(100):
    		yield i
    
    # consumer 消费者	
    def consumer():
    	g = producer()
    	for num in g:
    		print(num)
    
    consumer()
    
    
    
    # (2) 协程的基本实现
    '''
    switch 利用它进行任务的切换
    只能进行手动切换,
    缺陷:不能规避雕io,不能自动实现遇到阻塞就切换.
    '''
    from greenlet import greenlet
    import time
    def eat():
    	print("eat one1")
    	# 切换到play这个协程中
    	g2.switch()
    	time.sleep(1)
    	print("eat one2")
    	
    def play():
    	print("play one1")
    	print("play one2")
    	g1.switch()
    
    g1 = greenlet(eat)
    g2 = greenlet(play)
    g1.switch()
    
    
    
    # (3) gevent 有他的缺陷
    import gevent,time
    '''
    spawn 自动检测阻塞,遇到阻塞就切换
    相当于switch 的升级版 , 
    瑕疵:不能够识别time.sleep() 有些阻塞不认识.
    '''
    def eat():
    	print("eat one1")
    	time.sleep(1)
    	print("eat one2")
    
    def play():
    	print("play one1")
    	time.sleep(1)
    	print("play one2")
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    # 协程的阻塞也是join
    g1.join() # 阻塞直到g1协程执行完毕
    g2.join() # 阻塞直到g2协程执行完毕
    
    
    
    # (4) 进阶版本 用gevent.sleep 取代time.sleep()
    import gevent,time
    '''
    spawn 自动检测阻塞,遇到阻塞就切换
    相当于switch 的升级版 , 
    瑕疵:不能够识别time.sleep() 有些阻塞不认识.
    '''
    def eat():
    	print("eat one1")
    	gevent.sleep(1)
    	print("eat one2")
    
    def play():
    	print("play one1")
    	gevent.sleep(1)
    	print("play one2")
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    # 协程的阻塞也是join
    g1.join() # 阻塞直到g1协程执行完毕
    g2.join() # 阻塞直到g2协程执行完毕
    
    
    
    # (5) 终极解决办法
    # from gevent import monkey 
    # 把它下面引入的模块都识别出来
    # monkey.patch_all()
    
    # 简化:都写在一行的情况下,有分号隔开.
    from gevent import monkey;monkey.patch_all()
    
    import time 
    import gevent
    
    def eat():
    	print("eat one1")
    	time.sleep(1)
    	print("eat one2")
    
    def play():
    	print("play one1")
    	time.sleep(1)
    	print("play one2")
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    # 协程的阻塞也是join
    g1.join() # 阻塞直到g1协程执行完毕
    g2.join() # 阻塞直到g2协程执行完毕
    
    
    
    
    # ### 协程
    from gevent import monkey;monkey.patch_all()
    import gevent
    """
    # spawn(函数,参数...) 启动一个协成
    # join()  阻塞,直到某个协程执行完毕
    # joinall 类似于join 只不过
    g1.join()
    g2.join()
    gevent.joinall([g1,g2]) 一次性把所有需要阻塞的协程对象写到一起
    # value 获取协成的返回值
    """
    # (1) joinall value 的使用方法
    import time
    
    def eat():
    	print("eating 111")
    	time.sleep(1)
    	print("eating 222")
    	return "吃完了"
    
    def play():
    	print("play 111")
    	time.sleep(1)
    	print("play 222")
    	return "play done"
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    gevent.joinall([g1,g2])
    print(g1.value)
    print(g2.value)
    
    
    # (2) 爬虫的应用
    """
    HTTP 状态码:
    	200 OK
    	400 Bad Request
    	404 Not Found
    
    
    requests 模块
    response = requests.get(网址)
    """
    import requests,time
    response = requests.get("http://www.4399.com")
    # print(response)
    # 获取状态码
    res = response.status_code
    print(res)
    # 获取,设置字符编码
    res_code = response.apparent_encoding
    # print(res)
    # 设置编码集
    response.encoding = res_code
    # 获取网页里面的内容
    res = response.text
    print(res)
    
    url_lst = [
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    ]
    
    # (1) 正常爬取数据 , 速度慢
    def get_url(url):
    	response = requests.get(url)
    	if response.status_code == 200:
    		print(response.text)
    		
    # start = time.time()
    # for i in url_lst:
    	# get_url(i)
    # end = time.time()
    # print(end-start) #3.033189535140991
    
    # (2) 用协程爬取数据 速度更快
    lst = []
    start = time.time()
    for  i in url_lst:
    	g = gevent.spawn(get_url,i)
    	lst.append(g)
    	
    gevent.joinall(lst)
    end = time.time()
    print(end-start)   #0.6957511901855469
    

    线程之间的数据隔离

    from threading import local,Thread
    
    # 线程之间的数据隔离
    '''
    多个线程之间,使用threading.local对象
    可以实现多个线程之间的数据隔离
    '''
    loc = local()
    print(loc)	# 对象
    
    def func(name,age):
        global loc
        loc.name = name
        loc.age = age
        print(loc.name,loc.age)
    
    Thread(target=func,args=("hello",20)).start()
    Thread(target=func,args=("world",20)).start()
    
  • 相关阅读:
    leetcode教程系列——Binary Tree
    《Ranked List Loss for Deep Metric Learning》CVPR 2019
    《Domain Agnostic Learning with Disentangled Representations》ICML 2019
    Pytorch从0开始实现YOLO V3指南 part5——设计输入和输出的流程
    Pytorch从0开始实现YOLO V3指南 part4——置信度阈值和非极大值抑制
    Pytorch从0开始实现YOLO V3指南 part3——实现网络前向传播
    Pytorch从0开始实现YOLO V3指南 part2——搭建网络结构层
    Pytorch从0开始实现YOLO V3指南 part1——理解YOLO的工作
    让我佩服的人生 文章
    win8.1配置cordova+ionic等一系列东西
  • 原文地址:https://www.cnblogs.com/helloord/p/13521460.html
Copyright © 2011-2022 走看看