zoukankan      html  css  js  c++  java
  • Spring整合activeMQ消息队列

    1.配置JMS

      <!-- JMS helper class provided by Spring; it can send messages, receive messages, etc. -->  
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">  
            <!-- This connectionFactory refers to the Spring-provided ConnectionFactory bean defined below -->  
            <property name="connectionFactory" ref="connectionFactory"/>  
        </bean>    
        <!-- The ConnectionFactory that actually creates Connections; supplied by the JMS vendor (ActiveMQ here) -->  
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">  
            <property name="brokerURL" value="tcp://localhost:61616"/>  
        </bean>  
           
        <!-- Spring's ConnectionFactory that manages the real (vendor) ConnectionFactory -->  
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">  
            <!-- The target is the real ConnectionFactory that can create JMS Connections -->  
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>  
        </bean>  

    发送信息到activeMQ

    @Override
        public void addNotifyCashToMq(final String notifyUrl, final String cashId, final String reqSn, final String callResult,int count) {
            //发送的参数final String callBackUrl = SuperAppConstant.TRANSACTION_CALLBACK_PREFIX_URL + notify_url_notifyCash
                        + notifyUrl + "&cashId=" + cashId + "&reqSn=" + reqSn + "&callResult=" + callResult + "&count="
                        + _count;
            //发送消息到queue_notifuCash_serial消息队列 jmsTemplate.send(queue_notifyCash_serial,
    new MessageCreator() { @Override public Message createMessage(Session session) throws JMSException { if (logger.isDebugEnabled()) { logger.debug("notifyUrl=" + notifyUrl + ",cashId=" + cashId + ",reqSn=" + reqSn + ",callResult=" + callResult + ",_count=" + _count); } HashMap map = new HashMap(); map.put("callBackUrl", callBackUrl); ObjectMessage objectMessage = session.createObjectMessage();//创建消息 objectMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);//延时,delay为延时时长,以毫秒为单位
    return objectMessage; } }); }

    xml配置信息

        <!-- ActiveMQ connection factory; the broker URL comes from the jms.broker_url property -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${jms.broker_url}" />
        </bean>
    
        <!-- Spring caching connection factory wrapping the ActiveMQ factory above -->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="connectionFactory" />
            <property name="sessionCacheSize" value="10" />
        </bean>
    
        <!-- Spring JMS Template, wired to the caching connection factory -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cachingConnectionFactory" />
        </bean>

    2.destination消息队列定义

    <description>Queue定义</description>
        <!-- ActiveMQ queue destination bean; the constructor argument is the queue name -->
        <bean id="queue_callback_serial" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>queue_callback_serial</value>
            </constructor-arg>
        </bean>

    3.监听器BatchJob

    3.1 jms.xml

    <description>JMS简单应用配置</description>
    
        <!-- ActiveMQ connection factory; the broker URL comes from the jms.broker_url property -->
        <bean id="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="${jms.broker_url}" />
        </bean>
    
        <!-- Spring caching connection factory wrapping the ActiveMQ factory above -->
        <bean id="cachingConnectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
            <property name="targetConnectionFactory" ref="connectionFactory" />
            <property name="sessionCacheSize" value="10" />
        </bean>
    
        <!-- Queue definition used by the producer side (default destination of jmsTemplate) -->
        <bean id="orderQueueProducer" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="order.queue.producer" />
        </bean>
        <!-- Spring JMS Template; sends to orderQueueProducer when no destination is given -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="cachingConnectionFactory" />
            <property name="defaultDestination" ref="orderQueueProducer" />
        </bean>
    
        <!-- Message producer that uses the Spring JmsTemplate -->
        <bean id="orderProducerJmsService" class="com.gmall88.server.jms.order.impl.OrderProducerJmsServiceImpl">
            <property name="jmsTemplate" ref="jmsTemplate" />
        </bean>
        
        <!-- Message queue definition for the listener side -->
        <!-- NOTE(review): no listener container bean is visible in this snippet; presumably declared elsewhere - confirm -->
        <bean id="orderQueueListener" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>order.queue.listener</value>
            </constructor-arg>
        </bean>

    3.2 监听器impl

    import java.util.Map;
    
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.gmall88.server.wxpay.RF;
    
    import net.sf.json.JSONObject;
    
    /**
     * JMS listener that performs the HTTP callback described by a queued message.
     *
     * <p>It expects an {@link ObjectMessage} whose payload is a {@link Map} holding the
     * target URL under the key {@code "callBackUrl"}. Any other message, or a
     * {@code null} message, is only logged as an error.
     */
    public class NotifyCashManagerImpl implements MessageListener {
        private Logger logger = LoggerFactory.getLogger(getClass());

        /**
         * Reads the callback URL from the message and invokes it through
         * {@code RF.httpsRequestJson} with method {@code "POST"} and an empty third
         * argument. Exceptions raised while doing so are logged and not rethrown.
         *
         * @param message the incoming JMS message; may be {@code null}
         */
        @Override
        public void onMessage(Message message) {
            if (logger.isDebugEnabled()) {
                logger.debug("new callback start..");
            }

            // Reject anything that is not a usable ObjectMessage up front.
            if (message == null) {
                logger.error("message is null");
                return;
            }
            if (!(message instanceof ObjectMessage)) {
                logger.error("Unknown message, type=" + message.getClass().getName());
                return;
            }

            ObjectMessage payloadMessage = (ObjectMessage) message;
            try {
                // Pull the callback URL out of the message payload.
                Map payload = (Map) payloadMessage.getObject();
                String targetUrl = (String) payload.get("callBackUrl");
                if (logger.isInfoEnabled()) {
                    logger.info("callBackUrl=" + targetUrl);
                }

                // Fire the HTTP callback and log the reply when there is one.
                JSONObject reply = RF.httpsRequestJson(targetUrl, "POST", "");
                if (reply != null) {
                    logger.info("code:" + reply.getString("code"));
                    logger.info("message=" + reply.getString("message"));
                }
            } catch (Exception ex) {
                logger.error(ex.getMessage(), ex);
            }
        }

    }

    回调方法:

    /**
     * POST endpoint {@code /notifyCash}: notifies the business system of a cash
     * result and, if that call fails, re-queues the notification for delayed retry.
     *
     * <p>The request is first passed to {@code recordRequestCheck}; a non-null result
     * from that check is returned as-is and nothing else runs (presumably a
     * duplicate-request guard - confirm). Otherwise {@code recordRequestEnd} is always
     * invoked once the notification attempt, including any re-queue, has finished.
     *
     * @param notifyUrl  URL of the business system to notify
     * @param cashId     cash record id; also appended to the request-tracking key
     * @param reqSn      request serial number
     * @param cashResult result to report
     * @param count      attempt counter, forwarded when the notification is re-queued
     * @return the non-null check result, or a {@code ReturnResult} that carries the
     *         error code/message when an exception escaped the notification flow
     */
    @RequestMapping(value = "/notifyCash", method = RequestMethod.POST)
    @ResponseBody
    public Object notifyCash(String notifyUrl, String cashId, String reqSn, String cashResult,int count) {
        ReturnResult outcome = new ReturnResult();
        final String requestKey = "superApp_notifyOrder" + cashId;
        try {
            final ReturnResult rejected = recordRequestCheck(requestKey);
            if (rejected != null) {
                return rejected;
            }
            outcome = new ReturnResult();

            try {
                // Call back the business system.
                superAppServerManager.notifyCash(notifyUrl, cashId, reqSn, cashResult);
            } catch (Exception callbackFailure) {
                // The callback failed: schedule a delayed retry through the message queue.
                logger.error(callbackFailure.getMessage(), callbackFailure);
                superAppServerManager.addNotifyCashToMq(notifyUrl, cashId, reqSn, cashResult, count);
            } finally {
                recordRequestEnd(requestKey);
            }
        } catch (GmallException businessError) {
            outcome.setCodeNum(businessError.getCode());
            outcome.setMessage(businessError.getMessage());
        } catch (Exception unexpected) {
            logger.error(unexpected.getMessage(), unexpected);
            outcome.setCode(ReturnCodeType.FAILURE)
                    .setMessage(unexpected.getMessage());
        }
        logger.info("called..");
        return outcome;
    }

    整理了一下整个流程如图所示:

     4.ActiveMq持久化(这里只考虑持久化为MySQL方式)

    把MySQL的驱动包放到ActiveMQ的lib目录下,如:mysql-connector-java-5.0.4-bin.jar

    配置文件添加:

    <!-- Use JDBC persistence backed by the data source bean with id "derby-ds" -->
    <persistenceAdapter>
           <jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
        </persistenceAdapter>

    ActiveMq连接数据库相关配置

     <!-- Pooled DBCP data source with the MySQL driver; the bean id is "derby-ds" even though the driver and URL are MySQL -->
     <bean id="derby-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
        <property name="driverClassName" value="com.mysql.jdbc.Driver"/>
        <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
        <property name="username" value="activemq"/>
        <property name="password" value="activemq"/>
        <property name="maxActive" value="200"/>
        <property name="poolPreparedStatements" value="true"/>
      </bean>
  • 相关阅读:
    Flutter环境配置(window10环境)最完善版本
    ElementUI el-date-picker 限制选中日期前后30天,大于当天不可选
    fastreport添加每页的小计
    c# 类似sql中的isnull()语法
    HttPost HttpGet
    LaTeX幻灯片主题和颜色预览
    记录
    交易思想之顺大势逆小势
    Python读写json文件
    Python实现QQ PC端给好友发送消息
  • 原文地址:https://www.cnblogs.com/ouyanxia/p/7147169.html
Copyright © 2011-2022 走看看