zoukankan      html  css  js  c++  java
  • RabbitMQ消息队列

    一、消息队列

    消息队列就是一种先进先出的数据机构

    当在分布式系统中的时候,不同的机器需要做数据交互,所以涉及到不同机器之间的数据交互,这样的话就需要借助专业的消息队列,常见的消息队列有 RabbitMQ 、Kafka...他们都是开源且支持语言较多。

    消息队列解决的问题:

    1. 应用解耦

    2. 流量消峰:

      如果订单系统一秒最多能处理一万次订单,这个处理能力在平时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统是处理不了的,只能限制订单超过一万后不允许用户下单。

      但是使用消息队列,就可以取消这个限制,把这一秒内的订单放入队列中分散成一段时间来处理,这样用户就可能在下单几十秒后才能收到下单成功的操作,但是比不能下单要好。

    3. 消息分发:

      当A发送一次消息,B对消息感兴趣,就只需监听消息,C感兴趣,C也去监听消息,而A完全不需要改动。

    4. 异步消息(Celery 就是对消息队列的分装)

    RabbitMQ 和 Kafka

    RabbitMQ :吞吐量小,有消息确认(对消息可靠性有要求,就用它)

    Kafka:吞吐量高,注重高吞吐量,不注重消息的可靠性,数据量特别大

    二、按装RabbitMQ

    1、原生安装:

    # 安装扩展epel源
    wget -O /etc/yum.repos.d/epel.repo http://mirrors.aliyun.com/repo/epel-7.repo
        
    yum -y install erlang			# 因为RabbitMQ是erlang语言开发的,所以要按装
    yum -y install rabbitmq-server	# 安装RabbitMQ
    systemctl start rabbitmq-server	# 启动
    
    # 创建用户
    rabbitmqctl add_user 用户名 密码
    # 分配权限
    rabbitmqctl set_user_tags 用户名 administrator ——>(设置用户为管理员角色)
    rabbitmqctl set_permissions -p "/" 用户名 ".*" ".*" ".*"	# 设置权限
    systemctl reatart rabbitmq-server	# 重启
    

    2、docker拉取

    docker pull rabbitmq:3.8.3-management		# 自动开启了web管理界面
    
    # 启动需要配置用户名和密码
    docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:3.8.3-management
    
    5672:是RabbitMQ的默认端口
    15672:web管理界面的端口
    

    三、基本使用

    生产者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列
    channel.queue_declare(queue='test')
    
    # 生产者向队列中放入一条消息
    channel.basic_publish(exchange='',
                          routing_key='test',   # 指定向那个队列放入
                          body='测试数据')        # 放入的内容
                          
    # 关闭连接
    connection.close()
    

    消费者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.queue_declare(queue='test')
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
    
    
    # 消费者从指定的队列中拿消息消费,一旦有一条转到 callback 里
    channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    四、确认机制

    消息确认机制其实就是消费者中 auto_ack 的设置

    生产者不变

    消费者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.queue_declare(queue='test')
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
        # 如果auto_ack=False这样设置后
        # 也可以这样设置当真正的消息处理完了,在发确认也是可以的
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    # auto_ack=True,队列收到确认,就会自动把消费过的消息删除。
    # auto_ack=False,那么就不会给队列发送确认消息了,队列就不会删除消息。不会自动回复确认消息,
    channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=False)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    五、持久化

    队列持久化:就是在声明队列的时候,指定持久化durable=True,队列必须是新的才可以

    channel.queue_declare(queue='test', durable=True)	# test 队列持久化
    

    消息持久化:就是在发布消息的时候添加

    # 生产者向队列中放入一条消息
    channel.basic_publish(exchange='',
                          routing_key='test',   # 指定向那个队列放入
                          body='hello',         # 放入的内容
                          properties=pika.BasicProperties(delivery_mode=2) # 消息持久化
                          )
    

    六、闲置消费

    当正常情况下如果有多个消费者,那么就会按照顺序第一个消息给 第一个消费者,第二个消息给第二个消费者

    但是当第一个消息的消费者处理信息很耗时,一直没有结束,那么就可以让第二个消费者优先获取闲置消息。

    消费者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.queue_declare(queue='test')
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
    
    
    # 就只有这一句话,谁闲置谁获取,没必要按照顺序一个一个来
    channel.basic_qos(prefetch_count=1)
    
    channel.basic_consume(queue='test', on_message_callback=callback, auto_ack=True)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    七、发布订阅

    发布订阅就是:我可以有多个订阅者来订阅你的消息,这样发布者只需要发布一条, 我的所有只要订阅你的人都可以消费你的消息

    模型:当我的订阅者起来了之后,就会创建一个队列,多个订阅者就会创建多个队列,当发布者生产了消息之后,会传给 exchange ,然后 exchange 会把消息复制分别分发到订阅者创建的队列中,这样就实现了只要监听你,那就能收到你发的消息。

    基本使用

    发布者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 不指定队列,指定了 exchange 复制分发消息
    channel.exchange_declare(exchange='conn', exchange_type='fanout')
    
    # 生产者向队列中放入一条消息
    channel.basic_publish(exchange='conn',      # 指定复制分发消息的 exchange
                          routing_key='',   	# 不设置指定向那个队列放入
                          body='Hello Word',    # 放入的内容
                          )
    # 关闭连接
    connection.close()
    

    订阅者:启动多次,都绑定到了同一个 exchange,所以就会都收到同一个 exchange 分发的消息

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    
    # 拿到channel对象
    channel = connection.channel()
    
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.exchange_declare(exchange='conn', exchange_type='fanout')
    
    
    # queue 不能制定名字,因为它们的名字都是不一样的
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue    # 生成一个随机的 queue 名字
    
    
    
    # 把随机生成的队列绑定到exchange上
    channel.queue_bind(exchange='conn', queue=queue_name)
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
    
    
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    关键字

    需要设置 exchange_type 的类型为 direct

    并且在发布消息的时候设置多个关键字:routing_key

    在订阅者中也需要设置 exchange_type 的类型 direct

    并且当订阅者绑定 exchange 的时候也需要设置 routing_key,

    这样的话在发布者发布消息后,exchange 会根据发布者和订阅者设置的 routing_key 进行匹配,当订阅者的 routing_key 匹配上了发布者的 routing_key 的话,那么订阅者就可以接收到发布者发布的消息,反之收不到消息。

    发布者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 不指定队列,指定了 exchange 复制分发消息,exchange_type='direct'
    channel.exchange_declare(exchange='conn1', exchange_type='direct')
    
    # 生产者向队列中放入一条消息
    channel.basic_publish(exchange='conn1',     # 指定复制分发消息的 exchange
                          routing_key='abc',   # 指定关键字
                          body='Hello Word',    # 放入的内容
                          )
    # 关闭连接
    connection.close()
    

    消费者1:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    
    # 拿到channel对象
    channel = connection.channel()
    
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.exchange_declare(exchange='conn', exchange_type='direct')
    
    
    # queue 不能制定名字,因为它的名字都是不一样的
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue    # 生成一个随机的 queue 名字
    print(queue_name)
    
    
    # 把随机生成的队列绑定到exchange上,
    # 并设置routing_key='abc',也就是说只有发布者的routing_key中包含有'abc',此订阅者才会收到消息
    channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abc')
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
    
    
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    消费者2:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    
    # 拿到channel对象
    channel = connection.channel()
    
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.exchange_declare(exchange='conn', exchange_type='direct')
    
    
    # queue 不能制定名字,因为它的名字都是不一样的
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue    # 生成一个随机的 queue 名字
    print(queue_name)
    
    
    # 把随机生成的队列绑定到exchange上,
    # 并设置了多个routing_key,也就是说只有发布者的routing_key中包含有入下两个之一,此订阅者都会收到消息
    channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abc')
    channel.queue_bind(exchange='conn1', queue=queue_name, routing_key='abcd')
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
    
    
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    模糊匹配

    在订阅者绑定匹配的时候可以进行模糊匹配发布者的 routing_key ,匹配上了就能接收到发布者发布的消息

    # 表示后面可以跟任意字符
    * 表示后面只能跟一个单词
    

    发布者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 不指定队列,指定了 exchange 复制分发消息,exchange_type='topic'
    channel.exchange_declare(exchange='conn1', exchange_type='topic')
    
    # 生产者向队列中放入一条消息
    channel.basic_publish(exchange='conn2',     # 指定复制分发消息的 exchange
                          routing_key='abcdefg',    # 指定关键字
                          body='Hello Word',    # 放入的内容
                          )
    # 关闭连接
    connection.close()
    

    订阅者:

    import pika
    
    # 有用户名密码
    credentials = pika.PlainCredentials('admin', 'admin')
    # 拿到连接对象
    connection = pika.BlockingConnection(pika.ConnectionParameters('192.168.88.131', credentials=credentials))
    
    
    # 拿到channel对象
    channel = connection.channel()
    
    
    # 声明一个队列,如果消费者先起来,那么就先声明一个队列
    channel.exchange_declare(exchange='conn', exchange_type='direct')
    
    
    # queue 不能制定名字,因为它的名字都是不一样的
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue    # 生成一个随机的 queue 名字
    print(queue_name)
    
    
    # 把随机生成的队列绑定到exchange上,
    # 并设置routing_key='abc#',也就是说只有发布者的routing_key中包含有'abc'开头,此订阅者才会收到消息
    channel.queue_bind(exchange='conn2', queue=queue_name, routing_key='abc#')
    
    
    def callback(ch, method, properties, body):
        print(f'测试:{body}')
    
    
    channel.basic_consume(queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    # 阻塞主,一直等待拿消息消费
    channel.start_consuming()
    

    八、python中的RPC框架

    RPC :远程过程调用

    例如:两个服务调用,服务1通过网络调用服务2的方法。

    SimpleXMLRPCServer

    自带的:数据包大,速度慢

    服务端:

    from xmlrpc.server import SimpleXMLRPCServer
    
    
    class RPCServer(object):
    
        def getObj(self):
            return 'get obj'
    
        def sendObj(self, data):
            return 'send obj'
    
    
    # SimpleXMLRPCServer
    server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
    server.register_introspection_functions()
    server.register_instance(RPCServer())
    server.serve_forever()
    

    客户端:

    from xmlrpc.client import ServerProxy
    
    client = ServerProxy('http://localhost:4242')
    ret = client.getObj()
    print(ret)
    

    ZeroRPC

    第三方的:底层使用 ZeroMQ 和 MessagePack ,速度快,响应时间短,并发高。

    服务端:

    import zerorpc
    
    class RPCServer(object):
    
        def getObj(self):
            return 'get obj'
    
        def sendObj(self, data):
            return 'send obj'
    
    
    server = zerorpc.Server(RPCServer())
    server.bind('tcp://0.0.0.0:4243')   # 允许连接的
    server.run()
    

    客户端:

    import zerorpc
    
    client = zerorpc.Client()
    client.connect('tcp://127.0.0.1:4243')  # 连接
    ret = client.getObj()
    print(ret)
    
    学习之旅
  • 相关阅读:
    Linux下新建服务
    查看MYSQL日志(包含最近锁表日志)
    Linux后台运行进程
    MYSQL分析慢查询
    Linux下打开超大文件方法
    通过文件列表打包文件
    linux学习笔记<基本知识普及>
    虚拟机的安装
    Android NDK编程,引入第三方.so库
    linux下软件安装与卸载
  • 原文地址:https://www.cnblogs.com/XiaoYang-sir/p/15077518.html
Copyright © 2011-2022 走看看