zoukankan      html  css  js  c++  java
  • RabbitMQ

    • RabbitMQ就是消息队列(Message Queue)
    • 在Python中使用pika库和RabbitMQ相连,再通过RabbitMQ查看队列和消息
    • RabbitMQ消息发送采用轮询方式,即一个接收完再到下一个接收,最后回到第一个consumer,依次循环

    一、简单生产者和消费者实现

    1、生产者

    import pika
    #RabbitMQ 采用轮询方式
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))#建立连接
    channel=connection.channel()#声明一个管道
    channel.queue_declare(queue='hello')#声明一个队列并命名
    
    channel.basic_publish(exchange='',
                          routing_key='hello',#队列名
                          body='hello there~',
                          )
                          )#发送的消息
    print('send message...')
    
    connection.close()

      首先使用pika库建立连接,声明一个管道,再声明一个队列,发送消息,最后关闭连接

    2、消费者

    import pika
    connection=pika.BlockingConnectison(pika.ConnectionParameters('localhost',5672))#RabbitMQ默认端口
    channel=connection.channel()
    channel.queue_declare(queue='hello',durable=True)#队列名,和produser的一致
    def callback(ch,method,propotities,body):
        print("receive >>",body)
        ch.basic_ack(delivery_tag=method.delivery_tag)#no_ack=False的话要手动确认
    
    channel.basic_consume(callback,#如果收到消息就调用回调函数
                    queue='hello',
                    # no_ack=True
                         )
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()#是channel开始不是connection开始,不用关闭连接

     二、RabbitMQ交换模式

    RabbitMQ交换模式有fanout、direct和topic

      注:三种交换模式中信息的发送都是即时的,即错过了消息就接收不到了

    1、fanout广播

    • fanout即广播模式,也是发布订阅

    publisher实现

    • fanout模式中publisher和一般publisher差不多,但不需要声明queue即routing_key为空,但要指明exchange_type
    import pika
    #不需要声明queue,广播是实时的即订阅发布
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel=connection.channel()
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    message='message...'
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    
    print('send...')
    connection.close()
    View Code

    consumer实现

    • consumer中也不需要指明queue名,rabbitmq会随机分配,但要进行对分配的queue绑定的动作
    • consumer中的exchange_type要与publisher中的一致,否则接收不到消息
    • consumer可以接收publisher广播的任何消息
    import pika
    #发送端和接收端均不用声明queue
    connection=pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel=connection.channel()
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    result=channel.queue_declare(exclusive=True)
    #不指定queue名字,rabbitmq会随机指定queue,exclusive=True会在消息接收完后删除
    queue_name=result.method.queue
    def callback(ch,method,properties,body):
        print('receive>>',body)
    
    channel.queue_bind(exchange='logs',queue=queue_name)#绑定queue
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    channel.start_consuming()
    View Code

    2、direct过滤特定消息

    • direct模式可以进行简单过滤,客户端可以选择接收什么类型的消息,如info、warning、error等

    publisher实现

    • direct模式与fanout不同的是direct中routing_key是必须的,即消息的级别
    • direct中要定义消息级别(通过命令行实现不同消息级别的定义),默认级别为info
    • 生产者消费者exchange名字要一致
    import pika,sys
    #不需要声明queue,广播是实时的即订阅发布
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel=connection.channel()
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    severity=sys.argv[1] if len(sys.argv)>1 else 'info'#定义消息级别,默认为info  #sys.argv为获取命令行参数
    message=''.join(sys.argv[2:])or 'HELLO!'#获取消息
    channel.basic_publish(exchange='direct_logs',#exchange名字可以随便起,但消费端exchange名字一定要和生产端一致,否则收不到消息!!!
                          routing_key=severity,#direct模式中routing_key是必须的,生成和消费端都是
                          body=message)
    
    print('send...')
    connection.close()
    View Code

    consumer实现

    • 消费者要在命令行模式下指明接收的消息级别(可以接收多个级别的消息),否则会报错
    • 指明routing_key即消息级别
    #direct可以实现接收特定消息或指定消息
    import pika,sys
    #发送端和接收端均不用声明queue
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel=connection.channel()
    channel.exchange_declare(exchange='direct_logs',
                             exchange_type='direct')
    
    result=channel.queue_declare(exclusive=True)
    queue_name=result.method.queue
    #定义级别,不指定级别会报错
    severities=sys.argv[1:]
    if not severities:
        sys.stderr.write('Usage:%s [info] [warning] [error]'%sys.argv[0])
        sys.exit(1)
    for s in severities:
        channel.queue_bind(exchange='direct_logs', queue=queue_name,routing_key=s)  
        # 绑定queue,要指定routing_key!!
    def callback(ch,method,properties,body):
        print('receive>>',body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          #no_ack=True
                          )
    channel.start_consuming()
    View Code

     

    3、topic细致的过滤

    • topic模式可以实现接收不同应用程序的所有消息,如MySQL.info,*.info等

    publisher实现

    • publisher只需要在direct的基础上更改exchange和exchange_type
    • 将默认消息级别改为'anonymous.info'
    import pika,sys
    #不需要声明queue,广播是实时的即订阅发布
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel=connection.channel()
    channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
    severity=sys.argv[1] if len(sys.argv)>1 else 'anonymous.info'#定义消息级别,默认为info  #sys.argv为获取命令行参数
    message=''.join(sys.argv[2:])or 'HELLO!'#获取消息
    channel.basic_publish(exchange='topic_logs',#exchange名字可以随便起,但消费端exchange名字一定要和生产端一致,否则收不到消息!!!
                          routing_key=severity,#direct模式中routing_key是必须的,生成和消费端都是
                          body=message)
    
    print('send...')
    connection.close()
    View Code

    consumer实现

    • 消费端接收的消息类型可以以*.开头,如*.info,这样可以接收不同应用的消息
    • #表示接收所有消息
    #direct可以实现接收特定消息或指定消息
    import pika,sys
    #发送端和接收端均不用声明queue
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel=connection.channel()
    channel.exchange_declare(exchange='topic_logs',
                             exchange_type='topic')
    
    result=channel.queue_declare(exclusive=True)
    queue_name=result.method.queue
    #定义级别,不指定级别会报错
    severities=sys.argv[1:]
    if not severities:
        sys.stderr.write('Usage:%s [指定接收消息的类型]'%sys.argv[0])
        #消费端接收的消息类型可以以*.开头,如*.info,这样可以接收不同应用的消息
        sys.exit(1)
    for s in severities:
        channel.queue_bind(exchange='topic_logs', queue=queue_name,routing_key=s)  # 绑定queue,要指定routing_key!!
    def callback(ch,method,properties,body):
        print('receive>>',body)
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          #no_ack=True
                          )
    channel.start_consuming()
    View Code

     

    • rpc
    • rpc可以实现客户-服务器进行双向通信,客户和服务器都将消息发送到rpc中,rpc用不同的队列支持它们通信
    • 客户端
    import pika
    import uuid
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
            self.channel=self.connection.channel()
            result=self.channel.queue_declare(exclusive=True)
            self.callback_queue=result.method.queue
            self.channel.basic_consume(self.callback,no_ack=True,queue=self.callback_queue)
    
        def callback(self,ch,method,props,body):
            if self.corr_id==props.correlation_id:
                #判断从服务器端接收的UUID和之前发送的是否相等,借此判断是否是同一个消息队列
                self.response=body#从服务器端收到的消息
    
        def call(self,n):
            self.response=None
            self.corr_id=str(uuid.uuid4())#客户端发送请求时生成一个唯一的UUID
    
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(#消息持久化
                                           reply_to=self.callback_queue,
                                       correlation_id=self.corr_id),
                                       body=str(n))
            while self.response is None:
                self.connection.process_data_events()
                #一般情况下channel.start_consuming表示阻塞模式下接收消息
                #这里不阻塞收消息,且隔一段时间检查有没有消息
                #即非阻塞版的start_consuming
                print('no message...')#进行到这一步代表没有消息
            return int(self.response)
    
    
    
    fibonacci_rpc=FibonacciRpcClient()
    response=fibonacci_rpc.call(6)
    print('get',response)
    View Code
    • 服务器端
    import pika
    connection=pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel=connection.channel()
    channel.queue_declare(queue='rpc_queue')
    
    def callback(ch,method,props,body):
        '''接收消息并返回结果'''
        n=int(body)
        print(n)
        response=fib(n)
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,#要返回的队列名
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),#使correlation_id等于客户端的并发送回去
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)#确认收到
    
    def fib(n):
        '''斐波那契实现'''
        if n==0 or n==1:
            return n
        else:
            return fib(n-1)+fib(n-2)
    
    
    
    channel.basic_consume(callback,queue='rpc_queue')
    channel.start_consuming()
    View Code
  • 相关阅读:
    react项目如何调试?
    react使用引入svg的icon;svg图形制作
    react使用echarts
    SQL SEVER数据库重建索引的方法
    ThreadLocal的内存泄露
    ThreadLocal是否会引发内存泄露的分析 good
    深入分析 ThreadLocal 内存泄漏问题
    清理ThreadLocal
    线程池 Threadlocal 使用注意
    ThreadLocal对象使用过程中容易陷入的坑
  • 原文地址:https://www.cnblogs.com/Aprilnn/p/9336377.html
Copyright © 2011-2022 走看看