zoukankan      html  css  js  c++  java
  • Rabbitmq和RPC

    消息队列

    RabbitMQ是一个开源的消息代理和队列服务器,用来通过普通协议在不同的应用之间共享数据(跨平台跨语言)。RabbitMQ是使用Erlang语言编写,并且基于AMQP协议实现。

    1 两个服务调用:restful(http协议),rpc(远程过程调用)
    
    2 rpc:远程过程调用
    	-gRPC:谷歌出的,跨语言
        
    3 不管用rpc或者restful来通信,涉及到同步,异步
    
    4 消息队列解决的问题
    	-应用解耦
        -流量消峰
        -消息分发(发布订阅:观察者模式)
        -异步消息(celery就是对消息队列的封装)
        
    5 rabbitmq(Erlang),kafka(scala和java)
    	-rabbitmq:吞吐量小,消息确认,订单,对消息可靠性有要求,就用它
        -kafka:吞吐量高,注重高吞吐量,不注重消息的可靠性,数据量特别大
        
    6 电商、金融等对事务性要求很高的,可以考虑RabbitMQ
    
    7 日志--》Kafka
    

    Rabbitmq

    安装

    1 原生安装
    	-安装扩展epel源
    	-yum -y install erlang
        -yum -y install rabbitmq-server
        -systemctl start rabbitmq-server
        -systemctl sport rabbitmq-server	#停止
        -rpm -qa rabbitmq-server	#查看版本号
    
    2 docker拉取
    	-docker pull rabbitmq:management(自动开启了web管理界面)
        -docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
    	-docker run -di --name rabbitmq88 -p 15672:15672 -p 5672:5672 rabbitmq:management	#不用认证
                    
    3 5672:是rabbitmq的默认端口
      15672:web管理界面的端口
        
        
    4 创建用户
    	rabbitmqctl add_user lqz 123
    
    5 分配权限
        rabbitmqctl set_user_tags lqz administrator
        rabbitmqctl set_permissions -p "/" lqz ".*" ".*" ".*"
    

    基本使用

    # 生产者
    
    # pika
    # pip3 install pika
    
    import pika
    # 拿到连接对象
    # connection = pika.BlockingConnection(pika.ConnectionParameters('39.97.209.187'))
    # 有用户名密码的情况
    credentials = pika.PlainCredentials("admin","admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('39.97.209.187',credentials=credentials))
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列
    channel.queue_declare(queue='hello')  # 指定队列名字
    
    # 生产者向队列中放一条消息
    channel.basic_publish(exchange='',
                          routing_key='hello',
                          body='lqz js nb')
    print(" Sent 'Hello World!'")
    # 关闭连接
    connection.close()
    
    # 消费者
    
    
    import pika, sys, os
    
    def main():
        # connection = pika.BlockingConnection(pika.ConnectionParameters(host='39.97.209.187'))
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('39.97.209.187', credentials=credentials))
        channel = connection.channel()
    
        channel.queue_declare(queue='hello')
    
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
    
        channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
    
        channel.start_consuming()
    
    if __name__ == '__main__':
    
        main()
    

    消息确认机制

    老刘IP 101.133.225.166

    # 生产者
    # pika
    # pip3 install pika
    
    import pika
    # 拿到连接对象
    
    # connection = pika.BlockingConnection(pika.ConnectionParameters('39.97.209.187'))
    
    # 有用户名密码的情况
    credentials = pika.PlainCredentials("admin","admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('39.97.209.187',credentials=credentials))
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列
    channel.queue_declare(queue='lqz')  # 指定队列名字
    
    # 生产者向队列中放一条消息
    channel.basic_publish(exchange='',
                          routing_key='lqz',
                          body='lqz jssss nb')
    print(" lqz jssss nb'")
    
    # 关闭连接
    connection.close()
    
    # 消费者
    import pika, sys, os
    
    def main():
        # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
        channel = connection.channel()
    
        channel.queue_declare(queue='lqz')
    
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
            
        ## 不会自动回复确认消息,
        ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)
    
        channel.start_consuming()
    
    if __name__ == '__main__':
    
        main()
    
    

    持久化

    #在声明队列时,指定持久化
    channel.queue_declare(queue='lqz_new',durable=True)  
    
    # 声明消息持久化
    在发布消息的时候,
                        properties=pika.BasicProperties(
                             delivery_mode=2,  # make message persistent
                          )
        
        
    ## 生产者
    # pika
    # pip3 install pika
    
    import pika
    # 拿到连接对象
    # connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
    # 有用户名密码的情况
    credentials = pika.PlainCredentials("admin","admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列
    channel.queue_declare(queue='lqz_new',durable=True)  # 指定队列名字
    
    # 生产者向队列中放一条消息
    channel.basic_publish(exchange='',
                          routing_key='lqz_new',
                          body='lqz jssss nb',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # make message persistent
                          )
                          )
    print(" lqz jssss nb'")
    # 关闭连接
    connection.close()
    
    
    ### 消费者
    import pika, sys, os
    
    def main():
        # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
        channel = connection.channel()
    
        channel.queue_declare(queue='lqz_new',durable=True)
    
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
        ## 不会自动回复确认消息,
        ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_consume(queue='lqz_new', on_message_callback=callback, auto_ack=False)
    
        channel.start_consuming()
    
    if __name__ == '__main__':
    
        main()
    
    

    闲置消费

    #就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
    channel.basic_qos(prefetch_count=1) 
    
    
    
    ## 生产者
    # pika
    # pip3 install pika
    
    import pika
    # 拿到连接对象
    # connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
    # 有用户名密码的情况
    credentials = pika.PlainCredentials("admin","admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
    # 拿到channel对象
    channel = connection.channel()
    
    # 声明一个队列
    channel.queue_declare(queue='lqz')  # 指定队列名字
    
    # 生产者向队列中放一条消息
    channel.basic_publish(exchange='',
                          routing_key='lqz',
                          body='lqz jssss nb')
    print(" lqz jssss nb'")
    # 关闭连接
    connection.close()
    
    ### 消费者1
    import pika, sys, os
    
    def main():
        # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
        channel = connection.channel()
    
        channel.queue_declare(queue='lqz')
    
        def callback(ch, method, properties, body):
            import time
            time.sleep(50)
            print(" [x] Received %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
        ## 不会自动回复确认消息,
        ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_qos(prefetch_count=1)  #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
        channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)
    
        channel.start_consuming()
    
    if __name__ == '__main__':
    
        main()
    
    
    ###消费者2
    import pika, sys, os
    
    def main():
        # connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
        credentials = pika.PlainCredentials("admin", "admin")
        connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
        channel = connection.channel()
    
        channel.queue_declare(queue='lqz')
    
        def callback(ch, method, properties, body):
            print(" [x] Received %r" % body)
            # 真正的消息处理完了,再发确认
            ch.basic_ack(delivery_tag=method.delivery_tag)
        ## 不会自动回复确认消息,
        ## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
        channel.basic_qos(prefetch_count=1)  #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
        channel.basic_consume(queue='lqz', on_message_callback=callback, auto_ack=False)
    
        channel.start_consuming()
    
    if __name__ == '__main__':
    
        main()
    
    

    发布订阅

    ## 发布者
    import pika
    
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    # 声明队列没有指定名字,指定了exchange
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    message = "info: Hello World!"
    channel.basic_publish(exchange='logs', routing_key='', body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    ## 订阅者(启动多次,会创建出多个队列,都绑定到了同一个exchange上)
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='logs', exchange_type='fanout')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    

    关键字

    ### 发布者
    import pika
    
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    # 声明队列没有指定名字,指定了exchange
    channel.exchange_declare(exchange='lqz123', exchange_type='direct')
    
    message = "info: asdfasdfasdfsadfasdf World!"
    channel.basic_publish(exchange='lqz123', routing_key='bnb', body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    ### 订阅者1
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='lqz123', exchange_type='direct')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='nb')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    
    
    ####订阅者2
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='lqz123', exchange_type='direct')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='nb')
    channel.queue_bind(exchange='lqz123', queue=queue_name,routing_key='bnb')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    
    
    

    模糊匹配

    # 表示后面可以跟任意字符
    *表示后面只能跟一个单词
    
    ###发布者
    import pika
    
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    # 声明队列没有指定名字,指定了exchange
    channel.exchange_declare(exchange='m3', exchange_type='topic')
    
    message = "info: asdfasdfasdfsadfasdf World!"
    channel.basic_publish(exchange='m3', routing_key='lqz.dd', body=message)
    print(" [x] Sent %r" % message)
    connection.close()
    
    
    ### 订阅者1 
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='m3', exchange_type='topic')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='m3', queue=queue_name,routing_key='lqz.*')
    
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    
    
    ###订阅者2 
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    channel.exchange_declare(exchange='m3', exchange_type='topic')
    
    result = channel.queue_declare(queue='', exclusive=True)
    queue_name = result.method.queue
    print(queue_name)
    
    channel.queue_bind(exchange='m3', queue=queue_name,routing_key='lqz.#')
    print(' [*] Waiting for logs. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print(" [x] %r" % body)
    
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True)
    
    channel.start_consuming()
    

    通过rabbitmq实现rpc

    ### 服务端
    import pika
    
    credentials = pika.PlainCredentials("admin", "admin")
    connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
    channel = connection.channel()
    
    
    channel.queue_declare(queue='rpc_queue')
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    def on_request(ch, method, props, body):
        n = int(body)
    
        print(" [.] fib(%s)" % n)
        response = fib(n)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,
                         properties=pika.BasicProperties(correlation_id = 
                                                             props.correlation_id),
                         body=str(response))
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    ## 客户端
    
    import pika
    import uuid
    
    class FibonacciRpcClient(object):
    
        def __init__(self):
    
            self.credentials = pika.PlainCredentials("admin", "admin")
            self.connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=self.credentials))
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(queue='', exclusive=True)
            self.callback_queue = result.method.queue
    
            self.channel.basic_consume(
                queue=self.callback_queue,
                on_message_callback=self.on_response,
                auto_ack=True)
    
        def on_response(self, ch, method, props, body):
            if self.corr_id == props.correlation_id:
                self.response = body
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())
            self.channel.basic_publish(
                exchange='',
                routing_key='rpc_queue',
                properties=pika.BasicProperties(
                    reply_to=self.callback_queue,
                    correlation_id=self.corr_id,
                ),
                body=str(n))
            while self.response is None:
                self.connection.process_data_events()
            return int(self.response)
    
    
    fibonacci_rpc = FibonacciRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = fibonacci_rpc.call(10)  # 外界看上去,就像调用本地的call()函数一样
    print(" [.] Got %r" % response)
    

    python中的rpc框架

    SimpleXMLRPCServer

    ### 服务端
    from xmlrpc.server import SimpleXMLRPCServer
    class RPCServer(object):
    
        def __init__(self):
            super(RPCServer, self).__init__()
            print(self)
            self.send_data = {'server:'+str(i): i for i in range(100)}
            self.recv_data = None
    
        def getObj(self):
            print('get data')
            return self.send_data
    
        def sendObj(self, data):
            print('send data')
            self.recv_data = data
            print(self.recv_data)
    # SimpleXMLRPCServer
    server = SimpleXMLRPCServer(('localhost',4242), allow_none=True)
    server.register_introspection_functions()
    server.register_instance(RPCServer())
    server.serve_forever()
    
    
    
    ### 客户端
    import time
    from xmlrpc.client import ServerProxy
    
    # SimpleXMLRPCServer
    def xmlrpc_client():
        print('xmlrpc client')
        c = ServerProxy('http://localhost:4242')
        data = {'client:'+str(i): i for i in range(100)}
        start = time.clock()
        for i in range(50):
            a=c.getObj()
            print(a)
        for i in range(50):
            c.sendObj(data)
        print('xmlrpc total time %s' % (time.clock() - start))
    
    if __name__ == '__main__':
        xmlrpc_client()
    

    ZeroRPC实现rpc

    ### 服务端
    import zerorpc
    
    class RPCServer(object):
    
        def __init__(self):
            super(RPCServer, self).__init__()
            print(self)
            self.send_data = {'server:'+str(i): i for i in range(100)}
            self.recv_data = None
    
        def getObj(self):
            print('get data')
            return self.send_data
    
        def sendObj(self, data):
            print('send data')
            self.recv_data = data
            print(self.recv_data)
    # zerorpc
    s = zerorpc.Server(RPCServer())
    s.bind('tcp://0.0.0.0:4243')
    s.run()
    
    
    ### 客户端
    import zerorpc
    import time
    # zerorpc
    def zerorpc_client():
        print('zerorpc client')
        c = zerorpc.Client()
        c.connect('tcp://127.0.0.1:4243')
        data = {'client:'+str(i): i for i in range(100)}
        start = time.clock()
        for i in range(500):
            a=c.getObj()
            print(a)
        for i in range(500):
            c.sendObj(data)
    
        print('total time %s' % (time.clock() - start))
    
    
    if __name__ == '__main__':
        zerorpc_client()
    
  • 相关阅读:
    day59_BOS项目_11
    day58_BOS项目_10
    shell 笔记
    docker + swarm 集群
    HDFS深入浅析
    FTP服务器常规操作
    linux shell 流程控制
    认识黑客常用的入侵方法
    Linux中常用的查看系统信息的命令
    解决Yum安装依赖问题
  • 原文地址:https://www.cnblogs.com/linqiaobao/p/13836004.html
Copyright © 2011-2022 走看看