zoukankan      html  css  js  c++  java
  • RabbitMQ队列

    • 介绍

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。

    应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

    排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    • 特点

    MQ是消费-生产者模型的一个典型的代表,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。

    • 使用场景

    在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。

    • 通过RabbitMQ实现简单的队列通信(默认端口5672)

    send端(publish)

     1 import pika
     2 
     3 connection = pika.BlockingConnection(
     4     pika.ConnectionParameters('localhost')
     5     )#建立socket
     6 
     7 channel = connection.channel()    #声明一个管道
     8 
     9 # 声明queue
    10 channel.queue_declare(queue='hello',durable=True) #queue=队列名字,durable=True队列持久化
    11 
    12 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    13 channel.basic_publish(exchange='',
    14                       routing_key='hello',
    15                       body='Hello World!',
    16                       properties=pika.BasicProperties(
    17                           delivery_mode=2,#使消息持久化
    18 
    19                       )
    20                       )#routing_key=队列名,body=发送的消息
    21 print(" [x] Sent 'Hello World!'")
    22 connection.close()
    publish

    receive端(consumer) 

     1 import pika
     2 import time
     3 
     4 connection = pika.BlockingConnection(
     5     pika.ConnectionParameters('localhost')
     6     )
     7 
     8 channel = connection.channel()
     9 
    10 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    11 # We could avoid that if we were sure that the queue already exists. For example if send.py program
    12 # was run before. But we're not yet sure which program to run first. In such cases it's a good
    13 # practice to repeat declaring the queue in both programs.
    14 channel.queue_declare(queue='hello',durable=True)
    15 
    16 
    17 def callback(ch, method, properties, body):#ch管道的内存地址,method消息的信息
    18     print('--?',ch,method,properties,body)
    19     time.sleep(30)
    20     print(" [x] Received %r" % body)
    21 
    22     ch.basic_ack(delivery_tag=method.delivery_tag)
    23 
    24 channel.basic_qos(prefetch_count=1)#消息公平分发(告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了)
    25 #消费消息
    26 channel.basic_consume(callback,#如果收到消息就调用callback处理消息
    27                       queue='hello',
    28                       # no_ack=True#no ackownledgement
    29                       )
    30 
    31 print(' [*] Waiting for messages. To exit press CTRL+C')
    32 channel.start_consuming()
    consumer
    no_ack=True#当接收端在未完成接收工作时断开,不会给publish发送确认消息
    • 几种工作模式

    a.Work Queues(消息分发轮询)

    在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c)

    import pika
    import time
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
    channel = connection.channel()
     
    # 声明queue
    channel.queue_declare(queue='task_queue')
     
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    import sys
     
    message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          )
                          )
    print(" [x] Sent %r" % message)
    connection.close()
    publish
    import pika, time
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        'localhost'))
    channel = connection.channel()
     
     
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(20)
        print(" [x] Done")
        print("method.delivery_tag",method.delivery_tag)
        ch.basic_ack(delivery_tag=method.delivery_tag)
     
     
    channel.basic_consume(callback,
                          queue='task_queue',
                          no_ack=True
                          )
     
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    consumer

     b.消息持久化

    1 channel.queue_declare(queue='hello', durable=True)

    durable=True

    c.消息公平分发

     1 channel.basic_qos(prefetch_count=1) (消费者端)

    如果Rabbit只管按顺序把消息发到各个消费者身上,不考虑消费者负载的话,很可能出现,一个机器配置不高的消费者那里堆积了很多消息处理不完,同时配置高的消费者却一直很轻松。为解决此问题,可以在各个消费者端,配置perfetch=1,意思就是告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了。

    import pika
    import time
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.queue_declare(queue='task_queue', durable=True)
    print(' [*] Waiting for messages. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag = method.delivery_tag)
     
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(callback,
                          queue='task_queue')
     
    channel.start_consuming()
    consumer

    d.PublishSubscribe(消息发布订阅) 

    让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了

        fanout: 所有bind到此exchange的queue都可以接收消息
        direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息
        topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    fanout广播模式:

    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs',
                             type='fanout')
     
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    publish
    import pika
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='logs',
                             type='fanout')
     
    result = channel.queue_declare(exclusive=True) #不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = result.method.queue
     
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
     
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
     
    channel.start_consuming()
    consumer

    direct有选择的接收消息:

    RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
     
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()
    publish
    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='direct_logs',
                             type='direct')
     
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
     
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
     
    for severity in severities:
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity)
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
     
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
     
    channel.start_consuming()
    subscriber

    topic更细致的消息过滤:

    表达式符号说明:#代表一个或多个字符,*代表任何字符
          例:#.a会匹配a.a,aa.a,aaa.a等
              *.a会匹配a.a,b.a,c.a等
         注:使用RoutingKey为#,Exchange Type为topic的时候相当于使用fanout

    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
     
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    publish
    import pika
    import sys
     
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
     
    channel.exchange_declare(exchange='topic_logs',
                             type='topic')
     
    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
     
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
     
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
     
    print(' [*] Waiting for logs. To exit press CTRL+C')
     
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
     
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
     
    channel.start_consuming()
    subscriber
     
    
    
    
  • 相关阅读:
    JMETER接口测试问题三之Host of origin may not be blank
    JMETER接口测试问题解决二之后续接口请求依赖登录接口的操作
    JMETER接口测试问题一之请求超时报错
    jmeter接口测试之json提取器的使用方法二
    JMETER接口测试之Debug sample
    JMTER接口测试之JSON提取器
    EXCEL批量导入到Sqlserver数据库并进行两表间数据的批量修改
    Linq的整型或实体类null引发的报错问题
    SqlServer 统计1-12月份 每个月的数据(临时表)
    select2的多选下拉框上传
  • 原文地址:https://www.cnblogs.com/q1ang/p/9291899.html
Copyright © 2011-2022 走看看