rabbitmq的几种类型
1、
Direct Exchange
处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配
注:就是有个交换机绑定路由键,交换机再绑定队列,消息发送到交换机 通过路由键发送到指定的队列
Fanout Exchange
不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上
注:和上面的一样就是少个路由键,创建交换机绑定队列,发送消息到交换机上后 这个交换机绑定的队列都能收到这条消息
Topic Exchange
将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词
注:和direct Exchange模式一样 只是路由键 可以用通配符。
生产者投递 消息的时候 确保消息投递成功 可以用amqp机制(事物 ) confirm(问答机制)
发送消息:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
server:
port: 8080
spring:
application:
name: rabbitmqsender
rabbitmq:
port: 5672
username: admin
password: 123456
virtual-host: /admin_vhost
host: localhost
listener:
simple:
retry:
enabled: true ##开启重试功能
max-attempts: 5 ##最多重试5次
initial-interval: 3000ms ##重试的间隔时间
acknowledge-mode: manual ##开启手动消息确认机制
发送消息的类:
package com.liuchao.rabbitmqsender.sender; import com.liuchao.rabbitmqsender.mqconfig.MqConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MqSender { @Autowired private RabbitTemplate rabbitTemplate; private static String EXCHANGE="admin_exchange"; private static String ROUTINGKEY="admin_routingkey"; private String EMAIL_EXCHANGE="admin_email_exchange"; private String EMAIL_ROUTING_KEY="admin_email_routing_key"; public void sender(String message){ //发送普通的消息 rabbitTemplate.convertAndSend(EXCHANGE,ROUTINGKEY,message); } public void sendrDead(String message){ // 发送绑定了死信队列的消息 rabbitTemplate.convertAndSend(EMAIL_EXCHANGE,EMAIL_ROUTING_KEY,message); } }
发送消息的配置类:package com.liuchao.rabbitmqsender.mqconfig;import org.springframework.amqp.core.*;
import org.springframework.boot.SpringBootConfiguration; import org.springframework.context.annotation.Bean; import java.util.HashMap; import java.util.Map; @SpringBootConfiguration public class MqConfig { private String EMAIL_QUEUE_NAME="admin_email_queue"; //发送邮件的队列 private String EMAIL_EXCHANGE="admin_email_exchange"; //发送邮件的交换机 private String EMAIL_ROUTING_KEY="admin_email_routing_key"; //邮件队列和交换机的路由key private String DEAD_EMAIL_QUEUE_NAME="dead_admin_email_queue"; //发送邮件的死信队列 private String DEAD_EMAIL_EXCHANGE="dead_admin_email_exchange"; //发送邮件的死信息队列的交换机 private String DEAD_EMAIL_ROUTING_KEY="dead_admin_email_routing_key"; //发送邮件的死信队列的路由键
//这个是创建邮件的队列并绑定死信队列 这里面的x-dead-letter-exchange,x-dead-letter-routing-key是固定值指定死信队列的交换机和路由键
@Bean public Queue adminEmailQueue(){ Map<String,Object> args=new HashMap<String,Object>(2); args.put("x-dead-letter-exchange",DEAD_EMAIL_EXCHANGE); args.put("x-dead-letter-routing-key",DEAD_EMAIL_ROUTING_KEY); return QueueBuilder.durable(EMAIL_QUEUE_NAME).withArguments(args).build(); } //创建邮件的交换机 @Bean public DirectExchange adminEmailExchange(){ return new DirectExchange(EMAIL_EXCHANGE); } //绑定邮件队列和邮件交换机 @Bean public Binding bindExchangeEmail(Queue adminEmailQueue, DirectExchange adminEmailExchange){ return BindingBuilder.bind(adminEmailQueue).to(adminEmailExchange).with(EMAIL_ROUTING_KEY); } //创建邮件死信队列 @Bean public Queue deadAdminExminQueue(){ return new Queue(DEAD_EMAIL_QUEUE_NAME); }
//创建邮件死信交换机 @Bean public DirectExchange deadAdminEmailExchange(){ return new DirectExchange(DEAD_EMAIL_EXCHANGE); } //邮件死信队列和死信交换机绑定 @Bean public Binding bindDeadExchangeEmail(Queue deadAdminExminQueue,DirectExchange deadAdminEmailExchange){ return BindingBuilder.bind(deadAdminExminQueue).to(deadAdminEmailExchange).with(DEAD_EMAIL_ROUTING_KEY); } }
定义发送邮件的controller:
package com.liuchao.rabbitmqsender.controller; import com.liuchao.rabbitmqsender.sender.MqSender; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Controller; import org.springframework.util.StringUtils; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; @RestController public class SendController { @Autowired private MqSender mqSender; @RequestMapping("/senderMessage") public String senderMessage(@RequestParam("message")String message){ if(StringUtils.isEmpty(message)){ return "没有传消息"; } mqSender.sender(message); return "sucess"; } //往绑定了死信队列的发送信息 @RequestMapping("/senderDeadMessage") public String sendDeadMessage(@RequestParam("message")String message){ if(StringUtils.isEmpty(message)){ return "消息不能为空"; } mqSender.sendrDead(message); return "success"; } }
生产者确定消息可以投递成功 confim机制
package com.liuchao.rabbitmqsender.sender; import com.liuchao.rabbitmqsender.mqconfig.MqConfig; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.util.UUID; @Component public class MqSender implements RabbitTemplate.ConfirmCallback{//可以确保消息投递成功 @Autowired private RabbitTemplate rabbitTemplate; private static String EXCHANGE="admin_exchange"; private static String ROUTINGKEY="admin_routingkey"; private String EMAIL_EXCHANGE="admin_email_exchange"; private String EMAIL_ROUTING_KEY="admin_email_routing_key"; public void sender(String message){ rabbitTemplate.setConfirmCallback(this);//设置消息投递成功或失败时候的能回调类 rabbitTemplate.setMandatory(true); CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString().replace("-","")); rabbitTemplate.convertAndSend(EXCHANGE,ROUTINGKEY,message,correlationData); //int i=1/0; } public void sendrDead(String message){ rabbitTemplate.setConfirmCallback(this); CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString().replace("-","")); rabbitTemplate.convertAndSend(EMAIL_EXCHANGE,EMAIL_ROUTING_KEY,message,correlationData); } @Override public void confirm(CorrelationData correlationData, boolean ack, String s) {//投递成功失败的时候都会调用这个方法 String id = correlationData.getId(); System.out.println("接收到的id为:"+id); if(ack){//说明投递成功了 System.out.println("消息发送成功"); }else{ System.out.println("消息发送失败:"+s); } } }
注:如果这个投递消息的时候mq里面没有指定的交换机,投递消息就会失败 返回的这个ack就是false说明投递消息失败了
消费者:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
server:
port: 8081
spring:
application:
name: rabbitmqsender
rabbitmq:
port: 5672
username: admin
password: 123456
virtual-host: /admin_vhost
host: localhost
listener:
simple:
retry:
enabled: true
max-attempts: 5
initial-interval: 3000ms
##acknowledge-mode: manual
package com.liuchao.rabbitmqreceive.listener; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.io.IOException; @Component public class RabbitMqListener {
//绑定交换机和队列(如果生产者没有绑定交换机和队列 这样写就可以直接把交换机和队列绑定也就是只要这个消费启动就会在mq里面创建交换机和队列) @RabbitHandler @RabbitListener(bindings = @QueueBinding(value=@Queue("admin-queue"), exchange = @Exchange(value="admin_exchange"), key = "admin_routingkey" )) public void directReceiver(String content){ System.out.println("*********"+content); } //这个是绑定了死信队列的如果这里面拒绝这条信息(重试次数达到) 就会自动存入到死信队列 @RabbitHandler @RabbitListener(queues = "admin_email_queue") public void directDeadReceiver(String content, Channel channel, Message message) throws IOException { System.out.println("****dead*****"+content); System.out.println("*****message***"+new String(message.getBody())); // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 这个可以拒绝消息 int i=1/0; } }
x-dead-letter-exchange