zoukankan      html  css  js  c++  java
  • RabbitMQ --- 直连交换机 【 无回调方法,不能获取消费结果 】

    1.前言

      消息队列除了kafka 外,还有许多种,比如RabbitMQ 、ActiveMQ、ZeroMQ、JMQ等。

      老牌的ActiveMQ ,底层使用Java写的,资源消耗大,速度也慢,但是适合 JMS 【java message service】的使用 ,事实上,性能差,现在用的人很少了。

      现在流行使用kafka,那是因为支持很大的吞吐量,处理数据速度很快,但是,对数据的处理安全性不高,而且,需要处理那么大吞吐量的应用实际上不多,

    kafka更多的是使用在大数据方面,底层是 使用 zookeeper开发 。

      RabbitMQ的吞吐量比kafka的低一些,实际上这没有可比性,RabbitMQ的开发理念重点不做吞吐量,而是安全性,常使用在金融方面的应用,使用的人很多,技术成熟,

    是 amqp协议的完美实现,底层使用erlang语言实现,每个节点的服务程序【broker】由交换机和 消息队列 组成 ,消息队列又分主队列和镜像队列,如果主队列挂掉了, 

    那么会选一个镜像队列成为主队列,也就是说镜像队列只要是用来备份的。那么,读取队列信息如果连接到非主队列,则需要交换机路由到指定主队列读取,因此这样的单节点,

    导致了吞吐量受限。

      综合上来说,RabbitMQ是最好的,如果单考虑吞吐量,那么肯定选择kafka。

      这一篇随笔,讲解RabbitMQ 的 4大交换机中的 直连交换机的简单使用。

      消息中间件不仅可以在多个服务器间使用,也可以在单个服务器使用,用于消息转发给订阅消息队列的监听器,

    这里我以两个服务器作为演示,消息生产者工程端口为1004,消息消费者工程端口为1002.

    注意,需要提前安装RabbitMQ软件,window10 详细安装 的 随笔地址 https://www.cnblogs.com/c2g5201314/p/12990634.html
    消息生产者端总结:
    (1)使用直连交换机 ,需要给绑定的消息队列分配路由键 ,也就是一串用于识别的字符串。
    (2)调用rabbit模板发送消息时,需要参数分别是 直连交换机名字、路由键、消息字,
      数据类型都是字符串 ,如果是键值对象则需要转成json字符串,然后后接收的消费者端解析json即可。
    (3)直连交换机发送消息的底层原理,其实是使用rabbit模板,根据指定的交换机名字查找交换机【因此不同类型的交换机名字是不允许相同的】,找到后将路由键和消息传给该直连交换机,
      然后该交换机根据路由键查找消息队列,需要与消息队列的路由键完全一样才可以匹配成功,找到匹配的消息队列后,将消息放入消息队列中,然后消息队列会自动将消息推送给监听该消息队列的消费者端,
      当消费者接收后并且确认后,消息队列会将该消息销毁。 (
    4)如果路由键匹配不到消息队列【即消息队列不存在】,消息将会抛弃。 (5)如果匹配到了消息队列,但是没有监听该消息队列的消费者端,那么消息将一直存在该队列中,直到有监听该队列的消费者端启动后,消费该消息,消息才会从消息队列中销毁。

     消费完清空后,将恢复为0,【因此可证明 ,数据很安全,不会丢失】




    消息消费者端总结:
    总结:
    (1)消息消费者不需要配置什么东西,只需要在配置文件添加rabbit地址端口账号密码,即可连接,
      然后在需要的监听类关联指定的队列名字即可接收到该队列的消息
    (2)如果是使用 直连交换机发送消息,该队列的所有监听将会使用轮询策略做负载均衡来消费信息,
      不论是将监听放于类上还是方法上,效果都是一样的
    (3)方法上写监听,记得在类上加@Service或@Component注册bean,否则消息队列监听注册无效
    (4)消息只能传输字符串,但是可以使用json字符串,获取后再对其解析即可,可用fastjson解析,
      也可以使用objectMapper强制解析【不建议使用】

    2.消息生产者端

    (1)目录结构

     红箭头标出来的两个文件是核心文件

    (2)导入依赖包

            <!-- 消息中间件-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.1.6.RELEASE</version>
            </dependency>

    pom.xml源码【不可直接复制源码,我这里是maven多模块的子工程,需要改依赖管理的】

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>cen.cloud</groupId>
            <artifactId>cen-mycloud</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>rabbitmq-producer-1004</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-producer-1004</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    
    
            <!--eureka 注册中心依赖包 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
    
            <!-- 消息中间件-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.1.6.RELEASE</version>
            </dependency>
    
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    (3)配置application.properties文件

     完整源码

    #工程名/项目名/应用名/服务名
    spring.application.name=rabbitmq-producer-1004
    #端口号
    server.port=1004
    #eureka注册
    eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/
    
    
    #rabbitmq配置
    #spring.rabbitmq.virtual-host=/
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    #默认账户
    spring.rabbitmq.username=guest
    #默认密码
    spring.rabbitmq.password=guest
    
    #
    #spring.rabbitmq.listener.simple.concurrency=10
    #spring.rabbitmq.listener.simple.max-concurrency=20
    #spring.rabbitmq.listener.simple.prefetch=50
    ##
    #mq.env=local
    
    
    
    #
    #
    #
    #日志配置
    # 指定日志输入级别【根节点,表明整个项目基本的日志级别】
    logging.level.root=info
    # ** 表示是指定的某个文件的路径或类的日志级别
    #logging.level.**=info
    
    # 指定日志输出位置和日志文件名 , ./指工程根目录
    logging.file=./rabbitmq-producer-1004/log/spring.log
    
    # 指定日志输出路径,若file和path同时配置,则file生效
    # 此配置默认生成文件为spring.log
    #logging.file.path=./log
    
    # 控制台日志输出格式
    # -5表示从左显示5个字符宽度
    logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n
    
    # 文件中输出的格式
    logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
    View Code

    (4)创建rabbitmq配置类

    package com.example.rabbitmqproducer1004.config;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * rabbitmq配置类---消息生产者
     */
    @Configuration
    public class RabbitmqConfig {
        //日志记录器
        Logger logger = LoggerFactory.getLogger(getClass());
    
    
    
        /**
         * 定义 交换机、消息队列、路由关键字 的名字
         */
    
        //定义交换机名字 exchange
        public static final String EXCHANG_1 = "exchange_1";
    
        //定义消息队列名字 queue
        public static final String QUEUE_1 = "queu_1";
    
    
        //定义路由键 routingkey
        public static final String ROUTINGKEY_1 = "routing_1";
    
    
        //===============================================================
    
        /**
         * 下面的是 直连交换机 设置 绑定 消息队列 到 交换机
         *
         * DirectExchange:直连交换机,按照routingkey分发到指定队列
         */
        //==============================================
    
        /**
         * 设置交换机类型
         */
        @Bean
        public DirectExchange directExchange() {
            logger.warn("设置交换机类型");
            //实例交换机对象,然后注入该交换机的名字
            return new DirectExchange(EXCHANG_1);
        }
    
        /**
         * 创建消息队列
         */
        @Bean
        public Queue queue1() {
            logger.warn("创建消息队列");
            //实例消息队列对象,输入该队列名字,如果需要该队列持久化,则设为true,默认是false
    //        return new Queue(QUEUE_1, true);
            return new Queue(QUEUE_1);
        }
    
        /**
         * 绑定 消息队列 到 交换机【一个 交换机 允许被多个 消息队列 绑定】
         */
        @Bean
        public Binding binding() {
            logger.warn("绑定 消息队列 到 交换机");
            //使用绑定构造器将 指定的队列 绑定到 指定的交换机上 ,Direct交换机需要携带 路由键
            return BindingBuilder.bind(queue1()).to(directExchange()).with(ROUTINGKEY_1);
        }
    
    
    }
    View Code

    (5)创建消息生产类

    package com.example.rabbitmqproducer1004.rabbitmqFactory;
    
    
    import com.example.rabbitmqproducer1004.config.RabbitmqConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    import java.util.UUID;
    
    /**
     * 消息生产类
     */
    @Component
    //实现接口
    public class SendMessage  {
    
        Logger logger = LoggerFactory.getLogger(this.getClass());
    
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
    
        /**
         * 发送消息
         *
         * 参数是消息内容
         */
        public void send(String message){
            logger.warn("发送消息,内容:"+message);
    
            //发送消息 ,参数分别是 : 指定的交换机名字 、指定的路由键、消息字符串
            rabbitTemplate.convertAndSend(RabbitmqConfig.EXCHANG_1,RabbitmqConfig.ROUTINGKEY_1,message);
        }
    
    
    
    }
    View Code

    (6)controller层,调用消息生产类

    package com.example.rabbitmqproducer1004.controller;
    
    import com.example.rabbitmqproducer1004.rabbitmqFactory.SendMessage;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class DController {
    
        @Autowired
        private SendMessage sendMessage;
    
        @RequestMapping("/mq")
        public String mq(String msg){
            sendMessage.send(msg);
            return "发送成功";
        }
    
    }
    View Code

    3.消息消费者端

    (1)目录结构

     红箭头标出来的文件是核心文件

    (2)导入依赖包

            <!-- 消息中间件-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.1.6.RELEASE</version>
            </dependency>

    完整的pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>cen.cloud</groupId>
            <artifactId>cen-mycloud</artifactId>
            <version>0.0.1-SNAPSHOT</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.example</groupId>
        <artifactId>rabbitmq-consumer-1002</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>rabbitmq-consumer-1002</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <!--eureka 注册中心依赖包 -->
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
            </dependency>
    
            <!--        &lt;!&ndash;健康检测管理中心 ,可刷新配置文件&ndash;&gt;-->
            <!--        <dependency>-->
            <!--            <groupId>org.springframework.boot</groupId>-->
            <!--            <artifactId>spring-boot-starter-actuator</artifactId>-->
            <!--        </dependency>-->
    
            <!-- 消息中间件-->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>2.1.6.RELEASE</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    View Code

    (3)配置application.properties文件

     完整源码

    #工程名/项目名/应用名/服务名
    spring.application.name=rabbitmq-consumer-1002
    #端口号
    server.port=1002
    #eureka注册
    eureka.client.serviceUrl.defaultZone=http://localhost:7001/eureka/
    
    #rabbitmq配置
    #spring.rabbitmq.virtual-host=/
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    #默认账户
    spring.rabbitmq.username=guest
    #默认密码
    spring.rabbitmq.password=guest
    #
    #spring.rabbitmq.listener.simple.concurrency=10
    #spring.rabbitmq.listener.simple.max-concurrency=20
    #spring.rabbitmq.listener.simple.prefetch=50
    ##
    #mq.env=local
    
    #
    #
    #
    #日志配置
    # 指定日志输入级别【根节点,表明整个项目基本的日志级别】
    logging.level.root=info
    # ** 表示是指定的某个文件的路径或类的日志级别
    #logging.level.**=info
    
    # 指定日志输出位置和日志文件名 , ./指工程根目录
    logging.file=./rabbitmq-consumer-1002/log/spring.log
    
    # 指定日志输出路径,若file和path同时配置,则file生效
    # 此配置默认生成文件为spring.log
    #logging.file.path=./log
    
    # 控制台日志输出格式
    # -5表示从左显示5个字符宽度
    logging.pattern.console=%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %boldYellow(%thread) | %boldGreen(%logger) | %msg%n
    
    # 文件中输出的格式
    logging.pattern.file=%d{yyyy-MM-dd HH:mm:ss.SSS} = [%thread] = %-5level = %logger{50} - %msg%n
    View Code

    (4)rabbitmq配置类

    package com.example.rabbitmqconsumer1002.config;
    
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.stereotype.Component;
    
    /**
     * rabbitmq的消费者配置类
     */
    @Configuration
    public class RabbitConfig {
    
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
    
        //定义需要关联的消息队列名字 queue
        public static final String QUEUE_1 = "queu_1";
    
        
    
    }
    View Code

    是的,你没看错,就这么点东西

    (5)消息队列监听

    接听方式分两种方式,

    一种是放在类上

    package com.example.rabbitmqconsumer1002.rabbitmqListener;
    
    import com.example.rabbitmqconsumer1002.config.RabbitConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    /**
     * 消息监听类--发短信
     */
    //注册bean
    @Component
    //设置需要监听的消息队列
    @RabbitListener(queues = RabbitConfig.QUEUE_1)
    public class SendMessageListener {
        Logger logger = LoggerFactory.getLogger(getClass());
    
        //消息事件处理
        @RabbitHandler
        public void sendMessage(String msg) {
            logger.warn("我是端口1002的消费者,收到信息:" + msg);
        }
    
    
    }
    View Code

    一种是放在方法上 【但记得给这个方法的类注册bean】

    package com.example.rabbitmqconsumer1002.rabbitmqListener;
    
    import com.example.rabbitmqconsumer1002.config.RabbitConfig;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    import org.springframework.stereotype.Service;
    
    /**
     * 记得加@Service或@Component注册bean,否则消息队列监听注册无效
     */
    //@Service
    @Component
    public class SMService {
        Logger logger = LoggerFactory.getLogger(getClass());
        @RabbitListener(queues = RabbitConfig.QUEUE_1)
        public void kk(String msg){
            logger.warn("我是端口1002的消费者--方法监听--,收到信息:" + msg);
    
        }
    
    }
    View Code

    4.测试

    必须先启动消息生产者端的工程,会自动在rabbitmq创建消息队列和交换机,然后再启动消息消费者端,

    否则消费者端因为监听不到该指定消息队列而报错。

    (1)启动消息生产端

    打印的初始化循序

     【请忽略日志级别,那是因为我故意设为警告级别,红色看起来明显】

    浏览器输入网址 http://127.0.0.1:15672/

     可进入rabbitmq监控页面

    使用默认账号密码登录

    选择 connection 选项,可以查看当前连接rabbitmq的主机信息

     选择exchange选项,可以看到新建的交换机

    选择queue选项,可看到新建的消息队列

     (2)启动消息消费端

    启动后,再次选择 connection 选项,可以看的消费者端也连接好了

     (3)调用消息生产者的接口发消息,访问网址  http://localhost:1004/mq?msg=你大爷,帮我发短信85345

     

     提示发送成功

     查看生产者控制台

    现在去看消费者的控制台

    可见 ,消费者 成功从消息队列获取到了消息。

  • 相关阅读:
    db2 load命令装载数据时定位错误出现的位置
    DB2 SQL error: SQLCODE: -668, SQLSTATE: 57016, SQLERRMC: 3
    db2重组所有表和更新表统计信息
    DB2消息
    db2 SQL查询路径
    db2备份与恢复
    博弈-题表
    【POJ1082】Calendar Game (博弈)
    【POJ1067】取石子游戏 (威佐夫博弈)
    【POJ3710】Christmas Game (博弈-树上的删边问题)
  • 原文地址:https://www.cnblogs.com/c2g5201314/p/13156932.html
Copyright © 2011-2022 走看看