zoukankan      html  css  js  c++  java
  • RabbitMQ

    RabbitMQ

    好处: 解耦,异步,流量削峰

    阻塞队列就相当于一个缓冲区,平衡了生产者和消费者的处理能力。

    简单命令

    rabbitmqctl stop_app
    rabbitmqctl reset
    rabbitmqctl start_app
    rabbitmqctl list_queues
    

    默认端口号 15672

    模式

    简单模式(最广泛)

    参数

    交换机模式

    -- 发布订阅

    -- 关键字模式

    -- 模糊匹配模式

    简单模式

    • 连接rabbitmq
    • 创建队列
    • 向指定的队列插入数据

    **生产者 **basic_publish

    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
     
    channel.queue_declare(queue='hello')
     
    channel.basic_publish(exchange='',  # 简单模式 交换机为空
                          routing_key='hello',  # 指定队列
                          body='Hello World!')  # 向指定队列插入内容
     
    print(" [x] Sent 'Hello World!'")
    

    **消费者 **basic_consume

    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
     
    channel.queue_declare(queue='hello')
     
     
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
     
     
    channel.basic_consume(queue='hello',
                          auto_ack=True , #自动应答
                          on_message_callback=callback)
     
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
     
    

    参数使用

    应答参数

    模拟消费者出问题

    生产者生产数据给消费者,消费者取到数据则队列无数据,若消费者出bug ,再次启动,则取不到数据了

    auto_ack=False  #改默认应答为手动应答
    ch.basic_ack(delivery_tag=method.delivery_tag)   # 给信号
    

    原理

    生产者生产数据放入队列,消费者改手动应答,消费者取数据。队列还会保留一份数据,当消费者发出信号后ch.basic_ack(delivery_tag=method.delivery_tag) ,队列则删除数据

    手动应答牺牲效率

    注重效率则默认应答

    持久化参数

    模拟队列(rabbitmq)出问题

    #声明queue
    channel.queue_declare(queue='hello2', durable=True)  # durable=True 声明可持久化的队列
                                                        # 但是放数据的时候还得指定是否持久化
                                                        properties=pika.BasicProperties(
                                                                  delivery_mode=2,  #持久化参数2
                                                          )
     
    channel.basic_publish(exchange='',
                          routing_key='hSSello2',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                              )
                          )                              # 但是放数据的时候还得指定是否持久化
                                                        #properties=pika.BasicProperties(
                                                          #        delivery_mode=2,  # make                                                         #    message persistent
                                                            #      )
    

    分发参数

    改为手动应应答并且加上 channel.basic_qos(prefetch_count=1) # 消费者 加上这一句话

    默认轮询分发,一人一个

    轮询分发(round-robin)不管谁忙,都不会多给消息,总是你一个我一个。想要做到公平分发(fair dispatch),必须关闭自动应答ack,改成手动应答。使用basicQos(perfetch=1)限制每次只发送不超过1条消息到同一个消费者,消费者必须手动反馈告知队列,才会发送下一个。

     channel.basic_qos(prefetch_count=1)  # 消费者 加上这一句话
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        import time
        time.sleep(4)
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)  ########
    channel.basic_consume(queue='hello',
                          auto_ack=False,
                          on_message_callback=callback)
    
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    while True:
        channel.start_consuming()
    
    

    交换机模式(routingkey和exchange同时满足)

    基于交换机通信(容器) 由生产者创建,向交换机插入数据

    消费者创建队列 队列绑定交换机

    发布订阅

    # 生产者
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs',          ## 交换机模式 名字任意
                             exchange_type='fanout') ## fanout 发布订阅模式
     
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
     
    #####################################################################
    # 消费者
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs',       ## 交换机模式 名字任意
                             exchange_type='fanout') ## fanout 发布订阅模式
     
    # 消费者创建队列
    result = channel.queue_declare("",exclusive=True) # 随机名字  "" , exclusive=True
    queue_name = result.method.queue  # 拿到随机名字
     
    # 绑定到交换机上
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
     
     
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)
     
    channel.start_consuming()
    

    关键字

    生产者放一个关键字 消费者放一个关键字

    匹配成功后 则给消费者

    # 生产者
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs',          ## 交换机模式 名字任意
                             exchange_type='direct') ## direct 关键字模式
     
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='xxx', # 绑定关键字
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
     
    #####################################################################
    # 消费者
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs',       ## 交换机模式 名字任意
                             exchange_type='direct') ## direct 关键字模式
     
    # 消费者创建队列
    result = channel.queue_declare("",exclusive=True) # 随机名字  "" , exclusive=True
    queue_name = result.method.queue  # 拿到随机名字
     
    # 绑定到交换机上
    channel.queue_bind(exchange='logs',
                       queue=queue_name,routing_key='xxx') # routing_key 绑定关键字
    [ 可以绑定多个关键字                 
    channel.queue_bind(exchange='logs',
                       queue=queue_name,routing_key='xxx') # channel.queue_bind(exchange='logs',
                       queue=queue_name,routing_key='xxx') #
     
                       ]
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
     
     
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)
     
    channel.start_consuming()
    

    通配符

    同关键字相比,模糊匹配

    #  一个多个单词
    * 一个单词
     
    uss.#
    #.news
    #.weather
    

     
    # 生产者
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs3',
                             exchange_type='topic')
     
    message = "info: Hello ERU!"
    channel.basic_publish(exchange='logs3',
                          routing_key='europe.weather',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
     
    # 消费者
     
    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs3',
                             exchange_type='topic')
     
    result = channel.queue_declare("",exclusive=True)
    queue_name = result.method.queue
     
     
     
    channel.queue_bind(exchange='logs3',
                       queue=queue_name,
                       routing_key="#.news")
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
     
     
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)
     
    channel.start_consuming()
    
  • 相关阅读:
    杭州办理招行香港一卡通(两地一卡通)攻略
    Android高手进阶教程(二十)之Android与JavaScript方法相互调用!
    Android应用的自动升级、更新模块的实现
    18个最好的jQuery表格插件
    系统的本地策略不允许你采用交互式登录
    android中判断横屏或者竖屏并改变背景
    记录几个东东
    jsAnim学习
    win7下安装oracle10g出现未知错误,程序异常终止
    oracle创建用户并授权
  • 原文地址:https://www.cnblogs.com/tangshuo/p/12744501.html
Copyright © 2011-2022 走看看