zoukankan      html  css  js  c++  java
  • springboot 整合rabbitmq

    rabbitmq的几种类型

    1、 

    Direct Exchange

    处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配

    注:就是有个交换机绑定路由键,交换机再绑定队列,消息发送到交换机 通过路由键发送到指定的队列

    Fanout Exchange

    不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上

    注:和上面的一样就是少个路由键,创建交换机绑定队列,发送消息到交换机上后 这个交换机绑定的队列都能收到这条消息

    Topic Exchange

    将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“”匹配不多不少一个词

    注:和direct Exchange模式一样 只是路由键 可以用通配符。

    生产者投递 消息的时候 确保消息投递成功 可以用amqp机制(事物 ) confirm(问答机制)

    发送消息:

    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
     </dependency>
    server:
      port: 8080
    spring:
      application:
        name: rabbitmqsender
      rabbitmq:
        port: 5672
        username: admin
        password: 123456
        virtual-host: /admin_vhost
        host: localhost
        listener:
          simple:
            retry:
              enabled: true     ##开启重试功能
              max-attempts: 5   ##最多重试5次
              initial-interval: 3000ms   ##重试的间隔时间
            acknowledge-mode: manual    ##开启手动消息确认机制

    发送消息的类:

    package com.liuchao.rabbitmqsender.sender;
    
    import com.liuchao.rabbitmqsender.mqconfig.MqConfig;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MqSender {
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        private static String EXCHANGE="admin_exchange";
        private static String ROUTINGKEY="admin_routingkey";
    
        private String EMAIL_EXCHANGE="admin_email_exchange";
        private String EMAIL_ROUTING_KEY="admin_email_routing_key";
    
        public void sender(String message){  //发送普通的消息
            rabbitTemplate.convertAndSend(EXCHANGE,ROUTINGKEY,message);
        }
    
        public void sendrDead(String message){ // 发送绑定了死信队列的消息
            rabbitTemplate.convertAndSend(EMAIL_EXCHANGE,EMAIL_ROUTING_KEY,message);
        }
    }

    发送消息的配置类:package com.liuchao.rabbitmqsender.mqconfig;import org.springframework.amqp.core.*;

    import org.springframework.boot.SpringBootConfiguration;
    import org.springframework.context.annotation.Bean;
    
    import java.util.HashMap;
    import java.util.Map;
    
    @SpringBootConfiguration
    public class MqConfig {
        private String EMAIL_QUEUE_NAME="admin_email_queue"; //发送邮件的队列
    
        private String EMAIL_EXCHANGE="admin_email_exchange"; //发送邮件的交换机
    
        private String EMAIL_ROUTING_KEY="admin_email_routing_key"; //邮件队列和交换机的路由key
    
        private String DEAD_EMAIL_QUEUE_NAME="dead_admin_email_queue"; //发送邮件的死信队列
    
        private String DEAD_EMAIL_EXCHANGE="dead_admin_email_exchange"; //发送邮件的死信息队列的交换机
    
        private String DEAD_EMAIL_ROUTING_KEY="dead_admin_email_routing_key"; //发送邮件的死信队列的路由键
      

      //这个是创建邮件的队列并绑定死信队列 这里面的x-dead-letter-exchange,x-dead-letter-routing-key是固定值指定死信队列的交换机和路由键
    @Bean
    public Queue adminEmailQueue(){ Map<String,Object> args=new HashMap<String,Object>(2); args.put("x-dead-letter-exchange",DEAD_EMAIL_EXCHANGE); args.put("x-dead-letter-routing-key",DEAD_EMAIL_ROUTING_KEY); return QueueBuilder.durable(EMAIL_QUEUE_NAME).withArguments(args).build(); }   //创建邮件的交换机 @Bean public DirectExchange adminEmailExchange(){ return new DirectExchange(EMAIL_EXCHANGE); }   //绑定邮件队列和邮件交换机 @Bean public Binding bindExchangeEmail(Queue adminEmailQueue, DirectExchange adminEmailExchange){ return BindingBuilder.bind(adminEmailQueue).to(adminEmailExchange).with(EMAIL_ROUTING_KEY); }   //创建邮件死信队列 @Bean public Queue deadAdminExminQueue(){ return new Queue(DEAD_EMAIL_QUEUE_NAME); }
      //创建邮件死信交换机 @Bean
    public DirectExchange deadAdminEmailExchange(){ return new DirectExchange(DEAD_EMAIL_EXCHANGE); }   //邮件死信队列和死信交换机绑定 @Bean public Binding bindDeadExchangeEmail(Queue deadAdminExminQueue,DirectExchange deadAdminEmailExchange){ return BindingBuilder.bind(deadAdminExminQueue).to(deadAdminEmailExchange).with(DEAD_EMAIL_ROUTING_KEY); } }

    定义发送邮件的controller:

    package com.liuchao.rabbitmqsender.controller;
    
    import com.liuchao.rabbitmqsender.sender.MqSender;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.util.StringUtils;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class SendController {
        @Autowired
        private MqSender mqSender;
    
        @RequestMapping("/senderMessage")
        public String senderMessage(@RequestParam("message")String message){
            if(StringUtils.isEmpty(message)){
                return "没有传消息";
            }
    
            mqSender.sender(message);
            return "sucess";
        }
      //往绑定了死信队列的发送信息
        @RequestMapping("/senderDeadMessage")
        public String sendDeadMessage(@RequestParam("message")String message){
            if(StringUtils.isEmpty(message)){
                return "消息不能为空";
    
            }
    
            mqSender.sendrDead(message);
            return "success";
        }
    }

    生产者确定消息可以投递成功 confim机制 

    package com.liuchao.rabbitmqsender.sender;
    
    import com.liuchao.rabbitmqsender.mqconfig.MqConfig;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    @Component
    public class MqSender implements RabbitTemplate.ConfirmCallback{//可以确保消息投递成功
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        private static String EXCHANGE="admin_exchange";
        private static String ROUTINGKEY="admin_routingkey";
    
        private String EMAIL_EXCHANGE="admin_email_exchange";
        private String EMAIL_ROUTING_KEY="admin_email_routing_key";
    
        public void sender(String message){
            rabbitTemplate.setConfirmCallback(this);//设置消息投递成功或失败时候的能回调类
            rabbitTemplate.setMandatory(true);
            CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString().replace("-",""));
            rabbitTemplate.convertAndSend(EXCHANGE,ROUTINGKEY,message,correlationData);
            //int i=1/0;
        }
    
        public void sendrDead(String message){
            rabbitTemplate.setConfirmCallback(this);
            CorrelationData correlationData=new CorrelationData(UUID.randomUUID().toString().replace("-",""));
            rabbitTemplate.convertAndSend(EMAIL_EXCHANGE,EMAIL_ROUTING_KEY,message,correlationData);
    
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String s) {//投递成功失败的时候都会调用这个方法
            String id = correlationData.getId();
            System.out.println("接收到的id为:"+id);
            if(ack){//说明投递成功了
                System.out.println("消息发送成功");
            }else{
                System.out.println("消息发送失败:"+s);
            }
        }
    }

     注:如果这个投递消息的时候mq里面没有指定的交换机,投递消息就会失败 返回的这个ack就是false说明投递消息失败了

    消费者:

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    server:
      port: 8081
    spring:
      application:
        name: rabbitmqsender
      rabbitmq:
        port: 5672
        username: admin
        password: 123456
        virtual-host: /admin_vhost
        host: localhost
        listener:
          simple:
            retry:
              enabled: true
              max-attempts: 5
              initial-interval: 3000ms
              ##acknowledge-mode: manual
    package com.liuchao.rabbitmqreceive.listener;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.*;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    
    @Component
    public class RabbitMqListener {
      //绑定交换机和队列(如果生产者没有绑定交换机和队列 这样写就可以直接把交换机和队列绑定也就是只要这个消费启动就会在mq里面创建交换机和队列) @RabbitHandler @RabbitListener(bindings
    = @QueueBinding(value=@Queue("admin-queue"), exchange = @Exchange(value="admin_exchange"), key = "admin_routingkey" )) public void directReceiver(String content){ System.out.println("*********"+content); }   //这个是绑定了死信队列的如果这里面拒绝这条信息(重试次数达到) 就会自动存入到死信队列 @RabbitHandler @RabbitListener(queues = "admin_email_queue") public void directDeadReceiver(String content, Channel channel, Message message) throws IOException { System.out.println("****dead*****"+content); System.out.println("*****message***"+new String(message.getBody())); // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); 这个可以拒绝消息 int i=1/0; } }
    x-dead-letter-exchange
  • 相关阅读:
    用XMLSocket获得SmartFoxServer的zone在线人数
    php mvc开发系列教程第一节 认识mvc
    使用java编写SmartFoxServer自定义安全验证登录扩展
    php mvc开发系列教程第二节 单一入口文件(路由文件)
    php mvc开发系列教程第三节 Controller 类实现
    [转]IE下对文件(图片)进行base64转换
    从TFS中的现有项目复制一份作为新项目
    在javascript中实现document.ready,实现点Export按钮后刷新页面
    SQL Server 2005 学习笔记系列文章
    How to Read XMLDocument into a SQL Sever XML field / saving XML to database or filesystem...best method?
  • 原文地址:https://www.cnblogs.com/dkws/p/12124588.html
Copyright © 2011-2022 走看看