  • Python Road: RabbitMQ and SQLAlchemy

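    I. RabbitMQ

    RabbitMQ is a complete, reusable enterprise messaging system built on AMQP. It is open source and released under the Mozilla Public License.

    MQ stands for Message Queue. A message queue is a method of application-to-application communication: applications communicate by writing messages (application-specific data) to a queue and reading them from it, without needing a dedicated connection between them. Message passing means that programs communicate by sending data in messages rather than by calling each other directly; direct calls are typical of techniques such as remote procedure calls. Queuing means that applications communicate through queues, which removes the requirement that the sending and receiving applications run at the same time.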

    Installing RabbitMQ

    Install and configure the EPEL repository
       $ rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm
     
    Install Erlang
       $ yum -y install erlang
     
    Install RabbitMQ
       $ yum -y install rabbitmq-server
    

     Note: start and stop the server with service rabbitmq-server start/stop

    Installing the API

    pip install pika
    or
    easy_install pika
    or
    install from source
     
    https://pypi.python.org/pypi/pika
    

     Using the API to work with RabbitMQ

    A producer-consumer model based on Queue

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import Queue
    import threading
    
    
    message = Queue.Queue(10)
    
    
    def producer(i):
        while True:
            message.put(i)
    
    
    def consumer(i):
        while True:
            msg = message.get()
    
    
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()
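
    The threads in the example above loop forever, and the Queue module name is specific to Python 2. As a minimal sketch of the same model that terminates, assuming Python 3 (where the module is named queue), the variant below has each producer send a fixed number of items and then stops the consumers with a sentinel value. The SENTINEL marker, the received list, and the thread counts are illustrative choices, not part of the original example.

```python
import queue
import threading

message = queue.Queue(10)      # bounded: put() blocks while 10 items are waiting
SENTINEL = None                # hypothetical end-of-stream marker
received = []
received_lock = threading.Lock()


def producer(start, count):
    # Each producer puts `count` consecutive integers on the shared queue.
    for value in range(start, start + count):
        message.put(value)


def consumer():
    while True:
        msg = message.get()
        if msg is SENTINEL:
            break              # no more work for this consumer
        with received_lock:
            received.append(msg)


consumers = [threading.Thread(target=consumer) for _ in range(3)]
producers = [threading.Thread(target=producer, args=(i * 5, 5)) for i in range(4)]
for t in consumers + producers:
    t.start()
for t in producers:
    t.join()
for _ in consumers:
    message.put(SENTINEL)      # one sentinel per consumer
for t in consumers:
    t.join()

print(sorted(received))
```

    Sending one sentinel per consumer only after every producer has finished ensures that no item is left behind a sentinel in the queue.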
    

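     With RabbitMQ, producing and consuming no longer operate on a Queue object in local memory, but on a message queue implemented by a RabbitMQ server running on some host.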

    #!/usr/bin/env python
    import pika
     
    # ######################### Producer #########################
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.queue_declare(queue='hello')
     
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
    
    #!/usr/bin/env python
    import pika
     
    # ########################## Consumer ##########################
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.queue_declare(queue='hello')
     
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
     
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=True)
     
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

     1. Acknowledgment: messages are not lost

    With no_ack=False, if a consumer dies (its channel is closed, connection is closed, or TCP connection is lost) before acknowledging a message, RabbitMQ puts that message back on the queue.

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='10.211.55.4'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    Consumer
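
    To make the redelivery behaviour concrete without a running server, here is a minimal in-memory sketch. ToyBroker and its methods are hypothetical names invented for this illustration; they only model the bookkeeping (a ready list plus a table of unacknowledged deliveries) and are not how RabbitMQ or pika are implemented.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker queue; not how RabbitMQ is implemented."""

    def __init__(self):
        self.ready = deque()     # messages waiting for delivery
        self.unacked = {}        # delivery_tag -> message awaiting an ack
        self.next_tag = 1

    def publish(self, body):
        self.ready.append(body)

    def deliver(self):
        # Hand out the oldest ready message and remember it until it is acked.
        body = self.ready.popleft()
        tag = self.next_tag
        self.next_tag += 1
        self.unacked[tag] = body
        return tag, body

    def ack(self, tag):
        # The consumer finished the work, so the message can be dropped.
        del self.unacked[tag]

    def consumer_lost(self):
        # Channel or connection closed: put every unacked message back, oldest first.
        for tag in sorted(self.unacked, reverse=True):
            self.ready.appendleft(self.unacked.pop(tag))


broker = ToyBroker()
broker.publish("task-1")
broker.publish("task-2")

tag, body = broker.deliver()     # consumer A receives task-1 ...
broker.consumer_lost()           # ... and dies before calling ack

tag, body = broker.deliver()     # consumer B receives task-1 again
broker.ack(tag)
redelivered = body
```

    In this model a message leaves the broker only when ack is called, which is why the callback in the consumer above calls ch.basic_ack only after its work is done.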

    2. durable: messages are not lost when the server restarts

    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make the queue durable
    channel.queue_declare(queue='hello', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          ))
    print(" [x] Sent 'Hello World!'")
    connection.close()
    Producer
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # make the queue durable
    channel.queue_declare(queue='hello', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    Consumer

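    3. Message dispatch order

    By default, messages in the queue are handed out to consumers in turn: for example, consumer 1 receives the odd-numbered tasks and consumer 2 receives the even-numbered tasks.

    channel.basic_qos(prefetch_count=1) makes dispatch first come, first served: a consumer gets its next message only after acknowledging the previous one, so tasks are no longer split into odd and even.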

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
    channel = connection.channel()
    
    # declare the queue
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print('ok')
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    Consumer
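
    The effect of prefetch_count=1 is easiest to see when task lengths are uneven. The following is a rough simulation, not RabbitMQ code: it assumes all tasks are already queued at time zero and that the task durations are the made-up values shown. It compares the time at which the last worker finishes under strict alternation and under "next task goes to whoever is free first".

```python
import heapq


def round_robin_finish_time(durations, workers):
    # Task i is assigned to worker i % workers up front, regardless of load.
    busy_until = [0] * workers
    for i, duration in enumerate(durations):
        busy_until[i % workers] += duration
    return max(busy_until)


def fair_dispatch_finish_time(durations, workers):
    # With one unacked message per worker, the next task goes to whichever
    # worker becomes free first.
    busy_until = [0] * workers
    heapq.heapify(busy_until)
    for duration in durations:
        free_at = heapq.heappop(busy_until)
        heapq.heappush(busy_until, free_at + duration)
    return max(busy_until)


durations = [10, 1, 10, 1, 10, 1]   # hypothetical task lengths in seconds
print(round_robin_finish_time(durations, 2))    # 30
print(fair_dispatch_finish_time(durations, 2))  # 21
```

    With alternation, one worker receives all three long tasks while the other sits idle after three seconds; with fair dispatch the long tasks are spread across both workers.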

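    4. Publish/subscribe

    Publish/subscribe differs from a simple message queue in that a published message is delivered to every subscriber, whereas a message in a plain queue disappears once it has been consumed. To implement publish/subscribe, RabbitMQ creates a queue for each subscriber, and when the publisher publishes a message, the message is placed in all of the relevant queues.

     exchange type = fanout delivers each message to every bound queue. The example below uses exchange type = direct, which delivers a message only to the queues bound with a matching routing key.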
    
    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]\n" % sys.argv[0])
        sys.exit(1)
    
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    channel.start_consuming()
    Consumer
    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
    
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    Producer
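
    The routing difference between the two exchange types can be sketched in a few lines of plain Python. ToyExchange is a hypothetical in-memory model written for this illustration, with queue names and messages chosen arbitrarily; it mimics only the routing rule and is not the AMQP implementation.

```python
from collections import defaultdict


class ToyExchange:
    """In-memory model of exchange routing; a sketch, not the AMQP implementation."""

    def __init__(self, exchange_type):
        self.exchange_type = exchange_type      # 'fanout' or 'direct'
        self.bindings = []                      # (queue_name, routing_key) pairs
        self.queues = defaultdict(list)         # queue_name -> delivered bodies

    def bind(self, queue_name, routing_key=""):
        self.bindings.append((queue_name, routing_key))

    def publish(self, routing_key, body):
        targets = set()
        for queue_name, bound_key in self.bindings:
            # fanout ignores the routing key; direct requires an exact match
            if self.exchange_type == "fanout" or bound_key == routing_key:
                targets.add(queue_name)
        for queue_name in targets:
            self.queues[queue_name].append(body)


fanout = ToyExchange("fanout")
fanout.bind("q1")
fanout.bind("q2")
fanout.publish("", "hello")

direct = ToyExchange("direct")
direct.bind("q1", "error")
direct.bind("q2", "info")
direct.bind("q2", "error")
direct.publish("error", "disk full")
direct.publish("info", "started")
```

    Here both fanout queues receive "hello", while in the direct case q1 sees only the error message and q2, bound to both keys, sees both messages. This mirrors the consumer above, which binds its queue once per severity given on the command line.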

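    II. SQLAlchemy

    SQLAlchemy is an ORM framework for Python. It is built on top of the database API and uses object-relational mapping to perform database operations. In short, it converts objects into SQL, executes the SQL through the database API, and returns the results.

    The Dialect talks to the database API. Depending on the configuration, it calls a different database API to operate on the database, for example: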

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
     
    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
     
    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
     
    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
     
    For more, see: http://docs.sqlalchemy.org/en/latest/dialects/index.html
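
    The connection URLs above all follow the same dialect+driver://user:password@host:port/dbname pattern. As a small sketch of how the pieces line up, the helper below splits such a URL using only the standard library. describe_db_url is a hypothetical name, the charset option in the sample URL is an assumed example, and this is not SQLAlchemy's own URL parser.

```python
from urllib.parse import urlsplit, parse_qs


def describe_db_url(url):
    parts = urlsplit(url)
    dialect, _, driver = parts.scheme.partition("+")
    return {
        "dialect": dialect,
        "driver": driver or None,       # None when no +driver suffix is given
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,             # None when the URL has no :<port>
        "dbname": parts.path.lstrip("/"),
        "options": parse_qs(parts.query),
    }


info = describe_db_url("mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8")
print(info["dialect"], info["driver"], info["host"], info["port"], info["dbname"])
```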
    

     Step 1:

    Use Engine/ConnectionPooling/Dialect to operate on the database. The Engine obtains a connection from the ConnectionPooling layer and then executes SQL statements through the Dialect.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
     
    from sqlalchemy import create_engine
     
     
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
     
    engine.execute(
        "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
    )
     
    engine.execute(
         "INSERT INTO ts_test (a, b) VALUES (%s, %s)",
        ((555, "v1"),(666, "v1"),)
    )
    engine.execute(
        "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
        id=999, name="v1"
    )
     
    result = engine.execute('select * from ts_test')
    result.fetchall()
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from sqlalchemy import create_engine
    
    
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
    
    
    # Transaction
    with engine.begin() as conn:
        conn.execute("insert into table (x, y, z) values (1, 2, 3)")
        conn.execute("my_special_procedure(5)")
        
        
    conn = engine.connect()
    # Transaction
    with conn.begin():
        conn.execute("some statement", {'x':5, 'y':10})
    Transactions
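
    A transaction block commits when the block finishes normally and rolls back if an exception escapes it. The sketch below shows that all-or-nothing behaviour using the standard library's sqlite3 module as a stand-in for the MySQL engine above, since it needs no server. The in-memory database, the insert_pair helper, and the simulated failure are assumptions made for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ts_test (a INTEGER, b TEXT)")
conn.commit()


def insert_pair(conn, fail):
    # `with conn:` commits on success and rolls back if an exception escapes.
    try:
        with conn:
            conn.execute("INSERT INTO ts_test (a, b) VALUES (?, ?)", (1, "v1"))
            if fail:
                raise RuntimeError("simulated failure")
            conn.execute("INSERT INTO ts_test (a, b) VALUES (?, ?)", (2, "v1"))
    except RuntimeError:
        pass
    return conn.execute("SELECT COUNT(*) FROM ts_test").fetchone()[0]


rows_after_failure = insert_pair(conn, fail=True)    # first insert is rolled back
rows_after_success = insert_pair(conn, fail=False)   # both inserts are committed
print(rows_after_failure, rows_after_success)
```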

    Note: to view the database connections, run: show status like 'Threads%';

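    Step 2:

    Use Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect to operate on the database. The Engine uses Schema Type to create an object describing a specific structure, converts that object into SQL statements through the SQL Expression Language, connects to the database through ConnectionPooling, and then executes the SQL through the Dialect and fetches the results.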

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
     
    from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey
     
    metadata = MetaData()
     
    user = Table('user', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )
     
    color = Table('color', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
     
    metadata.create_all(engine)
    # metadata.clear()
    # metadata.remove()
    
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey, select
    
    metadata = MetaData()
    
    user = Table('user', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )
    
    color = Table('color', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
    
    conn = engine.connect()
    
    # Build the SQL statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
    conn.execute(user.insert(),{'id':7,'name':'seven'})
    conn.close()
    
    # sql = user.insert().values(id=123, name='wu')
    # conn.execute(sql)
    # conn.close()
    
    # sql = user.delete().where(user.c.id > 1)
    
    # sql = user.update().values(fullname=user.c.name)
    # sql = user.update().where(user.c.name == 'jack').values(name='ed')
    
    # sql = select([user, ])
    # sql = select([user.c.id, ])
    # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
    # sql = select([user.c.name]).order_by(user.c.name)
    # sql = select([user]).group_by(user.c.name)
    
    # result = conn.execute(sql)
    # print(result.fetchall())
    # conn.close()
    Insert, delete, update, query

    For more details, see:

        http://www.jianshu.com/p/e6bba189fcbd

        http://docs.sqlalchemy.org/en/latest/core/expression_api.html

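    Note: SQLAlchemy cannot modify the structure of an existing table. If you need that, use Alembic, another open-source tool from the SQLAlchemy developers.

    Step 3:

    Use all of the components, ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect, to operate on the data: create objects from classes, convert the objects to SQL, and execute the SQL.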

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
     
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine
     
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)
     
    Base = declarative_base()
     
     
    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(50))
     
    # Find all subclasses of Base and create the corresponding tables in the database
    # Base.metadata.create_all(engine)
     
    Session = sessionmaker(bind=engine)
    session = Session()
     
     
    # ########## Insert ##########
    # u = User(id=2, name='sb')
    # session.add(u)
    # session.add_all([
    #     User(id=3, name='sb'),
    #     User(id=4, name='sb')
    # ])
    # session.commit()
     
    # ########## Delete ##########
    # session.query(User).filter(User.id > 2).delete()
    # session.commit()
     
    # ########## Update ##########
    # session.query(User).filter(User.id > 2).update({'name': 'new_name'})
    # session.commit()
     
    # ########## Query ##########
    # ret = session.query(User).filter_by(name='sb').first()
     
    # ret = session.query(User).filter_by(name='sb').all()
    # print(ret)
     
    # ret = session.query(User).filter(User.name.in_(['sb','bb'])).all()
    # print(ret)
     
    # ret = session.query(User.name.label('name_label')).all()
    # print(ret, type(ret))
     
    # ret = session.query(User).order_by(User.id).all()
    # print(ret)
     
    # ret = session.query(User).order_by(User.id)[1:3]
    # print(ret)
    # session.commit()
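
    The "create an object from a class, convert the object to SQL, execute the SQL" flow can be illustrated without SQLAlchemy at all. The following is a deliberately tiny sketch of the idea using sqlite3; the hand-written columns tuple and the to_insert helper are hypothetical, and a real ORM does far more (type handling, identity tracking, sessions).

```python
import sqlite3


class User:
    __tablename__ = "users"
    columns = ("id", "name")            # hypothetical, hand-written column list

    def __init__(self, id, name):
        self.id = id
        self.name = name


def to_insert(obj):
    # Turn an object into a parameterized INSERT statement plus its values.
    cols = obj.columns
    placeholders = ", ".join("?" for _ in cols)
    sql = "INSERT INTO %s (%s) VALUES (%s)" % (
        obj.__tablename__, ", ".join(cols), placeholders)
    return sql, tuple(getattr(obj, col) for col in cols)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name VARCHAR(50))")

sql, params = to_insert(User(id=2, name="alex"))
conn.execute(sql, params)
rows = conn.execute("SELECT id, name FROM users").fetchall()
print(sql)
print(rows)
```

    Passing the values separately from the SQL text, rather than formatting them into the string, leaves quoting to the database driver.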
    

     2. Using the ORM

    1. Creating tables

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
    
    Base = declarative_base()
    
    # Single table
    class Users(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(32))
        extra = Column(String(16))
    
        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'),
            Index('ix_id_name', 'name', 'extra'),
        )
    
    
    # One-to-many
    class Favor(Base):
        __tablename__ = 'favor'
        nid = Column(Integer, primary_key=True)
        caption = Column(String(50), default='red', unique=True)
    
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        favor_id = Column(Integer, ForeignKey("favor.nid"))
    
    
    # Many-to-many
    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
        port = Column(Integer, default=22)
    
    
    class Server(Base):
        __tablename__ = 'server'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
    
    class ServerToGroup(Base):
        __tablename__ = 'servertogroup'
        nid = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))
    
    
    def init_db():
        Base.metadata.create_all(engine)
    
    
    def drop_db():
        Base.metadata.drop_all(engine)
    

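     Note: a foreign key can also be declared as a table-level constraint in __table_args__, for example ForeignKeyConstraint(['other_id'], ['othertable.other_id']).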

    2. Working with tables

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    from sqlalchemy.orm import sessionmaker, relationship
    from sqlalchemy import create_engine
    
    engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=5)
    
    Base = declarative_base()
    
    # Single table
    class Users(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(32))
        extra = Column(String(16))
    
        __table_args__ = (
            UniqueConstraint('id', 'name', name='uix_id_name'),
            Index('ix_id_name', 'name', 'extra'),
        )
    
        def __repr__(self):
            return "%s-%s" %(self.id, self.name)
    
    # One-to-many
    class Favor(Base):
        __tablename__ = 'favor'
        nid = Column(Integer, primary_key=True)
        caption = Column(String(50), default='red', unique=True)
    
        def __repr__(self):
            return "%s-%s" %(self.nid, self.caption)
    
    class Person(Base):
        __tablename__ = 'person'
        nid = Column(Integer, primary_key=True)
        name = Column(String(32), index=True, nullable=True)
        favor_id = Column(Integer, ForeignKey("favor.nid"))
        # Does not affect the generated table schema; it only makes queries more convenient
        favor = relationship("Favor", backref='pers')
    
    # Many-to-many
    class ServerToGroup(Base):
        __tablename__ = 'servertogroup'
        nid = Column(Integer, primary_key=True, autoincrement=True)
        server_id = Column(Integer, ForeignKey('server.id'))
        group_id = Column(Integer, ForeignKey('group.id'))
        group = relationship("Group", backref='s2g')
        server = relationship("Server", backref='s2g')
    
    class Group(Base):
        __tablename__ = 'group'
        id = Column(Integer, primary_key=True)
        name = Column(String(64), unique=True, nullable=False)
        port = Column(Integer, default=22)
        # group = relationship('Group',secondary=ServerToGroup,backref='host_list')
    
    
    class Server(Base):
        __tablename__ = 'server'
    
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
    
    
    
    
    def init_db():
        Base.metadata.create_all(engine)
    
    
    def drop_db():
        Base.metadata.drop_all(engine)
    
    
    Session = sessionmaker(bind=engine)
    session = Session()
    Table definitions and database connection
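
    The many-to-many layout above stores each server-to-group link as a row in servertogroup. To show what a lookup through that association table amounts to in SQL, here is a sketch using sqlite3 in place of MySQL. The sample hostnames, group names, and the groups_of helper are made up for the illustration. Note that group is a reserved word in SQL, so the table name has to be quoted in hand-written statements.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE server (id INTEGER PRIMARY KEY, hostname TEXT UNIQUE NOT NULL);
CREATE TABLE "group" (id INTEGER PRIMARY KEY, name TEXT UNIQUE NOT NULL);
CREATE TABLE servertogroup (
    nid INTEGER PRIMARY KEY,
    server_id INTEGER REFERENCES server(id),
    group_id INTEGER REFERENCES "group"(id)
);
INSERT INTO server (id, hostname) VALUES (1, 'c1'), (2, 'c2');
INSERT INTO "group" (id, name) VALUES (1, 'web'), (2, 'db');
INSERT INTO servertogroup (server_id, group_id) VALUES (1, 1), (1, 2), (2, 1);
""")


def groups_of(hostname):
    # Follow server -> servertogroup -> group to list a server's groups.
    rows = conn.execute(
        'SELECT g.name FROM server s '
        'JOIN servertogroup sg ON sg.server_id = s.id '
        'JOIN "group" g ON g.id = sg.group_id '
        'WHERE s.hostname = ? ORDER BY g.name',
        (hostname,),
    ).fetchall()
    return [name for (name,) in rows]


print(groups_of("c1"))
print(groups_of("c2"))
```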

    Insert, delete, update, query

    # Insert
    obj = Users(name="alex0", extra='sb')
    session.add(obj)
    session.add_all([
        Users(name="alex1", extra='sb'),
        Users(name="alex2", extra='sb'),
    ])
    session.commit()
    
    
    # Delete
    session.query(Users).filter(Users.id > 2).delete()
    session.commit()
    
    # Update
    session.query(Users).filter(Users.id > 2).update({"name" : "099"})
    session.query(Users).filter(Users.id > 2).update({Users.name: Users.name + "099"}, synchronize_session=False)
    # NOTE: the next line needs an integer column num, which the Users model above does not define
    session.query(Users).filter(Users.id > 2).update({"num": Users.num + 1}, synchronize_session="evaluate")
    session.commit()
    
    # Query
    ret = session.query(Users).all()
    ret = session.query(Users.name, Users.extra).all()
    ret = session.query(Users).filter_by(name='alex').all()
    ret = session.query(Users).filter_by(name='alex').first()
    
    # Other
    # Conditions
    ret = session.query(Users).filter_by(name='alex').all()
    ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
    ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
    ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
    ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
    from sqlalchemy import and_, or_
    ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    ret = session.query(Users).filter(
        or_(
            Users.id < 2,
            and_(Users.name == 'eric', Users.id > 3),
            Users.extra != ""
        )).all()
    
    
    # Wildcards
    ret = session.query(Users).filter(Users.name.like('e%')).all()
    ret = session.query(Users).filter(~Users.name.like('e%')).all()
    
    # Limit (slicing)
    ret = session.query(Users)[1:2]
    
    # Ordering
    ret = session.query(Users).order_by(Users.name.desc()).all()
    ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
    
    # Grouping
    from sqlalchemy.sql import func
    
    ret = session.query(Users).group_by(Users.extra).all()
    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).all()
    
    ret = session.query(
        func.max(Users.id),
        func.sum(Users.id),
        func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
    
    # Joins
    
    ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
    
    ret = session.query(Person).join(Favor).all()
    
    ret = session.query(Person).join(Favor, isouter=True).all()
    
    
    # Union
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union(q2).all()
    
    q1 = session.query(Users.name).filter(Users.id > 2)
    q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    ret = q1.union_all(q2).all()
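
    The two queries above differ only in how duplicates are treated: UNION removes duplicate rows from the combined result, while UNION ALL keeps them. A quick way to see this is the sketch below, which runs the equivalent SQL through sqlite3 on made-up sample rows instead of going through the ORM.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);
CREATE TABLE favor (nid INTEGER PRIMARY KEY, caption TEXT);
INSERT INTO users (id, name) VALUES (3, 'red'), (4, 'eric');
INSERT INTO favor (nid, caption) VALUES (1, 'red');
""")

base = ("SELECT name FROM users WHERE id > 2 "
        "{op} "
        "SELECT caption FROM favor WHERE nid < 2")

# 'red' appears in both tables: UNION keeps it once, UNION ALL keeps both copies.
union_rows = conn.execute(base.format(op="UNION")).fetchall()
union_all_rows = conn.execute(base.format(op="UNION ALL")).fetchall()
print(len(union_rows), len(union_all_rows))
```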
    
  • Original article: https://www.cnblogs.com/zhangkui/p/5720582.html