zoukankan      html  css  js  c++  java
  • RabbitMQ学习笔记【1】

    生产者:

    # 生产者
    import pika
    
    # 链接本地rabbitmq,获取channel对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # RabbitMQ有两种方式发布消息:
        # 一种是生产者直接通过队列发布
        # 一种是通过交换机的方式发布
    # 交换机的方式更灵活,交换机的模式一种有4种:
        # direct = 'direct'  处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
        # fanout = 'fanout'  不处理路由键。你只需要简单的将队列绑定到交换机上。(广播模式,只要绑定了就发,不区分routing_key)
        # headers = 'headers'
        # topic = 'topic'  将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
    # 符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,
       # 但是“audit.*” 只会匹配到“audit.irs”。
    # 性能排序:fanout > direct >> topic。比例大约为11:10:6 # # 申明一个名称为hello的队列 # channel.queue_declare(queue='hello') # 申明一个交换机 channel.exchange_declare(exchange='logs', exchange_type='fanout') # 向队列中插入一个数据"hello world!" # channel.basic_publish(exchange='', # 交换机模式 # routing_key='hello', # 当交换机为空时,routing_key就是要插入的队列名称 # body='Hello World1111!') # 插入的数据 # 向交换机插入数据 channel.basic_publish(exchange='logs', # 交换机模式 routing_key='', # 当交换机模式为fanout模式,为空,当为direct和topic的时候,需要传路由,消费者通过路由匹配获取数据 body='Hello rasion!') # 插入的数据 print(" [x] Sent 'Hello World!'")

    消费者:

    # 消费者
    import pika
    import time
    
    # 链接本地rabbitmq,获取channel对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    
    # 创建交换机(若消费者先启动,需要有交换机)
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    # 申明一个名称为hello的队列
    result = channel.queue_declare("", exclusive=True)
    queue_name = result.method.queue
    print(result.method.queue)  # 随机队列名
    
    # 绑定交换机
    channel.queue_bind(exchange='logs',
                #
    routing_key="error", 若交换机模式为direct或者topic的时候,需要传递路由,以方便匹配交换机内容
                       queue=queue_name)
    
    # 回调函数
    def callback(ch, method, properties, body):
        time.sleep(1)
        print(" [x] Received body %r" % body)
        print(" [x] Received ch %r" % ch)
        print(" [x] Received method %r" % method)
        print(" [x] Received properties %r" % properties)
        # ch.basic_ack(delivery_tag=method.delivery_tag)  # 应答
    
    
    # 监听绑定
    channel.basic_consume(queue=queue_name,
                          auto_ack=True,
                          on_message_callback=callback)
    
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    # 启动监听
    channel.start_consuming()
  • 相关阅读:
    fastadmin的数据限制什么意思?具体是怎么配置的?
    jQuery上传剪切图片的原理和代码
    dedecms模板明明存在,还是报错:说模板不存在
    数据库基本信息查询
    数据库 --- 基础知识 1
    代码块分享
    并发编程知识内容汇总
    网络编程 与 并发编程 汇总
    并发编程 --- 线程补充2
    并发编程 --- 线程补充
  • 原文地址:https://www.cnblogs.com/tortoise512/p/15417312.html
Copyright © 2011-2022 走看看