zoukankan      html  css  js  c++  java
  • RabbitMQ模式:

    简单模式:

    生产者:

    import pika

    # Simple-mode producer: publish one message to the 'helloworld' queue.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare the queue the message will be routed to.
    channel.queue_declare(queue='helloworld')

    # 3. Publish through the default exchange (exchange=''); routing_key
    #    names the target queue.
    message = 'Hello 666!'
    channel.basic_publish(exchange='',
                          routing_key='helloworld',
                          body=message)
    # Log the payload that was actually sent (the original printed an
    # unrelated hard-coded text).
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika

    # Simple-mode consumer: receive messages from 'helloworld' and ack them.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare the queue (same name the producer declares).
    channel.queue_declare(queue='helloworld')


    def callback(ch, method, properties, body):
        """Print the received body, then acknowledge the delivery manually."""
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)


    # Register the callback for the queue.
    channel.basic_consume(queue='helloworld',
                          auto_ack=False,  # manual acknowledgement
                          on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # Block here and dispatch incoming messages to the callback.
    channel.start_consuming()

    持久化参数:
    生产者:
    import pika

    # Persistence producer: durable queue plus persistent message.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare a durable queue.
    channel.queue_declare(queue='helloworld2', durable=True)

    # 3. Publish through the default exchange (exchange=''); routing_key
    #    names the target queue.
    message = 'Hello 666!'
    channel.basic_publish(exchange='',
                          routing_key='helloworld2',
                          body=message,
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          ))
    # Log the payload that was actually sent (the original printed an
    # unrelated hard-coded text).
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika

    # Persistence consumer: read from the durable queue 'helloworld2'.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare the queue with the same durable=True flag the producer uses.
    channel.queue_declare(queue='helloworld2', durable=True)


    def callback(ch, method, properties, body):
        """Print the received body, then acknowledge the delivery manually."""
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)


    # Register the callback for the queue.
    channel.basic_consume(queue='helloworld2',
                          auto_ack=False,  # manual acknowledgement
                          on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # Block here and dispatch incoming messages to the callback.
    channel.start_consuming()

    分发参数:
    生产者:
    import pika

    # Dispatch-demo producer: publish one message to the 'helloworld4' queue.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare the queue the message will be routed to.
    channel.queue_declare(queue='helloworld4')

    # 3. Publish through the default exchange (exchange=''); routing_key
    #    names the target queue.
    message = 'Hello 6!'
    channel.basic_publish(exchange='',
                          routing_key='helloworld4',
                          body=message,
                          )
    # Log the payload that was actually sent (the original printed an
    # unrelated hard-coded text).
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika
    import time

    # Dispatch-demo consumer: a slow worker using fair dispatch.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare the queue (same name the producer declares).
    channel.queue_declare(queue='helloworld4')

    # Switch from round-robin dispatch to fair dispatch: with
    # prefetch_count=1 the broker does not hand this consumer another
    # message until the previous one has been acknowledged.
    channel.basic_qos(prefetch_count=1)


    def callback(ch, method, properties, body):
        """Simulate 3 seconds of work, print the body, then ack manually."""
        time.sleep(3)
        print(" [x] Received %r" % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)


    # Register the callback for the queue.
    channel.basic_consume(queue='helloworld4',
                          auto_ack=False,  # manual acknowledgement
                          on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # Block here and dispatch incoming messages to the callback.
    channel.start_consuming()

    交换机模式--发布订阅:
    生产者:
    import pika

    # Publish/subscribe producer: broadcast one message via a fanout exchange.

    # Open a blocking connection to the local broker and get a channel.
    params = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    # Declare the exchange named 'logs' with type 'fanout'.
    channel.exchange_declare(exchange='logs', exchange_type='fanout')

    # Publish to the exchange with an empty routing key.
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(" [x] Sent {!r}".format(message))
    connection.close()
    消费者:
    import pika

    # Publish/subscribe consumer: receive everything sent to the 'logs'
    # fanout exchange through a private, broker-named queue.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare the exchange (same name and type as the producer).
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')

    # Declare an exclusive queue with an empty name so the broker
    # generates a random name; read the generated name from the reply.
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue
    print("queue_name", queue_name)

    # Bind the generated queue to the exchange.
    channel.queue_bind(exchange='logs',
                       queue=queue_name)

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        """Print the received message body."""
        print(" [x] %r" % body)


    # auto_ack=True: no manual acknowledgement is sent by the callback.
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)

    # Block here and dispatch incoming messages to the callback.
    channel.start_consuming()
    交换机模式--关键字模式:
    生产者:
    import pika

    # Routing-key producer: publish one message to a direct exchange.

    # 1. Connect to the RabbitMQ broker on localhost and open a channel.
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2. Declare the exchange.
    channel.exchange_declare(exchange='logs2',        # exchange name
                             exchange_type='direct')  # exchange type: direct

    # 3. Publish with routing key 'error'; a direct exchange delivers the
    #    message only to queues bound with that same key.
    message = "error: Hello World!"
    channel.basic_publish(exchange='logs2',
                          routing_key='error',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika

    # Routing-key consumer: receive messages from the 'logs2' direct
    # exchange that match this queue's binding key.
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # Declare the exchange (same name and type as the producer).
    channel.exchange_declare(exchange='logs2',
                             exchange_type='direct')

    # Declare an exclusive queue with an empty name so the broker
    # generates a random name; read the generated name from the reply.
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue
    print("queue_name", queue_name)

    # Bind the generated queue to the exchange with a binding key.
    # NOTE: a direct exchange only delivers messages whose routing key
    # equals the binding key. The producer example for 'logs2' publishes
    # with routing_key='error', so with the "info" binding below this
    # consumer will not receive it; change the key here (or add another
    # queue_bind call) to receive 'error' messages.
    channel.queue_bind(exchange='logs2',
                       queue=queue_name,
                       routing_key="info")

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        """Print the received message body."""
        print(" [x] %r" % body)


    # auto_ack=True: no manual acknowledgement is sent by the callback.
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)

    # Block here and dispatch incoming messages to the callback.
    channel.start_consuming()
    交换机模式--通配符:
    生产者:
    import pika

    # Wildcard (topic) producer: publish one message to a topic exchange.

    # Open a blocking connection to the local broker and get a channel.
    params = pika.ConnectionParameters(host='localhost')
    connection = pika.BlockingConnection(params)
    channel = connection.channel()

    # Declare the exchange named 'logs3' with type 'topic'.
    channel.exchange_declare(exchange='logs3', exchange_type='topic')

    # The payload text mentions europe.news, but the routing key used for
    # publishing is 'usa.news'.
    message = "info: europe.news!"
    channel.basic_publish(exchange='logs3', routing_key='usa.news', body=message)
    print(" [x] Sent {!r}".format(message))
    connection.close()
    消费者:
    import pika
    import sys

    # Wildcard (topic) consumer: receive messages from the 'logs3' topic
    # exchange whose routing key matches the binding pattern.
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        host='localhost'))
    channel = connection.channel()

    # Declare the exchange (same name and type as the producer).
    channel.exchange_declare(exchange='logs3',
                             exchange_type='topic')

    # Declare an exclusive queue with an empty name so the broker
    # generates a random name; read the generated name from the reply.
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue

    # Bind with a topic pattern: in RabbitMQ topic bindings '#' matches
    # zero or more dot-separated words, so "usa.#" matches e.g. 'usa.news'.
    channel.queue_bind(exchange='logs3',
                       queue=queue_name,
                       routing_key="usa.#")

    print(' [*] Waiting for logs. To exit press CTRL+C')


    def callback(ch, method, properties, body):
        """Print the received message body."""
        print(" [x] %r" % body)


    # auto_ack=True: no manual acknowledgement is sent by the callback.
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)

    # Block here and dispatch incoming messages to the callback.
    channel.start_consuming()
     
  • 相关阅读:
    Android Studio不自动代码提示问题解决
    公司邮箱
    IntentService2
    python帮助信息和常见强制转换
    列表,字典的常用方法
    python的类型(一)
    python运算符
    pycharm调试技巧
    python开发工具
    python安装
  • 原文地址:https://www.cnblogs.com/zhang-da/p/12198725.html
Copyright © 2011-2022 走看看