事件驱动
简而言之,事件驱动分为二个部分:第一,注册事件;第二,触发事件。
自定义事件驱动框架
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 6 # event_drive.py 7 event_list = [] 8 def run(): 9 for event in event_list: 10 obj = event() 11 obj.execute() 12 class BaseHandler(object): 13 """ 14 用户必须继承该类,从而规范所有类的方法(类似于接口的功能) 15 """ 16 def execute(self): 17 raise Exception('you must overwrite execute')
程序员使用上面定义的框架:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #!/usr/bin/env python 4 # -*- coding:utf-8 -*- 5 6 from fram import event_drive 7 8 class MyHandler(event_drive.BaseHandler): 9 10 def execute(self): 11 print ('event-drive execute MyHandler') 12 13 14 event_drive.event_list.append(MyHandler) 15 event_drive.run()
Protocols
Protocols描述了如何以异步的方式处理网络中的事件。HTTP、DNS以及IMAP是应用层协议中的例子。Protocols实现了IProtocol接口,它包含如下的方法:
makeConnection 在transport对象和服务器之间建立一条连接
connectionMade 连接建立起来后调用
dataReceived 接收数据时调用
connectionLost 关闭连接时调用
Transports
Transports代表网络中两个通信结点之间的连接。Transports负责描述连接的细节,比如连接是面向流式的还是面向数据报的,流控以及可靠性。TCP、UDP和Unix套接字可作为transports的例子。它们被设计为“满足最小功能单元,同时具有最大程度的可复用性”,而且从协议实现中分离出来,这让许多协议可以采用相同类型的传输。Transports实现了ITransports接口,它包含如下的方法:
write 以非阻塞的方式按顺序依次将数据写到物理连接上
writeSequence 将一个字符串列表写到物理连接上
loseConnection 将所有挂起的数据写入,然后关闭连接
getPeer 取得连接中对端的地址信息
getHost 取得连接中本端的地址信息
将transports从协议中分离出来也使得对这两个层次的测试变得更加简单。可以通过简单地写入一个字符串来模拟传输,用这种方式来检查。
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from twisted.internet import protocol 4 from twisted.internet import reactor 5 6 class Echo(protocol.Protocol): 7 def dataReceived(self, data):#只要twisted一收到数据 ,就会调用此方法 8 self.transport.write(data) # 把收到的数据 返回给客户端 9 10 def main(): 11 factory = protocol.ServerFactory() #定义基础工厂类 12 factory.protocol = Echo #socketserver中handle 13 14 reactor.listenTCP(9000,factory) 15 reactor.run() 16 17 if __name__ == '__main__': 18 main()
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 from twisted.internet import reactor, protocol 4 5 # a client protocol 6 7 class EchoClient(protocol.Protocol): 8 """Once connected, send a message, then print the result.""" 9 def connectionMade(self): #连接建立成功,就会自动调用此方法 10 print("connection is build, sending data...") 11 self.transport.write("hello alex!") 12 13 def dataReceived(self, data):#一接收到数据就把它写回 14 "As soon as any data is received, write it back." 15 print "Server said:", data 16 #self.transport.loseConnection() 17 exit('exit') 18 19 def connectionLost(self, reason):#关闭连接 20 print "====connection lost===" 21 22 class EchoFactory(protocol.ClientFactory): 23 protocol = EchoClient #handle 24 25 def clientConnectionFailed(self, connector, reason): 26 print "Connection failed - goodbye!" 27 reactor.stop() 28 29 def clientConnectionLost(self, connector, reason): 30 print "Connection lost - goodbye!" 31 reactor.stop() 32 33 34 # this connects the protocol to a server running on port 8000 35 def main(): 36 f = EchoFactory() 37 reactor.connectTCP("localhost", 9000, f) 38 reactor.run() 39 40 # this only runs if the module was *not* imported 41 if __name__ == '__main__': 42 main()
运行服务器端脚本将启动一个TCP服务器,监听端口1234上的连接。服务器采用的是Echo协议,数据经TCP transport对象写出。运行客户端脚本将对服务器发起一个TCP连接,回显服务器端的回应然后终止连接并停止reactor事件循环。这里的Factory用来对连接的双方生成protocol对象实例。两端的通信是异步的,connectTCP负责注册回调函数到reactor事件循环中,当socket上有数据可读时通知回调处理。
一个传送文件的例子:
服务器端,
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 #_*_coding:utf-8_*_ 4 # This is the Twisted Fast Poetry Server, version 1.0 5 6 import optparse, os 7 8 from twisted.internet.protocol import ServerFactory, Protocol 9 10 11 def parse_args(): 12 usage = """usage: %prog [options] poetry-file 13 14 This is the Fast Poetry Server, Twisted edition. 15 Run it like this: 16 17 python twisted_sendfile.py <path-to-poetry-file> 18 19 If you are in the base directory of the twisted-intro package, 20 you could run it like this: 21 22 python twisted-server-1/fastpoetry.py poetry/ecstasy.txt 23 24 to serve up John Donne's Ecstasy, which I know you want to do. 25 """ 26 27 parser = optparse.OptionParser(usage) 28 29 help = "The port to listen on. Default to a random available port." 30 parser.add_option('--port', type='int', help=help) 31 32 help = "The interface to listen on. Default is localhost." 33 parser.add_option('--iface', help=help, default='localhost') 34 35 options, args = parser.parse_args() 36 #print("--arg:",options,args) 37 #print("-->",options.port) 38 39 if len(args) != 1: 40 parser.error('Provide exactly one poetry file.') 41 poetry_file = args[0] 42 43 if not os.path.exists(args[0]): 44 parser.error('No such file: %s' % poetry_file) 45 46 return options, poetry_file 47 48 49 class PoetryProtocol(Protocol): #handle 50 def connectionMade(self): 51 self.transport.write(self.factory.poem) 52 self.transport.loseConnection() 53 54 55 class PoetryFactory(ServerFactory): #基础类 56 protocol = PoetryProtocol 57 def __init__(self, poem): 58 self.poem = poem 59 60 def main(): 61 options, poetry_file = parse_args() 62 poem = open(poetry_file).read() 63 factory = PoetryFactory(poem) 64 from twisted.internet import reactor 65 port = reactor.listenTCP(options.port or 9000, factory, 66 interface=options.iface) 67 print 'Serving %s on %s.' % (poetry_file, port.getHost()) 68 reactor.run() 69 70 71 if __name__ == '__main__': 72 main()
客户端:
1 #!/usr/bin/env python 2 # -*- coding:utf-8 -*- 3 # This is the Twisted Get Poetry Now! client, version 3.0. 4 5 # NOTE: This should not be used as the basis for production code. 6 7 import optparse 8 9 from twisted.internet.protocol import Protocol, ClientFactory 10 11 12 def parse_args(): 13 usage = """usage: %prog [options] [hostname]:port ... 14 15 This is the Get Poetry Now! client, Twisted version 3.0 16 Run it like this: 17 18 python get-poetry-1.py port1 port2 port3 ... 19 """ 20 21 parser = optparse.OptionParser(usage) 22 _, addresses = parser.parse_args() 23 print('==addr:',_,addresses) 24 if not addresses: 25 print parser.format_help() 26 parser.exit() 27 28 def parse_address(addr): 29 if ':' not in addr: 30 host = '127.0.0.1' 31 port = addr 32 else: 33 host, port = addr.split(':', 1) 34 if not port.isdigit(): 35 parser.error('Ports must be integers.') 36 return host, int(port) 37 #return parse_address(addresses) 38 return map(parse_address, addresses) 39 40 class PoetryProtocol(Protocol): 41 42 poem = '' 43 def dataReceived(self, data): 44 self.poem += data 45 #self.factory = PoetryClientFactory 46 print('[%s] recv:[%s]' %(self.transport.getPeer(),len(self.poem))) 47 def connectionLost(self, reason): 48 self.poemReceived(self.poem) 49 50 def poemReceived(self, poem): 51 self.factory.poem_finished(poem) 52 53 54 class PoetryClientFactory(ClientFactory): 55 protocol = PoetryProtocol #handle method 56 def __init__(self, callback): 57 self.callback = callback 58 def poem_finished(self, poem): 59 self.callback(poem) 60 #self.get_poem(poem) 61 62 63 def get_poetry(host, port, callback): 64 """ 65 Download a poem from the given host and port and invoke 66 callback(poem) 67 when the poem is complete. 68 """ 69 from twisted.internet import reactor 70 factory = PoetryClientFactory(callback) 71 reactor.connectTCP(host, port, factory) 72 73 74 def poetry_main(): 75 addresses = parse_args() #((172.0.0.1,9000),(...)) 76 from twisted.internet import reactor 77 poems = [] 78 79 def got_poem(poem): 80 poems.append(poem) 81 if len(poems) == len(addresses): 82 reactor.stop() 83 84 for address in addresses: 85 host, port = address 86 get_poetry(host, port, got_poem) 87 reactor.run() 88 89 print("main loop done...") 90 #for poem in poems: 91 # Eprint poem 92 93 if __name__ == '__main__': 94 poetry_main()
Redis:
redis是一个key-value存储系统。和Memcached类似,它支持存储的value类型相对更多,包括string(字符串)、list(链表)、set(集合)、zset(sorted set --有序集合)和hash(哈希类型)。这些数据类型都支持push/pop、add/remove及取交集并集和差集及更丰富的操作,而且这些操作都是原子性的。在此基础上,redis支持各种不同方式的排序。与memcached一样,为了保证效率,数据都是缓存在内存中。区别的是redis会周期性的把更新的数据写入磁盘或者把修改操作写入追加的记录文件,并且在此基础上实现了master-slave(主从)同步。
Python操作Redis:
API使用
redis-py 的API的使用可以分类为:
- 连接方式
- 连接池
- 操作
- String 操作
- Hash 操作
- List 操作
- Set 操作
- Sort Set 操作
- 管道
- 发布订阅
1.基本操作
之前我们已经知道,redis是以key-value的形式存储的,所以我们在操作的时候。首先我们将redis所在主机的ip和发布端口作为参数实例化了一个对象r,然后执行set('name','Peony_Y'),这样我们就在内存中存储了一个key为name,值为‘Peony_Y’的项。我们可以理解为{'name':'Peony_Y'},当我们要读取的之后,只需要get('name'),就会得到'Peony_Y'的值。
2.连接池
redis-py使用connection pool来管理对一个redis server的所有连接,避免每次建立、释放连接的开销。默认,每个Redis实例都会维护一个自己的连接池。可以直接建立一个连接池,然后作为参数Redis,这样就可以实现多个Redis实例共享一个连接池。
3、管道
redis-py默认在执行每次请求都会创建(连接池申请连接)和断开(归还连接池)一次连接操作,如果想要在一次请求中指定多个命令,则可以使用pipline实现一次请求指定多个命令,并且默认情况下一次pipline 是原子性操作。
4、发布订阅
发布者:服务器
订阅者:Dashboad和数据处理
Demo如下:
定义一个redishelper类,建立与redis连接,定义频道为fm92.4,定义发布public及订阅subscribe方法。
订阅者:导入刚刚我们写好的类,实例化对象,调用订阅方法,就可以使用while True接收信息了。
发布者:导入刚刚我们写好的类,实例化对象,调用发布方法,下例发布了一条消息‘hello’
RabbitMQ队列
RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。
MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。
实现最简单的队列通信
发送(send)端
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 #声明queue 8 #hannel.queue_declare(queue='task_q',durable=True) 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 channel.basic_publish(exchange='', 12 routing_key='task_q', 13 body='Hello World! 35', 14 properties=pika.BasicProperties( 15 delivery_mode = 2, # make message persistent 16 ) 17 ) 18 print(" [x] Sent 'Hello World!'") 19 connection.close()
接收(recive)端
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 8 #You may ask why we declare the queue again ‒ we have already declared it in our previous code. 9 # We could avoid that if we were sure that the queue already exists. For example if send.py program 10 #was run before. But we're not yet sure which program to run first. In such cases it's a good 11 # practice to repeat declaring the queue in both programs. 12 #channel.queue_declare(queue='task_q',durable=True) 13 14 def callback(ch, method, properties, body): 15 print("-->ch") 16 print(" [x] Received %r" % body) 17 ch.basic_ack(delivery_tag = method.delivery_tag) 18 19 20 channel.basic_consume(callback, 21 queue='task_q', 22 no_ack=True) 23 24 print(' [*] Waiting for messages. To exit press CTRL+C') 25 channel.start_consuming()
工作队列
在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多
消费的提供者的代码
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 #声明queue 8 channel.queue_declare(queue='task_queue') 9 10 #n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange. 11 import sys 12 13 message = ' '.join(sys.argv[1:]) or "Hello World!" 14 channel.basic_publish(exchange='', 15 routing_key='task_queue', 16 body=message, 17 properties=pika.BasicProperties( 18 delivery_mode = 2, # make message persistent 19 )) 20 print(" [x] Sent %r" % message) 21 connection.close()
消费者的代码
1 import pika,time 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 'localhost')) 5 channel = connection.channel() 6 7 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 time.sleep(body.count(b'.')) 12 print(" [x] Done") 13 ch.basic_ack(delivery_tag = method.delivery_tag) 14 15 16 channel.basic_consume(callback, 17 queue='task_queue', 18 ) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming()
此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上
1、acknowledgment 消息不丢失
no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters( 4 host='10.211.55.4')) 5 channel = connection.channel() 6 7 channel.queue_declare(queue='hello') 8 9 def callback(ch, method, properties, body): 10 print(" [x] Received %r" % body) 11 import time 12 time.sleep(10) 13 print 'ok' 14 ch.basic_ack(delivery_tag = method.delivery_tag) 15 16 channel.basic_consume(callback, 17 queue='hello', 18 no_ack=False) 19 20 print(' [*] Waiting for messages. To exit press CTRL+C') 21 channel.start_consuming()
2、durable 消息不丢失
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello', durable=True) 8 9 channel.basic_publish(exchange='', 10 routing_key='hello', 11 body='Hello World!', 12 properties=pika.BasicProperties( 13 delivery_mode=2, # make message persistent 14 )) 15 print(" [x] Sent 'Hello World!'") 16 connection.close()
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello', durable=True) 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print 'ok' 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_consume(callback, 18 queue='hello', 19 no_ack=False) 20 21 print(' [*] Waiting for messages. To exit press CTRL+C') 22 channel.start_consuming()
3、消息获取顺序
默认消息队列里的数据是按照顺序被消费者拿走,例如:消费者1 去队列中获取 奇数 序列的任务,消费者1去队列中获取 偶数 序列的任务。
channel.basic_qos(prefetch_count=1) 表示谁来谁取,不再按照奇偶数排列
1 import pika 2 3 connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4')) 4 channel = connection.channel() 5 6 # make message persistent 7 channel.queue_declare(queue='hello') 8 9 10 def callback(ch, method, properties, body): 11 print(" [x] Received %r" % body) 12 import time 13 time.sleep(10) 14 print 'ok' 15 ch.basic_ack(delivery_tag = method.delivery_tag) 16 17 channel.basic_qos(prefetch_count=1) 18 19 channel.basic_consume(callback, 20 queue='hello', 21 no_ack=False) 22 23 print(' [*] Waiting for messages. To exit press CTRL+C') 24 channel.start_consuming()
4、发布订阅
发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。
exchange type = fanout
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='logs', 10 type='fanout') 11 12 message = ' '.join(sys.argv[1:]) or "info: Hello World!" 13 channel.basic_publish(exchange='logs', 14 routing_key='', 15 body=message) 16 print(" [x] Sent %r" % message) 17 connection.close()
1 #!/usr/bin/env python 2 import pika 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='logs', 9 type='fanout') 10 11 result = channel.queue_declare(exclusive=True) 12 queue_name = result.method.queue 13 14 channel.queue_bind(exchange='logs', 15 queue=queue_name) 16 17 print(' [*] Waiting for logs. To exit press CTRL+C') 18 19 def callback(ch, method, properties, body): 20 print(" [x] %r" % body) 21 22 channel.basic_consume(callback, 23 queue=queue_name, 24 no_ack=True) 25 26 channel.start_consuming()
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
5、关键字发送
exchange type = direct
之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='direct_logs', 10 type='direct') 11 12 result = channel.queue_declare(exclusive=True) 13 queue_name = result.method.queue 14 15 severities = sys.argv[1:] 16 if not severities: 17 sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0]) 18 sys.exit(1) 19 20 for severity in severities: 21 channel.queue_bind(exchange='direct_logs', 22 queue=queue_name, 23 routing_key=severity) 24 25 print(' [*] Waiting for logs. To exit press CTRL+C') 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming() 35 36 消费者
1 import pika 2 import sys 3 4 connection = pika.BlockingConnection(pika.ConnectionParameters( 5 host='localhost')) 6 channel = connection.channel() 7 8 channel.exchange_declare(exchange='direct_logs', 9 type='direct') 10 11 severity = sys.argv[1] if len(sys.argv) > 1 else 'info' 12 message = ' '.join(sys.argv[2:]) or 'Hello World!' 13 channel.basic_publish(exchange='direct_logs', 14 routing_key=severity, 15 body=message) 16 print(" [x] Sent %r:%r" % (severity, message)) 17 connection.close()
6、模糊匹配
exchange type = topic
在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。
- # 表示可以匹配 0 个 或 多个 单词
- * 表示只能匹配 一个 单词
1 发送者路由值 队列中 2 old.boy.python old.* -- 不匹配 3 old.boy.python old.# -- 匹配
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='topic_logs', 10 type='topic') 11 12 result = channel.queue_declare(exclusive=True) 13 queue_name = result.method.queue 14 15 binding_keys = sys.argv[1:] 16 if not binding_keys: 17 sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0]) 18 sys.exit(1) 19 20 for binding_key in binding_keys: 21 channel.queue_bind(exchange='topic_logs', 22 queue=queue_name, 23 routing_key=binding_key) 24 25 print(' [*] Waiting for logs. To exit press CTRL+C') 26 27 def callback(ch, method, properties, body): 28 print(" [x] %r:%r" % (method.routing_key, body)) 29 30 channel.basic_consume(callback, 31 queue=queue_name, 32 no_ack=True) 33 34 channel.start_consuming() 35 36 消费者
1 #!/usr/bin/env python 2 import pika 3 import sys 4 5 connection = pika.BlockingConnection(pika.ConnectionParameters( 6 host='localhost')) 7 channel = connection.channel() 8 9 channel.exchange_declare(exchange='topic_logs', 10 type='topic') 11 12 routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' 13 message = ' '.join(sys.argv[2:]) or 'Hello World!' 14 channel.basic_publish(exchange='topic_logs', 15 routing_key=routing_key, 16 body=message) 17 print(" [x] Sent %r:%r" % (routing_key, message)) 18 connection.close() 19 20 生产者