zoukankan      html  css  js  c++  java
  • Python 网络编程与并发编程

    Python 网络编程与并发编程

    socket的基本使用

    TCP(Transmission Control Protocol)可靠的、面向连接的协议(eg:打电话)、传输效率低全双工通信(发送缓存&接收缓存)、面向字节流。使用TCP的应用:Web浏览器;电子邮件、文件传输程序。
    
    UDP(User Datagram Protocol)不可靠的、无连接的服务,传输效率高(发送前时延小),一对一、一对多、多对一、多对多、面向报文,尽最大努力服务,无拥塞控制。使用UDP的应用:域名系统 (DNS);视频流;IP语音(VoIP)。
    
    客户端和服务端在建立连接时: 三次握手
    客户端和服务端在断开连接时: 四次挥手
    SYN 创建连接
    ACK 确认响应
    FIN 断开连接
    
    tcp 协议: 打电话 , 必须先建立连接, 同一时刻只能和一个人聊天, 速度慢,但传输稳定
    udp 协议: 发短信 , 不需要确认连接, 只要知道ip和端口 , 就可以进行通讯,速度快,可以同时聊天,传输不稳定
    
    tcp:发送文件之类的(上传,下载文件,FTP,邮件SMTP)
    优点:传输完整稳定,不限制数据大小
    缺点: 速度慢 (一发一收都需要对方确认)
    
    udp:即时通讯类,比较追求速度(再线视频网站,qq,微信)
    优点:传输速度快,可以多人同时聊天
    缺点:不稳定,有时丢失数据
    

    TCP的基本语法

    server

    import socket
    
    # 创建一个socket对象
    sk = socket.socket()
    # 绑定ip和端口 (注册网络)
    sk.bind(  ("127.0.0.1",9000)  ) # 127.0.0.1 默认代表本机ip
    # 开启监听  (等待别人链接我)
    sk.listen()
    
    
    
    # 程序加阻塞,直到tcp协议的三次握手建立完毕,执行下面代码
    conn,addr = sk.accept()
    print(conn,addr)
    # 最大接受1024个字节 等待数据时,程序加了阻塞,收到数据之后,程序在向下执行
    msg = conn.recv(1024)
    print(msg.decode("utf-8"))
    
    # send 负责发送,只能发送二进制的字节流
    conn.send("早呀早".encode("utf-8"))
    # 关闭此次连接 [走四次挥手]
    conn.close()
    
    # 关闭socket对象 [退还占用的端口]
    sk.close()
    
    """
    < socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 34794)> 
    
    ('127.0.0.1', 34794)
    
    """
    

    client

    import socket
    
    # 差生一个socket对象
    sk = socket.socket()
    # 建立连接
    sk.connect( ("127.0.0.1",9000) )
    
    # 发送消息(发送的数据是二进制字节流)
    sk.send("早".encode("utf-8"))
    # 等待接收消息 (发送完之后,程序添加阻塞,最大字节数1024,等待接收)
    res = sk.recv(1024)
    print(res.decode("utf-8"))
    
    
    # 关闭连接
    sk.close()
    

    tcp循环发送消息

    server

    import socket
    
    # 在网络中注册该主机
    sk = socket.socket()
    sk.bind( ("127.0.0.1",9001) )
    sk.listen()
    
    
    # 准备接收数据
    while True:
    	# 重新建立新的三次握手
    	conn,addr = sk.accept() 
    	# 内循环不停的收发消息,按q退出循环
    	while True:		
    		msg = conn.recv(1024)
    		print(msg.decode("utf-8"))
    		message = input(">>>:")
    		conn.send(message.encode("utf-8"))
    		# 如果你输入一个q ,退出当前的连接
    		if message == "q":
    			break
    	# 执行四次挥手
    	conn.close()
    sk.close()
    

    client

    import socket
    sk = socket.socket()
    sk.connect( ("127.0.0.1",9001) )
    
    while True:
    	# 输入字符串
    	message = input(">>>:")
    	# 发送消息
    	sk.send(message.encode("utf-8"))
    	# 接收服务器的消息
    	res = sk.recv(1024)
    	# 如果接收到的是字节流q,断开连接,有请下一位观众
    	if res == b'q':
    		break
    	print(res.decode("utf-8"))
    sk.close()
    

    UDP的基本语法

    server

    # upd 协议:
    import socket
    # 创建socket对象
    sk = socket.socket(type=socket.SOCK_DGRAM)
    # 绑定地址
    sk.bind( ("127.0.0.1",9000) )
    
    
    # 对于udp来讲 第一个信息,必须先接受
    # 接受数据,返回客户端发出的消息,还有ip和端口组成的元组
    msg,cli_addr = sk.recvfrom(1024)
    print(msg.decode("utf-8"))
    
    message = "你来啦,脑弟"
    # 2个参数 第一个是消息, 第二个是地址(ip,端口)
    sk.sendto(message.encode("utf-8"),cli_addr)
    
    # 关闭udp链接
    sk.close()
    

    client

    # upd 协议:
    import socket
    sk = socket.socket(type=socket.SOCK_DGRAM)
    message = "我来了大哥"
    
    # 发送数据
    sk.sendto(message.encode("utf-8"),("127.0.0.1",9000))
    # 接受数据
    msg,ser_addr = sk.recvfrom(1024)
    print(msg.decode("utf-8"))
    
    # 关闭udp链接
    sk.close()
    

    udp循环发送消息

    server

    # 循环发送消息udp
    import socket
    sk = socket.socket(type=socket.SOCK_DGRAM)
    # 绑定地址
    sk.bind( ("127.0.0.1",9000) )
    
    while True:
    	msg,cli_addr = sk.recvfrom(1024)
    	print(cli_addr)
    	print(msg.decode("utf-8"))
    
    	message = input(">>>")
    	sk.sendto(message.encode("utf-8"),cli_addr)
    
    # 关闭udp链接
    sk.close()
    

    client

    import socket
    sk = socket.socket(type=socket.SOCK_DGRAM)
    
    while True:
    	# 请输入你要写入的数据
    	message = input(">>>")
    	sk.sendto(message.encode(),("127.0.0.1",9000))
    
    	# 接受数据
    	msg,ser_addr = sk.recvfrom(1024)
    	print(msg.decode())
    
    # 关闭udp连接
    sk.close()
    

    黏包

    ### 黏包 
    """
    # tcp协议在发送数据时,会出现黏包现象.	
        (1)数据粘包是因为在客户端/服务器端都会有一个数据缓冲区,
        缓冲区用来临时保存数据,为了保证能够完整的接收到数据,因此缓冲区都会设置的比较大。
        (2)在收发数据频繁时,由于tcp传输消息的无边界,不清楚应该截取多少长度
        导致客户端/服务器端,都有可能把多条数据当成是一条数据进行截取,造成黏包
    """
    
    
    ### 黏包出现的两种情况 
    """
    #黏包现象一:
    	在发送端,由于两个数据短,发送的时间隔较短,所以在发送端形成黏包
    #黏包现象二:
    	在接收端,由于两个数据几乎同时被发送到对方的缓存中,所有在接收端形成了黏包
    #总结:
        发送包时间间隔短 或者 接受不及时,就会黏包
        核心是因为tcp对数据无边界截取,不会按照发送的顺序判断
    """
    
    
    ### 黏包对比:tcp和udp
    """
    #tcp协议:
    优点:接收时数据之间无边界,有可能粘合几条数据成一条数据,造成黏包 
    缺点:不限制数据包的大小,稳定传输不丢包
    
    #udp协议:
    优点:接收时候数据之间有边界,传输速度快,不黏包
    缺点:限制数据包的大小(受带宽路由器等因素影响),传输不稳定,可能丢包
    
    #tcp和udp对于数据包来说都可以进行拆包和解包,理论上来讲,无论多大都能分次发送
    但是tcp一旦发送失败,对方无响应(对方无回执),tcp可以选择再发,直到对应响应完毕为止
    而udp一旦发送失败,是不会询问对方是否有响应的,如果数据量过大,易丢包
    """
    
    
    ### 解决黏包问题
    """
    #解决黏包场景:
    	应用场景在实时通讯时,需要阅读此次发的消息是什么
    #不需要解决黏包场景:
    	下载或者上传文件的时候,最后要把包都结合在一起,黏包无所谓.
    """
    

    黏包现象

    server

    # 服务端
    import socket
    import time
    # 把当前主机注册到网络
    sk = socket.socket()
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    conn.send("hello,".encode("utf-8"))
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    print(sk.recv(10))
    print(sk.recv(10))
    sk.send(b'hahaha')
    sk.close()
    

    黏包的解放方法

    方法一

    server

    # 服务端
    import socket
    import time
    # 把当前主机注册到网络
    sk = socket.socket()
    
    #在bind方法之前加上这句话,可以让一个端口重复使用
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    
    
    conn.send("6".encode("utf-8"))
    conn.send("hello,".encode("utf-8"))
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    
    # 是为了接受字符6  #sk.recv(1).decode("utf-8") 字符串
    n = int(sk.recv(1).decode("utf-8"))
    print(n,type(n))
    print(sk.recv(n))
    print(sk.recv(10))
    sk.send(b'hahaha')
    
    sk.close()
    

    方法二

    server

    # 服务端
    import socket
    import time
    # 把当前主机注册到网络
    sk = socket.socket()
    
    #在bind方法之前加上这句话,可以让一个端口重复使用
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    
    
    conn.send("00000120".encode("utf-8"))
    conn.send(("hello,"*20).encode("utf-8"))
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    
    n = int(sk.recv(8).decode("utf-8"))
    
    print(n,type(n))
    print(sk.recv(n))
    print(sk.recv(10))
    sk.send(b'hahaha')
    
    sk.close()
    

    方法三

    import struct
    # pack 把任意长度的数字转化成具有固定长度的4个字节值
    # unpack 将4个字节的值恢复成原有数据,返回一个元组
    # i => int  我要转化的这个数据类型是整型
    res = struct.pack("i",120000000)
    print(res,len(res))
    res = struct.pack("i",1300000000)
    print(res,len(res))
    
    
    res = struct.unpack("i",res)
    print(res)
    

    server

    # 服务端
    import socket
    import time
    import struct
    # 把当前主机注册到网络
    sk = socket.socket()
    
    #在bind方法之前加上这句话,可以让一个端口重复使用
    sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
    
    sk.bind(("127.0.0.1",9001))
    sk.listen()
    
    conn,addr = sk.accept()
    
    inp = input(">>>msg:")
    msg = inp.encode("utf-8")
    """
    "abcd"
    "我是好人"
    """
    res = struct.pack("i",len(msg))
    # 发送信息长度 (字节流)
    conn.send(res)
    # 发送用户写的内容
    conn.send(msg)
    # time.sleep(1)
    conn.send("world".encode("utf-8"))
    conn.recv(1024)
    conn.close()
    
    sk.close()
    

    client

    # 客户端
    import time
    import socket
    import struct
    sk = socket.socket()
    sk.connect(("127.0.0.1",9001))
    time.sleep(0.1)
    
    # 是为了接受字符6  #sk.recv(1).decode("utf-8") 字符串
    # n = int(sk.recv(4).decode("utf-8"))
    n = sk.recv(4)
    # 把pack后的数据用unpack进行恢复
    n = struct.unpack("i",n)[0]
    print(n,type(n))
    print(sk.recv(n).decode("utf-8"))
    print(sk.recv(10))
    sk.send(b'hahaha')
    
    sk.close()
    

    python hashlib模块

    #hashlib 这个模块是一堆加密算法的集合体,哈希算法的加密方式不止一种
    https://www.cmd5.com/ md5解密
    # 应用场景:在需要效验功能时使用
        用户密码的 => 加密,解密
        相关效验的 => 加密,解密
    
    #哈希算法也叫摘要算法,相同的数据始终得到相同的输出,不同的数据得到不同的输出。
    #(1)哈希将不可变的任意长度的数据,变成具有固定长度的唯一值	
     (2)字典的键值对映射关系是通过哈希计算的,哈希存储的数据是散列(无序)
    
    import hashlib
    # md5
    
    # 创建一个md5算法的对象
    hs = hashlib.md5()
    # print(hs)
    # update 参数是二进制字节流
    hs.update("123".encode())
    # hexdigest 返回32位十六进制的字符串(固定长度)
    res = hs.hexdigest()
    print(res) #a6e612825ab0ec9b523f24182ea8f0d2
    
    # 加盐 key (一个神秘的key 钥匙) :增加密码的复杂度
    hs = hashlib.md5("opesnhelloworld".encode())
    hs.update("123".encode())
    res = hs.hexdigest()
    print(res)
    
    # 动态加盐 : 使用的加密关键字是用户名的切片
    username = input("请输入用户名:")
    hs = hashlib.md5(username[::2].encode())
    hs.update("123456".encode())
    res = hs.hexdigest()
    print(res)
    
    """
    # md5 加密效率快,通用 ,安全性差  # 32
    # sha 加密效率低,算法精密,安全性高
    """
    print("<=====sha系列====>")
    import hashlib
    hs = hashlib.sha1()
    hs.update("opesn123463922".encode("utf-8"))
    print(hs.hexdigest())
    print(len("9a6747fc6259aa374ab4e1bb03074b6ec672cf99")) # 40
    
    # sha1 加盐
    hs = hashlib.sha1("UIsdf234".encode())
    hs.update("123456".encode())
    print(hs.hexdigest())
    
    
    hs = hashlib.sha512()
    hs.update("1234567".encode())
    print(hs.hexdigest())
    #128 3c9909afec25354d551dae21590bb26e38d53f2173b8d3dc3eee4c047e7ab1c1eb8b85103e3be7ba613b31bb5c9c36214dc9f14a42fd7a2fdb84856bca5c44c2
    
    # sha512 加盐
    hs = hashlib.sha512("FZbaoyou".encode())
    hs.update("1234".encode())
    print(hs.hexdigest())
    
    """
    #hmac 和 hash算法非常类似,hmac输出的长度和
    原始哈希算法的长度一致的,hmac不容易破解
    """
    print("<======hmac=======>")
    import hmac
    import os
    
    key = b"opesn"
    # msg = b"123"
    
    # 随机返回长度为32位的二进制字节流
    msg = os.urandom(32)
    print(msg)
    # key msg 必须是二进制字节流
    hm = hmac.new(key,msg)
    print(hm.hexdigest())
    
    # 校验两个文件的内容是否一样?
    

    python 进程

    ### 进程的概念:(Process)
    
    """
    进程就是正在运行的程序,它是操作系统中,资源分配的最小单位.
    资源分配:(需要cpu来处理,需要内存来临时存储运行数据,都要浪费cpu和内存资源)
    进程号是进程的唯一标识
    
    同一个程序执行两次之后是两个进程
    进程和进程之间的关系: 数据彼此隔离,通过socket通信
    """
    
    ### 进程调度算法
    
    """
    # 先来先服务fcfs(first come first server):先来的先执行
    # 短作业优先算法:分配的cpu多,先把短的算完
    # 时间片轮转算法:每一个任务就执行一个时间片的时间.然后就执行其他的.
    # 多级反馈队列算法
    
    越是时间长的,cpu分配的资源越少,优先级靠后
    越是时间短的,cpu分配的资源越多
    """
    
    import os,time
    # linux # ps -aux | grep 49155 查看具体的某一个进程
    # 查看所有进程 ps -aux
    # 获取子进程或者父进程的id号
    # (1)获取主进程的id getpid process_id
    res = os.getpid()
    print(res)
    
    # (2)getppid 获取父进程id  parent_process_id
    res = os.getppid()
    print(res)
    
    # 启动一个进程
    import os 
    os.fork()
    print(os.getpid())
    
    
    from multiprocessing import Process
    # (1) 进程基本语法
    def func():
    	print("222子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    
    if __name__ == "__main__":
    	print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    	# 创建子进程(对象)
    	# 起一个子进程,p是一个进程对象,目的单独执行func这个函数
    	p = Process(target=func)
    	# 调用该子进程
    	p.start()
    
    # (2) 子进程函数,可以传入参数
    # 不带参数
    
    def func():
    	for i in range(1,6):
    		time.sleep(0.5)
    		print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    
    if __name__ == "__main__":
    	print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    	n = 5
    	p = Process(target=func)
    	p.start()
    	
    	for i in range(1,n+1):
    		time.sleep(0.3)
    		print("*"*i)
    
    
    # 带参数 谁快谁慢取决于cpu的调度策略
    # 子进程执行func
    def func(n):
    	for i in range(1,n+1):
    		time.sleep(0.5)
    		print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    
    if __name__ == "__main__":
    	print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
    	n = 3
    	# 如果想要传入参数 用args 它必须是一个元组
    	p = Process(target=func,args=(n,))
    	p.start()
    	
    	# 下面代码是主进程执行的
    	for i in range(1,n+1):
    		time.sleep(0.3)
    		print("*"*i)
    
    
    # (3) 进程之间的数据彼此是隔离的
    """父进程和子进程他们的启动是异步的,操作系统负责启动各种进程的"""
    count = 100
    def func():
    	global count
    	count -= 1
    	print("子进程",count)
    
    if __name__ == "__main__":
    	p = Process(target=func)
    	p.start()
    	
    	time.sleep(1)
    	print("主进程",count)
    
    
    
    # (4) 多个进程并发执行
    '''存在并发就存在顺序的不确定性,因为有执行快慢问题,跟操作系统cpu调度策略有关'''
    def func(args):
    	print("子进程%d" % (args),os.getpid(),os.getppid())
    
    if __name__ == "__main__":
    	for i in range(10):
    		Process(target=func,args=(i,)).start()
    
            
    # (5) 子进程和父进程之间的关系
    """父进程执行完毕之后默认等待子进程结束。这是为了避免出现僵尸程序,方便进程上的管理"""
    # 一般来说,父进程结束,子进程应该结束。
    def func(args):
    	print("子进程%s:" % (args),os.getpid(),os.getppid())
    	time.sleep(1)
    	print("子进程end")
    	
    if __name__ == "__main__":
    	for i in range(10):
    		Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    		# Process(target=func,args=(i,)).start()
    	print("父进程*************")
    

    join

    import time
    import os
    from multiprocessing import Process
    
    # ### join 等待所有子进程执行完毕之后,主进程在向下执行
    # (1) 1个子进程通过join加阻塞,进行同步控制
    def func(index):
    	print("第一封邮件已经发送")
    	
    if __name__ == "__main__":
    	p = Process(target=func,args=(1,))
    	p.start()
    	p.join()
    	print("10封邮件已经发送")
    
    
    # (2) 多个子进程通过join加阻塞,进行同步控制
    def func(index):
    	time.sleep(0.3)
    	print("第%s封邮件已经发送" % (index))
    
    if __name__ == "__main__":
    	lst = []
    	for i in range(10):
    		p = Process(target=func,args=(i,))
    		p.start()
    		lst.append(p)
    	# 列表当中的每一个进程对象都执行p.join 直到所有进程执行完毕,最后执行主进程内容
    	for p in lst:
    		p.join()
    	print("第10封邮件发送完毕")
    
    
    # ###使用第二种方式创建进程
    # (1) 基本使用的语法 
    class MyProcess(Process):
    	# 必须使用run方法名
    	def run(self):
    		print("子进程: 参数:",os.getpid(),os.getppid())
    
    if __name__ == "__main__":
    	p = MyProcess()
    	p.start()
    	print("主进程:",os.getpid())
    
    		
    # (2) 给子进程传递参数
    class MyProcess(Process):
    
    	def __init__(self,arg):
    		# 调用一下父类的构造方法
    		super().__init__()
    		self.arg = arg
    
    	# 必须使用run方法名
    	def run(self):
    		print("子进程: 参数:",os.getpid(),os.getppid(),self.arg)
    
    if __name__ == "__main__":
    	# p = MyProcess("123")
    	# p.start()
    	# print("主进程:",os.getpid())
    	lst = []
    	for i in range(10):
    		p = MyProcess("参数%s" % i) # 子进程并发,是异步程序
    		p.start()
    		lst.append(p)
    		# p.join()
    	
    	for i in lst:
    		i.join() # join是阻塞为了让主进程同步
    	
    	print("主进程",os.getpid())
    

    守护进程

    #可以给子进程贴上守护进程的名字,该进程会随着主进程代码执行完毕而结束(为主进程守护)
    (1)守护进程会在主进程代码执行结束后就终止
    (2)守护进程内无法再开启子进程,否则抛出异常(了解)
    
    import time
    from multiprocessing import Process
    # (1) 守护进程随着主进程代码执行完毕而结束
    """
    正常情况下,主进程默认等待子进程全部结束而结束
    守护进程必须写在start之前进行设置
    """
    def func():
    	print("子进程start")
    	time.sleep(1)
    	print("子进程end")
    	
    if __name__ == "__main__":
    	p = Process(target=func)
    	p.daemon = True
    	p.start()
    	time.sleep(1.5)
    	print("主进程")
    
        
    # (2) 多个子进程的情况下
    '''
    当多个子进程并发执行时,默认主进程等待子进程
    守护进程 守护的是 主进程 , 主要主进程里面的代码执行完毕
    对应是守护进程的那个子进程立即终止,其他非守护进程继续
    守护进程是主进程的代码执行到最后一行了,即为结束。
    '''
    def func1():
    	count = 1
    	while True:
    		print("*" * count)
    		time.sleep(0.5)
    		count += 1
    
    def func2():
    	print("func2 start")
    	time.sleep(5)
    	print("func2 end")
    
    
    if __name__ == "__main__":
    	p1 = Process(target=func1)
    	p1.daemon = True
    	p1.start()
    	Process(target=func2).start()
    	# time.sleep(3)	
    	print("主进程")
        
    
    # (3) 守护进程的实际用途:报活
    '''
    如果最大的监控系统挂掉,就让其他服务器停掉
    就好比厂子到了,员工回家放假
    '''
    # 防止信息泄露,最大的监控系统挂掉,子进程停止发送相关数据.
    def alive():
    	while True:
    		print("1号主机发送日志消息, i am ok")
    		time.sleep(0.6)
    		
    def func():
    	# while True:
    	print("统计各个主机的系统日志,维护数据库管理")
    	time.sleep(1)
    
    if __name__ == "__main__":
    	p = Process(target = alive)
    	p.daemon = True
    	p.start()
    	p = Process(target=func)
    	p.start()
    	# 加上join之后,等待所有子进程执行完毕后,在运行主进程后面的代码,(除了守护进程)
    	p.join()
    	print("...")
    

    lock.acquire()# 上锁
    lock.release()# 解锁
    
    #同一时间允许一个进程上一把锁 就是Lock
    	加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
    #同一时间允许多个进程上多把锁 就是[信号量Semaphore]
    	信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁	
    
    # 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性
    # 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
    
    # ### 锁
    # (1) 上锁与解锁
    '''
    上锁时其他进程不能更改同一份资源,解锁之后其他进行按照顺序同步上锁
    #同一时间允许一个进程上一把锁 就是Lock
    加锁可以保证多个进程修改同一块数据时,
    同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,
    但牺牲了速度却保证了数据安全。
    '''
    # 模拟抢票 
    ''''''
    import json
    import time
    from multiprocessing import Process,Lock
    
    def search(person):
    	with open("ticket",mode="r") as fp:
    		dic = json.load(fp)
    		# print(dic,type(dic))
    		print("%s 查询余票:%s" % (person,dic["count"]))
    
    def get_ticket(person):
    	with open("ticket",mode="r") as fp:
    		dic = json.load(fp)
    	time.sleep(0.5)
    	if dic["count"] > 0 :
    		print("%s买到了" % (person))
    		dic["count"] -= 1
    		# 重新更新数据库
    		with open("ticket","w") as fp:
    			json.dump(dic,fp)
    	else:
    		print("%s没买到这个票" % (person))
    
    # 作为统一的并发调用
    def ticket(person,lock):
    	#查询票数
    	search(person)
    	#上锁   到上锁的时候是同步程序
    	lock.acquire()# 上锁
    	get_ticket(person)
    	lock.release()# 解锁
    
    if __name__ == "__main__":
    	lock = Lock()
    	for i in range(10):
    		# 这里是异步并发
    		p = Process(target = ticket ,args = ("person%s" % i,lock))
    		p.start()
    
    
    # (2) 区分同步和异步
    import json
    import time
    from multiprocessing import Process,Lock
    def func(num,lock):
    	time.sleep(1)
    	print("异步程序",num)
    	lock.acquire()
    	time.sleep(0.5)
    	print("同步执行的程序",num)
    	lock.release()
    	
    if __name__ == "__main__":
    	lock = Lock()
    	for i in range(10):	
    		p = Process(target=func,args =(i,lock))
    		p.start()
    		
    """
    互斥锁:互斥锁就是进程的互相排斥,谁先抢到这个资源,
    谁就上锁改变资源的内容,为了保证数据的同步性
    注意:多个锁一起上,不开锁,会造成死锁,上锁和解锁是一对
    lock = Lock()
    lock.acquire()
    print(456)
    lock.release()
    
    # 上了两把锁,没遇到解锁,就卡在这形成死锁.
    lock.acquire()
    lock.acquire()
    lock.release()
    print(123)
    '''	
    

    信号量 Semaphore

    # ### Semaphore 信号量 其实就是锁  
    # 模拟ktv唱歌 假设一共10个人,同一时间允许4个人唱歌
    from multiprocessing import Process,Semaphore
    import random,time
    
    def ktv(person,sem):
    	sem.acquire()
    	print("%s进入了ktv" % (person))
    	time.sleep(random.randrange(6,8))
    	print("%s走出了ktv" % (person))
    	sem.release()
    if __name__ == "__main__":
    	# 最多允许4个进程同时上锁
    	sem = Semaphore(4)
    	for i in range(10):
    		p = Process(target=ktv,args=("person%s" % (i),sem))
    		p.start()
    

    事件

    # ### 事件(Event)
    # 阻塞事件 :
    	e = Event()生成事件对象e   
    	e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
        # 如果是True  就不加阻塞
        # 如果是False 那就加阻塞
    
    # 控制这个属性的值
        # set()方法     将这个属性的值改成True
        # clear()方法   将这个属性的值改成False
        # is_set()方法  判断当前的属性是否为True  (默认上来是False)
    
    from multiprocessing import Process,Event
    import time,random
    # (1) 基本用法
    e = Event()
    print(e.is_set())  # 默认False
    
    # 将这个属性的值改成True
    e.set()
    # 将这个属性的值改成False
    e.clear()
    
    print(1)
    e.wait()
    print(2)
    
    
    # 模拟红绿灯操作  假设有20辆小车全部通过
    def traffic_light(e):
    	# traffic_light 只做一件事就是红灯和绿灯的变色效果
    	print("红灯亮")
    	while True:
    		# 默认is_set 获取到的值是False
    		if e.is_set():
    			#红灯区间
    			time.sleep(1)
    			print("红灯亮")
    			e.clear() # False
    		else:
    			#绿灯区间
    			time.sleep(1)
    			print("绿灯亮")
    			e.set()   # True
    
    def car(e,i):
    	#e.is_set() 为False时成立
    	if not e.is_set():
    		print("car %s 在等待" % (i))
    		e.wait()
    	print("car %s 通过了" % i)
    
    if __name__ == "__main__":
    	e = Event()
    	p = Process(target=traffic_light,args = (e,))
    	p.start()
    	
    	for i in range(20):
    		time.sleep(random.randrange(0,2))
    		p = Process(target=car,args = (e,i))
    		p.start()
    
    
    # 优化:  假设有20辆小车全部通过后,红绿灯终止;
    def traffic_light(e):
    	# traffic_light 只做一件事就是红灯和绿灯的变色效果
    	print("红灯亮")
    	while True:
    		# 默认is_set 获取到的值是False
    		if e.is_set():
    			#红灯区间
    			time.sleep(1)
    			print("红灯亮")
    			e.clear() # False
    		else:
    			#绿灯区间
    			time.sleep(1)
    			print("绿灯亮")
    			e.set()   # True
    
    def car(e,i):
    	#e.is_set() 为False时成立
    	if not e.is_set():
    		print("car %s 在等待" % (i))
    		e.wait()
    	print("car %s 通过了" % i)
    
    if __name__ == "__main__":
    	e = Event()
    	lst = []
    	p = Process(target=traffic_light,args = (e,))
    	p.daemon = True
    	p.start()
    	
    	for i in range(20):
    		time.sleep(random.randrange(0,2))
    		p = Process(target=car,args = (e,i))
    		p.start()
    		lst.append(p)
    	for p in lst:
    		p.join()
    		
    	print("程序彻底跑完~")
    

    生产者与消费者

    # IPC Inter-Process Communication
    # 实现进程之间通信的两种机制:
        # 管道 Pipe
        # 队列 Queue
        
    # put() 存放
    # get() 获取
    # get_nowait() 拿不到报异常
    # put_nowait() 非阻塞版本的put
    q.empty()      检测是否为空  (了解)
    q.full() 	   检测是否已经存满 (了解)
    

    队列 Queue

    # ### 队列 Queue
    import queue
    from multiprocessing import Process,Queue
    '''
    队列特征:先进先出
    '''
    
    # (1) 基础语法
    q2 = Queue()
    # 把数据存放在队列当中
    q2.put(1)
    q2.put(2)
    
    # 获取队列当中的数据
    print(q2.get())
    print(q2.get())
    # 队列当中如果已经没有数据了,那会执行阻塞.
    print(q2.get())
    
    print(q.get_nowait())
    print(q.get_nowait())
    
    # 捕捉异常类错误
    # get_nowait linux 不兼容;只有windows好用
    try:
    	print(q.get_nowait())
    except queue.Empty:
    	print("empty")
    except:
    	print(123)
    
    # (2) 可以为queue 指定队列的长度
    q1 = Queue(3) # 最多放3个 ,超过最大长度,就执行阻塞
    q1.put(1)
    q1.put(2)
    q1.put(3)
    # q1.put(4)
    # q1.put_nowait(4) # 用put_nowait添加值超过队列长度直接报错.
    
    res = q1.full()
    print(res)
    q3 = Queue()
    res = q3.empty()
    print(res)
    
    # (3)  进程之间的通讯 
    def func(q):
    	print("子进程:",q.get())
    	q.put("abc")
    if __name__ == "__main__":
    	q4 = Queue()
    	p = Process(target=func,args=(q4,))
    	p.start()
    	q4.put("111222333")
    	p.join()
    	print("主进程:",q4.get())
    

    消费者模型

    '''
    # 爬虫例子:
    一个进程1号负责获取所有网页当中的内容
    一个进程2号负责提取网页当中的连接
    
    那么这个1号进程就可以看成一个生产者
    这个2号进程可以看成一个消费者
    
    有时可能生产者比消费者快,反之亦然
    所以在生产者和消费者之间加一个缓冲区(队列)
    
    生产者和消费者模型从程序上来讲就是数据的存入和获取的过程
    最为理想的生产者和消费者模型 , 两者之间的运行速度应该是同步的
    '''
    from multiprocessing import Process,Queue
    import time
    import random
    # 消费者方法 [负责取值]
    def consumer(q,name):
    	while True:
    		food = q.get()
    		if food is None:
    			break
    		time.sleep(random.uniform(0.5,1))
    		print("%s吃了一个%s" % (name,food))
    
    # 生产者方法 [负责存值]
    def producer(q,name,food):
    	for i in range(5):
    		time.sleep(random.uniform(0.3,0.8))
    		print("%s生产了%s" % (name,food))
    		q.put(food+str(i))
    
    if __name__ == "__main__":
        q = Queue()
        c1 = Process(target=consumer,args=(q,"张三"))
        c2 = Process(target=consumer,args=(q,"李四"))
        # c1.daemon = True
        c1.start()
        c2.start()
        p1 = Process(target=producer,args=(q,"王五","地瓜"))
        p2 = Process(target=producer,args=(q,"tony","黄瓜"))
        p1.start()
        p2.start()
        
        p1.join()
        p2.join()
        q.put(None)
        q.put(None)
    

    JoinableQueue

    # ### JoinableQueue
    '''
    put  存入
    get  获取
    task_done  通过队列其中一个数据被处理
    JoinableQueue 默认会计数,执行一次taskdone减少一次队列数
    JoinableQueue 里面有5个值,taskdone减少一个,减到0 , 队列里面的值为空
    q.join() 队列.join
    添加阻塞,直到放入队列的所有值都被处理掉的时候,撤掉阻塞  
    '''
    from multiprocessing import Process,JoinableQueue
    import random
    import time
    
    def consumer(q,name):
    	while True:
    		food = q.get()
    		time.sleep(random.uniform(0.5,1))
    		print("%s接受了 %s" % (name,food))
    		q.task_done()
    
    def producer(q,name,food):
    	for i in range(5):
    		time.sleep(random.uniform(0.3,0.8))
    		print("%s生产了 %s" % (name,food+str(i)))
    		q.put(food+str(i))
    
    if __name__ == "__main__":
    	jq = JoinableQueue()
    	c1 = Process(target=consumer,args=(jq,"张三"))
    	c1.daemon = True
    	c1.start()
    	
    	p1 = Process(target=producer,args= (jq,"李四","葡萄"))
    	p1.start()
    	
    	p1.join() # 生产者需要把所有数据放到队列当中
    	# p1.join()
    	#添加阻塞,直到放入队列的所有值都被处理掉的时候,撤掉阻塞
    	# 通过task_done 表达处理掉的意思
    	# join 和 task_done 需要配合使用;
    	jq.join()
    

    进程池

    # 进程池:
    # 开启过多的进程并不一定提高你的效率,
    # 如果cpu负载任务过多,平均单个任务执行的效率就会低,反而降低执行速度.
        1个人做4件事和4个人做4件事
        显然后者执行速度更快,
        前者是并发,后者是并行
        利用进程池,可以开启cpu的并行效果
    # apply       开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程
    # apply_async 开启进程,异步非阻塞,(主进程 和 子进程异步)
    
    
    # ### Manager dict list 能够实现进程之间的数据共享
    from multiprocessing import Process , Manager ,Lock
    
    def work(d,lock):
    	# 自动上锁和解锁
    	with lock:
    		d["count"] -= 1
    	"""
    	lock.acquire()
    	d["count"] -= 1
    	lock.release()
    	"""
    if __name__ == "__main__":
    	lock = Lock()
    	m = Manager()
    	dic = m.dict({"count":100})
    	lst = []
    	for i in range(100):
    		p  =Process(target=work,args = (dic,lock))
    		lst.append(p)
    		p.start()	
    	# 等待每一个进程执行完毕
    	for p in lst:
    		p.join()	
    	print(dic)
        
        
        
    # ### 进程池
    import os,time
    from multiprocessing import Pool,Process
    # (1) 比较pool 和Process 执行速度
    '''因为进程池可以实现并行的概念,比Process单核并发的速度要快'''
    def func(num):
    	print("发了第%s封邮件" % (num))
    	# time.sleep(1)
    	# print("end")
    	
    if __name__ == "__main__":
    	# print(os.cpu_count()) # 获取cpu核心数量
    	start = time.time()
    	p  = Pool(6)
    	for i in range(100):
    		# apply_async 用来开启子进程
    		p.apply_async(func,args=(i,))
    
    	# 关闭进程池,用户不能在向这个池中创建子进程
    	p.close() 
    	# 这里加阻塞,直到进程池中所有的子进程执行完毕
    	p.join()
    	end = time.time()
    	print(end-start)
    	
    	
    	# 单核Process处理100件任务
    	start = time.time()
    	lst = []
    	for i in range(100):
    		p = Process(target=func,args= ( i ,))
    		p.start()
    		lst.append(p)
    	for p in lst:
    		p.join()
    	end = time.time()
    	print(end-start)
    
    
    # (2) apply 开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程
    
    def task(num):
    	time.sleep(1)
    	print("%s: %s" % (num,os.getpid()))
    	return num ** 2
    	
    if __name__ == "__main__":
    	p = Pool(2)
    	for i in range(20):
    		# apply是同步阻塞,每个进程必须执行完,才能在开启进程.
    		res = p.apply(task,args=(i,))
    		print("-->",res)
    
    
    # (3) apply_async 异步非阻塞
    def task(num):
    	time.sleep(3)
    	print("%s: %s" % (num,os.getpid()))
    	return num ** 2
    
    if __name__ == "__main__":
    	p = Pool()
    	res_lst = []
    	for  i in range(20):
    		res = p.apply_async(task,args=(i,))
    		print(res)
    		res_lst.append(res)
    		# 返回的对象需要使用get方法获取到返回值
    	# print(len(res_lst))
    	for res in res_lst:
    		print(res.get())
    	
    	# close 和 join 一写写2个保持同步.
    	# p.close()
    	# p.join()
    
    # (4) 进程池.map (与高阶函数map使用方法一样,只不过这个是并发版本)
    def task(num):
    	time.sleep(1)
    	print("%s:%s"%(num,os.getpid()))
    	return num ** 2
    	
    if __name__ == "__main__":
    	p = Pool()
    	res = p.map(task,range(20)) #自动close和join
    	print(res)
    

    进程池并发socket

    server

    import socket
    from multiprocessing import Pool
    
    def talk(conn):
    	while True:
    		conn.send(b"hello")
    		print(conn.recv(1024))
    	conn.close()
    
    if __name__ =="__main__":
    	sk = socket.socket()
    	sk.bind( ("127.0.0.1",9000) )
    	sk.listen()
    	# Pool默认获取cpu_counter cpu最大核心数
    	p = Pool()
    	while True:
    		conn,addr = sk.accept()
    		p.apply_async(talk,args=(conn,))
    	sk.close()
    

    client

    import socket
    sk = socket.socket()
    sk.connect( ("127.0.0.1",9000) )
    
    while True:
    	print(sk.recv(1024))
    	sk.send(b"byebye")
    

    python 线程

    ### 线程
    
    """
    #进程是资源分配的最小单位
    #线程是计算机中调度的最小单位
    
    #线程的缘起
    资源分配需要分配内存空间,分配cpu:
    分配的内存空间存放着临时要处理的数据等,比如要执行的代码,数据
    而这些内存空间是有限的,不能无限分配
    目前来讲,普通机器,5万个并发程序已是上限.线程概念应用而生.
    
    #线程的特点
    线程是轻量级,干更多的活,同一个进程中的所有线程的资源是共享的.
    每一个进程中都有至少一条线程在工作
    """
    
    ### 线程的缺陷
    """
    #线程可以并发,但是不能并行(即可以1个cpu执行,不能多个cpu一起执行)
    #原因:
       python是解释型语言,执行一句编译一句,而不是一次性全部编译成功,不能提前规划,都是临时调度
       容易造成不同的cpu却反复执行同一个程序.所以加了一把锁叫GIL
       全局解释器锁(Cpython解释器特有) GIL锁:同一时间一个线程只能被一个cpu执行	
    	
    #想要并行的解决办法:
        (1)用多进程
        (2)换一个Pypy解释器
        
    #计算型程序会过度依赖cpu,但网页,爬虫,OA办公,这种交互型带有阻塞的程序里,速度影响无所谓
    """
    
    ### 线程相关函数
    """
    线程.is_alive()    检测线程是否仍然存在
    线程.setName()     设置线程名字
    线程.getName()     获取线程名字
    currentThread().ident 查看线程id号 
    enumerate()        返回目前正在运行的线程列表
    activeCount()      返回目前正在运行的线程数量
    """
    
    import os,time
    from threading import Thread
    from multiprocessing import Process
    # (1) 一个进程可以有多个线程
    def func(i):	
    	print("子线程:", i,os.getpid())
    for i in range(10):
    	t = Thread(target=func,args=(i,))
    	t.start()
    
    
    
    
    #(2) 并发多线程和多进程的速度对比 多线程更快
    def func(i):	
    	print("子线程:", i,os.getpid())
    	
    if __name__ == "__main__":
    	start = time.time()
    	lst = []
    	for i in range(100):
    		t = Thread(target=func,args=(i,))
    		t.start()
    		lst.append(t)
    	for t in lst:
    		t.join()
    	end = time.time() 
    	t1 = end-start
    
    		
    	start = time.time()
    	lst = []
    	for i in range(100):
    		p = Process(target=func,args=(i,))
    		p.start()
    		lst.append(p)
    	for p in lst:
    		p.join()
    	end = time.time()
    	t2 = end - start
    	print(t1,t2)
    
    
        
    # (3) 多个线程共享同一份进程资源
    num = 100
    lst = []
    def func():
    	global num 
    	num -= 1
    	
    for i in range(100):
    	t = Thread(target=func)
    	t.start()
    	lst.append(t)
    
    for t in lst:
    	t.join()
    print(num)
    
    
    
    # (4) 线程相关函数
    '''
    线程.is_alive()    检测线程是否仍然存在
    线程.setName()     设置线程名字
    线程.getName()     获取线程名字
    currentThread().ident 查看线程id号 
    enumerate()        返回目前正在运行的线程列表
    activeCount()      返回目前正在运行的线程数量
    '''
    def func():
    	time.sleep(0.01)
    t = Thread(target=func)
    t.start()
    print(t.is_alive())
    print(t.getName()) #Thread-1
    t.setName("t1")
    print(t.getName())
    time.sleep(3)
    print(t.is_alive()) # False
    
    
    # 1.currentThread().ident 查看线程id号 
    from threading import currentThread
    def func():
    	print("子线程:",currentThread().ident)
    
    print("主线程:",currentThread().ident)
    t = Thread(target=func)
    t.start()
    
    
    
    # enumerate()        返回目前正在运行的线程列表
    from threading import enumerate,currentThread
    def func(i):
    	print("子线程:",currentThread().ident)
    	time.sleep(3)
    	
    for i in range(10):
    	t = Thread(target=func,args=(i,))
    	t.start()
    print(enumerate())
    print(len(enumerate())) # 加上主线程一共11个
    """
    [
    <_MainThread(MainThread, started 74964)>,
     <Thread(Thread-11, started 78972)>
     ]
    """
        
     
     
    # activeCount()      返回目前正在运行的线程数量
    from threading import activeCount,currentThread
    def func(i):
    	print("子线程:",currentThread().ident)
    	time.sleep(2)
    	
    for i in range(10):
    	t = Thread(target=func,args=(i,))
    	t.start()
    time.sleep(3)
    print(activeCount()) # 加上主线程一共11个
    

    守护线程

    # ### 守护线程:等待其他所有子线程结束之后,自动结束
    from threading import Thread
    import time
    def func1():
    	while True:
    		time.sleep(0.5)
    		print(111)
    		
    		
    def func2():
    	print("func2 start->")
    	time.sleep(3)
    	print("func2 end->")
    
    t1 = Thread(target=func1)
    t2 = Thread(target=func2)
    # setDaemon 将普通线程变成守护线程
    t1.setDaemon(True)
    t1.start()
    t2.start()
    t2.join()
    time.sleep(5)
    print("主线程代码结束")
    

    线程数据安全

    from threading import Thread,Lock
    n = 0
    # ###线程之间如果共同更改同一个数据,需要加锁
    def func1(lock):
    	global n
    	for i in range(100000):
    		lock.acquire()
    		n-=1
    		lock.release()
    
    def func2(lock):
    	global n
    	for i in range(100000):
    		lock.acquire()
    		n+=1
    		lock.release()
    
    if __name__ == "__main__":
    	lock = Lock()
    	lst = []
    	for i in range(10):
    		t1 = Thread(target=func1,args=(lock,))
    		t2 = Thread(target=func2,args=(lock,))
    		t1.start()
    		t2.start()
    		lst.append(t1)
    		lst.append(t2)
    	for t in  lst:
    		t.join()
    
    	print(n)
    

    死锁,递归锁,互斥锁

    """
    加一把锁,就对应解一把锁.形成互斥锁.
    从语法上来说,锁可以互相嵌套,但不要使用,
    不要因为逻辑问题让上锁分成两次.导致死锁
    递归锁用于解决死锁,但只是一种应急的处理办法
    """
    
    # ### 死锁,递归锁,互斥锁
    from threading import Thread,Lock
    import time
    
    noodle_lock = Lock()
    kuaizi_lock = Lock()
    # (1)常见情况引发的死锁问题
    def eat1(name):
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    def eat2(name):
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    
    if __name__ == "__main__":
    	name_list = ["张楠","定海呀"]
    	name_list2 = ["晨露中","周金波"]
    	for name in name_list:
    		Thread(target=eat1,args= (name,) ).start()
    		# Thread(target=eat1,args= (name,) ).start()
    	
    	for name in name_list2:
    		Thread(target=eat2,args = (name,)).start()
    		# Thread(target=eat2,args = (name,)).start()
    
    
    
    # (2) 递归锁
    from threading import Thread,RLock
    '''
    	递归所专门用来解决死锁现象的
    	用于临时快速解决服务器崩溃现象,加上递归所应急
    '''
    rlock = RLock()
    def func(name):
    	rlock.acquire()
    	print(name,1)
    	rlock.acquire()
    	print(name,2)
    	rlock.acquire()
    	print(name,3)
    	rlock.release()
    	rlock.release()
    	rlock.release()
    	
    for i in range(10):
    	Thread(target=func,args=("name%s" % (i),) ).start()
    
    import time
    
    print("<==>")
    
    noodle_lock = kuaizi_lock = RLock()
    def eat1(name):
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    def eat2(name):
    	kuaizi_lock.acquire()
    	print("%s拿到了筷子" % (name))
    	noodle_lock.acquire()
    	print("%s拿到这个面条" % (name))
    	print("开始吃")
    	time.sleep(0.5)
    	noodle_lock.release()
    	print("%s把面放下"% (name))
    	kuaizi_lock.release()
    	print("%s把筷子放下"% (name))
    
    if __name__ == "__main__":
    	name_list = ["张楠","定海呀"]
    	name_list2 = ["晨露中","周金波"]
    	for name in name_list:
    		Thread(target=eat1,args= (name,) ).start()	
    	for name in name_list2:
    		Thread(target=eat2,args = (name,)).start()
    
    
    # (3) 互斥锁:
    '''
    	从语法上说,锁是允许互相嵌套的,但是不要使用
    	上一次锁,就对应解开一次,形成互斥锁
    	吃面和拿筷子是同时的
    	不要因为逻辑问题让锁分别上,容易造成死锁.	
    '''
    from threading import Thread,Lock
    mylock = Lock()
    def eat1(name):
    	mylock.acquire()
    	print("%s拿到筷子" % (name))
    	print("%s吃到面条" % (name))
    	time.sleep(0.5)
    	
    	print("%s放下筷子" % (name))
    	print("%s放下面条" % (name))
    	mylock.release()
    def eat2(name):
    	mylock.acquire()
    	print("%s拿到筷子" % (name))
    	print("%s吃到面条" % (name))
    	time.sleep(0.5)	
    	print("%s放下筷子" % (name))
    	print("%s放下面条" % (name))
    	mylock.release()
    if __name__ == "__main__":
    	name_list = ["张三","李四"]
    	name_list2 = ["tony","tom"]
    	for name in name_list:
    		Thread(target=eat1,args= (name,) ).start()	
    	for name in name_list2:
    		Thread(target=eat2,args = (name,)).start()
    

    线程中的信号量

    from threading import Semaphore,Thread
    import time
    
    def func(index,sem):
    	sem.acquire()
    	print(index)
    	time.sleep(1)
    	sem.release()
    
    
    if __name__ == "__main__":
    	sem = Semaphore(5)
    	for i in range(16):
    		Thread(target=func,args=(i,sem)).start()
    		
    '''
    进程而言:
    信号量是配合Process使用,可以限制并发的进程数
    在有了进程池之后,可以取代如上做法
    
    线程而言
    信号量是配合Thread使用的,可以限制并发的线程数
    线程池可以取代如上用法
    '''		
    

    事件

    # ### 模拟连接数据库
    from threading import Event,Thread
    import time,random
    '''
    # wait()   动态给程序加阻塞
    # set()    将内部属性改成True
    # clear()  将内部属性改成False
    # is_set() 判断当前属性(默认是False)
    '''
    '''
    e = Event()
    print(e.is_set())
    # timeout = 5 最多等待5秒
    e.wait(timeout = 5)
    print(e.is_set())
    '''
    
    def check(e):
    	print("开始检测数据库连接")
    	time.sleep(random.randrange(1,6))
    	e.set()
    
    def connect(e):
    	sign = False
    	for i in range(3):
    		e.wait(1)
    		if e.is_set():
    			sign = True
    			print("数据库连接成功")
    			break
    		else:
    			print("尝试连接数据库%s次失败" % (i+1))
    	
    	if sign == False:
    		# 主动抛出异常
    		raise TimeoutError
    
    e = Event()
    Thread(target=connect,args=(e,)).start()
    Thread(target=check,args=(e,)).start()
    

    条件

    # ### 条件
    '''
    wait    添加阻塞
    notify  允许几个被阻塞的线程放行
    
    #语法:wait 使用的前后需要上锁
    """
    acquire()
    wait
    wait下面写上相应逻辑代码
    release()
    """
    
    #语法: notify 使用前后加阻塞
    """
    acquire
    notify(自定义线程放行的数量,默认放行1个)
    release
    """
    '''
    from threading import Condition,Thread
    import time 
    
    def func(con,index):
    	print("%s在等待" % index)
    	con.acquire()
    	# 添加阻塞
    	con.wait()
    	print("%s do everyting" % index)
    	con.release()
    
    con = Condition()
    
    for i in range(10):
    	t = Thread(target=func,args = (con,i))
    	t.start()
    
    # 写法一
    con.acquire()
    con.notify(5) # 默认放行1个(不加参数)
    con.release()
    
    # 写法二
    count = 10
    while count > 0:
    	num = int(input(">>>
    "))
    	con.acquire()
    	# 放行线程 num 动态赋予的值
    	con.notify(num)
    	con.release()
    	count -= num
    

    定时器

    # ### 定时器
    from threading import Timer
    def func():
    	print("目前正在执行任务")
    	
    t = Timer(5,func)
    t.start()
    print("主线程")
    
    # 计划任务 crontab
    

    线程队列

    线程常用队列有 queue LifoQueue PriorityQueue
    queue 先进先出
    LifoQueue 后进先出
    PriorityQueue 按照优先级排序
    
    
    # ### 队列
    import queue
    '''
    put  往队列当中放值,超过队列长度,直接加阻塞
    get  如果获取不到加阻塞
    put_nowait 如果放入的超过了队列长度,直接报异常错误
    get_nowait 如果获取不到直接报异常错误
    '''
    # (1)queue  先进先出
    q = queue.Queue()
    q.put(1)
    q.put(2)
    print(q.get())
    # print(q.get())
    # print(q.get())
    print(q.get_nowait())
    
    
    q = queue.Queue(2)
    q.put(1)
    q.put(2)
    # q.put(3)
    q.put_nowait(3)
    
    
    
    # (2) lifoqueue 用栈的后进先出的顺序形成队列
    from queue import LifoQueue
    lq = LifoQueue()
    lq.put(1)
    lq.put(2)
    lq.put('aaa')
    print(lq.get())
    
    
    
    # (3) PriorityQueue 队列 按照优先级顺序获取
    from queue import PriorityQueue
    # 1.先按照数字进行排列,在按照ascii吗从小到大排序
    pq = PriorityQueue()
    pq.put( (13,'abc') )
    pq.put( (5,"ccc") )
    pq.put( (5,"bcc") )
    pq.put( (15,"bbb") )
    print(pq.get())
    print(pq.get())
    print(pq.get())
    print(pq.get())
    
    
    
    # 2. 单独一个元素的,只能是都放同一种类型
    # 如果是数字
    from queue import PriorityQueue
    pq = PriorityQueue()
    pq.put(15)
    pq.put(13)
    pq.put(3)
    # pq.put("3434") error
    print(pq.get())
    print(pq.get())
    print(pq.get())
    # print(pq.get())
    
    
    # 如果是字符串
    pq = PriorityQueue()
    pq.put("bbd1")
    pq.put("bbcb2")
    # pq.put(33344) error
    print(pq.get())
    print(pq.get())
    

    线程池和进程池 (改良版)

    from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
    import time,random
    def func(i):
    	print("Process",i)
    	time.sleep(1)
    	print("Process is end ")
    
    #(1) ProcessPoolExecutor 进程池的基本使用(改良版)
    if __name__ == "__main__":
    	# ProcessPoolExecutor <==> Pool
    	p = ProcessPoolExecutor(5)
    	# submit <==> apply_async
    	p.submit(func,1)
    	# shutdown <==> close + join 
    	p.shutdown()
    	print("主线程")
    
    
    
    #(2)  线程池
    from threading import current_thread as cthread
    
    def func(i):
    	# cthread().indent 线程号
    	print("thread",i,cthread().ident)
    	time.sleep(random.uniform(0.1,1))
    	print("thread %s end" % (i))
    
    tp = ThreadPoolExecutor(5)
    for i in range(20):
    	tp.submit(func,i)
    
    tp.shutdown()
    print("主线程")
    
    
    
    
    #(3) 返回值
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(random.uniform(0.1,1))
    	print("thread %s end" % (i))
    	return i * "*"
    
    tp = ThreadPoolExecutor(5)
    lst = []
    for i in range(20):
    	res = tp.submit(func,i)
    	# print(res)
    	lst.append(res)
    	
    for res in lst:
    	# result 和 get 使用方法一样的 result带有阻塞效果.
    	print(res.result())
    print("主线程")
    
    
    
    # (4) map 返回生成器
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(random.uniform(0.1,1))
    	print("thread %s end" % (i))
    	return i * "*"
    
    tp = ThreadPoolExecutor(5)
    res = tp.map(func,range(20))
    # for i in res:
    tp.shutdown()
    for i in res:
    	print(i)
    print(res)
    

    回调函数

    # ###  回调函数
    '''
    # 回调函数
        就是一个参数,将这个函数作为参数传到另一个函数里面.
        函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数
    '''
    # (1) 线程池的回调函数是 子线程完成的.
    from concurrent.futures import ThreadPoolExecutor
    from threading import current_thread as cthread
    import time
    
    def func(i):
    	print("thread",i,cthread().ident)
    	time.sleep(1)
    	print("thread %s end" % (i))
    	return i * "*"
    
    def call_back(args):
    	print("call back : ",cthread().ident)
    	print(args)
    	print(args.result())
    
    
    tp = ThreadPoolExecutor(5)
    for i in range(20):
    	# 当执行完这个线程后 在直接执行call_back这个函数
    	tp.submit(func,i).add_done_callback(call_back)
    tp.shutdown()
    print("主线程",cthread().ident)
    
    
    
    
    # (2)进程池的回调函数有?来完成
    from concurrent.futures import ProcessPoolExecutor
    import os,time
    def func(i):
    	print('process',i,os.getpid())
    	time.sleep(1)
    	print('process %s end' % i)
    	
    if __name__ == "__main__":
    	p = ProcessPoolExecutor(5)
    	p.submit(func,1)
    	p.shutdown()
    	print("主线程",os.getpid())
    
    # 进程池的回调函数 , 是由主进程完成的
    from concurrent.futures import ProcessPoolExecutor
    import os ,time 
    def func(i):
    	print("process",i,os.getpid())
    	time.sleep(1)
    	print("process %s end" % (i))
    	return i * "*"
    
    def call_back(args):
    	print("call back : ",os.getpid())
    	print(args)
    	print(args.result())
    
    if __name__ == "__main__":
    	tp = ProcessPoolExecutor(5)
    	for i in range(20):
    		# 当执行完这个线程后 在直接执行call_back这个函数
    		tp.submit(func,i).add_done_callback(call_back)
    	# tp.shutdown()
    	print("主线程",os.getpid())
    

    协程

    # ### 协程
    
    def my_gen():
    	for i in range(10):
    		yield i
    
    for num in my_gen():
    	print(num)
    	
        
        
    # (1) 协程写生产者消费者模型
    # producer 生产者
    def producer():
    	for i in range(100):
    		yield i
    
    # consumer 消费者	
    def consumer():
    	g = producer()
    	for num in g:
    		print(num)
    
    consumer()
    
    
    
    # (2) 协程的基本实现
    '''
    switch 利用它进行任务的切换
    只能进行手动切换,
    缺陷:不能规避雕io,不能自动实现遇到阻塞就切换.
    '''
    from greenlet import greenlet
    import time
    def eat():
    	print("eat one1")
    	# 切换到play这个协程中
    	g2.switch()
    	time.sleep(1)
    	print("eat one2")
    	
    def play():
    	print("play one1")
    	print("play one2")
    	g1.switch()
    
    g1 = greenlet(eat)
    g2 = greenlet(play)
    g1.switch()
    
    
    
    # (3) gevent 有他的缺陷
    import gevent,time
    '''
    spawn 自动检测阻塞,遇到阻塞就切换
    相当于switch 的升级版 , 
    瑕疵:不能够识别time.sleep() 有些阻塞不认识.
    '''
    def eat():
    	print("eat one1")
    	time.sleep(1)
    	print("eat one2")
    
    def play():
    	print("play one1")
    	time.sleep(1)
    	print("play one2")
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    # 协程的阻塞也是join
    g1.join() # 阻塞直到g1协程执行完毕
    g2.join() # 阻塞直到g2协程执行完毕
    
    
    
    # (4) 进阶版本 用gevent.sleep 取代time.sleep()
    import gevent,time
    '''
    spawn 自动检测阻塞,遇到阻塞就切换
    相当于switch 的升级版 , 
    瑕疵:不能够识别time.sleep() 有些阻塞不认识.
    '''
    def eat():
    	print("eat one1")
    	gevent.sleep(1)
    	print("eat one2")
    
    def play():
    	print("play one1")
    	gevent.sleep(1)
    	print("play one2")
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    # 协程的阻塞也是join
    g1.join() # 阻塞直到g1协程执行完毕
    g2.join() # 阻塞直到g2协程执行完毕
    
    
    
    # (5) 终极解决办法
    # from gevent import monkey 
    # 把它下面引入的模块都识别出来
    # monkey.patch_all()
    
    # 简化:都写在一行的情况下,有分号隔开.
    from gevent import monkey;monkey.patch_all()
    
    import time 
    import gevent
    
    def eat():
    	print("eat one1")
    	time.sleep(1)
    	print("eat one2")
    
    def play():
    	print("play one1")
    	time.sleep(1)
    	print("play one2")
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    
    # 协程的阻塞也是join
    g1.join() # 阻塞直到g1协程执行完毕
    g2.join() # 阻塞直到g2协程执行完毕
    
    
    
    
    # ### 协程
    from gevent import monkey;monkey.patch_all()
    import gevent
    """
    # spawn(函数,参数...) 启动一个协成
    # join()  阻塞,直到某个协程执行完毕
    # joinall 类似于join 只不过
    g1.join()
    g2.join()
    gevent.joinall([g1,g2]) 一次性把所有需要阻塞的协程对象写到一起
    # value 获取协成的返回值
    """
    # (1) joinall value 的使用方法
    import time
    
    def eat():
    	print("eating 111")
    	time.sleep(1)
    	print("eating 222")
    	return "吃完了"
    
    def play():
    	print("play 111")
    	time.sleep(1)
    	print("play 222")
    	return "play done"
    
    g1 = gevent.spawn(eat)
    g2 = gevent.spawn(play)
    gevent.joinall([g1,g2])
    print(g1.value)
    print(g2.value)
    
    
    # (2) 爬虫的应用
    """
    HTTP 状态码:
    	200 OK
    	400 Bad Request
    	404 Not Found
    
    
    requests 模块
    response = requests.get(网址)
    """
    import requests,time
    response = requests.get("http://www.4399.com")
    # print(response)
    # 获取状态码
    res = response.status_code
    print(res)
    # 获取,设置字符编码
    res_code = response.apparent_encoding
    # print(res)
    # 设置编码集
    response.encoding = res_code
    # 获取网页里面的内容
    res = response.text
    print(res)
    
    url_lst = [
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    	"http://www.taobao.com",
    	"http://www.jd.com",
    	"http://www.4399.com",
    	"http://www.7k7k.com",
    	"http://www.baidu.com",
    ]
    
    # (1) 正常爬取数据 , 速度慢
    def get_url(url):
    	response = requests.get(url)
    	if response.status_code == 200:
    		print(response.text)
    		
    # start = time.time()
    # for i in url_lst:
    	# get_url(i)
    # end = time.time()
    # print(end-start) #3.033189535140991
    
    # (2) 用协程爬取数据 速度更快
    lst = []
    start = time.time()
    for  i in url_lst:
    	g = gevent.spawn(get_url,i)
    	lst.append(g)
    	
    gevent.joinall(lst)
    end = time.time()
    print(end-start)   #0.6957511901855469
    

    线程之间的数据隔离

    from threading import local,Thread
    
    # 线程之间的数据隔离
    '''
    多个线程之间,使用threading.local对象
    可以实现多个线程之间的数据隔离
    '''
    loc = local()
    print(loc)	# 对象
    
    def func(name,age):
        global loc
        loc.name = name
        loc.age = age
        print(loc.name,loc.age)
    
    Thread(target=func,args=("hello",20)).start()
    Thread(target=func,args=("world",20)).start()
    
  • 相关阅读:
    贪心算法与动态规划
    Linux重要目录结构
    博客园添加目录索引
    冒泡排序&插入排序&其他排序
    Linux下部署自己写的Web项目
    Java算法入门-数组&链表&队列
    Java集合-数据结构之栈、队列、数组、链表和红黑树
    Java集合-单例模式斗地主&Collections类的shuffle方法了解
    什么是反向代理服务器
    Linux信号处理
  • 原文地址:https://www.cnblogs.com/opesn/p/13521460.html
Copyright © 2011-2022 走看看