  • C++ ActiveMQ CMS study notes

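    I first ran into ActiveMQ a long time ago, but only in a hurry because I was short on time. Later I found that there is a great deal to learn about it.

    I have long wanted to write an article about ActiveMQ, but never felt I had any insight of my own to add. Perhaps there are already too many such articles online, or perhaps I simply do not know enough yet.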

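    0, What problems activemq-cpp solves.

    In practice, it frees developers from hand-written multithreaded, multi-message communication code so that they can focus on application logic.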

    CMS (stands for C++ Messaging Service) is a JMS-like API for C++ for interfacing with Message Brokers such as Apache ActiveMQ. CMS helps to make your C++ client code much neater and easier to follow. To get a better feel for CMS try the API Reference. ActiveMQ-CPP is a client only library, a message broker such as Apache ActiveMQ is still needed for your clients to communicate.

    Our implementation of CMS is called ActiveMQ-CPP, which has an architecture that allows for pluggable transports and wire formats. Currently we support the OpenWire and Stomp protocols, both over TCP and SSL; we also now support a Failover Transport for more reliable client operation. In addition to CMS, ActiveMQ-CPP also provides a robust set of classes that support platform independent constructs such as threading, I/O, sockets, etc. You may find many of these utilities very useful, such as a Java like Thread class or the "synchronized" macro that lets you use a Java-like synchronization on any object that implements the activemq::concurrent::Synchronizable interface. ActiveMQ-CPP is released under the Apache 2.0 License.

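    In short:

    CMS (C++ Messaging Service) is a C++ API for talking to message brokers such as Apache ActiveMQ.

    The implementation of CMS is called activemq-cpp. It currently supports only the OpenWire and Stomp wire formats, over TCP and SSL. It also supports a failover transport (this is the important part; I did not understand it at first and took a detour as a result !_!).

    -_- In other words, the amqp, mqtt and ws connectors in activemq\conf\activemq.xml cannot be used from activemq-cpp.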

    <transportConnectors>
    <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    </transportConnectors>
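To make the protocol choice concrete, here is a minimal sketch of how a client-side broker URI could be assembled for the two wire formats activemq-cpp supports. The helper names `makeTcpUri` and `makeFailoverUri` are hypothetical and not part of activemq-cpp; the URI shapes (`tcp://host:port?wireFormat=...` and `failover:(...)`) follow the forms shown in the example program's comments.

```cpp
#include <sstream>
#include <string>
#include <vector>

// Hypothetical helper (not an activemq-cpp API): builds
// "tcp://<host>:<port>?wireFormat=<format>".
std::string makeTcpUri(const std::string& host, int port,
                       const std::string& wireFormat) {
    std::ostringstream out;
    out << "tcp://" << host << ":" << port << "?wireFormat=" << wireFormat;
    return out.str();
}

// Hypothetical helper: wraps one or more transport URIs in a
// failover:(a,b,...) URI so the client can switch between brokers.
std::string makeFailoverUri(const std::vector<std::string>& uris) {
    std::string result = "failover:(";
    for (std::size_t i = 0; i < uris.size(); ++i) {
        if (i != 0) {
            result += ",";
        }
        result += uris[i];
    }
    result += ")";
    return result;
}
```

With the connectors listed above, the OpenWire connector would be reached with `makeTcpUri("127.0.0.1", 61616, "openwire")` and the Stomp connector with `makeTcpUri("127.0.0.1", 61613, "stomp")`; the resulting string is what gets passed to `ConnectionFactory::createCMSConnectionFactory`.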

    1, Configuring and using activemq-cpp.

    Reference: "Active MQ C++实现通讯" (ActiveMQ communication in C++) http://blog.csdn.net/lee353086/article/details/6777261

    activemq-cpp download page:

    http://activemq.apache.org/cms/download.html

    Dependencies

    These are described at http://activemq.apache.org/cms/building.html, though only in English.

    I will go over them here anyway, since my own English is poor too.

    "With versions of ActiveMQ-CPP 2.2 and later, we have a dependency on the Apache Portable Runtime project. You'll need to install APR on your system before you'll be able to build ActiveMQ-CPP."

    "The package contains a complete set of CppUnit tests. In order for you to build and run the tests, you will need to download and install the CppUnit library. See http://cppunit.sourceforge.net/cppunit-wiki"

    So the dependencies are: apr, apr-iconv, apr-util, and cppunit.

    http://mirrors.hust.edu.cn/apache/apr/

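    apr, apr-iconv, and apr-util can be downloaded from the mirror above (take the latest version of each; do not mix a newer release of one with an older release of another, or the build will run into problems).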

    apr-1.5.1-win32-src.zip,

    apr-iconv-1.2.1-win32-src-r2.zip,

    apr-util-1.5.4-win32-src.zip.

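    After extracting, remember to rename the folders to drop the version numbers (apr, apr-iconv, apr-util); otherwise the projects' default [Additional Include Directories] will not find them.

    Put all the folders under one root directory.

    Open activemq-cpp-library\vs2008-build\activemq-cpp.sln.

    Add the following through [Add Existing Project], in order: libapr.vcproj, libapriconv.vcproj, libaprutil.vcproj. Only the lib projects are needed.

    The final solution is set up as follows:

    libapriconv.vcproj and libaprutil.vcproj both need libapr in their [Project Dependencies].

    activemq-cpp needs libapriconv, libaprutil, and libapr in its [Project Dependencies].

    The [Additional Include Directories] of activemq-cpp must contain the include directories of these three.

    After a long build, the big lib file is produced.

    The activemq-cpp-example project contains the hello world code.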

    2, Walking through the activemq-cpp-example code.

    This project gives a better picture of how activemq-cpp is structured.

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    // START SNIPPET: demo
    
    #include <activemq/library/ActiveMQCPP.h>
    #include <decaf/lang/Thread.h>
    #include <decaf/lang/Runnable.h>
    #include <decaf/util/concurrent/CountDownLatch.h>
    #include <decaf/lang/Integer.h>
    #include <decaf/lang/Long.h>
    #include <decaf/lang/System.h>
    #include <activemq/core/ActiveMQConnectionFactory.h>
    #include <activemq/util/Config.h>
    #include <cms/Connection.h>
    #include <cms/Session.h>
    #include <cms/TextMessage.h>
    #include <cms/BytesMessage.h>
    #include <cms/MapMessage.h>
    #include <cms/ExceptionListener.h>
    #include <cms/MessageListener.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <iostream>
    #include <memory>
    
    using namespace activemq::core;
    using namespace decaf::util::concurrent;
    using namespace decaf::util;
    using namespace decaf::lang;
    using namespace cms;
    using namespace std;
    
    class HelloWorldProducer : public Runnable {
    private:
    
        Connection* connection;
        Session* session;
        Destination* destination;
        MessageProducer* producer;
        int numMessages;
        bool useTopic;
        bool sessionTransacted;
        std::string brokerURI;
    
    private:
    
        HelloWorldProducer(const HelloWorldProducer&);
        HelloWorldProducer& operator=(const HelloWorldProducer&);
    
    public:
    
        HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false) :
            connection(NULL),
            session(NULL),
            destination(NULL),
            producer(NULL),
            numMessages(numMessages),
            useTopic(useTopic),
            sessionTransacted(sessionTransacted),
            brokerURI(brokerURI) {
        }
    
        virtual ~HelloWorldProducer(){
            cleanup();
        }
    
        void close() {
            this->cleanup();
        }
    
        virtual void run() {
    
            try {
    
                // Create a ConnectionFactory
                auto_ptr<ConnectionFactory> connectionFactory(
                    ConnectionFactory::createCMSConnectionFactory(brokerURI));
    
                // Create a Connection
                connection = connectionFactory->createConnection();
                connection->start();
    
                // Create a Session
                if (this->sessionTransacted) {
                    session = connection->createSession(Session::SESSION_TRANSACTED);
                } else {
                    session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                }
    
                // Create the destination (Topic or Queue)
                if (useTopic) {
                    destination = session->createTopic("TEST.FOO");
                } else {
                    destination = session->createQueue("TEST.FOO");
                }
    
                // Create a MessageProducer from the Session to the Topic or Queue
                producer = session->createProducer(destination);
                producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
    
                // Create the Thread Id String
                string threadIdStr = Long::toString(Thread::currentThread()->getId());
    
                // Create a messages
                string text = (string) "Hello world! from thread " + threadIdStr;
    
                for (int ix = 0; ix < numMessages; ++ix) {
                    std::auto_ptr<TextMessage> message(session->createTextMessage(text));
                    message->setIntProperty("Integer", ix);
                    printf("Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
                    producer->send(message.get());
                }
    
            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }
    
    private:
    
        void cleanup() {
    
            if (connection != NULL) {
                try {
                    connection->close();
                } catch (cms::CMSException& ex) {
                    ex.printStackTrace();
                }
            }
    
            // Destroy resources.
            try {
                delete destination;
                destination = NULL;
                delete producer;
                producer = NULL;
                delete session;
                session = NULL;
                delete connection;
                connection = NULL;
            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }
    };
    
    class HelloWorldConsumer : public ExceptionListener,
                               public MessageListener,
                               public Runnable {
    
    private:
    
        CountDownLatch latch;
        CountDownLatch doneLatch;
        Connection* connection;
        Session* session;
        Destination* destination;
        MessageConsumer* consumer;
        long waitMillis;
        bool useTopic;
        bool sessionTransacted;
        std::string brokerURI;
    
    private:
    
        HelloWorldConsumer(const HelloWorldConsumer&);
        HelloWorldConsumer& operator=(const HelloWorldConsumer&);
    
    public:
    
        HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
            latch(1),
            doneLatch(numMessages),
            connection(NULL),
            session(NULL),
            destination(NULL),
            consumer(NULL),
            waitMillis(waitMillis),
            useTopic(useTopic),
            sessionTransacted(sessionTransacted),
            brokerURI(brokerURI) {
        }
    
        virtual ~HelloWorldConsumer() {
            cleanup();
        }
    
        void close() {
            this->cleanup();
        }
    
        void waitUntilReady() {
            latch.await();
        }
    
        virtual void run() {
    
            try {
    
                // Create a ConnectionFactory
                auto_ptr<ConnectionFactory> connectionFactory(
                    ConnectionFactory::createCMSConnectionFactory(brokerURI));
    
                // Create a Connection
                connection = connectionFactory->createConnection();
                connection->start();
                connection->setExceptionListener(this);
    
                // Create a Session
                if (this->sessionTransacted == true) {
                    session = connection->createSession(Session::SESSION_TRANSACTED);
                } else {
                    session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                }
    
                // Create the destination (Topic or Queue)
                if (useTopic) {
                    destination = session->createTopic("TEST.FOO");
                } else {
                    destination = session->createQueue("TEST.FOO");
                }
    
                // Create a MessageConsumer from the Session to the Topic or Queue
                consumer = session->createConsumer(destination);
    
                consumer->setMessageListener(this);
    
                std::cout.flush();
                std::cerr.flush();
    
                // Indicate we are ready for messages.
                latch.countDown();
    
                // Wait while asynchronous messages come in.
                doneLatch.await(waitMillis);
    
            } catch (CMSException& e) {
                // Indicate we are ready for messages.
                latch.countDown();
                e.printStackTrace();
            }
        }
    
        // Called from the consumer since this class is a registered MessageListener.
        virtual void onMessage(const Message* message) {
    
            static int count = 0;
    
            try {
                count++;
                const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
                string text = "";
    
                if (textMessage != NULL) {
                    text = textMessage->getText();
                } else {
                    text = "NOT A TEXTMESSAGE!";
                }
    
                printf("Message #%d Received: %s\n", count, text.c_str());
    
            } catch (CMSException& e) {
                e.printStackTrace();
            }
    
            // Commit all messages.
            if (this->sessionTransacted) {
                session->commit();
            }
    
            // No matter what, tag the count down latch until done.
            doneLatch.countDown();
        }
    
        // If something bad happens you see it here as this class has also been
        // registered as an ExceptionListener with the connection.
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
            printf("CMS Exception occurred.  Shutting down client.\n");
            ex.printStackTrace();
            exit(1);
        }
    
    private:
    
        void cleanup() {
            if (connection != NULL) {
                try {
                    connection->close();
                } catch (cms::CMSException& ex) {
                    ex.printStackTrace();
                }
            }
    
            // Destroy resources.
            try {
                delete destination;
                destination = NULL;
                delete consumer;
                consumer = NULL;
                delete session;
                session = NULL;
                delete connection;
                connection = NULL;
            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }
    };
    
    int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
    
        activemq::library::ActiveMQCPP::initializeLibrary();
        {
        std::cout << "=====================================================\n";
        std::cout << "Starting the example:" << std::endl;
        std::cout << "-----------------------------------------------------\n";
    
    
        // Set the URI to point to the IP Address of your broker.
        // add any optional params to the url to enable things like
        // tightMarshalling or tcp logging etc.  See the CMS web site for
        // a full list of configuration options.
        //
        //  http://activemq.apache.org/cms/
        //
        // Wire Format Options:
        // =========================
        // Use either stomp or openwire, the default ports are different for each
        //
        // Examples:
        //    tcp://127.0.0.1:61616                      default to openwire
        //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
        //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
        //
        // SSL:
        // =========================
        // To use SSL you need to specify the location of the trusted Root CA or the
        // certificate for the broker you want to connect to.  Using the Root CA allows
        // you to use failover with multiple servers all using certificates signed by
        // the trusted root.  If using client authentication you also need to specify
        // the location of the client Certificate.
        //
        //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
        //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
        //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
        //
        // Then you just specify the ssl transport in the URI, for example:
        //
        //     ssl://localhost:61617
        //
        std::string brokerURI =
            "failover:(tcp://localhost:61616"
    //        "?wireFormat=openwire"
    //        "&transport.useInactivityMonitor=false"
    //        "&connection.alwaysSyncSend=true"
    //        "&connection.useAsyncSend=true"
    //        "?transport.commandTracingEnabled=true"
    //        "&transport.tcpTracingEnabled=true"
    //        "&wireFormat.tightEncodingEnabled=true"
            ")";
    
        //============================================================
        // set to true to use topics instead of queues
        // Note in the code above that this causes createTopic or
        // createQueue to be used in both consumer and producer.
        //============================================================
        bool useTopics = true;
        bool sessionTransacted = false;
        int numMessages = 2000;
    
        long long startTime = System::currentTimeMillis();
    
        HelloWorldProducer producer(brokerURI, numMessages, useTopics);
        HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);
    
        // Start the consumer thread.
        Thread consumerThread(&consumer);
        consumerThread.start();
    
        // Wait for the consumer to indicate that its ready to go.
        consumer.waitUntilReady();
    
        // Start the producer thread.
        Thread producerThread(&producer);
        producerThread.start();
    
        // Wait for the threads to complete.
        producerThread.join();
        consumerThread.join();
    
        long long endTime = System::currentTimeMillis();
        double totalTime = (double)(endTime - startTime) / 1000.0;
    
        consumer.close();
        producer.close();
    
        std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
        std::cout << "-----------------------------------------------------\n";
        std::cout << "Finished with the example." << std::endl;
        std::cout << "=====================================================\n";
    
        }
        activemq::library::ActiveMQCPP::shutdownLibrary();
    }
    
    // END SNIPPET: demo

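     Start from main(): it initializes the library, builds the broker URI, starts the consumer thread and waits until the consumer signals that it is ready, then starts the producer thread, joins both threads, closes the consumer and producer, and finally shuts the library down.

    That is all it takes to get the application logic working quickly and simply.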
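The ordering in main() (consumer first, producer only after `waitUntilReady()` returns) depends on a count-down latch. The sketch below reproduces just that handshake with the C++11 standard library instead of decaf, so it can be read without a broker. `SimpleLatch` and `runHandshake` are illustrative names, not activemq-cpp classes, and the real `decaf::util::concurrent::CountDownLatch` may differ in detail.

```cpp
#include <condition_variable>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

// Minimal count-down latch: await() blocks until countDown() has been
// called as many times as the initial count.
class SimpleLatch {
public:
    explicit SimpleLatch(int count) : count_(count) {}

    void countDown() {
        std::lock_guard<std::mutex> lock(mutex_);
        if (count_ > 0 && --count_ == 0) {
            cv_.notify_all();
        }
    }

    void await() {
        std::unique_lock<std::mutex> lock(mutex_);
        cv_.wait(lock, [this] { return count_ == 0; });
    }

private:
    std::mutex mutex_;
    std::condition_variable cv_;
    int count_;
};

// Starts a "consumer" thread, waits for it to report ready, and only
// then starts the "producer" thread. Returns the events in the order
// they were recorded.
std::vector<std::string> runHandshake() {
    std::vector<std::string> events;
    std::mutex eventsMutex;
    SimpleLatch ready(1);

    std::thread consumer([&] {
        {
            std::lock_guard<std::mutex> lock(eventsMutex);
            events.push_back("consumer ready");
        }
        ready.countDown();  // plays the role of latch.countDown() in run()
    });

    ready.await();  // plays the role of consumer.waitUntilReady()

    std::thread producer([&] {
        std::lock_guard<std::mutex> lock(eventsMutex);
        events.push_back("producer started");
    });

    producer.join();
    consumer.join();
    return events;
}
```

Because the producer thread is created only after `await()` returns, the consumer's subscription is in place before the first message is sent. This matters for topics, where a message published before the subscription exists is not delivered to that subscriber.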

    3, ActiveMQ messaging patterns.

    See:

    http://shmilyaw-hotmail-com.iteye.com/blog/1897635

    What I currently need is the request-response pattern in activemq-cpp.

    4, Using the request-response pattern in activemq-cpp.

     The server and the client exchange data and acknowledge it.
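Before the real listing, it may help to see the pattern stripped of all networking. The sketch below is an in-memory stand-in, not CMS code: `Msg`, `InMemoryBroker`, `serveOne`, and `requestReply` are hypothetical names used only for illustration. It shows the two pieces the pattern relies on: a reply-to destination carried in the request, and a correlation ID echoed back in the reply.

```cpp
#include <deque>
#include <map>
#include <string>

// Stand-in for a CMS message: only the fields the pattern relies on.
struct Msg {
    std::string text;
    std::string correlationId;
    std::string replyTo;  // name of the queue the reply should go to
};

// Hypothetical in-memory "broker": named FIFO queues, no networking.
class InMemoryBroker {
public:
    void send(const std::string& queue, const Msg& msg) {
        queues_[queue].push_back(msg);
    }

    bool receive(const std::string& queue, Msg& out) {
        std::deque<Msg>& q = queues_[queue];
        if (q.empty()) {
            return false;
        }
        out = q.front();
        q.pop_front();
        return true;
    }

private:
    std::map<std::string, std::deque<Msg> > queues_;
};

// Server side: take one request and answer on its reply-to queue,
// echoing the correlation ID so the client can match the reply.
bool serveOne(InMemoryBroker& broker, const std::string& requestQueue) {
    Msg request;
    if (!broker.receive(requestQueue, request)) {
        return false;
    }
    Msg reply;
    reply.text = "reply to " + request.text;
    reply.correlationId = request.correlationId;
    broker.send(request.replyTo, reply);
    return true;
}

// Client side: send a request with a private reply queue, let the
// server handle it, then read the reply. Returns an empty string if no
// reply arrived or the correlation ID does not match.
std::string requestReply(InMemoryBroker& broker, const std::string& text,
                         const std::string& correlationId) {
    const std::string requestQueue = "eventQueue";
    const std::string tempQueue = "temp-" + correlationId;

    Msg request;
    request.text = text;
    request.correlationId = correlationId;
    request.replyTo = tempQueue;
    broker.send(requestQueue, request);

    serveOne(broker, requestQueue);

    Msg reply;
    if (!broker.receive(tempQueue, reply)) {
        return "";
    }
    if (reply.correlationId != correlationId) {
        return "";
    }
    return reply.text;
}
```

In the CMS version the same roles are played by `createTemporaryQueue()`, `setCMSReplyTo()`, and `setCMSCorrelationID()` on the requesting side, with the responding side sending its answer to the destination returned by `getCMSReplyTo()`.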

    Below is my simple modified code. It may contain bugs; please point them out.

    Make two copies, define USE_COMSUMER in one and USE_PRODUCER in the other, and build each.
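The producer in the listing that follows tags every request with a correlation ID made of 16 random bytes rendered as uppercase hex. As a minimal sketch of that step on its own, here is a standard-library version; the names `toHex` and `makeCorrelationId` are hypothetical, and `std::mt19937` stands in for `decaf::util::Random`.

```cpp
#include <cstddef>
#include <cstdio>
#include <random>
#include <string>
#include <vector>

// Renders each byte as two uppercase hex digits.
std::string toHex(const std::vector<unsigned char>& bytes) {
    std::string out;
    out.reserve(bytes.size() * 2);
    for (std::size_t i = 0; i < bytes.size(); ++i) {
        char pair[3];  // two hex digits plus the terminating NUL
        std::snprintf(pair, sizeof(pair), "%02X",
                      static_cast<unsigned int>(bytes[i]));
        out += pair;
    }
    return out;
}

// Generates numBytes random bytes and returns them as a hex string, so
// the result is 2 * numBytes characters long.
std::string makeCorrelationId(std::size_t numBytes = 16) {
    std::random_device seed;
    std::mt19937 generator(seed());
    std::uniform_int_distribution<int> byteValue(0, 255);

    std::vector<unsigned char> bytes(numBytes);
    for (std::size_t i = 0; i < numBytes; ++i) {
        bytes[i] = static_cast<unsigned char>(byteValue(generator));
    }
    return toHex(bytes);
}
```

A 3-byte scratch buffer is enough for each `%02X` conversion, and `snprintf` keeps the write bounded.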

    /*
     * Licensed to the Apache Software Foundation (ASF) under one or more
     * contributor license agreements.  See the NOTICE file distributed with
     * this work for additional information regarding copyright ownership.
     * The ASF licenses this file to You under the Apache License, Version 2.0
     * (the "License"); you may not use this file except in compliance with
     * the License.  You may obtain a copy of the License at
     *
     *     http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing, software
     * distributed under the License is distributed on an "AS IS" BASIS,
     * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
     * See the License for the specific language governing permissions and
     * limitations under the License.
     */
    
    // START SNIPPET: demo
    
    #include <activemq/library/ActiveMQCPP.h>
    #include <decaf/lang/Thread.h>
    #include <decaf/lang/Runnable.h>
    #include <decaf/util/concurrent/CountDownLatch.h>
    #include <decaf/lang/Integer.h>
    #include <decaf/lang/Long.h>
    #include <decaf/lang/System.h>
    #include <activemq/core/ActiveMQConnectionFactory.h>
    #include <activemq/util/Config.h>
    #include <cms/Connection.h>
    #include <cms/Session.h>
    #include <cms/TextMessage.h>
    #include <cms/BytesMessage.h>
    #include <cms/MapMessage.h>
    #include <cms/ExceptionListener.h>
    #include <cms/MessageListener.h>
    #include <stdlib.h>
    #include <stdio.h>
    #include <iostream>
    #include <memory>
    
    #include <decaf/util/Random.h>
    
    using namespace activemq::core;
    using namespace decaf::util::concurrent;
    using namespace decaf::util;
    using namespace decaf::lang;
    using namespace cms;
    using namespace std;
    
    #define  QUEUE_NAME    "eventQueue"
    #define NAME_BYTE_LEN        16
    
    class HelloWorldProducer : public ExceptionListener,
                                public MessageListener,
                                public Runnable {
    private:
        CountDownLatch latch;
        CountDownLatch doneLatch;
        Connection* connection;
        Session* session;
        Destination* destination;
        MessageProducer* producer;
        int numMessages;
        bool useTopic;
        bool sessionTransacted;
        std::string brokerURI;
        bool bReciveMessage;
        long waitMillis;
    
    private:
    
        HelloWorldProducer(const HelloWorldProducer&);
        HelloWorldProducer& operator=(const HelloWorldProducer&);
    
    public:
    
        HelloWorldProducer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false,
            long waitMillis=3000) :
            latch(1),
            doneLatch(numMessages),  
            connection(NULL),
            session(NULL),
            destination(NULL),
            producer(NULL),
            numMessages(numMessages),
            useTopic(useTopic),
            sessionTransacted(sessionTransacted),
            brokerURI(brokerURI) ,
            bReciveMessage(false),
            waitMillis(waitMillis)
            { }
    
        virtual ~HelloWorldProducer(){
            cleanup();
        }
    
        void close() {
            this->cleanup();
        }
        
        void waitUntilReady() {
            latch.await();
        }
    
        virtual void run() {
    
            try {
    
                // Create a ConnectionFactory
                auto_ptr<ConnectionFactory> connectionFactory(
                    ConnectionFactory::createCMSConnectionFactory(brokerURI));
    
                // Create a Connection
                connection = connectionFactory->createConnection();
                connection->start();
    
                // Create a Session
                if (this->sessionTransacted) {
                    session = connection->createSession(Session::SESSION_TRANSACTED);
                } else {
                    session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                }

                // Create the destination (Topic or Queue)
                if (useTopic) {
                    destination = session->createTopic(QUEUE_NAME);
                } else {
                    destination = session->createQueue(QUEUE_NAME);
                }
    
                // Create a MessageProducer from the Session to the Topic or Queue
                producer = session->createProducer(destination);
                producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
    
                // Create the Thread Id String
                string threadIdStr = Long::toString(Thread::currentThread()->getId());
    
                // Create a messages
                string text = (string) "Hello world! from thread " + threadIdStr;
    
                for (int ix = 0; ix < numMessages; ++ix) {
                    std::auto_ptr<TextMessage> message(session->createTextMessage(text));
                    
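                    // Key step: create a temporary queue for the reply.
                    std::auto_ptr<Destination> tempDest(session->createTemporaryQueue());

                    //cms::Destination tempDest=session->createTemporaryTopic() ; 
                    // Listen on the temporary queue for the response; the auto_ptr
                    // releases this per-request consumer at the end of the iteration.
                    std::auto_ptr<MessageConsumer> responseConsumer(session->createConsumer(tempDest.get()));
                    responseConsumer->setMessageListener(this);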
    
    
                    message->setCMSReplyTo(tempDest.get());
                    Random random;
                    char buffer[NAME_BYTE_LEN]={0};
                    random.nextBytes((unsigned char *)buffer,NAME_BYTE_LEN);
                    string correlationId="";
                    for(int i=0;i<NAME_BYTE_LEN;++i)
                    {
                        char ch[NAME_BYTE_LEN*2]={0};
                        sprintf(ch,"%02X",(unsigned char)buffer[i]);
                        string str(ch);
    
                        correlationId+=str;
                    }
    
                    message->setCMSCorrelationID(correlationId);
    
                    message->setIntProperty("Integer", ix);
                    printf("Producer Sent message #%d from thread %s\n", ix + 1, threadIdStr.c_str());
                    producer->send(message.get());
    
                    // Indicate we are ready for messages.
                    latch.countDown();
    
                    // Wait while asynchronous messages come in.
                    doneLatch.await(waitMillis);
    
                } 
            } catch (CMSException& e) {
                printf("Producer run() CMSException\n");
                // Indicate we are ready for messages.
                latch.countDown();
                e.printStackTrace();
            }
        }
            
    
        // Called from the Producer since this class is a registered MessageListener.
        virtual void onMessage(const Message* message) {
    
            static int count = 0;
    
            try {
                count++;
                const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
                //ActiveMQMessageTransformation
                //std::auto_ptr<TextMessage> responsemessage(session->createTextMessage());
                //responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());
                //responsemessage->getCMSReplyTo()
    
                string text = "";
    
                if (textMessage != NULL) {
                    text = textMessage->getText();
                } else {
                    text = "NOT A TEXTMESSAGE!";
                }
    
                printf("Producer Message #%d Received: %s\n", count, text.c_str());
    
    
                //producer.send
    
            } catch (CMSException& e) {
                printf("Producer onMessage() CMSException\n");
                e.printStackTrace();
            }
    
            // Commit all messages.
            if (this->sessionTransacted) {
                session->commit();
            }
    
            // No matter what, tag the count down latch until done.
            doneLatch.countDown();
        }
    
        // If something bad happens you see it here as this class has also been
        // registered as an ExceptionListener with the connection.
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
            printf("Producer onException() CMS Exception occurred.  Shutting down client.\n");
            ex.printStackTrace();
            exit(1);
        }
    
    
    private:
    
        void cleanup() {
    
            if (connection != NULL) {
                try {
                    connection->close();
                } catch (cms::CMSException& ex) {
                    ex.printStackTrace();
                }
            }
    
            // Destroy resources.
            try {
                delete destination;
                destination = NULL;
                delete producer;
                producer = NULL;
                delete session;
                session = NULL;
                delete connection;
                connection = NULL;
            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }
    };
    
    class HelloWorldConsumer : public ExceptionListener,
                               public MessageListener,
                               public Runnable {
    
    private:
    
        CountDownLatch latch;
        CountDownLatch doneLatch;
        Connection* connection;
        Session* session;
        Destination* destination;
        MessageConsumer* consumer;
        MessageProducer *producer;
        long waitMillis;
        bool useTopic;
        bool sessionTransacted;
        std::string brokerURI;
    
    private:
    
        HelloWorldConsumer(const HelloWorldConsumer&);
        HelloWorldConsumer& operator=(const HelloWorldConsumer&);
    
    public:
    
        HelloWorldConsumer(const std::string& brokerURI, int numMessages, bool useTopic = false, bool sessionTransacted = false, int waitMillis = 30000) :
            latch(1),
            doneLatch(numMessages),
            connection(NULL),
            session(NULL),
            destination(NULL),
            consumer(NULL),
            producer(NULL),
            waitMillis(waitMillis),
            useTopic(useTopic),
            sessionTransacted(sessionTransacted),
            brokerURI(brokerURI) {
        }
    
        virtual ~HelloWorldConsumer() {
            cleanup();
        }
    
        void close() {
            this->cleanup();
        }
    
        void waitUntilReady() {
            latch.await();
        }
    
        virtual void run() {
    
            try {
    
                // Create a ConnectionFactory
                auto_ptr<ConnectionFactory> connectionFactory(
                    ConnectionFactory::createCMSConnectionFactory(brokerURI));
    
                // Create a Connection
                connection = connectionFactory->createConnection();
                connection->start();
                connection->setExceptionListener(this);
    
                // Create a Session
                if (this->sessionTransacted == true) {
                    session = connection->createSession(Session::SESSION_TRANSACTED);
                } else {
                    session = connection->createSession(Session::AUTO_ACKNOWLEDGE);
                }
    
                // Create the destination (Topic or Queue)
                if (useTopic) {
                    destination = session->createTopic(QUEUE_NAME);
                } else {
                    destination = session->createQueue(QUEUE_NAME);
                }
    
                producer = session->createProducer();
                producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
    
                // Create a MessageConsumer from the Session to the Topic or Queue
                consumer = session->createConsumer(destination);
    
                consumer->setMessageListener(this);
    
                std::cout.flush();
                std::cerr.flush();
    
                // Indicate we are ready for messages.
                latch.countDown();
    
                // Wait while asynchronous messages come in.
                doneLatch.await();
    
            } catch (CMSException& e) {
                printf("Consumer run() CMSException\n");
                // Indicate we are ready for messages.
                latch.countDown();
                e.printStackTrace();
            }
        }
    
        // Called from the consumer since this class is a registered MessageListener.
        virtual void onMessage(const Message* message) {
    
            static int count = 0;
    
            try {
                count++;
                
                
                // Create the Thread Id String
                string threadIdStr = Long::toString(Thread::currentThread()->getId());
    
                static bool bPrintf=true;
                if(bPrintf)
                {
                    bPrintf=false;
                    printf("consumer Message threadid: %s\n",  threadIdStr.c_str());
                }
    
                string strReply="consumer return  xxx,ThreadID="+threadIdStr;
                const TextMessage* textMessage = dynamic_cast<const TextMessage*> (message);
                
                if(NULL==textMessage)
                {
                    printf("NULL==textMessage, CMSType=%s\n", message->getCMSType().c_str());
    
    
                    //const cms::MapMessage* mapMsg = dynamic_cast<const cms::MapMessage*>(message);
                    //if(mapMsg)
                    //{
                    //    
                    //    std::vector<std::string> elements = mapMsg->getMapNames();
                    //    std::vector<std::string>::iterator iter = elements.begin();
                    //    for(; iter != elements.end() ; ++iter) 
                    //    {
                    //        std::string key = *iter;
                    //        cms::Message::ValueType elementType = mapMsg->getValueType(key);
                    //        string strxxx;
                    //        int cc=0;
                    //        switch(elementType) {
                    //    case cms::Message::BOOLEAN_TYPE:
                    //        //msg->setBoolean(key, mapMsg->getBoolean(key));
                    //        break;
                    //    case cms::Message::BYTE_TYPE:
                    //        //msg->setByte(key, mapMsg->getByte(key));
                    //        break;
                    //    case cms::Message::BYTE_ARRAY_TYPE:
                    //        //msg->setBytes(key, mapMsg->getBytes(key));
                    //        break;
                    //    case cms::Message::CHAR_TYPE:
                    //        //msg->setChar(key, mapMsg->getChar(key));
                    //        break;
                    //    case cms::Message::SHORT_TYPE:
                    //        //msg->setShort(key, mapMsg->getShort(key));
                    //        break;
                    //    case cms::Message::INTEGER_TYPE:
                    //        //msg->setInt(key, mapMsg->getInt(key));
                    //        break;
                    //    case cms::Message::LONG_TYPE:
                    //        //msg->setLong(key, mapMsg->getLong(key));
                    //        break;
                    //    case cms::Message::FLOAT_TYPE:
                    //        //msg->setFloat(key, mapMsg->getFloat(key));
                    //        break;
                    //    case cms::Message::DOUBLE_TYPE:
                    //        //msg->setDouble(key, mapMsg->getDouble(key));
                    //        break;
                    //    case cms::Message::STRING_TYPE:
                    //        //msg->setString(key, mapMsg->getString(key));
                    //        strxxx=mapMsg->getString(key);
                    //        cc=1;
                    //        break;
                    //    default:
                    //        break;
                    //        }
                    //    }
    
                    //}
    
                    return;
                }
    
                std::auto_ptr<TextMessage> responsemessage(session->createTextMessage(strReply));
                responsemessage->setCMSCorrelationID(textMessage->getCMSCorrelationID());
                
                
                string text = "";
    
                if (textMessage != NULL) {
                    text = textMessage->getText();
                } else {
                    text = "NOT A TEXTMESSAGE!";
                }
                
                int nProPerty=textMessage->getIntProperty("Integer");
                printf("consumer Message #%d Received: %s,nProPerty[%d]\n", count, text.c_str(),nProPerty);
                
                
                const cms::Destination* destSend=textMessage->getCMSReplyTo();
                if(destSend)
                {
                    this->producer->send(destSend,responsemessage.get());
    
                    printf("consumer Message #%d send: %s\n", count, strReply.c_str());
                }
                
    
            } catch (CMSException& e) {
                printf("Consumer onMessage() CMS Exception occurred.  Shutting down client.\n");
                e.printStackTrace();
            }
    
            // Commit all messages.
            if (this->sessionTransacted) {
                session->commit();
            }
    
            // No matter what, tag the count down latch until done.
            //doneLatch.countDown();
        }
    
        // If something bad happens you see it here, as this class has also been
        // registered as an ExceptionListener with the connection.
        virtual void onException(const CMSException& ex AMQCPP_UNUSED) {
            printf("Consumer onException() CMS Exception occurred.  Shutting down client.\n");
            ex.printStackTrace();
            exit(1);
        }
    
    private:
    
        void cleanup() {
            if (connection != NULL) {
                try {
                    connection->close();
                } catch (cms::CMSException& ex) {
                    ex.printStackTrace();
                }
            }
    
            // Destroy resources.
            try {
                delete destination;
                destination = NULL;
                delete consumer;
                consumer = NULL;
                delete session;
                session = NULL;
                delete connection;
                connection = NULL;
            } catch (CMSException& e) {
                e.printStackTrace();
            }
        }
    };
    
    int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
    
        //if(argc<2)
        //{
        //    printf("argc<2\n");
        //    return 0;
        //}
    
        activemq::library::ActiveMQCPP::initializeLibrary();
        {
        std::cout << "=====================================================\n";
        std::cout << "Starting the example:" << std::endl;
        std::cout << "-----------------------------------------------------\n";
    
    
        // Set the URI to point to the IP Address of your broker.
        // add any optional params to the url to enable things like
        // tightMarshalling or tcp logging etc.  See the CMS web site for
        // a full list of configuration options.
        //
        //  http://activemq.apache.org/cms/
        //
        // Wire Format Options:
        // =========================
        // Use either stomp or openwire, the default ports are different for each
        //
        // Examples:
        //    tcp://127.0.0.1:61616                      default to openwire
        //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
        //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
        //
        // SSL:
        // =========================
        // To use SSL you need to specify the location of the trusted Root CA or the
        // certificate for the broker you want to connect to.  Using the Root CA allows
        // you to use failover with multiple servers all using certificates signed by
        // the trusted root.  If using client authentication you also need to specify
        // the location of the client Certificate.
        //
        //     System::setProperty( "decaf.net.ssl.keyStore", "<path>/client.pem" );
        //     System::setProperty( "decaf.net.ssl.keyStorePassword", "password" );
        //     System::setProperty( "decaf.net.ssl.trustStore", "<path>/rootCA.pem" );
        //
        // Then you just specify the ssl transport in the URI, for example:
        //
        //     ssl://localhost:61617
        //
        std::string brokerURI =
            "failover:(tcp://192.168.10.143:61616"
    //        "?wireFormat=openwire"
    //        "&transport.useInactivityMonitor=false"
    //        "&connection.alwaysSyncSend=true"
    //        "&connection.useAsyncSend=true"
    //        "?transport.commandTracingEnabled=true"
    //        "&transport.tcpTracingEnabled=true"
    //        "&wireFormat.tightEncodingEnabled=true"
            ")";
    
        //============================================================
        // set to true to use topics instead of queues
        // Note in the code above that this causes createTopic or
        // createQueue to be used in both consumer and producer.
        //============================================================
        bool useTopics = false;
        bool sessionTransacted = true;
        int numMessages = 1;
        bool useConsumer=true;
        bool useProducer=true;
    
        //int nSet=atoi(argv[1]);
        //if(1==nSet)
        //{
            //#define USE_COMSUMER
    
            
        //}
        //else
        //{
            //#define USE_PRODUCER
    
            //
        //}
    
    
    
        long long startTime = System::currentTimeMillis();
        
    #ifdef USE_PRODUCER
        printf("Current mode: USE_PRODUCER\n");
    
        int numProducerMessages = 30;
        int nThreadNumber=10;
        vector<HelloWorldProducer *> vHelloWorldProducer;
        for(int i=0;i<nThreadNumber;++i)
        {
            HelloWorldProducer * producerTemp=new HelloWorldProducer(brokerURI, numProducerMessages, useTopics);
            vHelloWorldProducer.push_back(producerTemp);
        }
    
    #endif
        
    #ifdef USE_COMSUMER
        printf("Current mode: USE_COMSUMER\n");
        HelloWorldConsumer consumer(brokerURI, numMessages, useTopics, sessionTransacted);
        // Start the consumer thread.
        Thread consumerThread(&consumer);
        consumerThread.start();
    
        // Wait for the consumer to indicate that its ready to go.
        consumer.waitUntilReady();
    
    #endif
    
        
    
    
    #ifdef USE_PRODUCER
        // Start the producer thread.
    
        vector<Thread *> vThread;
        for(int i=0;i<nThreadNumber;++i)
        {
            HelloWorldProducer & ProducerTemp=*vHelloWorldProducer[i];
            Thread * threadTemp=new Thread(&ProducerTemp);
            vThread.push_back(threadTemp);
            threadTemp->start();
            ProducerTemp.waitUntilReady();
    
        }
    
        for(int i=0;i<vThread.size();++i)
        {
            Thread * threadTemp=vThread[i];
            //threadTemp->join();
        }
        while(1)
        {
            Thread::sleep(10);
        }
    
        //Thread producerThread1(&producer1);
        //producerThread1.start();
        //producer1.waitUntilReady();
    
        //Thread producerThread2(&producer2);
        //producerThread2.start();
        //producer2.waitUntilReady();
    
        //Thread producerThread3(&producer3);
        //producerThread3.start();
        //producer3.waitUntilReady();
    #endif
    
    
    
        
    #ifdef USE_PRODUCER
        // Wait for the threads to complete.
        //producerThread1.join();
        //producerThread2.join();
        //producerThread3.join();
    #endif
    
    #ifdef USE_COMSUMER
        consumerThread.join();
    #endif
    
        long long endTime = System::currentTimeMillis();
        double totalTime = (double)(endTime - startTime) / 1000.0;
    
    #ifdef USE_PRODUCER
        //producer1.close();
        //producer2.close();
        //producer3.close();
    
        for(size_t i=0;i<vHelloWorldProducer.size();++i)
        {
            HelloWorldProducer * ProducerTemp=vHelloWorldProducer[i];
            // Check the pointer before using it, then close and release the producer.
            if(ProducerTemp)
            {
                ProducerTemp->close();
                delete ProducerTemp;
                vHelloWorldProducer[i]=NULL;
            }
        }
    
    #endif
    #ifdef USE_COMSUMER
        consumer.close();
    #endif
    
        
       
    
        std::cout << "Time to completion = " << totalTime << " seconds." << std::endl;
        std::cout << "-----------------------------------------------------\n";
        std::cout << "Finished with the example." << std::endl;
        std::cout << "=====================================================\n";
    
        }
        activemq::library::ActiveMQCPP::shutdownLibrary();
    
    
        return 0;
    }
    
    // END SNIPPET: demo
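The consumer above answers each request by copying the request's correlation ID onto the response and sending it to the request's reply-to destination. The sketch below models only that handshake with standard-library types, so it runs without a broker. `Envelope`, `makeReply`, and `PendingRequests` are hypothetical names made up for illustration and are not part of CMS; in the real code their roles are played by `cms::TextMessage`, `getCMSReplyTo()`, and `setCMSCorrelationID()`.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for a CMS text message (not a CMS type).
struct Envelope {
    std::string correlationId;  // plays the role of the CMS correlation ID
    std::string replyTo;        // plays the role of the reply-to destination; empty means none
    std::string text;
};

// Consumer side: build a reply only when the request names a reply destination.
// Returns true and fills `reply` when a reply should be sent.
bool makeReply(const Envelope& request, const std::string& body, Envelope& reply) {
    if (request.replyTo.empty()) {
        return false;  // mirrors the `if (destSend)` check in onMessage()
    }
    reply.correlationId = request.correlationId;  // lets the requester match the reply
    reply.replyTo.clear();
    reply.text = body;
    return true;
}

// Requester side: remember outstanding requests by correlation ID.
class PendingRequests {
public:
    void add(const std::string& id, const std::string& what) {
        assert(!id.empty());  // an empty ID could never be matched later
        pending_[id] = what;
    }

    // Returns the stored request description and forgets it; "" for unknown IDs.
    std::string resolve(const Envelope& reply) {
        auto it = pending_.find(reply.correlationId);
        if (it == pending_.end()) {
            return "";
        }
        std::string what = it->second;
        pending_.erase(it);
        return what;
    }

    std::size_t size() const { return pending_.size(); }

private:
    std::map<std::string, std::string> pending_;
};
```

A requester would call `add()` before sending and `resolve()` when a reply arrives. Without the copied correlation ID it would have no way to tell which of several outstanding requests a reply belongs to.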

    Program output: (image omitted)

    Message conversion in activemq-cpp.

    activemq-cpp implements this conversion in ActiveMQMessageTransformation::transformMessage:

    ////////////////////////////////////////////////////////////////////////////////
    bool ActiveMQMessageTransformation::transformMessage(cms::Message* message, ActiveMQConnection* connection, Message** amqMessage) {
    
        if (message == NULL) {
            throw NullPointerException(__FILE__, __LINE__, "Provided source cms::Message pointer was NULL");
        }
    
        if (amqMessage == NULL) {
            throw NullPointerException(__FILE__, __LINE__, "Provided target commands::Message pointer was NULL");
        }
    
        *amqMessage = dynamic_cast<Message*>(message);
    
        if (*amqMessage != NULL) {
            return false;
        } else {
    
            if (dynamic_cast<cms::BytesMessage*>(message) != NULL) {
                cms::BytesMessage* bytesMsg = dynamic_cast<cms::BytesMessage*>(message);
                bytesMsg->reset();
                ActiveMQBytesMessage* msg = new ActiveMQBytesMessage();
                msg->setConnection(connection);
                try {
                    for (;;) {
                        // Reads a byte from the message stream until the stream is empty
                        msg->writeByte(bytesMsg->readByte());
                    }
                } catch (cms::MessageEOFException& e) {
                    // if an end of message stream as expected
                } catch (cms::CMSException& e) {
                }
    
                *amqMessage = msg;
            } else if (dynamic_cast<cms::MapMessage*>(message) != NULL) {
                cms::MapMessage* mapMsg = dynamic_cast<cms::MapMessage*>(message);
                ActiveMQMapMessage* msg = new ActiveMQMapMessage();
                msg->setConnection(connection);
    
                std::vector<std::string> elements = mapMsg->getMapNames();
                std::vector<std::string>::iterator iter = elements.begin();
                for(; iter != elements.end() ; ++iter) {
                    std::string key = *iter;
                    cms::Message::ValueType elementType = mapMsg->getValueType(key);
    
                    switch(elementType) {
                        case cms::Message::BOOLEAN_TYPE:
                            msg->setBoolean(key, mapMsg->getBoolean(key));
                            break;
                        case cms::Message::BYTE_TYPE:
                            msg->setByte(key, mapMsg->getByte(key));
                            break;
                        case cms::Message::BYTE_ARRAY_TYPE:
                            msg->setBytes(key, mapMsg->getBytes(key));
                            break;
                        case cms::Message::CHAR_TYPE:
                            msg->setChar(key, mapMsg->getChar(key));
                            break;
                        case cms::Message::SHORT_TYPE:
                            msg->setShort(key, mapMsg->getShort(key));
                            break;
                        case cms::Message::INTEGER_TYPE:
                            msg->setInt(key, mapMsg->getInt(key));
                            break;
                        case cms::Message::LONG_TYPE:
                            msg->setLong(key, mapMsg->getLong(key));
                            break;
                        case cms::Message::FLOAT_TYPE:
                            msg->setFloat(key, mapMsg->getFloat(key));
                            break;
                        case cms::Message::DOUBLE_TYPE:
                            msg->setDouble(key, mapMsg->getDouble(key));
                            break;
                        case cms::Message::STRING_TYPE:
                            msg->setString(key, mapMsg->getString(key));
                            break;
                        default:
                            break;
                    }
                }
    
                *amqMessage = msg;
            } else if (dynamic_cast<cms::ObjectMessage*>(message) != NULL) {
                cms::ObjectMessage* objMsg = dynamic_cast<cms::ObjectMessage*>(message);
                ActiveMQObjectMessage* msg = new ActiveMQObjectMessage();
                msg->setConnection(connection);
                msg->setObjectBytes(objMsg->getObjectBytes());
                *amqMessage = msg;
            } else if (dynamic_cast<cms::StreamMessage*>(message) != NULL) {
                cms::StreamMessage* streamMessage = dynamic_cast<cms::StreamMessage*>(message);
                streamMessage->reset();
                ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();
                msg->setConnection(connection);
    
                try {
                    while(true) {
                        cms::Message::ValueType elementType = streamMessage->getNextValueType();
                        int result = -1;
                        std::vector<unsigned char> buffer(255);
    
                        switch(elementType) {
                            case cms::Message::BOOLEAN_TYPE:
                                msg->writeBoolean(streamMessage->readBoolean());
                                break;
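                            case cms::Message::BYTE_TYPE:
                                // Note: as quoted, this branch reads and writes a boolean;
                                // readByte()/writeByte() appear to be the intended calls.
                                msg->writeBoolean(streamMessage->readBoolean());
                                break;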
                            case cms::Message::BYTE_ARRAY_TYPE:
                                while ((result = streamMessage->readBytes(buffer)) != -1) {
                                    msg->writeBytes(&buffer[0], 0, result);
                                    buffer.clear();
                                }
                                break;
                            case cms::Message::CHAR_TYPE:
                                msg->writeChar(streamMessage->readChar());
                                break;
                            case cms::Message::SHORT_TYPE:
                                msg->writeShort(streamMessage->readShort());
                                break;
                            case cms::Message::INTEGER_TYPE:
                                msg->writeInt(streamMessage->readInt());
                                break;
                            case cms::Message::LONG_TYPE:
                                msg->writeLong(streamMessage->readLong());
                                break;
                            case cms::Message::FLOAT_TYPE:
                                msg->writeFloat(streamMessage->readFloat());
                                break;
                            case cms::Message::DOUBLE_TYPE:
                                msg->writeDouble(streamMessage->readDouble());
                                break;
                            case cms::Message::STRING_TYPE:
                                msg->writeString(streamMessage->readString());
                                break;
                            default:
                                break;
                        }
                    }
                } catch (cms::MessageEOFException& e) {
                    // if an end of message stream as expected
                } catch (cms::CMSException& e) {
                }
    
                *amqMessage = msg;
            } else if (dynamic_cast<cms::TextMessage*>(message) != NULL) {
                cms::TextMessage* textMsg = dynamic_cast<cms::TextMessage*>(message);
                ActiveMQTextMessage* msg = new ActiveMQTextMessage();
                msg->setConnection(connection);
                msg->setText(textMsg->getText());
                *amqMessage = msg;
            } else {
                *amqMessage = new ActiveMQMessage();
                (*amqMessage)->setConnection(connection);
            }
    
            ActiveMQMessageTransformation::copyProperties(message, dynamic_cast<cms::Message*>(*amqMessage));
        }
    
        return true;
    }
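`transformMessage` decides how to copy a foreign message by probing its concrete type with a chain of `dynamic_cast` checks and falling back to a plain message when nothing matches. The consumer's `onMessage` uses the same cast to detect a `TextMessage`. A minimal sketch of that dispatch pattern follows; `Msg`, `TextMsg`, `MapMsg`, and `describe` are hypothetical stand-ins for illustration, not the real `cms::` interfaces.

```cpp
#include <cassert>
#include <map>
#include <string>

// Hypothetical message hierarchy; a virtual destructor makes the base polymorphic,
// which dynamic_cast requires.
struct Msg { virtual ~Msg() {} };
struct TextMsg : Msg { std::string text; };
struct MapMsg : Msg { std::map<std::string, std::string> entries; };

// Probe the concrete type branch by branch, with a fallback for anything else.
std::string describe(const Msg* message) {
    if (message == nullptr) {
        return "null";
    }
    if (const TextMsg* t = dynamic_cast<const TextMsg*>(message)) {
        return "text:" + t->text;
    }
    if (const MapMsg* m = dynamic_cast<const MapMsg*>(message)) {
        return "map:" + std::to_string(m->entries.size());
    }
    return "plain";  // corresponds to the final `else` branch in transformMessage
}

// Usage: each concrete type takes its own branch.
void describeDemo() {
    TextMsg t;
    t.text = "hi";
    assert(describe(&t) == "text:hi");

    Msg plain;
    assert(describe(&plain) == "plain");
}
```

The cast returns a null pointer when the object is not of the requested type, which is why a failed cast can safely be used as the branch condition.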

    5, ActiveMQ broker clusters.

    For reference, see:

    http://bh-keven.iteye.com/blog/1617788

    http://blog.csdn.net/jason5186/article/details/18702523

    6, Settings in activemq.xml and ActiveMQ connection URIs

    http://activemq.apache.org/nms/activemq-uri-configuration.html

     http://activemq.apache.org/tcp-transport-reference.html

    These pages cover the options, but they take some time to read through.
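As a concrete illustration of the connection URI side, the `brokerURI` in the example has the shape `failover:(tcp://host:port?option=value&...)`. The helpers below are a hypothetical convenience (`tcpUri`, `failoverUri`, and `exampleBrokerUri` are not part of ActiveMQ-CPP) that assemble such a string. The two option names used are taken from the commented-out lines of the example's `brokerURI`; which options a given library version honours should be checked against the pages linked above.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

typedef std::vector<std::pair<std::string, std::string> > Options;

// Build "tcp://host:port?k1=v1&k2=v2" for a single broker.
std::string tcpUri(const std::string& host, int port, const Options& options) {
    std::string uri = "tcp://" + host + ":" + std::to_string(port);
    for (std::size_t i = 0; i < options.size(); ++i) {
        uri += (i == 0 ? "?" : "&");
        uri += options[i].first + "=" + options[i].second;
    }
    return uri;
}

// Wrap one or more broker URIs in the failover transport: "failover:(a,b)".
std::string failoverUri(const std::vector<std::string>& brokers) {
    std::string uri = "failover:(";
    for (std::size_t i = 0; i < brokers.size(); ++i) {
        if (i != 0) {
            uri += ",";
        }
        uri += brokers[i];
    }
    uri += ")";
    return uri;
}

// Usage: rebuild the example's URI with two of its commented-out options enabled.
std::string exampleBrokerUri() {
    Options options;
    options.push_back(std::make_pair("wireFormat", "openwire"));
    options.push_back(std::make_pair("transport.useInactivityMonitor", "false"));

    std::vector<std::string> brokers;
    brokers.push_back(tcpUri("192.168.10.143", 61616, options));

    std::string uri = failoverUri(brokers);
    assert(uri.compare(0, 10, "failover:(") == 0);
    return uri;
}
```

Building the string from name/value pairs avoids the easy mistake in hand-written URIs of starting a second option with `?` instead of `&`.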

    //7, The wireFormat options (such as wireFormat=openwire) and their pros and cons.

    //openwire,amqp,stomp,mqtt,ws

  • Original article: https://www.cnblogs.com/xumaojun/p/8521699.html