zoukankan      html  css  js  c++  java
  • rabbitmq保持连接

    背景:最近线上mq消费者进程ok,但rabbitmq控制台显示无消费进程,导致mq队列消息堆积,以前是直接重启mq,这次决定深究下原因

    操作耗时的守护进程

    因业务原因,每次导入30w条记录,代码中将每500条一批塞入mq队列,在消费的时候,需要查表插库,处理耗时较长,我们使用的是php-amqp库,代码非常简单

    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, ...);
    $channel = $connection->channel();
    $channel->queue_declare($queue, false, true, false, false);
    $channel->basic_consume($queue, $consumerTag, false, false, false, false, 'consumeLogic');
    function consumeLogic(AMQPMessage $message): void
    {
        // 消费逻辑
    }
    while (count($channel->callbacks)) {
        $channel->wait();
    }
    

    mq心跳

    1.rabbitmq使用心跳机制来保持连接,在正常场景下,客户端期望通过发送心跳包来告知服务端自己存活。如果服务端连续两次发送心跳客户端均无回应,服务端会断开与客户端的连接。心跳间隔可在每次连接时设置。

    2.因php是同步语言,它无法在后台运行耗时任务时持续发送心跳包。这时候服务端就会断开连接,而客户端只有继续使用这个队列的时候才会发现已断开

    rabbitmq如何处理心跳

    通过阅读该库源码,发现是通过方法 AbstractIO::check_heartbeat(), 该方法会在你每次使用连接时调用,如 AMQPChannel::basic_consume(),AMQPChannel::basic_consume(),AMQPChannel::basic_consume()

    如果设置了心跳间隔,check_heartbeat()方法会监测离上次使用连接过去的时间。如果客户端忽略了两次心跳,会自动重连,或者过去了心跳间隔的一半客户端会主动发送心跳。

    手动发送心跳

    当在处理耗时任务时,我们需要确保连接,且在任务处理过程中主动发送心跳,那如何实现呢,我们来看下check_heartbeat()的源码

    public function check_heartbeat()
    {
        // ignore unless heartbeat interval is set
        if ($this->heartbeat !== 0 && $this->last_read && $this->last_write) {
            $t = microtime(true);
            $t_read = round($t - $this->last_read);
            $t_write = round($t - $this->last_write);
    
            // server has gone away
            if (($this->heartbeat * 2) < $t_read) {
                $this->close();
                throw new AMQPHeartbeatMissedException("Missed server heartbeat");
            }
    
            // time for client to send a heartbeat
            if (($this->heartbeat / 2) < $t_write) {
                $this->write_heartbeat();
            }
        }
    }
    

    看了源码之后,发送心跳是有前置条件的

    1. 设置了心跳间隔
    2. 从socket中取值了
    3. 向socket中写过数据

    第一条我们手动设置,第三条只要我们连接了就会有 last_write,现在我们需要满足第二条,那何时会触发read呢,当然是接收消息的时候,可是我们还在处理消息,因同步的问题,需要在处理完才会接收下一条消息。

    那我们可不可以主动read呢,可以,需要加一行代码,就能实现,可在消费代码中调用

    function send_heartbeat($connection)
    {
      $connection->getIO()->read(0);
    }
    

    这时候我们并没有拿消息,只是用了一个hack,来触发发送心跳,看下它是如何生效的

    public function read($len)
    {
        if (is_null($this->sock)) {
            throw new AMQPSocketException(sprintf(
                'Socket was null! Last SocketError was: %s',
                socket_strerror(socket_last_error())
            ));
        }
    
        $this->check_heartbeat();
    
        list($timeout_sec, $timeout_uSec) = MiscHelper::splitSecondsMicroseconds($this->read_timeout);
        $read_start = microtime(true);
        $read = 0;
        $data = '';
        while ($read < $len) {
            $buffer = null;
            $result = socket_recv($this->sock, $buffer, $len - $read, 0);
            if ($result === 0) {
                // From linux recv() manual:
                // When a stream socket peer has performed an orderly shutdown,
                // the return value will be 0 (the traditional "end-of-file" return).
                // http://php.net/manual/en/function.socket-recv.php#47182
                $this->close();
                throw new AMQPConnectionClosedException('Broken pipe or closed connection');
            }
    
            if (empty($buffer)) {
                $read_now = microtime(true);
                $t_read = $read_now - $read_start;
                if ($t_read > $this->read_timeout) {
                    throw new AMQPTimeoutException('Too many read attempts detected in SocketIO');
                }
                $this->select($timeout_sec, $timeout_uSec);
                continue;
            }
    
            $read += mb_strlen($buffer, 'ASCII');
            $data .= $buffer;
        }
    
        if (mb_strlen($data, 'ASCII') != $len) {
            throw new AMQPIOException(sprintf(
                'Error reading data. Received %s instead of expected %s bytes',
                mb_strlen($data, 'ASCII'),
                $len
            ));
        }
    
        $this->last_read = microtime(true);
    
        return $data;
    }
    

    我们调用read后,会主动触发检测心跳包,之后会设置last_read,在第二次手动调用的时候就会发送心跳了。

    这里给大家的建议是处理消息尽量快速,最好不要用hack

    参考链接

    1. Keeping RabbitMQ connections alive in PHP
  • 相关阅读:
    FineReport图表、参数、填报、决策报表制作
    FineReport入门
    python时间计算:当天、前一天、月初、月末、季初、季末、半年初、半年末、年初、年末
    授予mysql的其他用户数据库的使用权限
    Python日志记录
    JStorm:概念与编程模型
    ftp传输图片损坏原因
    web应用中浏览器与服务端的编码和解码
    Http协议中的CharacterEncoding、Content-Encoding和Transfer-Encoding
    设计模式心法之单一职责原
  • 原文地址:https://www.cnblogs.com/mingao/p/10626297.html
Copyright © 2011-2022 走看看