zoukankan      html  css  js  c++  java
  • RabbitMQ

    一、简介

      消息队列就是基础数据结构中的“先进先出”的一种数据机构,

      RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、Java、JMS、C等,支持AJAX。

      用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

    二、RabbitMQ的作用

      1.主要应用于应用解耦

    以电商应用为例,应用中有订单系统、库存系统、物流系统、支付系统。用户创建订单后,如果耦合调用库存系统、物流系统、支付系统,任何一个子系统出了故障,都会造成下单操作异常。
    
    当转变成基于消息队列的方式后,系统间调用的问题会减少很多,比如物流系统因为发生故障,需要几分钟来修复。在这几分钟的时间里,物流系统要处理的内存被缓存在消息队列中,用户
    的下单操作可以正常完成。当物流系统恢复后,继续处理订单信息即可,中单用户感受不到物流系统的故障。提升系统的可用性

            

      2.流量削峰

    举个栗子,如果订单系统最多能处理一万次订单,这个处理能力应付正常时段的下单时绰绰有余,正常时段我们下单一秒后就能返回结果。但是在高峰期,如果有两万次下单操作系统
    是处理不了的,只能限制订单超过一万后不允许用户下单。 使用消息队列做缓冲,我们可以取消这个限制,把一秒内下的订单分散成一段时间来处理,这事有些用户可能在下单十几秒后才能收到下单成功的操作,但是比不能下单的体验要好。

       3.消息分发

        生产者只负责生产消息,消费者只要监听了生产者 ,那么生产者发的消息就能被接收。

          

      4.异步消息

    使用消息总线,可以很方便解决这个问题,A调用B服务后,只需要监听B处理完成的消息,当B处理完成后,会发送一条消息给MQ,MQ会将此消息转发给A服务。
    
    这样A服务既不用循环调用B的查询api,也不用提供callback api。同样B服务也不用做这些操作。A服务还能及时的得到异步处理成功的消息

        

      5.常见消息队列及比较

        

        总结:

    Kafka在于分布式架构,RabbitMQ基于AMQP协议来实现,RocketMQ/思路来源于kafka,改成了主从结构,在事务性可靠性方面做了优化。广泛来说,电商、
    金融等对事务性要求很高的,可以考虑RabbitMQ和RocketMQ,对性能要求高的可考虑Kafka

    三、安装

      服务端安装:

    # 原生:
    # 安装配置epel源
    # 安装erlang
    yum -y install erlang
    # 安装RabbitMQ
    yum -y install rabbitmq-server
    
    
    # 使用Docker
    docker pull rabbitmq:management
    docker run -di --name Myrabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management

      客户端:

    pip3 install pika

    四、使用

      基本使用:

    # 生产者
    
    import pika
    
    # 拿到连接对象
    # 有用户名密码的情况
    # connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200'))
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列
    channel.queue_declare(queue='hello')  # 指定队列名字
    
    # 生产者向队列中放一条消息
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='hello world')
    print(" Sent 'Hello World!'")
    # 关闭连接
    connection.close()
    # 消费者
    import pika
    
    
    def main():
        # 无密码
        # connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.0.0.200'))
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
        channel = connection.channel()
    
        channel.queue_declare(queue='hello')
    
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
    
        channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    
        channel.start_consuming()
    
    
    if __name__ == '__main__':
        main()

      消息确认(保障安全)

    import pika
    
    # 无密码
    # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    
    # 有密码
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    # 声明一个队列(创建一个队列)
    channel.queue_declare(queue='qjk')
    
    channel.basic_publish(exchange='',
                          routing_key='qjk',  # 消息队列名称
                          body='hello world')
    connection.close()
    生产者
    import pika
    
    
    def main():
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
        channel = connection.channel()
    
        # 声明一个队列(创建一个队列)
        channel.queue_declare(queue='qjk')
    
        def callback(ch, method, properties, body):
            print("消费者接受到了任务: %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        # 不会自动回复确认消息,
        # auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False)
    
        channel.start_consuming()
    
    
    if __name__ == '__main__':
        main()
    消费者

      持久化

    import pika
    
    # 无密码
    # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    
    # 有密码
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    # 声明一个队列(创建一个队列)
    channel.queue_declare(queue='qjk', durable=True)  # 指定队列持久化
    
    channel.basic_publish(exchange='',
                          routing_key='qjk',  # 消息队列名称
                          body='hello world',
                          properties=pika.BasicProperties(
                              delivery_mode=2, )  # 指定消息持久化
                          )
    connection.close()
    生产者
    import pika
    
    
    def main():
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
        channel = connection.channel()
    
        # 声明一个队列(创建一个队列)
        channel.queue_declare(queue='qjk', durable=True)  # 指定队列持久化
    
        def callback(ch, method, properties, body):
            print("消费者接受到了任务: %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        # 不会自动回复确认消息,
        # auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False)
    
        channel.start_consuming()
    
    
    if __name__ == '__main__':
        main()
    消费者

      闲置消费

    import pika
    
    # 无密码
    # connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1'))
    
    # 有密码
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    # 声明一个队列(创建一个队列)
    channel.queue_declare(queue='qjk', durable=True)  # 指定队列持久化
    
    channel.basic_publish(exchange='',
                          routing_key='qjk',  # 消息队列名称
                          body='hello world',
                          properties=pika.BasicProperties(
                              delivery_mode=2, )  # 指定消息持久化
                          )
    connection.close()
    生产者
    import pika
    
    
    def main():
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
        channel = connection.channel()
    
        # 声明一个队列(创建一个队列)
        channel.queue_declare(queue='qjk')
    
        def callback(ch, method, properties, body):
            print("消费者接受到了任务: %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
    
        # 不会自动回复确认消息,
        # auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_consume(queue='qjk', on_message_callback=callback, auto_ack=False)
        channel.basic_qos(prefetch_count=1)  # 谁闲置谁获取,没必要按照顺序一个一个来
    
        channel.start_consuming()
    
    
    if __name__ == '__main__':
        main()
    消费者

      发布订阅

    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    # 声明队列没有指定名字,指定了exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    发布者
    # 起多个都能收到消息
    
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    result = channel.queue_declare(queue='', exclusive=True)
    # 随机的名字
    queue_name = result.method.queue
    print(queue_name)
    
    # 绑定
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    订阅者

      发布订阅(高级之Routing按关键字匹配)

    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    # 声明队列没有指定名字,指定了exchange
    channel.exchange_declare(exchange='qjk123', exchange_type='direct')
    
    message = "info: asdfasdfasdfsadfasdf World!"
    channel.basic_publish(exchange='qjk123', routing_key='bnb', body=message)  # 指定关键字
    print(" [x] Sent %r" % message)
    connection.close()
    发布者
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='qjk123', exchange_type='direct')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    订阅者1
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='qjk123', exchange_type='direct')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='nb')
    channel.queue_bind(exchange='qjk123', queue=queue_name, routing_key='bnb')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    订阅者2

      发布订阅高级之Topic(按关键字模糊匹配)   

        *  只能加一个单词

        #  可以加任意单词字符

    import pika
    
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    # 声明队列没有指定名字,指定了exchange
    channel.exchange_declare(exchange='m3', exchange_type='topic')
    
    message = "info: asdfasdfasdfsadfasdf World!"
    channel.basic_publish(exchange='m3', routing_key='qjk.dd.dd', body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    发布者
    # 收不到消息
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='m3', exchange_type='topic')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.*')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    订阅者1
    # 可以收到消息
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='m3', exchange_type='topic')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='m3', queue=queue_name, routing_key='qjk.#')
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    订阅者2

    五、三种方式实现RPC

      1.RabbitMQ实现RPC

    import pika
    import uuid
    
    
    class FibonacciRpcClient(object):
    
        def __init__(self):
    
            self.credentials = pika.PlainCredentials("admin", "admin")
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=self.credentials))
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self.on_response,
                auto_ack=True)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='',
                routing_key='rpc_queue',
                properties=pika.BasicProperties(
                    reply_to=self.callback_queue,
                    correlation_id=self.corr_id,
                ),
                body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(20)  # 外界看上去,就像调用本地的call()函数一样
    print(" [.] Got %r" % response)
    客户端
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('10.0.0.200', credentials=credentials))
    channel = connection.channel()
    
    channel.queue_declare(queue='rpc_queue')
    
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print(" [.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id= 
                                                             props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    服务端

      2.内置包SimpleXMLRPCServer实现

    from xmlrpc.client import ServerProxy
    
    
    # SimpleXMLRPCServer
    def xmlrpc_client():
        print('xmlrpc client')
        c = ServerProxy('http://localhost:4242')
        data = {'client:' + str(i): i for i in range(100)}
        start = time.clock()
        for i in range(100):
            a = c.getObj()
            print(a)
        for i in range(100):
            c.sendObj(data)
        print('xmlrpc total time %s' % (time.clock() - start))
    
    
    if __name__ == '__main__':
        xmlrpc_client()
    客户端
    from xmlrpc.server import SimpleXMLRPCServer
    
    
    class RPCServer(object):
    
        def __init__(self):
            super(RPCServer, self).__init__()
            print(self)
            self.send_data = {'server:' + str(i): i for i in range(100)}
            self.recv_data = None
    
        def getObj(self):
            print('get data')
            return self.send_data
    
        def sendObj(self, data):
            print('send data')
            self.recv_data = data
            print(self.recv_data)
    
    
    # SimpleXMLRPCServer
    server = SimpleXMLRPCServer(('localhost', 4242), allow_none=True)
    server.register_introspection_functions()
    server.register_instance(RPCServer())
    server.serve_forever()
    服务端

      3.ZeroRPC实现

        安装:pip3 install zerorpc

    import zerorpc
    import time
    
    
    # zerorpc
    def zerorpc_client():
        print('zerorpc client')
        c = zerorpc.Client()
        c.connect('tcp://127.0.0.1:4243')
        data = {'client:' + str(i): i for i in range(100)}
        start = time.clock()
        for i in range(100):
            a = c.getObj()
            print(a)
        for i in range(100):
            c.sendObj(data)
    
        print('total time %s' % (time.clock() - start))
    
    
    if __name__ == '__main__':
        zerorpc_client()
    客户端
    import zerorpc
    
    
    class RPCServer(object):
    
        def __init__(self):
            super(RPCServer, self).__init__()
            print(self)
            self.send_data = {'server:' + str(i): i for i in range(100)}
            self.recv_data = None
    
        def getObj(self):
            print('get data')
            return self.send_data
    
        def sendObj(self, data):
            print('send data')
            self.recv_data = data
            print(self.recv_data)
    
    
    # zerorpc
    s = zerorpc.Server(RPCServer())
    s.bind('tcp://0.0.0.0:4243')
    s.run()
    服务端
  • 相关阅读:
    Fedora 19 配置参考
    Lua 函数、闭包、尾调用总结
    基于MFC简单图片裁剪工具
    【OpenGL 学习笔记04】顶点数组
    【OpenGL 学习笔记03】点画多边形
    【OpenGL 学习笔记02】宽点画线
    【OpenGL 学习笔记01】HelloWorld示例
    【SSH + BootStrap】实现的无线点餐系统
    【C++ 基础 11】 函数指针总结
    【C++基础 10】四种cast转换的区别
  • 原文地址:https://www.cnblogs.com/qjk95/p/13634517.html
Copyright © 2011-2022 走看看