zoukankan      html  css  js  c++  java
  • activemq 生产消费模式,订阅发布模式不同类型数据传输

    1.项目结构

    2. activemq-pom pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <!-- Aggregator/parent POM: lists the three modules and declares the version
           properties that the module POMs reference. -->
      <modelVersion>4.0.0</modelVersion>

      <groupId>activemq</groupId>
      <artifactId>activemq-pom</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>pom</packaging>

      <modules>
        <module>activemq-producer</module>
        <module>activemq-consumer</module>
        <module>activemq-common</module>
      </modules>

      <!-- Shared dependency versions, referenced as ${...} by the modules. -->
      <properties>
        <springframework>5.0.4.RELEASE</springframework>
        <slf4j.version>1.7.7</slf4j.version>
        <logback.version>1.1.2</logback.version>
        <fastjson.version>1.2.44</fastjson.version>
      </properties>
    </project>

    3.activemq-common pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <!-- Shared library module: ActiveMQ client, logging, JSON and reflection
           dependencies used by both the producer and the consumer. -->
      <modelVersion>4.0.0</modelVersion>

      <parent>
        <groupId>activemq</groupId>
        <artifactId>activemq-pom</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </parent>

      <groupId>activemq-common</groupId>
      <artifactId>activemq-common</artifactId>

      <properties>
        <!-- One version for every ActiveMQ artifact. Previously activemq-core 5.7.0 was
             combined with activemq-pool 5.12.1; the newer pool pulls in activemq-client
             5.12.1 transitively, which ships the same org.apache.activemq classes as
             activemq-core and leaves two conflicting copies on the classpath. -->
        <activemq.version>5.7.0</activemq.version>
      </properties>

      <dependencies>
        <!-- ActiveMQ -->
        <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-core</artifactId>
          <version>${activemq.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-pool</artifactId>
          <version>${activemq.version}</version>
        </dependency>

        <!-- Logback -->
        <dependency>
          <!-- The jar of main interest here: it is responsible for starting Logback
               together with the application. -->
          <groupId>org.logback-extensions</groupId>
          <artifactId>logback-ext-spring</artifactId>
          <version>0.1.4</version>
        </dependency>

        <!-- slf4j start -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>jcl-over-slf4j</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-access</artifactId>
          <version>${logback.version}</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-core</artifactId>
          <version>${logback.version}</version>
        </dependency>
        <dependency>
          <groupId>ch.qos.logback</groupId>
          <artifactId>logback-classic</artifactId>
          <version>${logback.version}</version>
        </dependency>
        <!-- slf4j end -->

        <!-- Reflection utility -->
        <dependency>
          <groupId>org.db4j</groupId>
          <artifactId>reflectasm</artifactId>
          <version>1.11.4-2</version>
        </dependency>

        <!-- JSON serialisation -->
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>${fastjson.version}</version>
        </dependency>

        <!-- NOTE(review): junit is deliberately left at the default (compile) scope;
             the producer module appears to pick it up transitively for ProducerTest.
             Confirm before narrowing it to test scope. -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.12</version>
        </dependency>
      </dependencies>
    </project>

    4.activemq-producer pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <!-- Producer web application (war): Spring + spring-jms on top of activemq-common. -->
      <modelVersion>4.0.0</modelVersion>

      <parent>
        <groupId>activemq</groupId>
        <artifactId>activemq-pom</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </parent>

      <groupId>activemq-producer</groupId>
      <artifactId>activemq-producer</artifactId>
      <packaging>war</packaging>

      <dependencies>
        <!-- Spring; every artifact uses the version declared in the parent POM. -->
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>${springframework}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-tx</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-webmvc</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>${springframework}</version>
        </dependency>

        <!-- Java EE API jar -->
        <dependency>
          <groupId>javax</groupId>
          <artifactId>javaee-api</artifactId>
          <version>7.0</version>
        </dependency>

        <!-- xbean, for elements such as <amq:connectionFactory /> -->
        <dependency>
          <groupId>org.apache.xbean</groupId>
          <artifactId>xbean-spring</artifactId>
          <version>3.16</version>
        </dependency>

        <!-- Shared module of this project -->
        <dependency>
          <groupId>activemq-common</groupId>
          <artifactId>activemq-common</artifactId>
          <version>0.0.1-SNAPSHOT</version>
        </dependency>
      </dependencies>
    </project>

    5.activemq-consumer pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <!-- Consumer web application (war): Spring + spring-jms on top of activemq-common. -->
      <modelVersion>4.0.0</modelVersion>

      <parent>
        <groupId>activemq</groupId>
        <artifactId>activemq-pom</artifactId>
        <version>0.0.1-SNAPSHOT</version>
      </parent>

      <groupId>activemq-consumer</groupId>
      <artifactId>activemq-consumer</artifactId>
      <packaging>war</packaging>

      <dependencies>
        <!-- Spring; every artifact uses the version declared in the parent POM. -->
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-test</artifactId>
          <version>${springframework}</version>
          <scope>provided</scope>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-core</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-context</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-tx</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-webmvc</artifactId>
          <version>${springframework}</version>
        </dependency>
        <dependency>
          <groupId>org.springframework</groupId>
          <artifactId>spring-jms</artifactId>
          <version>${springframework}</version>
        </dependency>

        <!-- Java EE API jar -->
        <dependency>
          <groupId>javax</groupId>
          <artifactId>javaee-api</artifactId>
          <version>7.0</version>
        </dependency>

        <!-- xbean, for elements such as <amq:connectionFactory /> -->
        <dependency>
          <groupId>org.apache.xbean</groupId>
          <artifactId>xbean-spring</artifactId>
          <version>3.16</version>
        </dependency>

        <!-- Shared module of this project -->
        <dependency>
          <groupId>activemq-common</groupId>
          <artifactId>activemq-common</artifactId>
          <version>0.0.1-SNAPSHOT</version>
        </dependency>
      </dependencies>
    </project>

    6.activemq-producer activemq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:mvc="http://www.springframework.org/schema/mvc"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
             http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-4.1.xsd
             http://www.springframework.org/schema/mvc      http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
             http://www.springframework.org/schema/jms      http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
             http://activemq.apache.org/schema/core         http://activemq.apache.org/schema/core/activemq-core.xsd
             http://camel.apache.org/schema/spring          http://camel.apache.org/schema/spring/camel-spring.xsd">

      <!-- JMS connection factory: a caching wrapper around the ActiveMQ factory. -->
      <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="10" />
        <!-- JMS client ID used for this connection -->
        <property name="clientId" value="client_3" />
        <property name="targetConnectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- Broker address and credentials -->
            <property name="brokerURL" value="tcp://192.168.64.128:61616" />
            <property name="userName" value="admin" />
            <property name="password" value="admin" />
          </bean>
        </property>
      </bean>

      <!-- JMS template for queues: Spring's helper for sending and receiving messages. -->
      <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="receiveTimeout" value="10000" />
        <!-- true = topic, false = queue; false is the default and is spelled out here -->
        <property name="pubSubDomain" value="false" />
      </bean>

      <!-- JMS template for topics (publish/subscribe). -->
      <bean id="jmsTopicTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="receiveTimeout" value="10000" />
        <!-- true = topic, false = queue; set to true so this template publishes to topics -->
        <property name="pubSubDomain" value="true" />
        <!-- Message converter; a custom subclass could be plugged in, here Spring's
             default implementation is used -->
        <property name="messageConverter">
          <bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
        </property>
      </bean>

    </beans>

    7.消息发送测试类 ProducerTest.java

    package com.bonade.activemq.test;
    
    import java.util.Date;
    import java.util.HashMap;
    import java.util.Map;
    
    import javax.jms.JMSException;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import com.bonade.activemq.util.ProducerUtil;
    
    /**
     * Integration tests that send text, map and object messages through
     * {@link ProducerUtil}, once to a queue and once to a topic.
     *
     * <p>Test methods declare {@code throws JMSException} instead of catching it, so a
     * failed send fails the test rather than being silently ignored.
     *
     * <p>NOTE(review): only {@code sendTextMsg} targets {@code bonade.q1}; the map and
     * object queue tests target {@code bonade.q}. Confirm that the differing queue
     * names are intentional.
     */
    @RunWith(SpringJUnit4ClassRunner.class) // run the tests with the Spring JUnit 4 runner
    @ContextConfiguration(locations = {"classpath:spring.xml"}) // load the Spring configuration file
    public class ProducerTest {

        @Autowired
        private ProducerUtil producerUtil;

        /** Sends a plain text message to the queue {@code bonade.q1}. */
        @Test
        public void sendTextMsg() throws JMSException {
            producerUtil.sendTextMsg("bonade.q1", "it is my world");
        }

        /** Sends a map message to the queue {@code bonade.q}. */
        @Test
        public void sendMapMsg() {
            producerUtil.sendMapMsg("bonade.q", samplePerson());
        }

        /** Sends an object message to the queue {@code bonade.q}. */
        @Test
        public void sendObjectMsg() {
            producerUtil.sendObjectMsg("bonade.q", sampleDto());
        }

        /** Publishes a plain text message to the topic {@code topic.q.2}. */
        @Test
        public void sendTopicTextMsg() throws JMSException {
            producerUtil.sendTopicTextMsg("topic.q.2", "it is my world");
        }

        /** Publishes a map message to the topic {@code topic.q.2}. */
        @Test
        public void sendTopicMapMsg() {
            producerUtil.sendTopicMapMsg("topic.q.2", samplePerson());
        }

        /** Publishes an object message to the topic {@code topic.q.2}. */
        @Test
        public void sendTopicObjectMsg() {
            producerUtil.sendTopicObjectMsg("topic.q.2", sampleDto());
        }

        /** Builds the map payload shared by the map-message tests. */
        private static Map<String, String> samplePerson() {
            Map<String, String> param = new HashMap<>();
            param.put("name", "张三");
            param.put("sex", "男");
            param.put("age", "23");
            return param;
        }

        /** Builds the DTO payload shared by the object-message tests. */
        private static BaseDTO sampleDto() {
            BaseDTO dto = new BaseDTO();
            dto.setBizno("CS201805110001");
            dto.setDate(new Date());
            return dto;
        }
    }

    8.activemq-consumer activemq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:mvc="http://www.springframework.org/schema/mvc"
           xsi:schemaLocation="
             http://www.springframework.org/schema/beans    http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
             http://www.springframework.org/schema/context  http://www.springframework.org/schema/context/spring-context-4.1.xsd
             http://www.springframework.org/schema/mvc      http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
             http://www.springframework.org/schema/jms      http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
             http://activemq.apache.org/schema/core         http://activemq.apache.org/schema/core/activemq-core.xsd
             http://camel.apache.org/schema/spring          http://camel.apache.org/schema/spring/camel-spring.xsd">

      <!-- JMS connection factory: a caching wrapper around the ActiveMQ factory. -->
      <bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- Number of sessions to cache -->
        <property name="sessionCacheSize" value="10" />
        <!-- JMS client ID of this consumer -->
        <property name="clientId" value="client_2" />
        <property name="targetConnectionFactory">
          <bean class="org.apache.activemq.ActiveMQConnectionFactory">
            <!-- Broker address and credentials -->
            <property name="brokerURL" value="tcp://192.168.64.128:61616" />
            <property name="userName" value="admin" />
            <property name="password" value="admin" />
          </bean>
        </property>
      </bean>

      <!-- Listener shared by the queue container and the topic container below. -->
      <bean id="queueMessageListener" class="com.bonade.activemq.QueueMessageListener" />

      <!-- Queue consumption: destination plus its listener container. -->
      <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="bonade.q1" />
      </bean>

      <bean id="queueListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <property name="destination" ref="queueDestination" />
        <property name="messageListener" ref="queueMessageListener" />
        <property name="receiveTimeout" value="10000" />
      </bean>

      <!-- Topic consumption: the topic this application subscribes to. -->
      <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
        <!-- Name of the topic -->
        <constructor-arg index="0" value="topic.q.2" />
      </bean>

      <!-- Message listener container for the topic -->
      <bean id="topicListenerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
        <property name="connectionFactory" ref="connectionFactory" />
        <!-- Publish/subscribe mode -->
        <property name="pubSubDomain" value="true" />
        <!-- Durable subscription (this flag concerns the subscription, not message persistence) -->
        <property name="subscriptionDurable" value="true" />
        <property name="receiveTimeout" value="10000" />
        <property name="destination" ref="topicDestination" />
        <property name="messageListener" ref="queueMessageListener" />
      </bean>

    </beans>

    9.消费监听类 QueueMessageListener.java

    package com.bonade.activemq;
    
    import javax.jms.JMSException;
    import javax.jms.MapMessage;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.ObjectMessage;
    import javax.jms.TextMessage;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.alibaba.fastjson.JSON;
    
    public class QueueMessageListener implements MessageListener {
        
        private static final Logger LOGGER = LoggerFactory.getLogger(QueueMessageListener.class);
    
        public void onMessage(Message message) {  
            if(message instanceof TextMessage){
                TextMessage text = (TextMessage) message;  
                LOGGER.debug("收到文本消息:{}",text);
            }  
            if(message instanceof MapMessage){
                MapMessage map = (MapMessage) message;  
                LOGGER.debug("收到Map消息:{}",map);
                
            }  
            if(message instanceof ObjectMessage){  
                ObjectMessage objMsg = (ObjectMessage) message;  
                LOGGER.debug("收到Object消息:{}",JSON.toJSONString(objMsg));
                
            }  
        }  
        
    }
  • 相关阅读:
    [日常工作] cmd以及bash 直接使用当前目录的方法
    [安全] 公司局域网病毒处理
    SQLserver 使用网络驱动器恢复数据库
    MiniDP与HDMI的关系
    Win10删除微软拼音输入法的方法
    SQLSERVER case when 的学习
    [日常工作]偷懒创建一个存储过程进行模拟工作.
    oracle 18c centos7 设置开机自动启动Oracle
    kali linux升级
    [日常工作]Oracle新增数据文件的小知识点
  • 原文地址:https://www.cnblogs.com/yun965861480/p/9040159.html
Copyright © 2011-2022 走看看