zoukankan      html  css  js  c++  java
  • springboot-rabbitmq

     https://www.cnblogs.com/free-wings/p/10254998.html

    1.服务端(消息提供者)

    springboot amqp   默认非持久化

    <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.9.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        
        
        <properties>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target> 
        </properties>
        
        
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <!-- 配置文件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-configuration-processor</artifactId>
                <optional>true</optional>
            </dependency>
            
            
            <dependency>
                 <groupId>org.apache.commons</groupId>
                 <artifactId>commons-lang3</artifactId>
            </dependency>
            
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.58</version>
            </dependency>
            
            <dependency>
                <!-- https://www.hutool.cn/docs -->
                <groupId>cn.hutool</groupId>
                <artifactId>hutool-all</artifactId>
                <version>4.5.10</version>
            </dependency>
            
            <!--rabbitmq-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            
            
        </dependencies>
            <repositories>
                <repository>
                <id>alimaven</id>
                <name>aliyun maven</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            </repository>
        </repositories>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>

     topic  消息多个客户端消费,默认公平分发

     如果手动确认,消息不会重复消费,例如客户端1消费了A消息,客户端2就接收不到A消息了

    new Queue(TOPIC_QUEUE, true, false, false);

    第二个参数持久化,如果不持久话,消息发送完就没了,不管客户端收没收到

    @Configuration
    public class RabbitConfig {
        
    
        private static final String TOPIC_QUEUE = "topic.message";
        
        @Bean
        public Queue queueMessage() {
    //        //是否持久化
    //        durable;
    //        //是否声明该队列是否为连接独占,若为独占,连接关闭后队列即被删除
    //        exclusive;
    //        //是否自动删除,若没有消费者订阅该队列,队列将被删除
    //        autoDelete;
    //        //参数,可以指定队列长度,消息生存时间等队列的设置
    //        java.util.Map<java.lang.String, java.lang.Object> arguments;
    //        #开启重试 
    //        spring: 
    //            rabbitmq: 
    //                listener: 
    //                    retry: 
    //                        enabled:true
    //         
    //        #重试次数,默认为3次
    //        spring:
    //            rabbitmq:
    //                listener:
    //                    retry:
    //                        max-attempts: 5
            
            return new Queue(TOPIC_QUEUE, true, false, false);
        }
    
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }
    
      
        @Bean
        Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.*");
        }
    
    }
    server.port=8081
    server.servlet.context-path=/test
    
    spring.rabbitmq.host=192.168.80.110
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest




    @Component
    public class Send1 {
    
        
        @Autowired
        private RabbitTemplate rabbitTemplate;
        
        
        public boolean send() {
            rabbitTemplate.convertAndSend("topicExchange", "topic.add", "topic消息测试");
            System.out.println("发送消息");
            return true;
        }
    }

    客户端(消息消费者)

     spring.rabbitmq.listener.simple.acknowledge-mode=manual

    可户端开启手动确认,如果不进行确认,消息不会删除,重启客户端的时候还会重新消费

    如果客户端spring.rabbitmq.listener.simple.prefetch = 1并且没有确认,会卡第一条消息,不会继续调用,如果去掉,有多少条消息就会消费多少次,下次重启时还会重新消费

    默认自动确认,确认后就消息就删除了

    #设置消费端手动 ack
    spring.rabbitmq.listener.simple.acknowledge-mode=manual

    #消费者一次消费的数量
    #消费者数量
    spring.rabbitmq.listener.simple.concurrency=2
    #最大消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=2

    客户端如果加spring.rabbitmq.listener.simple.prefetch = 1

    一次消费一个,如果手动消息没有确认成功,会一直卡在这条消息中

    channel.basicNack(tag,false,false);

    如果将消息丢掉,会接着往下消费都会丢掉

    import java.io.IOException;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    
    import com.rabbitmq.client.Channel;
    
    @Component
    public class Recv1 {
        
        
    //    @RabbitListener(
    //            bindings = @QueueBinding(
    //                    value = @Queue(value = "boot.queue1", durable = "true"),
    //                    exchange = @Exchange(value = "BOOT-EXCHANGE-1", type = "topic", durable = "true", ignoreDeclarationExceptions = "true"),
    //                    key = "boot.*"
    //            )
    //    )
            
        @RabbitListener(queues = "topic.message")
        @RabbitHandler
        public void recevier(Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException{
            System.out.println("topic接收消息:" + new String(message.getBody()));
            //long deliveryTag = message.getMessageProperties().getDeliveryTag();
            //手工ack
            //channel.basicAck(deliveryTag,true);
            try {
    
                /**
                                        * 无异常就确认消息
                 * basicAck(long deliveryTag, boolean multiple)
                 * deliveryTag:取出来当前消息在队列中的的索引;
                 * multiple:为true的话就是批量确认,如果当前deliveryTag为5,那么就会确认
                 * deliveryTag为5及其以下的消息;一般设置为false
                 */
                channel.basicAck(tag, false);
            }catch (Exception e){
                /**
                                       * 有异常就绝收消息
                 * basicNack(long deliveryTag, boolean multiple, boolean requeue)
                 * requeue:true为将消息重返当前消息队列,还可以重新发送给消费者;
                 *         false:将消息丢弃
                 */
                channel.basicNack(tag,false,true);
            }
        }
        
        
        
    }
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.Payload;
    import org.springframework.stereotype.Component;
    import xy.study.rabbitmq.conf.TopicRabbitConfig;
    
    @Component
    @Slf4j
    public class HelloReceiver {
    
        /**
         * 下面四个消费者,exchange和RoutingKey都相同,最后两个消费者队列名都相同
         * @param msg
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TopicRabbitConfig.QUEUE_NAME, durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueName(@Payload String msg) {
            log.info("{}-----HelloReceiver msg : {}",TopicRabbitConfig.QUEUE_NAME,msg);
        }
    
    
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = TopicRabbitConfig.QUEUE_NAME+".test", durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueNameTest(@Payload String msg) {
            log.info("{}-----HelloReceiver msg : {}",TopicRabbitConfig.QUEUE_NAME+".test",msg);
        }
    
        /**
         * 这里我的消费者队列名"123445",是乱写的,也能够接受
         * @param msg
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = 123445+"", durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueNameNumber(@Payload String msg) {
            log.info("{}-----HelloReceiver msg : {}",123445+""+".test",msg);
        }
    
        /**
         * 由于这个和上面的Exchange、RoutingKey、queue完全相同,所以这两个消费者,一条消息,只有一个能消费(随机)
         * @param msg
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = 123445+"", durable = "true"),
                exchange = @Exchange(value = TopicRabbitConfig.TEST_EXCHANGE_HAHA, type = ExchangeTypes.TOPIC),
                key = TopicRabbitConfig.TEST_TOPIC_ROUTINGKEY)
        )
        public void queueNameNumberSame(@Payload String msg) {
            log.info("same+{}-----HelloReceiver msg : {}",123445+""+".test",msg);
        }
    }

    多个消费端

    根据结果可知,当Exchange和RoutingKey相同、queue不同时,所有消费者都能消费同样的信息;

    Exchange和RoutingKey、queue都相同时(最后两个消费者),消费者中只有一个能消费信息,其他消费者都不能消费该信息。

    springboot amqp  一些配置

    #---------------------服务端配置
    spring.rabbitmq.connection-timeout=15000
    
    #开启 confirm 确认机制
    spring.rabbitmq.publisher-confirms=true
    #开启 return 确认机制
    spring.rabbitmq.publisher-returns=true
    #设置为 true 后 消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除
    spring.rabbitmq.template.mandatory=true
    #---------------------服务端配置
    
    
    #---------------------客户端配置
    #设置消费端手动 ack
    spring.rabbitmq.listener.simple.acknowledge-mode=manual
    #消费者最小数量
    spring.rabbitmq.listener.simple.concurrency=1
    #消费之最大数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #在单个请求中处理的消息个数,他应该大于等于事务数量(unack的最大数量)
    spring.rabbitmq.listener.simple.prefetch=2
    #---------------------客户端配置
    #消费者数量
    spring.rabbitmq.listener.simple.concurrency=10
    #最大消费者数量
    spring.rabbitmq.listener.simple.max-concurrency=10
    #消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
    spring.rabbitmq.listener.simple.prefetch=1
    #消费者自动启动
    spring.rabbitmq.listener.simple.auto-startup=true
    #消费者消费失败,自动重新入队
    spring.rabbitmq.listener.simple.default-requeue-rejected=true
    #启用发送重试 队列满了发不进去时启动重试
    spring.rabbitmq.template.retry.enabled=true 
    #1秒钟后重试一次
    spring.rabbitmq.template.retry.initial-interval=1000 
    #最大重试次数 3次
    spring.rabbitmq.template.retry.max-attempts=3
    #最大间隔 10秒钟
    spring.rabbitmq.template.retry.max-interval=10000
    #等待间隔 的倍数。如果为2  第一次 乘以2 等1秒, 第二次 乘以2 等2秒 ,第三次 乘以2 等4秒
    spring.rabbitmq.template.retry.multiplier=1.0
    #消费者数量
    
    spring.rabbitmq.listener.simple.concurrency=10
    
    #最大消费者数量
    
    spring.rabbitmq.listener.simple.max-concurrency=10
    
    #消费者每次从队列获取的消息数量
    
    spring.rabbitmq.listener.simple.prefetch=1
    
    #消费者自动启动
    
    spring.rabbitmq.listener.simple.auto-startup=true
    
    #消费失败,自动重新入队
    
    #重试次数超过最大限制之后是否丢弃(true不丢弃时需要写相应代码将该消息加入死信队列)
    
    #true,自动重新入队,要写相应代码将该消息加入死信队列
    
    #false,丢弃
    
    spring.rabbitmq.listener.simple.default-requeue-rejected=false
    
    #是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
    
    spring.rabbitmq.listener.simple.retry.enabled=true
    
    spring.rabbitmq.listener.simple.retry.initial-interval=1000
    
    spring.rabbitmq.listener.simple.retry.max-attempts=3
    
    spring.rabbitmq.listener.simple.retry.multiplier=1.0
    
    spring.rabbitmq.listener.simple.retry.max-interval=10000

     消费端重试

    spring.rabbitmq.listener.simple.retry.max-attempts=5  最大重试次数
    spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)
    spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒)
    spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过上面的设置之后是否丢弃(false不丢弃时需要写相应代码将该消息加入死信队列)
    # base
    spring.rabbitmq.host: 服务Host
    spring.rabbitmq.port: 服务端口
    spring.rabbitmq.username: 登陆用户名
    spring.rabbitmq.password: 登陆密码
    spring.rabbitmq.virtual-host: 连接到rabbitMQ的vhost
    spring.rabbitmq.addresses: 指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
    spring.rabbitmq.requested-heartbeat: 指定心跳超时,单位秒,0为不指定;默认60s
    spring.rabbitmq.publisher-confirms: 是否启用【发布确认】
    spring.rabbitmq.publisher-returns: 是否启用【发布返回】
    spring.rabbitmq.connection-timeout: 连接超时,单位毫秒,0表示无穷大,不超时
    spring.rabbitmq.parsed-addresses:
    
    
    # ssl
    spring.rabbitmq.ssl.enabled: 是否支持ssl
    spring.rabbitmq.ssl.key-store: 指定持有SSL certificate的key store的路径
    spring.rabbitmq.ssl.key-store-password: 指定访问key store的密码
    spring.rabbitmq.ssl.trust-store: 指定持有SSL certificates的Trust store
    spring.rabbitmq.ssl.trust-store-password: 指定访问trust store的密码
    spring.rabbitmq.ssl.algorithm: ssl使用的算法,例如,TLSv1.1
    
    
    # cache
    spring.rabbitmq.cache.channel.size: 缓存中保持的channel数量
    spring.rabbitmq.cache.channel.checkout-timeout: 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
    spring.rabbitmq.cache.connection.size: 缓存的连接数,只有是CONNECTION模式时生效
    spring.rabbitmq.cache.connection.mode: 连接工厂缓存模式:CHANNEL 和 CONNECTION
    
    
    # listener
    spring.rabbitmq.listener.simple.auto-startup: 是否启动时自动启动容器
    spring.rabbitmq.listener.simple.acknowledge-mode: 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
    spring.rabbitmq.listener.simple.concurrency: 最小的消费者数量
    spring.rabbitmq.listener.simple.max-concurrency: 最大的消费者数量
    spring.rabbitmq.listener.simple.prefetch: 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
    spring.rabbitmq.listener.simple.transaction-size: 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
    spring.rabbitmq.listener.simple.default-requeue-rejected: 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
    spring.rabbitmq.listener.simple.idle-event-interval: 多少长时间发布空闲容器时间,单位毫秒
    
    spring.rabbitmq.listener.simple.retry.enabled: 监听重试是否可用
    spring.rabbitmq.listener.simple.retry.max-attempts: 最大重试次数
    spring.rabbitmq.listener.simple.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
    spring.rabbitmq.listener.simple.retry.multiplier: 应用于上一重试间隔的乘数
    spring.rabbitmq.listener.simple.retry.max-interval: 最大重试时间间隔
    spring.rabbitmq.listener.simple.retry.stateless: 重试是有状态or无状态
    
    
    # template
    spring.rabbitmq.template.mandatory: 启用强制信息;默认false
    spring.rabbitmq.template.receive-timeout: receive() 操作的超时时间
    spring.rabbitmq.template.reply-timeout: sendAndReceive() 操作的超时时间
    spring.rabbitmq.template.retry.enabled: 发送重试是否可用
    spring.rabbitmq.template.retry.max-attempts: 最大重试次数
    spring.rabbitmq.template.retry.initial-interval: 第一次和第二次尝试发布或传递消息之间的间隔
    spring.rabbitmq.template.retry.multiplier: 应用于上一重试间隔的乘数
    spring.rabbitmq.template.retry.max-interval: 最大重试时间间隔

    #开启 confirm 确认机制
    spring.rabbitmq.publisher-confirms=true

    消息发送到交换器Exchange后触发回调


    #开启 return 确认机制
    spring.rabbitmq.publisher-returns=true

    通过实现ReturnCallback接口,如果消息从交换器发送到对应队列失败时触发(比如根据发送消息时指定的routingKey找不到队列时会触发)

     

    如果消息没有到exchange,则confirm回调,ack=false

    如果消息到达exchange,则confirm回调,ack=true

    exchange到queue成功,则不回调return

    exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)

    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    import java.util.Date;
    import java.util.Map;
    
    //@Component
    public class MQSender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        final RabbitTemplate.ConfirmCallback confirmCallback= new RabbitTemplate.ConfirmCallback() {
    
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("correlationData: " + correlationData);
                System.out.println("ack: " + ack);
                if(!ack){
                    System.out.println("异常处理....");
                }
            }
    
        };
    
        final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
    
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                System.out.println("return exchange: " + exchange + ", routingKey: "
                        + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
            }
        };
    
        //发送消息方法调用: 构建Message消息
        public void send(Object message, Map<String, Object> properties) throws Exception {
            MessageProperties mp = new MessageProperties();
            //在生产环境中这里不用Message,而是使用 fastJson 等工具将对象转换为 json 格式发送
            Message msg = new Message(message.toString().getBytes(),mp);
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //id + 时间戳 全局唯一
            CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
            rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", msg, correlationData);
        }
        
        //发送消息方法调用: 构建Message消息
        public void sendUser() throws Exception {
            rabbitTemplate.setMandatory(true);
            rabbitTemplate.setConfirmCallback(confirmCallback);
            rabbitTemplate.setReturnCallback(returnCallback);
            //id + 时间戳 全局唯一
            CorrelationData correlationData = new CorrelationData("1234567890"+new Date());
            rabbitTemplate.convertAndSend("BOOT-EXCHANGE-1", "boot.save", "发送消息", correlationData);
        }
    }

    https://www.cnblogs.com/tinyj/p/9977205.html

  • 相关阅读:
    当一组单选按钮中的一个选中,后文本框为只读属性
    在.net 环境下,进行了伪静态页面处理后,后台的Fckeditor就不能正常显示了
    PL/SQL Developer 8注册码
    选中checkbox后才能填写输入框
    net 试图加载格式不正确的程序。(Exception from HRESULT: 0x8007000B)
    在sql中将varchar型转换成int型再进行排序
    ASP.NET中显示农历时间
    改变自己,拥抱生活
    人生最不值得你去做的事情
    Oracle 中的周、月、日历的查询实现
  • 原文地址:https://www.cnblogs.com/jentary/p/13603469.html
Copyright © 2011-2022 走看看