zoukankan      html  css  js  c++  java
  • php 利用activeMq+stomp实现消息队列

    php 利用activeMq+stomp实现消息队列

    一、activeMq概述

      ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。

    二、特性列表

    ⒈ 多种语言和协议编写客户端。语言: Java,C,C++,C#,Ruby,Perl,Python,PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    ⒉ 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    ⒊ 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    ⒋ 通过了常见J2EE服务器(如 Geronimo,JBoss 4,GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    ⒌ 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    ⒍ 支持通过JDBC和journal提供高速的消息持久化
    ⒎ 从设计上保证了高性能的集群,客户端-服务器,点对点
    ⒏ 支持Ajax
    ⒐ 支持与Axis的整合
    ⒑ 可以很容易的调用内嵌JMS provider,进行测试

    三、运行环境

      在本地或者服务器中安装activeMq,以Windows为例:

    A、 windows下部署

    ActiveMQ部署其实很简单,和所有Java一样,要跑java程序就必须先安装JDK并配置好环境变量,这个很简单。

    然后解压下载的apache-activemq-5.10-20140603.133406-78-bin.zip压缩包到一个目录,得到解压后的目录结构如下图:

    进入bin目录,发现有win32和win64两个文件夹,这2个文件夹分别对应windows32位和windows64位操作系统的启动脚本。

    我的实验环境是windowsXP,就进入win32目录,会看到如下目录结构。

    其中activemq.bat便是启动脚本,双击启动。

    ActiveMQ默认启动到8161端口,启动完了后在浏览器地址栏输入:http://localhost:8161/admin要求输入用户名密码,默认用户名密码为admin、admin,这个用户名密码是在conf/users.properties中配置的。输入用户名密码后便可看到如下图的ActiveMQ控制台界面了。

    四、生产消息

      本文使用stomp协议实现mq队列 ,项目中要加载stomp 包;  

      在需要使用队列的地方,导入该包:

      

    use AppLibrariesStompStomp;

      将消息放入activeMq中:

      

        protected function sendToMQ($destination, $msg_data, $persistent = false)
        {
            try {
                $con = new Stomp(config('app.mq_url'));
                $con->connect();
                $con->begin("Transaction");
                $con->send($destination, json_encode($msg_data), array('persistent'=> $persistent));
                $con->commit("Transaction");
                $con->disconnect();
            } catch (Exception $e) {
                app('log')->warn($e->getMessage());
            }
        }

      1.destination是指队列名称;

      2.msg_data队列中存放的数据;

      3.persistent是否同步; 

      4.app.mq_url是指activeMq安装的服务器地址及端口号,如tcp://localhost:61613;

      经过段代码之后,我们就将数据msg_data放到了队列destination中了;

    五、消费消息

      在activeMq中的消息如何消费呢?一般情况下我们会建立一个cronjob来定时消费队列的消息,消费队列消息主要代码如下:

            try {
                $this->consumer = new Stomp($this->activemq_uri); //$this->acitvemq_uri就是上面的activeMq地址app.mq_url
                if (!$this->consumer->isConnected()) {
                    $this->consumer->connect();
                    $this->consumer->setReadTimeout(3);
                }
                app('log')->info($this->log_remark . "connect to active mq success");
            } catch (StompException $e) {
                app('log')->info("connect to active mq failed : " . $e->getMessage());
                die();
            }
            $queue = self::getQueue();//得到队列名称,上面定义的destination
            $this->consumer->subscribe($queue);  //订阅
            //循环读帧
            while ($this->consumer->hasFrameToRead()) {
                try {
                    $message = $this->consumer->readFrame();
                    $data = json_decode($message->body, true);  // 这里其实就是得到了上面的队列消息msg_data
                    //验证数据并更新数据
                    $handle_result = self::removeDuplicateExpressInfo($data['traces'], $data['shipping_id']);
                    if ($handle_result) {
                        $this->consumer->ack($message);
                    }
                } catch (Exception $e) {
                    app('log')->info("handle with message failed : " . $e->getMessage());
                    if (!$this->consumer->isConnected()) {
                        $this->consumer->connect();
                    } else {
                        break;
                    }
                }
            }
            $this->consumer->unsubscribe($queue);    //释放订阅
            $this->consumer->disconnect();      //端口连接
  • 相关阅读:
    B站14天数据分析笔记6次课笔记
    B站14天数据分析笔记5次课作业
    B站14天数据分析笔记5次课Pandas
    1037 在霍格沃茨找零钱 (20 point(s))
    第一章:第三节探索性数据分析
    Java没有运行选项
    Eclipse截图的时候错误提示消失/复制粘贴错误信息
    错误记录_css属性的值一定不需要引号么?
    错误记录_语法哪里错了?
    微软输入法使用
  • 原文地址:https://www.cnblogs.com/sdgf/p/6233255.html
Copyright © 2011-2022 走看看