SpringBoot实现
引入jar包
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
配置application.properties
spring.activemq.broker-url=tcp://192.168.114.129:61616
spring.activemq.in-memory=true
spring.activemq.enabled=false
spring.jms.pub-sub-domain=true
创建activemq配置文件类
@EnableJms
@Configuration
public class ActiveMQConfig {
@Bean
public Queue queue(){
return new ActiveMQQueue("queue1");
}
@Bean
public RedeliveryPolicy redeliveryPolicy(){
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
//是否在每次尝试重新发送失败后,增长这个等待时间
redeliveryPolicy.setUseExponentialBackOff(true);
//重发次数,默认为6次 这里设置为10次
redeliveryPolicy.setMaximumRedeliveries(6);
//重发时间间隔,默认为1秒
redeliveryPolicy.setInitialRedeliveryDelay(1);
//第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value
redeliveryPolicy.setBackOffMultiplier(1);
//是否避免消息碰撞
redeliveryPolicy.setUseCollisionAvoidance(false);
//设置重发最大拖延时间-1 表示没有拖延只有UseExponentialBackOff(true)为true时生效
redeliveryPolicy.setMaximumRedeliveryDelay(1000);
return redeliveryPolicy;
}
@Bean
ActiveMQConnectionFactory activeMQConnectFactory(@Value("${spring.activemq.broker-url}")String url,RedeliveryPolicy redeliveryPolicy){
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("admin","admin",url);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
return activeMQConnectionFactory;
}
@Bean
public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory,Queue queue){
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setDeliveryMode(1);//进行持久化配置 1表示非持久化,2表示持久化
jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
jmsTemplate.setDefaultDestination(queue);//此处可不设置默认,在发送消息时也可设置队列
jmsTemplate.setSessionAcknowledgeMode(1);//客户端签收模式
return jmsTemplate;
}
@Bean
public JmsTransactionManager jmsTransactionManager(ActiveMQConnectionFactory activeMQConnectionFactory){
JmsTransactionManager jmsTransactionManager = new JmsTransactionManager();
jmsTransactionManager.setConnectionFactory(activeMQConnectionFactory);
return jmsTransactionManager;
}
//定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
@Bean(name = "jmsQueueListener")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ActiveMQConnectionFactory activeMQConnectionFactory, JmsTransactionManager jmsTransactionManager){
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(activeMQConnectionFactory);
//设置连接数
factory.setConcurrency("1-10");
//重连间隔时间
//factory.setRecoveryInterval(1000L);
//factory.setSessionAcknowledgeMode(1);
factory.setTransactionManager(jmsTransactionManager);
return factory;
}
}
创建生产者类
@Service("producer")
public class Producer {
@Autowired
private JmsMessagingTemplate jMessagingTemplate;
public void sendMessage(Destination destination, final String message){
jMessagingTemplate.convertAndSend(destination, message);
}
}
创建消费者类
@Component
public class Consumer {
@JmsListener(destination = "mytest.queue", containerFactory="jmsQueueListener")
public void receiveQueue(TextMessage textMessage) throws JMSException{
System.out.println("Consumer收到的报文为:"+textMessage.getText());
}
}
测试
@Autowired
private Producer producer;
@Test
public void test01(){
Destination destination = new ActiveMQQueue("mytest.queue");
for(int i=0; i<10; i++){
producer.sendMessage(destination, "my name laowang");
}
}