zoukankan      html  css  js  c++  java
  • python操作rabbitmq

     1、普通生产消费:

    --------------------------------------------------------------------------------
    productor:
    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/')) 
    channel = connection.channel()
    
    channel.queue_declare(queue='rhj')
    
    channel.basic_publish(exchange='',
                          routing_key='rhj',
                          body='Hello World!')
    connection.close()
    --------------------------------------------------------------------------------
    consumer: #!/usr/bin/env python # -*- coding: utf-8 -*- import pika connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/')) channel = connection.channel() def callback(channel, method, properties, message): print("get resultc from queue %s" % message) channel.basic_consume(callback, # 消息处理回调函数 queue='rhj', no_ack=True) # 不需要回消息 channel.start_consuming()
    ----------------------------------------------------------------------------------
    结果:
    执行完productor后:

      执行consumer:

      

      消费完成,获取到消息,并未回,队列消息已不存在:

      

    需要回消息的情况,需要改造消费者:

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    
    
    def callback(channel, method, properties, message):
        print("get result from queue %s" % message)
        # channel.basic_ack(delivery_tag=method.delivery_tag) # 回消息
    
    
    channel.basic_consume(callback,
                          queue='rhj') # 不定义no_ack,默认为False
    
    
    channel.start_consuming()
    ----------------------------------------------------------------------------------
    结果:
    执行完productor后:
    执行consumer:

      

    消费完成,获取到消息,并未回,队列消息仍然存在:

      取消注释,在消费时候回消息,队列消息被删除:

    def callback(channel, method, properties, message):
        print("get result from queue %s" % message)
        channel.basic_ack(delivery_tag=method.delivery_tag) # 回消息

    2、持久化

    前面的队列和消息都是没有持久化的,当rabbitmq-server重启,队列就会丢失,所以下面讨论持久化:

    可以看到,我们前面申明的队列已经不存在了,因为源码默认是不持久化的:

    下面我们修改代码,为队列申明持久化,然后执行:

    #生产者
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    channel.queue_declare(queue='rhj', durable=True) # 申明队列持久化
    
    channel.basic_publish(exchange='',
                          routing_key='rhj',
                          body='Hello World!')
    connection.close()
    -----------------------------------------------------------------------------------
    执行后,发现队列持久化了,但是消息不见了,那怎么完成消息也持久化:

      为了完成消息持久化,我们继续修改代码:

    channel.basic_publish(exchange='',
                          routing_key='rhj',
                          body='Hello World!',
                          properties=pika.BasicProperties(delivery_mode=2)) #delivery_mode为2为持久化,为1不持久化
    执行后,发现队列和消息都完成了持久化:

       如果我们不为队列持久化,但是为消息持久化会发生什么?:

    结局是不允许,要申请内容持久化,一定要队列持久化为True,否则会抛异常。

    3、队列的durable,exclusive,auto_delete。

    durable上面已经知道什么意思了,下面看看令两种,

    先在生产者什么队列时加一个参数,

    增加一个队列申明如下:

    channel.queue_declare(queue='rhj_temp',exclusive=True)

     结果什么都没发生,其实看注释,是因为当前连接已关闭,该队列已删除了:

    把这条命令加到consumer里就能看到变化了,在consumer连接存在时,队列存在,关闭时,队列删除:

    可以看出,durable是持久化,exclusive是连接结束时关闭,如果同时存在会怎么样,我们在消费者为队列同时添加两个属性?

    channel.queue_declare(queue='rhj_temp', durable=True, exclusive=True)

    按设置,这个队列是应该持久化的,而且在终止连接会删除,有矛盾,结果是怎样的呢?

    队列不见了,说明同时设置durable和exclusive为True,生效的是exclusive;

    下面再看auto_delete,看源码描述:

    和exclusive有啥区别呢,先看下exclusive开两个队列会怎样:

    报错了,因为exclusive是排他的,不允许其他消费者共享,绑定的是单个连接,然后再看auto_delete,

    我们对代码做些改动,申明一个队列,auto_delete=True:

    channel.queue_declare(queue='rhj_tmp', auto_delete=True)

    执行两次,不会保错,两个消费者都存在,说明auto_delete是允许共享的,

     

    继续看队列,看下图,只有在最后一个消费者被干掉的时,队列才会会删除,否则队列一直存在,:

    现在这三个参数的含义应该清楚了。

     4、exchange

    rabbitmq有四种exchange,分别是Direct exchange,Fanout exchange,Topic exchange和Headers exchange。

    执行命令查看下rabbitmq的默认exchange,所以说我们上面指定exchange=''是指定了最后一个默认的direct的exchange,我们看到,每个类型都有默认的exchange。

    第一个看fanout:

    fanout是广播式交换机,即将同一个message发送到所有同该Exchange 绑定的queue。不论RoutingKey是什么,这条消息都会被投递到所有与此Exchange绑定的queue中,修改生产者代码,申明一个exchange,指定消息发到该exchange:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    
    channel.exchange_declare(exchange='rhj_fanout',
                             exchange_type='fanout')
    
    channel.basic_publish(exchange='rhj_fanout',
    routing_key='', #不需要routing_key,配了也没用, 但是参数必须有,这设计。。。 body
    ='Hello World!') connection.close()

    执行该生产者,生成了我们定义的exchange,类型为fanout:

    修改消费者代码:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    
    channel.queue_declare(queue='rhj_tmp', auto_delete=True) # 申明自动删除的队列
    channel.queue_bind(exchange='rhj_fanout', queue='rhj_tmp') # 绑定队列到exchange
    
    def callback(channel, method, properties, message):
        print("get result from queue %s" % message)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='rhj_tmp')
    
    channel.start_consuming()

    执行consumer后,自动创建了rhj_tmp这个队列,且绑定到了rhj_fanout上,我们查看下绑定信息,看到除了默认exchange,和我们代码一致:

    然后执行productor,往该exchange发一条信息,consumer表现如下:

    consumer成功消费到了消息,可以看到,我们在不需要RoutingKey时通过广播exchange把消息发送到队列并成功消费到。

    第二个看direct:

    Direct是直接交换机,根据Binding指定的Routing Key,将符合Key的消息发送到Binding的Queue。可以构建点对点消息传输模型,

    这个就需要RoutingKey了,而且RoutingKey必须和队列一致,否则就无法路由到该队列,生产者代码:

    import pika
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    channel.exchange_declare(exchange='rhj_director',
                             exchange_type='direct')
    
    
    channel.basic_publish(exchange='rhj_director',
                          routing_key='rhj.tmp',
                          body='Hello World!')
    connection.close()

    执行生产者代码,生成exchange:

    而且exchange因为没有对应的队列,消息被丢弃,消息不回发到默认exchange对应的队列,

    消费者代码:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    
    channel.queue_declare(queue='rhj.*', auto_delete=True)
    channel.queue_bind(exchange='rhj_director', queue='rhj.*') # 绑定exchange
    
    def callback(channel, method, properties, message):
        print("get result from queue %s" % message)
        channel.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='rhj.*')
    
    channel.start_consuming()

    执行消费者代码,什么都消费不到,因为队列rhj.*为空,消息由于routing_key的原因,无法发到队列里,

    修改生成者代码,再执行:

    channel.basic_publish(exchange='rhj_director',
                          routing_key='rhj.*',
                          body='Hello World!')

    第三说下Topic,

    Topic Exchangetopic是最常用的一种exchange,叫主题交换机,根据Binding指定的RoutingKey,Exchange对key进行模式匹配后投递到相应的Queue,模式匹配时符号“#”匹配一个或多个词,符号“*”匹配正好一个词,而且单词与单词之间必须要用“.”符号进行分隔。

    生产者代码:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='rhj_topic',
                             exchange_type='topic')
    
    channel.basic_publish(exchange='rhj_topic',
                          routing_key='rhj.tmp',
                          body='Hello World!')
    connection.close()

    消费者代码,绑定rhj_topic:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost', 5672, '/'))
    channel = connection.channel()
    
    channel.queue_declare(queue='*.tmp', auto_delete=True)
    channel.queue_declare(queue='rhj.*', auto_delete=True)
    channel.queue_bind(exchange='rhj_topic', queue='*.tmp')
    channel.queue_bind(exchange='rhj_topic', queue='rhj.*') 
    def callback(channel, method, properties, message): print("get result from queue %s" % message) channel.basic_ack(delivery_tag=method.delivery_tag) channel.basic_consume(callback, queue='rhj.*') channel.start_consuming()

    执行生产者代码,发现*.tmp和rhj.*两个队列都收到了消息:

    执行消费者,消费到了消息,消费了的队列消息删除,未消费的则保留:

    具体其他模式匹配,请自己进程尝试。

    最后还有一种header交换器,这种通常使用的比较少,是使用headers进行匹配路由到指定Queue,这里不多介绍了。

    另外exchange声明的时候也是可以指定durable和auto_delete的,含义上面介绍过了,要注意,持久化的exchange只能与持久化的queue进行bind,否则会报错。

  • 相关阅读:
    bzoj2124-等差子序列
    线程安全问题
    IDEA导入maven中导入net.sf.json报错的解决方法
    Java写到.txt文件,如何实现换行
    POI读取Excel如何判断行为空
    为什么JAVA对象需要实现序列化?
    支付宝老版本的支付文档
    连接池和数据源的区别是什么 [
    文件下载时格式设置
    postConstruct执行过程
  • 原文地址:https://www.cnblogs.com/small-office/p/9415335.html
Copyright © 2011-2022 走看看