zoukankan      html  css  js  c++  java
  • rabbitmq 延时队列

    前言

    某个产品 或者订单,有个有效期 过了有效期要取消

    方法一 : 写个脚本,用crontab 定时扫描 改变状态 但是最低只能一分钟 ,不适合

    方法二 : 用swoole得毫秒定时器,每秒钟去扫描表 明显占用资源 mysql受不了 

    方法三 :用rabbitmq延时队列 一开始将其丢入mq 死信队列,设置有效期,过时转发到其他队列,再启动一个消费者 消费  更改表状态 

    php 安装mq扩展

    https://www.cnblogs.com/brady-wang/p/7662393.html

    搭建mq服务

    https://www.cnblogs.com/brady-wang/p/7660174.html

    创建生产者和消费者

    生产者  publish.php

    <?php
    
    header('Content-Type:text/html;charset=utf8;');
    $time = 30;
    
    $params = array(
    	'exchangeName' => 'test_cache_exchange'."_".$time,
    	'queueName' => 'test_cache_queue'."_".$time,
    	'routeKey' => 'test_cache_route'."_".$time,
    );
    
    $connectConfig = array(
    	'host' => '127.0.0.1',
    	'port' => 5672,
    	'login' => 'admin',
    	'password' => 'password',
    	'vhost' => '/'
    );
    
    //var_dump(extension_loaded('amqp'));
    //
    //exit();
    try {
    	$conn = new AMQPConnection($connectConfig);
    	$conn->connect();
    	if (!$conn->isConnected()) {
    		//die('Conexiune esuata');
    		//TODO 记录日志
    		echo 'rabbit-mq 连接错误:', json_encode($connectConfig);
    		exit();
    	}
    	$channel = new AMQPChannel($conn);
    	if (!$channel->isConnected()) {
    		// die('Connection through channel failed');
    		//TODO 记录日志
    		echo 'rabbit-mq Connection through channel failed:', json_encode($connectConfig);
    		exit();
    	}
    	$exchange = new AMQPExchange($channel);
    	$exchange->setFlags(AMQP_DURABLE);//持久化
    	$exchange->setName($params['exchangeName']?:'');
    	$exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    	$exchange->declareExchange();
    
    	//$channel->startTransaction();
    
    	$queue = new AMQPQueue($channel);
    	$queue->setName($params['queueName']?:'');
    	$queue->setFlags(AMQP_DURABLE);
    
    	// 和普通生产者区别 在这 下面是过期时间和转发到的路由
    	$queue->setArguments(array(
    		'x-dead-letter-exchange' => 'delay_exchange',
    		'x-dead-letter-routing-key' => 'delay_route',
    		'x-message-ttl' => $time*1000,
    	));
    	$queue->declareQueue();
    
    	//绑定
    	$queue->bind($params['exchangeName'], $params['routeKey']);
    } catch(Exception $e) {
    
    }
    
    
    //$num = mt_rand(100, 500);
    $num = 1;
    
    //生成消息
    $exchange->publish(date("Y-m-d H:i:s"), $params['routeKey'], AMQP_MANDATORY, array('delivery_mode'=>2));
    

      

    消费者 consumer.php

    <?php
    
    header('Content-Type:text/html;charset=utf8;');
    
    
    $params = array(
    	'exchangeName' => 'delay_exchange',
    	'queueName' => 'delay_queue',
    	'routeKey' => 'delay_route',
    );
    
    $connectConfig = array(
    	'host' => 'localhost',
    	'port' => 5672,
    	'login' => 'admin',
    	'password' => 'password',
    	'vhost' => '/'
    );
    
    //var_dump(extension_loaded('amqp'));
    
    //exit();
    
    try {
    	$conn = new AMQPConnection($connectConfig);
    	$conn->connect();
    	if (!$conn->isConnected()) {
    		//die('Conexiuneesuata');
    //TODO记录日志
    		echo 'rabbit-mq连接错误:', json_encode($connectConfig);
    		exit();
    	}
    	$channel = new AMQPChannel($conn);
    	if (!$channel->isConnected()) {
    		//die('Connectionthroughchannelfailed');
    //TODO记录日志
    		echo 'rabbit-mqConnectionthroughchannelfailed:', json_encode($connectConfig);
    		exit();
    	}
    	$exchange = new AMQPExchange($channel);
    	$exchange->setFlags(AMQP_DURABLE);//声明一个已存在的交换器的,如果不存在将抛出异常,这个一般用在consume端
    	$exchange->setName($params['exchangeName'] ?: '');
    	$exchange->setType(AMQP_EX_TYPE_DIRECT);//direct类型
    	$exchange->declareExchange();
    
    //$channel->startTransaction();
    
    	$queue = new AMQPQueue($channel);
    	$queue->setName($params['queueName'] ?: '');
    	$queue->setFlags(AMQP_DURABLE);
    	$queue->declareQueue();
    
    //绑定
    	$queue->bind($params['exchangeName'], $params['routeKey']);
    } catch (Exception$e) {
    	echo $e->getMessage();
    	exit();
    }
    
    function callback(AMQPEnvelope $message){
    	global $queue;
    	if ($message) {
    		$body = $message->getBody();
    		echo $body . PHP_EOL;
    		$queue->ack($message->getDeliveryTag());
    	} else {
    		echo 'nomessage' . PHP_EOL;
    	}
    }
     
    //$queue->consume('callback');第一种消费方式,但是会阻塞,程序一直会卡在此处
     
    //第二种消费方式,非阻塞
    $start = time();
    while (true) {
    	$message = $queue->get();
    	if (!empty($message)) {
    		echo $message->getBody()."--失效时间 ".date("Y-m-d H:i:s"). PHP_EOL;
    		$queue->ack($message->getDeliveryTag());//应答,代表该消息已经消费
    //		$end = time();
    //		echo '<br>' . ($end - $start);
    
    	} else {
    		//echo'messagenotfound'.PHP_EOL;
    	}
    }
    

      

    执行推送 我改了不同时间推送,会生成不同的交换机 路由 队列,因为我用得是direct类型  要一一匹配

    消费者开启 

    [root@localhost mq]# php consumer.php 
    2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42
    2020-07-18 11:07:22--失效时间 2020-07-18 11:07:42
    2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43
    2020-07-18 11:07:23--失效时间 2020-07-18 11:07:43
    2020-07-18 11:13:04--失效时间 2020-07-18 11:13:24
    
    2020-07-18 11:21:00--失效时间 2020-07-18 11:21:10
    2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02
    2020-07-18 11:21:32--失效时间 2020-07-18 11:22:02
    2020-07-18 11:21:22--失效时间 2020-07-18 11:22:12
    2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13
    2020-07-18 11:21:23--失效时间 2020-07-18 11:22:13
    

      

    发现正常,都是我设置的事件过期后就到处理队列,在这里消费,处理逻辑即可 

     参考 https://www.cnblogs.com/Zhangcsc/p/11739754.html

    https://blog.csdn.net/weixin_34310369/article/details/92262465?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-2

  • 相关阅读:
    [原创]c#快速排序类 Virus
    [原创]关系,依赖, Virus
    [原创]外包 Virus
    [原创]异步调用I/O方法的使用 Virus
    [原创]一个查找并且替换的算法 Virus
    封装原来的DirectoryInfo类,添加事件,可以代替FileSystemWatcher 类 Virus
    [原创]包头人在北京<一> Virus
    [原创]异步调用,多线程,委托 Virus
    [原创]异步,跨线程,非阻塞,DNS,Socket Virus
    [原创]大家动脑吧,一个面试题 Virus
  • 原文地址:https://www.cnblogs.com/brady-wang/p/13335104.html
Copyright © 2011-2022 走看看