1、SDK安装:composer.json
{
"require": {
"aliyunmq/mq-http-sdk": ">=1.0.1"
}
}
2、config配置文件增加配置
// Settings read by RocketMqService from Yii::$app->params['rocketmq'].
'rocketmq' => [
// endPoint, accessId and accessKey are passed, in this order, to the MQClient constructor.
"endPoint" => 'http://test.cn-shanghai.aliyuncs.com',
"accessId" => 'test123456',
"accessKey" => 'test12456789',
// Consumer group ids, keyed by the service name given to the RocketMqService constructor.
"consumer" =>
[
'test' => 'GID_TEST_TEST_1',
],
// Instance id handed to getProducer() / getConsumer().
"instanceId" => 'MQ_INST_123456_Bdffdsf',
// Topic names, keyed by the same service name as the "consumer" map.
"topic" => [
'test' => 'test',
]
]
3、服务类
<?php
namespace common\test;
use MQ\Model\Message;
use MQ\Model\TopicMessage;
use MQ\MQClient;
use MQ\MQConsumer;
use MQ\MQProducer;
use Yii;
/**
 * Thin wrapper around the RocketMQ HTTP SDK client for one configured service.
 *
 * Connection settings, the topic and the consumer group id are read from
 * Yii::$app->params['rocketmq']; the topic and group id are looked up by the
 * service name passed to the constructor.
 */
class RocketMqService
{
    /**
     * @var MQClient
     */
    private $client;

    /**
     * @var MQConsumer|null Created by subscribeMq() or consumerMessage().
     */
    private $consumer = null;

    /**
     * @var MQProducer|null Created lazily on the first sendMessage() call.
     */
    private $producer = null;

    /**
     * @var string Topic configured for the service.
     */
    private $topic;

    /**
     * @var string|null Consumer group id configured for the service, null if none is configured.
     */
    private $groupId;

    /**
     * @var string
     */
    private $instanceId;

    /**
     * @param string $service Key into the 'consumer' and 'topic' maps of the rocketmq config
     */
    public function __construct($service)
    {
        $config = Yii::$app->params['rocketmq'];

        $this->client = new MQClient(
            $config['endPoint'],
            $config['accessId'],
            $config['accessKey']
        );
        $this->instanceId = $config['instanceId'];
        // Keep the configured group id itself. Assigning the result of isset()
        // stored a boolean, which was then passed to getConsumer() as the group.
        $this->groupId = $config['consumer'][$service] ?? null;
        $this->topic = $config['topic'][$service];
    }

    /**
     * Publish a message to the service's topic.
     *
     * @param array $content Message body; sent JSON-encoded
     * @param string $tag Message tag
     * @return mixed Whatever the producer's publishMessage() returns
     * @throws \InvalidArgumentException When $content cannot be JSON-encoded
     * @author K.k
     */
    public function sendMessage(array $content, string $tag)
    {
        if (!$this->producer) {
            $this->producer = $this->client->getProducer($this->instanceId, $this->topic);
        }

        $payload = json_encode($content);
        if ($payload === false) {
            // Publishing `false` would silently put a broken body on the topic.
            throw new \InvalidArgumentException(
                'Unable to JSON-encode message content: ' . json_last_error_msg()
            );
        }

        $message = new TopicMessage($payload);
        $message->setMessageTag($tag);

        return $this->producer->publishMessage($message);
    }

    /**
     * Create the consumer for the service's topic and group.
     *
     * @param string|null $tag Tag passed through to getConsumer()
     */
    public function subscribeMq($tag = null)
    {
        $this->consumer = $this->client->getConsumer($this->instanceId, $this->topic, $this->groupId, $tag);
    }

    /**
     * Acknowledge a single consumed message.
     *
     * @param Message $message
     * @throws \LogicException When no consumer has been created yet
     */
    public function ack(Message $message)
    {
        $this->ackConsumerMessage([$message->getReceiptHandle()]);
    }

    /**
     * Acknowledge a batch of consumed messages by their receipt handles.
     *
     * @param array $receiptHandles Receipt handles collected from consumed messages
     * @throws \LogicException When no consumer has been created yet
     */
    public function ackConsumerMessage(array $receiptHandles)
    {
        if (!$receiptHandles) {
            // Nothing to acknowledge.
            return;
        }
        if (!$this->consumer) {
            throw new \LogicException('Call subscribeMq() or consumerMessage() before acknowledging messages.');
        }

        $this->consumer->ackMessage($receiptHandles);
    }

    /**
     * Consume a batch of messages from the service's topic.
     *
     * The consumer is re-created on every call with the given tag.
     *
     * @param string|null $tag Tag passed through to getConsumer()
     * @param int $numOfMessages Passed to the consumer's consumeMessage()
     * @param int $waitSeconds Passed to the consumer's consumeMessage()
     * @return Message[]|false The consumed messages, or false when the consumer throws
     */
    public function consumerMessage($tag, $numOfMessages = 5, $waitSeconds = 0)
    {
        $this->consumer = $this->client->getConsumer($this->instanceId, $this->topic, $this->groupId, $tag);

        try {
            return $this->consumer->consumeMessage($numOfMessages, $waitSeconds);
        } catch (\Exception $e) {
            // Fully qualified: an unqualified Exception resolves inside the current
            // namespace and would never match what the consumer throws.
            return false;
        }
    }
}
4、发送消息
// Publish one message for the 'test' service, tagged 'test_tag'.
$producerService = new RocketMqService('test');
$payload = [
    'userId' => 123456,
    'userName' => 'test',
];
$result = $producerService->sendMessage($payload, 'test_tag');
5、消费消息
// Poll the 'test' service for messages until a poll comes back empty or fails.
try {
    // The constructor requires the service name used as the key in the rocketmq config.
    $rockMqService = new RocketMqService('test');
    while (true) {
        // Filter on the tag the producer publishes with ('test_tag'), not on the service name.
        $messages = $rockMqService->consumerMessage('test_tag');
        if (!$messages) {
            break;
        }
        foreach ($messages as $message) {
            $body = $message->getMessageBody();
            $data = json_decode($body, true);
            //code......

            // Acknowledge each message once it has been handled.
            $rockMqService->ack($message);
        }
        sleep(3);
    }
} catch (\Exception $e) {
    // Report the failure instead of swallowing it silently.
    error_log('RocketMQ consume failed: ' . $e->getMessage());
}