zoukankan      html  css  js  c++  java
  • rabbitmq

    简介

    RabbitMQ是一个消息代理:它接受并转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确信Postman先生最终会将邮件发送给收件人。在这个比喻中,RabbitMQ是邮箱,邮局和邮递员。RabbitMQ和邮局之间的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块的消息

    • 生产意味着只有发送。发送消息的程序是一个生产者

    • 队列是在RabbitMQ中的邮箱的名称。虽然消息流经RabbitMQ和您的应用程序,但它们只能存储在队列中队列仅由主机的存储器&磁盘限制约束,它本质上是一个大的消息缓冲器。许多生产者可以发送到一个队列的消息,并且许多消费者可以尝试从一个队列接收数据这就是我们如何代表一个队列:

    • 消费与收货有相似的含义。一个消费者是一个程序,主要是等待接收信息:

    请注意,生产者,消费者和经纪人不必驻留在同一个主机上

    一、利用rabbitmq实现一个简单收发helloworld程序

    我们的整体设计将如下所示:

    生产者向“hello”队列发送消息。消费者从该队列接收消息。

    1. 首先我们要安装pika模块(pip install pika)
      1. 生存者程序:
        import pika
        
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #我们现在连接到本地机器上的代理,因此是 本地主机。如果我们想连接到不同机器上的代理,我们可以在此处指定其名称或IP地址。
        channel = connection.channel()
        
        channel.queue_declare(queue='hello')  #确保对方存在,防止消息丢失
        
        """
        在RabbitMQ中,消息永远不能直接发送到队列,它总是需要通过交换(exchange)。
        我们现在需要知道的是如何使用由空字符串标识的默认交换。后面会详细说明
        这种交换是特别的 - 它允许我们指定消息应该去哪个队列。
        需要在routing_key参数中指定队列名称:
        body指定消息内容
        """
        channel.basic_publish(exchange='',
                              routing_key='hello',
                              body = 'Hello。。。。!',)
        
        print("[x] sent hello world")
        
        connection.close()
        

          3、消费者程序

        import pika
        
        connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))#首先我们需要连接到RabbitMQ服务器,与生产者一样
        channel = connection.channel()
        
        channel.queue_declare(queue='hello')  #我们还不确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是一个很好的做法。官方解释
        
        #从队列中接收消息很复杂,通过回掉函数来处理消息
        def callback(ch,method,properties,body):
            print("[x] received {}".format(body))
        
        channel.basic_consume(callback,
                              queue='hello',
                              no_ack=True) #no_ack为True即不确认,即不管队列发送过来的消息有没有处理完成,都不向服务器(生产者)确认,如果消息还没处理完,消费者程序挂了,则此条消息丢失,如果no_ack默认即False(确认),消费者程序如果没有执行完程序,则生产者不会丢弃这条消息,下次继续发送
        print("[x] waiting for message ,press crtl+c exit")
        channel.start_consuming()  
        

    一个简单的helloworld程序就完成了

     下面做个小测试:

    同时开3个消费者,然后依次发送消息,消费者是怎么接收消息的呢?

    结果是每个消费者依次接收消息即轮询接收消息,并且先打开的消费者先接收到消息,下面看测试结果

    前面我们说了no_ack默认时,如果消息处理完,会向服务器确认,如果这时,消息还没处理完,客户端挂了,则生产者会把消息转发给另外的消费者处理,那生产者是如何判断消费者消息已经处理完并且消费者端客户断的呢?

    1、判断客户端中断很简单,即生产者丢失了远程连接,这里rabbitmq已经帮我们做了,我们不需要再操作

    2、当处理完消息,我们要手动确认,这是要在我们消息处理完添加一行代码:ch.basic_ack(delivery_tag=method.delivery_tag)

    生产者代码:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #我们现在连接到本地机器上的代理,因此是 本地主机。如果我们想连接到不同机器上的代理,我们可以在此处指定其名称或IP地址。
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')  #确保对方存在,防止消息丢失
    
    """
    在RabbitMQ中,消息永远不能直接发送到队列,它总是需要通过交换(exchange)。
    我们现在需要知道的是如何使用由空字符串标识的默认交换。后面会详细说明
    这种交换是特别的 - 它允许我们指定消息应该去哪个队列。
    需要在routing_key参数中指定队列名称:
    body指定消息内容
    """
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body = 'Hello world[3]!',)
    
    print("[x] sent hello world")
    
    connection.close()
    

    消费者代码:

    import pika,time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))#首先我们需要连接到RabbitMQ服务器,与生产者一样
    channel = connection.channel()
    
    channel.queue_declare(queue='hello')  #我们还不确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是一个很好的做法。官方解释
    
    #从队列中接收消息很复杂,通过回掉函数来处理消息
    def callback(ch,method,properties,body):
        print("[x] received {}".format(body))
        time.sleep(3)
        print("[x] done")
        ch.basic_ack(delivery_tag=method.delivery_tag)  #在no_ack为默认值时,一定要加上这句,来告诉生产者消息已经处理完成
    channel.basic_consume(callback,
                          queue='hello'
                          ) #no_ack为True即不确认,即不管队列发送过来的消息有没有处理完成,都不向服务器(生产者)确认,如果消息还没处理完,消费者程序挂了,则此条消息丢失,如果no_ack默认即False(确认),消费者程序如果没有执行完程序,则生产者不会丢弃这条消息,下次继续发送
    print("[x] waiting for message ,press crtl+c exit")
    channel.start_consuming()
    

      演示结果:先启动多个消费者,然后启动生产者,此时,如果消费者还没处理完消息,则转到下一个消费者处理。

    上面说了消费者挂了的处理方式,如果生产者挂了我们该如何处理呢?答案是消息的持久化

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost")) #我们现在连接到本地机器上的代理,因此是 本地主机。如果我们想连接到不同机器上的代理,我们可以在此处指定其名称或IP地址。
    channel = connection.channel()
    
    channel.queue_declare(queue='task',durable=True)  #持久化队列
    
    """
    在RabbitMQ中,消息永远不能直接发送到队列,它总是需要通过交换(exchange)。
    我们现在需要知道的是如何使用由空字符串标识的默认交换。后面会详细说明
    这种交换是特别的 - 它允许我们指定消息应该去哪个队列。
    需要在routing_key参数中指定队列名称:
    body指定消息内容
    """
    channel.basic_publish(exchange='',
                          routing_key='task',    #名字同上面的队列一起改
                          body = 'Hello world[3]!',
                          properties=pika.BasicProperties(delivery_mode=2)#持久化消息
                          )
    
    
    print("[x] sent hello world")
    
    connection.close()
    

    x消费者

    import pika,time
    
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))#首先我们需要连接到RabbitMQ服务器,与生产者一样
    channel = connection.channel()
    
    channel.queue_declare(queue='task',durable=True)  #我们还不确定先运行哪个程序。在这种情况下,重复在两个程序中声明队列是一个很好的做法。官方解释
    
    #从队列中接收消息很复杂,通过回掉函数来处理消息
    def callback(ch,method,properties,body):
        print("[x] received {}".format(body))
        time.sleep(3)
        print("[x] done")
        ch.basic_ack(delivery_tag=method.delivery_tag)  #在no_ack为默认值时,一定要加上这句,来告诉生产者消息已经处理完成
    channel.basic_consume(callback,
                          queue='task'
                          ) #no_ack为True即不确认,即不管队列发送过来的消息有没有处理完成,都不向服务器(生产者)确认,如果消息还没处理完,消费者程序挂了,则此条消息丢失,如果no_ack默认即False(确认),消费者程序如果没有执行完程序,则生产者不会丢弃这条消息,下次继续发送
    print("[x] waiting for message ,press crtl+c exit")
    channel.start_consuming()
    

      你可能已经注意到现在rabbitmq是公平的在调度,即使一个consumer很忙,另外一个consumer很空。那我们在消息处理之前先判断队列是否处于忙的状态,即在channel.basic_consume消息前加channel.basic_qos(prefetch_count=1),prefetch_count为当前队列数量,如该例表示当队列中有一条消息就认定队列是忙的,发送给另外一个空的队列继续处理。

    Exchanges

    In previous parts of the tutorial we sent and received messages to and from a queue. Now it's time to introduce the full messaging model in Rabbit.

    Let's quickly go over what we covered in the previous tutorials:

    • producer is a user application that sends messages.
    • queue is a buffer that stores messages.
    • consumer is a user application that receives messages.

    The core idea in the messaging model in RabbitMQ is that the producer never sends any messages directly to a queue. Actually, quite often the producer doesn't even know if a message will be delivered to any queue at all.

    Instead, the producer can only send messages to an exchange. An exchange is a very simple thing. On one side it receives messages from producers and the other side it pushes them to queues. The exchange must know exactly what to do with a message it receives. Should it be appended to a particular queue? Should it be appended to many queues? Or should it get discarded. The rules for that are defined by the exchange type.

    There are a few exchange types available: direct, topic, headers and fanout. We'll focus on the last one -- the fanout. 

    fanout模式通俗点就是广播,即所有绑定到该exchange的队列都能收到producer发出来的消息

    代码producer:

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    """此步骤是必须的,因为禁止发布到不存在的交换机。
    如果没有任何队列绑定到交换机,消息将丢失,但对我们来说是可行的; 
    如果没有消费者正在听,我们可以安全地丢弃该消息。这个就像我们听
    收音机,即使我们不听,电台也一直放,当我们切换到该电台,就能接收消息
    """
    
    message = ''.join(sys.argv[1:]) or "info:Hello World2!"
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message
                          ) #因为是广播模式,所以这里不需要指定routing_key
    
    print("[x] Sent {}".format(message))
    connection.close()
    

      代码consumer:

    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',exchange_type='fanout')
    
    result = channel.queue_declare(exclusive=True) #随机创建一个队列,exclusive=True表示当消费者断开,删除队列
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='logs',queue=queue_name) #绑定queue至exchange
    
    print("[*]等待日志退出按CTRL + C")
    
    def callback(ch,method,properties,body):
        print("[x]{}".format(body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True
                          )
    
    channel.start_consuming()
    

      代码测试:首先先运行多个(>=2)consumer,然后再运行producer程序,运行结果显示多个consumer均能收到消息。如果先运行producer,再运行consumer则消息会被丢弃

    direct: 通过routingKey和exchange决定的那个唯一的queue可以接收消息

    有选择的接收消息(exchange type=direct) 

    RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    producer:

    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    """此步骤是必须的,因为禁止发布到不存在的交换机。
    如果没有任何队列绑定到交换机,消息将丢失,但对我们来说是可行的; 
    如果没有消费者正在听,我们可以安全地丢弃该消息。这个就像我们听
    收音机,即使我们不听,电台也一直放,当我们切换到该电台,就能接收消息
    """
    
    
    severity = sys.argv[1] if len(sys.argv) > 2 else 'info'  #默认情况下即向绑定的队列中有关键字info的队列发送
    message = ''.join(sys.argv[2:]) or "info:Hello World!"
    
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=message
                          ) #routing_key为绑定的队列的关键字
    
    print("[x] Sent {}".format(message))
    connection.close()
    

      consumer:

    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='direct_logs',exchange_type='direct')
    
    result = channel.queue_declare(exclusive=True) #随机创建一个队列,exclusive=True表示当消费者断开,删除队列
    queue_name = result.method.queue
    
    severities = sys.argv[1:]
    if not severities:
        sys.stderr.write("Usage:{} [info] [warning] [error]
    ".format(sys.argv[0]))
        sys.exit()
    
    for severity in severities: #为队列绑定多个关键字
        channel.queue_bind(exchange='direct_logs',
                           queue=queue_name,
                           routing_key=severity
                           ) #绑定queue至exchange,routing_key关键字绑定。
    
    print("[*]等待日志退出按CTRL + C")
    
    def callback(ch,method,properties,body):
        print("[x]{}".format(body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True
                          )
    
    channel.start_consuming()
    

      程序运行模拟:consumer程序运行python exchange_consumer.py info,

    python exchange_consumer.py warning,

    python exchange_consumer.py info warning。

    producer程序发消息:python exchange_producer.py info 111,

    python exchange_producer.py warning 222

    此时可以看到结果exchange_consumer.py info客户端上收到111,info warning客户端上收到222,exchange_consumer.py info warning客户端上收到111和222

     更细致的消息过滤topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息

    Although using the direct exchange improved our system, it still has limitations - it can't do routing based on multiple criteria.

    In our logging system we might want to subscribe to not only logs based on severity, but also based on the source which emitted the log. You might know this concept from the syslog unix tool, which routes logs based on both severity (info/warn/crit...) and facility (auth/cron/kern...).

    That would give us a lot of flexibility - we may want to listen to just critical errors coming from 'cron' but also all logs from 'kern'.

    • * (star) can substitute for exactly one word.
    • # (hash) can substitute for zero or more words.

    It's easiest to explain this in an example:

    代码同direct基本没什么变化
    生产者:
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
    """此步骤是必须的,因为禁止发布到不存在的交换机。
    如果没有任何队列绑定到交换机,消息将丢失,但对我们来说是可行的; 
    如果没有消费者正在听,我们可以安全地丢弃该消息。这个就像我们听
    收音机,即使我们不听,电台也一直放,当我们切换到该电台,就能接收消息
    """
    
    
    routing_key = sys.argv[1] if len(sys.argv) > 2 else 'anonymous.info'  #默认情况下即向绑定的队列中有关键字info的队列发送
    message = ''.join(sys.argv[2:]) or "info:Hello World!"
    
    channel.basic_publish(exchange='topic_logs',
                          routing_key=routing_key,
                          body=message
                          ) #routing_key为绑定的队列的关键字
    
    print("[x] Sent {}".format(routing_key,message))
    connection.close()
    

      

    消费者:
    import pika,sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='topic_logs',exchange_type='topic')
    
    result = channel.queue_declare(exclusive=True) #随机创建一个队列,exclusive=True表示当消费者断开,删除队列
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage:{} [binding_key]....
    ".format(sys.argv[0]))
        sys.exit()
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key
                           ) #绑定queue至exchange
    
    print("[*]等待日志退出按CTRL + C")
    
    def callback(ch,method,properties,body):
        print("[x]{}:{}".format(method.routing_key,body))
    
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True
                          )
    
    channel.start_consuming()
    

      

      此时我们可以根据表达式来发送到指定的队列。

    测试1:消费者:python exchange_consumer.py #(这里输入#是因为输入#是注释键不是别,所以要用转义符,你也可以用引号括起来代替"#"),此时就相当于fanout。

    消费者2:python exchange_consumer.py "#.py"

    消费者3:python exchange_consumer.py "*.py"

    生产者:python exchange_producer.py py test3

    这时看到结果:消费者2有收到test3消息,消费者3没有收到消息。

    你还可以运行一条:python exchange_producer.py test3.py test3,可以看到消费者2和3都能接收到消息。所以如果我们匹配文件是以.xxx结尾的我们应该用*而不应该用#。

    其实很简单,只要记住,*至少要匹配1个字符,#可以匹配0到多个。类似正则表达式的+*。

    Remote procedure call (RPC)

    需要在远程计算机上运行功能并等待结果,此模式通常称为远程过程调用RPC

    流程如下图:

    server:

    import pika
    connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue') #声明一个队列
    
    def fib(n):
        """
        递归方法求斐波拉契
        :param n: 
        :return: 
        """
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n-1) + fib(n-2)
    
    def on_request(ch,method,props,body): #处理客户端请求
        n = int(body)
        print("[.]fib({})".format(n))
        response = fib(n)   
    
        ch.basic_publish(
            exchange='',
            routing_key=props.reply_to,
            properties=pika.BasicProperties(correlation_id=props.correlation_id),
            body = str(response)
        )#发送消息给客户端
    
        ch.basic_ack(delivery_tag=method.delivery_tag)  #对rpc_queue的消息要进行确认
    
    channel.basic_qos(prefetch_count=1) #做负载均衡
    channel.basic_consume(on_request,queue='rpc_queue')  #等待客户端消息
    
    print("[x] Awaiting RPC requests")
    channel.start_consuming()
    

      client:

    import pika
    import uuid
    
    
    class FibonacciRpcClient(object):
        def __init__(self):
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)  #生成一个随机队列
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(self.on_response,
                                       no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self,ch,method,props,body):#检测是不是我们需要回复的消息
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self,n): #给服务器发送消息,并等待回复结果
            self.response = None
            self.corr_id = str(uuid.uuid4()) #生成一个随机数字,类似身份证号码,具有唯一性
            self.channel.publish(exchange='',
                                 routing_key='rpc_queue',
                                 properties=pika.BasicProperties(
                                     reply_to=self.callback_queue,
                                     correlation_id = self.corr_id
                                 ),#我的理解reply_to要发送回来的队列,
                                 #在该队列中收到响应,响应所属的请求不清楚,这里需要用correlation_id来区分,为每个请求设置一个唯一的值
                                 body=str(n)
    
                                 )
            while self.response is None:
                self.connection.process_data_events()  #相当于非阻塞版的start_consuming
            return int(self.response)
    
    fibonacci_rpc = FibonacciRpcClient()
    print("[x] Requesting fib(30)")
    response = fibonacci_rpc.call(6)
    print("[.] Got {}".format(response))
    

      

  • 相关阅读:
    5) 十分钟学会android--ActionBar知识串烧
    4) 十分钟学会android--建立第一个APP,启动另一个Activity
    3) 十分钟学会android--建立第一个APP,建立简单的用户界面
    2) 十分钟学会android--建立第一个APP,执行Android程序
    1) 十分钟学会android--建立第一个APP,创建android项目
    08.十分钟学会JSP传统标签编程
    07.十分钟学会tomcat数据源
    06.十分钟学会表达式语言EL
    将JSON打印到页面:
    Json对象的属性如何替换
  • 原文地址:https://www.cnblogs.com/zj-luxj/p/7702150.html
Copyright © 2011-2022 走看看