Python 网络编程与并发编程
socket的基本使用
TCP(Transmission Control Protocol)可靠的、面向连接的协议(eg:打电话)、传输效率低全双工通信(发送缓存&接收缓存)、面向字节流。使用TCP的应用:Web浏览器;电子邮件、文件传输程序。
UDP(User Datagram Protocol)不可靠的、无连接的服务,传输效率高(发送前时延小),一对一、一对多、多对一、多对多、面向报文,尽最大努力服务,无拥塞控制。使用UDP的应用:域名系统 (DNS);视频流;IP语音(VoIP)。
客户端和服务端在建立连接时: 三次握手
客户端和服务端在断开连接时: 四次挥手
SYN 创建连接
ACK 确认响应
FIN 断开连接
tcp 协议: 打电话 , 必须先建立连接, 同一时刻只能和一个人聊天, 速度慢,但传输稳定
udp 协议: 发短信 , 不需要确认连接, 只要知道ip和端口 , 就可以进行通讯,速度快,可以同时聊天,传输不稳定
tcp:发送文件之类的(上传,下载文件,FTP,邮件SMTP)
优点:传输完整稳定,不限制数据大小
缺点: 速度慢 (一发一收都需要对方确认)
udp:即时通讯类,比较追求速度(再线视频网站,qq,微信)
优点:传输速度快,可以多人同时聊天
缺点:不稳定,有时丢失数据
TCP的基本语法
server
import socket
# 创建一个socket对象
sk = socket.socket()
# 绑定ip和端口 (注册网络)
sk.bind( ("127.0.0.1",9000) ) # 127.0.0.1 默认代表本机ip
# 开启监听 (等待别人链接我)
sk.listen()
# 程序加阻塞,直到tcp协议的三次握手建立完毕,执行下面代码
conn,addr = sk.accept()
print(conn,addr)
# 最大接受1024个字节 等待数据时,程序加了阻塞,收到数据之后,程序在向下执行
msg = conn.recv(1024)
print(msg.decode("utf-8"))
# send 负责发送,只能发送二进制的字节流
conn.send("早呀早".encode("utf-8"))
# 关闭此次连接 [走四次挥手]
conn.close()
# 关闭socket对象 [退还占用的端口]
sk.close()
"""
< socket.socket fd=4, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('127.0.0.1', 9000), raddr=('127.0.0.1', 34794)>
('127.0.0.1', 34794)
"""
client
import socket
# 差生一个socket对象
sk = socket.socket()
# 建立连接
sk.connect( ("127.0.0.1",9000) )
# 发送消息(发送的数据是二进制字节流)
sk.send("早".encode("utf-8"))
# 等待接收消息 (发送完之后,程序添加阻塞,最大字节数1024,等待接收)
res = sk.recv(1024)
print(res.decode("utf-8"))
# 关闭连接
sk.close()
tcp循环发送消息
server
import socket
# 在网络中注册该主机
sk = socket.socket()
sk.bind( ("127.0.0.1",9001) )
sk.listen()
# 准备接收数据
while True:
# 重新建立新的三次握手
conn,addr = sk.accept()
# 内循环不停的收发消息,按q退出循环
while True:
msg = conn.recv(1024)
print(msg.decode("utf-8"))
message = input(">>>:")
conn.send(message.encode("utf-8"))
# 如果你输入一个q ,退出当前的连接
if message == "q":
break
# 执行四次挥手
conn.close()
sk.close()
client
import socket
sk = socket.socket()
sk.connect( ("127.0.0.1",9001) )
while True:
# 输入字符串
message = input(">>>:")
# 发送消息
sk.send(message.encode("utf-8"))
# 接收服务器的消息
res = sk.recv(1024)
# 如果接收到的是字节流q,断开连接,有请下一位观众
if res == b'q':
break
print(res.decode("utf-8"))
sk.close()
UDP的基本语法
server
# upd 协议:
import socket
# 创建socket对象
sk = socket.socket(type=socket.SOCK_DGRAM)
# 绑定地址
sk.bind( ("127.0.0.1",9000) )
# 对于udp来讲 第一个信息,必须先接受
# 接受数据,返回客户端发出的消息,还有ip和端口组成的元组
msg,cli_addr = sk.recvfrom(1024)
print(msg.decode("utf-8"))
message = "你来啦,脑弟"
# 2个参数 第一个是消息, 第二个是地址(ip,端口)
sk.sendto(message.encode("utf-8"),cli_addr)
# 关闭udp链接
sk.close()
client
# upd 协议:
import socket
sk = socket.socket(type=socket.SOCK_DGRAM)
message = "我来了大哥"
# 发送数据
sk.sendto(message.encode("utf-8"),("127.0.0.1",9000))
# 接受数据
msg,ser_addr = sk.recvfrom(1024)
print(msg.decode("utf-8"))
# 关闭udp链接
sk.close()
udp循环发送消息
server
# 循环发送消息udp
import socket
sk = socket.socket(type=socket.SOCK_DGRAM)
# 绑定地址
sk.bind( ("127.0.0.1",9000) )
while True:
msg,cli_addr = sk.recvfrom(1024)
print(cli_addr)
print(msg.decode("utf-8"))
message = input(">>>")
sk.sendto(message.encode("utf-8"),cli_addr)
# 关闭udp链接
sk.close()
client
import socket
sk = socket.socket(type=socket.SOCK_DGRAM)
while True:
# 请输入你要写入的数据
message = input(">>>")
sk.sendto(message.encode(),("127.0.0.1",9000))
# 接受数据
msg,ser_addr = sk.recvfrom(1024)
print(msg.decode())
# 关闭udp连接
sk.close()
黏包
### 黏包
"""
# tcp协议在发送数据时,会出现黏包现象.
(1)数据粘包是因为在客户端/服务器端都会有一个数据缓冲区,
缓冲区用来临时保存数据,为了保证能够完整的接收到数据,因此缓冲区都会设置的比较大。
(2)在收发数据频繁时,由于tcp传输消息的无边界,不清楚应该截取多少长度
导致客户端/服务器端,都有可能把多条数据当成是一条数据进行截取,造成黏包
"""
### 黏包出现的两种情况
"""
#黏包现象一:
在发送端,由于两个数据短,发送的时间隔较短,所以在发送端形成黏包
#黏包现象二:
在接收端,由于两个数据几乎同时被发送到对方的缓存中,所有在接收端形成了黏包
#总结:
发送包时间间隔短 或者 接受不及时,就会黏包
核心是因为tcp对数据无边界截取,不会按照发送的顺序判断
"""
### 黏包对比:tcp和udp
"""
#tcp协议:
优点:接收时数据之间无边界,有可能粘合几条数据成一条数据,造成黏包
缺点:不限制数据包的大小,稳定传输不丢包
#udp协议:
优点:接收时候数据之间有边界,传输速度快,不黏包
缺点:限制数据包的大小(受带宽路由器等因素影响),传输不稳定,可能丢包
#tcp和udp对于数据包来说都可以进行拆包和解包,理论上来讲,无论多大都能分次发送
但是tcp一旦发送失败,对方无响应(对方无回执),tcp可以选择再发,直到对应响应完毕为止
而udp一旦发送失败,是不会询问对方是否有响应的,如果数据量过大,易丢包
"""
### 解决黏包问题
"""
#解决黏包场景:
应用场景在实时通讯时,需要阅读此次发的消息是什么
#不需要解决黏包场景:
下载或者上传文件的时候,最后要把包都结合在一起,黏包无所谓.
"""
黏包现象
server
# 服务端
import socket
import time
# 把当前主机注册到网络
sk = socket.socket()
sk.bind(("127.0.0.1",9001))
sk.listen()
conn,addr = sk.accept()
conn.send("hello,".encode("utf-8"))
# time.sleep(1)
conn.send("world".encode("utf-8"))
conn.recv(1024)
conn.close()
sk.close()
client
# 客户端
import time
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9001))
time.sleep(0.1)
print(sk.recv(10))
print(sk.recv(10))
sk.send(b'hahaha')
sk.close()
黏包的解放方法
方法一
server
# 服务端
import socket
import time
# 把当前主机注册到网络
sk = socket.socket()
#在bind方法之前加上这句话,可以让一个端口重复使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
sk.bind(("127.0.0.1",9001))
sk.listen()
conn,addr = sk.accept()
conn.send("6".encode("utf-8"))
conn.send("hello,".encode("utf-8"))
# time.sleep(1)
conn.send("world".encode("utf-8"))
conn.recv(1024)
conn.close()
sk.close()
client
# 客户端
import time
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9001))
time.sleep(0.1)
# 是为了接受字符6 #sk.recv(1).decode("utf-8") 字符串
n = int(sk.recv(1).decode("utf-8"))
print(n,type(n))
print(sk.recv(n))
print(sk.recv(10))
sk.send(b'hahaha')
sk.close()
方法二
server
# 服务端
import socket
import time
# 把当前主机注册到网络
sk = socket.socket()
#在bind方法之前加上这句话,可以让一个端口重复使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
sk.bind(("127.0.0.1",9001))
sk.listen()
conn,addr = sk.accept()
conn.send("00000120".encode("utf-8"))
conn.send(("hello,"*20).encode("utf-8"))
# time.sleep(1)
conn.send("world".encode("utf-8"))
conn.recv(1024)
conn.close()
sk.close()
client
# 客户端
import time
import socket
sk = socket.socket()
sk.connect(("127.0.0.1",9001))
time.sleep(0.1)
n = int(sk.recv(8).decode("utf-8"))
print(n,type(n))
print(sk.recv(n))
print(sk.recv(10))
sk.send(b'hahaha')
sk.close()
方法三
import struct
# pack 把任意长度的数字转化成具有固定长度的4个字节值
# unpack 将4个字节的值恢复成原有数据,返回一个元组
# i => int 我要转化的这个数据类型是整型
res = struct.pack("i",120000000)
print(res,len(res))
res = struct.pack("i",1300000000)
print(res,len(res))
res = struct.unpack("i",res)
print(res)
server
# 服务端
import socket
import time
import struct
# 把当前主机注册到网络
sk = socket.socket()
#在bind方法之前加上这句话,可以让一个端口重复使用
sk.setsockopt(socket.SOL_SOCKET,socket.SO_REUSEADDR,1)
sk.bind(("127.0.0.1",9001))
sk.listen()
conn,addr = sk.accept()
inp = input(">>>msg:")
msg = inp.encode("utf-8")
"""
"abcd"
"我是好人"
"""
res = struct.pack("i",len(msg))
# 发送信息长度 (字节流)
conn.send(res)
# 发送用户写的内容
conn.send(msg)
# time.sleep(1)
conn.send("world".encode("utf-8"))
conn.recv(1024)
conn.close()
sk.close()
client
# 客户端
import time
import socket
import struct
sk = socket.socket()
sk.connect(("127.0.0.1",9001))
time.sleep(0.1)
# 是为了接受字符6 #sk.recv(1).decode("utf-8") 字符串
# n = int(sk.recv(4).decode("utf-8"))
n = sk.recv(4)
# 把pack后的数据用unpack进行恢复
n = struct.unpack("i",n)[0]
print(n,type(n))
print(sk.recv(n).decode("utf-8"))
print(sk.recv(10))
sk.send(b'hahaha')
sk.close()
python hashlib模块
#hashlib 这个模块是一堆加密算法的集合体,哈希算法的加密方式不止一种
https://www.cmd5.com/ md5解密
# 应用场景:在需要效验功能时使用
用户密码的 => 加密,解密
相关效验的 => 加密,解密
#哈希算法也叫摘要算法,相同的数据始终得到相同的输出,不同的数据得到不同的输出。
#(1)哈希将不可变的任意长度的数据,变成具有固定长度的唯一值
(2)字典的键值对映射关系是通过哈希计算的,哈希存储的数据是散列(无序)
import hashlib
# md5
# 创建一个md5算法的对象
hs = hashlib.md5()
# print(hs)
# update 参数是二进制字节流
hs.update("123".encode())
# hexdigest 返回32位十六进制的字符串(固定长度)
res = hs.hexdigest()
print(res) #a6e612825ab0ec9b523f24182ea8f0d2
# 加盐 key (一个神秘的key 钥匙) :增加密码的复杂度
hs = hashlib.md5("opesnhelloworld".encode())
hs.update("123".encode())
res = hs.hexdigest()
print(res)
# 动态加盐 : 使用的加密关键字是用户名的切片
username = input("请输入用户名:")
hs = hashlib.md5(username[::2].encode())
hs.update("123456".encode())
res = hs.hexdigest()
print(res)
"""
# md5 加密效率快,通用 ,安全性差 # 32
# sha 加密效率低,算法精密,安全性高
"""
print("<=====sha系列====>")
import hashlib
hs = hashlib.sha1()
hs.update("opesn123463922".encode("utf-8"))
print(hs.hexdigest())
print(len("9a6747fc6259aa374ab4e1bb03074b6ec672cf99")) # 40
# sha1 加盐
hs = hashlib.sha1("UIsdf234".encode())
hs.update("123456".encode())
print(hs.hexdigest())
hs = hashlib.sha512()
hs.update("1234567".encode())
print(hs.hexdigest())
#128 3c9909afec25354d551dae21590bb26e38d53f2173b8d3dc3eee4c047e7ab1c1eb8b85103e3be7ba613b31bb5c9c36214dc9f14a42fd7a2fdb84856bca5c44c2
# sha512 加盐
hs = hashlib.sha512("FZbaoyou".encode())
hs.update("1234".encode())
print(hs.hexdigest())
"""
#hmac 和 hash算法非常类似,hmac输出的长度和
原始哈希算法的长度一致的,hmac不容易破解
"""
print("<======hmac=======>")
import hmac
import os
key = b"opesn"
# msg = b"123"
# 随机返回长度为32位的二进制字节流
msg = os.urandom(32)
print(msg)
# key msg 必须是二进制字节流
hm = hmac.new(key,msg)
print(hm.hexdigest())
# 校验两个文件的内容是否一样?
python 进程
### 进程的概念:(Process)
"""
进程就是正在运行的程序,它是操作系统中,资源分配的最小单位.
资源分配:(需要cpu来处理,需要内存来临时存储运行数据,都要浪费cpu和内存资源)
进程号是进程的唯一标识
同一个程序执行两次之后是两个进程
进程和进程之间的关系: 数据彼此隔离,通过socket通信
"""
### 进程调度算法
"""
# 先来先服务fcfs(first come first server):先来的先执行
# 短作业优先算法:分配的cpu多,先把短的算完
# 时间片轮转算法:每一个任务就执行一个时间片的时间.然后就执行其他的.
# 多级反馈队列算法
越是时间长的,cpu分配的资源越少,优先级靠后
越是时间短的,cpu分配的资源越多
"""
import os,time
# linux # ps -aux | grep 49155 查看具体的某一个进程
# 查看所有进程 ps -aux
# 获取子进程或者父进程的id号
# (1)获取主进程的id getpid process_id
res = os.getpid()
print(res)
# (2)getppid 获取父进程id parent_process_id
res = os.getppid()
print(res)
# 启动一个进程
import os
os.fork()
print(os.getpid())
from multiprocessing import Process
# (1) 进程基本语法
def func():
print("222子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
if __name__ == "__main__":
print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
# 创建子进程(对象)
# 起一个子进程,p是一个进程对象,目的单独执行func这个函数
p = Process(target=func)
# 调用该子进程
p.start()
# (2) 子进程函数,可以传入参数
# 不带参数
def func():
for i in range(1,6):
time.sleep(0.5)
print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
if __name__ == "__main__":
print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
n = 5
p = Process(target=func)
p.start()
for i in range(1,n+1):
time.sleep(0.3)
print("*"*i)
# 带参数 谁快谁慢取决于cpu的调度策略
# 子进程执行func
def func(n):
for i in range(1,n+1):
time.sleep(0.5)
print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
if __name__ == "__main__":
print("111子进程id>>>%d,父进程id>>>%d" % (os.getpid(),os.getppid()))
n = 3
# 如果想要传入参数 用args 它必须是一个元组
p = Process(target=func,args=(n,))
p.start()
# 下面代码是主进程执行的
for i in range(1,n+1):
time.sleep(0.3)
print("*"*i)
# (3) 进程之间的数据彼此是隔离的
"""父进程和子进程他们的启动是异步的,操作系统负责启动各种进程的"""
count = 100
def func():
global count
count -= 1
print("子进程",count)
if __name__ == "__main__":
p = Process(target=func)
p.start()
time.sleep(1)
print("主进程",count)
# (4) 多个进程并发执行
'''存在并发就存在顺序的不确定性,因为有执行快慢问题,跟操作系统cpu调度策略有关'''
def func(args):
print("子进程%d" % (args),os.getpid(),os.getppid())
if __name__ == "__main__":
for i in range(10):
Process(target=func,args=(i,)).start()
# (5) 子进程和父进程之间的关系
"""父进程执行完毕之后默认等待子进程结束。这是为了避免出现僵尸程序,方便进程上的管理"""
# 一般来说,父进程结束,子进程应该结束。
def func(args):
print("子进程%s:" % (args),os.getpid(),os.getppid())
time.sleep(1)
print("子进程end")
if __name__ == "__main__":
for i in range(10):
Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
# Process(target=func,args=(i,)).start()
print("父进程*************")
join
import time
import os
from multiprocessing import Process
# ### join 等待所有子进程执行完毕之后,主进程在向下执行
# (1) 1个子进程通过join加阻塞,进行同步控制
def func(index):
print("第一封邮件已经发送")
if __name__ == "__main__":
p = Process(target=func,args=(1,))
p.start()
p.join()
print("10封邮件已经发送")
# (2) 多个子进程通过join加阻塞,进行同步控制
def func(index):
time.sleep(0.3)
print("第%s封邮件已经发送" % (index))
if __name__ == "__main__":
lst = []
for i in range(10):
p = Process(target=func,args=(i,))
p.start()
lst.append(p)
# 列表当中的每一个进程对象都执行p.join 直到所有进程执行完毕,最后执行主进程内容
for p in lst:
p.join()
print("第10封邮件发送完毕")
# ###使用第二种方式创建进程
# (1) 基本使用的语法
class MyProcess(Process):
# 必须使用run方法名
def run(self):
print("子进程: 参数:",os.getpid(),os.getppid())
if __name__ == "__main__":
p = MyProcess()
p.start()
print("主进程:",os.getpid())
# (2) 给子进程传递参数
class MyProcess(Process):
def __init__(self,arg):
# 调用一下父类的构造方法
super().__init__()
self.arg = arg
# 必须使用run方法名
def run(self):
print("子进程: 参数:",os.getpid(),os.getppid(),self.arg)
if __name__ == "__main__":
# p = MyProcess("123")
# p.start()
# print("主进程:",os.getpid())
lst = []
for i in range(10):
p = MyProcess("参数%s" % i) # 子进程并发,是异步程序
p.start()
lst.append(p)
# p.join()
for i in lst:
i.join() # join是阻塞为了让主进程同步
print("主进程",os.getpid())
守护进程
#可以给子进程贴上守护进程的名字,该进程会随着主进程代码执行完毕而结束(为主进程守护)
(1)守护进程会在主进程代码执行结束后就终止
(2)守护进程内无法再开启子进程,否则抛出异常(了解)
import time
from multiprocessing import Process
# (1) 守护进程随着主进程代码执行完毕而结束
"""
正常情况下,主进程默认等待子进程全部结束而结束
守护进程必须写在start之前进行设置
"""
def func():
print("子进程start")
time.sleep(1)
print("子进程end")
if __name__ == "__main__":
p = Process(target=func)
p.daemon = True
p.start()
time.sleep(1.5)
print("主进程")
# (2) 多个子进程的情况下
'''
当多个子进程并发执行时,默认主进程等待子进程
守护进程 守护的是 主进程 , 主要主进程里面的代码执行完毕
对应是守护进程的那个子进程立即终止,其他非守护进程继续
守护进程是主进程的代码执行到最后一行了,即为结束。
'''
def func1():
count = 1
while True:
print("*" * count)
time.sleep(0.5)
count += 1
def func2():
print("func2 start")
time.sleep(5)
print("func2 end")
if __name__ == "__main__":
p1 = Process(target=func1)
p1.daemon = True
p1.start()
Process(target=func2).start()
# time.sleep(3)
print("主进程")
# (3) 守护进程的实际用途:报活
'''
如果最大的监控系统挂掉,就让其他服务器停掉
就好比厂子到了,员工回家放假
'''
# 防止信息泄露,最大的监控系统挂掉,子进程停止发送相关数据.
def alive():
while True:
print("1号主机发送日志消息, i am ok")
time.sleep(0.6)
def func():
# while True:
print("统计各个主机的系统日志,维护数据库管理")
time.sleep(1)
if __name__ == "__main__":
p = Process(target = alive)
p.daemon = True
p.start()
p = Process(target=func)
p.start()
# 加上join之后,等待所有子进程执行完毕后,在运行主进程后面的代码,(除了守护进程)
p.join()
print("...")
锁
lock.acquire()# 上锁
lock.release()# 解锁
#同一时间允许一个进程上一把锁 就是Lock
加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。
#同一时间允许多个进程上多把锁 就是[信号量Semaphore]
信号量是锁的变形: 实际实现是 计数器 + 锁,同时允许多个进程上锁
# 互斥锁Lock : 互斥锁就是进程的互相排斥,谁先抢到资源,谁就上锁改资源内容,为了保证数据的同步性
# 注意:多个锁一起上,不开锁,会造成死锁.上锁和解锁是一对.
# ### 锁
# (1) 上锁与解锁
'''
上锁时其他进程不能更改同一份资源,解锁之后其他进行按照顺序同步上锁
#同一时间允许一个进程上一把锁 就是Lock
加锁可以保证多个进程修改同一块数据时,
同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,
但牺牲了速度却保证了数据安全。
'''
# 模拟抢票
''''''
import json
import time
from multiprocessing import Process,Lock
def search(person):
with open("ticket",mode="r") as fp:
dic = json.load(fp)
# print(dic,type(dic))
print("%s 查询余票:%s" % (person,dic["count"]))
def get_ticket(person):
with open("ticket",mode="r") as fp:
dic = json.load(fp)
time.sleep(0.5)
if dic["count"] > 0 :
print("%s买到了" % (person))
dic["count"] -= 1
# 重新更新数据库
with open("ticket","w") as fp:
json.dump(dic,fp)
else:
print("%s没买到这个票" % (person))
# 作为统一的并发调用
def ticket(person,lock):
#查询票数
search(person)
#上锁 到上锁的时候是同步程序
lock.acquire()# 上锁
get_ticket(person)
lock.release()# 解锁
if __name__ == "__main__":
lock = Lock()
for i in range(10):
# 这里是异步并发
p = Process(target = ticket ,args = ("person%s" % i,lock))
p.start()
# (2) 区分同步和异步
import json
import time
from multiprocessing import Process,Lock
def func(num,lock):
time.sleep(1)
print("异步程序",num)
lock.acquire()
time.sleep(0.5)
print("同步执行的程序",num)
lock.release()
if __name__ == "__main__":
lock = Lock()
for i in range(10):
p = Process(target=func,args =(i,lock))
p.start()
"""
互斥锁:互斥锁就是进程的互相排斥,谁先抢到这个资源,
谁就上锁改变资源的内容,为了保证数据的同步性
注意:多个锁一起上,不开锁,会造成死锁,上锁和解锁是一对
lock = Lock()
lock.acquire()
print(456)
lock.release()
# 上了两把锁,没遇到解锁,就卡在这形成死锁.
lock.acquire()
lock.acquire()
lock.release()
print(123)
'''
信号量 Semaphore
# ### Semaphore 信号量 其实就是锁
# 模拟ktv唱歌 假设一共10个人,同一时间允许4个人唱歌
from multiprocessing import Process,Semaphore
import random,time
def ktv(person,sem):
sem.acquire()
print("%s进入了ktv" % (person))
time.sleep(random.randrange(6,8))
print("%s走出了ktv" % (person))
sem.release()
if __name__ == "__main__":
# 最多允许4个进程同时上锁
sem = Semaphore(4)
for i in range(10):
p = Process(target=ktv,args=("person%s" % (i),sem))
p.start()
事件
# ### 事件(Event)
# 阻塞事件 :
e = Event()生成事件对象e
e.wait()动态给程序加阻塞 , 程序当中是否加阻塞完全取决于该对象中的is_set() [默认返回值是False]
# 如果是True 就不加阻塞
# 如果是False 那就加阻塞
# 控制这个属性的值
# set()方法 将这个属性的值改成True
# clear()方法 将这个属性的值改成False
# is_set()方法 判断当前的属性是否为True (默认上来是False)
from multiprocessing import Process,Event
import time,random
# (1) 基本用法
e = Event()
print(e.is_set()) # 默认False
# 将这个属性的值改成True
e.set()
# 将这个属性的值改成False
e.clear()
print(1)
e.wait()
print(2)
# 模拟红绿灯操作 假设有20辆小车全部通过
def traffic_light(e):
# traffic_light 只做一件事就是红灯和绿灯的变色效果
print("红灯亮")
while True:
# 默认is_set 获取到的值是False
if e.is_set():
#红灯区间
time.sleep(1)
print("红灯亮")
e.clear() # False
else:
#绿灯区间
time.sleep(1)
print("绿灯亮")
e.set() # True
def car(e,i):
#e.is_set() 为False时成立
if not e.is_set():
print("car %s 在等待" % (i))
e.wait()
print("car %s 通过了" % i)
if __name__ == "__main__":
e = Event()
p = Process(target=traffic_light,args = (e,))
p.start()
for i in range(20):
time.sleep(random.randrange(0,2))
p = Process(target=car,args = (e,i))
p.start()
# 优化: 假设有20辆小车全部通过后,红绿灯终止;
def traffic_light(e):
# traffic_light 只做一件事就是红灯和绿灯的变色效果
print("红灯亮")
while True:
# 默认is_set 获取到的值是False
if e.is_set():
#红灯区间
time.sleep(1)
print("红灯亮")
e.clear() # False
else:
#绿灯区间
time.sleep(1)
print("绿灯亮")
e.set() # True
def car(e,i):
#e.is_set() 为False时成立
if not e.is_set():
print("car %s 在等待" % (i))
e.wait()
print("car %s 通过了" % i)
if __name__ == "__main__":
e = Event()
lst = []
p = Process(target=traffic_light,args = (e,))
p.daemon = True
p.start()
for i in range(20):
time.sleep(random.randrange(0,2))
p = Process(target=car,args = (e,i))
p.start()
lst.append(p)
for p in lst:
p.join()
print("程序彻底跑完~")
生产者与消费者
# IPC Inter-Process Communication
# 实现进程之间通信的两种机制:
# 管道 Pipe
# 队列 Queue
# put() 存放
# get() 获取
# get_nowait() 拿不到报异常
# put_nowait() 非阻塞版本的put
q.empty() 检测是否为空 (了解)
q.full() 检测是否已经存满 (了解)
队列 Queue
# ### 队列 Queue
import queue
from multiprocessing import Process,Queue
'''
队列特征:先进先出
'''
# (1) 基础语法
q2 = Queue()
# 把数据存放在队列当中
q2.put(1)
q2.put(2)
# 获取队列当中的数据
print(q2.get())
print(q2.get())
# 队列当中如果已经没有数据了,那会执行阻塞.
print(q2.get())
print(q.get_nowait())
print(q.get_nowait())
# 捕捉异常类错误
# get_nowait linux 不兼容;只有windows好用
try:
print(q.get_nowait())
except queue.Empty:
print("empty")
except:
print(123)
# (2) 可以为queue 指定队列的长度
q1 = Queue(3) # 最多放3个 ,超过最大长度,就执行阻塞
q1.put(1)
q1.put(2)
q1.put(3)
# q1.put(4)
# q1.put_nowait(4) # 用put_nowait添加值超过队列长度直接报错.
res = q1.full()
print(res)
q3 = Queue()
res = q3.empty()
print(res)
# (3) 进程之间的通讯
def func(q):
print("子进程:",q.get())
q.put("abc")
if __name__ == "__main__":
q4 = Queue()
p = Process(target=func,args=(q4,))
p.start()
q4.put("111222333")
p.join()
print("主进程:",q4.get())
消费者模型
'''
# 爬虫例子:
一个进程1号负责获取所有网页当中的内容
一个进程2号负责提取网页当中的连接
那么这个1号进程就可以看成一个生产者
这个2号进程可以看成一个消费者
有时可能生产者比消费者快,反之亦然
所以在生产者和消费者之间加一个缓冲区(队列)
生产者和消费者模型从程序上来讲就是数据的存入和获取的过程
最为理想的生产者和消费者模型 , 两者之间的运行速度应该是同步的
'''
from multiprocessing import Process,Queue
import time
import random
# 消费者方法 [负责取值]
def consumer(q,name):
while True:
food = q.get()
if food is None:
break
time.sleep(random.uniform(0.5,1))
print("%s吃了一个%s" % (name,food))
# 生产者方法 [负责存值]
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.3,0.8))
print("%s生产了%s" % (name,food))
q.put(food+str(i))
if __name__ == "__main__":
q = Queue()
c1 = Process(target=consumer,args=(q,"张三"))
c2 = Process(target=consumer,args=(q,"李四"))
# c1.daemon = True
c1.start()
c2.start()
p1 = Process(target=producer,args=(q,"王五","地瓜"))
p2 = Process(target=producer,args=(q,"tony","黄瓜"))
p1.start()
p2.start()
p1.join()
p2.join()
q.put(None)
q.put(None)
JoinableQueue
# ### JoinableQueue
'''
put 存入
get 获取
task_done 通过队列其中一个数据被处理
JoinableQueue 默认会计数,执行一次taskdone减少一次队列数
JoinableQueue 里面有5个值,taskdone减少一个,减到0 , 队列里面的值为空
q.join() 队列.join
添加阻塞,直到放入队列的所有值都被处理掉的时候,撤掉阻塞
'''
from multiprocessing import Process,JoinableQueue
import random
import time
def consumer(q,name):
while True:
food = q.get()
time.sleep(random.uniform(0.5,1))
print("%s接受了 %s" % (name,food))
q.task_done()
def producer(q,name,food):
for i in range(5):
time.sleep(random.uniform(0.3,0.8))
print("%s生产了 %s" % (name,food+str(i)))
q.put(food+str(i))
if __name__ == "__main__":
jq = JoinableQueue()
c1 = Process(target=consumer,args=(jq,"张三"))
c1.daemon = True
c1.start()
p1 = Process(target=producer,args= (jq,"李四","葡萄"))
p1.start()
p1.join() # 生产者需要把所有数据放到队列当中
# p1.join()
#添加阻塞,直到放入队列的所有值都被处理掉的时候,撤掉阻塞
# 通过task_done 表达处理掉的意思
# join 和 task_done 需要配合使用;
jq.join()
进程池
# 进程池:
# 开启过多的进程并不一定提高你的效率,
# 如果cpu负载任务过多,平均单个任务执行的效率就会低,反而降低执行速度.
1个人做4件事和4个人做4件事
显然后者执行速度更快,
前者是并发,后者是并行
利用进程池,可以开启cpu的并行效果
# apply 开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程
# apply_async 开启进程,异步非阻塞,(主进程 和 子进程异步)
# ### Manager dict list 能够实现进程之间的数据共享
from multiprocessing import Process , Manager ,Lock
def work(d,lock):
# 自动上锁和解锁
with lock:
d["count"] -= 1
"""
lock.acquire()
d["count"] -= 1
lock.release()
"""
if __name__ == "__main__":
lock = Lock()
m = Manager()
dic = m.dict({"count":100})
lst = []
for i in range(100):
p =Process(target=work,args = (dic,lock))
lst.append(p)
p.start()
# 等待每一个进程执行完毕
for p in lst:
p.join()
print(dic)
# ### 进程池
import os,time
from multiprocessing import Pool,Process
# (1) 比较pool 和Process 执行速度
'''因为进程池可以实现并行的概念,比Process单核并发的速度要快'''
def func(num):
print("发了第%s封邮件" % (num))
# time.sleep(1)
# print("end")
if __name__ == "__main__":
# print(os.cpu_count()) # 获取cpu核心数量
start = time.time()
p = Pool(6)
for i in range(100):
# apply_async 用来开启子进程
p.apply_async(func,args=(i,))
# 关闭进程池,用户不能在向这个池中创建子进程
p.close()
# 这里加阻塞,直到进程池中所有的子进程执行完毕
p.join()
end = time.time()
print(end-start)
# 单核Process处理100件任务
start = time.time()
lst = []
for i in range(100):
p = Process(target=func,args= ( i ,))
p.start()
lst.append(p)
for p in lst:
p.join()
end = time.time()
print(end-start)
# (2) apply 开启进程,同步阻塞,每次都要等待当前任务完成之后,在开启下一个进程
def task(num):
time.sleep(1)
print("%s: %s" % (num,os.getpid()))
return num ** 2
if __name__ == "__main__":
p = Pool(2)
for i in range(20):
# apply是同步阻塞,每个进程必须执行完,才能在开启进程.
res = p.apply(task,args=(i,))
print("-->",res)
# (3) apply_async 异步非阻塞
def task(num):
time.sleep(3)
print("%s: %s" % (num,os.getpid()))
return num ** 2
if __name__ == "__main__":
p = Pool()
res_lst = []
for i in range(20):
res = p.apply_async(task,args=(i,))
print(res)
res_lst.append(res)
# 返回的对象需要使用get方法获取到返回值
# print(len(res_lst))
for res in res_lst:
print(res.get())
# close 和 join 一写写2个保持同步.
# p.close()
# p.join()
# (4) 进程池.map (与高阶函数map使用方法一样,只不过这个是并发版本)
def task(num):
time.sleep(1)
print("%s:%s"%(num,os.getpid()))
return num ** 2
if __name__ == "__main__":
p = Pool()
res = p.map(task,range(20)) #自动close和join
print(res)
进程池并发socket
server
import socket
from multiprocessing import Pool
def talk(conn):
while True:
conn.send(b"hello")
print(conn.recv(1024))
conn.close()
if __name__ =="__main__":
sk = socket.socket()
sk.bind( ("127.0.0.1",9000) )
sk.listen()
# Pool默认获取cpu_counter cpu最大核心数
p = Pool()
while True:
conn,addr = sk.accept()
p.apply_async(talk,args=(conn,))
sk.close()
client
import socket
sk = socket.socket()
sk.connect( ("127.0.0.1",9000) )
while True:
print(sk.recv(1024))
sk.send(b"byebye")
python 线程
### 线程
"""
#进程是资源分配的最小单位
#线程是计算机中调度的最小单位
#线程的缘起
资源分配需要分配内存空间,分配cpu:
分配的内存空间存放着临时要处理的数据等,比如要执行的代码,数据
而这些内存空间是有限的,不能无限分配
目前来讲,普通机器,5万个并发程序已是上限.线程概念应用而生.
#线程的特点
线程是轻量级,干更多的活,同一个进程中的所有线程的资源是共享的.
每一个进程中都有至少一条线程在工作
"""
### 线程的缺陷
"""
#线程可以并发,但是不能并行(即可以1个cpu执行,不能多个cpu一起执行)
#原因:
python是解释型语言,执行一句编译一句,而不是一次性全部编译成功,不能提前规划,都是临时调度
容易造成不同的cpu却反复执行同一个程序.所以加了一把锁叫GIL
全局解释器锁(Cpython解释器特有) GIL锁:同一时间一个线程只能被一个cpu执行
#想要并行的解决办法:
(1)用多进程
(2)换一个Pypy解释器
#计算型程序会过度依赖cpu,但网页,爬虫,OA办公,这种交互型带有阻塞的程序里,速度影响无所谓
"""
### 线程相关函数
"""
线程.is_alive() 检测线程是否仍然存在
线程.setName() 设置线程名字
线程.getName() 获取线程名字
currentThread().ident 查看线程id号
enumerate() 返回目前正在运行的线程列表
activeCount() 返回目前正在运行的线程数量
"""
import os,time
from threading import Thread
from multiprocessing import Process
# (1) 一个进程可以有多个线程
def func(i):
print("子线程:", i,os.getpid())
for i in range(10):
t = Thread(target=func,args=(i,))
t.start()
#(2) 并发多线程和多进程的速度对比 多线程更快
def func(i):
print("子线程:", i,os.getpid())
if __name__ == "__main__":
start = time.time()
lst = []
for i in range(100):
t = Thread(target=func,args=(i,))
t.start()
lst.append(t)
for t in lst:
t.join()
end = time.time()
t1 = end-start
start = time.time()
lst = []
for i in range(100):
p = Process(target=func,args=(i,))
p.start()
lst.append(p)
for p in lst:
p.join()
end = time.time()
t2 = end - start
print(t1,t2)
# (3) 多个线程共享同一份进程资源
num = 100
lst = []
def func():
global num
num -= 1
for i in range(100):
t = Thread(target=func)
t.start()
lst.append(t)
for t in lst:
t.join()
print(num)
# (4) 线程相关函数
'''
线程.is_alive() 检测线程是否仍然存在
线程.setName() 设置线程名字
线程.getName() 获取线程名字
currentThread().ident 查看线程id号
enumerate() 返回目前正在运行的线程列表
activeCount() 返回目前正在运行的线程数量
'''
def func():
time.sleep(0.01)
t = Thread(target=func)
t.start()
print(t.is_alive())
print(t.getName()) #Thread-1
t.setName("t1")
print(t.getName())
time.sleep(3)
print(t.is_alive()) # False
# 1.currentThread().ident 查看线程id号
from threading import currentThread
def func():
print("子线程:",currentThread().ident)
print("主线程:",currentThread().ident)
t = Thread(target=func)
t.start()
# enumerate() 返回目前正在运行的线程列表
from threading import enumerate,currentThread
def func(i):
print("子线程:",currentThread().ident)
time.sleep(3)
for i in range(10):
t = Thread(target=func,args=(i,))
t.start()
print(enumerate())
print(len(enumerate())) # 加上主线程一共11个
"""
[
<_MainThread(MainThread, started 74964)>,
<Thread(Thread-11, started 78972)>
]
"""
# activeCount() 返回目前正在运行的线程数量
from threading import activeCount,currentThread
def func(i):
print("子线程:",currentThread().ident)
time.sleep(2)
for i in range(10):
t = Thread(target=func,args=(i,))
t.start()
time.sleep(3)
print(activeCount()) # 加上主线程一共11个
守护线程
# ### 守护线程:等待其他所有子线程结束之后,自动结束
from threading import Thread
import time
def func1():
while True:
time.sleep(0.5)
print(111)
def func2():
print("func2 start->")
time.sleep(3)
print("func2 end->")
t1 = Thread(target=func1)
t2 = Thread(target=func2)
# setDaemon 将普通线程变成守护线程
t1.setDaemon(True)
t1.start()
t2.start()
t2.join()
time.sleep(5)
print("主线程代码结束")
线程数据安全
from threading import Thread,Lock
n = 0
# ###线程之间如果共同更改同一个数据,需要加锁
def func1(lock):
global n
for i in range(100000):
lock.acquire()
n-=1
lock.release()
def func2(lock):
global n
for i in range(100000):
lock.acquire()
n+=1
lock.release()
if __name__ == "__main__":
lock = Lock()
lst = []
for i in range(10):
t1 = Thread(target=func1,args=(lock,))
t2 = Thread(target=func2,args=(lock,))
t1.start()
t2.start()
lst.append(t1)
lst.append(t2)
for t in lst:
t.join()
print(n)
死锁,递归锁,互斥锁
"""
加一把锁,就对应解一把锁.形成互斥锁.
从语法上来说,锁可以互相嵌套,但不要使用,
不要因为逻辑问题让上锁分成两次.导致死锁
递归锁用于解决死锁,但只是一种应急的处理办法
"""
# ### 死锁,递归锁,互斥锁
from threading import Thread,Lock
import time
noodle_lock = Lock()
kuaizi_lock = Lock()
# (1)常见情况引发的死锁问题
def eat1(name):
noodle_lock.acquire()
print("%s拿到这个面条" % (name))
kuaizi_lock.acquire()
print("%s拿到了筷子" % (name))
print("开始吃")
time.sleep(0.5)
kuaizi_lock.release()
print("%s把筷子放下"% (name))
noodle_lock.release()
print("%s把面放下"% (name))
def eat2(name):
kuaizi_lock.acquire()
print("%s拿到了筷子" % (name))
noodle_lock.acquire()
print("%s拿到这个面条" % (name))
print("开始吃")
time.sleep(0.5)
noodle_lock.release()
print("%s把面放下"% (name))
kuaizi_lock.release()
print("%s把筷子放下"% (name))
if __name__ == "__main__":
name_list = ["张楠","定海呀"]
name_list2 = ["晨露中","周金波"]
for name in name_list:
Thread(target=eat1,args= (name,) ).start()
# Thread(target=eat1,args= (name,) ).start()
for name in name_list2:
Thread(target=eat2,args = (name,)).start()
# Thread(target=eat2,args = (name,)).start()
# (2) 递归锁
from threading import Thread,RLock
'''
递归所专门用来解决死锁现象的
用于临时快速解决服务器崩溃现象,加上递归所应急
'''
rlock = RLock()
def func(name):
rlock.acquire()
print(name,1)
rlock.acquire()
print(name,2)
rlock.acquire()
print(name,3)
rlock.release()
rlock.release()
rlock.release()
for i in range(10):
Thread(target=func,args=("name%s" % (i),) ).start()
import time
print("<==>")
noodle_lock = kuaizi_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print("%s拿到这个面条" % (name))
kuaizi_lock.acquire()
print("%s拿到了筷子" % (name))
print("开始吃")
time.sleep(0.5)
kuaizi_lock.release()
print("%s把筷子放下"% (name))
noodle_lock.release()
print("%s把面放下"% (name))
def eat2(name):
kuaizi_lock.acquire()
print("%s拿到了筷子" % (name))
noodle_lock.acquire()
print("%s拿到这个面条" % (name))
print("开始吃")
time.sleep(0.5)
noodle_lock.release()
print("%s把面放下"% (name))
kuaizi_lock.release()
print("%s把筷子放下"% (name))
if __name__ == "__main__":
name_list = ["张楠","定海呀"]
name_list2 = ["晨露中","周金波"]
for name in name_list:
Thread(target=eat1,args= (name,) ).start()
for name in name_list2:
Thread(target=eat2,args = (name,)).start()
# (3) 互斥锁:
'''
从语法上说,锁是允许互相嵌套的,但是不要使用
上一次锁,就对应解开一次,形成互斥锁
吃面和拿筷子是同时的
不要因为逻辑问题让锁分别上,容易造成死锁.
'''
from threading import Thread,Lock
mylock = Lock()
def eat1(name):
mylock.acquire()
print("%s拿到筷子" % (name))
print("%s吃到面条" % (name))
time.sleep(0.5)
print("%s放下筷子" % (name))
print("%s放下面条" % (name))
mylock.release()
def eat2(name):
mylock.acquire()
print("%s拿到筷子" % (name))
print("%s吃到面条" % (name))
time.sleep(0.5)
print("%s放下筷子" % (name))
print("%s放下面条" % (name))
mylock.release()
if __name__ == "__main__":
name_list = ["张三","李四"]
name_list2 = ["tony","tom"]
for name in name_list:
Thread(target=eat1,args= (name,) ).start()
for name in name_list2:
Thread(target=eat2,args = (name,)).start()
线程中的信号量
from threading import Semaphore,Thread
import time
def func(index,sem):
sem.acquire()
print(index)
time.sleep(1)
sem.release()
if __name__ == "__main__":
sem = Semaphore(5)
for i in range(16):
Thread(target=func,args=(i,sem)).start()
'''
进程而言:
信号量是配合Process使用,可以限制并发的进程数
在有了进程池之后,可以取代如上做法
线程而言
信号量是配合Thread使用的,可以限制并发的线程数
线程池可以取代如上用法
'''
事件
# ### 模拟连接数据库
from threading import Event,Thread
import time,random
'''
# wait() 动态给程序加阻塞
# set() 将内部属性改成True
# clear() 将内部属性改成False
# is_set() 判断当前属性(默认是False)
'''
'''
e = Event()
print(e.is_set())
# timeout = 5 最多等待5秒
e.wait(timeout = 5)
print(e.is_set())
'''
def check(e):
print("开始检测数据库连接")
time.sleep(random.randrange(1,6))
e.set()
def connect(e):
sign = False
for i in range(3):
e.wait(1)
if e.is_set():
sign = True
print("数据库连接成功")
break
else:
print("尝试连接数据库%s次失败" % (i+1))
if sign == False:
# 主动抛出异常
raise TimeoutError
e = Event()
Thread(target=connect,args=(e,)).start()
Thread(target=check,args=(e,)).start()
条件
# ### 条件
'''
wait 添加阻塞
notify 允许几个被阻塞的线程放行
#语法:wait 使用的前后需要上锁
"""
acquire()
wait
wait下面写上相应逻辑代码
release()
"""
#语法: notify 使用前后加阻塞
"""
acquire
notify(自定义线程放行的数量,默认放行1个)
release
"""
'''
from threading import Condition,Thread
import time
def func(con,index):
print("%s在等待" % index)
con.acquire()
# 添加阻塞
con.wait()
print("%s do everyting" % index)
con.release()
con = Condition()
for i in range(10):
t = Thread(target=func,args = (con,i))
t.start()
# 写法一
con.acquire()
con.notify(5) # 默认放行1个(不加参数)
con.release()
# 写法二
count = 10
while count > 0:
num = int(input(">>>
"))
con.acquire()
# 放行线程 num 动态赋予的值
con.notify(num)
con.release()
count -= num
定时器
# ### 定时器
from threading import Timer
def func():
print("目前正在执行任务")
t = Timer(5,func)
t.start()
print("主线程")
# 计划任务 crontab
线程队列
线程常用队列有 queue LifoQueue PriorityQueue
queue 先进先出
LifoQueue 后进先出
PriorityQueue 按照优先级排序
# ### 队列
import queue
'''
put 往队列当中放值,超过队列长度,直接加阻塞
get 如果获取不到加阻塞
put_nowait 如果放入的超过了队列长度,直接报异常错误
get_nowait 如果获取不到直接报异常错误
'''
# (1)queue 先进先出
q = queue.Queue()
q.put(1)
q.put(2)
print(q.get())
# print(q.get())
# print(q.get())
print(q.get_nowait())
q = queue.Queue(2)
q.put(1)
q.put(2)
# q.put(3)
q.put_nowait(3)
# (2) lifoqueue 用栈的后进先出的顺序形成队列
from queue import LifoQueue
lq = LifoQueue()
lq.put(1)
lq.put(2)
lq.put('aaa')
print(lq.get())
# (3) PriorityQueue 队列 按照优先级顺序获取
from queue import PriorityQueue
# 1.先按照数字进行排列,在按照ascii吗从小到大排序
pq = PriorityQueue()
pq.put( (13,'abc') )
pq.put( (5,"ccc") )
pq.put( (5,"bcc") )
pq.put( (15,"bbb") )
print(pq.get())
print(pq.get())
print(pq.get())
print(pq.get())
# 2. 单独一个元素的,只能是都放同一种类型
# 如果是数字
from queue import PriorityQueue
pq = PriorityQueue()
pq.put(15)
pq.put(13)
pq.put(3)
# pq.put("3434") error
print(pq.get())
print(pq.get())
print(pq.get())
# print(pq.get())
# 如果是字符串
pq = PriorityQueue()
pq.put("bbd1")
pq.put("bbcb2")
# pq.put(33344) error
print(pq.get())
print(pq.get())
线程池和进程池 (改良版)
from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
import time,random
def func(i):
print("Process",i)
time.sleep(1)
print("Process is end ")
#(1) ProcessPoolExecutor 进程池的基本使用(改良版)
if __name__ == "__main__":
# ProcessPoolExecutor <==> Pool
p = ProcessPoolExecutor(5)
# submit <==> apply_async
p.submit(func,1)
# shutdown <==> close + join
p.shutdown()
print("主线程")
#(2) 线程池
from threading import current_thread as cthread
def func(i):
# cthread().indent 线程号
print("thread",i,cthread().ident)
time.sleep(random.uniform(0.1,1))
print("thread %s end" % (i))
tp = ThreadPoolExecutor(5)
for i in range(20):
tp.submit(func,i)
tp.shutdown()
print("主线程")
#(3) 返回值
def func(i):
print("thread",i,cthread().ident)
time.sleep(random.uniform(0.1,1))
print("thread %s end" % (i))
return i * "*"
tp = ThreadPoolExecutor(5)
lst = []
for i in range(20):
res = tp.submit(func,i)
# print(res)
lst.append(res)
for res in lst:
# result 和 get 使用方法一样的 result带有阻塞效果.
print(res.result())
print("主线程")
# (4) map 返回生成器
def func(i):
print("thread",i,cthread().ident)
time.sleep(random.uniform(0.1,1))
print("thread %s end" % (i))
return i * "*"
tp = ThreadPoolExecutor(5)
res = tp.map(func,range(20))
# for i in res:
tp.shutdown()
for i in res:
print(i)
print(res)
回调函数
# ### 回调函数
'''
# 回调函数
就是一个参数,将这个函数作为参数传到另一个函数里面.
函数先执行,再执行当参数传递的这个函数,这个参数函数是回调函数
'''
# (1) 线程池的回调函数是 子线程完成的.
from concurrent.futures import ThreadPoolExecutor
from threading import current_thread as cthread
import time
def func(i):
print("thread",i,cthread().ident)
time.sleep(1)
print("thread %s end" % (i))
return i * "*"
def call_back(args):
print("call back : ",cthread().ident)
print(args)
print(args.result())
tp = ThreadPoolExecutor(5)
for i in range(20):
# 当执行完这个线程后 在直接执行call_back这个函数
tp.submit(func,i).add_done_callback(call_back)
tp.shutdown()
print("主线程",cthread().ident)
# (2)进程池的回调函数有?来完成
from concurrent.futures import ProcessPoolExecutor
import os,time
def func(i):
print('process',i,os.getpid())
time.sleep(1)
print('process %s end' % i)
if __name__ == "__main__":
p = ProcessPoolExecutor(5)
p.submit(func,1)
p.shutdown()
print("主线程",os.getpid())
# 进程池的回调函数 , 是由主进程完成的
from concurrent.futures import ProcessPoolExecutor
import os ,time
def func(i):
print("process",i,os.getpid())
time.sleep(1)
print("process %s end" % (i))
return i * "*"
def call_back(args):
print("call back : ",os.getpid())
print(args)
print(args.result())
if __name__ == "__main__":
tp = ProcessPoolExecutor(5)
for i in range(20):
# 当执行完这个线程后 在直接执行call_back这个函数
tp.submit(func,i).add_done_callback(call_back)
# tp.shutdown()
print("主线程",os.getpid())
协程
# ### 协程
def my_gen():
for i in range(10):
yield i
for num in my_gen():
print(num)
# (1) 协程写生产者消费者模型
# producer 生产者
def producer():
for i in range(100):
yield i
# consumer 消费者
def consumer():
g = producer()
for num in g:
print(num)
consumer()
# (2) 协程的基本实现
'''
switch 利用它进行任务的切换
只能进行手动切换,
缺陷:不能规避雕io,不能自动实现遇到阻塞就切换.
'''
from greenlet import greenlet
import time
def eat():
print("eat one1")
# 切换到play这个协程中
g2.switch()
time.sleep(1)
print("eat one2")
def play():
print("play one1")
print("play one2")
g1.switch()
g1 = greenlet(eat)
g2 = greenlet(play)
g1.switch()
# (3) gevent 有他的缺陷
import gevent,time
'''
spawn 自动检测阻塞,遇到阻塞就切换
相当于switch 的升级版 ,
瑕疵:不能够识别time.sleep() 有些阻塞不认识.
'''
def eat():
print("eat one1")
time.sleep(1)
print("eat one2")
def play():
print("play one1")
time.sleep(1)
print("play one2")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
# 协程的阻塞也是join
g1.join() # 阻塞直到g1协程执行完毕
g2.join() # 阻塞直到g2协程执行完毕
# (4) 进阶版本 用gevent.sleep 取代time.sleep()
import gevent,time
'''
spawn 自动检测阻塞,遇到阻塞就切换
相当于switch 的升级版 ,
瑕疵:不能够识别time.sleep() 有些阻塞不认识.
'''
def eat():
print("eat one1")
gevent.sleep(1)
print("eat one2")
def play():
print("play one1")
gevent.sleep(1)
print("play one2")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
# 协程的阻塞也是join
g1.join() # 阻塞直到g1协程执行完毕
g2.join() # 阻塞直到g2协程执行完毕
# (5) 终极解决办法
# from gevent import monkey
# 把它下面引入的模块都识别出来
# monkey.patch_all()
# 简化:都写在一行的情况下,有分号隔开.
from gevent import monkey;monkey.patch_all()
import time
import gevent
def eat():
print("eat one1")
time.sleep(1)
print("eat one2")
def play():
print("play one1")
time.sleep(1)
print("play one2")
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
# 协程的阻塞也是join
g1.join() # 阻塞直到g1协程执行完毕
g2.join() # 阻塞直到g2协程执行完毕
# ### 协程
from gevent import monkey;monkey.patch_all()
import gevent
"""
# spawn(函数,参数...) 启动一个协成
# join() 阻塞,直到某个协程执行完毕
# joinall 类似于join 只不过
g1.join()
g2.join()
gevent.joinall([g1,g2]) 一次性把所有需要阻塞的协程对象写到一起
# value 获取协成的返回值
"""
# (1) joinall value 的使用方法
import time
def eat():
print("eating 111")
time.sleep(1)
print("eating 222")
return "吃完了"
def play():
print("play 111")
time.sleep(1)
print("play 222")
return "play done"
g1 = gevent.spawn(eat)
g2 = gevent.spawn(play)
gevent.joinall([g1,g2])
print(g1.value)
print(g2.value)
# (2) 爬虫的应用
"""
HTTP 状态码:
200 OK
400 Bad Request
404 Not Found
requests 模块
response = requests.get(网址)
"""
import requests,time
response = requests.get("http://www.4399.com")
# print(response)
# 获取状态码
res = response.status_code
print(res)
# 获取,设置字符编码
res_code = response.apparent_encoding
# print(res)
# 设置编码集
response.encoding = res_code
# 获取网页里面的内容
res = response.text
print(res)
url_lst = [
"http://www.taobao.com",
"http://www.jd.com",
"http://www.4399.com",
"http://www.7k7k.com",
"http://www.baidu.com",
"http://www.taobao.com",
"http://www.jd.com",
"http://www.4399.com",
"http://www.7k7k.com",
"http://www.baidu.com",
"http://www.taobao.com",
"http://www.jd.com",
"http://www.4399.com",
"http://www.7k7k.com",
"http://www.baidu.com",
"http://www.taobao.com",
"http://www.jd.com",
"http://www.4399.com",
"http://www.7k7k.com",
"http://www.baidu.com",
"http://www.taobao.com",
"http://www.jd.com",
"http://www.4399.com",
"http://www.7k7k.com",
"http://www.baidu.com",
"http://www.taobao.com",
"http://www.jd.com",
"http://www.4399.com",
"http://www.7k7k.com",
"http://www.baidu.com",
]
# (1) 正常爬取数据 , 速度慢
def get_url(url):
response = requests.get(url)
if response.status_code == 200:
print(response.text)
# start = time.time()
# for i in url_lst:
# get_url(i)
# end = time.time()
# print(end-start) #3.033189535140991
# (2) 用协程爬取数据 速度更快
lst = []
start = time.time()
for i in url_lst:
g = gevent.spawn(get_url,i)
lst.append(g)
gevent.joinall(lst)
end = time.time()
print(end-start) #0.6957511901855469
线程之间的数据隔离
from threading import local,Thread
# 线程之间的数据隔离
'''
多个线程之间,使用threading.local对象
可以实现多个线程之间的数据隔离
'''
loc = local()
print(loc) # 对象
def func(name,age):
global loc
loc.name = name
loc.age = age
print(loc.name,loc.age)
Thread(target=func,args=("hello",20)).start()
Thread(target=func,args=("world",20)).start()