zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ

    (1)、导入starter依赖

    1         <dependency>
    2             <groupId>org.springframework.boot</groupId>
    3             <artifactId>spring-boot-starter-amqp</artifactId>
    4         </dependency>

    (2)、在配置文件中配置rabbitmq相关属性

    1 spring.rabbitmq.host=192.168.205.128
    2 spring.rabbitmq.port=5672
    3 spring.rabbitmq.username=guest
    4 spring.rabbitmq.password=guest

    (3)、配置rabbitmq使用json进行消息的序列化(默认使用JDK进行消息的序列化)

     1 package cn.coreqi.config;
     2 
     3 import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
     4 import org.springframework.amqp.support.converter.MessageConverter;
     5 import org.springframework.context.annotation.Bean;
     6 import org.springframework.context.annotation.Configuration;
     7 
     8 @Configuration
     9 public class MyAMQPConfig {
    10 
    11     //使用JSON进行消息的序列化
    12     @Bean
    13     public MessageConverter messageConverter(){
    14         return new Jackson2JsonMessageConverter();
    15     }
    16 
    17 }

    (4)主程序类添加@EnableRabbit注解开启基于注解的RabbltMQ模式

     1 package cn.coreqi;
     2 
     3 import org.springframework.amqp.rabbit.annotation.EnableRabbit;
     4 import org.springframework.boot.SpringApplication;
     5 import org.springframework.boot.autoconfigure.SpringBootApplication;
     6 
     7 @SpringBootApplication
     8 @EnableRabbit
     9 public class SpringbootrabbitmqApplication {
    10 
    11     public static void main(String[] args) {
    12         SpringApplication.run(SpringbootrabbitmqApplication.class, args);
    13     }
    14 
    15 }

    (5)、使用AmqpAdmin创建和删除Queue、Exchange、Binding,使用RabbitTemplate发送和接收消息

     1 @Autowired
     2     private RabbitOperations rabbitOperations;
     3     @Autowired
     4     private AmqpAdmin amqpAdmin;
     5 
     6     public void send(){
     7         amqpAdmin.declareExchange(new DirectExchange("userExchange"));  //声明交换器 new DirectExchange("ExchangeName")
     8         amqpAdmin.declareQueue(new Queue("userQueue",true));    //声明队列 new Queue("QueueName",是否持久化)
     9         amqpAdmin.declareBinding(new Binding("userQueue", Binding.DestinationType.QUEUE,"userExchange","user",null));   //声明binding  new Binding(目的地(队列名称),绑定类型,交换器名称,路由键,参数列表)
    10         rabbitOperations.convertAndSend("userExchange","user",new User(1,"fanqi","123456",1));  //ExchangeName,routing-key,object
    11     }
    12 
    13     public User receive(){  //反序列化时要保证Bean具有无参构造器
    14         return (User) rabbitOperations.receiveAndConvert("userQueue");
    15     }

    (6)、监听消息队列,在service层所需方法上添加@RabbitListener注解,当队列中添加了新消息,此方法将会被调用。

     1 package cn.coreqi.service;
     2 
     3 import cn.coreqi.entities.User;
     4 import com.fasterxml.jackson.databind.ObjectMapper;
     5 import org.springframework.amqp.core.Message;
     6 import org.springframework.amqp.rabbit.annotation.RabbitListener;
     7 import org.springframework.stereotype.Service;
     8 
     9 import java.io.IOException;
    10 
    11 @Service
    12 public class UserService {
    13     @RabbitListener(queues = "userQueue")
    14     public void receive(User user){
    15         System.out.println(user.toString());
    16     }
    17     @RabbitListener(queues = "userQueue")
    18     public void receive1(Message message) throws IOException {
    19         ObjectMapper mapper = new ObjectMapper();
    20         User user = mapper.readValue(message.getBody(), User.class);
    21         System.out.println(user.toString());
    22     }
    23 }
  • 相关阅读:
    怎样打印日志
    log4j(一)
    idea没有subversion问题
    tomcat启动报异常(一)
    HashSet
    spring容器初始化bean和销毁bean之前进行一些操作的定义方法
    MyBatis中$和#的区别
    基本数据类型与引用数据类型
    Access restriction: The method 'CharacterEncoder.encode(byte[])' is not API...
    Object源码
  • 原文地址:https://www.cnblogs.com/fanqisoft/p/10353992.html
Copyright © 2011-2022 走看看