zoukankan      html  css  js  c++  java
  • RabbitMQ入门

    安装与环境配置

    SpringBoot整合使用

    不使用交换机

    使用DirectExchange交换机

    交换机和路由不会因为我们的代码删除而删除!

    使用TopicExchange交换机

    使用FanoutExchange交换机

    生产者回调

    消费者重回队列

     

    安装与环境配置

    本文为个人记录RabbitMq的学习笔记。

    由于RabbitMQ是基于erlang的,所以,在正式安装RabbitMQ之前,需要先安装一下erlang。

    下载安装RabbitMq

    1、2两个步骤的安装包,我是在本机Windows10安装的。如果你看到这里可以百度找一下安装博客,其实就是傻瓜式下一步 

    链接:https://pan.baidu.com/s/1mUyOdBKcvoW3Lv8f4y8NAQ
    提取码:41fm

    配置erlang和RabbitMq的环境变量

     

     

    在cmd命令界面安装RabbitMq网页版控制台 rabbitmq-plugins enable rabbitmq_management

    浏览器 http://localhost:15672/ 进入登录页面,账号密码都是 guest

    SpringBoot整合使用

    注意:当前我把生产者和消费者放到了两个项目中,是因为刚好搭建了springcloud项目,一步到位。你也可以当到同一个项目中。

    需要先引入依赖,如果是多个项目则都需要引入

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    View Code

    不使用交换机

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    
    @RestController
    public class Send1 {
    
        //创建一个队列
        @Bean(value = "mq1")
        public Queue mq1() {
            return new Queue("mq1");
        }
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        @GetMapping("g1")
        public String send() {
            //发送消息,将消息放入mq1的队列中
            amqpTemplate.convertAndSend("mq1", "我是一个字符串");
            System.out.println("1已发送消息");
            return "发送成功!";
        }
    
    }
    View Code
    package com.datang.springcloud.mq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Receive1 {
        //    监听 mq1消息队列
        @RabbitListener(queues = "mq1")
        public void receive(String msg) {
            System.out.println("1我接受到的消息是:" + msg);
        }
    }
    View Code

    这种写法,发送者直接将消息投递到队列中,消费者从队列获取。思考一个问题,如果我们需要将一条消息投递到多个队列是不是这么写?

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Queue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    
    @RestController
    public class Send2 {
    
        //创建一个队列
        @Bean(value = "mq2")
        public Queue mq2() {
            return new Queue("mq2");
        }
    
    
        //创建一个队列
        @Bean(value = "mq3")
        public Queue mq3() {
            return new Queue("mq3");
        }
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        @GetMapping("g2")
        public String send() {
            //发送消息,将消息放入mq1的队列中
            amqpTemplate.convertAndSend("mq2", "我是一个字符串");
            amqpTemplate.convertAndSend("mq3", "我是一个字符串");
            System.out.println("23已发送消息");
            return "发送成功!";
        }
    
    }
    View Code 
    package com.datang.springcloud.mq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Receive2 {
        //    监听 mq2消息队列
        @RabbitListener(queues = "mq2")
        public void receive(String msg) {
            System.out.println("2我接受到的消息是:" + msg);
        }
    
        //    监听 mq3消息队列
        @RabbitListener(queues = "mq3")
        public void receive2(String msg) {
            System.out.println("3我接受到的消息是:" + msg);
        }
    }
    View Code

    RabbitMQ在消息发送者和队列之间在抽象一个概念,交换机。消息发送者,不关心消息到底发送给谁,而是将消息投递给交换机,并且指定路由地址,交换机通过路由标记决定消息投递到哪个队列中。

    使用DirectExchange交换机

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @Configuration
    @RestController
    public class Send2 {
    
        //创建一个队列
        @Bean(value = "mq2")
        public Queue mq2() {
            return new Queue("mq2");
        }
    
    
        //创建一个队列
        @Bean(value = "mq3")
        public Queue mq3() {
            return new Queue("mq3");
        }
    
    
        //Direct交换机
        @Bean(value = "ex1")
        DirectExchange ex1() {
            return new DirectExchange("ex1");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi1")
        Binding binding1() {
            return BindingBuilder.bind(mq2()).to(ex1()).with("k1");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi2")
        Binding binding2() {
            return BindingBuilder.bind(mq3()).to(ex1()).with("k1");
        }
    
    
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        @GetMapping("g2")
        public String send() {
            amqpTemplate.convertAndSend("ex1", "k1", "我是一条消息");
            System.out.println("消息已投递给交换机ex1");
            return "发送成功!";
        }
    
    }
    View Code
    package com.datang.springcloud.mq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Receive2 {
        //    监听 mq2消息队列
        @RabbitListener(queues = "mq2")
        public void receive(String msg) {
            System.out.println("2我接受到的消息是:" + msg);
        }
    
        //    监听 mq3消息队列
        @RabbitListener(queues = "mq3")
        public void receive2(String msg) {
            System.out.println("3我接受到的消息是:" + msg);
        }
    }
    View Code

    这种写法,我们还是需要创建多个队列,然后创建交换机,将队列和交换机绑定,此时我们需要填入一个路由地址。消息发送者直接将消息投递给交换机,并且指定路由。由交换机根据路由查询绑定的队列。上边代码片段,只有唯一的一个交换机,但是从这个交换机中查到了两个不同的路由,将消息投递给绑定的队列中。消费者则不改变写法。

    交换机和路由不会因为我们的代码删除而删除!

     

     

     

    使用TopicExchange交换机

    一下代码片段和上边的十分类似,只是使用了TopicExchange交换机。第二个绑定器使用的是 * 通配符。消息是可以被投递到mq5的。

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @Configuration
    @RestController
    public class Send3 {
    
        //创建一个队列
        @Bean(value = "mq4")
        public Queue mq4() {
            return new Queue("mq4");
        }
    
    
        //创建一个队列
        @Bean(value = "mq5")
        public Queue mq5() {
            return new Queue("mq5");
        }
    
    
        //Topic交换机
        @Bean(value = "ex2")
        TopicExchange ex2() {
            return new TopicExchange("ex2");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi3")
        Binding binding1() {
            return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi4")
        Binding binding2() {
            return BindingBuilder.bind(mq5()).to(ex2()).with("student.age.*");
        }
    
    
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        @GetMapping("g3")
        public String send() {
            amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");
            System.out.println("消息已投递给交换机ex2");
            return "发送成功!";
        }
    
    }
    View Code
    package com.datang.springcloud.mq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Receive3 {
        //    监听 mq4消息队列
        @RabbitListener(queues = "mq4")
        public void receive(String msg) {
            System.out.println("4我接受到的消息是:" + msg);
        }
    
        //    监听 mq5消息队列
        @RabbitListener(queues = "mq5")
        public void receive2(String msg) {
            System.out.println("5我接受到的消息是:" + msg);
        }
    }
    View Code

     

    接下来我们删除 student.age.* 的路由,换成 student.* 这样消息就不能转发到mq5了。可见 * 只能匹配一个词。

    删除掉所有绑定到mq5队列的路由,重新绑定。下面代码片段使用的 # 通配符,# 匹配多个词。

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @Configuration
    @RestController
    public class Send3 {
    
        //创建一个队列
        @Bean(value = "mq4")
        public Queue mq4() {
            return new Queue("mq4");
        }
    
    
        //创建一个队列
        @Bean(value = "mq5")
        public Queue mq5() {
            return new Queue("mq5");
        }
    
    
        //Topic交换机
        @Bean(value = "ex2")
        TopicExchange ex2() {
            return new TopicExchange("ex2");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi3")
        Binding binding1() {
            return BindingBuilder.bind(mq4()).to(ex2()).with("student.age.12");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi4")
        Binding binding2() {
            return BindingBuilder.bind(mq5()).to(ex2()).with("student.#");
        }
    
    
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        @GetMapping("g3")
        public String send() {
            amqpTemplate.convertAndSend("ex2", "student.age.12", "我是一条消息");
            System.out.println("消息已投递给交换机ex2");
            return "发送成功!";
        }
    
    }
    View Code

    使用FanoutExchange交换机

    FanoutExchange没有路由的概念,到像是点对点的直接发送。

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @Configuration
    @RestController
    public class Send4 {
    
        //创建一个队列
        @Bean(value = "mq6")
        public Queue mq6() {
            return new Queue("mq6");
        }
    
    
        //创建一个队列
        @Bean(value = "mq7")
        public Queue mq7() {
            return new Queue("mq7");
        }
    
    
        //Fanout交换机
        @Bean(value = "ex3")
        FanoutExchange ex3() {
            return new FanoutExchange("ex3");
        }
    
        //绑定 将队列和交换机绑定
        @Bean(value = "bi5")
        Binding binding1() {
            return BindingBuilder.bind(mq6()).to(ex3());
        }
    
        //绑定 将队列和交换机绑定
        @Bean(value = "bi6")
        Binding binding2() {
            return BindingBuilder.bind(mq7()).to(ex3());
        }
    
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
    
        @GetMapping("g4")
        public String send() {
            //此处第二个路由参数必须给 null,否则不能成功将消息投递到队列
            amqpTemplate.convertAndSend("ex3", null,"我是一条消息");
            System.out.println("消息已投递给交换机ex3");
            return "发送成功!";
        }
    
    }
    View Code
    package com.datang.springcloud.mq;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    public class Receive4 {
        //    监听 mq6消息队列
        @RabbitListener(queues = "mq6")
        public void receive(String msg) {
            System.out.println("6我接受到的消息是:" + msg);
        }
    
        //    监听 mq7消息队列
        @RabbitListener(queues = "mq7")
        public void receive2(String msg) {
            System.out.println("7我接受到的消息是:" + msg);
        }
    }
    View Code

    生产者回调

    在Spring配置文件中配置如下

    spring.rabbitmq.publisher-confirms=true
    spring.rabbitmq.publisher-returns=true
    View Code

    注意看回调函数的写法

    package com.dang.springcloud.mq;
    
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.rabbit.support.CorrelationData;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    @Configuration
    public class Send5 {
    
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate rabbitTemplate = new RabbitTemplate();
            rabbitTemplate.setConnectionFactory(connectionFactory);
            //设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
            rabbitTemplate.setMandatory(true);
    
            rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                @Override
                public void confirm(CorrelationData correlationData, boolean b, String s) {
                    System.out.println("confirm-----"+correlationData);
                    System.out.println("confirm-----"+b);
                    System.out.println("confirm-----"+s);
                }
            });
            rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
                @Override
                public void returnedMessage(Message message, int i, String s, String s1, String s2) {
                    System.out.println("returnedMessage-----"+message);
                    System.out.println("returnedMessage-----"+s);
                    System.out.println("returnedMessage-----"+s1);
                    System.out.println("returnedMessage-----"+s2);
                }
            });
    
            return rabbitTemplate;
        }
    
        //创建一个队列
        @Bean(value = "mq8")
        public Queue mq8() {
            return new Queue("mq8");
        }
    
    
        //Topic交换机
        @Bean(value = "ex4")
        TopicExchange ex4() {
            return new TopicExchange("ex4");
        }
    
        //绑定 将队列和交换机绑定, 并设置用于匹配键
        @Bean(value = "bi7")
        Binding binding1() {
            return BindingBuilder.bind(mq8()).to(ex4()).with("HaHaHa");
        }
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        @GetMapping(value = "g5")
        public String g5() {
            rabbitTemplate.convertAndSend("ex4", "HaHaHa", "我是一条消息");
            System.out.println("消息已投递给交换机ex4");
            return "发送成功";
        }
    
    }
    View Code

    回调结果

    交换机不对
    confirm-----null
    confirm-----false
    confirm-----channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'ex' in vhost '/', class-id=60, method-id=40)
    
    
    找不到路由
    returnedMessage-----(Body:'我是一条消息' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0])
    returnedMessage-----NO_ROUTE
    returnedMessage-----ex4
    returnedMessage-----HaHaHaww
    confirm-----null
    confirm-----true
    confirm-----null
    
    成功
    confirm-----null
    confirm-----true
    confirm-----null
    View Code

    消费者重回队列

    package com.datang.springcloud.mq;
    
    import com.rabbitmq.client.Channel;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    @Component
    public class Receive5 {
        //    监听 mq8消息队列
        @RabbitListener(queues = "mq8")
        public void receive(String msg, Message message, Channel channel) throws IOException {
            try {
                System.out.println("8我接受到的消息是:" + msg);
                int a = 1 / 0;
                // 第一个参数为队列的ID,第二个参数批处理,手动提交比当前ID小的。如果这个队列为正确的,那就可以把之前的提交
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
                System.out.println("消费消息确认" + message.getMessageProperties().getConsumerQueue() + ",接收到了回调方法");
            } catch (Exception e) {
                //其实重发也没多大意义,一般都是做个日志,或者其他补偿。
                System.out.println("尝试重发:" + message.getMessageProperties().getConsumerQueue());
                //前两个参数和 basicAck 一样,最后一个为是否重回队列
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
    
    
            }
        }
    }
    View Code

     

  • 相关阅读:
    .NET设计模式系列文章
    [转]给年轻工程师的十大忠告
    [你必须知道的.NET]第二十回:学习方法论
    写给开发者看的关系型数据库设计
    AjaxPro使用说明
    Spring.Net入门篇(一) [转]
    [从设计到架构] 必须知道的设计模式
    4月1日SharePoint Designer将开始免费
    12月累计更新的一个导出导入网站的问题在2月累计更新中修复了
    修复错误1093 “Unable to get the private bytes memory limit for the W3WP process”
  • 原文地址:https://www.cnblogs.com/zumengjie/p/12359084.html
Copyright © 2011-2022 走看看