zoukankan      html  css  js  c++  java
  • rabbitmq项目案例

    一、项目结构

    二、pom.xml

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.wuxi</groupId>
        <artifactId>A01mq</artifactId>
        <version>0.0.1-SNAPSHOT</version>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.3.4.RELEASE</version>
        </parent>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.9</source>
                        <target>1.9</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>

    三、application.yml

    server:
      port: 8080
    spring:
      application:
        name: rabbitmqServer
      rabbitmq:
        host: 127.0.0.1

    四、启动类

    package com.wuxi;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class MyApplication {
        public static void main(String[] args) {
            SpringApplication.run(MyApplication.class, args);
        }
    }

    五、service

    package com.wuxi.services;
    
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Service;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    @Service
    //@RabbitListener(queues = "work") //注解在类上时,只能有一个方法需要加@RabbitHandler注解
    public class MqService {
    
        @RabbitListener(queues = "work")
        public void work1(String text) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("消费者1:+++++" + text);
        }
    
        @RabbitListener(queues = "work")
        public void work2(String text) {
            System.out.println("消费者2:-----" + text);
        }
    
        //****************************************************************
    
        @RabbitListener(queues = "publish_queue1")
        public void publish1(String text) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("消费者1:+++++" + text);
        }
    
        @RabbitListener(queues = "publish_queue2")
        public void publish2(String text) {
            System.out.println("消费者2:-----" + text);
        }
    
        //****************************************************************
    
        @RabbitListener(queues = "routing_queue1")
        public void routing1(String text) {
            System.out.println("消费者1:+++++" + text);
        }
    
        @RabbitListener(queues = "routing_queue2")
        public void routing2(String text) {
            System.out.println("消费者2:-----" + text);
        }
    
        //****************************************************************
    
        @RabbitListener(queues = "topic_queue1")
        public void topic1(String text) {
            System.out.println("消费者1:+++++" + text);
        }
    
        @RabbitListener(queues = "topic_queue2")
        public void topic2(String text) {
            System.out.println("消费者2:-----" + text);
        }
    
        //****************************************************************
    
        @RabbitListener(queues = "delayed_queue")
        public void delayed(String text) {
            System.out.println("接收时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
            System.out.println("消费者1:+++++" + text);
        }
    }

    六、controller

    package com.wuxi.controllers;
    
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessagePostProcessor;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * 安装完erlang再安装rabbitmq,然后开启界面管理。如果用户无法登录,命令行添加用户再配置权限然后登录
     *
     * @author LL
     */
    @RestController
    public class MqController {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        /**
         * work模式:只有一个消费者能收到消息,当一个消费者较忙时,消息将被另一个不忙的消费者接收
         * 在rabbitmq中要创建一个队列
         *
         * @return
         */
        @RequestMapping("/work")
        public String work() {
            for (int i = 0; i < 10; i++) {
                amqpTemplate.convertAndSend("work", "发送消息" + i);
            }
            return "ok";
        }
    
        // ****************************************************************
    
        /**
         * publish模式:多个消费者同时接收到消息
         * 在rabbitmq中要创建一个交换机(fanout)和两个队列,两个队列要绑定到交换机
         *
         * @return
         */
        @RequestMapping("/publish")
        public String publish() {
            for (int i = 0; i < 10; i++) {
                amqpTemplate.convertAndSend("publish_exchange", "", "发送消息" + i);
            }
            return "ok";
        }
    
        // ****************************************************************
    
        /**
         * 路由模式:具有routing_key的将同时接收到消息(完全匹配)
         * 在rabbitmq创建1个交换机(direct)和两个队列,两个队列绑定到交换机,并且配置routing_key,根据routing_key发送消息到队列
         *
         * @return
         */
        @RequestMapping("/routing1")
        public String routing1() {
            amqpTemplate.convertAndSend("routing_exchange", "routing_key1", "发送消息+routing_key1");
            return "ok";
        }
    
        @RequestMapping("/routing2")
        public String routing2() {
            amqpTemplate.convertAndSend("routing_exchange", "routing_key2", "发送消息+routing_key2");
            return "ok";
        }
    
        // ****************************************************************
    
        /**
         * topic模式:根据routing_key匹配的队列将同时接收到消息(通配符匹配)
         * rabbitmq配置一个交换机(topic)和两个队列,两个队列绑定交换机,并配置通配routing_key
         *
         * @return
         */
        @RequestMapping("/topic1")
        public String topic1() {
            amqpTemplate.convertAndSend("topic_exchange", "key1", "发送消息+key1");
            return "ok";
        }
    
        @RequestMapping("/topic2")
        public String topic2() {
            amqpTemplate.convertAndSend("topic_exchange", "topic", "发送消息+topic");
            return "ok";
        }
    
        // ****************************************************************
    
        /**
         * 需要开启延时插件功能,arguments配置{"x-delayed-type"="topic(公共模式publish、路由模式direct、通配符模式topic)"}
         * rabbitmq创建一个交换机(x-delayed-message)和一个队列,队列绑定到交换机,routing_key根据arguments配置的模式进行配置
         *
         * @return
         */
        @RequestMapping("/delayed")
        public String delayed() {
            amqpTemplate.convertAndSend("delayed_exchange", "delayed_key", "发送消息+delayed", new MessagePostProcessor() {
                @Override
                public Message postProcessMessage(Message message) throws AmqpException {
                    System.out.println("发送时间:" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
                    message.getMessageProperties().setDelay(5000);
                    return message;
                }
            });
            return "ok";
        }
    }

    七、管理界面截图

      1、队列

      2、交换机

      3、Routing Key

      4、用户

      5、虚拟主机

  • 相关阅读:
    nodeJS实现完整文件夹结构压缩
    chrome浏览器插件开发
    让用户端JS触发F11全屏
    inline-block和float的共性和区别
    安家落户
    ActiveMQ简单实现之一对一生产和消费
    Centos下 修改mysql密码
    虚拟机centos7 安装was和ihs
    webservice简单实现
    Centos7安装mysql
  • 原文地址:https://www.cnblogs.com/linding/p/13728843.html
Copyright © 2011-2022 走看看