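Twisted asynchronous networking framework
Twisted is an event-driven networking framework that bundles many features, for example network protocols, threading, database management, network operations, and email.

Event-driven programming
In short, event-driven programming has two parts: first, registering events; second, triggering them.
A custom event-driven framework: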
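    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    # event_drive.py ("the most awesome event-driven framework")

    event_list = []


    def run():
        # Instantiate every registered handler class and run it.
        for event in event_list:
            obj = event()
            obj.execute()


    class BaseHandler(object):
        """
        Users must subclass this class so that all handlers share the same
        methods (it plays the role of an interface).
        """
        def execute(self):
            raise NotImplementedError('you must override execute')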
How a programmer uses the framework:

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    from source import event_drive


    class MyHandler(event_drive.BaseHandler):

        def execute(self):
            print('event-drive execute MyHandler')


    # Step 1: register the event handler.
    event_drive.event_list.append(MyHandler)
    # Step 2: trigger all registered events.
    event_drive.run()
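The two files above can also be folded into one self-contained sketch that makes the register and trigger steps explicit. The names `EventBus`, `on`, and `emit` are hypothetical and chosen only for illustration; they are not part of Twisted or any other library.

```python
# A minimal sketch of "register, then trigger", using named events.
# EventBus, on, and emit are hypothetical names.
from collections import defaultdict


class EventBus:
    def __init__(self):
        # Maps an event name to the handlers registered for it.
        self._handlers = defaultdict(list)

    def on(self, name, handler):
        """Step 1: register a handler for a named event."""
        self._handlers[name].append(handler)

    def emit(self, name, *args):
        """Step 2: trigger the event; handlers run in registration order."""
        return [handler(*args) for handler in self._handlers[name]]


bus = EventBus()
bus.on("greet", lambda who: "hello " + who)
bus.on("greet", lambda who: "welcome " + who)
results = bus.emit("greet", "alex")
print(results)
```

Triggering an event that nobody registered for simply returns an empty list, which keeps the caller free of special cases.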
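Protocols
Protocols describe how to process network events asynchronously. HTTP, DNS, and IMAP are examples of application-layer protocols. Protocols implement the IProtocol interface, which includes the following methods:
makeConnection  establishes a connection between the transport object and the server
connectionMade  called once the connection has been established
dataReceived  called when data is received
connectionLost  called when the connection is closed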
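Transports
Transports represent the connection between two communicating endpoints on a network. A transport describes the details of the connection, such as whether it is stream-oriented or datagram-oriented, its flow control, and its reliability. TCP, UDP, and Unix sockets are examples of transports. They are designed to be "minimally functional units that are maximally reusable" and are decoupled from protocol implementations, which lets many protocols share the same type of transport. Transports implement the ITransport interface, which includes the following methods:
write  writes data to the physical connection in order, without blocking
writeSequence  writes a list of strings to the physical connection
loseConnection  writes all pending data, then closes the connection
getPeer  returns the address of the remote end of the connection
getHost  returns the address of the local end of the connection
Decoupling transports from protocols also makes both layers easier to test. A transport can be mocked by simply writing the data to a string, which can then be inspected.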
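The testing idea can be sketched without any network at all. The block below deliberately does not import Twisted: `FakeTransport` and `UpperEcho` are hypothetical, duck-typed stand-ins that only borrow the method names listed above. Twisted ships its own in-memory transport for this purpose (`StringTransport` in `twisted.test.proto_helpers`), which is what real tests would normally use.

```python
# Sketch only: plain-Python stand-ins, no Twisted import.
# FakeTransport and UpperEcho are hypothetical names for illustration.


class FakeTransport:
    """Records everything written to it instead of using a socket."""

    def __init__(self):
        self.written = []
        self.disconnecting = False

    def write(self, data):
        self.written.append(data)

    def loseConnection(self):
        self.disconnecting = True

    def value(self):
        # Everything written so far, joined into one bytes object.
        return b"".join(self.written)


class UpperEcho:
    """Duck-typed protocol using the callback names listed above."""

    def makeConnection(self, transport):
        self.transport = transport
        self.connectionMade()

    def connectionMade(self):
        self.transport.write(b"ready\n")

    def dataReceived(self, data):
        self.transport.write(data.upper())

    def connectionLost(self, reason=None):
        self.closed = True


proto = UpperEcho()
transport = FakeTransport()
proto.makeConnection(transport)  # wires the two together, fires connectionMade
proto.dataReceived(b"hello")     # simulate bytes arriving from the network
print(transport.value())
```

Because the protocol only ever talks to `self.transport`, the test can inspect exactly what would have gone out on the wire.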
EchoServer

    from twisted.internet import protocol
    from twisted.internet import reactor


    class Echo(protocol.Protocol):
        def dataReceived(self, data):
            # Write every received chunk straight back to the peer.
            self.transport.write(data)


    def main():
        factory = protocol.ServerFactory()
        factory.protocol = Echo
        reactor.listenTCP(1234, factory)
        reactor.run()


    if __name__ == '__main__':
        main()
EchoClient

    from twisted.internet import reactor, protocol


    # a client protocol
    class EchoClient(protocol.Protocol):
        """Once connected, send a message, then print the result."""

        def connectionMade(self):
            # Transports carry bytes, so send a bytes literal.
            self.transport.write(b"hello alex!")

        def dataReceived(self, data):
            "As soon as any data is received, print it and close the connection."
            print("Server said:", data)
            self.transport.loseConnection()

        def connectionLost(self, reason):
            print("connection lost")


    class EchoFactory(protocol.ClientFactory):
        protocol = EchoClient

        def clientConnectionFailed(self, connector, reason):
            print("Connection failed - goodbye!")
            reactor.stop()

        def clientConnectionLost(self, connector, reason):
            print("Connection lost - goodbye!")
            reactor.stop()


    # this connects the protocol to a server running on port 1234
    def main():
        f = EchoFactory()
        reactor.connectTCP("localhost", 1234, f)
        reactor.run()


    # this only runs if the module was *not* imported
    if __name__ == '__main__':
        main()
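Running the server script starts a TCP server that listens for connections on port 1234. The server uses the Echo protocol, and data is written out through the TCP transport object. Running the client script opens a TCP connection to the server, prints the server's reply, then closes the connection and stops the reactor event loop. The factories create the protocol instances for each side of the connection. Communication on both ends is asynchronous: connectTCP registers callbacks with the reactor event loop, and the loop invokes them when data becomes readable on the socket.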
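The "register a callback, get notified when the socket is readable" idea can be sketched with the standard-library `selectors` module. This is not Twisted's implementation, only a small illustration of the same pattern; `on_readable` and `run_once` are hypothetical names, and a local socket pair stands in for a real client/server connection.

```python
# Sketch of the reactor idea using the standard-library selectors module.
# on_readable and run_once are hypothetical names.
import selectors
import socket

sel = selectors.DefaultSelector()
received = []


def on_readable(conn):
    """Callback invoked by the loop when conn has data to read."""
    received.append(conn.recv(1024))
    sel.unregister(conn)


def run_once(timeout=1.0):
    """One pass of the event loop: wait for events, dispatch callbacks."""
    for key, _mask in sel.select(timeout):
        callback = key.data
        callback(key.fileobj)


# A connected socket pair stands in for a client/server TCP connection.
left, right = socket.socketpair()
right.setblocking(False)

# Register interest: "call on_readable when right becomes readable".
sel.register(right, selectors.EVENT_READ, data=on_readable)

left.sendall(b"hello alex!")  # data arrives ...
run_once()                    # ... and the loop dispatches the callback
print(received)

left.close()
right.close()
sel.close()
```

A real reactor repeats `run_once` forever and also handles writes, timers, and errors; the sketch keeps only the dispatch step.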
A file transfer example
server side

    # -*- coding:utf-8 -*-
    # This is the Twisted Fast Poetry Server, version 1.0

    import optparse, os

    from twisted.internet.protocol import ServerFactory, Protocol


    def parse_args():
        usage = """usage: %prog [options] poetry-file

    This is the Fast Poetry Server, Twisted edition.
    Run it like this:

      python fastpoetry.py <path-to-poetry-file>

    If you are in the base directory of the twisted-intro package,
    you could run it like this:

      python twisted-server-1/fastpoetry.py poetry/ecstasy.txt

    to serve up John Donne's Ecstasy, which I know you want to do.
    """

        parser = optparse.OptionParser(usage)

        help = "The port to listen on. Defaults to 9000."
        parser.add_option('--port', type='int', help=help)

        help = "The interface to listen on. Default is localhost."
        parser.add_option('--iface', help=help, default='localhost')

        options, args = parser.parse_args()
        print("--arg:", options, args)

        if len(args) != 1:
            parser.error('Provide exactly one poetry file.')

        poetry_file = args[0]

        if not os.path.exists(args[0]):
            parser.error('No such file: %s' % poetry_file)

        return options, poetry_file


    class PoetryProtocol(Protocol):

        def connectionMade(self):
            # Send the whole poem as soon as a client connects, then hang up.
            self.transport.write(self.factory.poem)
            self.transport.loseConnection()


    class PoetryFactory(ServerFactory):

        protocol = PoetryProtocol

        def __init__(self, poem):
            self.poem = poem


    def main():
        options, poetry_file = parse_args()

        # Read the file as bytes, because transports write bytes.
        with open(poetry_file, 'rb') as f:
            poem = f.read()

        factory = PoetryFactory(poem)

        from twisted.internet import reactor

        port = reactor.listenTCP(options.port or 9000, factory,
                                 interface=options.iface)

        print('Serving %s on %s.' % (poetry_file, port.getHost()))

        reactor.run()


    if __name__ == '__main__':
        main()
client side

    # This is the Twisted Get Poetry Now! client, version 3.0.

    # NOTE: This should not be used as the basis for production code.

    import optparse

    from twisted.internet.protocol import Protocol, ClientFactory


    def parse_args():
        usage = """usage: %prog [options] [hostname]:port ...

    This is the Get Poetry Now! client, Twisted version 3.0
    Run it like this:

      python get-poetry-1.py port1 port2 port3 ...
    """

        parser = optparse.OptionParser(usage)

        _, addresses = parser.parse_args()

        if not addresses:
            print(parser.format_help())
            parser.exit()

        def parse_address(addr):
            if ':' not in addr:
                host = '127.0.0.1'
                port = addr
            else:
                host, port = addr.split(':', 1)

            if not port.isdigit():
                parser.error('Ports must be integers.')

            return host, int(port)

        # Return a list (not a lazy map object) so len() works on the result.
        return [parse_address(addr) for addr in addresses]


    class PoetryProtocol(Protocol):

        poem = b''

        def dataReceived(self, data):
            # Data arrives in chunks; accumulate until the server disconnects.
            self.poem += data

        def connectionLost(self, reason):
            self.poemReceived(self.poem)

        def poemReceived(self, poem):
            self.factory.poem_finished(poem)


    class PoetryClientFactory(ClientFactory):

        protocol = PoetryProtocol

        def __init__(self, callback):
            self.callback = callback

        def poem_finished(self, poem):
            self.callback(poem)


    def get_poetry(host, port, callback):
        """
        Download a poem from the given host and port and invoke

          callback(poem)

        when the poem is complete.
        """
        from twisted.internet import reactor
        factory = PoetryClientFactory(callback)
        reactor.connectTCP(host, port, factory)


    def poetry_main():
        addresses = parse_args()

        from twisted.internet import reactor

        poems = []

        def got_poem(poem):
            poems.append(poem)
            # Stop the reactor once every server has delivered its poem.
            if len(poems) == len(addresses):
                reactor.stop()

        for address in addresses:
            host, port = address
            get_poetry(host, port, got_poem)

        reactor.run()

        for poem in poems:
            # Assumes the poem files are UTF-8 text.
            print(poem.decode('utf-8'))


    if __name__ == '__main__':
        poetry_main()
Twisted in depth
http://krondo.com/an-introduction-to-asynchronous-programming-and-twisted/
http://blog.csdn.net/hanhuili/article/details/9389433
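SQLAlchemy ORM
SQLAlchemy is an ORM framework for the Python programming language. It is built on top of the database API and performs database operations through object-relational mapping. In short, it converts objects into SQL, then uses the database API to execute the SQL and fetch the results.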
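The "object to SQL to database API" pipeline can be sketched with the standard library alone. In the block below `sqlite3` stands in for the database API, and `Host` and `insert_sql` are hypothetical names; this is a toy illustration of the idea, not how SQLAlchemy itself is implemented.

```python
# Sketch of the object -> SQL -> DB-API pipeline, standard library only.
# sqlite3 stands in for the database API; Host and insert_sql are
# hypothetical names.
import sqlite3
from dataclasses import astuple, dataclass, fields


@dataclass
class Host:
    hostname: str
    ip_addr: str
    port: int = 22


def insert_sql(obj, table):
    """Build a parameterized INSERT statement from an object's fields."""
    cols = [f.name for f in fields(obj)]
    placeholders = ", ".join("?" for _ in cols)
    return "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(cols), placeholders)


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE hosts (hostname TEXT, ip_addr TEXT, port INTEGER)")

h = Host("localhost", "127.0.0.1")
sql = insert_sql(h, "hosts")   # the object is converted to SQL text
conn.execute(sql, astuple(h))  # the DB-API executes it with bound values
rows = conn.execute("SELECT hostname, ip_addr, port FROM hosts").fetchall()
print(sql)
print(rows)
conn.close()
```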

The Dialect is what talks to the database API. Depending on the configuration, it calls a different database API to operate on the database, for example:

    MySQL-Python
        mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>

    pymysql
        mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]

    MySQL-Connector
        mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>

    cx_Oracle
        oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]

    For more, see: http://docs.sqlalchemy.org/en/latest/dialects/index.html
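To see how such a URL breaks down into dialect, driver, credentials, and database name, here is a small sketch that uses only `urllib.parse` from the standard library. It illustrates the URL layout and is not SQLAlchemy's own parser; the URL is the placeholder used in the examples in this post.

```python
# Sketch: splitting a database URL into its parts with the standard library.
from urllib.parse import urlsplit

url = "mysql+mysqldb://root:123@127.0.0.1:3306/s11"
parts = urlsplit(url)

# The scheme has the form "<dialect>+<driver>".
dialect, _, driver = parts.scheme.partition("+")

info = {
    "dialect": dialect,
    "driver": driver,
    "user": parts.username,
    "password": parts.password,
    "host": parts.hostname,
    "port": parts.port,
    "database": parts.path.lstrip("/"),
}
print(info)
```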
Step one:
Use Engine/ConnectionPooling/Dialect for database operations. The Engine connects to the database through ConnectionPooling and then executes SQL statements through the Dialect.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    from sqlalchemy import create_engine

    # engine.execute() is the SQLAlchemy 1.x API; it was removed in 2.0.
    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

    # A literal statement.
    engine.execute(
        "INSERT INTO ts_test (a, b) VALUES ('2', 'v1')"
    )

    # Positional parameters; a sequence of tuples inserts several rows.
    engine.execute(
        "INSERT INTO ts_test (a, b) VALUES (%s, %s)",
        ((555, "v1"), (666, "v1"),)
    )

    # Named parameters.
    engine.execute(
        "INSERT INTO ts_test (a, b) VALUES (%(id)s, %(name)s)",
        id=999, name="v1"
    )

    result = engine.execute('select * from ts_test')
    result.fetchall()
Step two:
Use Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect for database operations. The Engine uses Schema Type to create a specific structure object, the SQL Expression Language then converts that object into a SQL statement, ConnectionPooling connects to the database, and finally the Dialect executes the SQL and fetches the result.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey

    metadata = MetaData()

    user = Table('user', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )

    color = Table('color', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )

    engine = create_engine("mysql+mysqldb://root@localhost:3306/test", max_overflow=5)

    # Create every table registered on the metadata object.
    metadata.create_all(engine)
Create, delete, update, and query

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    from sqlalchemy import create_engine, Table, Column, Integer, String, MetaData, ForeignKey, select

    metadata = MetaData()

    user = Table('user', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )

    color = Table('color', metadata,
        Column('id', Integer, primary_key=True),
        Column('name', String(20)),
    )

    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

    conn = engine.connect()

    # Builds the SQL statement: INSERT INTO "user" (id, name) VALUES (:id, :name)
    conn.execute(user.insert(), {'id': 7, 'name': 'seven'})
    conn.close()

    # sql = user.insert().values(id=123, name='wu')
    # conn.execute(sql)
    # conn.close()

    # sql = user.delete().where(user.c.id > 1)

    # sql = user.update().values(fullname=user.c.name)
    # sql = user.update().where(user.c.name == 'jack').values(name='ed')

    # sql = select([user, ])
    # sql = select([user.c.id, ])
    # sql = select([user.c.name, color.c.name]).where(user.c.id==color.c.id)
    # sql = select([user.c.name]).order_by(user.c.name)
    # sql = select([user]).group_by(user.c.name)

    # result = conn.execute(sql)
    # print(result.fetchall())
    # conn.close()
A simple, complete example

    from sqlalchemy import create_engine
    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String
    from sqlalchemy.orm import sessionmaker

    Base = declarative_base()  # create the ORM base class

    engine = create_engine("mysql+mysqldb://root@localhost:3306/test", echo=False)


    class Host(Base):
        __tablename__ = 'hosts'
        id = Column(Integer, primary_key=True, autoincrement=True)
        hostname = Column(String(64), unique=True, nullable=False)
        ip_addr = Column(String(128), unique=True, nullable=False)
        port = Column(Integer, default=22)

    Base.metadata.create_all(engine)  # create all table structures

    if __name__ == '__main__':
        # sessionmaker returns a session class, not an instance.
        SessionCls = sessionmaker(bind=engine)
        session = SessionCls()
        # h1 = Host(hostname='localhost', ip_addr='127.0.0.1')
        # h2 = Host(hostname='ubuntu', ip_addr='192.168.2.243', port=20000)
        # h3 = Host(hostname='ubuntu2', ip_addr='192.168.2.244', port=20000)
        # session.add(h3)
        # session.add_all([h1, h2])
        # h2.hostname = 'ubuntu_test'  # changes are still allowed before commit
        # session.rollback()
        # session.commit()  # commit
        res = session.query(Host).filter(Host.hostname.in_(['ubuntu2', 'localhost'])).all()
        print(res)
For more, see:
http://www.jianshu.com/p/e6bba189fcbd
http://docs.sqlalchemy.org/en/latest/core/expression_api.html
Note: SQLAlchemy cannot modify the structure of existing tables. If you need that, use Alembic, another open-source tool by SQLAlchemy's author.
Step three:
Use all of the components, ORM/Schema Type/SQL Expression Language/Engine/ConnectionPooling/Dialect, to operate on data. Objects are created from classes, the objects are converted to SQL, and the SQL is executed.

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-

    from sqlalchemy.ext.declarative import declarative_base
    from sqlalchemy import Column, Integer, String
    from sqlalchemy.orm import sessionmaker
    from sqlalchemy import create_engine

    engine = create_engine("mysql+mysqldb://root:123@127.0.0.1:3306/s11", max_overflow=5)

    Base = declarative_base()


    class User(Base):
        __tablename__ = 'users'
        id = Column(Integer, primary_key=True)
        name = Column(String(50))

    # Find every subclass of Base and create the matching tables in the database.
    # Base.metadata.create_all(engine)

    Session = sessionmaker(bind=engine)
    session = Session()

    # ########## Create ##########
    # u = User(id=2, name='sb')
    # session.add(u)
    # session.add_all([
    #     User(id=3, name='sb'),
    #     User(id=4, name='sb')
    # ])
    # session.commit()

    # ########## Delete ##########
    # session.query(User).filter(User.id > 2).delete()
    # session.commit()

    # ########## Update ##########
    # session.query(User).filter(User.id > 2).update({'name': 'bb'})
    # session.commit()

    # ########## Query ##########
    # ret = session.query(User).filter_by(name='sb').first()

    # ret = session.query(User).filter_by(name='sb').all()
    # print(ret)

    # ret = session.query(User).filter(User.name.in_(['sb', 'bb'])).all()
    # print(ret)

    # ret = session.query(User.name.label('name_label')).all()
    # print(ret, type(ret))

    # ret = session.query(User).order_by(User.id).all()
    # print(ret)

    # ret = session.query(User).order_by(User.id)[1:3]
    # print(ret)
    # session.commit()
Foreign key relationships
A one-to-many relationship places a foreign key on the child table referencing the parent. relationship() is then specified on the parent, as referencing a collection of items represented by the child:

    from sqlalchemy import Table, Column, Integer, ForeignKey
    from sqlalchemy.orm import relationship
    from sqlalchemy.ext.declarative import declarative_base

    Base = declarative_base()


    class Parent(Base):
        __tablename__ = 'parent'
        id = Column(Integer, primary_key=True)
        children = relationship("Child")


    class Child(Base):
        __tablename__ = 'child'
        id = Column(Integer, primary_key=True)
        parent_id = Column(Integer, ForeignKey('parent.id'))
To establish a bidirectional relationship in one-to-many, where the "reverse" side is a many-to-one, specify an additional relationship() and connect the two using the relationship.back_populates parameter:

    class Parent(Base):
        __tablename__ = 'parent'
        id = Column(Integer, primary_key=True)
        children = relationship("Child", back_populates="parent")


    class Child(Base):
        __tablename__ = 'child'
        id = Column(Integer, primary_key=True)
        parent_id = Column(Integer, ForeignKey('parent.id'))
        parent = relationship("Parent", back_populates="children")
Child will get a parent attribute with many-to-one semantics.
Alternatively, the backref option may be used on a single relationship() instead of using back_populates:

    class Parent(Base):
        __tablename__ = 'parent'
        id = Column(Integer, primary_key=True)
        children = relationship("Child", backref="parent")
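At the database level, a one-to-many relationship is just a foreign key column on the child table. The sketch below shows that underlying schema with the standard-library `sqlite3` module instead of SQLAlchemy (a deliberate swap so that it runs without a MySQL server); the row values are made up for illustration.

```python
# Sketch: the parent/child schema from the text, in plain SQL via sqlite3.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("PRAGMA foreign_keys = ON")  # SQLite enforces FKs only when enabled
conn.executescript("""
    CREATE TABLE parent (id INTEGER PRIMARY KEY);
    CREATE TABLE child (
        id INTEGER PRIMARY KEY,
        parent_id INTEGER REFERENCES parent(id)
    );
    INSERT INTO parent (id) VALUES (1);
    INSERT INTO child (id, parent_id) VALUES (10, 1), (11, 1);
""")

# The "children" side: every child row whose foreign key points at the parent.
children = [row[0] for row in conn.execute(
    "SELECT id FROM child WHERE parent_id = ? ORDER BY id", (1,))]

# The "parent" side: follow the foreign key back to the single parent row.
parent_of_10 = conn.execute(
    "SELECT parent_id FROM child WHERE id = ?", (10,)).fetchone()[0]

# A child pointing at a missing parent is rejected by the constraint.
try:
    conn.execute("INSERT INTO child (id, parent_id) VALUES (12, 99)")
    rejected = False
except sqlite3.IntegrityError:
    rejected = True

print(children, parent_of_10, rejected)
conn.close()
```

relationship() adds the Python-side navigation on top of this: the collection on the parent and the scalar attribute on the child correspond to the two queries above.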
Appendix: JOIN queries in raw SQL
The differences between the JOIN types: http://stackoverflow.com/questions/38549/difference-between-inner-and-outer-joins
- INNER JOIN: Returns the rows that have at least one match in BOTH tables
- LEFT JOIN: Returns all rows from the left table, and the matched rows from the right table
- RIGHT JOIN: Returns all rows from the right table, and the matched rows from the left table

    select host.id, hostname, ip_addr, port, host_group.name
    from host right join host_group on host.id = host_group.host_id
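The difference between the join types is easiest to see on a tiny data set. The sketch below uses the standard-library `sqlite3` module with made-up rows; the table and column names follow the query above. Because older SQLite versions lack RIGHT JOIN, the right join is written as a LEFT JOIN with the two tables swapped, which returns the same rows.

```python
# Sketch: INNER vs LEFT vs RIGHT join on made-up host/host_group rows.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE host (id INTEGER PRIMARY KEY, hostname TEXT);
    CREATE TABLE host_group (id INTEGER PRIMARY KEY, name TEXT, host_id INTEGER);
    INSERT INTO host VALUES (1, 'localhost'), (2, 'ubuntu');
    INSERT INTO host_group VALUES (1, 't1', 1), (2, 't2', NULL);
""")

# Only rows that match in both tables.
inner = conn.execute("""
    SELECT host.hostname, host_group.name
    FROM host INNER JOIN host_group ON host.id = host_group.host_id
    ORDER BY host.id
""").fetchall()

# Every host, with NULL where no group references it.
left = conn.execute("""
    SELECT host.hostname, host_group.name
    FROM host LEFT JOIN host_group ON host.id = host_group.host_id
    ORDER BY host.id
""").fetchall()

# Every group, with NULL where it references no host
# (a RIGHT JOIN expressed as a LEFT JOIN with the tables swapped).
right = conn.execute("""
    SELECT host.hostname, host_group.name
    FROM host_group LEFT JOIN host ON host.id = host_group.host_id
    ORDER BY host_group.id
""").fetchall()

print(inner)
print(left)
print(right)
conn.close()
```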
In SQLAlchemy:

    session.query(Host).join(Host.host_groups).filter(HostGroup.name=='t1').group_by("Host").all()
GROUP BY queries

    select name, count(host.id) as NumberOfHosts
    from host right join host_group on host.id = host_group.host_id
    group by name;
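A runnable sketch of the same GROUP BY query, again using the standard-library `sqlite3` module and made-up rows. The right join is rewritten as a LEFT JOIN starting from host_group so that it also runs on SQLite versions without RIGHT JOIN. Counting `host.id` rather than `*` matters here: a group without hosts then reports 0 instead of 1.

```python
# Sketch: hosts per group, including groups that have no host at all.
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE host (id INTEGER PRIMARY KEY, hostname TEXT);
    CREATE TABLE host_group (id INTEGER PRIMARY KEY, name TEXT, host_id INTEGER);
    INSERT INTO host VALUES (1, 'localhost'), (2, 'ubuntu'), (3, 'ubuntu2');
    INSERT INTO host_group VALUES
        (1, 't1', 1), (2, 't1', 2), (3, 't2', 3), (4, 't3', NULL);
""")

counts = conn.execute("""
    SELECT name, COUNT(host.id) AS NumberOfHosts
    FROM host_group LEFT JOIN host ON host.id = host_group.host_id
    GROUP BY name
    ORDER BY name
""").fetchall()

print(counts)
conn.close()
```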
In SQLAlchemy:

    from sqlalchemy import func

    session.query(HostGroup, func.count(HostGroup.name)).group_by(HostGroup.name).all()

    # another example
    session.query(func.count(User.name), User.name).group_by(User.name).all()
    # generated SQL:
    # SELECT count(users.name) AS count_1, users.name AS users_name
    # FROM users GROUP BY users.name