zoukankan      html  css  js  c++  java
  • RabbitMQ

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消 息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过 队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。

    sudo apt install erlang
    
    sudo apt install rabbitmq-server
    
    python api: pip3 install pika
    linux安装

    1、简单模式(一对一的发送)

    生产者发送消息给交换机

    交换机接收消息,如果交换机没有绑定队列,消息扔进垃圾桶

    队列接收消息,存储在内存,等待消费者连接监听获取消息,消费成功后,返回确认

    一些场景:短信,QQ

    2、工作模式(资源的争抢)

    生产者将消息发送给交换机

    交换机发送给绑定的后端队列

    一个队列被多个消费者同时监听,形成消息的争抢结构:根据消费者所在的系统的空闲、性能争抢队列中的消息

    一些场景:抢红包

    3、发布订阅(交换机类型为fanout)

     注:图中未画消费者

    交换机定义类型为:fanout

    交换机绑定多个队列

    生产者将消息发送给交换机,交换机复制同步消息到后端所有的队列中

    一些场景:邮件群发

    4、路由模式(交换机类型:direct)

    交换机定义类型为:direct

    交换机绑定多个队列,队列绑定交换机时,给交换机提供了一个routingkey(路由key)

    发布订阅时,所有fanout类型的交换机绑定后端队列用的路由key都是“”;在路由模式中需要绑定队列时提供当前队列的具体路由key

    一些场景:错误消息的接收和提示

    5、主题模式(交换机类型:topic)

    ①交换机定义类型为:topic

    ②交换机绑定多个队列,与路由模式非常相似,做到按类划分消息

    ③路由key队列绑定的通配符如下:#表示任意字符串,*表示没有特殊符号(单词)的字符串

    python操作

    sudo rabbitmqctl add_user jcr 123
    # 设置用户tag
    sudo rabbitmqctl set_user_tags wupeiqi abc
    # 设置权限
    sudo rabbitmqctl set_permissions -p "/" jcr ".*" ".*" ".*"
    
    # 然后重启rabbiMQ服务
    sudo /etc/init.d/rabbitmq-server restart
     
    # 然后可以使用刚才的用户远程连接rabbitmq server了。
    
    
    ------------------------------
    credentials = pika.PlainCredentials("jcr","123")
    
    connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1',credentials=credentials))
    设置远程密码
    con.add_timeout(5, lambda:can.stop_consuming())
    设置超时时间
    #!/usr/bin/env python
    # -*- coding:utf-8 -*-
    import Queue
    import threading
    
    
    message = Queue.Queue(10)
    
    
    def producer(i):
        while True:
            message.put(i)
    
    
    def consumer(i):
        while True:
            msg = message.get()
            print(msg)
    
    
    for i in range(12):
        t = threading.Thread(target=producer, args=(i,))
        t.start()
    
    for i in range(10):
        t = threading.Thread(target=consumer, args=(i,))
        t.start()        
    基于Queue实现生产者消费者模式

     

     1 #!/usr/bin/env python
     2 import pika
     3  
     4 # ######################### 生产者 #########################
     5  
     6 connection = pika.BlockingConnection(pika.ConnectionParameters(
     7         host='localhost'))
     8 channel = connection.channel()
     9  
    10 channel.queue_declare(queue='hello')
    11  
    12 channel.basic_publish(exchange='',
    13                       routing_key='hello',
    14                       body='Hello World!')
    15 print(" [x] Sent 'Hello World!'")
    16 connection.close()
    17 
    18 
    19 
    20 
    21 
    22 
    23 
    24 
    25 
    26 
    27 
    28 
    29 #!/usr/bin/env python
    30 import pika
    31  
    32 # ########################## 消费者 ##########################
    33  
    34 connection = pika.BlockingConnection(pika.ConnectionParameters(
    35         host='localhost'))
    36 channel = connection.channel()
    37  
    38 channel.queue_declare(queue='hello')
    39  
    40 def callback(ch, method, properties, body):
    41     print(" [x] Received %r" % body)
    42  
    43 channel.basic_consume(
    44                       'hello',
    45                       auto_ack=True,
    46                       on_message_callback=callback)
    47  
    48 print(' [*] Waiting for messages. To exit press CTRL+C')
    49 channel.start_consuming()
    对于RabbitMQ来说,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。
     1 '''
     2 auto-ack = False,如果消费者遇到情况(its channel is closed, connection is closed, or TCP connection is lost)挂掉了,那么,RabbitMQ会重新将该任务添加到队列中。
     3 '''
     4 
     5 
     6 import pika
     7 
     8 connection = pika.BlockingConnection(pika.ConnectionParameters(
     9         host='10.211.55.4'))
    10 channel = connection.channel()
    11 
    12 channel.queue_declare(queue='hello')
    13 
    14 def callback(ch, method, properties, body):
    15     print(" [x] Received %r" % body)
    16     import time
    17     time.sleep(10)
    18     print 'ok'
    19     ch.basic_ack(delivery_tag = method.delivery_tag)
    20 
    21 channel.basic_consume(
    22                       'hello',
    23                       auto_ack=False,
    24                       on_message_callback=callback)
    25 
    26 print(' [*] Waiting for messages. To exit press CTRL+C')
    27 channel.start_consuming()
    28 
    29 #简单的说就是队列中阻塞就等着,如果该阻塞的进程被关了,那就会直接发给另一个
    acknowledgment 消息不丢失

     1 #!/usr/bin/env python
     2 import pika
     3 
     4 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
     5 channel = connection.channel()
     6 
     7 # make message persistent
     8 channel.queue_declare(queue='hello', durable=True)
     9 
    10 channel.basic_publish(exchange='',
    11                       routing_key='hello',
    12                       body='Hello World!',
    13                       properties=pika.BasicProperties(
    14                           delivery_mode=2, # make message persistent
    15                       ))
    16 print(" [x] Sent 'Hello World!'")
    17 connection.close()
    18 
    19 
    20 
    21 
    22 #!/usr/bin/env python
    23 # -*- coding:utf-8 -*-
    24 import pika
    25 
    26 connection = pika.BlockingConnection(pika.ConnectionParameters(host='127.0.0.1'))
    27 channel = connection.channel()
    28 
    29 # make message persistent
    30 channel.queue_declare(queue='hello', durable=True)
    31 
    32 
    33 def callback(ch, method, properties, body):
    34     print(" [x] Received %r" % body)
    35     import time
    36     time.sleep(10)
    37     print 'ok'
    38     ch.basic_ack(delivery_tag = method.delivery_tag)
    39 
    40 channel.basic_consume(
    41                       'hello',
    42                       auto_ack=False,
    43                       on_message_callback=callback)
    44 
    45 print(' [*] Waiting for messages. To exit press CTRL+C')
    46 channel.start_consuming()
    durable 消息不丢失 # 持久化,如果没有消费者不会被丢掉,直到出现接收

    !!!测试时ex不要重名,会报错

    发布订阅

    发布订阅和简单的消息队列区别在于,发布订阅会将消息发送给所有的订阅者,而消息队列中的数据被消费一次便消失。所以,RabbitMQ实现发布和订阅时,会为每一个订阅者创建一个队列,而发布者发布消息时,会将消息放置在所有相关队列中。

    #!/usr/bin/env python
    import pika
    import sys
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    channel.basic_publish(exchange='logs',
                          routing_key='logs',
                          body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    
    
    
    
    #!/usr/bin/env python
    import pika
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
            host='localhost'))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs',
                             exchange_type='fanout')
    
    result = channel.queue_declare('logs',exclusive=True)
    queue_name = result.method.queue
    
    channel.queue_bind(exchange='logs',
                       queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(
                         queue_name,
                          auto_ack=True,on_message_callback=callback)
    
    channel.start_consuming()
    exchange type = fanout

    关键字发送

    之前事例,发送消息时明确指定某个队列并向其中发送消息,RabbitMQ还支持根据关键字发送,即:队列绑定关键字,发送者将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列。

    如图就是exchange和带关键字的queue绑定进行分发routing_key
    exchange type = direct

    模糊匹配

     exchange type = topic

    在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

    • # 表示可以匹配 0 个 或 多个 单词
    • *  表示只能匹配 一个 单词
    • anonymous.info    所有发布者的info
    发送者路由值              队列中
    old.boy.python          old.*  -- 不匹配
    old.boy.python          old.#  -- 匹配
  • 相关阅读:
    C#基础概念二十五问
    Jpage分页 通用.net2.0分页程序
    使用ADO.net转换数据到Excel格式并提供下载
    .Net中FckEditor的配置和使用方法(含示例源码)
    利用Wildcard ISAPI Mapping隐藏扩展名[转]
    写简历最易犯五大愚蠢错误
    程序员35岁前成功的12条黄金法则
    目前较为流行的Ajax框架一览
    身份证的验证(支持15位与18位)
    使用ISAPI_Rewrite对asp.net实现URL重写伪静态
  • 原文地址:https://www.cnblogs.com/JcrLive/p/12469792.html
Copyright © 2011-2022 走看看