zoukankan      html  css  js  c++  java
  • SpringBoot整合ActiveMQ

    目录结构

    引入 maven依赖

      <parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.4.RELEASE</version>
    		<relativePath/> 
    	</parent>
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    	</dependencies>
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    

    引入 application.yml配置

    spring:
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    queue: springboot-queue
    server:
      port: 8080
    
    

    创建QueueConfig

    @Configuration
    public class QueueConfig {
    	@Value("${queue}")
    	private String queue;
    
    	@Bean
    	public Queue logQueue() {
    		return new ActiveMQQueue(queue);
    	}
    
    	@Bean
    	public JmsTemplate jmsTemplate(ActiveMQConnectionFactory activeMQConnectionFactory, Queue queue) {
    		JmsTemplate jmsTemplate = new JmsTemplate();
    		jmsTemplate.setDeliveryMode(2);// 进行持久化配置 1表示非持久化,2表示持久化</span>
    		jmsTemplate.setConnectionFactory(activeMQConnectionFactory);
    		jmsTemplate.setDefaultDestination(queue); // 此处可不设置默认,在发送消息时也可设置队列
    		jmsTemplate.setSessionAcknowledgeMode(4);// 客户端签收模式</span>
    		return jmsTemplate;
    	}
    
    	// 定义一个消息监听器连接工厂,这里定义的是点对点模式的监听器连接工厂
    	@Bean(name = "jmsQueueListener")
    	public DefaultJmsListenerContainerFactory jmsQueueListenerContainerFactory(
    			ActiveMQConnectionFactory activeMQConnectionFactory) {
    		DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
    		factory.setConnectionFactory(activeMQConnectionFactory);
    		// 设置连接数
    		factory.setConcurrency("1-10");
    		// 重连间隔时间
    		factory.setRecoveryInterval(1000L);
    		factory.setSessionAcknowledgeMode(4);
    		return factory;
    	}
    
    }
    

    创建生产者:

    @SpringBootApplication
    @Component
    @EnableScheduling
    public class Producer {
    	
    	@Autowired
    	private JmsMessagingTemplate jmsMessagingTemplate;
    	
    	@Autowired
    	private Queue queue;
    	
    	@Scheduled(fixedDelay=3000)
    	public void send() {
    		String result = System.currentTimeMillis()+"---测试";
    		System.out.println("result"+result);
    		jmsMessagingTemplate.convertAndSend(queue,result);
    	}
    	public static void main(String[] args) {
    		SpringApplication.run(Producer.class, args);
    	}
    }
    
    

    创建消费者的application.yml

    spring:
      activemq:
        broker-url: tcp://127.0.0.1:61616
        user: admin
        password: admin
    queue: springboot-queue
    server:
      port: 8081
    

    创建消费者:

    @Component
    @SpringBootApplication
    public class consumer {
    
    	private int count =0;
    	
    	@JmsListener(destination = "${queue}")
    	public void receive(TextMessage textMessage,Session session) throws JMSException {
    		String text = textMessage.getText();
    		
    		System.out.println("消费:"+text+"第几次获取消息count:"+(++count));
    		
    		System.out.println();
    		String jmsMessageID = textMessage.getJMSMessageID();
    	}
    	
    	public static void main(String[] args) {
    		SpringApplication.run(consumer.class,args);
    	}
    }
    

    结果显示:

  • 相关阅读:
    书单
    [转载] 修改WIN10的DNS、以及操作系统和 Web 浏览器清除和刷新 DNS 缓存方法汇总
    【题解】 【集训队作业2018】喂鸽子 minmax容斥+期望dp+补集转化 UOJ449
    【题解】 CF809E Surprise me! 虚树+莫比乌斯反演+狄利克雷卷积
    【题解】 CF1478E Nezzar and Binary String 线段树+时间逆序
    如何处理调用EasyCVR地址集成通过EasyPlayer播放器不能播放的问题?
    智慧能源:智能安防监控技术EasyCVR在石油能源行业中的场景应用
    网络穿透/动态组网工具EasyNTS报错connect refused该如何处理?
    如何处理C++编译webrtc无法成功获取sdp的问题?
    硬核讲解:编译webrtc协议为什么需要turn服务器?
  • 原文地址:https://www.cnblogs.com/Libbo/p/11547852.html
Copyright © 2011-2022 走看看