redis发布和订阅
发布者一旦发送消息,那么所有订阅者都会收到。
RedisHelper
#!/usr/bin/env python #-*- coding:utf-8 -*- import redis class redishelper: def __init__(self): self.__conn = redis.Redis(host='192.168.11.87') def public(self, msg, chan): self.__conn.publish(chan, msg) return msg,chan def subscribe(self, chan): pub = self.__conn.pubsub() pub.subscribe(chan) pub.parse_response() return pub
发布者
#!/usr/bin/env python #-*- coding:utf-8 -*- import b1 obj = b1.redishelper() #实例化方法 obj.public('aaaaaa','fm111.7') #执行发布
订阅者
obj = b1.redishelper() #实例化方法 data = obj.subscribe('fm111.7') #调用订阅方法 while True: msg = data.parse_response() #接收发布消息 print(msg)
RabbitMQ
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
安装
#安装配置epel源 $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm #安装erlang $ yum -y install erlang #安装RabbitMQ $ yum -y install rabbitmq-server #启动、关闭服务 service rabbitmq-server start/stop
API
python -m pip install pika
1.基于RabbitMQ实现生产者消费者模型。对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
生产者代码
#!/usr/bin/env python #-*- coding:utf-8 -*- import pika # ######################### 发消息 ######################### #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.11.158')) #创建频道 channel = connection.channel() #如果没有这个队列会创建一个 channel.queue_declare(queue='hello') #向队列插入数值 routing_key是队列名 body是要插入的内容 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') print(" [x] Sent 'Hello World!'") #关闭连接 connection.close()
消费者代码
#!/usr/bin/env python #-*- coding:utf-8 -*- import pika # ##########################取消息 ########################## #连接rabbitmq服务器 connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.11.158')) #创建频道 channel = connection.channel() #如果生产者没有运行创建队列,那么消费者创建队列 channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
当生产者生成一条数据,被消费者接收,消费者中断后如果不超过10秒,连接的时候数据还在。当超过10秒之后,重新链接,数据将消失。消费者等待链接。
2.消息不丢失(数据持久化)
1.当把no_ack=false时,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,RabbitMQ会重新将该任务添加到队列中。
2.durable
生产者代码
#!/usr/bin/env python import pika #链接rabbit服务器 connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #创建频道 channel = connection.channel() #创建队列,使用durable方法 channel.queue_declare(queue='hello', durable=True) #如果想让队列实现持久化那么加上durable=True channel.basic_publish(exchange='', routing_key='hello', body='Hello World!', properties=pika.BasicProperties( delivery_mode=2, #标记我们的消息为持久化的 - 通过设置 delivery_mode 属性为 2 #这样必须设置,让消息实现持久化 )) #这个exchange参数就是这个exchange的名字. 空字符串标识默认的或者匿名的exchange:如果存在routing_key, 消息路由到routing_key指定的队列中。 print(" [x] 开始队列'") connection.close()
消费者代码
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost')) #创建频道 channel = connection.channel() #创建队列,使用durable方法 channel.queue_declare(queue='hello', durable=True) def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print('ok') ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] 等待队列. To exit press CTRL+C') channel.start_consuming()
注:标记消息为持久化的并不能完全保证消息不会丢失,尽管告诉RabbitMQ保存消息到磁盘,当RabbitMQ接收到消息还没有保存的时候仍然有一个 短暂的时间窗口. RabbitMQ不会对每个消息都执行同步fsync(2) --- 可能只是保存到缓存cache还没有写入到磁盘中,这个持久化保证不是很强,但这比我们简单的任务queue要好很多,如果你想很强的保证你可以使用 publisher confirms
3.消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
消费者代码
#!/usr/bin/env python # -*- coding:utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) channel = connection.channel() # make message persistent channel.queue_declare(queue='hello') def callback(ch, method, properties, body): print(" [x] Received %r" % body) import time time.sleep(10) print 'ok' ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_qos(prefetch_count=1) #表示谁来谁取,不再按照奇偶排列 channel.basic_consume(callback, queue='hello', no_ack=False) print(' [*] Waiting for messages. To exit press CTRL+C') channel.start_consuming()
发布与订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
发布者发送的消息实际是先发送到exchange,然后由exchange发送到相对应的队列。
exchange类型可用: direct , topic , headers 和 fanout 。
exchange type = fanout
发布者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='logs', type='fanout') message = ' '.join(sys.argv[1:]) or "info: Hello World!" channel.basic_publish(exchange='logs', routing_key='', body=message) print(" [x] Sent %r" % message) connection.close()
订阅者
#!/usr/bin/env python
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='logs',
type='fanout')
result = channel.queue_declare(exclusive=True) #队列断开后自动删除临时队列
queue_name = result.method.queue # 队列名采用服务端分配的临时队列
channel.queue_bind(exchange='logs',
queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r" % body)
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
关键字发送( exchange type = direct)
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
生产者
#!/usr/bin/env python #-*- coding:utf-8 -*- ##########发送消息 import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') severity = 'info' message = 'Hello World!' channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message) print(" [x] Sent %r:%r" % (severity, message)) connection.close()
消费者
#!/usr/bin/env python #-*-coding=utf-8-*- import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.11.87')) channel = connection.channel() channel.exchange_declare(exchange='direct_logs', type='direct') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue severities = ['error','warning','info'] if not severities: sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) sys.exit(1) for severity in severities: channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
模糊匹配( exchange type = topic)
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
#表示可以匹配0个或多个单词 *表示只能匹配一个单词 发送者路由值 队列中 old.boy.python old.* -- 不匹配 old.boy.python old.# -- 匹配
生产者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()
消费者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='localhost')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
SQLAlchemy
SQLAlchemy是Python编程语言下的一款ORM框架,该框架建立在数据库API之上,使用关系对象映射进行数据库操作,简言之便是:将对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
schema/type:定义的一种映射格式,把表映射成类。
sql expression language: 封装了增删改查的sql语句
engine: 引擎
connection pooling: 连接池
dialect: 用于和数据库API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作
MySQL-Python mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname> pymysql mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>] MySQL-Connector mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname> cx_Oracle oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
示例,中间状态 演示一个过程
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine #初始化数据库连接 engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5) # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES ('1.1.1.22', 3)" # ) # 新插入行自增ID # cur.lastrowid # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES(%s, %s)",[('1.1.1.22', 3),('1.1.1.221', 3),] # ) # 执行SQL # cur = engine.execute( # "INSERT INTO hosts (host, color_id) VALUES (%(host)s, %(color_id)s)", # host='1.1.1.99', color_id=3 # ) # 执行SQL # cur = engine.execute('select * from hosts') # 获取第一行数据 # cur.fetchone() # 获取第n行数据 # cur.fetchmany(3) # 获取所有数据 # cur.fetchall()
增删改查
#!/usr/bin/env python # -*- coding:utf-8 -*- from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey metadata = MetaData() user = Table('user', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) color = Table('color', metadata, Column('id', Integer, primary_key=True), Column('name', String(20)), ) engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5) conn = engine.connect() # 创建SQL语句,INSERT INTO "user" (id, name) VALUES (:id, :name) conn.execute(user.insert(),{'id':7,'name':'seven'}) conn.close() # sql = user.insert().values(id=123, name='wu') # conn.execute(sql) # conn.close() # sql = user.delete().where(user.c.id > 1) # sql = user.update().values(fullname=user.c.name) # sql = user.update().where(user.c.name == 'jack').values(name='ed') # sql = select([user, ]) # sql = select([user.c.id, ]) # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id) # sql = select([user.c.name]).order_by(user.c.name) # sql = select([user]).group_by(user.c.name) # result = conn.execute(sql) # print result.fetchall() # conn.close()
完整示例
from sqlalchemy import create_engine from sqlalchemy.ext.declarative import declarative_base from sqlalchemy import Column, Integer, String from sqlalchemy.orm import sessionmaker Base = declarative_base() #生成一个SqlORM 基类 engine = create_engine("mysql+mysqldb://root@localhost:3306/test",echo=False) class Host(Base): __tablename__ = 'hosts' id = Column(Integer,primary_key=True,autoincrement=True) hostname = Column(String(64),unique=True,nullable=False) ip_addr = Column(String(128),unique=True,nullable=False) port = Column(Integer,default=22) Base.metadata.create_all(engine) #创建所有表结构 if __name__ == '__main__': SessionCls = sessionmaker(bind=engine) #创建与数据库的会话session class ,注意,这里返回给session的是个class,不是实例 session = SessionCls() #h1 = Host(hostname='localhost',ip_addr='127.0.0.1') #h2 = Host(hostname='ubuntu',ip_addr='192.168.2.243',port=20000) #h3 = Host(hostname='ubuntu2',ip_addr='192.168.2.244',port=20000) #session.add(h3) #session.add_all( [h1,h2]) #h2.hostname = 'ubuntu_test' #只要没提交,此时修改也没问题 #session.rollback() #session.commit() #提交 res = session.query(Host).filter(Host.hostname.in_(['ubuntu2','localhost'])).all() print(res)