zoukankan      html  css  js  c++  java
  • ActiveMQ的介绍及使用实例.

    今天就来说下 这个项目中使用ActiveMQ的情况, MQ: message queue, 顾名思义就是消息队列的意思. 


    一: 使用场景: 

    消息队列在大型电子商务类网站,如京东、淘宝、去哪儿等网站有这深入的应用,队列的主要作用是消除高并发访问高峰,加快网站的响应速度。在不使用消息队列的情况下,用户的请求数据直接写入数据库,在高并发的情况下,会对数据库造成巨大的压力,同时也使得系统响应延迟加剧。在使用队列后,用户的请求发给队列后立即返回(当然不能直接给用户提示订单提交成功,京东上提示:您“您提交了订单,请等待系统确认”),再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列的服务处理速度远快于数据库,因此用户的响应延迟可得到有效改善。

    那么在babasport这个项目中, 我们可以在上架的时候使用消息队列的模式:
    我们之前在点击一款商品上架的时候, 我们需要分成2步, 第一: 更新商品表中该商品的上架状态. 第二: 将该商品信息保存到Solr服务器中.  那么如果我们使用了消息队列后, 第二步就可以使用发送message来异步完成.

    消息队列可以接收消息和 发送消息

    消息队列类型:

    队列:一对一聊天  私聊  QQ

    主题(订阅模式):一对多聊天  群聊  QQ

    名词解释: 

     二, 代码原型
    ActiveMQ需要部署到Linux系统下, 这里就不再做概述.
    这里也是tar包, 导入到linux下直接解压启动即可, 前面已经有过很多博文讲Linux下一些常用软件的安装步骤.


    上架代码原型:
    项目构件图:

    未使用ActiveMQ前ProductServiceImpl.cs:

     1 //上架
     2     public void isShow(Long[] ids){
     3         Product product = new Product();
     4         product.setIsShow(true);
     5         for (final Long id : ids) {
     6             //上下架状态
     7             product.setId(id);
     8             productDao.updateByPrimaryKeySelective(product);
     9             
    10             //这个地方的代码应该在babasport-solr中写, 现在使用ActiveMQ进行迁移.
    11             //TODO 保存商品信息到Solr服务器
    12             SolrInputDocument doc = new SolrInputDocument();
    13             //ID
    14             doc.setField("id", id);
    15             //名称
    16             Product p = productDao.selectByPrimaryKey(id);
    17             doc.setField("name_ik", p.getName());
    18             //图片URL
    19             doc.setField("url", p.getImgUrls()[0]);
    20             //品牌 ID
    21             doc.setField("brandId", p.getBrandId());
    22             //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
    23             SkuQuery skuQuery = new SkuQuery();
    24             skuQuery.createCriteria().andProductIdEqualTo(id);
    25             skuQuery.setOrderByClause("price asc");
    26             skuQuery.setPageNo(1);
    27             skuQuery.setPageSize(1);
    28             List<Sku> skus = skuDao.selectByExample(skuQuery);
    29             doc.setField("price", skus.get(0).getPrice());
    30             //...时间等 剩下的省略
    31             
    32             try {
    33                 solrServer.add(doc);
    34                 solrServer.commit();
    35             } catch (Exception e) {
    36                 // TODO Auto-generated catch block
    37                 e.printStackTrace();
    38             }
    39             
    40             
    41             
    42             
    43             //TODO 静态化
    44         }
    45     }

    上面的代码 除了更改本来就该更改的商品状态信息外, 还去见商品信息保存到了Solr服务器中了. 这里我们使用ActiveMQ进行改造: 
    使用ActiveMQ后的ProductServiceImpl.cs:

     1 //上架
     2     public void isShow(Long[] ids){
     3         Product product = new Product();
     4         product.setIsShow(true);
     5         for (final Long id : ids) {
     6             //上下架状态
     7             product.setId(id);
     8             productDao.updateByPrimaryKeySelective(product);
     9             
    10             //发送商品ID到ActiveMQ即可.
    11             jmsTemplate.send(new MessageCreator() {
    12                 
    13                 @Override
    14                 public Message createMessage(Session session) throws JMSException {
    15                     
    16                     return session.createTextMessage(String.valueOf(id));
    17                 }
    18             });
    19             
    20             //TODO 静态化
    21         }
    22     }

    接着就是配置消息发送方(JMS生产者) mq.xml:

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
     3     xmlns:context="http://www.springframework.org/schema/context"
     4     xmlns:aop="http://www.springframework.org/schema/aop" 
     5     xmlns:tx="http://www.springframework.org/schema/tx"
     6     xmlns:task="http://www.springframework.org/schema/task"
     7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
     8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    10         http://www.springframework.org/schema/mvc 
    11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
    12         http://www.springframework.org/schema/context 
    13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
    14         http://www.springframework.org/schema/aop 
    15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
    16         http://www.springframework.org/schema/tx 
    17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    18         http://www.springframework.org/schema/task
    19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
    20         http://code.alibabatech.com/schema/dubbo        
    21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    22         
    23         
    24     <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
    25     <!-- 连接工厂, 此工厂由Apache提供 -->
    26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    27         <!-- 连接地址 
    28             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
    29         -->
    30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    31         <!-- 设置用户名及密码 -->
    32         <property name="userName" value="admin"></property>
    33         <property name="password" value="admin"></property>
    34     </bean>
    35         
    36     <!-- 配置连接池管理工厂 -->
    37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
    38         <!-- 注入工厂 -->
    39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
    40         <!-- 设置最大连接数 -->
    41         <property name="maxConnections" value="5"></property>
    42     </bean>
    43     
    44     <!-- 把上面的工厂交给Spring管理  -->
    45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    46         <!-- 注入上面的工厂 -->
    47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
    48     </bean>
    49     
    50     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
    51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    52         <!-- 注入Spring单例工厂 -->
    53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
    54         <!-- 设置默认的目标管道 -->
    55         <property name="defaultDestinationName" value="pId"/>
    56     </bean>
    57 </beans>

    配置说明: 这里是首先构建一个MQ的连接工厂, 只要ActiveMQ启动后 就可以这样构建连接了. 配置登录的用户名和和密码.
    接着就是配置连接池, 把连接工厂交给连接池去管理. 这些都是Apache厂商提供的. 
    接着就是再将连接池交由Spring管理. 
    最后我们再来配置一个jmsTemplate模板来操作ActiveMQ, 这个类似于jdbcTemplate模板. 而且我们这个里面注入了一个默认的管道, 也就是productId, 因为我们现在是 传递消息一一去对应, 关于怎么对应  就是依赖于这个管道.


    接下来我们就看下消息的接收方(JMS消费者)的一些东西:
    消费者的目录结构:(Solr)


    Solr项目中的ActiveMQ配置文件mq.xml:

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
     3     xmlns:context="http://www.springframework.org/schema/context"
     4     xmlns:aop="http://www.springframework.org/schema/aop" 
     5     xmlns:tx="http://www.springframework.org/schema/tx"
     6     xmlns:task="http://www.springframework.org/schema/task"
     7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
     8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    10         http://www.springframework.org/schema/mvc 
    11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
    12         http://www.springframework.org/schema/context 
    13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
    14         http://www.springframework.org/schema/aop 
    15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
    16         http://www.springframework.org/schema/tx 
    17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    18         http://www.springframework.org/schema/task
    19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
    20         http://code.alibabatech.com/schema/dubbo        
    21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    22         
    23         
    24     <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
    25     <!-- 连接工厂, 此工厂由Apache提供 -->
    26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    27         <!-- 连接地址 
    28             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
    29         -->
    30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    31         <!-- 设置用户名及密码 -->
    32         <property name="userName" value="admin"></property>
    33         <property name="password" value="admin"></property>
    34     </bean>
    35         
    36     <!-- 配置连接池管理工厂 ,由Apache提供.-->
    37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
    38         <!-- 注入工厂 -->
    39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
    40         <!-- 设置最大连接数 -->
    41         <property name="maxConnections" value="5"></property>
    42     </bean>
    43     
    44     <!-- 把上面的工厂交给Spring管理  -->
    45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    46         <!-- 注入上面的工厂 -->
    47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
    48     </bean>
    49     
    50     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
    51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    52         <!-- 注入Spring单例工厂 -->
    53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
    54         <!-- 设置默认的目标管道 -->
    55         <property name="defaultDestinationName" value="pId"/>
    56     </bean>
    57     
    58     <!-- 实例化一个监听到消息后 处理此消息的类 -->
    59     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
    60     
    61     <!-- 配置实时监听器 -->
    62     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    63         <!-- 配置工厂, 需要配置spirng的工厂 -->
    64         <property name="connectionFactory" ref="singleConnectionFactory"/>
    65         <!-- 设置监听的目标 -->
    66         <property name="destinationName" value="pId"/>
    67         <!-- 监听后获取消息的类, 接收监听到的消息 -->
    68         <property name="messageListener" ref="customMessageListener"></property>
    69     </bean>
    70 </beans>

    我们来说下 和上面配置不同的地方, 我们在这里配置了一个监听器, 因为接收到 JMS 生产者发过来的消息后我们需要有个监听器去监听且 将监听到的消息拿过来处理.
    接下来看看监听器的处理方法做了些什么事情: 
    CustomMessageListener.java:

     1 /*
     2  * 接收MQ中的消息
     3  */
     4 public class CustomMessageListener implements MessageListener{
     5     @Autowired
     6     private SearchService searchService;
     7     
     8     @Override
     9     public void onMessage(Message message) {
    10         //先将接收到的消息强转为ActiveMQ类型的消息
    11         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
    12         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
    13         try {
    14             String id = amtm.getText();
    15             System.out.println("接收到的ID:"+id);
    16             searchService.insertProductToSolr(Long.parseLong(id));
    17         } catch (JMSException e) {
    18             // TODO Auto-generated catch block
    19             e.printStackTrace();
    20         }
    21     }

    因为我们接收到的是string类型的文本, 所以这里我们直接将接收到的消息转换为ActiveMQText类型, 然后通过getText去得到传递过来的id, 然后我们就可以通过这个productId去做相应的操作了. 

    接下来就看保存商品信息到Solr服务器的逻辑:
    SearchServiceImpl.java:

     1 //保存商品信息到Solr服务器中, 通过ActiveMQ
     2     public void insertProductToSolr(Long productId){
     3         //TODO 保存商品信息到Solr服务器
     4         SolrInputDocument doc = new SolrInputDocument();
     5         //ID
     6         doc.setField("id", productId);
     7         //名称
     8         Product p = productDao.selectByPrimaryKey(productId);
     9         doc.setField("name_ik", p.getName());
    10         //图片URL
    11         doc.setField("url", p.getImgUrls()[0]);
    12         //品牌 ID
    13         doc.setField("brandId", p.getBrandId());
    14         //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
    15         SkuQuery skuQuery = new SkuQuery();
    16         skuQuery.createCriteria().andProductIdEqualTo(productId);
    17         skuQuery.setOrderByClause("price asc");
    18         skuQuery.setPageNo(1);
    19         skuQuery.setPageSize(1);
    20         List<Sku> skus = skuDao.selectByExample(skuQuery);
    21         doc.setField("price", skus.get(0).getPrice());
    22         //...时间等 剩下的省略
    23         
    24         try {
    25             solrServer.add(doc);
    26             solrServer.commit();
    27         } catch (Exception e) {
    28             // TODO Auto-generated catch block
    29             e.printStackTrace();
    30         }
    31     }

    这样就比较明朗了, ActiveMQ 队列就是这样来实现的. 

    ====================接下来还会有 ActiveMQ 订阅者模式的示例, 这里只是生产者发送消息给单个消费者, 下次还会更新生产者发送消息给多个消费者.

     2016/09/04 20:32 更新
    上面已经说了 消息的队列模式, 及点对点发送消息, 那么接下来就来说下 消息的一对多模式, 也就是 发布/订阅模式.
    项目原型: 当商品上架后(babasport-product), 发送消息id给solr(babasport-solr)来将商品信息保存到solr服务器和cms(babasport-cms)来对商品详情页面做页面静态化.

    ===================
    babasport-product:
    结构图:

    babasport-product下的项目结构图:


    ProductServiceImpl.java中的上架:

     1 @Autowired
     2     private JmsTemplate jmsTemplate;
     3     
     4     //上架
     5     public void isShow(Long[] ids){
     6         Product product = new Product();
     7         product.setIsShow(true);
     8         for (final Long id : ids) {
     9             //上下架状态
    10             product.setId(id);
    11             productDao.updateByPrimaryKeySelective(product);
    12             
    13             //发送商品ID到ActiveMQ即可.
    14             jmsTemplate.send(new MessageCreator() {
    15                 
    16                 @Override
    17                 public Message createMessage(Session session) throws JMSException {
    18                     
    19                     return session.createTextMessage(String.valueOf(id));
    20                 }
    21             });
    22         }
    23     }
    View Code

    mq.xml:

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
     3     xmlns:context="http://www.springframework.org/schema/context"
     4     xmlns:aop="http://www.springframework.org/schema/aop" 
     5     xmlns:tx="http://www.springframework.org/schema/tx"
     6     xmlns:task="http://www.springframework.org/schema/task"
     7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
     8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    10         http://www.springframework.org/schema/mvc 
    11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
    12         http://www.springframework.org/schema/context 
    13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
    14         http://www.springframework.org/schema/aop 
    15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
    16         http://www.springframework.org/schema/tx 
    17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    18         http://www.springframework.org/schema/task
    19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
    20         http://code.alibabatech.com/schema/dubbo        
    21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    22         
    23         
    24     <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
    25     <!-- 连接工厂, 此工厂由Apache提供 -->
    26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    27         <!-- 连接地址 
    28             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
    29         -->
    30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    31         <!-- 设置用户名及密码 -->
    32         <property name="userName" value="admin"></property>
    33         <property name="password" value="admin"></property>
    34     </bean>
    35         
    36     <!-- 配置连接池管理工厂 -->
    37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
    38         <!-- 注入工厂 -->
    39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
    40         <!-- 设置最大连接数 -->
    41         <property name="maxConnections" value="5"></property>
    42     </bean>
    43     
    44     <!-- 把上面的工厂交给Spring管理  -->
    45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    46         <!-- 注入上面的工厂 -->
    47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
    48     </bean>
    49     
    50     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
    51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    52         <!-- 注入Spring单例工厂 -->
    53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
    54         <!-- 设置默认的目标管道 -->
    55         <property name="defaultDestinationName" value="pId"/>
    56         <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->
    57         <property name="pubSubDomain" value="true"/>
    58     </bean>
    59 </beans>
    View Code

    这里面的最大的变化就是将消息发布模式改为了: publish subject.

    ============================================
    babasport-solr:

    mq.xml配置文件:

     1 <beans xmlns="http://www.springframework.org/schema/beans"
     2     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:mvc="http://www.springframework.org/schema/mvc"
     3     xmlns:context="http://www.springframework.org/schema/context"
     4     xmlns:aop="http://www.springframework.org/schema/aop" 
     5     xmlns:tx="http://www.springframework.org/schema/tx"
     6     xmlns:task="http://www.springframework.org/schema/task"
     7     xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
     8     xsi:schemaLocation="http://www.springframework.org/schema/beans 
     9         http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
    10         http://www.springframework.org/schema/mvc 
    11         http://www.springframework.org/schema/mvc/spring-mvc-4.0.xsd 
    12         http://www.springframework.org/schema/context 
    13         http://www.springframework.org/schema/context/spring-context-4.0.xsd 
    14         http://www.springframework.org/schema/aop 
    15         http://www.springframework.org/schema/aop/spring-aop-4.0.xsd 
    16         http://www.springframework.org/schema/tx 
    17         http://www.springframework.org/schema/tx/spring-tx-4.0.xsd
    18         http://www.springframework.org/schema/task
    19            http://www.springframework.org/schema/task/spring-task-4.0.xsd
    20         http://code.alibabatech.com/schema/dubbo        
    21         http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
    22         
    23         
    24     <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
    25     <!-- 连接工厂, 此工厂由Apache提供 -->
    26     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    27         <!-- 连接地址 
    28             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
    29         -->
    30         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
    31         <!-- 设置用户名及密码 -->
    32         <property name="userName" value="admin"></property>
    33         <property name="password" value="admin"></property>
    34     </bean>
    35         
    36     <!-- 配置连接池管理工厂 ,由Apache提供.-->
    37     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
    38         <!-- 注入工厂 -->
    39         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
    40         <!-- 设置最大连接数 -->
    41         <property name="maxConnections" value="5"></property>
    42     </bean>
    43     
    44     <!-- 把上面的工厂交给Spring管理  -->
    45     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    46         <!-- 注入上面的工厂 -->
    47         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
    48     </bean>
    49     
    50     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
    51     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    52         <!-- 注入Spring单例工厂 -->
    53         <property name="connectionFactory" ref="singleConnectionFactory"></property>
    54         <!-- 设置默认的目标管道 -->
    55         <property name="defaultDestinationName" value="pId"/>
    56     </bean>
    57     
    58     <!-- 实例化一个监听到消息后 处理此消息的类 -->
    59     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
    60     
    61     <!-- 配置实时监听器 -->
    62     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    63         <!-- 配置工厂, 需要配置spirng的工厂 -->
    64         <property name="connectionFactory" ref="singleConnectionFactory"/>
    65         <!-- 设置监听的目标 -->
    66         <property name="destinationName" value="pId"/>
    67         <!-- 监听后获取消息的类, 接收监听到的消息 -->
    68         <property name="messageListener" ref="customMessageListener"></property>
    69         <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->
    70         <property name="pubSubDomain" value="true"/>
    71     </bean>
    72 </beans>
    View Code

    SearchServiceImpl.java: 保存商品信息到Solr服务器中, 通过ActiveMQ

     1 //保存商品信息到Solr服务器中, 通过ActiveMQ
     2     public void insertProductToSolr(Long productId){
     3         //TODO 保存商品信息到Solr服务器
     4         SolrInputDocument doc = new SolrInputDocument();
     5         //ID
     6         doc.setField("id", productId);
     7         //名称
     8         Product p = productDao.selectByPrimaryKey(productId);
     9         doc.setField("name_ik", p.getName());
    10         //图片URL
    11         doc.setField("url", p.getImgUrls()[0]);
    12         //品牌 ID
    13         doc.setField("brandId", p.getBrandId());
    14         //价格 sql查询语句: select price from bbs_sku where product_id = ? order by price asc limit 1
    15         SkuQuery skuQuery = new SkuQuery();
    16         skuQuery.createCriteria().andProductIdEqualTo(productId);
    17         skuQuery.setOrderByClause("price asc");
    18         skuQuery.setPageNo(1);
    19         skuQuery.setPageSize(1);
    20         List<Sku> skus = skuDao.selectByExample(skuQuery);
    21         doc.setField("price", skus.get(0).getPrice());
    22         //...时间等 剩下的省略
    23         
    24         try {
    25             solrServer.add(doc);
    26             solrServer.commit();
    27         } catch (Exception e) {
    28             // TODO Auto-generated catch block
    29             e.printStackTrace();
    30         }
    31     }
    View Code

    CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:

     1 public class CustomMessageListener implements MessageListener{
     2     @Autowired
     3     private SearchService searchService;
     4     
     5     @Override
     6     public void onMessage(Message message) {
     7         //先将接收到的消息强转为ActiveMQ类型的消息
     8         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
     9         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
    10         try {
    11             String id = amtm.getText();
    12             System.out.println("接收到的ID:"+id);
    13             searchService.insertProductToSolr(Long.parseLong(id));
    14         } catch (JMSException e) {
    15             // TODO Auto-generated catch block
    16             e.printStackTrace();
    17         }
    18     }
    19 }
    View Code


    ===============================
    babasport-cms:
    mq.xml:

     1 <!-- 配置Spring 来管理MQ消息队列 , 连接ActiveMQ-->
     2     <!-- 连接工厂, 此工厂由Apache提供 -->
     3     <bean id="activeMQConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
     4         <!-- 连接地址 
     5             在网页端访问是:http://192.168.200.128:8161, 但是这里是tcp连接, 端口号是61616
     6         -->
     7         <property name="brokerURL" value="tcp://192.168.200.128:61616"/>
     8         <!-- 设置用户名及密码 -->
     9         <property name="userName" value="admin"></property>
    10         <property name="password" value="admin"></property>
    11     </bean>
    12         
    13     <!-- 配置连接池管理工厂 ,由Apache提供.-->
    14     <bean id="pooledConnectionFactoryBean" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
    15         <!-- 注入工厂 -->
    16         <property name="connectionFactory" ref="activeMQConnectionFactory"></property>
    17         <!-- 设置最大连接数 -->
    18         <property name="maxConnections" value="5"></property>
    19     </bean>
    20     
    21     <!-- 把上面的工厂交给Spring管理  -->
    22     <bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    23         <!-- 注入上面的工厂 -->
    24         <property name="targetConnectionFactory" ref="pooledConnectionFactoryBean"></property>
    25     </bean>
    26     
    27     <!-- 使用Spring提供的jmsTemplate模板来操作ActiveMQ -->
    28     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    29         <!-- 注入Spring单例工厂 -->
    30         <property name="connectionFactory" ref="singleConnectionFactory"></property>
    31         <!-- 设置默认的目标管道 -->
    32         <property name="defaultDestinationName" value="pId"/>
    33     </bean>
    34     
    35     <!-- 实例化一个监听到消息后 处理此消息的类 -->
    36     <bean id="customMessageListener" class="cn.itcast.core.service.message.CustomMessageListener"/>
    37     
    38     <!-- 配置实时监听器 -->
    39     <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    40         <!-- 配置工厂, 需要配置spirng的工厂 -->
    41         <property name="connectionFactory" ref="singleConnectionFactory"/>
    42         <!-- 设置监听的目标 -->
    43         <property name="destinationName" value="pId"/>
    44         <!-- 监听后获取消息的类, 接收监听到的消息 -->
    45         <property name="messageListener" ref="customMessageListener"></property>
    46         <!-- 默认是队列模式, 可改为主题, 发布模式 publish subject -->
    47         <property name="pubSubDomain" value="true"/>
    48     </bean>
    View Code

    CustomMessageListener.java: 监听ActiveMQ中传递过来的消息, 且对传递过来的消息进行处理:

     1 public class CustomMessageListener implements MessageListener{
     2     @Autowired
     3     private StaticPageService staticPageService;
     4     @Autowired
     5     private CMSService cmsService;
     6     
     7     @Override
     8     public void onMessage(Message message) {
     9         //先将接收到的消息强转为ActiveMQ类型的消息
    10         //因为在消息发送方那边传递的是Text类型的消息对象, 所以需要转成ActiveMQTextMessage
    11         ActiveMQTextMessage amtm = (ActiveMQTextMessage)message;
    12         try {
    13             String id = amtm.getText();
    14             System.out.println("CMS接收到的ID:"+id);
    15             Map<String, Object> root = new HashMap<String, Object>();
    16             
    17             Product product = cmsService.selectProductById(Long.parseLong(id));
    18             List<Sku> skus = cmsService.selectSkuListByProductIdWithStock(Long.parseLong(id));
    19             //去掉重复的颜色
    20             Set<Color> colors = new HashSet<Color>();
    21             for (Sku sku : skus) {
    22                 colors.add(sku.getColor());
    23             }
    24             root.put("colors", colors);
    25             root.put("product", product);
    26             root.put("skus", skus);
    27             
    28             staticPageService.index(root, id);
    29         } catch (JMSException e) {
    30             // TODO Auto-generated catch block
    31             e.printStackTrace();
    32         }
    33     }
    34 }
    View Code

    StaticPageServiceImpl.java: 静态化页面的核心类:

     1 public class StaticPageServiceImpl implements StaticPageService, ServletContextAware{
     2     //SpringMvc 管理 conf
     3     private Configuration conf;
     4     public void setFreeMarkerConfig(FreeMarkerConfig freeMarkerConfig) {
     5         this.conf = freeMarkerConfig.getConfiguration();
     6     }
     7 
     8     //静态化页面的方法
     9     public void index(Map<String, Object> root, String id){
    10         //输出目录: 通过getPath方法获取的是绝对路径
    11         String path = getPath("/html/product/" + id +".html");
    12         File f = new File(path);
    13         File parentFile = f.getParentFile();
    14         if(!parentFile.exists()){
    15             parentFile.mkdirs();
    16         }
    17         
    18         //spring中已经设置了模板路径:<property name="templateLoaderPath" value="/WEB-INF/ftl/" />
    19         Writer out = null;
    20         
    21         try {
    22             //
    23             Template template = conf.getTemplate("product.html");
    24             
    25             //设置输出的位置
    26             //
    27             out = new OutputStreamWriter(new FileOutputStream(f), "UTF-8");
    28             template.process(root, out);
    29         } catch (Exception e) {
    30             // TODO Auto-generated catch block
    31             e.printStackTrace();
    32         }finally {
    33             if (out != null)
    34             {
    35                 try {
    36                     out.close();
    37                 } catch (IOException e) {
    38                     // TODO Auto-generated catch block
    39                     e.printStackTrace();
    40                 }
    41             }
    42             
    43         }
    44         
    45     }
    46 
    47     //获取webapp下的html文件夹所在的位置
    48     //将相对路径转换为绝对路径
    49     public String getPath(String path){
    50         return servletContext.getRealPath(path);
    51     }
    52     
    53     private ServletContext servletContext;
    54     @Override
    55     public void setServletContext(ServletContext servletContext) {
    56         this.servletContext = servletContext;
    57     }
    58 }
    View Code

    Spring管理 freemarkerConfig配置类:

     1 <!-- 配置freemarker 实现类 -->    
     2         <bean class="cn.itcast.core.service.StaticPageServiceImpl">
     3             <property name="freeMarkerConfig">
     4                 <bean class="org.springframework.web.servlet.view.freemarker.FreeMarkerConfigurer">
     5                     <!-- 设置模板所在目录或文件夹的位置, 相对路径  -->
     6                     <property name="templateLoaderPath" value="/WEB-INF/ftl/" />
     7                     <!-- 设置默认编码集 -->
     8                     <property name="defaultEncoding" value="UTF-8"></property>
     9                 </bean>
    10             </property>
    11         </bean>
    View Code

    更多关于freemarker的讲解请关注我以后的博客...
    关于ActiveMQ的内容就更新到这么多.



  • 相关阅读:
    FiddlerScript修改特定请求参数下的返回值
    nginx设置反向代理后,页面上的js css文件无法加载
    通过外网访问内网服务器
    linux下使用正确的用户名密码,本地无法连接mysql
    合并重叠时间段C#
    数据库一直显示为单用户,解决办法
    windows下使用tomcat部署网站
    数据库一直还原中,解决办法
    通过mdf ldf文件还原数据库
    知道css有个content属性吗?有什么作用?有什么应用?
  • 原文地址:https://www.cnblogs.com/wang-meng/p/5831538.html
Copyright © 2011-2022 走看看