zoukankan      html  css  js  c++  java
  • rabbitmq pika(python)订阅发布多客户端消费场景简单使用

    发布端:

    import pika
    import time
    credentials  = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False)
    s_conn = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',credentials=credentials))  # 创建连接
    # 原则上,消息,只能有交换机传到队列。就像我们家里面的交换机道理一样。
    # 有多个设备连接到交换机,那么,这个交换机把消息发给那个设备呢,就是根据
    # 交换机的类型来定。类型有:direct	opicheadersfanout
    # fanout:这个就是,所有的设备都能收到消息,就是广播。
    # 此处定义一个名称为'logs'的'fanout'类型的exchange
    chan = s_conn.channel()  # 在连接上创建一个频道
    
    # chan.queue_declare(queue='hello')  # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
    
    chan.exchange_declare(exchange='logs',
                        exchange_type='fanout'
                          )
    
    while 1:
        time.sleep(1)
    # 将消息发送到名为log的exchange中
    # 因为是fanout类型的exchange,所以无需指定routing_key
        ack = chan.basic_publish(exchange='logs',  # 交换机
                           routing_key='',  # 路由键,写明将消息发往哪个队列
                           body='Ye:当前时间%s ' % time.strftime('%m-%d %H:%M:%S'))  # 生产者要发送的消息
        print("[生产者] send 'hello world")
        time.sleep(10)
        print(ack)
    
    
    s_conn.close()  # 当生产者发送完消息后,可选择关闭
    

    接收端:

    import pika
    credentials = pika.credentials.PlainCredentials('root', 'root',erase_on_connect =False)
    s_conn = pika.BlockingConnection(pika.ConnectionParameters('39.106.205.106',credentials=credentials))  # 创建连接
    
    chan = s_conn.channel()  # 在连接上创建一个频道
    
    chan.exchange_declare(exchange='logs',
                        exchange_type='fanout'
                          )
    # chan.queue_declare(queue='hello')  # 声明一个队列,生产者和消费者都要声明一个相同的队列,用来防止万一某一方挂了,另一方能正常运行
    # 类似的,比如log,我们其实最想看的,当连接上的时刻到消费者退出,这段时间的日志
    # 有些消息,过期了的对我们并没有什么用
    # 并且,一个终端,我们要收到队列的所有消息,比如:这个队列收到两个消息,一个终端收到一个。
    # 我们现在要做的是:两个终端都要收到两个
    # 那么,我们就只需做个临时队列。消费端断开后就自动删除
    result = chan.queue_declare(queue='temp2', exclusive=True)
    # 取得队列名称
    queue_name = result.method.queue
    
    # 将队列和交换机绑定一起
    chan.queue_bind(exchange='logs',
                    queue=queue_name
                    )
    
    def callback(ch, method, properties, body):  # 定义一个回调函数,用来接收生产者发送的消息
        print(ch, method, properties, )
        print(body.decode('utf8'))
    
    
    chan.basic_consume(  # 指定取消息的队列名,
                        queue_name,
                       callback,  # 调用回调函数,从队列里取消息
                        # queue=,
    
                       auto_ack=True
                       )  # 取完一条消息后,给生产者发送确认消息,默认是False的,即  默认不给rabbitmq发送一个收到消息的确认
    # 如果auto_ack=True则消失接收之后就会删除也就是只能取一次
    print('[消费者] waiting for msg .')
    chan.start_consuming()  # 开始循环取消息
    

    注意

    多个接收端需要修改临时队列的名字,以防止冲突,会报错关于锁的错误

  • 相关阅读:
    Mongodb 利用mongoshell进行数据类型转换
    利用Cordova开发移动应用程序
    mongodb Capped Collections 固定集合
    ubuntu desktop使用中遇到的问题
    php session 锁机制和基本安全设置
    mongodb update
    mongodb query
    mongodb Insert 、 remove 、操作原子性(atomicity)
    mongodb index(索引)
    mongodb笔记 getting started
  • 原文地址:https://www.cnblogs.com/vinic-xxm/p/11927071.html
Copyright © 2011-2022 走看看