函数:生产者和消费者
import random
from queue import Queue
from threading import Thread, current_thread
import time
# 实例化一个队列
myq = Queue()
# 定义生产者
def producer():
while True:
tmp = random.randint(1,100)
myq.put(tmp)
print("%s生产了%s,生产后,现在产品总量:%s" % (current_thread().name, tmp, myq.qsize()))
time.sleep(0.5)
# 定义消费者
def consumer():
while True:
print("%s消费了%s,剩余产品%s" % (current_thread().name, myq.get(), myq.qsize()))
time.sleep(1.1)
# 启动生产者和消费者
# 启动生产者
tp = Thread(target=producer)
tp.start()
# 启动消费者
for i in range(2):
tc = Thread(target=consumer)
tc.start()
函数2:
# 编写一个基于tcp的echo服务器(回响服务器,即将客户端发送的信息返回给客户端),
# 要求使用线程和生产者消费者模型(提示:一个线程accept--生产者;两个线程用于接收和发送--消费者)。
import socket
from threading import Thread, current_thread
from queue import Queue
# 生产者
def accept_t(queue):
print("当前线程",current_thread().name)
# client_info = server.accept()
# queue.put(client_info)
# 消费者recv
def recv_t(queue, queue_data):
client_info = queue.get()
client_sock = client_info[0]
data = client_sock.recv(1024)
queue_data.put(data)
pass
try:
print(data.decode())
except:
print(data.decode('gbk'))
# 消费者send
def send_t(queue_data):
data = queue_data.get()
client_sock = client_info[0]
client_sock.send(data)
client_sock.close()
pass
if __name__ == "__main__":
client_info = None
server = None
# 创建服务器的套接字(监听套接字)
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# 设置地址复用属性
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# 绑定IP和端口
server_address = ("", 7972)
server.bind(server_address)
# 监听
server.listen(128)
queue = Queue()
queue_data = Queue()
t1 = Thread(target=accept_t, args=(queue))
t1.start()
t2 = Thread(target=recv_t, args=(queue, queue_data))
t2.start()
t3 = Thread(target=send_t, args=(queue_data,))
t3.start()
t1.join()
t2.join()
t3.join()
类:生产者和消费者
import socket
from queue import Queue
from threading import Thread
import time
import chardet
client_queue = Queue()
# 生产者
class Producer(Thread):
def __init__(self, tcp_server):
super().__init__()
self.tcp_server = tcp_server
def run(self):
client_info = self.tcp_server.accept()
client_queue.put(client_info)
# 消费者
class Consumer(Thread):
def __init__(self):
super().__init__()
def run(self):
client_info = client_queue.get()
client_sock = client_info[0]
client_addr = client_info[1]
msg = client_sock.recv(1024)
print("原始字节流:",msg)
a = 'abcd'.encode("UTF-8")
print('a:', a)
# a = msg.decode()
code = chardet.detect(a)
print('获取到a的编码是',code['encoding'])
print("%s说:%s" % (client_addr, msg.decode()))
client_sock.send(msg.decode().encode('gbk'))
client_sock.close()
print('consumer is over')
# 主函数
def main():
tcp_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind(("", 7892))
tcp_server.listen(128)
p = Producer(tcp_server)
c1 = Consumer()
# c2 = Consumer()
p.start()
c1.start()
# c2.start()
# time.sleep(2)
p.join()
c1.join()
# c2.join()
tcp_server.close()
if __name__ == '__main__':
main()