zoukankan      html  css  js  c++  java
  • RabbitMQ 消息队列

    RabbitMQ是一个在AMQP基础上完整的,可复用的企业消息系统。他遵循Mozilla Public License开源协议。

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过读写出入队列的消息(针对应用程序的数据)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。RabbitMQ可以,多个程序同时使用RabbitMQ ,但是必须队列名称不一样。采用erlang语言,属于爱立信公司开发的。

    术语(Jargon)

    • P(Producing):制造和发送信息的一方。
    • Queue:消息队列。
    • C(Consuming):接收消息的一方。

    1. 安装

    Ubuntu 上安装

    1. 添加源、新增公钥(不加会有警告)、更新源,安装:rabbitmq-server
    echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
    
    wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
    
    sudo apt-get update
    
    sudo apt-get install rabbitmq-server
    
    1. 安装完成后还要配置下:
    # 在 rabbitmq 中添加用户
    hj@hj:~$ sudo rabbitmqctl add_user username password
    Creating user "hj"      # 这为设置成功后的提示,同下
    
    # 将用户设置为管理员(只有管理员才能远程登录)
    hj@hj:~$ sudo rabbitmqctl set_user_tags username administrator
    Setting tags for user "hj" to [administrator]
    
    # 为用户设置读写权限
    hj@hj:~$  sudo rabbitmqctl set_permissions -p / username ".*" ".*" ".*"
    Setting permissions for user "username" in vhost "/"
    


    Windows 上安装

    1. 安装 pika
    pip3 install -i http://pypi.douban.com/simple/ pika --trusted-host pypi.douban.com
    
    1. RabbitMQ 是建立在 Erlang OTP 平台上,所有需要下载 Erlang 和 RabbitMQ,官网上下载安装 ErlangRabbitMQ
    2. 将 Erlang 添加到系统环境变量中

    新建一个 ERLANG_HOME,值为 ERlang 的安装路径(有些安装时会自动添加):

    将 ERLANG_HOME 添加到 path 中(这里以 win10 平台为例,其他平台可能会不一样):

    打开 CMD 以管理员身份证运行,输入 erl 检查 ERlang 是否安装成功:

    C:Windowssystem32>erl
    Eshell V10.3  (abort with ^G)       # 版本
    1>      # 标识符
    
    1. rabbitmq 需要开启后台管理插件 rabbitmq management

    2. 队列通信

    2.1 简单示例

    下面我们来使用 RabbitMQ 来实现一个简单的消息收发:

    • 发送端:一台 Windows 机器
    • 接收端:一台 Ubuntu 虚拟机

    消息不能直接发送到队列,而是需要经过 exchange 转发器转发,只有与转发器绑定了的队列,才能收到消息。在这里我们假设不经过 exchange 转发:

    1. 发送端:
    import pika
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.xxx', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    # 声明queue
    channel.queue_declare(queue='hello')
    
    # n RabbitMQ a message can never be sent directly to the queue, it always needs to go through an exchange.
    # 消息不能直接发送到队列,而是需要经过 exchange 转发器转发,只有与转发器绑定了的队列,才能收到消息
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body=b'Hello World!')
    print(" [x] Sent 'Hello World!'")
    connection.close()
    

    首先需要输入上面第一章中已经注册的 rabbitmq 账户,然后再连接远程端。

    其次再声明了一个队列 queue,名称为 hello,在这里 exchange 为空,发送的内容 body 必须是 bytes 类型。

    1. 接收端:

    接收端也必须指定队列名称:

    import pika
    import time
    
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    channel.queue_declare(queue='hello')
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)
        time.sleep(20)
        print(" [x] msg process done %r" % body)
    
    
    channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    

    运行结果如下:

    2.2 消息持久化

    我们已经知道即使消费者死亡,消息(队列)也不会丢失(在禁用 no_ack=True的前提下,现在是 auto_ack=True)

    但是如果 RabbitMQ 服务器停止,我们的任务一样会丢失,当 RabbitMQ 退出或奔溃时,将会忘记队列和消息,除非我们告诉它不要这样,那么我们就要将队列和消息标记为持久。

    1. 确保 RabbitMQ 永远不会丢失我们的队列,需要设置 durable=True
    # 发送端,即消息制造者
    channel.queue_declare(queue='task_queue', durable=True)
    
    1. 将消息标记为持久性:
    # 发送端,即消息制造者
    properties=pika.BasicProperties(
        delivery_mode=2,  # make message persistent   使消息持久
    )
    

    设置好之后,发送端先发送一条消息,接收端先不要启动。使用以下命令关闭启动 rabbitmq 服务,观察队列和消息会不会真正丢失:

    # 若命令运行失败,可以尝试使用 管理员模式 sudo
    # 启动rabbitmq
    service rabbitmq-server start
    
    # 停止rabbitmq 
    service rabbitmq-server stop 
    
    # 重启rabbitmq
    service rabbitmq-server restart 
    
    # 查看当前活动的队列 
    rabbitmqctl list_queues
    
    

    2.3 公平分发

    所谓公平分发即一个生产者,多个消费者,类似于负载均衡。

    下面我将设置一个发送端,两个接收端:

    1. 发送端:
    import pika
    import time
    import sys
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    # 声明queue
    channel.queue_declare(queue='task_queue', durable=True)
    
    
    message = ' '.join(sys.argv[1:]) or "Hello World! %s" % time.time()
    
    channel.basic_publish(exchange='',
                          routing_key='task_queue',
                          body=bytes(message, encoding='utf-8'),
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent   使消息持久
                          )
    
                          )
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    1. 接收端:
    import pika
    import time
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    
    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)    # b'Hello World! 1557373639.5839057'
        time.sleep(20)
        print(" [x] Done")
        print("method.delivery_tag", method.delivery_tag)   # 1
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(on_message_callback=callback, queue='task_queue')
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()
    
    

    另外一个接收端代码一致,在此省略,运行结果如下:

    2.4 根据实际情况分发消息

    事实上服务器之间接收、处理消息的能力是不一样的,受网络、配置等因素影响,因此公平分发消息就会导致以下问题出现:

    • 配置高、网络好的服务器处理消息能力强、快
    • 配置一般、网络不好的服务器有可能就会积压很多未处理的消息

    为此我们可以在接收端设置 prefetch_count=1,如果前面还有消息未处理,就告诉发送端不要给我发消息,直至处理完毕前一条消息为止:

    channel.basic_qos(prefetch_count=1)     # 如果前面有消息没处理完,就不要给我再发消息
    
    

    3. 订阅(广播)

    上面的例子基本上都是一对一发送和接收消息,如果想要将消息发送到所有队列(queue)中,那么就需要用到广播了,而实现广播的一个重要参数就是 exchange—— 消息转发器。

    exchange 在定义时是有类型的,只有符合条件的才能接收消息,大致可分为以下几类:

    • fanout(全民广播):凡是绑定 exchange 的队列都可以接收到消息
    • direct(组播):以组为单位接收消息,如:发送到某个组,那么这个组里的所有队列都能接收,routingKey 为关键字/组名
    • topic(根据特征收发消息):所有符合 routingKey 绑定的队列都可以接收消息

    3.1 fanout 方式

    所有绑定 exchange 的 queue 都能接收到消息。

    应用场景:视频直播

    1. 发送端:
    import pika
    import sys
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    # 指定 exchange 类型、名字
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=bytes(message, encoding='utf-8'))
    
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    1. 接收端:
    import pika
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    # 不指定queue名字, rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    # 最新源代码需要执行 queue,如果为 '',则 if empty string, the broker will create a unique queue name
                
    result = channel.queue_declare('', exclusive=True)
    queue_name = result.method.queue
    
    # result = <METHOD(['channel_number=1', 'frame_type=1', "method=<Queue.DeclareOk(['consumer_count=0', 'message_count=0', 'queue=amq.gen-hRrQ-pwaT9u-32CcIokCxA'])>"])>
    
    # queue_name = amq.gen-hRrQ-pwaT9u-32CcIokCxA
    
    
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(on_message_callback=callback, queue=queue_name)
    
    channel.start_consuming()
    
    

    打开两个终端,分别运行:

    python3 fanout_send.py t1
    python3 fanout_send.py t2
    
    

    运行结果如下:

    3.2 direct 方式

    RabbitMQ 还可以根据关键字发送接收消息,队列绑定关键字,发送端根据关键字发送到 exchange,exchange 再根据关键字判断发给哪个队列。

    1. 发送端:
    import pika
    import sys
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    
    # python3 direct_send.py info
    severity = sys.argv[1] if len(sys.argv) > 1 else 'info'  # 严重程度,级别, info
    
    message = ' '.join(sys.argv[2:]) or 'Hello World!'      # Hello World!
    
    channel.basic_publish(exchange='direct_logs',
                          routing_key=severity,
                          body=bytes(message, encoding='utf-8'))
    print(" [x] Sent %r:%r" % (severity, message))  # [x] Sent 'info' : 'Hello World!'
    connection.close()
    
    
    1. 接收端:
    import pika
    import sys
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    channel.exchange_declare(exchange='direct_logs', exchange_type='direct')
    
    result = channel.queue_declare('', exclusive=True)
    queue_name = result.method.queue
    
    # python3 direct_recv.py info warning error
    # python3 direct_recv.py info
    # python3 direct_recv.py error
    
    severities = sys.argv[1:]       # ['direct_recv.py', 'info', 'warning', 'error']、['direct_recv.py', 'error']、['direct_recv.py', 'info']
    
    if not severities:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    # 循环绑定关键字
    for severity in severities:
        channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(on_message_callback=callback, queue=queue_name)
    
    channel.start_consuming()
    
    

    接收端执行打开三个终端,分别执行:

    python3 direct_recv.py info warning error
    python3 direct_recv.py info
    python3 direct_recv.py error
    
    

    然后循环关键字,绑定队列(queue),发送端执行相应关键字,接收端这边就能根据关键字接收消息。

    运行结果如下:

    3.3 topic 方式

    1. 发送端:
    import pika
    import sys
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    message = ' '.join(sys.argv[2:]) or 'Hello World!'
    
    channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=bytes(message, encoding='utf-8'))
    print(" [x] Sent %r:%r" % (routing_key, message))
    connection.close()
    
    
    1. 接收端:
    import pika
    import sys
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    
    channel.exchange_declare(exchange='topic_logs', exchange_type='topic')
    
    result = channel.queue_declare('', exclusive=True)
    queue_name = result.method.queue
    
    binding_keys = sys.argv[1:]
    if not binding_keys:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for binding_key in binding_keys:
        channel.queue_bind(exchange='topic_logs',
                           queue=queue_name,
                           routing_key=binding_key)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    channel.basic_consume(on_message_callback=callback, queue=queue_name)
    
    channel.start_consuming()
    
    

    接收端开启四个终端,发送端开启一个:

    # 接收端
    python3 topic_recv.py *.django.*    # 消息两端可以是任意,中间只要是 django 即可
    python3 topic_recv.py #         # 可以接收任意消息
    python3 topic_recv.py mysql.*   # 以 mysql 开头,结尾可以是任意
    python3 topic_recv.py mysql.error.*     # mysql.error 开头,结尾任意
    
    # 发送端
    python3 topic_send.py mysql.error.info
    python3 topic_send.py ss.django.123
    python3 topic_send.py mysql.error err happend
    python3 topic_send.py python.error test
    
    

    运行结果如下:

    总结

    • # 号能匹配任意消息,相当于广播
    • * 号也可以匹配任意,但是必须和其他一起使用

    4. RPC(Remote Procedure Call)双向传输

    上面收发消息都是单向的,即一个发一个接收,接收的不能够发送。而 RPC 是双向的,既能够发送也能接收。

    应用场景:RPC 服务功能

    1. 发送端:
    import pika
    import uuid
    
    
    class FibonacciRpcClient(object):
        def __init__(self):
    
            credentials = pika.PlainCredentials('username', 'password')
            self.connection = pika.BlockingConnection(pika.ConnectionParameters(
                '192.168.21.128', credentials=credentials))
    
            channel = self.connection.channel()  # 建立 rabbit 协议通道
    
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare('', exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(on_message_callback=self.on_response,
                                       queue=self.callback_queue,
                                       auto_ack=True)  #准备接受命令结果
    
        def on_response(self, ch, method, props, body):
            """"callback方法"""
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4()) #唯一标识符
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,
                                       ),
                                       body=str(n))
    
            count = 0
            while self.response is None:
                self.connection.process_data_events() #检查队列里有没有新消息,但不会阻塞
                count += 1
                print("check...", count)
    
            return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(5)
    print(" [.] Got %r" % response)
    
    
    1. 接收端:
    import pika
    import time
    
    credentials = pika.PlainCredentials('username', 'password')
    connection = pika.BlockingConnection(pika.ConnectionParameters(
        '192.168.21.128', credentials=credentials))
    
    channel = connection.channel()      # 建立 rabbit 协议通道
    
    channel.queue_declare(queue='rpc_queue')
    
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print(" [.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(on_message_callback=on_request, queue='rpc_queue')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    

    运行结果如下:

    5. 参考:

    6. 常用命令

    #创建用户
    rabbitmqctl add_user rabbitadmin 123456
    rabbitmqctl set_user_tags rabbitadmin administrator
    
    # 给用户授权
    rabbitmqctl set_permissions -p / rabbitadmin ".*" ".*" ".*"
    
    # 开启插件管理页面
    rabbitmq-plugins enable rabbitmq_management
    
    rabbitmq-server start 	# 启动服务
    rabbitmq-server stop	 # 关闭服务
    rabbitmq-server restart	 # 重启服务
    rabbitmq-server status 	# 查看服务状态
    
    ps -ef|grep rabbitmq	# 查看端口
    rabbitmqctl list_queues		# 查看队列消息
    ./rabbitmqctl list_users	#	查看用户列表命令
    rabbitmqctl  delete_user  Username		# 删除用户命令
    whereis rabbitmq	#查看rabbitmq安装目录
    
    
  • 相关阅读:
    codeforces 57C 思维
    FOJ 2232 匈牙利算法找二分图最大匹配
    UVA 101 vector
    POJ 3070 矩阵mob
    codeforces 60B bfs
    codeforces 54A
    codeforces 466C 计数 codeforces 483B 二分 容斥
    FOJ 2213 简单几何
    CCF-最优配餐(BFS)
    杂论-FTP
  • 原文地址:https://www.cnblogs.com/midworld/p/10847192.html
Copyright © 2011-2022 走看看