zoukankan      html  css  js  c++  java
  • 如何在WINDOW环境下搭建ActivateMQ和zookeeper集群环境

    考虑到使用虚拟机环境搭建环境需要太多内存,于是研究如何使用window版本建立环境.
    @echo off
    REM Licensed to the Apache Software Foundation (ASF) under one or more
    REM contributor license agreements.  See the NOTICE file distributed with
    REM this work for additional information regarding copyright ownership.
    REM The ASF licenses this file to You under the Apache License, Version 2.0
    REM (the "License"); you may not use this file except in compliance with
    REM the License.  You may obtain a copy of the License at
    REM
    REM     http://www.apache.org/licenses/LICENSE-2.0
    REM
    REM Unless required by applicable law or agreed to in writing, software
    REM distributed under the License is distributed on an "AS IS" BASIS,
    REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    REM See the License for the specific language governing permissions and
    REM limitations under the License.
    
    setlocal
    e:
    cd E:gitzookeeper-3.4.12in
    call "%~dp0zkEnv.cmd"
    
    set ZOOMAIN=org.apache.zookeeper.server.quorum.QuorumPeerMain
    echo on
    call %JAVA% "-Dzookeeper.log.dir=%ZOO_LOG_DIR%" "-Dzookeeper.root.logger=%ZOO_LOG4J_PROP%" -cp "%CLASSPATH%" %ZOOMAIN% "%ZOOCFGDIR%1.cfg" %*
    
    endlocal
    pause
    

      1.cfg 

    # synchronization phase can take
    initLimit=10
    # The number of ticks that can pass between 
    # sending a request and getting an acknowledgement
    syncLimit=5
    # the directory where the snapshot is stored.
    # do not use /tmp for storage, /tmp here is just 
    # example sakes.
    dataDir=../data/1
    # the port at which the clients will connect
    clientPort=2181
    # the maximum number of client connections.
    # increase this if you need to handle more clients
    #maxClientCnxns=60
    #
    # Be sure to read the maintenance section of the 
    # administrator guide before turning on autopurge.
    #
    # http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
    #
    # The number of snapshots to retain in dataDir
    #autopurge.snapRetainCount=3
    # Purge task interval in hours
    # Set to "0" to disable auto purge feature
    #autopurge.purgeInterval=1
    server.1=127.0.0.1:2181:3881
    server.2=127.0.0.1:12181:13881
    server.3=127.0.0.1:22181:23881
    

     data中建立 1 2 3 目录,在其中建立 myid 文件,内容分别为 1 2 3 .

    如此类似配置3个cmd文件和cfg文件

    分别运行,完成zookeeper 集群

    正常的zookeeper dos窗口主 从 从

    uestProcessor@487] - Processed session termination for sessionid: 0x200053f0361
    004
    2018-05-10 08:54:40,975 [myid:3] - INFO  [SessionTracker:ZooKeeperServer@354] -
    Expiring session 0x200053f03610005, timeout of 4000ms exceeded
    2018-05-10 08:54:40,975 [myid:3] - INFO  [SessionTracker:ZooKeeperServer@354] -
    Expiring session 0x100053f00f20005, timeout of 4000ms exceeded
    2018-05-10 08:54:40,975 [myid:3] - INFO  [ProcessThread(sid:3 cport:-1)::PrepRe
    uestProcessor@487] - Processed session termination for sessionid: 0x200053f0361
    005
    2018-05-10 08:54:40,975 [myid:3] - INFO  [ProcessThread(sid:3 cport:-1)::PrepRe
    uestProcessor@487] - Processed session termination for sessionid: 0x100053f00f2
    005
    2018-05-10 08:54:42,975 [myid:3] - INFO  [SessionTracker:ZooKeeperServer@354] -
    Expiring session 0x300053f07550006, timeout of 4000ms exceeded
    2018-05-10 08:54:42,976 [myid:3] - INFO  [ProcessThread(sid:3 cport:-1)::PrepRe
    uestProcessor@487] - Processed session termination for sessionid: 0x300053f0755
    006
    
    -----------------------------
    
    2018-05-10 08:54:34,303 [myid:2] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2
    182:ZooKeeperServer@948] - Client attempting to establish new session at /127.0.
    0.1:55853
    2018-05-10 08:54:34,824 [myid:2] - INFO  [CommitProcessor:2:ZooKeeperServer@693]
     - Established session 0x200053f03610005 with negotiated timeout 4000 for client
     /127.0.0.1:55853
    2018-05-10 08:54:34,824 [myid:2] - WARN  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2
    182:NIOServerCnxn@383] - Exception causing close of session 0x200053f03610005:
    您的主机中的软件中止了一个已建立的连接。
    2018-05-10 08:54:34,827 [myid:2] - INFO  [NIOServerCxn.Factory:0.0.0.0/0.0.0.0:2
    182:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0.1:55853 w
    hich had sessionid 0x200053f03610005
    
    --------------------
    
    181:NIOServerCnxnFactory@215] - Accepted socket connection from /127.0
    
    2018-05-10 08:54:35,603 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0
    181:ZooKeeperServer@948] - Client attempting to establish new session
    0.1:55857
    2018-05-10 08:54:36,129 [myid:1] - INFO  [CommitProcessor:1:ZooKeeperS
     - Established session 0x100053f00f20005 with negotiated timeout 4000
     /127.0.0.1:55857
    2018-05-10 08:54:36,130 [myid:1] - WARN  [NIOServerCxn.Factory:0.0.0.0
    181:NIOServerCnxn@383] - Exception causing close of session 0x100053f0
    您的主机中的软件中止了一个已建立的连接。
    2018-05-10 08:54:36,130 [myid:1] - INFO  [NIOServerCxn.Factory:0.0.0.0
    181:NIOServerCnxn@1040] - Closed socket connection for client /127.0.0
    hich had sessionid 0x100053f00f20005
    

      

    第二步,配置activateMQ集群

     copy mq的启动命令 E:gitapache-activemq-5.15.3inwin64activemq1.bat

    @echo off
    
    REM ------------------------------------------------------------------------
    REM Licensed to the Apache Software Foundation (ASF) under one or more
    REM contributor license agreements.  See the NOTICE file distributed with
    REM this work for additional information regarding copyright ownership.
    REM The ASF licenses this file to You under the Apache License, Version 2.0
    REM (the "License"); you may not use this file except in compliance with
    REM the License.  You may obtain a copy of the License at
    REM
    REM http://www.apache.org/licenses/LICENSE-2.0
    REM
    REM Unless required by applicable law or agreed to in writing, software
    REM distributed under the License is distributed on an "AS IS" BASIS,
    REM WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    REM See the License for the specific language governing permissions and
    REM limitations under the License.
    REM ------------------------------------------------------------------------
    
    rem
    rem Find the application home.
    rem
    if "%OS%"=="Windows_NT" goto nt
    
    echo This is not NT, so please edit this script and set _APP_HOME manually
    set _APP_HOME=..
    
    goto conf
    
    :nt
    rem %~dp0 is name of current script under NT
    set _APP_HOME=%~dp0
    
    rem
    rem Find the wrapper1.conf
    rem
    :conf
    set _WRAPPER_CONF=wrapper1.conf
    
    rem
    rem Run the application.
    rem At runtime, the current directory will be that of Wrapper.exe
    rem
    "%_APP_HOME%wrapper.exe" -c %_WRAPPER_CONF% 
    if not errorlevel 1 goto end
    pause
    
    :end
    set _APP_HOME=
    set _WRAPPER_CONF=
    

      

    复制 data和conf目录为 data1 conf1

    copy wrapper.conf 为  wrapper1.conf  并修改

    # wrapper.debug=TRUE
    set.default.ACTIVEMQ_HOME=../..
    set.default.ACTIVEMQ_BASE=../..
    set.default.ACTIVEMQ_CONF=%ACTIVEMQ_BASE%/conf1
    set.default.ACTIVEMQ_DATA=%ACTIVEMQ_BASE%/data1
    wrapper.working.dir=.
    

      修改conf1中的 activate.xml 和 jetty.xml

    <!--
        Licensed to the Apache Software Foundation (ASF) under one or more
        contributor license agreements.  See the NOTICE file distributed with
        this work for additional information regarding copyright ownership.
        The ASF licenses this file to You under the Apache License, Version 2.0
        (the "License"); you may not use this file except in compliance with
        the License.  You may obtain a copy of the License at
    
        http://www.apache.org/licenses/LICENSE-2.0
    
        Unless required by applicable law or agreed to in writing, software
        distributed under the License is distributed on an "AS IS" BASIS,
        WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
        See the License for the specific language governing permissions and
        limitations under the License.
    -->
    <!-- START SNIPPET: example -->
    <beans
      xmlns="http://www.springframework.org/schema/beans"
      xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
      http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!-- Allows us to use system properties as variables in this configuration file -->
        <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
            <property name="locations">
                <value>file:${activemq.conf}/credentials.properties</value>
            </property>
        </bean>
    
       <!-- Allows accessing the server log -->
        <bean id="logQuery" class="io.fabric8.insight.log.log4j.Log4jLogQuery"
              lazy-init="false" scope="singleton"
              init-method="start" destroy-method="stop">
        </bean>
    	 
    
        <!--
            The <broker> element is used to configure the ActiveMQ broker.
        -->
        <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">
    
            <destinationPolicy>
                <policyMap>
                  <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
    
                             http://activemq.apache.org/slow-consumer-handling.html
    
                        -->
                      <pendingMessageLimitStrategy>
                        <constantPendingMessageLimitStrategy limit="1000"/>
                      </pendingMessageLimitStrategy>
                    </policyEntry>
                  </policyEntries>
                </policyMap>
            </destinationPolicy>
    
    
            <!--
                The managementContext is used to configure how ActiveMQ is exposed in
                JMX. By default, ActiveMQ uses the MBean server that is started by
                the JVM. For more information, see:
    
                http://activemq.apache.org/jmx.html
            -->
            <managementContext>
                <managementContext createConnector="false"/>
            </managementContext>
    
            <!--
                Configure message persistence for the broker. The default persistence
                mechanism is the KahaDB store (identified by the kahaDB tag).
                For more information, see:
    
                http://activemq.apache.org/persistence.html
            
       
    			<persistenceAdapter>  
                 <kahaDB directory="${activemq.data}/kahadb"/>  
            
         </persistenceAdapter>  
    -->
    <persistenceAdapter> 
    <replicatedLevelDB 
    directory="${activemq.data}/leveldb" 
    replicas="3" 
    bind="tcp://0.0.0.0:62621" 
    zkAddress="localhost:2181,localhost:2182,localhost:2183" 
    hostname="localhost" 
    zkPath="/activemq/leveldb-stores"/> 
    </persistenceAdapter>
    
              <!--
                The systemUsage controls the maximum amount of space the broker will
                use before disabling caching and/or slowing down producers. For more information, see:
                http://activemq.apache.org/producer-flow-control.html
              -->
              <systemUsage>
                <systemUsage>
                    <memoryUsage>
                        <memoryUsage percentOfJvmHeap="70" />
                    </memoryUsage>
                    <storeUsage>
                        <storeUsage limit="100 gb"/>
                    </storeUsage>
                    <tempUsage>
                        <tempUsage limit="50 gb"/>
                    </tempUsage>
                </systemUsage>
            </systemUsage>
    
            <!--
                The transport connectors expose ActiveMQ over a given protocol to
                clients and other brokers. For more information, see:
    
                http://activemq.apache.org/configuring-transports.html
            -->
            <transportConnectors>
                <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
                <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
                <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
                </transportConnectors>
    
            <!-- destroy the spring context on shutdown to stop jetty -->
            <shutdownHooks>
                <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
            </shutdownHooks>
    			 
    
        </broker>
    
        <!--
            Enable web consoles, REST and Ajax APIs and demos
            The web consoles requires by default login, you can disable this in the jetty.xml file
    
            Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
        -->
        <import resource="jetty.xml"/>
    
    </beans>
    <!-- END SNIPPET: example -->
    

      jetty.xml 

        <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
                 <!-- the default port number for the web console -->
            <property name="host" value="0.0.0.0"/>
            <property name="port" value="8161"/>
        </bean>
    

      

    bind="tcp://0.0.0.0:62621"
    <transportConnectors 
     <property name="port" value="8161"/>

    如此3个目录中 上述端口不要重复

    然后分别启动 3个mq的bat. 

    th
    jvm 3    |  INFO | ActiveMQ WebConsole available at http://0.0.0.0:8163/
    jvm 3    |  INFO | ActiveMQ Jolokia REST API available at http://0.0.0.0:8163/ap
    i/jolokia/
    jvm 3    |  INFO | Initializing Spring FrameworkServlet 'dispatcher'
    jvm 3    |  INFO | No Spring WebApplicationInitializer types detected on classpa
    th
    jvm 3    |  INFO | jolokia-agent: Using policy access restrictor classpath:/jolo
    kia-access.xml
    jvm 3    |  INFO | Connector vm://localhost started
    

      

    jvm 1    |  INFO | Socket connection established to han-PC/0:0:0:0:0:0:0:1:21
     initiating session
    jvm 1    |  INFO | Session establishment complete on server han-PC/0:0:0:0:0:
    :1:2183, sessionid = 0x300053f07550007, negotiated timeout = 4000
    jvm 1    |  INFO | Using the pure java LevelDB implementation.
    jvm 1    |  INFO | Attaching to master: tcp://localhost:62623
    jvm 1    |  INFO | Slave started
    jvm 1    |  INFO | Slave skipping download of: log/0000000000000000.log
    jvm 1    |  INFO | Slave requested: 000000000000080b.index/CURRENT
    jvm 1    |  INFO | Slave requested: 000000000000080b.index/MANIFEST-000002
    jvm 1    |  INFO | Slave requested: 000000000000080b.index/000003.log
    jvm 1    |  INFO | Attaching... Downloaded 0.02/1.37 kb and 1/3 files
    jvm 1    |  INFO | Attaching... Downloaded 0.06/1.37 kb and 2/3 files
    jvm 1    |  INFO | Attaching... Downloaded 1.37/1.37 kb and 3/3 files
    jvm 1    |  INFO | Attached
    

      

    jvm 2    |  INFO | Slave stopped
    jvm 2    |  WARN | listeners are taking too long to process the events
    jvm 2    |  WARN | listeners are taking too long to process the events
    jvm 2    |  INFO | Using the pure java LevelDB implementation.
    jvm 2    |  INFO | Attaching to master: tcp://localhost:62623
    jvm 2    |  INFO | Slave started
    jvm 2    |  INFO | Slave skipping download of: log/0000000000000000.log
    jvm 2    |  INFO | Slave requested: 000000000000080b.index/000003.log
    jvm 2    |  INFO | Slave requested: 000000000000080b.index/CURRENT
    jvm 2    |  INFO | Slave requested: 000000000000080b.index/MANIFEST-000002
    jvm 2    |  INFO | Attaching... Downloaded 1.31/1.37 kb and 1/3 files
    jvm 2    |  INFO | Attaching... Downloaded 1.32/1.37 kb and 2/3 files
    jvm 2    |  INFO | Attaching... Downloaded 1.37/1.37 kb and 3/3 files
    jvm 2    |  INFO | Attached
    

      查看zookeeper 

    [zk: localhost:2181(CONNECTED) 9] get /activemq/leveldb-stores/00000000010
    {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elec
    ted":null}
    cZxid = 0x20000004c
    ctime = Wed May 09 17:17:54 CST 2018
    mZxid = 0x300000013
    mtime = Thu May 10 08:54:26 CST 2018
    pZxid = 0x20000004c
    cversion = 0
    dataVersion = 3
    aclVersion = 0
    ephemeralOwner = 0x20001a36c630002
    dataLength = 90
    numChildren = 0
    [zk: localhost:2181(CONNECTED) 10] get /activemq/leveldb-stores/00000000009
    {"id":"localhost","container":null,"address":"tcp://localhost:62623","position":
    

      

    tcp://localhost:62623 对应的第3个activatemq的leveldb绑定的端口

    <persistenceAdapter>
    <replicatedLevelDB
    directory="${activemq.data}/leveldb"
    replicas="3"
    bind="tcp://0.0.0.0:62623"
    zkAddress="localhost:2181,localhost:2182,localhost:2183"
    hostname="localhost"
    zkPath="/activemq/leveldb-stores"/>
    </persistenceAdapter>

    代码:

    package han;
    
    /**
     * Created by han on 2018/5/7.
     */
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ZkSend {
        //连接账号
        private String userName = "admin";
        //连接密码
        private String password = "admin";
        //连接地址
        private String brokerURL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:62616,tcp://127.0.0.1:63616)?randomize=false";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //生产者,就是产生数据的对象
        private MessageProducer producer;
    
        public static void main(String[] args) throws JMSException {
            ZkSend send = new ZkSend();
            send.start();
        }
    
        public void start() throws JMSException {
            try {
                //根据用户名,密码,url创建一个连接工厂
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //从工厂中获取一个连接
                connection = factory.createConnection();
                //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                connection.start();
                //创建一个session
                //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                destination = session.createQueue("text-msg");
                //从session中,获取一个消息生产者
                producer = session.createProducer(destination);
                //设置生产者的模式,有两种可选
                //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
                //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                TextMessage textMsg = session.createTextMessage(String.valueOf( System.currentTimeMillis())+"ZooKeeper!");
              //  BytesMessage bytesMessage=session.createBytesMessage( );
               // bytesMessage.setByteProperty("a",Byte.parseByte("01"));
    
                for(int i = 0 ; i < 3 ; i ++){
                    //发送一条消息
                    producer.send(textMsg );
                   // producer.send(bytesMessage);
                }
                session.commit();
                System.out.println("发送消息成功");
                //即便生产者的对象关闭了,程序还在运行哦
    
                producer.close();
    
            } catch (JMSException e) {
                session.rollback(); // rollback when exception happens
                e.printStackTrace();
            }
        }
    }
    

      

    package han;
    
    /**
     * Created by han on 2018/5/7.
     */
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    public class ZkReceive {
        //连接账号
        private String userName = "admin";
        //连接密码
        private String password = "admin";
        //连接地址
        private String brokerURL = "failover:(tcp://127.0.0.1:61616,tcp://127.0.0.1:62616,tcp://127.0.0.1:63616)?randomize=false";
    
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //消费者,就是接收数据的对象
        private MessageConsumer consumer;
        public static void main(String[] args) throws JMSException {
            ZkReceive receive = new ZkReceive();
            receive.start();
        }
    
        public void start() throws JMSException {
            try {
                //根据用户名,密码,url创建一个连接工厂
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //从工厂中获取一个连接
                connection = factory.createConnection();
                //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                connection.start();
                //创建一个session
                //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                destination = session.createQueue("text-msg");
                //根据session,创建一个接收者对象
                consumer = session.createConsumer(destination);
    
    
                //实现一个消息的监听器
                //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到
                consumer.setMessageListener(new MessageListener() {
    
                    public void onMessage(Message message) {
                        try {
                            //获取到接收的数据
                            String text = ((TextMessage)message).getText();
                            System.out.println(text);
                        } catch (JMSException e) {
                            e.printStackTrace();
                            try {
                                session.rollback();
                            } catch (JMSException e1) {
                                e1.printStackTrace();
                            }
                        }
                    }
                });
                //关闭接收端,也不会终止程序哦
                session.commit();
               // consumer.close();
            } catch (JMSException e) {
                session.rollback(); // rollback when exception happens
                e.printStackTrace();
            } finally {
               // consumer.close();
                //http://localhost:8161/api/message/text-msg?type=queue&clientId=han-PC-57632-1525743389712-1%3A1%3A1%3A1%3A1&json=true
            }
        }
    }
    

      http://127.0.0.1:8163/admin/queues.jsp 查看队列

    Home | Queues | Topics | Subscribers | Connections | Network | Scheduled | Send
    Support
    
    Queue Name  
     
    Queue Name Filter  
     
    Queues:
    Name  	Number Of Pending Messages  	Number Of Consumers  	Messages Enqueued  	Messages Dequeued  	Views  	Operations  
    text-msg	6	1	3	0	Browse Active Consumers
    Active Producers
      	Send To Purge Delete
    

      

  • 相关阅读:
    网络分析(二)定向与非定向
    Flex 找不到html文件,不能自动生成html问题解决
    常用的功能点记录
    git常规操作命令整理
    语境驱动测试7原则
    探索式测试的问与答
    测试建模:Google ACC
    探索式测试:基于测程的测试管理(SessionBased Test Management)
    用Excel展示SQL Server中的数据 (III): IronPython与自动化
    在Ajax中使用Flash实现跨域数据读取
  • 原文地址:https://www.cnblogs.com/cndavy/p/9017988.html
Copyright © 2011-2022 走看看