1.引入依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
2. 配置文件设置
spring.application.name=springboot-rabbitmq spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms=true spring.rabbitmq.virtual-host=/
3. 配置类
package org.springframework.boot.autoconfigure.amqp; import 。。。。 @Configuration @ConditionalOnClass({RabbitTemplate.class, Channel.class}) @EnableConfigurationProperties({RabbitProperties.class}) @Import({RabbitAnnotationDrivenConfiguration.class}) public class RabbitAutoConfiguration { public RabbitAutoConfiguration() { } @Configuration @ConditionalOnClass({RabbitMessagingTemplate.class}) @ConditionalOnMissingBean({RabbitMessagingTemplate.class}) @Import({RabbitAutoConfiguration.RabbitTemplateConfiguration.class}) protected static class MessagingTemplateConfiguration { protected MessagingTemplateConfiguration() { } @Bean @ConditionalOnSingleCandidate(RabbitTemplate.class) public RabbitMessagingTemplate rabbitMessagingTemplate(RabbitTemplate rabbitTemplate) { return new RabbitMessagingTemplate(rabbitTemplate); } } @Configuration @Import({RabbitAutoConfiguration.RabbitConnectionFactoryCreator.class}) protected static class RabbitTemplateConfiguration { private final RabbitProperties properties; private final ObjectProvider<MessageConverter> messageConverter; private final ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers; public RabbitTemplateConfiguration(RabbitProperties properties, ObjectProvider<MessageConverter> messageConverter, ObjectProvider<RabbitRetryTemplateCustomizer> retryTemplateCustomizers) { this.properties = properties; this.messageConverter = messageConverter; this.retryTemplateCustomizers = retryTemplateCustomizers; } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnMissingBean public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { PropertyMapper map = PropertyMapper.get(); RabbitTemplate template = new RabbitTemplate(connectionFactory); MessageConverter messageConverter = (MessageConverter)this.messageConverter.getIfUnique(); if (messageConverter != null) { template.setMessageConverter(messageConverter); } template.setMandatory(this.determineMandatoryFlag()); Template properties = this.properties.getTemplate(); if (properties.getRetry().isEnabled()) { template.setRetryTemplate((new RetryTemplateFactory((List)this.retryTemplateCustomizers.orderedStream().collect(Collectors.toList()))).createRetryTemplate(properties.getRetry(), Target.SENDER)); } properties.getClass(); map.from(properties::getReceiveTimeout).whenNonNull().as(Duration::toMillis).to(template::setReceiveTimeout); properties.getClass(); map.from(properties::getReplyTimeout).whenNonNull().as(Duration::toMillis).to(template::setReplyTimeout); properties.getClass(); map.from(properties::getExchange).to(template::setExchange); properties.getClass(); map.from(properties::getRoutingKey).to(template::setRoutingKey); properties.getClass(); map.from(properties::getDefaultReceiveQueue).whenNonNull().to(template::setDefaultReceiveQueue); return template; } private boolean determineMandatoryFlag() { Boolean mandatory = this.properties.getTemplate().getMandatory(); return mandatory != null ? mandatory : this.properties.isPublisherReturns(); } @Bean @ConditionalOnSingleCandidate(ConnectionFactory.class) @ConditionalOnProperty( prefix = "spring.rabbitmq", name = {"dynamic"}, matchIfMissing = true ) @ConditionalOnMissingBean public AmqpAdmin amqpAdmin(ConnectionFactory connectionFactory) { return new RabbitAdmin(connectionFactory); } } @Configuration @ConditionalOnMissingBean({ConnectionFactory.class}) protected static class RabbitConnectionFactoryCreator { protected RabbitConnectionFactoryCreator() { } @Bean public CachingConnectionFactory rabbitConnectionFactory(RabbitProperties properties, ObjectProvider<ConnectionNameStrategy> connectionNameStrategy) throws Exception { PropertyMapper map = PropertyMapper.get(); CachingConnectionFactory factory = new CachingConnectionFactory((com.rabbitmq.client.ConnectionFactory)this.getRabbitConnectionFactoryBean(properties).getObject()); properties.getClass(); map.from(properties::determineAddresses).to(factory::setAddresses); properties.getClass(); map.from(properties::isPublisherConfirms).to(factory::setPublisherConfirms); properties.getClass(); map.from(properties::isPublisherReturns).to(factory::setPublisherReturns); org.springframework.boot.autoconfigure.amqp.RabbitProperties.Cache.Channel channel = properties.getCache().getChannel(); channel.getClass(); map.from(channel::getSize).whenNonNull().to(factory::setChannelCacheSize); channel.getClass(); map.from(channel::getCheckoutTimeout).whenNonNull().as(Duration::toMillis).to(factory::setChannelCheckoutTimeout); Connection connection = properties.getCache().getConnection(); connection.getClass(); map.from(connection::getMode).whenNonNull().to(factory::setCacheMode); connection.getClass(); map.from(connection::getSize).whenNonNull().to(factory::setConnectionCacheSize); connectionNameStrategy.getClass(); map.from(connectionNameStrategy::getIfUnique).whenNonNull().to(factory::setConnectionNameStrategy); return factory; } private RabbitConnectionFactoryBean getRabbitConnectionFactoryBean(RabbitProperties properties) throws Exception { PropertyMapper map = PropertyMapper.get(); RabbitConnectionFactoryBean factory = new RabbitConnectionFactoryBean(); properties.getClass(); map.from(properties::determineHost).whenNonNull().to(factory::setHost); properties.getClass(); map.from(properties::determinePort).to(factory::setPort); properties.getClass(); map.from(properties::determineUsername).whenNonNull().to(factory::setUsername); properties.getClass(); map.from(properties::determinePassword).whenNonNull().to(factory::setPassword); properties.getClass(); map.from(properties::determineVirtualHost).whenNonNull().to(factory::setVirtualHost); properties.getClass(); map.from(properties::getRequestedHeartbeat).whenNonNull().asInt(Duration::getSeconds).to(factory::setRequestedHeartbeat); Ssl ssl = properties.getSsl(); if (ssl.isEnabled()) { factory.setUseSSL(true); ssl.getClass(); map.from(ssl::getAlgorithm).whenNonNull().to(factory::setSslAlgorithm); ssl.getClass(); map.from(ssl::getKeyStoreType).to(factory::setKeyStoreType); ssl.getClass(); map.from(ssl::getKeyStore).to(factory::setKeyStore); ssl.getClass(); map.from(ssl::getKeyStorePassword).to(factory::setKeyStorePassphrase); ssl.getClass(); map.from(ssl::getTrustStoreType).to(factory::setTrustStoreType); ssl.getClass(); map.from(ssl::getTrustStore).to(factory::setTrustStore); ssl.getClass(); map.from(ssl::getTrustStorePassword).to(factory::setTrustStorePassphrase); ssl.getClass(); map.from(ssl::isValidateServerCertificate).to((validate) -> { factory.setSkipServerCertificateValidation(!validate); }); ssl.getClass(); map.from(ssl::getVerifyHostname).to(factory::setEnableHostnameVerification); } properties.getClass(); map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis).to(factory::setConnectionTimeout); factory.afterPropertiesSet(); return factory; } } }
默认配置
@ConfigurationProperties( prefix = "spring.rabbitmq" ) public class RabbitProperties { private String host = "localhost"; private int port = 5672; private String username = "guest"; private String password = "guest"; private final RabbitProperties.Ssl ssl = new RabbitProperties.Ssl(); private String virtualHost; private String addresses; @DurationUnit(ChronoUnit.SECONDS) private Duration requestedHeartbeat; private boolean publisherConfirms; private boolean publisherReturns; private Duration connectionTimeout; private final RabbitProperties.Cache cache = new RabbitProperties.Cache(); private final RabbitProperties.Listener listener = new RabbitProperties.Listener(); private final RabbitProperties.Template template = new RabbitProperties.Template(); private List<RabbitProperties.Address> parsedAddresses;
SpringBoot 启动类
/** * 自动配置 * 1、RabbitAutoConfiguration * 2、有自动配置了连接工厂ConnectionFactory; * 3、RabbitProperties 封装了 RabbitMQ的配置 * 4、 RabbitTemplate :给RabbitMQ发送和接受消息; * 5、 AmqpAdmin : RabbitMQ系统管理功能组件; * AmqpAdmin:创建和删除 Queue,Exchange,Binding * 6、@EnableRabbit + @RabbitListener 监听消息队列的内容 * */ @EnableRabbit //开启基于注解的RabbitMQ模式 @SpringBootApplication public class Springboot02AmqpApplication { public static void main(String[] args) { SpringApplication.run(Springboot02AmqpApplication.class, args); } }
使序列化到MQ的对象消息能够以JSON的形式展现而不是乱码
@Configuration public class MyAMQPConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
测试类
@RunWith(SpringRunner.class) @SpringBootTest public class Springboot02AmqpApplicationTests { @Autowired RabbitTemplate rabbitTemplate; @Autowired AmqpAdmin amqpAdmin; @Test public void createExchange(){ // amqpAdmin.declareExchange(new DirectExchange("amqpadmin.exchange")); // System.out.println("创建完成"); // amqpAdmin.declareQueue(new Queue("amqpadmin.queue",true)); //创建绑定规则 // amqpAdmin.declareBinding(new Binding("amqpadmin.queue", Binding.DestinationType.QUEUE,"amqpadmin.exchange","amqp.haha",null)); //amqpAdmin.de } /** * 1、单播(点对点) */ @Test public void contextLoads() { //Message需要自己构造一个;定义消息体内容和消息头 //rabbitTemplate.send(exchage,routeKey,message); //object默认当成消息体,只需要传入要发送的对象,自动序列化发送给rabbitmq; //rabbitTemplate.convertAndSend(exchage,routeKey,object); Map<String,Object> map = new HashMap<>(); map.put("msg","这是第一个消息"); map.put("data", Arrays.asList("helloworld",123,true)); //对象被默认序列化以后发送出去 rabbitTemplate.convertAndSend("exchange.direct","atguigu.news",new Book("西游记","吴承恩")); } //接受数据,如何将数据自动的转为json发送出去 @Test public void receive(){ Object o = rabbitTemplate.receiveAndConvert("atguigu.news"); System.out.println(o.getClass()); System.out.println(o); } /** * 广播 */ @Test public void sendMsg(){ rabbitTemplate.convertAndSend("exchange.fanout","",new Book("红楼梦","曹雪芹")); } }