
    RabbitMQ management, advanced

    Installing the management plugin

    rabbitmq-plugins enable rabbitmq_management


    Then visit: http://ali3:15672/
    The management page runs as a separate process from rabbitmq-server.

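    Fixing the 401 error on the management page

    By default the guest user may only log in from localhost, so logging in remotely as guest fails. Create an administrator user instead:

    rabbitmqctl add_user test test
    rabbitmqctl set_user_tags test administrator
    rabbitmqctl set_permissions -p / test ".*" ".*" ".*"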

    (Screenshots of the management page not reproduced: after a fresh install, with data, and with more data.)

    Unsolved: why were some messages lost?

    klg@klgaliyun03:~/rabbitmq-demo/queues$ node worker.js
    [*] Waiting for messages in task_queue. To exit press CTRL+C
    [x] Received 9
    [x] Done
    [x] Received 0
    [x] Done
    [x] Received 2
    [x] Done
    [x] Received 4
    [x] Done
    [x] Received 6
    [x] Done
    [x] Received 8
    [x] Done

    You can publish while no consumer is running. When a consumer comes up later, it still receives the queued messages.

    So where is the limit? See the following.

    Advanced configuration

    The config file is /etc/rabbitmq/rabbitmq.config; see the configuration manual and its example file.

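    Can the publish rate be limited?

    There is nothing to configure: when the broker cannot keep up with incoming messages, it automatically throttles the producer's connection, which limits how fast the producer can publish.
    This is called flow control.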
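    Flow control itself happens inside the broker, but amqplib also exposes backpressure on the client side. `sendToQueue` returns `false` when the channel's write buffer is full, and the channel emits `drain` when it can accept more. Below is a minimal sketch of a publisher that respects this. `publishAll` is a hypothetical helper, and the channel is passed in so the logic can be tried without a broker:

```javascript
const { once } = require('events');

// Hypothetical helper: publish a batch, pausing whenever amqplib reports a full buffer.
async function publishAll(ch, queue, messages) {
  let waits = 0; // how many times we had to pause for 'drain'
  for (const msg of messages) {
    const ok = ch.sendToQueue(queue, Buffer.from(msg), { persistent: true });
    if (!ok) {
      waits += 1;
      await once(ch, 'drain'); // resume once the channel has flushed its buffer
    }
  }
  return waits;
}
```

    A `false` return does not mean the message was dropped. It is still buffered and sent; the loop simply stops adding more until `drain` fires.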

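    Improving consumer throughput

    Try one-way messaging. For example, for log collection where high reliability is not needed, use the most basic mode: just send, and don't care about the response.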
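    Here is a sketch of the one-way, log-collection case using amqplib calls. The producer calls `sendToQueue` with no persistence and no confirms. The consumer uses `noAck: true`, so the broker treats each message as done as soon as it is delivered. This trades safety for throughput: messages in flight are lost if the consumer dies. `sendLog` and `startLogConsumer` are hypothetical names:

```javascript
// Hypothetical producer helper: plain send, no persistent flag, no publisher confirms.
function sendLog(ch, queue, line) {
  return ch.sendToQueue(queue, Buffer.from(line));
}

// Hypothetical consumer helper: auto-ack (noAck: true), so there is no ch.ack() call.
async function startLogConsumer(ch, queue, onLine) {
  await ch.assertQueue(queue, { durable: false }); // non-durable: speed over safety
  const { consumerTag } = await ch.consume(
    queue,
    (msg) => {
      if (msg !== null) onLine(msg.content.toString()); // null = consumer cancelled by the broker
    },
    { noAck: true }
  );
  return consumerTag;
}
```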

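    Things to watch out for

    • The consumer side must handle duplicate messages, so make it idempotent.
    • Scenario, steps, typical choice: the original compared a wrong and a right design in two images, which are not reproduced.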
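    Below is a minimal sketch of an idempotent consumer. It assumes the producer sets a unique `messageId` property on every message; RabbitMQ does not do this for you. The in-memory `Set` only illustrates the idea. A real service would keep processed IDs in a shared store, such as a database unique key, so the check survives restarts and works across workers. `makeIdempotentHandler` is a hypothetical helper:

```javascript
// Hypothetical helper: wrap a processing function so duplicates are acked and skipped.
function makeIdempotentHandler(ch, processFn) {
  const seen = new Set(); // illustration only: use a shared, persistent store in practice
  return async function onMessage(msg) {
    const id = msg.properties.messageId; // assumed to be set by the producer
    if (seen.has(id)) {
      ch.ack(msg); // duplicate (e.g. redelivered after a crash): drop it
      return false;
    }
    await processFn(msg);
    seen.add(id); // record only after successful processing
    ch.ack(msg);
    return true;
  };
}
```

    With prefetch greater than 1, two copies of the same message could be processed concurrently before either ID is recorded. A unique constraint in the store closes that gap.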

    How to set up alerts

    Poll the management HTTP API, for example:
    root@klgaliyun03:~# curl -i -u guest:guest http://localhost:15672/api/vhosts
    HTTP/1.1 200 OK
    vary: Accept-Encoding, origin
    Server: MochiWeb/1.1 WebMachine/1.10.0 (never breaks eye contact)
    Date: Thu, 23 Jun 2016 12:07:11 GMT
    Content-Type: application/json
    Content-Length: 907
    Cache-Control: no-cache

    [{"message_stats":{"publish":219,"publish_details":{"rate":1.6},"publish_in":0,"publish_in_details":{"rate":0.0},"publish_out":0,"publish_out_details":{"rate":0.0},"ack":179,"ack_details":{"rate":1.0},"deliver_get":180,"deliver_get_details":{"rate":1.0},"confirm":0,"confirm_details":{"rate":0.0},"return_unroutable":0,"return_unroutable_details":{"rate":0.0},"redeliver":0,"redeliver_details":{"rate":0.0},"deliver":180,"deliver_details":{"rate":1.0},"deliver_no_ack":0,"deliver_no_ack_details":{"rate":0.0},"get":0,"get_details":{"rate":0.0},"get_no_ack":0,"get_no_ack_details":{"rate":0.0}},"send_oct":183288,"send_oct_details":{"rate":2896.8},"recv_oct":129734,"recv_oct_details":{"rate":2097.0},"messages":31,"messages_details":{"rate":1.2},"messages_ready":30,"messages_ready_details":{"rate":1.2},"messages_unacknowledged":1,"messages_unacknowledged_details":{"rate":0.0},"name":"/","tracing":false}]

    Rate fields such as send_oct_details (bytes sent, with a per-second rate) are useful inputs for alerts.
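    Below is a sketch of turning that JSON into alerts. It assumes Node 18+ for the global `fetch`. The threshold values, the alert wording and the helper names are hypothetical; only the field names come from the response above:

```javascript
// Hypothetical thresholds: tune them to your own traffic.
const DEFAULT_LIMITS = { maxReady: 1000, maxUnacked: 100 };

// Turn the array returned by GET /api/vhosts into human-readable alerts.
function checkVhosts(vhosts, limits = DEFAULT_LIMITS) {
  const alerts = [];
  for (const v of vhosts) {
    if ((v.messages_ready ?? 0) > limits.maxReady) {
      alerts.push(`${v.name}: ${v.messages_ready} messages ready (backlog)`);
    }
    if ((v.messages_unacknowledged ?? 0) > limits.maxUnacked) {
      alerts.push(`${v.name}: ${v.messages_unacknowledged} unacked (stuck consumers?)`);
    }
    const stats = v.message_stats || {};
    const inRate = stats.publish_details ? stats.publish_details.rate : 0;
    const outRate = stats.deliver_get_details ? stats.deliver_get_details.rate : 0;
    if (inRate > 0 && outRate === 0) {
      alerts.push(`${v.name}: publishing at ${inRate}/s but nothing is consumed`);
    }
  }
  return alerts;
}

// Fetch the stats with basic auth, like the curl call above (Node 18+ global fetch).
async function fetchVhosts(baseUrl, user, pass, fetchFn = fetch) {
  const auth = Buffer.from(`${user}:${pass}`).toString('base64');
  const res = await fetchFn(`${baseUrl}/api/vhosts`, {
    headers: { Authorization: `Basic ${auth}` },
  });
  if (!res.ok) throw new Error(`management API returned ${res.status}`);
  return res.json();
}
```

    With the sample response above (30 ready, 1 unacked, publish and deliver rates both non-zero), `checkVhosts` returns no alerts under these limits.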

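    Other typical scenarios

    Peak shaving: protecting the backend (consumer side)
    (traffic-peak diagram not reproduced)
    For peak shaving, the key consumer-side parameter is prefetch,
    as below: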

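    ch.prefetch(1); // at most 1 unacked message: the next one arrives only after this one is acked
    ch.prefetch(3); // take up to 3 at a time; unacked ones are redelivered if the consumer goes away
    ch.prefetch(0); // 0 means no limit: the broker pushes as many messages as it can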

    (Demo animations not reproduced: prefetch 3, taking several messages at once, and prefetch 1, taking them one by one.)

    Improving round-robin dispatch

    With the work-queue model and two consumers, the default dispatch is round-robin. To avoid one consumer being busy while the other sits idle, set prefetch.
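    Below is a sketch of such a fair-dispatch worker using amqplib calls. With `prefetch(1)`, RabbitMQ will not send this consumer another message until the current one is acked, so a busy worker is not handed more work while an idle one waits. `startWorker` is hypothetical. Requeueing on failure is only one possible choice, and a message that always fails would loop forever (see the failed-message handling later in this post):

```javascript
// Hypothetical worker: fair dispatch via prefetch plus manual acks.
async function startWorker(ch, queue, handle, prefetchCount = 1) {
  await ch.assertQueue(queue, { durable: true });
  await ch.prefetch(prefetchCount); // at most `prefetchCount` unacked messages at a time
  return ch.consume(queue, async (msg) => {
    if (msg === null) return; // consumer was cancelled by the broker
    try {
      await handle(msg.content.toString());
      ch.ack(msg);
    } catch (err) {
      ch.nack(msg, false, true); // processing failed: requeue for another try
    }
  }, { noAck: false });
}
```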

    Other uses

    Finding the bottleneck of your system

    FAQ

    • How do I know whether a message has been consumed?
    • How do I get the processing result?
    • What if the client has already disconnected?

    Use the RPC pattern (diagram not reproduced).

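    Treat it like an ordinary HTTP request that carries a message ID (the correlationId). The server publishes the result to the reply_to queue, tagged with that ID. The client picks the result up there and uses the ID to match it to the request it sent.

    The client is both the producer of the RPC queue and the consumer of the reply queue.
    The server is both the producer of the reply queue and the consumer of the RPC queue.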
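    Below is a minimal sketch of the server side using amqplib calls and JSON payloads. It reads the request, sends the result to the `replyTo` queue with the same `correlationId`, and acks only after replying. The order matters: if the process dies between the reply and the ack, the request is redelivered and the client gets a second reply. `serveRpc` is a hypothetical name:

```javascript
// Hypothetical RPC server loop: reply first, then ack.
async function serveRpc(ch, queue, handler) {
  await ch.assertQueue(queue, { durable: false });
  await ch.prefetch(1);
  return ch.consume(queue, async (req) => {
    if (req === null) return;
    const input = JSON.parse(req.content.toString());
    const result = await handler(input);
    ch.sendToQueue(req.properties.replyTo, Buffer.from(JSON.stringify(result)), {
      correlationId: req.properties.correlationId, // lets the client match the reply
    });
    ch.ack(req); // a crash before this line means the request is redelivered
  });
}
```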

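    A few things need to be handled gracefully:

    • The server cannot connect: reconnect or raise an error
    • Client connection timeout
    • Client-side RPC call timeout
    • Deduplication: the server must handle duplicate requests, and the client must handle duplicate replies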
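    The client-side items in the list above (RPC timeout, duplicate replies) can be sketched as a small class. The sketch assumes amqplib and Node 14.17+ for `crypto.randomUUID`. It also assumes a reply queue created once, e.g. `const { queue } = await ch.assertQueue('', { exclusive: true })`, and consumed with `ch.consume(queue, (m) => client.onReply(m), { noAck: true })`. `RpcClient` is hypothetical. Each call gets its own `correlationId`, replies are matched through a `Map`, and a reply with an unknown ID (late or duplicate) is ignored:

```javascript
const { randomUUID } = require('crypto');

// Hypothetical RPC client: per-call correlationId, timeout, duplicate-reply filtering.
class RpcClient {
  constructor(ch, replyQueue) {
    this.ch = ch;
    this.replyQueue = replyQueue;
    this.pending = new Map(); // correlationId -> { resolve, timer }
  }

  call(queue, payload, timeoutMs = 5000) {
    const correlationId = randomUUID(); // fresh for every request, even on a shared channel
    return new Promise((resolve, reject) => {
      const timer = setTimeout(() => {
        this.pending.delete(correlationId);
        reject(new Error(`RPC timeout after ${timeoutMs} ms`));
      }, timeoutMs);
      this.pending.set(correlationId, { resolve, timer });
      this.ch.sendToQueue(queue, Buffer.from(JSON.stringify(payload)), {
        correlationId,
        replyTo: this.replyQueue,
      });
    });
  }

  onReply(msg) {
    const entry = this.pending.get(msg.properties.correlationId);
    if (!entry) return false; // duplicate or late reply: ignore it
    clearTimeout(entry.timer);
    this.pending.delete(msg.properties.correlationId);
    entry.resolve(JSON.parse(msg.content.toString()));
    return true;
  }
}
```

    Because every call generates a fresh `correlationId`, one connection, one channel and one reply queue can serve many concurrent calls.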

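    Flow walkthrough (flow diagram not reproduced)

    The case where step 5 breaks:
    a demo of duplicate messages on the server and client side (screenshot not reproduced).
    Note: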

    handler(req)
      .then((fResult) => {
        console.log('handle complete, send result to client', fResult);
        // send the result to the client's reply queue, tagged with the same correlationId
        ch.sendToQueue(req.properties.replyTo, Buffer.from(JSON.stringify(fResult)),
          { correlationId: req.properties.correlationId });
        console.log('delay finish in 5s');
        // deliberate 5 s delay before the ack, so the server can be killed in between (demo only)
        bluebird.delay(5000).then(() => {
          ch.ack(req);
          console.log('finish ack');
        });
      });

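    The first run is normal.
    On the second run, after the reply has been sent, stop the server before it sends its ack, then restart it. The server receives the same request a second time.
    The server then processes the duplicate request, replies, and acks.
    The client receives two replies: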

    $ node rpc_client.js 8
    [x] Requesting fib(8)
    [.] Got 8
    [.] Got 8 <--- duplicate

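    Making sure the client receives the reply

    What if the client disconnects while waiting for the reply?
    In RPC mode, reply_to is used to keep a long-lived connection, not to create a new queue per request.
    If the client crashes while waiting for the reply, the server has no way to send the result back. The client should simply send the request again.
    How can the server detect this? (TBD)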

    Back to the same flow diagram (not reproduced):

    If step 3 breaks, the client retries.

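    The client has two nodes (A, B) and the server has two nodes (C, D).
    In RPC mode:
    Client -------- Server
    A -> m1 -> C
    B -> m2 -> D
    What happens when C wants to reply to A but A has gone down and disconnected? A cannot receive the reply, even though B is still working normally.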

    So a more robust pattern is working queue + pub/sub, i.e. a callback gateway (diagram not reproduced).

    • What if rabbitmq-server is killed?
      Messages will be lost unless we set:

    const q = 'task_queue';
    ch.assertQueue(q, {durable: true});                       // the queue survives a broker restart
    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});  // the message is stored on disk

    In the meantime, we had better use an HA solution.
    Note: this is still not a 100% guarantee that messages won't be lost.
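    Publisher confirms close part of the remaining gap. The sketch below uses amqplib and assumes `ch` comes from `conn.createConfirmChannel()`. `waitForConfirms()` resolves once the broker has confirmed everything published on the channel so far, and rejects if any message was nacked. `publishReliably` is a hypothetical helper:

```javascript
// Hypothetical helper: durable queue + persistent messages + publisher confirms.
// `ch` is assumed to be a confirm channel (conn.createConfirmChannel()).
async function publishReliably(ch, queue, messages) {
  await ch.assertQueue(queue, { durable: true }); // queue definition survives a restart
  for (const msg of messages) {
    ch.sendToQueue(queue, Buffer.from(msg), { persistent: true });
  }
  await ch.waitForConfirms(); // rejects if the broker nacked any message
  return messages.length;
}
```

    For persistent messages routed to a durable queue, the broker confirms only after the message has been persisted. So a resolved `waitForConfirms()` means more than "sent".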

    • What if the consumer is killed, will the message be lost? No, as long as the queue still exists and noAck is set to false.

    In order to make sure a message is never lost, RabbitMQ supports
    message acknowledgments. An ack(nowledgement) is sent back from the
    consumer to tell RabbitMQ that a particular message has been received,
    processed and that RabbitMQ is free to delete it.

    If a consumer dies (its channel is closed, connection is closed, or
    TCP connection is lost) without sending an ack, RabbitMQ will
    understand that a message wasn't processed fully and will re-queue it.
    If there are other consumers online at the same time, it will then
    quickly redeliver it to another consumer. That way you can be sure
    that no message is lost, even if the workers occasionally die.

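    • What if the consumer takes too long to handle a message?

    There aren't any message timeouts; RabbitMQ will redeliver the message when the consumer dies. It's fine even if processing a message takes a very, very long time.
    (Newer RabbitMQ releases do enforce a delivery acknowledgement timeout, consumer_timeout, and close the channel if a delivery stays unacknowledged longer than that.)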

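    RPC problem:

    During RPC testing, the server got stuck processing one message and stayed stuck forever (screenshot not reproduced).
    Fetching the stuck message (screenshot not reproduced).
    You can: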

    • Delete the queue:

    rabbitmqadmin -u {user} -p {password} -V {vhost} delete queue name={name}

    For example, to delete rpc_queue in the default vhost:

    rabbitmqadmin -u {user} -p {password} -V / delete queue name=rpc_queue

    • Purge (empty) the queue, e.g. from its page in the management UI:

    http://localhost:15672/#/queues/%2F/rpc_queue
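    Both operations are also available through the management HTTP API. `DELETE /api/queues/{vhost}/{name}` deletes a queue, and `DELETE /api/queues/{vhost}/{name}/contents` purges it. The sketch below assumes Node 18+ for the global `fetch`. The helper names are hypothetical, and `fetchFn` is a parameter only so the code can be tested without a server:

```javascript
// Build /api/queues/{vhost}/{name}; the default vhost "/" becomes %2F.
function queueUrl(base, vhost, name) {
  return `${base}/api/queues/${encodeURIComponent(vhost)}/${encodeURIComponent(name)}`;
}

// Hypothetical helper: delete the whole queue.
async function deleteQueue(base, auth, vhost, name, fetchFn = fetch) {
  return sendDelete(fetchFn, queueUrl(base, vhost, name), auth);
}

// Hypothetical helper: purge (empty) the queue but keep it.
async function purgeQueue(base, auth, vhost, name, fetchFn = fetch) {
  return sendDelete(fetchFn, `${queueUrl(base, vhost, name)}/contents`, auth);
}

async function sendDelete(fetchFn, url, { user, pass }) {
  const res = await fetchFn(url, {
    method: 'DELETE',
    headers: { Authorization: 'Basic ' + Buffer.from(`${user}:${pass}`).toString('base64') },
  });
  if (!res.ok) throw new Error(`DELETE ${url} failed: ${res.status}`);
  return res.status; // usually 204 No Content on success
}
```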

    • Move all messages of the queue (recommended; a single message cannot be removed this way)

    To move messages, the shovel plugin must be enabled:

    $ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management

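    (Screenshot of the move-messages form not reproduced.)
    This amounts to having a "garbage" queue (the destination queue you enter in that form) that holds the messages you could not consume normally.
    The next step is very important: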

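    How to consume better:

    • Method 1: skip the message after a timeout, store it somewhere (e.g. a DB), then take the next one.
      Later, review the stored messages and have the program consume them again.
    • Method 2 (recommended): the worker sets a maximum retry count for failed messages; once the limit is exceeded, the message is sent to another queue. Monitor and display these messages in a back-office UI. After fixing the problem, trigger retries of the failed messages as needed.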
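    Below is a sketch of method 2 using amqplib calls. The attempt count travels in a custom header; `x-retry-count` and `x-last-error` are hypothetical names, not RabbitMQ built-ins. After `MAX_ATTEMPTS` failed attempts (also a hypothetical value), the message is parked in a separate failed queue, where it can be monitored and replayed later:

```javascript
// Hypothetical limit: total attempts before a message is parked.
const MAX_ATTEMPTS = 3;

function makeRetryingHandler(ch, queue, failedQueue, processFn) {
  return async function onMessage(msg) {
    const headers = msg.properties.headers || {};
    const attempts = headers['x-retry-count'] || 0; // hypothetical custom header
    try {
      await processFn(msg);
      ch.ack(msg);
      return 'done';
    } catch (err) {
      const next = attempts + 1;
      const target = next >= MAX_ATTEMPTS ? failedQueue : queue;
      // Republish a copy with the updated counter (and the last error, for the UI)...
      ch.sendToQueue(target, msg.content, {
        persistent: true,
        headers: { ...headers, 'x-retry-count': next, 'x-last-error': String(err && err.message) },
      });
      ch.ack(msg); // ...then drop the original
      return target === failedQueue ? 'parked' : 'retried';
    }
  };
}
```

    Republishing and then acking is not atomic. A crash in between produces a duplicate, which an idempotent consumer has to absorb.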

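    Besides making a worker hang, such a bad message can crash the worker, and even crash several workers in a row (another worker picks up the message and crashes too): a so-called catastrophic, cascading failure.
    That is why monitoring queue backlogs is essential.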

    For handling such timeouts, see:
    https://www.rabbitmq.com/dlx.html
    https://www.rabbitmq.com/ttl.html
    https://docs.microsoft.com/en-us/azure/service-bus-messaging/service-bus-performance-improvements
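    The DLX and TTL pages above can be combined as in the sketch below. It assumes amqplib's `assertQueue` options `deadLetterExchange`, `deadLetterRoutingKey` and `messageTtl`, which map to the `x-dead-letter-exchange`, `x-dead-letter-routing-key` and `x-message-ttl` queue arguments. The exchange and queue names, the TTL value and `giveUp` are hypothetical. Messages that expire in `tasks`, or that a worker rejects without requeueing, end up in `tasks.dead`:

```javascript
async function setupWithDeadLetter(ch) {
  // Dead-letter exchange and the queue that collects dead messages.
  await ch.assertExchange('dlx', 'direct', { durable: true });
  await ch.assertQueue('tasks.dead', { durable: true });
  await ch.bindQueue('tasks.dead', 'dlx', 'tasks');

  // Work queue: expired or rejected messages are re-published to 'dlx' with key 'tasks'.
  await ch.assertQueue('tasks', {
    durable: true,
    deadLetterExchange: 'dlx',     // x-dead-letter-exchange
    deadLetterRoutingKey: 'tasks', // x-dead-letter-routing-key
    messageTtl: 60000,             // x-message-ttl: 60 s waiting in the queue
  });
}

// Hypothetical helper for a worker: give up without requeueing, so the message is dead-lettered.
function giveUp(ch, msg) {
  ch.nack(msg, false, false); // allUpTo = false, requeue = false
}
```

    TTL counts time spent waiting in the queue. It does not interrupt a worker that is stuck on a message it already received, so that case still needs its own timeout (method 1 above). Also, `assertQueue` fails if `tasks` already exists with different arguments.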

    Concepts

    • Producer: Application that sends the messages.
    • Consumer: Application that receives the messages.
    • Queue: Buffer that stores messages.
    • Message: Information that is sent from the producer to a consumer through RabbitMQ.
    • Connection: A connection is a TCP connection between your application and the RabbitMQ broker.
    • Channel: A channel is a virtual connection inside a connection. Publishing messages, consuming messages and subscribing to a queue are all done over a channel.
    • Exchange: Receives messages from producers and pushes them to queues depending on rules defined by the exchange type. In order to receive messages, a queue needs to be bound to at least one exchange. Think of it as a message exchange or dispatcher: it receives messages from producers on one side and dispatches them by rules (routing/topic) on the other.
    • Binding: A binding is a link between a queue and an exchange.
    • Routing key: The routing key is a key that the exchange looks at to decide how to route the message to queues. The routing key is like an address for the message. Think of it as the dispatch rule from exchange to queue.
    • AMQP: AMQP (Advanced Message Queuing Protocol) is the protocol used by RabbitMQ for messaging.
    • Users: It is possible to connect to RabbitMQ with a given username and password. Every user can be assigned permissions such as rights to read, write and configure privileges within the instance. Users can also be assigned permissions to specific virtual hosts.
    • Vhost, virtual host: A virtual host provides a way to segregate applications using the same RabbitMQ instance. Different users can have different access privileges to different vhosts, and queues and exchanges can be created so they only exist in one vhost. Think of it as simulating separate machines inside one MQ instance; it is used to separate permissions and configuration for different users.
    • publish(in/out)
    • confirm
    • deliver
    • redelivered
    • acknowledge
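    To tie exchange, binding and routing key together, here is a sketch using amqplib calls and a topic exchange. The names `logs`, `errors` and `*.error` and the helper names are hypothetical. The binding routes to `errors` any message whose routing key matches `*.error`, so a message published with routing key `payment.error` lands there:

```javascript
// Hypothetical setup: topic exchange 'logs' -> queue 'errors' via binding pattern '*.error'.
async function setupErrorLogs(ch) {
  await ch.assertExchange('logs', 'topic', { durable: true });
  const { queue } = await ch.assertQueue('errors', { durable: true });
  await ch.bindQueue(queue, 'logs', '*.error'); // binding: exchange -> queue, with a pattern
  return queue;
}

// Hypothetical publisher: the routing key acts as the message's "address".
function publishError(ch, service, text) {
  // e.g. "payment.error"; in a topic binding "*" matches exactly one word
  return ch.publish('logs', `${service}.error`, Buffer.from(text));
}
```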

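    npm package

    The structure of the original demo code isn't great: every call has to create everything again. Use an npm wrapper package, or write your own wrapper.

    The official examples aren't written with promises and generators, so I wrapped and rewrote them slightly: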
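    The wrapper idea can be sketched as a lazily created, shared channel, so each call no longer connects from scratch. `connect` is meant to be amqplib's `require('amqplib').connect`; it is a parameter only so the sketch runs without a broker. `makeChannelProvider` is a hypothetical name:

```javascript
// Hypothetical wrapper: connect once, reuse the channel, reconnect lazily after a close.
function makeChannelProvider(connect, url = 'amqp://localhost') {
  let channelPromise = null;
  return function getChannel() {
    if (!channelPromise) {
      channelPromise = connect(url)
        .then((conn) => {
          conn.on('close', () => { channelPromise = null; }); // next call reconnects
          return conn.createChannel();
        })
        .catch((err) => {
          channelPromise = null; // allow a retry after a failed connect
          throw err;
        });
    }
    return channelPromise; // concurrent callers share the same pending promise
  };
}
```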

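    Pitfalls I ran into

    I wanted to wrap the official example so that two sends share the same connection, channel and callback queue. But the replies all came back with the same uuid.
    So for now, everything has to be initialized again before each send.
    My guess: AMQP runs over TCP, so in theory different clients sending to the server cannot share one TCP connection.
    But if the same client keeps sending messages and the TCP connection stays open, it can keep sending.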

    Test:

    // send twice on the same channel
    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
    ch.sendToQueue(q, Buffer.from(msg), {persistent: true});
    

    klg@klgaliyun03:~/rabbitmq-demo/queues$ node task-client.js
    [x] Sent 'Hello World!'

    // the worker receives both messages
    klg@klgaliyun03:~/rabbitmq-demo/queues$ node worker-server.js
    [*] Waiting for messages in task_queue. To exit press CTRL+C
    [x] Received Hello World!
    [x] Done
    [x] Received Hello World!
    [x] Done

    Others have run into the same problem.

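    More advanced topics

    High availability

    To keep the system running when an MQ node goes down, you need high availability. A good fit is mirrored queues, with HAProxy simply forwarding connections to the nodes.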

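    Graceful restarts

    When restarting, avoid a worker dying halfway through a message it has already taken: cancel the consumer first.

    • On receiving a restart message, reject
    • On receiving a restart message, cancel
    ch.consume(queue, function (req) {
      // ... handle the message, then ch.ack(req) ...
      // on a restart signal, stop further deliveries to this consumer
      // (amqplib puts the consumer tag on every delivered message)
      ch.cancel(req.fields.consumerTag);
    });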
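    A fuller version of the cancel approach is sketched below. On shutdown, it stops new deliveries, waits for in-flight messages to finish, then closes the channel. It uses amqplib calls (`consume`, `cancel`, `ack`, `nack`, `close`). `makeGracefulWorker` and the SIGTERM wiring in the usage line are hypothetical:

```javascript
// Hypothetical worker with a graceful stop(): cancel, drain in-flight work, close.
function makeGracefulWorker(ch, queue, handle) {
  let consumerTag = null;
  let inFlight = 0;
  let onIdle = null; // resolves stop() once the last in-flight message is done

  async function start() {
    ({ consumerTag } = await ch.consume(queue, async (msg) => {
      if (msg === null) return; // cancelled by the broker
      inFlight += 1;
      try {
        await handle(msg);
        ch.ack(msg);
      } catch (err) {
        ch.nack(msg, false, true); // failed: put it back for another worker
      } finally {
        inFlight -= 1;
        if (inFlight === 0 && onIdle) onIdle();
      }
    }));
  }

  async function stop() {
    await ch.cancel(consumerTag); // no new deliveries from here on
    if (inFlight > 0) await new Promise((resolve) => { onIdle = resolve; });
    await ch.close(); // anything still unacked would be requeued by the broker
  }

  return { start, stop };
}
```

    Usage, assuming a process manager that sends SIGTERM before a restart: `process.once('SIGTERM', () => worker.stop().then(() => process.exit(0)));`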
    

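    What if you don't choose MQ?

    When breaking a system apart, if you don't use MQ to replace HTTP REST calls, what other options are there?

    Scenarios for calling REST:

    • Change sync calls into async callbacks (mind the retries: use exponential backoff, otherwise the retries pile up into another traffic peak)
    • Thrift RPC
    • Raw TCP/UDP
    • Redis pub/sub
    • Retry HTTP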
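    The exponential-backoff advice in the first option can be sketched as follows. The "full jitter" variant is used here, which picks a random delay up to an exponentially growing cap so retries from many clients do not line up into a new peak. The base delay, cap, attempt count and helper names are hypothetical defaults:

```javascript
// Delay before retry number `attempt` (0-based): random in [0, min(cap, base * 2^attempt)).
function backoffDelay(attempt, baseMs = 100, capMs = 10000, random = Math.random) {
  const exp = Math.min(capMs, baseMs * 2 ** attempt); // 100, 200, 400, ... capped
  return Math.floor(random() * exp); // jitter spreads clients out
}

// Hypothetical retry loop around an async call such as an HTTP request.
async function retryWithBackoff(fn, maxAttempts = 5,
    sleep = (ms) => new Promise((r) => setTimeout(r, ms))) {
  for (let attempt = 0; ; attempt += 1) {
    try {
      return await fn(attempt);
    } catch (err) {
      if (attempt + 1 >= maxAttempts) throw err; // give up after maxAttempts tries
      await sleep(backoffDelay(attempt));
    }
  }
}
```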

    Performance

    Docker, 3 GB RAM, CPU: ?
    Disk mode:

    • Publish ~8k msg/s
    • Consume ~1k msg/s

    RAM mode:

    • Publish ~10k msg/s
    • Consume ~2k msg/s

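    Summary

    When not to use MQ?
    When the upstream needs the execution result in real time.

    When to use MQ?
    1) Data-driven task dependencies
    2) The upstream doesn't care about the results of multiple downstream consumers
    3) Asynchronous return for long-running tasks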

    More references

  • Original article: https://www.cnblogs.com/no7dw/p/9504950.html