一:ActiveMQ的Broker
ActiveMQ除了可以作为独立进程单独部署在服务器上之外,也可以很小巧的内嵌在程序中启动,下面我们来简单的介绍内置Broker启动的一种方式。
1.1引入maven的依赖
<!--ActiveMQ依赖包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency>
1.2Java代码
package com.yjc.activemq; import org.apache.activemq.broker.BrokerService; public class Broker { public static void main(String[] args) throws Exception { BrokerService brokerService=new BrokerService(); brokerService.setUseJmx(true); brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }
启动上面的main方法之后,就可以使用生产者和消费者对我们部署的这个小型的ActiveMQ进行访问了,三者的地址要一样,十分的小巧方便
二:Spring整合ActiveMQ
2.1引入maven的依赖
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all --> <!--ActiveMQ依赖包--> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-all</artifactId> <version>5.15.9</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring --> <!--ActiveMQ和SPring整合包--> <dependency> <groupId>org.apache.xbean</groupId> <artifactId>xbean-spring</artifactId> <version>4.14</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <!--用于引入ActiveMQ的broker--> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.10.0</version> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-jms</artifactId> </dependency> <dependency> <groupId>org.apache.activemq</groupId> <artifactId>activemq-pool</artifactId> </dependency>
2.2配置ApplicationContext.xml文件
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd"> <!--开启包扫描器--> <context:component-scan base-package="com.yjc.spring"/> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.118.3:61616" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化这样可以大大减少我们的资源消耗, --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="10" /> </bean> <!--默认的目的地地址--> <bean id="activeMQQueue" class="org.apache.activemq.command.ActiveMQQueue"> <!--设置队列的名称--> <constructor-arg index="0" value="spring-active-queue"/> </bean> <!-- 配置生产者:配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。 但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的, 所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发, 为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象 --> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="activeMQQueue"/> </bean> </beans>
2.3编写生产者代码
由于进行简单的整合测试,没有使用MVC的分层架构,仅仅使用了一个service,要想访问Spring容器中的bean对象时,需要当前对象也需要是一个bean对象,所以我用@Service将生产者和消费者都声明成bean,方便我调用其他的bean。
package com.yjc.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; import javax.jms.TextMessage; @Service public class Producer { @Autowired private JmsTemplate jmsTemplate; public static void main(String[] args) { ApplicationContext applicationContext=new ClassPathXmlApplicationContext("ApplicationContext.xml"); Producer producer = (Producer)applicationContext.getBean("producer"); //才用1.8的新特性lombda表达式来实现的 producer.jmsTemplate.send((session)->{ TextMessage textMessage= session.createTextMessage("俺是消息"); return textMessage; }); System.out.println("消息已经放入到队列里了"); } }
2.4消费者
package com.yjc.spring; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.support.ClassPathXmlApplicationContext; import org.springframework.jms.core.JmsTemplate; import org.springframework.stereotype.Service; @Service public class Consumer { @Autowired private JmsTemplate jmsTemplate; public static void main(String[] args) { ApplicationContext applicationContext=new ClassPathXmlApplicationContext("ApplicationContext.xml"); Consumer consumer = (Consumer)applicationContext.getBean("consumer"); String retValue = (String) consumer.jmsTemplate.receiveAndConvert(); System.out.println("----------------消费者收到的消息"+retValue); } }
2.5在Spring中实现消费者不启动,依然可以消费消息,通过配置监听完成
在Topic模式中,如果没有消费者进行订阅,那么生产者生产出来的消息就是非消息,我们可以通过配置监听来实现不期待消费者,实现消费
2.5.1在配置文件中将默认的目标地址更改为Topic
<!--开启包扫描器--> <context:component-scan base-package="com.yjc.spring"/> <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory"> <property name="brokerURL" value="tcp://192.168.118.3:61616" /> </bean> <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory"> <property name="targetConnectionFactory" ref="targetConnectionFactory" /> </bean> <!-- 通过往PooledConnectionFactory注入一个ActiveMQConnectionFactory可以用来将Connection,Session和MessageProducer池化这样可以大大减少我们的资源消耗, --> <bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"> <property name="connectionFactory" ref="targetConnectionFactory" /> <property name="maxConnections" value="10" /> </bean> <!--默认的目的地地址--> <bean id="activeMQTopic" class="org.apache.activemq.command.ActiveMQTopic"> <!--设置队列的名称--> <constructor-arg index="0" value="spring-active-topic"/> </bean> <!-- 配置生产者:配置好ConnectionFactory之后我们就需要配置生产者。生产者负责产生消息并发送到JMS服务器,这通常对应的是我们的一个业务逻辑服务实现类。 但是我们的服务实现类是怎么进行消息的发送的呢?这通常是利用Spring为我们提供的JmsTemplate类来实现的, 所以配置生产者其实最核心的就是配置进行消息发送的JmsTemplate。对于消息发送者而言,它在发送消息的时候要知道自己该往哪里发, 为此,我们在定义JmsTemplate的时候需要往里面注入一个Spring提供的ConnectionFactory对象 --> <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 --> <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 --> <property name="connectionFactory" ref="connectionFactory" /> <property name="defaultDestination" ref="activeMQTopic"/> </bean> <!-- 配置监听程序--> <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer"> <property name="connectionFactory" ref="connectionFactory"/> <property name="destination" ref="activeMQTopic"/> <property name="messageListener" ref="myMessageListener"/> </bean>
2.5.2创建监听类
package com.yjc.spring; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageListener; import javax.jms.TextMessage; @Component public class MyMessageListener implements MessageListener { @Override public void onMessage(Message message) { if (null!=message&&message instanceof TextMessage) { TextMessage textMessage = (TextMessage)message; try { System.out.println("监听器监听到的消息:"+textMessage.getText()); } catch (JMSException e) { e.printStackTrace(); } } } }
生产者和消费者的代码不做改动,只启动生产者即可,当生产者生产出消息之后,会被监听器立刻监听到
三:SpringBoot整合ActiveMQ(队列)
3.1导入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-activemq</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <scope>test</scope> </dependency>
3.2 application.yml配置文件
server:
port: 8888
spring:
activemq:
broker-url: tcp://192.168.118.3:61616 #服务器地址
user: admin #用户名
password: admin #密码
jms:
pub-sub-domain: false #目的地类型,false为Queue,true为Topic,默认为false
#自定义队列名称
myqueue: boot-activemq-queue
3.3Config配置类
package com.yjc.activemq; import org.apache.activemq.command.ActiveMQQueue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.jms.annotation.EnableJms; import org.springframework.stereotype.Component; import javax.jms.Queue; @Component @EnableJms public class ConfigBean { @Value("${myqueue}") private String queueName; @Bean private Queue queue(){ return new ActiveMQQueue(queueName); } }
3.4生产者
package com.yjc.activemq; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jms.core.JmsMessagingTemplate; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.annotation.Resource; import javax.jms.Queue; import java.util.UUID; @Component public class Queue_Produce { @Autowired private JmsMessagingTemplate jmsMessagingTemplate; @Resource private Queue queue;
//调用方法启动一次 public void produceMsg(){ jmsMessagingTemplate.convertAndSend(queue,"-----------"+UUID.randomUUID().toString().substring(0,8)); } //定时发送消息,时间间隔为三秒,去主配置类开启支持,启动主配置类时开始定时发送 @Scheduled(fixedDelay = 3000) public void scheduledMsg(){ jmsMessagingTemplate.convertAndSend(queue,"-----------scheduledMsg"+UUID.randomUUID().toString().substring(0,8)); System.out.println("时间到了发一条"); } }
3.5 主程序类
package com.yjc; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.scheduling.annotation.EnableScheduling; @SpringBootApplication @EnableScheduling //开始对定时投递的支持 public class ActivemqApplication { public static void main(String[] args) { SpringApplication.run(ActivemqApplication.class, args); } }
3.6 测试类
import com.yjc.ActivemqApplication; import com.yjc.activemq.Queue_Produce; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.web.WebAppConfiguration; @SpringBootTest(classes = ActivemqApplication.class) @RunWith(SpringJUnit4ClassRunner.class) @WebAppConfiguration public class QuqueTest { @Autowired private Queue_Produce queue_produce; @Test public void testMsg(){ queue_produce.produceMsg(); } }
3.7消费者
package com.yjc.consumer; import org.springframework.jms.annotation.JmsListener; import org.springframework.stereotype.Component; import javax.jms.JMSException; import javax.jms.TextMessage; @Component public class Queue_Consumer { @JmsListener(destination = "${myqueue}") public void receive(TextMessage textMessage) throws JMSException { System.out.println("消费者收到的消息"+textMessage.getText()); } }
使用@JmsListener注解进行监听消息