zoukankan      html  css  js  c++  java
  • rabbitmq模式:

    简单模式:

    生产者:

    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个队列
    channel.queue_declare(queue='helloworld')

    # 3 向指定队列插入数据

    channel.basic_publish(exchange='',
    routing_key='helloworld',
    body='Hello 666!')
    print(" [x] Sent 'Hello Yuan!'")
    connection.close()
    消费者:
    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个队列
    channel.queue_declare(queue='helloworld')

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

    # 绑定
    channel.basic_consume(queue='helloworld',
    auto_ack=False, # 手动应答
    on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 实时监听
    channel.start_consuming()

    持久化参数:
    生产者:
    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个队列
    channel.queue_declare(queue='helloworld2',durable=True)

    # 3 向指定队列插入数据

    channel.basic_publish(exchange='',
    routing_key='helloworld2',
    body='Hello 666!',
    properties=pika.BasicProperties(
    delivery_mode=2, # make message persistent
    )
    )
    print(" [x] Sent 'Hello Yuan!'")
    connection.close()
    消费者:
    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个队列
    channel.queue_declare(queue='helloworld2',durable=True)

    def callback(ch, method, properties, body):
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

    # 绑定
    channel.basic_consume(queue='helloworld2',
    auto_ack=False, # 手动应答
    on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 实时监听
    channel.start_consuming()

    分发参数:
    生产者:
    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个队列
    channel.queue_declare(queue='helloworld4')

    # 3 向指定队列插入数据

    channel.basic_publish(exchange='',
    routing_key='helloworld4',
    body='Hello 6!',
    )
    print(" [x] Sent 'Hello Yuan!'")
    connection.close()
    消费者:
    import pika
    import time

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个队列
    channel.queue_declare(queue='helloworld4')


    # 轮询分发改为公平分发
    channel.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
    time.sleep(3)
    print(" [x] Received %r" % body)
    ch.basic_ack(delivery_tag=method.delivery_tag)

    # 绑定
    channel.basic_consume(queue='helloworld4',
    auto_ack=False, # 手动应答
    on_message_callback=callback)
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 实时监听
    channel.start_consuming()

    交换机模式--发布订阅:
    生产者:
    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个交换机
    channel.exchange_declare(exchange='logs', # 交换机名称
    exchange_type='fanout') # 交换机类型:fanout


    message = "info: Hello World!"
    channel.basic_publish(exchange='logs',
    routing_key='',
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 声明交换机
    channel.exchange_declare(exchange='logs',
    exchange_type='fanout')

    # 声明一个随机名称队列
    result = channel.queue_declare("",exclusive=True)
    queue_name = result.method.queue
    print("queue_name",queue_name)

    # 将随机队列绑定到指定交换机
    channel.queue_bind(exchange='logs',
    queue=queue_name)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r" % body)


    channel.basic_consume(queue=queue_name,
    auto_ack=True,
    on_message_callback=callback)

    channel.start_consuming()
    交换机模式--关键字模式:
    生产者:
    import pika

    # 1 连接rabbitmq
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()

    # 2 声明一个交换机
    channel.exchange_declare(exchange='logs2', # 交换机名称
    exchange_type='direct') # 交换机类型:fanout


    message = "error: Hello World!"
    channel.basic_publish(exchange='logs2',
    routing_key='error',
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()

    # 声明交换机
    channel.exchange_declare(exchange='logs2',
    exchange_type='direct')

    # 声明一个随机名称队列
    result = channel.queue_declare("",exclusive=True)
    queue_name = result.method.queue
    print("queue_name",queue_name)

    # 将随机队列绑定到指定交换机,配合关键字



    channel.queue_bind(exchange='logs2',
    queue=queue_name,
    routing_key="info")



    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r" % body)


    channel.basic_consume(queue=queue_name,
    auto_ack=True,
    on_message_callback=callback)

    channel.start_consuming()
    交换机模式-通配符:
    生产者:
    import pika

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs3',
    exchange_type='topic')

    message = "info: europe.news!"
    channel.basic_publish(exchange='logs3',
    routing_key='usa.news',
    body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    消费者:
    import pika
    import sys

    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs3',
    exchange_type='topic')

    result = channel.queue_declare("",exclusive=True)
    queue_name = result.method.queue



    channel.queue_bind(exchange='logs3',
    queue=queue_name,
    routing_key="usa.#")

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r" % body)


    channel.basic_consume(queue=queue_name,
    auto_ack=True,
    on_message_callback=callback)

    channel.start_consuming()
     
  • 相关阅读:
    Saltstack module gem 详解
    Saltstack module freezer 详解
    Saltstack module firewalld 详解
    Saltstack module file 详解
    Saltstack module event 详解
    Saltstack module etcd 详解
    Saltstack module environ 详解
    Saltstack module drbd 详解
    Saltstack module dnsutil 详解
    获取主页_剥离百度
  • 原文地址:https://www.cnblogs.com/zhang-da/p/12198725.html
Copyright © 2011-2022 走看看