【RabbitMQ】显示耗时处理进度
通过网页提交一个耗时的请求，后端启动处理线程后请求立即返回。处理线程每完成一部分，就把已完成的数量发送到 RabbitMQ；消息监听器收到后通过 WebSocket（STOMP）推送给前端，前端据此显示处理进度。
依赖jar
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build descriptor for the rabbitmq-service demo application. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- Spring Boot parent POM; the dependencies below declare no version of their own. -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.6.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>rabbitmq-service</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-service</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- NOTE(review): no code shown in this article references actuator directly. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- Spring AMQP: RabbitTemplate and @RabbitListener used by the config, controller and receiver. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!-- Spring MVC: @RestController endpoint that starts the processing job. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- WebSocket/STOMP support used to push progress to the browser. -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
        </dependency>
        <!-- Lombok: @Data on the RabbitMQ configuration class. -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
配置
# HTTP port of the web application; the front-end page calls http://localhost:8088.
server.port=8088
spring.application.name=rabbitmq-service
# RabbitMQ broker connection settings, bound by RabbitMQConfig through the "spring.rabbitmq" prefix.
spring.rabbitmq.host=192.168.226.128
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
主入口
/**
 * Entry point of the rabbitmq-service Spring Boot application.
 */
@SpringBootApplication
public class RabbitmqServiceApplication {

    /**
     * Starts the Spring application.
     *
     * @param args command-line arguments, passed through to {@code SpringApplication.run}
     */
    public static void main(String... args) {
        SpringApplication.run(RabbitmqServiceApplication.class, args);
    }
}
RabbitMQ配置
package com.example.rabbitmqservice.config; import lombok.Data; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.config.ConfigurableBeanFactory; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Scope; @Data @Configuration @ConfigurationProperties(prefix = "spring.rabbitmq") public class RabbitMQConfig { private String host; private int port; private String username; private String password; @Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); connectionFactory.setPublisherConfirms(true); return connectionFactory; } @Bean @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) public RabbitTemplate rabbitTemplate() { return new RabbitTemplate(connectionFactory()); } }
处理RabbitMQ配置
package com.example.rabbitmqservice.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Declares the exchange, queue and binding used to carry progress messages.
 */
@Configuration
public class RabbitMQConfigProcess {

    /** Name of the direct exchange progress messages are published to. */
    public static final String PROCESS_DIRECT_EXCHANGE = "process.direct.exchange";

    /** Name of the queue the progress listener consumes from. */
    public static final String PROCESS_QUEUE = "process.queue";

    /** Routing key binding {@link #PROCESS_QUEUE} to {@link #PROCESS_DIRECT_EXCHANGE}. */
    public static final String PROCESS_KEY = "process.key";

    /**
     * @return the direct exchange (durable, not auto-deleted)
     */
    @Bean
    public DirectExchange processDirectExchange() {
        final boolean durable = true;
        final boolean autoDelete = false;
        return new DirectExchange(PROCESS_DIRECT_EXCHANGE, durable, autoDelete);
    }

    /**
     * @return the progress queue (durable, non-exclusive, not auto-deleted)
     */
    @Bean
    public Queue processQueue() {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = false;
        return new Queue(PROCESS_QUEUE, durable, exclusive, autoDelete);
    }

    /**
     * @return the binding of the progress queue to the direct exchange under {@link #PROCESS_KEY}
     */
    @Bean
    public Binding processBinding() {
        Queue queue = processQueue();
        DirectExchange exchange = processDirectExchange();
        return BindingBuilder.bind(queue).to(exchange).with(PROCESS_KEY);
    }
}
WebSocket配置
package com.example.rabbitmqservice.config; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.config.annotation.EnableWebSocket; import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker; import org.springframework.web.socket.config.annotation.StompEndpointRegistry; import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer; @Configuration @EnableWebSocket @EnableWebSocketMessageBroker public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { public static final String END_POINT = "/websocket"; @Override public void registerStompEndpoints(StompEndpointRegistry registry) { registry.addEndpoint(END_POINT) .setAllowedOrigins("*") .withSockJS() .setHeartbeatTime(5000) .setDisconnectDelay(3000) .setStreamBytesLimit(512); } }
消息接收转发
package com.example.rabbitmqservice.receive;

import com.example.rabbitmqservice.config.RabbitMQConfigProcess;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Component;

