zoukankan      html  css  js  c++  java
  • RabbitMQ

    一、RabbitMQ简介

      RabbitMQ是流行的开源消息队列系统,用erlang语言开发。它是AMQP(高级消息队列协议)的标准实现。遵循Mozilla Public License开源协议。

      MQ(message queuing)使用消息将应用程序连接起来。这些消息通过像RabbitMQ 这样的消息代理服务器在应用程序之间路由。这就像是在应用程序之间放置一个邮局。我们想要解决的这个问题是处理庞大的实时信息,并把它们快速路由到众多的消费者。我们要在不阻塞消息生产者的情况下做到这一点,同时也无须让生产者知道最终消费者是谁。RabbitMQ使用一种基于标准的方法来确保应用程序之间相互通信,而不管应用是用Python、PHP 还是Scala 编写的。

      RabbitMQ结构图如下:

        

      

      几个概念说明 
    Exchange:交换机,决定了消息路由规则;
    Queue:消息队列;
    Channel:进行消息读写的通道;
    Bind:绑定了Queue和Exchange,意即为符合什么样路由规则的消息,将会放置入哪一个消息队列;

    二、RabbitMQ安装 

    安装配置epel源 
         rpm -ivh http://dl.fedoraproject.org/pub/epel/6/i386/epel-release-6-8.noarch.rpm 
      
    安装erlang 
         yum -y install erlang 
      
    安装RabbitMQ 
         yum -y install rabbitmq-server 
    

      启动RabbitMQ    /etc/init.d/rabbitmq-server start

      安装API  pip3 install pika  

        

    三、实现最简单的队列通信

      send端

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import pika
     5 
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(
     7                '192.168.65.245'))               # 连接消息队列服务器
     8 channel = connection.channel()                  # 生成一个管道
     9 
    10 # 声明queue
    11 channel.queue_declare(queue='hello')            # 在管道中生成一个队列,队列的名称叫hello
    12 
    13 # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    14 channel.basic_publish(exchange='',              
    15                       routing_key='hello',
    16                       body='Hello flash!')       # RabbitMQ不能直接往queue里放消息,必须先通过exchange
    17 print(" [x] Sent 'Hello flash!'")
    18 connection.close()

      receive端

     1 #!/usr/bin/env python
     2 # -*- coding:utf-8 -*-
     3 
     4 import pika
     5 
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(
     7                '192.168.65.245'))
     8 channel = connection.channel()
     9 
    10 
    11 # You may ask why we declare the queue again ‒ we have already declared it in our previous code.
    12 # We could avoid that if we were sure that the queue already exists. For example if send.py program
    13 # was run before. But we're not yet sure which program to run first. In such cases it's a good
    14 # practice to repeat declaring the queue in both programs.
    15 channel.queue_declare(queue='hello')      
    16 
    17 
    18 def callback(ch, method, properties, body):
    19     print(" [x] Received %r" % body)
    20 
    21 channel.basic_consume(callback,             # callback, 回调函数
    22                       queue='hello',
    23                       no_ack=True)
    24 
    25 channel.start_consuming()

      

    Work Queues

       

      在这种模式下,RabbitMQ会默认把p发的消息依次分发给各个消费者(c),跟负载均衡差不多 

      生产者代码 

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   '192.168.65.245'))
    channel = connection.channel()
    
    # 声明queue
    channel.queue_declare(queue='task_queue')
    
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    
    message = ' '.join(sys.argv[1:]) or "Hello World!"
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=message,
                          properties=pika.BasicProperties(
                            delivery_mode=2,  # make message persistent
                          ))
    print(" [x] Sent %r" % message)
    connection.close()
    

      消费者代码

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import pika
    import time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
                   '192.168.65.245'))
    channel = connection.channel()
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(body.count(b'.'))
        print(" [x] Done")
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(callback,
                          queue='task_queue',
                          )
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

      此时,先启动消息生产者,然后再分别启动3个消费者,通过生产者多发送几条消息,你会发现,这几条消息会被依次分配到各个消费者身上。

    消息持久化

      1、acknowledgment 消息不丢失
      no-ack = False,如果生产者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='192.168.65.245'))
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

      2、durable   消息不丢失

        生产者代码

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.65.245'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='Hello World!',
                          properties=pika.BasicProperties(
                              delivery_mode=2, # make message persistent
                          ))
    print(" [x] Sent 'Hello World!'")
    connection.close()
    

        消费者代码

    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.65.245'))
    channel = connection.channel()
    
    # make message persistent
    channel.queue_declare(queue='hello', durable=True)
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        import time
        time.sleep(10)
        print 'ok'
        ch.basic_ack(delivery_tag = method.delivery_tag)
    
    channel.basic_consume(callback,
                          queue='hello',
                          no_ack=False)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    
    

      

     
    运维因自动化而有趣!
  • 相关阅读:
    HttpContext请求上下文对象
    HttpRuntime类
    HttpServerUtility类
    【POJ3614】Sunscreen
    【poj1995】Raising Modulo Numbers
    【poj3263】Tallest Cow(差分数组)
    【HNOI2003】【BZOJ1218】激光炸弹
    STL入门基础【OI缩水版】
    【TJOI2016】【bzoj4552】排序(二分答案+线段树01排序)
    【POJ3784】Running Median(中位数,对顶堆)
  • 原文地址:https://www.cnblogs.com/Rambotien/p/5591077.html
Copyright © 2011-2022 走看看