zoukankan      html  css  js  c++  java
  • yii使用RocketMq服务

    1、SDK安装:composer.json

    {
      "require": {
         "aliyunmq/mq-http-sdk": ">=1.0.1"
      }
    }
    

    2、config配置文件增加配置

    'rocketmq' => [
            "endPoint" => 'http://test.cn-shanghai.aliyuncs.com',
            "accessId" => 'test123456',
            "accessKey" => 'test12456789',
            "consumer" =>
                [
                    'test' => 'GID_TEST_TEST_1',
                ],
            "instanceId" => 'MQ_INST_123456_Bdffdsf',
            "topic" => [
                'test' => 'test',
            ]
        ]
    

    3、服务类

    <?php
    
    namespace common	est;
    
    use MQModelMessage;
    use MQModelTopicMessage;
    use MQMQClient;
    use MQMQConsumer;
    use MQMQProducer;
    use Yii;
    
    class RocketMqService
    {
        /**
         * @var MQClient
         */
        private $client;
    
        /**
         * @var MQConsumer
         */
        private $consumer = null;
        /**
         * @var MQProducer
         */
        private $producer = null;
    
        private $topic;
        private $groupId;
        private $instanceId;
    
        public function __construct($service)
        {
            $this->client = new MQClient(
                Yii::$app->params['rocketmq']['endPoint'],
                Yii::$app->params['rocketmq']['accessId'],
                Yii::$app->params['rocketmq']['accessKey']
            );
            $this->instanceId = Yii::$app->params['rocketmq']['instanceId'];
            $this->groupId = isset(Yii::$app->params['rocketmq']['consumer'][$service]);
            $this->topic = Yii::$app->params['rocketmq']['topic'][$service];
        }
    
        /**
         * 发送mq消息
         * @param array $content 消息体数组
         * @param string $tag 消息tag
         * @return mixed
         * @author K.k
         */
        public function sendMessage(array $content, string $tag)
        {
            if (!$this->producer){
                $this->producer = $this->client->getProducer($this->instanceId, $this->topic);
            }
            $message = new TopicMessage(json_encode($content));
            $message->setMessageTag($tag);
            return $this->producer->publishMessage($message);
        }
    
        /**
         * 订阅消息
         * @param string $tag
         * @throws
         */
        public function subscribeMq($tag = null)
        {
            $this->consumer = $this->client->getConsumer($this->instanceId, $this->topic, $this->groupId, $tag);
        }
    
    
        /**
         * 消息确认
         * @param Message $message
         */
        public function ack(Message $message)
        {
    
            $receiptHandles[] = $message->getReceiptHandle();
            $this->consumer->ackMessage($receiptHandles);
    
        }
    
        /**
         * 订阅消息
         * @param $tag
         * @param int $numOfMessages
         * @param int $waitSeconds
         * @return bool|Message
         */
        public function consumerMessage($tag, $numOfMessages = 5, $waitSeconds = 0)
        {
            $this->consumer = $this->client->getConsumer($this->instanceId, $this->topic, $this->groupId, $tag);
    
            try {
    
                $messages = $this->consumer->consumeMessage($numOfMessages, $waitSeconds);
            } catch (Exception $e) {
    
                return false;
            }
    
            return $messages;
    
        }
    }
    

    4、发送消息

    $body = [
                'userId' => 123456,
                'userName' => 'test',
            ];
            $mq = new RocketMqService('test');
            $result = $mq->sendMessage($body, 'test_tag');
    

    5、消费消息

    try{
                
                $rockMqService = new RocketMqService();
    
                while (true) {
    
                    $messages = $rockMqService->consumerMessage('test');
    
                    if (!$messages) {
    
                        break;
                    }
    
                    $receiptHandles = array();
                    foreach ($messages as $message) {
                        $receiptHandles[] = $message->getReceiptHandle();
    
                        $body = $message->getMessageBody();
                        $data = json_decode($body, true);
                        
                        //code......
                        
                    }
    
                    $rockMqService->ackConsumerMessage($receiptHandles);
                    
                    sleep(3);
                    
                }
    
    
    
            }catch (Exception $e) {
    
            }
    

      

  • 相关阅读:
    no-useless-call (Rules) – Eslint 中文开发手册
    Java 8 Stream
    CSS3 ,checked 选择器
    MySQL 数据类型
    _Alignas (C keywords) – C 中文开发手册
    C 库函数 – modf()
    JavaScript E 属性
    SyntaxError.prototype (Errors) – JavaScript 中文开发手册
    swagger和openAPI: 上传文件
    Java中HashMap的putAll()方法: HashMap.putAll()
  • 原文地址:https://www.cnblogs.com/diguaer/p/14549523.html
Copyright © 2011-2022 走看看