zoukankan      html  css  js  c++  java
  • 2020-03-17 20:18:50springboot整合rabbitmq

    1、理论知识

    rabbitmq交换机的类型:

    direct:消息中的路由键(routing key)如果和 Binding 中的 bindingkey 一致, 交换器就将消息发到对应的队列中。它是完全匹配、单播的模式。
    fanout:每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 类型转发消息是最快的。广播形式
    topic:topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它同样也会识别两个通配符:符号“#”和符号“*”。#匹配0个或多个单词,*匹配一个单词。
    headers:headers 匹配 AMQP 消息的 header而不是路由键, headers 交换器和 direct 交换器完全一致,但性能差很多,目前几乎用不到了
     
    AMQP运行机制:
    AMQP 中消息的路由过程和 Java 开发者熟悉的 JMS 存在一些差别,AMQP 中增加了Exchange 和 Binding 的角色。生产者把消息发布到 Exchange 上,消息最终到达队列并被消费者接收,而 Binding 决定交换器的消息应该发送到那个队列。
     
    rabbitmq服务端默认端口:5672,web浏览器查看端口:15672
    整合代码:
    pom
    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.2.2.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.study.springboot</groupId>
        <artifactId>amqp-study</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>amqp-study</name>
        <description>Demo project for Spring Boot</description>
    
        <properties>
            <java.version>1.8</java.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <scope>runtime</scope>
                <optional>true</optional>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.junit.vintage</groupId>
                        <artifactId>junit-vintage-engine</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.amqp</groupId>
                <artifactId>spring-rabbit-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>

    yml

    spring:
      rabbitmq:
        host: localhost#主机地址
        password: guest#用户名
        username: guest#密码

    主启动类

    package com.study.springboot.amqp;
    
    import org.springframework.amqp.rabbit.annotation.EnableRabbit;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @EnableRabbit
    @SpringBootApplication
    public class AmqpStudyApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(AmqpStudyApplication.class, args);
        }
    
    }

    监听接受消息的代码:

    当接受消息的队列一致,类型为Message时,会将其他类型覆盖,即其他类型的方法不执行

    package com.study.springboot.amqp.service;
    
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    import java.util.Map;
    
    @Service
    public class MessageService {
    
        @RabbitListener(queues = "amqp.study")
        public void receiveMap(Map map){
            System.out.println("收到的消息:"+map);
        }
    
        /*@RabbitListener(queues = "amqp.study")
        public void receiveMessage(Message message){
            System.out.println("收到的消息体:"+message.getBody());
            System.out.println("收到的消息MessageProperties:"+message.getMessageProperties());
        }*/
    }

    自定义消息解析器:将消息转为json形式

    package com.study.springboot.amqp.config;
    
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    @Configuration
    public class MyAMQPConfig {
    
        @Bean
        public MessageConverter messageConverter(){
            return new Jackson2JsonMessageConverter();
        }
    }

    测试代码:

    package com.study.springboot.amqp;
    
    import org.junit.jupiter.api.Test;
    import org.springframework.amqp.core.AmqpAdmin;
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    @SpringBootTest
    class AmqpStudyApplicationTests {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Autowired
        private AmqpAdmin amqpAdmin;
    
        @Test
        void contextLoads() {
        }
    
        @Test
        public void createEv(){
            //创建交换机
            amqpAdmin.declareExchange(new DirectExchange("amqp.study"));
            //创建队列
            amqpAdmin.declareQueue(new Queue("amqp.study"));
    
            //建立绑定关系
            amqpAdmin.declareBinding(new Binding("amqp.study",Binding.DestinationType.QUEUE,"amqp.study","amqp.study",null));
        }
    
        @Test
        public void send(){
            Map<String,Object> map = new HashMap<>();
            map.put("msg","studymsg");
            map.put("data", Arrays.asList("你好","(*^_^*)", "/(ㄒoㄒ)/~~"));
    
            rabbitTemplate.convertAndSend("amqp.study","amqp.study",map);
        }
    
        @Test
        public void receive(){
            Object o = rabbitTemplate.receiveAndConvert("amqp.study");
            System.out.println(o.getClass());
            System.out.println(o);
        }
    
    
    
    }
  • 相关阅读:
    数据结构-树与二叉树-思维导图
    The last packet successfully received from the server was 2,272 milliseconds ago. The last packet sent successfully to the server was 2,258 milliseconds ago.
    idea连接mysql报错Server returns invalid timezone. Go to 'Advanced' tab and set 'serverTimezone' property
    redis学习笔记
    AJAX校验注册用户名是否存在
    AJAX学习笔记
    JSON学习笔记
    JQuery基础知识学习笔记
    Filter、Listener学习笔记
    三层架构学习笔记
  • 原文地址:https://www.cnblogs.com/lxw-all/p/12513185.html
Copyright © 2011-2022 走看看