0.本周知识点预览
- Contextlib
- Redis发布订阅
- RabbitMQ
- pymysql
- SQLAchemy
1.Contextlib模块
contextlib模块的contextmanager 可以实现用with来管理上下文,类似于 with open('test.txt','r') as f,这样打开文件操作后就可以自动关闭文件。
1.范例一(自定义函数):
###contextlib (实现with上下文管理) import contextlib list1 = [1,2,3] str1 = "lk"
###此处必须是这个装饰器 @contextlib.contextmanager def func(l, s): l.append(s) try: ###执行到yield时,中断跳出函数 yield finally: print(l) with func(list1, str1): print(123) print(456)
执行结果如下:
123 456 [1, 2, 3, 'lk']
代码解析:以上代码的执行顺序为:
1.加载list1,str1,contextlib.contextmanager装饰器
2.执行with func(list1, str1)
3.执行def func(l, s), l.append(s)
4.try,yield,跳出函数回到with func(list1, str1) 执行print
5.执行完print后回到def func中的yield处,继续往下执行
2.范例二(socket):
###利用上下文可以处理文件那样处理类似socket,自动关闭连接 import contextlib import socket @contextlib.contextmanager def base_socket(host, port): sk = socket.socket() sk.bind((host, port)) sk.listen(5) print(123) try: yield sk finally: print(789) sk.close() with base_socket("127.0.0.1", 8888) as sock: print(456)
执行结果如下:
123 456 789
代码解析:这个是context对socket的应用,在项目中就可以这么写。执行顺序和上个例子相同。
2.Redis 发布订阅
自定义redis基础类:
## redis 发布订阅 import redis class RedisHelper: def __init__(self): ###创建redis连接对象 self.__conn = redis.Redis(host="127.0.0.1", port=6379) def public(self, msg, chan): ###publish 方法,把信息发布到频道上,返回消息被传递的订阅者的数量 self.__conn.publish(chan, msg) return True def subscribe(self, chan): ###pubsub 方法的意思是,返回一个发布或者订阅的对象,用这个对象,你就能订阅这个频道,监听给发给这些频道的消息 pub = self.__conn.pubsub() ###subscribe 方法: 订阅频道 pub.subscribe(chan) ###parse_response 方法:解析从发布者/订阅者命令的响应 pub.parse_response() return pub
发布者代码:
import test fabu = test.RedisHelper() while True: inp = input(">>> ") if inp == "exit": break else: fabu.public("%s" % inp, "998")
订阅者代码:
import test dingyue = test.RedisHelper() while True: ###subscribe 方法 -> 订阅频道 data = dingyue.subscribe("998") ###parse_response 方法:解析从发布者/订阅者命令的响应 print(data.parse_response())
执行结果:
1、先执行订阅者代码,这时会卡在这里,等待接收消息。
2、后执行发布者代码,这时当发布消息时,订阅者就会收到消息。
3.RabbitMQ
1.未利用exchange
生产者代码:
import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1")) channel = connection.channel() ###创建一个队列,假如队列不存在,但是为了确保队列成功创建,C/P两端最好都创建队列 ###durable=True 消息持久化 channel.queue_declare("hello_lk3", durable=True) ###在Rabbitmq中,一个消息不能直接发送给queue, 需要经过一个exchange,后续会讲到 ,现在我们只需将exchange设置为空字符串 channel.basic_publish(exchange='', routing_key="hello_lk1", body="fuck", properties=pika.BasicProperties(delivery_mode=2,)) print("[x] sent 'fuck'") connection.close()
消费者代码:
import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1")) ###创建一个频道 channel = connection.channel() ###创建一个队列,假如队列不存在,但是为了确保队列成功创建,C/P两端最好都创建队列 ###durable=True 消息持久化 channel.queue_declare("hello_lk3", durable=True) ###函数名不必须叫callback,callback函数就是将接收到的消息打印在屏幕上 def callback(ch, mechod, properties, body): print("[%s] is received" % body) ###无限循环监听,调用callback,队列名,no_ack的含义为,当时True时,只要订阅到消息,立刻返回ack,这是TCP层面的,并不能确保消息成功消费 ###假如no_ack为False时,订阅到消息后要处理成功后才返回ack,这是业务逻辑层面的,确保消费者成功消费消息 channel.basic_consume(callback, queue="hello_lk1", no_ack=True) print("现在开始消费消息...") channel.start_consuming()
代码执行结果:
生产者:
[x] sent 'fuck' Process finished with exit code 0
消费者:
现在开始消费消息... [b'fuck'] is received [b'fuck'] is received [b'fuck'] is received [b'fuck'] is received
代码解析:
1.先执行消费者代码,在执行生产者代码,可以看到如上图所示结果。
2.执行两次消费者代码,会发现生产者每生产个消息,消费者会轮训的来消费。
3.在步骤2中,假如不想让消费者轮训消费而是先来先得的消费,则需要在消费者代码中加入一行:channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列。
2.使用exchange发布订阅(常用)
1.fanout exchange
这是处理逻辑最简单的exchange类型,实际上它没有任何逻辑,它把进入该exchange的消息全部转发给每一个绑定的队列中,如果这个exchange没有队列与之绑定,消息会被丢弃。然后通过exchange发送消息,routing key可以随便填写,因为是fanout类型的exchange,routing key不起作用。
生产者代码:
import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1")) channel = connection.channel() ###创建一个exchange,生产者直接向exchange发消息,而不是队列. ###type: fanout类型:把进入该exchange的消息全部转发给每一个绑定的队列中,如果这个exchange没有队列与之绑定,消息会被丢弃 channel.exchange_declare(exchange="lk", type="fanout") message = 'hello' channel.basic_publish(exchange='lk', routing_key='', body=message) print("send MSG: %s" % message) connection.close()
消费者1代码:
##exchange 版 import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1")) ###创建一个频道 channel = connection.channel() ###创建一个exchange,假如不存在,但是为了确保其成功创建,C/P两端最好都创建exchange ###lk 为exchange名 ###type: fanout类型:把进入该exchange的消息全部转发给每一个绑定的队列中,如果这个exchange没有队列与之绑定,消息会被丢弃 channel.exchange_declare(exchange="lk", type='fanout') ##随机创建队列 # result = channel.queue_declare(exclusive=True) # queue_name = result.method.queue ###指定创建队列 channel.queue_declare("hello_lk5") ##绑定队列到exchange channel.queue_bind(exchange="lk", queue="hello_lk5") ###函数名不必须叫callback,callback函数就是将接收到的消息打印在屏幕上 def callback1(ch, method, propreties, body): print("[x] 收到 %s" % body) channel.basic_consume(callback1, queue="hello_lk5", no_ack=True) channel.start_consuming()
消费者2代码:
##exchange 版 import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters(host="127.0.0.1")) ###创建一个频道 channel = connection.channel() ###创建一个exchange,假如不存在,但是为了确保其成功创建,C/P两端最好都创建exchange ###lk 为exchange名 ###type: fanout类型:把进入该exchange的消息全部转发给每一个绑定的队列中,如果这个exchange没有队列与之绑定,消息会被丢弃 channel.exchange_declare(exchange="lk", type='fanout') ##随机创建队列 # result = channel.queue_declare(exclusive=True) # queue_name = result.method.queue ###指定创建队列 channel.queue_declare("hello_lk6") ##绑定队列到exchange channel.queue_bind(exchange="lk", queue="hello_lk6") ###函数名必须叫callback,callback函数就是将接收到的消息打印在屏幕上 def callback1(ch, method, propreties, body): print("[x] 收到 %s" % body) channel.basic_consume(callback1, queue="hello_lk6", no_ack=True) channel.start_consuming()
执行结果:
生产者:
send MSG: hello
Process finished with exit code 0
消费者1:
[x] 收到 b'hello'
消费者2:
[x] 收到 b'hello'
代码解析:生产者直接向exchange发消息,这时,消费者创建并绑定队列到exchange上,生产者一旦发布,所有队列都会收到消息。
2.direct exchange
这种类型的交换机Fancout 类型的交换机智能一些,它会根据routing key来决定把消息具体扔到哪个消息队列中。通过exchange发消息的时候会指定一个routing key,只有当routing key和与队列绑定的routing key一样的时候,消息才对发送到对应的消息队列。即,如果与某个队列绑定的routing key叫hello.world,则通过exchange发送的routing key必须也是hello.world,该队列才能接收到消息(可按上述步骤进行验证)。这种情况下,队列之间是互斥关系,一个消息最多只能进入一个队列。
生产者代码:
import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1')) channel = connection.channel() ###创建一个exchange, 类型是direct, channel.exchange_declare(exchange='direct_logs', type='direct') ###定义关键字severity, 每次发消息时会指定关键字. severity = "error" ###message 是要发送的消息 message = "123" ###通过定义好的exchange发送消息,关键字也是定好的,发送指定的消息 channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者1代码:
import pika # import sys ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1')) ###创建一个频道 channel = connection.channel() ###创建一个exchange, 类型是direct, channel.exchange_declare(exchange='direct_logs', type='direct') ###随机创建一个队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue ###关联关键字,只要生产者发布的消息关联了以下关键字,订阅者便能在绑定的队列中收到消息 severities = ["info", "waring", "error"] ###绑定关键字,队列到exchange. for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') ###打印订阅到的消息 def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) ###循环监听队列 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
消费者2代码:
import pika # import sys ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1')) ###创建一个频道 channel = connection.channel() ###创建一个exchange, 类型是direct, channel.exchange_declare(exchange='direct_logs', type='direct') ###随机创建一个队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue ###关联关键字,只要生产者发布的消息关联了以下关键字,订阅者便能在绑定的队列中收到消息 severities = ["error"] ###绑定关键字,队列到exchange. for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') ###打印订阅到的消息 def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) ###循环监听队列 channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
执行结果:
1.首先执行消费者1和消费者2的代码
2.然后执行生产者的代码
3.可以看到消费者1、2都接收到了生产者的消息。
4.假如生产者发送带有非"error"关键字的消息,则只有消费者1才能收到。
消费者1执行结果:
[*] Waiting for logs. To exit press CTRL+C [x] 'error':b'123' [x] 'info':b'123'
消费者2执行结果:
[*] Waiting for logs. To exit press CTRL+C [x] 'error':b'123'
3.Topic exchange
Topic exchange是最灵活的exchange,它会把exchange的routing key与绑定队列的routing key进行模式匹配。Routing key中可以包含 和#两种符号,#号可以用来匹配一个或者多个单词,*用来匹配正好一个单词。
生产者代码:
import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1')) channel = connection.channel() ###创建一个exchange, 类型是topic channel.exchange_declare(exchange='topic_logs', type='topic') ###发布者绑定exchange的关键字,订阅者根据模糊匹配来订阅 routing_key = "lk.haha.python" message = "xxoo" channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消费者代码:
import pika ###第一件事,我们要做的就是与rabbitmq-server建立连接 connection = pika.BlockingConnection(pika.ConnectionParameters( host='127.0.0.1')) channel = connection.channel() ###创建一个exchange, 类型是topic channel.exchange_declare(exchange='topic_logs', type='topic') ###随机创建一个队列 result = channel.queue_declare(exclusive=True) queue_name = result.method.queue ###订阅者绑定在exchange以及队列的关键字模糊匹配,这里#代表0个或多个单词,* 代表一个单词,假如只写一个#,代表全部匹配. binding_keys = ["lk.#"] ###根据多种匹配来绑定 for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
执行结果:
1.先执行消费者代码,后执行生产者代码。
2.因为生产者发送的消息带有关键字lk.haha.python,符合订阅者的绑定逻辑lk.#,所以这个消费者能收到消息。
3.这个用法很方便,可以通过匹配来进行消息的选择接收。
4.Python的SQLAchemy框架
1.MySQL 基础 ----> 请自行百度
2.SQLAchemy
1.pymysql
import pymysql ###创建一个MySQL 连接对象 conn = pymysql.connect(host='127.0.0.1', port=3306, user='root', passwd='123', db='s13') ###创建一个可以操作MySQL的游标,默认获取结果是元组,当设置cursor=pymysql.cursors.DictCursor,后获取结果为字典 cursor = conn.cursor() # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor) ###执行SQL语句 cursor.execute("select * from t10") # 获取第一行数据 # row_1 = cursor.fetchone() # 获取前n行数据 # row_2 = cursor.fetchmany(3) # 获取所有数据 row_3 = cursor.fetchall() print(row_3) ###mode='relative',获取结果相对位置移动 mode= 'absolute',获取结果绝对位置移动 # cursor.scroll(-2,mode='relative') # row_3 = cursor.fetchall() # print(row_3) ###提交操作,当执行如insert update alter delete drop 等操作后要提交才能生效 conn.commit() ###关闭游标 cursor.close() ###关闭数据库连接 conn.close()
执行结果:就是数据库的操作结果,不过,库和表都是事先从终端创建好的。
2.SQLAchemy基本操作
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index from sqlalchemy.orm import sessionmaker, relationship from sqlalchemy import create_engine ###创建一个数据库连接,连接池为5个 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5) ###创建对象的基类,默认就这么写. Base = declarative_base() # 定义个User子类 class Users(Base): ###要创建的表名 __tablename__ = 'users' ###表的结构 id = Column(Integer, primary_key=True) name = Column(String(32)) extra = Column(String(16)) __table_args__ = ( UniqueConstraint('id', 'name', name='uix_id_name'), Index('ix_id_name', 'name', 'extra'), ) # 一对多 class Favor(Base): __tablename__ = 'favor' nid = Column(Integer, primary_key=True) caption = Column(String(50), default='red', unique=True) class Person(Base): __tablename__ = 'person' nid = Column(Integer, primary_key=True) name = Column(String(32), index=True, nullable=True) favor_id = Column(Integer, ForeignKey("favor.nid")) # 多对多 class ServerToGroup(Base): __tablename__ = 'servertogroup' nid = Column(Integer, primary_key=True, autoincrement=True) server_id = Column(Integer, ForeignKey('server.id')) group_id = Column(Integer, ForeignKey('group.id')) class Group(Base): __tablename__ = 'group' id = Column(Integer, primary_key=True) name = Column(String(64), unique=True, nullable=False) class Server(Base): __tablename__ = 'server' id = Column(Integer, primary_key=True, autoincrement=True) hostname = Column(String(64), unique=True, nullable=False) port = Column(Integer, default=22) ###执行建表操作(create_all),删表操作(drop_all) Base.metadata.create_all(engine) # Base.metadata.drop_all(engine)
执行结果如下:
mysql> show tables; +---------------+ | Tables_in_s13 | +---------------+ | favor | | group | | person | | server | | servertogroup | | t10 | | users | +---------------+ 7 rows in set (0.01 sec)
3.SQLAchemy 增删改查
1.利用数据库连接直接SQL语句执行
from sqlalchemy import create_engine ###创建一个数据库连接,连接池为5个 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s13", max_overflow=5) engine.execute( "INSERT INTO users (id, name, extra) VALUES (1, 'lk', 'haha')" ) result = engine.execute('select * from users') print(result.fetchall())
执行结果:
[(1, 'lk', 'haha')]
2.利用SQLAchemy内部组件操作
在利用SQLAchemy的子类继承模式创建表之后,创建对象,利用对象执行特定语句。
增:
obj = Users(name="alex0", extra='sb') session.add(obj) session.add_all([ Users(name="alex1", extra='sb'), Users(name="alex2", extra='sb'), ]) session.commit()
删:
session.query(Users).filter(Users.id > 2).delete() session.commit()
改:
session.query(Users).filter(Users.id > 2).update({"name" : "099"}) session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False) session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate") session.commit()
查:
ret = session.query(Users).all() ret = session.query(Users.name, Users.extra).all() ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter_by(name='alex').first()
其他:
# 条件 ret = session.query(Users).filter_by(name='alex').all() ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all() ret = session.query(Users).filter(Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all() ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all() from sqlalchemy import and_, or_ ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all() ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all() ret = session.query(Users).filter( or_( Users.id < 2, and_(Users.name == 'eric', Users.id > 3), Users.extra != "" )).all() # 通配符 ret = session.query(Users).filter(Users.name.like('e%')).all() ret = session.query(Users).filter(~Users.name.like('e%')).all() # 限制 ret = session.query(Users)[1:2] # 排序 ret = session.query(Users).order_by(Users.name.desc()).all() ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all() # 分组 from sqlalchemy.sql import func ret = session.query(Users).group_by(Users.extra).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).all() ret = session.query( func.max(Users.id), func.sum(Users.id), func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all() # 连表 ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all() ret = session.query(Person).join(Favor).all() ret = session.query(Person).join(Favor, isouter=True).all() # 组合 q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union(q2).all() q1 = session.query(Users.name).filter(Users.id > 2) q2 = session.query(Favor.caption).filter(Favor.nid < 2) ret = q1.union_all(q2).all()