zoukankan      html  css  js  c++  java
  • 工作中使用RabbitMQ

    写一个基类

      1 <?php
      2 
      3 namespace BIServiceRabbitMQJob;
      4 
      5 use AMQPConnection;
      6 use AMQPChannel;
      7 use AMQPExchange;
      8 use AMQPQueue;
      9 use Exception;
     10 
     /**
      * Base class for RabbitMQ job publishers.
      *
      * On construction it connects to the broker, opens a channel, declares one
      * durable exchange and declares/binds every durable queue returned by
      * getQueueNamesAndRoutingKeys(). Subclasses supply the exchange name/type,
      * the job type and the queue => routing-key map.
      */
     abstract class Base
     {
         /** Correctly spelled alias of EXCHANGE_TYPE_DIRECCT. */
         const EXCHANGE_TYPE_DIRECT  = AMQP_EX_TYPE_DIRECT;
         /** @deprecated Misspelled name kept so existing subclasses keep working. */
         const EXCHANGE_TYPE_DIRECCT = AMQP_EX_TYPE_DIRECT;
         const EXCHANGE_TYPE_FANOUT  = AMQP_EX_TYPE_FANOUT;
         const EXCHANGE_TYPE_TOPIC   = AMQP_EX_TYPE_TOPIC;

         const JOB_TYPE_MAIL = 'mail';
         const JOB_TYPE_TEST = 'test';
         const JOB_TYPE_STRUCTURE = 'structure';

         /** @var AMQPChannel|null Null until the constructor has opened a channel. */
         protected $channel;

         /** @var AMQPExchange */
         protected $exchange;

         /**
          * Connects to the broker and sets up the exchange and queue bindings.
          *
          * @param array  $config Connection settings; the keys read are 'host',
          *                       'port', 'login', 'password' and, optionally,
          *                       'vhost' (which takes precedence over $vhost).
          * @param string $vhost  Virtual host used when $config has no 'vhost'.
          * @throws Exception When connecting, declaring or binding fails.
          */
         public function __construct($config, $vhost = '/')
         {
             // Read the needed keys explicitly instead of extract(), which would
             // import every config key into the local scope.
             $connArgs = array(
                 'vhost' => isset($config['vhost']) ? $config['vhost'] : $vhost,
             );
             foreach (array('host', 'port', 'login', 'password') as $key) {
                 if (isset($config[$key])) {
                     $connArgs[$key] = $config[$key];
                 }
             }

             try {
                 $conn = new AMQPConnection($connArgs);
                 // Compare against false explicitly so a method that returns
                 // nothing is not mistaken for a failure.
                 if (false === $conn->connect()) {
                     // Never put the broker password into an exception message.
                     $safeArgs = $connArgs;
                     if (isset($safeArgs['password'])) {
                         $safeArgs['password'] = '***';
                     }
                     throw new Exception('Cannot connect to the broker with parameters: ' . json_encode($safeArgs));
                 }
                 $this->channel = new AMQPChannel($conn);
                 $this->declareExchange();
                 $this->bindQueues();
             } catch (Exception $e) {
                 // Normalise to Exception but keep the original as "previous"
                 // so its type and stack trace are not lost.
                 throw new Exception($e->getMessage(), (int) $e->getCode(), $e);
             }
         }

         /**
          * Declares the durable exchange described by getExchangeName() and
          * getExchangeType() on the current channel.
          *
          * @throws Exception When the broker reports the declaration failed.
          */
         protected function declareExchange()
         {
             $this->exchange = new AMQPExchange($this->channel);
             $this->exchange->setName($this->getExchangeName());
             $this->exchange->setType($this->getExchangeType());
             $this->exchange->setFlags(AMQP_DURABLE);
             if (false === $this->exchange->declareExchange()) {
                 throw new Exception('Exchange declare failed.');
             }
         }

         /**
          * Declares every queue from getQueueNamesAndRoutingKeys() as durable and
          * binds it to the exchange with its routing key.
          *
          * @throws Exception When a binding fails.
          */
         protected function bindQueues()
         {
             foreach ($this->getQueueNamesAndRoutingKeys() as $queueName => $routingKey) {
                 $queue = new AMQPQueue($this->channel);
                 $queue->setName($queueName);
                 $queue->setFlags(AMQP_DURABLE);
                 $queue->declareQueue();
                 if (false === $queue->bind($this->getExchangeName(), $routingKey)) {
                     // The details are concatenated into the message; passing
                     // them as a second argument would make them the exception code.
                     throw new Exception(
                         'Queue binding failed with parameters: '
                         . json_encode(array('name' => $queueName, 'routingKey' => $routingKey))
                     );
                 }
             }
         }

         /**
          * Publishes a JSON message of the form {"MType": <job type>, "Content": <content>}
          * to the exchange.
          *
          * @param mixed  $content    Payload; must be JSON-encodable.
          * @param string $routingKey Must be one of the values returned by
          *                           getQueueNamesAndRoutingKeys().
          * @return bool Whatever AMQPExchange::publish() returns.
          * @throws Exception On unknown routing key, invalid job type or encoding failure.
          */
         protected function send($content, $routingKey = null)
         {
             if (!in_array($routingKey, $this->getQueueNamesAndRoutingKeys(), true)) {
                 throw new Exception('RoutingKey: ' . $routingKey
                     . ' is not found in the routing key list from the function getQueueNamesAndRoutingKeys');
             }

             $jobType = $this->getRabbitMQJobType();
             if (!$this->validateJobType($jobType)) {
                 throw new Exception('Invalid Job Type.');
             }

             $payload = json_encode(array(
                 'MType' => $jobType,
                 'Content' => $content,
             ));
             if (false === $payload) {
                 // Publishing "false" would silently enqueue an empty message.
                 throw new Exception('Message could not be JSON encoded (json error ' . json_last_error() . ').');
             }

             return $this->exchange->publish($payload, $routingKey);
         }

         /**
          * Checks that $rk is a known routing key.
          *
          * Only the validation is implemented here; no message is fetched and
          * nothing is returned.
          *
          * @param string $rk Routing key to check.
          * @throws Exception When $rk is not in getQueueNamesAndRoutingKeys().
          */
         protected function get($rk)
         {
             if (!in_array($rk, $this->getQueueNamesAndRoutingKeys(), true)) {
                 throw new Exception("RoutingKey: $rk is not found");
             }
         }

         /**
          * Tells whether $jobType is one of the JOB_TYPE_* constants.
          *
          * @param string $jobType
          * @return bool
          */
         protected function validateJobType($jobType)
         {
             return in_array($jobType, array(
                 self::JOB_TYPE_MAIL,
                 self::JOB_TYPE_TEST,
                 self::JOB_TYPE_STRUCTURE,
             ), true);
         }

         /**
          * Closes the broker connection, if one was ever opened.
          */
         public function __destruct()
         {
             // The destructor also runs when the constructor threw before the
             // channel was assigned; calling a method on null here would turn
             // the original exception into a fatal error.
             if (!($this->channel instanceof AMQPChannel)) {
                 return;
             }
             $this->channel->getConnection()->disconnect();
         }

         /**
          * @return string One of the JOB_TYPE_* constants.
          */
         abstract protected function getRabbitMQJobType();

         /**
          * @return string Name of the exchange to declare and publish to.
          */
         abstract protected function getExchangeName();

         /**
          * @return string One of the EXCHANGE_TYPE_* constants.
          */
         abstract protected function getExchangeType();

         /**
          * @return array queue_name => routing_key
          */
         abstract protected function getQueueNamesAndRoutingKeys();
     }

    写一个service继承基类

      1 <?php
      2 
      3 namespace MissionService;
      4 use BIServiceRabbitMQJobBase;
      5 use MonologHandlerStreamHandler;
      6 use MonologLogger;
      7 
      /**
       * Publishes a single message to the 'API' direct exchange.
       *
       * The target queue and routing key default to 'mission_queue' /
       * 'api_update_mission' and can be changed with setQueue()/setRoutingKey().
       * The parent constructor binds only the pair that is current at
       * construction time, so a changed pair is declared and bound lazily on
       * the next publish().
       *
       * NOTE(review): the namespace/use statements above this class appear to
       * have lost their namespace separators (e.g. Monolog\Logger) - confirm
       * against the real source file.
       */
      class PublishToMQService extends Base
      {
          private $message;
          private $logger;
          private $error;
          protected $queue = 'mission_queue';
          protected $routingKey = 'api_update_mission';

          /** @var bool True when queue/routing key changed since the last bind. */
          private $bindingsStale = false;

          /**
           * Validates and publishes the message set via setMessage().
           *
           * @return bool True on success; false when validation or publishing
           *              failed (see getError()).
           * @throws \Exception Propagated from the parent when binding or
           *                    sending fails.
           */
          public function publish()
          {
              if (false === $this->_validation()) {
                  return false;
              }

              // The queue/key pair may have been changed after construction;
              // without (re)binding, the exchange would have no route for it
              // and the message would be dropped.
              if ($this->bindingsStale) {
                  $this->bindQueues();
                  $this->bindingsStale = false;
              }

              // addDebug() is the Monolog 1.x logging call used by the original code.
              $this->getLogger()->addDebug(__METHOD__, array('MQMessage' => $this->message));

              // Only an explicit false is treated as failure, so a publish
              // method that returns nothing still counts as success.
              if (false === $this->sendToQueue($this->message, $this->queue)) {
                  $this->error = 'Message could not be published.';
                  return false;
              }

              return true;
          }

          /**
           * @param array $message Payload to publish.
           * @return $this
           */
          public function setMessage($message = array())
          {
              $this->message = $message;

              return $this;
          }

          /**
           * @param string $queue Name of the queue to publish to.
           * @return $this
           */
          public function setQueue($queue)
          {
              if ($queue !== $this->queue) {
                  $this->queue = $queue;
                  $this->bindingsStale = true;
              }

              return $this;
          }

          /**
           * @param string $routingKey Routing key bound to the queue.
           * @return $this
           */
          public function setRoutingKey($routingKey)
          {
              if ($routingKey !== $this->routingKey) {
                  $this->routingKey = $routingKey;
                  $this->bindingsStale = true;
              }

              return $this;
          }

          /**
           * Lazily creates the logger writing to logs/queue.log.
           *
           * @return Logger
           */
          private function getLogger()
          {
              if (!($this->logger instanceof Logger)) {
                  $this->logger = new Logger('Detection');
                  $file = __DIR__ . DIRECTORY_SEPARATOR . '../../../logs/queue.log';
                  $this->logger->pushHandler(new StreamHandler($file, Logger::DEBUG));
              }

              return $this->logger;
          }

          /**
           * Checks that a non-empty message was set; records the error otherwise.
           *
           * @return bool
           */
          private function _validation()
          {
              if (empty($this->message)) {
                  $this->error = 'Message cannot be empty.';
                  return false;
              }

              // Clear any error left over from an earlier failed call.
              $this->error = null;

              return true;
          }

          /**
           * @return string
           */
          protected function getExchangeName()
          {
              return 'API';
          }

          /**
           * @return string
           */
          protected function getRabbitMQJobType()
          {
              return Base::JOB_TYPE_TEST;
          }

          /**
           * @return string
           */
          protected function getExchangeType()
          {
              // Constant name is spelled this way in the parent class.
              return parent::EXCHANGE_TYPE_DIRECCT;
          }

          /**
           * @return array queue_name => routing_key
           */
          protected function getQueueNamesAndRoutingKeys()
          {
              return array(
                  $this->queue => $this->routingKey,
              );
          }

          /**
           * Sends $content using the routing key mapped to $queueName.
           *
           * @param mixed  $content
           * @param string $queueName
           * @return bool Result of the parent's send().
           */
          private function sendToQueue($content, $queueName)
          {
              $routes = $this->getQueueNamesAndRoutingKeys();

              return parent::send($content, $routes[$queueName]);
          }

          /**
           * @return mixed Last validation/publish error, or null when none.
           */
          public function getError()
          {
              return $this->error;
          }
      }

    在代码层调用service

     /**
      * Example caller: notifies an online user about a new internal message by
      * publishing to the 'internal_message' queue.
      *
      * NOTE(review): ACT_SYSTEM_NEW_INMAIL, get(), $request, $uuid and $error are
      * used below but not defined in this snippet - presumably provided by a
      * parent class or trait that was omitted; confirm against the real source.
      */
     class AlarmController
     {
         const QUEUE = 'internal_message';
         const ROUTING_KEY = 'api_internal_message';

         /**
          * Builds the notification message and publishes it.
          *
          * @return bool True when the message was published; false when
          *              publishing failed (details are stored in $this->error).
          */
         public function checkTipAlarm()
         {
             // User is online: write to the queue to announce the new internal message.
             /** @var PublishToMQService $publishHandler */
             $publishHandler = $this->get('mission.publish.RabbitMQ');

             // A numeric recipient goes into 'psid', anything else into 'uuid'.
             $recipient = $this->request['to'];
             $recipientIsNumeric = is_numeric($recipient);

             $message = array(
                 'act' => self::ACT_SYSTEM_NEW_INMAIL,
                 'psid' => $recipientIsNumeric ? $recipient : '',
                 'uuid' => $recipientIsNumeric ? '' : $recipient,
                 'data' => array(
                     'owner' => $this->uuid,
                 ),
             );

             $publishHandler->setMessage($message)->setRoutingKey(self::ROUTING_KEY)->setQueue(self::QUEUE);

             if (false === $publishHandler->publish()) {
                 $this->error = array(
                     'errorMsg' => $publishHandler->getError(),
                     'errorCode' => 1,
                 );
                 return false;
             }

             // Return an explicit success value instead of falling off the end.
             return true;
         }
     }
  • 相关阅读:
    Anaconda 和 Jupyter notebook
    DOM基础之创建元素
    python爬虫入门学习3 Requests请求库
    04 字典类型已内置方法
    05 流程控制
    03 可变类型与不可变类型
    02 元组数据类型
    01 列表内置方法
    day2笔记
    00 python基础知识
  • 原文地址:https://www.cnblogs.com/spectrelb/p/6806755.html
Copyright © 2011-2022 走看看