zoukankan      html  css  js  c++  java
  • rabbitMQ 消息队列

    rabbitMQ 消息队列

    前情概要

    python中提供队列操作的模块queue

    import queue
    q = queue.Queue()
    q.put('ddd')
    q.qsize()
    q.get()
    

    队列可以解决两个问题:解耦、异步

    异步

    异步
    	优点:解决排队问题
    	缺点:不能保证任务被及时执行
    	应用场景:去哪儿网购机票
    同步:
    	优点:保证任务被及时执行
    	缺点:排队问题
    

    解耦

    解耦
    	消息队列天然解耦
    	进程通过消息队列联系,不需要有接口
    

    队列的作用
    1.存储消息和数据
    2.保证消息顺序
    3.保证数据交付

    PS:

    1 大并发:
    	目前使用web nginx(同一时刻,可以有1w-2w个并发请求);之前使用的是apache(同一时刻,只能有1000-2000个并发请求)
    	pv = page visit (页面访问量),pv上亿才算大并发,10 server web cluster集群可以满足需求
    	uv = user visit (每日独立用户量)
    	qps = 每秒查询率
    
    2 生产者消费者模型
    	分布式:是一种工作方式,一堆机器,每个机器承担一部分运算
    

    为什么用rabbitMQ 替代python的queue?
    是因为queue不能跨进程

    rabbitMQ

    安装 brew install rabbitmq

    安装rabbitMQ 对python支持的模块 pip3 install pika

    rabbitmq是独立组件,可以给任意程序提供队列服务

    最简单的队列通信

    p(生产者) ---> Exchanges(消息过滤) ---> Queues(队列) ---> c(消费者)

    生产者
    1.通过 端口、ip、认证信息 (得到一个实例)
    2.创建队列
    3.通过实例往队列里发消息

    消费者
    1.通过 端口、ip、认证信息 (得到一个实例)
    2.从指定队列取消息

    linux:
    启动
    rabbitmq-server
    查看端口号
    netstat -nitud grep 端口号

    显示当前的队列列表
    rabbitmqctl list_queues

    producer端

    import pika
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 创建队列
    channel.queue_declare(queue='queue_1')
    
    # 向队列中发消息
    channel.basic_publish(exchange='',
                          routing_key='queue_1',
                          body='hello world!')
    print("[x] sent ''hello world!'")
    connection.close()
    

    consumer端

    import pika
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 创建队列(以防服发送端没有先创建该队列)
    channel.queue_declare(queue='queue_1')
    
    # 回调函数
    def callback(ch, method, properties, body):
        print('[x] received %r' % body)
    
    
    # 指定取消息的队列
    channel.basic_consume(callback,  # 接收到消息调用函数
                          queue='queue_1',
                          no_ack=True)  # 消息处理后不向rabbit-server确认消息已消费完步
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    channel.start_consuming()  # 阻塞模式,等待消息
    

    远程连接rabbitmq server的话,需要配置权限
    创建用户
    rabbitmqctl add_user 用户名 密码
    配置权限,允许从外面访问
    rabbitmqctl set_permissions -p / 用户名 ".*" ".*" ".*"

    客户端配置认证参数

    credentials = pika.PlainCredentials('sunq', '123456')
    parameters = pika.ConnectionParameters(host='192.168.11.199', credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()  # 队列连接通道
    

    消费者半途死掉怎么办?

    确保消息被消费完毕
    1.生产者端发消息,保证消息持久化
    2.消费者端,消息处理完毕,发送确认包

    consumer端

    # 回调函数
    def callback(ch, method, properties, body):
        time.sleep(5)
        print('[x] received %r' % body)
        ch.basic_ack(delivery_tag=method.delivery_tag) # 此时,消息队列中的消息需要手动删除
    
    # 指定取消息的队列
    channel.basic_consume(callback,  
                          queue='queue_1',)
                          # no_ack=True)  # 注销,保证消息持久化
    

    rabbitMQ半途死掉怎么办?

    确保rabbitmq消息可以持久化队列
    1.保持队列持久化
    2.保持消息持久化

    producer端

    # 创建队列
    channel.queue_declare(queue='queue_1', durable=True)  # durable=True 1保持队列持久化
    
    # 向队列中发消息
    channel.basic_publish(exchange='',
                          routing_key='queue_1',
                          body='hello world!',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 2保持消息持久化
                          ))
    

    公平分发

    多消费者处理消息能力不同,避免消费者消息积压,那么消息分发方式如何?
    告诉RabbitMQ在我这个消费者当前消息还没处理完的时候就不要再给我发新消息了
    消息的公平分发:消费端添加:
    channel.basic_qos(prefetch_count=1)

    持久化+公平分发示例

    producer端

    import pika
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 创建队列
    channel.queue_declare(queue='queue_2', durable=True)  # durable=True 保持队列持久化
    
    # 向队列中发消息
    channel.basic_publish(exchange='',
                          routing_key='queue_2',
                          body='hello world!',
                          properties=pika.BasicProperties(
                              delivery_mode=2,  # 保持消息持久化
                          ))
    
    print("[x] sent ''hello world!'")
    connection.close()
    

    consumer端

    import pika
    import time
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 创建队列(以防服发送端没有先创建该队列)
    channel.queue_declare(queue='queue_1')
    
    # 回调函数
    def callback(ch, method, properties, body):
        time.sleep(5)
        print('[x] received %r' % body)
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    # 公平分发
    channel.basic_qos(prefetch_count=1)
    
    # 指定取消息的队列
    channel.basic_consume(callback,  # 接收到消息调用函数
                          queue='queue_1',)
                          # no_ack=True)  # no_ack=True 消息处理不管成功与否,都不返回给发送端
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    # 阻塞模式,等待消息
    channel.start_consuming()  
    

    消息订阅发布

    想让你的消息被所有的Queue收到,类似广播的效果,这时候就要用到exchange了

    exchange 类型
    fanout = 广播 :所有bind到此exchange的queue都可以接收消息
    direct = 组播 :通过routingKey和exchange决定的哪个唯一的queue可以接收消息
    topic = 规则播 :所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息
    header = 根据消息头播

    fanout 广播

    类似于微博,只有消费者处于阻塞状态才能接收到生产者的广播

    producer端:

    import pika
    import sys
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 指定队列类型
    channel.exchange_declare(exchange='logs', type='fanout')  # 队列名称logs,队列类型fanout
    
    # 产生的消息
    message = ' '.join(sys.argv[1:]) or "info: Hello World!"
    
    # 向队列中发消息
    channel.basic_publish(exchange='logs',
                          routing_key='',
                          body=message,)
    
    print("[x] sent ''hello world!'")
    connection.close()
    
    

    consumer端

    import pika
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 指定队列类型
    channel.exchange_declare(exchange='logs', type='fanout')
    
    # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_obj = channel.queue_declare(exclusive=True)
    queue_name = queue_obj.method.queue
    channel.queue_bind(exchange='logs', queue=queue_name)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    def callback(ch, method, properties, body):
        print('[x] received %r' % body)
    
    
    # 指定取消息的队列
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    # 阻塞模式,等待消息
    channel.start_consuming()
    
    
    direct 组播

    只有消费端都有阻塞时才产生队列

    producer端:

    import pika
    import sys
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 指定队列类型
    channel.exchange_declare(exchange='direct_logs', type='direct')  # 队列名称direct_logs,队列类型direct
    
    # 设置关键字
    log_level = sys.argv[1] if len(sys.argv) > 1 else 'info'
    
    # 产生的消息
    message = ' '.join(sys.argv[2:]) or "Hello World!"
    
    # 向队列中发消息
    channel.basic_publish(exchange='direct_logs',
                          routing_key=log_level,
                          body=message)
    
    print(" [x] Sent %r:%r" % (log_level, message))
    connection.close()
    
    

    consumer端

    import pika
    import sys
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 指定队列类型
    channel.exchange_declare(exchange='direct_logs', type='direct')
    
    # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_obj = channel.queue_declare(exclusive=True)
    queue_name = queue_obj.method.queue
    
    log_level = sys.argv[1:]
    if not log_level:
        sys.stderr.write("Usage: %s [info] [warning] [error]
    " % sys.argv[0])
        sys.exit(1)
    
    for level in log_level:
        channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=level)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    # 指定取消息的队列
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    # 阻塞模式,等待消息
    channel.start_consuming()
    

    发送消息python3 send-5-direct.py info hello
    接受消息python3 receive-5-ditrect.py info

    topic 规则播

    producer端:

    import pika
    import sys
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 指定队列类型
    channel.exchange_declare(exchange='topic_logs', type='topic')
    
    # 设置更细致的关键字
    log_level = sys.argv[1] if len(sys.argv) > 1 else 'all.info'
    
    # 产生的消息
    message = ' '.join(sys.argv[2:]) or "Hello World!"
    
    # 向队列中发消息
    channel.basic_publish(exchange='topic_logs',
                          routing_key=log_level,
                          body=message)
    
    print(" [x] Sent %r:%r" % (log_level, message))
    connection.close()
    
    

    consumer端

    import pika
    import sys
    
    # 通过认证
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    channel = connection.channel()
    
    # 指定队列类型
    channel.exchange_declare(exchange='topic_logs', type='topic')
    
    # 不指定queue名字,rabbit会随机分配一个名字,exclusive=True会在使用此queue的消费者断开后,自动将queue删除
    queue_obj = channel.queue_declare(exclusive=True)
    queue_name = queue_obj.method.queue
    
    log_level = sys.argv[1:]
    if not log_level:
        sys.stderr.write("Usage: %s [binding_key]...
    " % sys.argv[0])
        sys.exit(1)
    
    for level in log_level:
        channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=level)
    
    print(' [*] Waiting for messages. To exit press CTRL+C')
    
    
    def callback(ch, method, properties, body):
        print(" [x] %r:%r" % (method.routing_key, body))
    
    
    # 指定取消息的队列
    channel.basic_consume(callback,
                          queue=queue_name,
                          no_ack=True)
    
    # 阻塞模式,等待消息
    channel.start_consuming()
    
    

    表达式符号说明:

    代表一个或多个字符,

    *代表任何字符
    例:#.a会匹配a.a,aa.a,aaa.a等
    *.a会匹配a.a,b.a,c.a等

    To 消费端 all the logs run:

    python receive_logs_topic.py "#"
    To receive all logs from the facility "kern":

    python receive_logs_topic.py "kern.*"
    Or if you want to hear only about "critical" logs:

    python receive_logs_topic.py "*.critical"
    You can create multiple bindings:

    python receive_logs_topic.py "kern.*" "*.critical"
    And to emit a log with a routing key "kern.critical" type:

    python emit_log_topic.py "kern.critical" "A critical kernel error"

    服务端python3 send-6-topic.py all.info hell

    RPC(Remote procedure call)远程过程调用

    client
    1.声明一个队列,作为reply_to返回消息结果的队列
    2.发消息到队列,消息里带一个唯一标识符uid和reply_to
    3.监听reply_to的队列,直到有结果
    server
    1.定义fib函数
    2.申明接受指令的列名rpc_queue
    3.开始监听队列,收到消息后调用fib函数
    4.把fib执行结果,发送回客户端指定的reply_to 队列
    

    RPC server

    __author__ = 'Administrator'
    
    import subprocess
    import pika
    import time
    
    parameters = pika.ConnectionParameters('localhost')
    connection = pika.BlockingConnection(parameters)
    
    channel = connection.channel()  # 队列连接通道
    
    channel.queue_declare(queue='rpc_queue2')
    
    
    def fib(n):
        if n == 0:
            return 0
        elif n == 1:
            return 1
        else:
            return fib(n - 1) + fib(n - 2)
    
    
    def run_cmd(cmd):
        cmd_obj = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        result = cmd_obj.stdout.read() + cmd_obj.stderr.read()
    
        return result
    
    
    def on_request(ch, method, props, body):
        cmd = body.decode("utf-8")
    
        print(" [.] run (%s)" % cmd)
        response = run_cmd(cmd)
    
        ch.basic_publish(exchange='',
                         routing_key=props.reply_to,  # 队列
                         properties=pika.BasicProperties(correlation_id=props.correlation_id),
                         body=response)
    
        ch.basic_ack(delivery_tag=method.delivery_tag)
    
    
    channel.basic_consume(on_request, queue='rpc_queue2')
    
    print(" [x] Awaiting RPC requests")
    channel.start_consuming()
    
    

    RPC client

    __author__ = 'Administrator'
    
    import queue
    
    import pika
    import uuid
    
    
    class CMDRpcClient(object):
        def __init__(self):
            # credentials = pika.PlainCredentials('alex', 'alex3714')
            # parameters = pika.ConnectionParameters(host='192.168.11.106',credentials=credentials)
    
            parameters = pika.ConnectionParameters('localhost')
            self.connection = pika.BlockingConnection(parameters)
            self.channel = self.connection.channel()
    
            result = self.channel.queue_declare(exclusive=True)
            self.callback_queue = result.method.queue  # 命令的执行结果的queue
    
            # 声明要监听callback_queue
            self.channel.basic_consume(self.on_response,
                                       no_ack=True,
                                       queue=self.callback_queue)
    
        def on_response(self, ch, method, props, body):
            """
            收到服务器端命令结果后执行这个函数
            :param ch:
            :param method:
            :param props:
            :param body:
            :return:
            """
            if self.corr_id == props.correlation_id:
                self.response = body.decode("gbk")  # 把执行结果赋值给Response
    
        def call(self, n):
            self.response = None
            self.corr_id = str(uuid.uuid4())  # 唯一标识符号
            self.channel.basic_publish(exchange='',
                                       routing_key='rpc_queue2',
                                       properties=pika.BasicProperties(
                                           reply_to=self.callback_queue,
                                           correlation_id=self.corr_id,
                                       ),
                                       body=str(n))
    
            while self.response is None:
                self.connection.process_data_events()  # 检测监听的队列里有没有新消息,如果有,收,如果没有,返回None
                # 检测有没有要发送的新指令
            return self.response
    
    
    cmd_rpc = CMDRpcClient()
    
    print(" [x] Requesting fib(30)")
    response = cmd_rpc.call('ifconfig')
    
    print(response)
    
    
  • 相关阅读:
    [转] 32位 PL/SQL Develope r如何连接64位的Oracle 图解
    UML对象图和包图
    推断的距离1970年1一个月1天从日期数
    Cocos2d-x3.0 RenderTexture(一) 保存
    翻转名单
    Session or Cookie?是否有必要使用Tomcat等一下Web集装箱Session
    【Socket规划】套接字Windows台C语言
    配置主机路由表(route)(两)
    springMVC3得知(五岁以下儿童)--MultiActionController
    JQUERY 选择
  • 原文地址:https://www.cnblogs.com/sunqim16/p/7309218.html
Copyright © 2011-2022 走看看