zoukankan      html  css  js  c++  java
  • rabbitmq基础使用


    一 . 发送端 producer

    1.建立链接和管道

    connection = pika.BlockingConnection(
                   pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456"))
                 )
    channel = connection.channel()

    2.声明队列

    channel.queue_declare(queue='hello')

    3.声明exchange  (RabbitMQ 一条消息从不会被直接发送到队列, 它会先经过一个交换所)

          exchange为空,代表默认;

         空字符串所指定的默认交换所允许我们准确指定消息应该前往哪个队列,队列名字routing_key决定;

         body为消息内容

    channel.basic_publish(exchange='',routing_key='hello',body='Hello World!')  
    4.关闭连接
    connection.close()  

    二.接收端 consumer

    1.建立连接

    connection = pika.BlockingConnection(
                   pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456"))
                 )
    channel = connection.channel()

    2.声明队列 (为什么又声明一个队列,因为不知道哪个先运行,所以要声明两次.如果已经确定改队列声明了,那么可以不声明)

    channel.queue_declare(queue='hello')

    3.消费消息

        从队列中接收消息, 通过给队列提供一个callback 函数来实现. 无论何时接收到消息, 这个callback 函数都会被 Pika 库调用.

         callback函数

    def callback(ch, method, properties, body):  # 四个参数为标准格式
     print(" [x] Received %r" % body)
    time.sleep(15)
    ch.basic_ack(delivery_tag = method.delivery_tag) # 消息处理完成
      这里prefetch_count,类似于权重的操作,按能力分发,如果有一个消息,就不再给你发
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(consumer_callback=callback, queue='hello')

          开始消费

    channel.start_consuming()  

    三.保证消息不丢

        上面只是一个生产者,一个消费者

        如果想一个生产者,多个消费者,那就上面的例子,多启动几个消费者.

        如果消费者里面 没有设置prefetch_count,当启动多个消费者,会发现消息只是依次发给了一个消费者,所以加上channel.basic_qos(prefetch_count=1),这样rabbitmq就实现了公平分发,类似于权重的操作.

        ①万一某个消费者子处理消息的时候,异常终止了怎么办?我们希望的是,这个消费者挂掉了,消息会自动转给另一个消费者

            rabbitmq 有效确认机制    no_ack这个参数,默认为False,表示消息需要确认,这样消费者1宕机了,那就可以把这条消息分发给其他的消费者了

       channel.basic_consume(consumer_callback=callback, queue='hello')
       消费者收到消息,正常处理后,此时才通知队列可以将消息从队列里面删除;如果消费者挂掉,与server的链接通道会关闭或者tcp连接丢失,这时候server知道了这个情况,就会自动重发消息.

        ② 万一server挂掉了,消息和队列丢失了怎么办

            那就在声明队列的时候,加上durable=True,这样,当server挂掉了队列还在

           在声明交换机的时候,加上delivery_mode=2使消息持久化,这样,server挂掉了消息还在

           若是保证消息不丢,这两个参数都要设置。

      channel.queue_declare(queue='hello',durable=True)   要保证生产者和消费者生命的队列一样,都为True或者同时为False
      channel.basic_publish(exchange='',routing_key='hello',body='Hello World!',properties=pika.BasicProperties(delivery_mode=2,))
    四.exchange

    消费者不是和exchange直连的,消费者是连在queue上,queue绑定在exchange上

    exchange类型决定了怎么处理消息

          ① fanout: 所有绑定到此exchange的queue都可以接收消息

          ② direct : tongg routing_key和exchange 决定那个唯一的queue可以接收消息

          ③ topic:   所有符合routing_key 的 routing_key所绑定的queue可以接收消息

    1.fanout   因为生产者是不声明的队列的,只在消费端声明队列,所以广播,是实时的,收不到就没了,消息不会存下来。

         生产者

    import pika
    connection = pika.BlockingConnection(
    pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456")))
    channel = connection.channel()
    # 注意:这里是广播,不需要声明queue
    channel.exchange_declare(exchange='logs',exchange_type="fanout") # 声明广播管道
    message = "Hello World!"
    channel.basic_publish(exchange='logs',routing_key='',body=message)# 注意此处routing_key为空,必须要有
    connection.close()

         消费者

    import pika
    connection = pika.BlockingConnection(
    pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456")))
    channel = connection.channel()

    channel.exchange_declare(exchange='logs',exchange_type="fanout") # 声明广播管道
    result = channel.queue_declare(exclusive=True) # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_name = result.method.queue #获取随机的queue名字
    channel.queue_bind(exchange='logs',queue=queue_name)
    def callback(ch, method, properties, body):
    print(body)
    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()

     2.direct 有选择的接受消息 消费者可以过滤,只收我想要的消息

          生产者

    import pika
    import sys

    connection = pika.BlockingConnection(
    pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456")))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    # 重要程度级别,这里默认定义为 info
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='direct_logs',routing_key=severity,body=message)
    print(" [x] Sent %r:%r" % (severity, message))
    connection.close()

          消费者

    import pika
    import sys

    connection = pika.BlockingConnection(
    pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456")))
    channel = connection.channel()

    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue
    # 获取运行脚本所有的参数
    severities = sys.argv[1:]
    if not severities:
    sys.stderr.write("Usage: %s [info] [warning] [error] " % sys.argv[0])
    sys.exit(1)
    # 循环列表去绑定
    for severity in severities:
    channel.queue_bind(exchange='direct_logs',queue=queue_name,routing_key=severity)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,queue=queue_name,no_ack=True)
    channel.start_consuming()

          运行接收端,指定接受消息的log级别参数,例如

     python direct_rec.py info warning

     python direct_rec.py warning error

          运行生产者,可自行尝试指定生产者的log级别和消息的body参数

    python direct_send.py info haha

    3.topic

         生产者

    import pika
    import sys

    connection = pika.BlockingConnection(
    pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456")))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()

          消费者

    import pika
    import sys

    connection = pika.BlockingConnection(
    pika.ConnectionParameters('10.64.16.104',5672,virtual_host="cmmon",credentials=pika.PlainCredentials(username="admin",password="123456")))
    channel = connection.channel()

    channel.exchange_declare(exchange='topic_logs',exchange_type='topic')

    result = channel.queue_declare(exclusive=True)
    queue_name = result.method.queue

    binding_keys = sys.argv[1:]
    if not binding_keys:
    sys.stderr.write("Usage: %s [binding_key]... " % sys.argv[0])
    sys.exit(1)

    for binding_key in binding_keys:
    channel.queue_bind(exchange='topic_logs',queue=queue_name,routing_key=binding_key)

    print(' [*] Waiting for logs. To exit press CTRL+C')

    def callback(ch, method, properties, body):
    print(" [x] %r:%r" % (method.routing_key, body))

    channel.basic_consume(callback,queue=queue_name,no_ack=True)

    channel.start_consuming()

         运行接收端,例如

     python topic_rec.py  *.info

     python topic_rec.py  '#'     #接受所有消息

     python topic_rec.py  kern.*

  • 相关阅读:
    Solaris安装pkg
    JSP路径出现问题
    错误卸载软件导致Windows7系统中的软件无法播放视频
    蛋疼的Solaris设置
    SQL运行突然SESSION中断错误
    JAVA利用Zip4j解压缩
    java解压缩/压缩/加密压缩/加密解压缩 ZIP4J---ZIP文件压缩与解压缩学习
    未得冠军的运动员也有教练——Leo鉴书71
    VS2008 编译SQLite 3.8.4.3 + sqlcipher-3.1.0 + openssl-1.0.1g
    Android数据库安全解决方案,使用SQLCipher进行加解密
  • 原文地址:https://www.cnblogs.com/yuzhen0228/p/10489077.html
Copyright © 2011-2022 走看看