zoukankan      html  css  js  c++  java
  • SpringBoot整合ActiveMQ的publish/subscribe发布订阅模式(二)

    (注意:当使用了发布订阅模式那么P2P模式就不能用了,解决的方法:)

    (1)pom.xml完整的文件

    <project xmlns="http://maven.apache.org/POM/4.0.0"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    	<groupId>com.qingfeng</groupId>
    	<artifactId>springboot-ActiveMQ</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>2.1.1.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-activemq</artifactId>
    		</dependency>
    
    			<!-- 热部署 -->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-devtools</artifactId>
    		</dependency>
    
    	</dependencies>
    
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    
    </project>
    

    (2)application.properties文件配置

      

    server.port=8088
    server.servlet.context-path=/springboot-ActiveMQ/
    #不能用127.0.0.1,不然会报错
    #spring.activemq.broker-url=tcp://127.0.0.1:61616 
    spring.activemq.broker-url=tcp://localhost:61616
    spring.activemq.in-memory=true 
    spring.activemq.pool.enabled=false
    #支持发布订阅模型,默认只支持点对点,使用了spring.jms.pub-sub-domain=true那么点对点模式就失效了
    spring.jms.pub-sub-domain=true
    

      

    (4)SpringBoot启动类

    package com.qingfeng;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class AppStart {
    
    	public static void main(String[] args) {
    		SpringApplication.run(AppStart.class, args);
    	}
    	
    }
    

     

    (5)发布者Publish类 

    package com.qingfeng.producer;
    
    import java.util.Map;
    
    import javax.jms.Destination;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    /**
     * 发布者
     *
     */
    @Service
    public class Publish {
    	
    	// 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装 
    		@Autowired
    		private JmsMessagingTemplate jmsMessagingTemplate;
    
    
    		// 发送消息,destination是发送到的队列名称,message是待发送的消息  
    		public void sendTopicMessage(Destination destination, String message){  
    			jmsMessagingTemplate.convertAndSend(destination, message);  
    		}  
    
    		// 发送消息,destination是发送到的队列名称,map类型的发送的消息  
    			public void sendTopicMapMessage(Destination destination, Map<String ,String> map){  
    				jmsMessagingTemplate.convertAndSend(destination, map);  
    			}  
    	
    	
    
    }
    

      

    (6)订阅者Subscribe类

    package com.qingfeng.consumer;
    
    import java.util.Map;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    /**
     * 订阅者
     */
    @Component
    public class Subscribe {
    
    	// 使用JmsListener配置消费者监听的topic名称qingfeng-topic,其中text是接收到的消息  
    		@JmsListener(destination = "qingfeng-topic")  
    		public void receiveTopic(String text) {  
    			System.out.println("qingfeng-topic类型Subscribe收到的Publish的报文为:"+text);  
    		}  
    		
    		@JmsListener(destination = "qingfeng-topic")  
    		public void receiveTopic2(String text) {  
    			System.out.println("qingfeng-topic类型Subscribe收到的Publish的报文为:"+text);  
    		}  
    		
    		@JmsListener(destination = "qingfeng-topic")  
    		public void receiveTopic3(String text) {  
    			System.out.println("qingfeng-topic类型Subscribe收到的Publish的报文为:"+text);  
    		}  
    
    		// 使用JmsListener配置消费者监听的topic名称qingfeng-map-topic,其中map是接收到的消息 
    		@JmsListener(destination = "qingfeng-map-topic")  
    		public void receiveMapTopic(Map<String ,String> map) {  
    			System.out.println("qingfeng-map-topic类型Subscribe收到的Publishmap类型的报文为:"+map);  
    		}  
    		
    		@JmsListener(destination = "qingfeng-map-topic")  
    		public void receiveMapTopic2(Map<String ,String> map) {  
    			System.out.println("qingfeng-map-topic类型Subscribe收到的Publishmap类型的报文为:"+map);  
    		}  
    
    	
    }
    

      

      

    (7)测试类TopicController类

    package com.qingfeng.controller;
    
    import java.util.HashMap;
    
    import javax.jms.Destination;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.qingfeng.producer.Publish;
    
    @RestController
    public class TopicController {
    	
    	@Autowired
    	private Publish publish;
    	
    //  http://localhost:8088/springboot-ActiveMQ/topic/send?text=qingfeng
    	@GetMapping("topic/send")
    	public String  send(String text){
    		//创建一个Topic名称为qingfeng-topic
    		ActiveMQTopic activeMQTopic = new ActiveMQTopic("qingfeng-topic");
    		//调用生产者产生消息
    		publish.sendTopicMessage(activeMQTopic, text);
    		return "发送成功";
    	}
    	
    // http://localhost:8088/springboot-ActiveMQ/topic/sendMap
    	@GetMapping("topic/sendMap")
    	public String  sendMap(){
    		//创建一个Topic名称为qingfeng-map-topic
    		ActiveMQTopic activeMQTopic = new ActiveMQTopic("qingfeng-topic");
    		HashMap<String, String> map = new HashMap<>();
    		map.put("name", "qingfeng");
    		map.put("age", "18");
    		map.put("QQ", "37942135");
    		//调用生产者产生消息
    		publish.sendTopicMapMessage(activeMQTopic, map);
    		return "发送成功";
    	}
    
    }
    

      

    8)启动项目AppStart类

    测试:http://localhost:8088/springboot-ActiveMQ/topic/send?text=qingfeng

     在控制台上输出

    测试:http://localhost:8088/springboot-ActiveMQ/topic/sendMap

    在控制台上输出

  • 相关阅读:
    Winform读取app.config文件
    判断本机只能运行一个winform程序
    [导入][链接] Top 10: The best, worst... and craziest uses of RFID
    [导入][Tips] 在Ubuntu下限制本机使用的网络带宽
    [导入][一点一滴学英语] 20061205
    [导入][链接] Linux Distribution Chooser
    [导入][链接] Open Source Java Clustering
    [导入][链接] 关于Vista的关机选项
    [导入]Drip, Transfusion, Perfusion还是Infusion?关于一个词的翻译
    [导入][阅读] "Computer Programmer" vs. "Software Developer"
  • 原文地址:https://www.cnblogs.com/Amywangqing/p/13636049.html
Copyright © 2011-2022 走看看