zoukankan      html  css  js  c++  java
  • python之上下文管理、redis的发布订阅、rabbitmq

    使用with打开文件的方式,是调用了上下文管理的功能

     # Two ways to open a file.

     # 1) Plain open(): the caller is responsible for closing the handle.
     f = open('a.txt', 'r')
     f.close()

     # 2) with-statement: the file object acts as a context manager, so it is
     #    closed automatically when the block exits.
     with open('a.txt', 'r') as f:
         pass  # work with f here
     6 
     7 实现使用with关闭socket
     8 import contextlib
     9 import socket
    10 
    @contextlib.contextmanager
    def Sock(ip, port):
        """Context manager yielding a listening socket bound to (ip, port).

        Args:
            ip: Address to bind to.
            port: Port to bind to.

        Yields:
            The bound socket, already listening with a backlog of 5.

        The socket is closed when the ``with`` block exits, whether it exits
        normally or through an exception, and also if bind/listen fails.
        """
        # Named `sock`, not `socket`: assigning to `socket` here would make it a
        # local name and break the lookup of the `socket` module on this line.
        sock = socket.socket()
        try:
            sock.bind((ip, port))
            sock.listen(5)
            yield sock
        finally:
            sock.close()
    20 
    # Sock(...) runs up to its `yield`; the yielded socket is bound to
    # `listener`, the with-body runs, and then Sock's `finally` clause runs.
    with Sock('127.0.0.1', 8000) as listener:
        print(listener)

    redis的发布订阅

    class RedisHelper:
        """Small wrapper around a redis client for publish/subscribe.

        Args:
            host: Address of the redis server. Defaults to the address that
                used to be hard-coded, so ``RedisHelper()`` behaves as before.
        """

        def __init__(self, host='192.168.1.100'):
            # The redis client is created as soon as the helper is instantiated.
            self.__conn = redis.Redis(host=host)

        def public(self, msg, chan):
            """Publish ``msg`` on channel ``chan``. Always returns True."""
            self.__conn.publish(chan, msg)
            return True

        def subscribe(self, chan):
            """Subscribe to ``chan`` and return the pubsub object.

            One response is read and discarded before returning (presumably
            the subscribe confirmation -- confirm against redis-py), so the
            caller's next ``parse_response()`` returns the response after it.
            """
            pub = self.__conn.pubsub()
            pub.subscribe(chan)
            pub.parse_response()
            return pub
    
    # Subscriber
    import s3

    helper = s3.RedisHelper()
    pubsub = helper.subscribe('fm111.7')
    # Print the next response read from the subscription.
    print(pubsub.parse_response())
    
    # Publisher
    import s3

    helper = s3.RedisHelper()
    # Arguments are (message, channel).
    helper.public('alex db', 'fm111.7')

     RabbitMQ

     # Consumer
     # NOTE(review): the argument names used here (callback first, no_ack=)
     # look like the pika 0.x API -- confirm against the installed version.
     import pika

     params = pika.ConnectionParameters(host='127.0.0.1')
     connection = pika.BlockingConnection(params)
     channel = connection.channel()  # channel object used for all broker calls

     channel.queue_declare(queue='wocao')


     def on_message(ch, method, properties, body):
         """Print the body of each delivered message."""
         print("[x] Received %r" % body)


     channel.basic_consume(on_message, queue='wocao', no_ack=True)
     print('[*] Waiting for messages. To exit press CTRL+C')
     channel.start_consuming()
    14 
    # Producer
    import pika

    params = pika.ConnectionParameters(host='127.0.0.1')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # Name the target queue; it is created if it does not exist yet.
    channel.queue_declare(queue='wocao')
    channel.basic_publish(
        exchange='',
        routing_key='wocao',
        body='hello world!',
    )
    print("[x] Sent 'hello world!")
    connection.close()

     exchange type类型

    # Consumer (subscriber): this script declares a queue, binds it to the
    # exchange and consumes from it.
    import pika

    params = pika.ConnectionParameters(host='192.168.11.87')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # fanout exchange: broadcasts to every queue bound to the exchange.
    channel.exchange_declare(exchange='logs_fanout', type='fanout')

    # No queue name is given, so the queue gets a generated name.
    declared = channel.queue_declare(exclusive=True)
    queue_name = declared.method.queue
    # Bind the queue to the exchange.
    channel.queue_bind(exchange='logs_fanout', queue=queue_name)

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def on_log(ch, method, properties, body):
        """Print the body of each delivered message."""
        print(" [x] %r" % body)


    channel.basic_consume(on_log, queue=queue_name, no_ack=True)

    channel.start_consuming()
    # Producer (sender): this script publishes one message to the exchange.
    import pika

    params = pika.ConnectionParameters(host='192.168.11.87')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    channel.exchange_declare(exchange='logs_fanout', type='fanout')

    message = "what's the fuck"
    # Publish to the named exchange; the routing key is left empty.
    channel.basic_publish(
        exchange='logs_fanout',
        routing_key='',
        body=message,
    )
    print(" [x] Sent %r" % message)
    connection.close()
     # Route messages to specific queues by keyword (routing key).
     # Producer (publisher)
     import pika

     params = pika.ConnectionParameters(host='127.0.0.1')
     connection = pika.BlockingConnection(params)
     channel = connection.channel()

     # direct exchange: messages are delivered to queues by keyword.
     channel.exchange_declare(exchange='direct_logs_1', type='direct')

     # Send to the queues bound with the 'error' keyword.
     severity = 'error'
     message = '123'
     channel.basic_publish(
         exchange='direct_logs_1',
         routing_key=severity,
         body=message,
     )
     print('[x] Sent %r:%r' % (severity, message))
     connection.close()
    # Consumer (subscriber)
    import pika

    params = pika.ConnectionParameters(host='127.0.0.1')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()
    # direct exchange: messages are delivered to queues by keyword.
    channel.exchange_declare(exchange='direct_logs_1', type='direct')

    declared = channel.queue_declare(exclusive=True)
    queue_name = declared.method.queue

    # Bind the same queue once per keyword it should receive.
    severities = ('error', 'info', 'warning')
    for level in severities:
        channel.queue_bind(
            exchange='direct_logs_1',
            queue=queue_name,
            routing_key=level,
        )


    def on_message(ch, method, properties, body):
        """Print the routing key and body of each delivered message."""
        print('[x] %r:%r' % (method.routing_key, body))


    channel.basic_consume(on_message, queue=queue_name, no_ack=True)
    channel.start_consuming()
     # Receiver side of "do not lose messages": acknowledge each message
     # explicitly after it has been processed.
     import time

     import pika

     params = pika.ConnectionParameters(host='10.211.55.4')
     connection = pika.BlockingConnection(params)
     channel = connection.channel()
     channel.queue_declare(queue='hello')


     def on_message(ch, method, properties, body):
         """Print the message, sleep 10 seconds, then acknowledge it."""
         print('redeived %s' % body)
         time.sleep(10)
         print('ok')
         ch.basic_ack(delivery_tag=method.delivery_tag)


     # no_ack=False: the receiver signals success after handling a message;
     # if that signal never arrives, the broker puts the task back on the queue.
     channel.basic_consume(on_message, queue='hello', no_ack=False)
     print(' Waiting for messages.To exit press CTRL+C')
     channel.start_consuming()
     # Sender side of "do not lose messages": durable queue + persistent message.
     # NOTE(review): call signatures follow the pika 0.x API used throughout
     # this file -- confirm against the installed version.
     import pika

     connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.211.55.4'))
     channel = connection.channel()
     # durable=True: the queue is declared as durable.
     channel.queue_declare(queue='hello', durable=True)
     # With the default exchange ('') the routing key must be the queue name,
     # so it is 'hello'; the payload goes in `body`, which basic_publish requires.
     channel.basic_publish(exchange='',
                           routing_key='hello',
                           body='hello world',
                           properties=pika.BasicProperties(
                               delivery_mode=2,  # mark the message as persistent
                           ))
     print(" [x] Sent 'hello world'")
     # A sender has nothing to consume: close the connection instead of
     # entering the consume loop.
     connection.close()
     # Receiver: consumes from the durable queue and acknowledges each message.
     import time

     import pika

     connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))
     channel = connection.channel()

     channel.queue_declare(queue='hello', durable=True)


     def callback(ch, method, properties, body):
         """Print the message, sleep 10 seconds, then acknowledge it."""
         print(" [x] Received %r" % body)
         time.sleep(10)
         # print() call, not the Python 2 print statement, to match the rest
         # of the script.
         print('ok')
         ch.basic_ack(delivery_tag=method.delivery_tag)


     # no_ack=False: messages must be acknowledged explicitly (done in callback).
     channel.basic_consume(callback,
                           queue='hello',
                           no_ack=False)
     channel.start_consuming()

    RabbitMQ队列在默认情况下以轮询(round-robin)方式把消息依次分发给各个接收方,例如有两个接收方时:接收方1只会拿到第奇数个任务,接收方2只会拿到第偶数个任务。设置 basic_qos(prefetch_count=1) 后,则变为谁空闲谁来取任务:

    import time

    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.11.100'))
    channel = connection.channel()
    channel.queue_declare(queue='hello')


    def callback(ch, method, properties, body):
        """Print the message, sleep 10 seconds, then acknowledge it."""
        print(" [x] Received %r" % body)
        time.sleep(10)
        # print() call, not the Python 2 print statement, to match the rest
        # of the script.
        print('ok')
        ch.basic_ack(delivery_tag=method.delivery_tag)


    # prefetch_count=1: tasks are no longer split odd/even between receivers;
    # whichever receiver comes to fetch a task gets the next one.
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()

    当接收方以 no_ack=False 方式消费、却在处理过程中断开连接而没有返回确认(ack)时,RabbitMQ会重新将该任务添加到队列中

  • 相关阅读:
    NOIP 2012 T5 借教室 [洛谷P1083]
    POJ2437 Muddy roads
    POJ2288 Islands and Bridges
    洛谷P2014 TYVJ1051 选课
    POJ1741 Tree
    CODEVS1995 || TYVJ1863 黑魔法师之门
    TYVJ1939 玉蟾宫
    TYVJ1305 最大子序和
    POJ1737 Connected Graph
    TYVJ1864 守卫者的挑战
  • 原文地址:https://www.cnblogs.com/liguangxu/p/5704390.html
Copyright © 2011-2022 走看看