/**
 * Listens on the progress queue and forwards every message to WebSocket subscribers.
 */
@Component
@RabbitListener(queues = RabbitMQConfigProcess.PROCESS_QUEUE)
public class ProcessReceive {

    /** Destination the front-end page subscribes to. */
    private static final String PROGRESS_DESTINATION = "/direct/process/";

    @Autowired
    private SimpMessagingTemplate messagingTemplate;

    /**
     * Logs the received payload and relays it unchanged to the progress destination.
     *
     * @param content message payload taken from the queue
     */
    @RabbitHandler
    public void process(String content) {
        String logLine = "收到并转发消息 > " + content;
        System.out.println(logLine);
        messagingTemplate.convertAndSend(PROGRESS_DESTINATION, content);
    }
}
请求处理
package com.example.rabbitmqservice.controller; import com.example.rabbitmqservice.config.RabbitMQConfigProcess; import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageDeliveryMode; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; import java.time.LocalTime; import java.util.HashMap; import java.util.Map; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @RestController public class ProcessController { @Autowired private RabbitTemplate rabbitTemplate; ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 40, 2, TimeUnit.MINUTES, new LinkedBlockingDeque<>()); @RequestMapping("/process/direct") public Map<String, Object> direct() { Runnable runnable = () -> process(); executor.execute(runnable); Map<String, Object> result = new HashMap<>(); result.put("status", 200); result.put("message", "开始处理"); return result; } private void process() { // 模拟耗时处理 for (int i = 1; i <= 100; i++) { String msg = String.valueOf(i); rabbitTemplate.convertAndSend(RabbitMQConfigProcess.PROCESS_DIRECT_EXCHANGE, RabbitMQConfigProcess.PROCESS_KEY , msg, // 消息到达broker时调用confirm callback。若是cluster则到达所有的broker时调用confirm callback。 message -> { // 投递模式2 message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); System.out.println("confirm callback"); return message; }, new CorrelationData("123")); rabbitTemplate.setMandatory(true);//开启强制委托模式 // 没有投递到目标队列时调用return callback rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { System.out.println("return 
callback"); System.out.println("没有投递到目标队列"); }); // 模拟耗时 try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } }
前端html
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script type="text/javascript" src="js/sockjs.js"></script>
    <script type="text/javascript" src="js/stomp.js"></script>
    <script type="text/javascript" src="js/vue.js"></script>
    <script type="text/javascript" src="js/axios.min.js"></script>
</head>
<body>
<div id="app">
    <span v-text="message"></span>
    <button type="button" @click="startMock">开始模拟</button>
    <span v-text="process"></span>
</div>
<script type="text/javascript">
    var vm = new Vue({
        el: '#app',
        data: {
            process: '',
            message: '点击开始模拟'
        },
        methods: {
            // Asks the server to start the simulated job and logs the response or the failure.
            startMock: function () {
                var query = {};
                this.requestProcess(query).then(response => {
                    console.log('返回');
                    console.dir(response);
                }).catch(error => {
                    // Must be catch(): a second then() would run after every successful
                    // response and would never see a rejected request.
                    console.log('出错');
                    console.dir(error);
                });
            },
            // Sends the GET request; axios already returns a promise, so it is returned as is.
            requestProcess(query) {
                return axios({
                    url: 'http://localhost:8088/process/direct',
                    method: 'get',
                    params: query
                });
            },
            // Opens the SockJS/STOMP connection and shows each pushed value as a percentage.
            displayProcess: function () {
                var _this = this;
                console.log('displayProcess');
                var sock = new SockJS("http://localhost:8088/websocket");
                var stompClient = Stomp.over(sock);
                stompClient.connect({}, function () {
                    // NOTE(review): "precessId" looks like a typo for "processId"; no server code
                    // shown here reads this header, so it is left unchanged.
                    stompClient.subscribe('/direct/process/', function (data) {
                        console.dir(data);
                        _this.process = data.body + '%';
                    }, {precessId: 123});
                });
                sock.onclose = function () {
                    console.dir("websocket已经断开连接");
                    // Reconnect five seconds after the connection drops.
                    setTimeout(function () {
                        _this.displayProcess();
                    }, 5000);
                }
            }
        },
        mounted: function () {
            this.displayProcess();
        }
    });
</script>
</body>
</html>