zoukankan      html  css  js  c++  java
  • RabbitMQ基础知识

    RabbitMQ

    关键在于消息的发布与消费、消息的路由。

    在绑定(Binding)Exchange与Queue的同时,一般会指定一个binding key,可以视作Queue的name,
    消费者将消息发送给Exchange时,一般会指定一个routing key
    当binding key 与 routing key 相匹配时,消息就会被路由到对应的Queue中。

    Exchange Types
    fanout fanout类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中。
    direct direct类型的Exchange路由规则也很简单,它会把消息路由到那些binding key与routing key完全匹配的Queue中。
    topic 与direct类似,但是是模糊匹配,*”用于匹配一个单词,“#”用于匹配多个单词
    binding key 类似 *.*.rabbit,routing key 为quick.orange.rabbit的消息会被路由到该Queue
    headers headers类型的Exchange不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
    在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否完全匹配Queue与Exchange绑定时指定的键值对;如果完全匹配则消息会路由到该Queue,否则不会路由到该Queue。


    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

    参考:http://www.diggerplus.org/archives/3110


    开启RabbitMQ后台管理:
    1.在rabbitMQ安装目录下的sbin目录,打开终端执行:rabbitmq-plugins.bat enable rabbitmq_management开启网页管理界面,然后重启rabbitMQ
    2.浏览器中输入http://localhost:15672/
    3.输入用户名和密码(默认为guest)

    生产者

    import pika
    
    #########  生产者 #########
    # 链接rabbit服务器(localhost是本机,如果是其他服务器请修改为ip地址)
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    # 创建频道
    channel = connection.channel()
    # 创建一个队列名叫test
    channel.queue_declare(queue='test')
    
    # channel.basic_publish向队列中发送信息
    # exchange -- 它使我们能够确切地指定消息应该到哪个队列去。
    # routing_key 指定向哪个队列中发送消息
    # body是要插入的内容, 字符串格式
    
    while True:  # 循环向队列中发送信息,quit退出程序
        inp = input(">>>").strip()
        if inp == 'quit':
            break
        channel.basic_publish(exchange='',
                              routing_key='test',
                              body=inp)
        print("生产者向队列发送信息%s" % inp)
    
    # 缓冲区已经flush而且消息已经确认发送到了RabbitMQ中,关闭链接
    connection.close()
    
    # 输出结果
    # >> > python
    # 生产者向队列发送信息python
    # >> > quit

    消费者

    #!/usr/bin/env python 3
    import pika
    
    ######### 消费者 #########
    # 链接rabbit
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    # 创建频道
    channel = connection.channel()
    # 如果生产者没有运行创建队列,那么消费者也许就找不到队列了。为了避免这个问题,所有消费者也创建这个队列,如果队列已经存在,则这条无效
    channel.queue_declare(queue='test')
    
    
    # 接收消息需要使用callback这个函数来接收,他会被pika库来调用,接受到的数据都是字节类型的
    def callback(ch, method, properties, body):
        """
            ch : 代表 channel
            method :队列名
            properties : 连接rabbitmq时设置的属性
            body : 从队列中取到的内容,获取到的数据时字节类型
        """
    
    
        print(" [x] Received %r" % body)
    # channel.basic_consume 表示从队列中取数据,如果拿到数据 那么将执行callback函数,callback是回调函数
    # no_ack=True 表示消费完这个消息以后不主动把完成状态通知rabbitmq
    channel.basic_consume(callback,
                          queue='test',
                          no_ack=True)
    print(' [*] 等待信息. To exit press CTRL+C')
    # 永远循环等待数据处理和callback处理的数据,start_consuming方法会阻塞循环执行
    channel.start_consuming()
    
    # 输出结果,一直等待处理队列中的消息,不知终止,除非人为ctrl+c
    #  [*]等待消息,To exit press CTRL+C
    #  [x] Received b'python'

    消费者acknowledgement消息不丢失的方法

    # no_ack = False , 如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。在消费者端做设定条件。
    # 生产者,代码同上,未改变
    # 消费者代码
    
    
    import pika
    import time
    
    # 链接rabbit
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    # 创建频道
    channel = connection.channel()
    # 如果生产者没有运行创建队列,那么消费者创建队列,如果队列已存在,创建队列操作会被忽略
    channel.queue_declare(queue='test')
    
    
    # 回调函数
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(10)
        print('ok')
        ch.basic_ack(delivery_tag=method.delivery_tag)  # 当上面消息处理完成后,通知rabbitmq,消息处理完成,不要在发送了
    
    
    channel.basic_consume(callback,
                          queue='test',
                          no_ack=False)  # 表示消费完这个消息后,主动通知rabbitmq完成状态,如果不通知,rabbitmq会把这条消息重新放回队列中,避免丢失
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
  • 相关阅读:
    读书笔记之 javascript 设计模式 工厂模式
    5分钟读书笔记之 设计模式 桥接模式
    读书笔记之 javascript 设计模式 组合模式
    响应式布局
    接收浏览器传值的方式
    AutoPoco的使用
    四种传值方式
    IIS本地服务器,设置IP地址问题
    MVC的小知识点
    MVC 生成Html字符串MvcHtmlString CacheHelper用法
  • 原文地址:https://www.cnblogs.com/jec1999/p/9410841.html
Copyright © 2011-2022 走看看