zoukankan      html  css  js  c++  java
  • activemq应用总结(一)

    1.mq发送消息准备

    配置

            <spring:bean id="jmsFactory"
                class="org.apache.activemq.ActiveMQConnectionFactory">
                <spring:property name="brokerURL" value="${activemq.brokerURL}" />
                <spring:property name="userName" value="${activemq.username}" />
                <spring:property name="password" value="${activemq.password}" />
            </spring:bean>
    
            <spring:bean id="client" class="com.*.*.core.mq.Client" init-method="init" destroy-method="destroy">
                <spring:property name="jmsFactory">
                    <spring:ref bean="jmsFactory" />
                </spring:property>
                <spring:property name="reqQueue">
                    <spring:ref bean="reqQueneOne" />
                </spring:property>
                <spring:property name="resQueue">
                    <spring:ref bean="resQueneOne" />
                </spring:property>
                <spring:property name="timeToLive" value="${activemq.timetolive}" />
                <spring:property name="hostName">
                    <spring:ref bean="hostInfo.hostName"/>
                </spring:property>
            </spring:bean>
    
            <spring:bean id="service" class="com.*.*.core.mq.AMQService">
                <spring:property name="client">
                    <spring:ref bean="client" />
                </spring:property>
                <spring:property name="readTime" value="${activemq.readtimeout}" />
            </spring:bean>

    mq通过jmsFactory产生的connection

    connection = jmsFactory.createConnection();

    connection通过获取会话session

    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    session产生producer.

    producer = session.createProducer(reqQueue);

    2.发送的消息体

    ActiveMQBytesMessage byteMessage = new ActiveMQBytesMessage();

    byteMessage设置JMSCorrelationID,Content(byteSequence = new ByteSequence(mqMessageEntity.getStrRecBuf().getBytes(MQGlobals.MQ_CHARSET));),property(name,value).JMSReplyTo(resQueue).

        public String send(MQMessageEntity mqMessageEntity)
        {
            String sRecBuf = mqMessageEntity.getStrRecBuf();
            String correlationID = String.valueOf(Thread.currentThread().getId())+"_"+createRandomString();
            ClientAMQ clientAMQ = new ClientAMQ();
            DataCache.instance().putData(correlationID, clientAMQ);
            mqMessageEntity.setCorrelationID(correlationID);
            client.sendMQ(mqMessageEntity);
            log.info("发送:correlationID="+correlationID+";["+sRecBuf+"]");
            String returnBuf = clientAMQ.waitRes(readTime);
            DataCache.instance().removeData(correlationID);
            //打印接收报文
            log.info("返回:correlationID="+correlationID+";["+returnBuf+"]");
            if (null == returnBuf) {
                returnBuf = "";
            }
            return returnBuf;
        }

    3.接收消息配置

            <spring:bean id="reqQueueOne"
                class="org.apache.activemq.command.ActiveMQQueue">
                <spring:constructor-arg value="${lianpay.reqQueueOne}"></spring:constructor-arg>
            </spring:bean>
            <spring:bean id="resQueueOne"
                class="org.apache.activemq.command.ActiveMQQueue">
                <spring:constructor-arg value="${lianpay.resQueueOne}"></spring:constructor-arg>
            </spring:bean>
            <!-- YT_RESP消息监听 -->
            <spring:bean id="msgListener"
                class="org.springframework.jms.listener.adapter.MessageListenerAdapter">
                <spring:constructor-arg>
                    <spring:bean
                        class="com.*.*.core.mq.MsgListener">
                    </spring:bean>
                </spring:constructor-arg>
                <spring:property name="defaultListenerMethod" value="handleMessage" />
                <spring:property name="messageConverter" ref="messageConverter" />
            </spring:bean>
            <!-- 交易监听容器 -->
            <spring:bean id="YTMsgContainer"
                class="org.springframework.jms.listener.DefaultMessageListenerContainer">
                <spring:property name="concurrentConsumers" value="${activemq.consumers}" />
                <spring:property name="connectionFactory" ref="jmsFactory" />
                <spring:property name="destination" ref="resQueueOne" />
                <spring:property name="messageListener" ref="msgListener" />
                <spring:property name="messageSelector" value="${selector.resp}"/>
            </spring:bean>

    4.接收消息

        public void handleMessage(ActiveMQBytesMessage activeMQBytesMessage) {
            String correlationID = activeMQBytesMessage.getCorrelationId();
            log.info("监听到消息:correlationID=" + correlationID + "];"+ activeMQBytesMessage.getDestination());
            String returnBuf = null;
            try {
                returnBuf = new String(activeMQBytesMessage.getContent().getData(),MQGlobals.MQ_CHARSET);
            } catch (UnsupportedEncodingException e) {
                log.error("字符转换异常," + e.getMessage());
            }
            // 判断内存中是否存在发送对象
            Object obj = DataCache.instance().getData(correlationID);
            if (null != obj) {
                ClientAMQ clientAmq = (ClientAMQ) obj;
                clientAmq.setReturnBuf(returnBuf);
                clientAmq.startNotify(correlationID);
            } else {
                log.info("correlationID=" + correlationID + ";[" + returnBuf + "]");
            }
        }

    5,接受消息的消息体

    public class ClientAMQ {
        private String returnBuf;//返回结果
        
        /**
         * 开始等待
         * @param waitTine
         * @return
         */
        public synchronized String waitRes(long waitTime)
        {
            if(null!=returnBuf)
                return returnBuf;
            try {
                wait(waitTime);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return returnBuf;
        }
        
        /**
         * 开始唤醒
         * @param correlationID
         */
        public synchronized void startNotify(String correlationID)
        {
            notify();
        }
    
        public String getReturnBuf() {
            return returnBuf;
        }
    
        public void setReturnBuf(String returnBuf) {
            this.returnBuf = returnBuf;
        }
    }
  • 相关阅读:
    IE hasLayout详解
    seajs引入jquery
    jquery实现轮播插件
    CSS视觉格式化模型
    js事件冒泡和事件捕获详解
    你尽力了么===BY cloudsky
    前向否定界定符 python正则表达式不匹配某个字符串 以及无捕获组和命名组(转)
    php safe mode bypass all <转>
    WAF指纹探测及识别技术<freebuf>
    linux集群管理<转>
  • 原文地址:https://www.cnblogs.com/hzcxy/p/2982149.html
Copyright © 2011-2022 走看看