1、首先添加一个用户,设置角色,添加权限
保证新建用户和原先用户的权限是一致的
2、添加配置文件
# RabbitMQ broker connection settings (one property per line; the properties
# format does not support several keys or trailing comments on a single line).
spring.rabbitmq.username=fenghao
spring.rabbitmq.password=123456
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
# The listeners in BookHandler acknowledge each delivery themselves through
# channel.basicAck, so the listener container must not acknowledge it as well.
spring.rabbitmq.listener.simple.acknowledge-mode=manual
3、添加RabbitMQ的配置类
package com.voole.demo.config; import java.util.HashMap; import java.util.Map; import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.core.TopicExchange; import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitConfig { public final static String DEFAULT_BOOK_QUEUE = "dev.book.register.queue"; public final static String MANUAL_BOOL_QUEUE = "dev.book.register.queue"; private static final String REGISTER_DELAY_QUEUE = "dev.book.register.delay.queue"; public static final String REGISTER_DELAY_EXCHANGE = "dev.book.register.delay.exchange"; public static final String DELAY_ROUTING_KEY = ""; public static final String REGISTER_QUEUE_NAME = "dev.book.register.queue"; public static final String REGISTER_EXCHANGE_NAME = "dev.book.register.exchange"; public static final String ROUTING_KEY = "all"; @Bean public Queue defaultQueue() { return new Queue(DEFAULT_BOOK_QUEUE,true); } @Bean public Queue manualQueue() { return new Queue(MANUAL_BOOL_QUEUE,true); } @Bean public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) { connectionFactory.setPublisherConfirms(true); connectionFactory.setPublisherReturns(true); RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMandatory(true); rabbitTemplate.setConfirmCallback((correlationData,ack,cause) ->System.out.println(String.format("消息发送成功:correlationData(%s),ack(%s),cause(%s)", correlationData,ack,cause))); rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) 
->System.out.println(String.format(""消息丢失:exchange(%s),route(%s),replyCode(%s),replyText(%s),message:%s"", exchange, routingKey, replyCode, replyText, message))); return rabbitTemplate; } @Bean public Queue delayProcessQueue() { Map<String,Object> params = new HashMap<>(); params.put("x-dead-letter-exchange", REGISTER_EXCHANGE_NAME); params.put("x-dead-letter-routing-key", ROUTING_KEY); return new Queue(REGISTER_DELAY_QUEUE,true,false,false,params); } @Bean public DirectExchange delayExchange() { return new DirectExchange(REGISTER_DELAY_EXCHANGE); } @Bean public Binding dlxBinding() { return BindingBuilder.bind(delayProcessQueue()).to(delayExchange()).with(DELAY_ROUTING_KEY); } @Bean public Queue registerBookQueue() { return new Queue(REGISTER_QUEUE_NAME,true); } @Bean public TopicExchange registerBookTopicExchange() { return new TopicExchange(REGISTER_EXCHANGE_NAME); } @Bean public Binding registerBookBinding() { return BindingBuilder.bind(registerBookQueue()).to(registerBookTopicExchange()).with(ROUTING_KEY); } }
4、消息发送类
package com.voole.demo.controller; import java.time.LocalDateTime; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.AbstractJavaTypeMapper; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import com.voole.demo.config.RabbitConfig; import com.voole.demo.vo.Book; @RestController public class BookController { private final RabbitTemplate rabbitTemplate; @Autowired public BookController(RabbitTemplate rabbit) { this.rabbitTemplate = rabbit; } @GetMapping("send") public String defaultMessage() { Book book = new Book(); book.setId("1"); book.setName("fengho"); this.rabbitTemplate.convertAndSend(RabbitConfig.DEFAULT_BOOK_QUEUE,book); this.rabbitTemplate.convertAndSend(RabbitConfig.MANUAL_BOOL_QUEUE,book); return "success"; } @GetMapping("delay") public String delay() { Book book = new Book(); book.setId("1"); book.setName("fengho"); this.rabbitTemplate.convertAndSend(RabbitConfig.REGISTER_DELAY_EXCHANGE,RabbitConfig.DELAY_ROUTING_KEY,book,message ->{ message.getMessageProperties().setHeader(AbstractJavaTypeMapper.DEFAULT_CONTENT_CLASSID_FIELD_NAME, Book.class.getName()); message.getMessageProperties().setExpiration(5 * 1000 +""); return message; }); System.out.println("发送时间:"+LocalDateTime.now()); return "Success"; } }
5、消息处理类
package com.voole.demo.handle; import java.io.IOException; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; import com.voole.demo.config.RabbitConfig; import com.voole.demo.vo.Book; @Component public class BookHandler { @RabbitListener(queues= {RabbitConfig.DEFAULT_BOOK_QUEUE}) public void listenerAutoAck(Book book,Message message,Channel channel) { final long deliverTag = message.getMessageProperties().getDeliveryTag(); try { System.out.println(book.toString()); channel.basicAck(deliverTag, false); }catch (Exception e) { try { channel.basicRecover(); } catch (IOException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } } @RabbitListener(queues= {RabbitConfig.MANUAL_BOOL_QUEUE}) public void listenerManualAck(Book book,Message message,Channel channel) { try { System.out.println(book.toString()); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }catch (Exception e) { //容错处理 } } @RabbitListener(queues= {RabbitConfig.REGISTER_QUEUE_NAME}) public void delayQueue(Book book,Message message,Channel channel) { System.out.println(book.toString()); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { // TODO: handle exception } } }
6、实体类
package com.voole.demo.vo; import java.io.Serializable; public class Book implements Serializable{ /** * @Fields serialVersionUID : TODO */ private static final long serialVersionUID = 1L; private String id; private String name; public String getId() { return id; } public void setId(String id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } @Override public String toString() { return id+":"+name; } }
完!