zoukankan      html  css  js  c++  java
  • Springboot简单整合Rabbit

    两个项目。分别是生产者和消费者项目

    。首先引入依赖。两边pom都一样

    第一次练习,启动生产者后,再启动消费者,一直报找不到 队列的声明。

    后排查发现是  需要现在生产者这边浏览器访问一次生产消息的方法,以让交换机和队列在rabbit服务器生成。

    因为交换机的生成属于懒加载。不发送消息是不生成交换机的。所以开始直接启动消费者会报错 找不到队列的声明

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.kf</groupId>
      <artifactId>consumer-springboot</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      
      
      <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.0.0.RELEASE</version>
        </parent>
      <dependencies>
    
            <!-- springboot-web组件 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
            <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>
            <!--fastjson -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.49</version>
            </dependency>
        </dependencies>
    </project>

    生产者这边写入Fanout的配置

    package com.kf.conf;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.stereotype.Component;
    
    
    /**
     * Fanout配置
     * @author kf
     *
     */
    @Component
    public class FanoutConf {
        /**
         * fanout没有路由键
         * 1:定义队列名,交换机名
         * 2:注册队列,交换机
         * 3:绑定队列和交换机
         */
        
        private String QUEUE_SMS = "QUEUE_SMS";
        
        private String QUEUE_EMAIL = "QUEUE_EMAIL";
        
        private String FANOUTEXCHANGE = "FANOUTEXCHANGE";
        
        //注册短信队列
        @Bean
        public Queue queueSms(){
            return new Queue(QUEUE_SMS);
        }
        //注册邮件队列
        @Bean
        public Queue queueEmail(){
            return new Queue(QUEUE_EMAIL);
        }
        
        //注册交换机
        @Bean
        public FanoutExchange fanoutExchange(){
            return new FanoutExchange(FANOUTEXCHANGE);
        }
        
        //绑定队列和交换机
        /**
         * 
         * @param queueSms  此参数用的是上方定义队列的方法名,注册Bean时,默认方法名为该Bean的id
         * @param fanoutExchange 此参数用的是上方定义队列的方法名,注册Bean时,默认方法名为该Bean的id
         * @return
         */
        @Bean
        public Binding bindgFanoutExchangeSms(Queue queueSms, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queueSms).to(fanoutExchange);
        }
        
        @Bean
        public Binding bindgFanoutExchangeEmail(Queue queueEmail, FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queueEmail).to(fanoutExchange);
        }
        
    }

     创建消息发送的templet:

    package com.kf.producer;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class FanoutProducer {
        
        @Autowired
        private AmqpTemplate amqpTemplate;
        
        public void sendMes(String queueName){
            System.out.println("队列名:"+queueName);
            String msg = "此处是fanout的消息体";
            amqpTemplate.convertAndSend(queueName, msg);
        }
    
    }

    创建controller:

    package com.kf.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.kf.producer.FanoutProducer;
    
    @RestController
    public class ProducerController {
        @Autowired
        private FanoutProducer fanoutProducer;
        
        @RequestMapping("/sendFanout")
        public String sendFanoutMessage(String queueName){
            fanoutProducer.sendMes(queueName);
            return "success";
        }
    
    }

    创建启动:

    package com.kf;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ProducerMainClass {
        public static void main(String[] args) {
            SpringApplication.run(ProducerMainClass.class, args);
        }
    
    }

    引入rabbit的链接配置

    spring:
      rabbitmq:
      ####ip
        host: 127.0.0.1
       ####端口  
        port: 5672
       ####用户名
        username: guest
       ####密码
        password: guest
       ### 虚拟主机
        virtual-host: /kf

    消费者工程:

    package com.kf.consumer.sms;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    @Component
    @RabbitListener(queues = "QUEUE_SMS")
    public class SmsConsumer {
        
        //监听队列消息注解
        @RabbitHandler
        public void accept(String msg){
            System.out.println("短信消费者接收消息:"+msg);
            
        } 
    
    }
    package com.kf.consumer.email;
    
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    @Component
    @RabbitListener(queues = "QUEUE_EMAIL")
    public class EmailConsumer {
        
        //监听队列消息注解
        @RabbitHandler
        public void accept(String msg){
            System.out.println("邮件消费者接收到:"+msg);
            
        } 
    
    }

    创建启动:

    package com.kf;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    
    @SpringBootApplication
    public class ConsumerMainClass {
        public static void main(String[] args) {
            SpringApplication.run(ConsumerMainClass.class, args);
        }
    
    }

    创建链接。注意启动端口号:

    spring:
      rabbitmq:
      ####ip
        host: 127.0.0.1
       ####端口
        port: 5672
       ####用户名
        username: guest
       ####密码
        password: guest
       ### 虚拟主机
        virtual-host: /kf
    
    server:
      port: 8081
  • 相关阅读:
    泛型
    HDU 4917 Permutation
    OC本学习笔记Foundation框架NSString与NSMutableString
    HDU 5095 Linearization of the kernel functions in SVM(模拟)
    大约Java有点感悟---开发商根本上感悟学习
    Codeforces 442B Andrey and Problem(贪婪)
    mysql数据库优化课程---15、mysql优化步骤(mysql中最常用最立竿见影的优化是什么)
    mysql数据库优化课程---14、常用的sql技巧
    mysql数据库优化课程---13、mysql基础操作(mysql如何复制表)
    mysql数据库优化课程---12、mysql嵌套和链接查询(查询user表中存在的所有班级的信息?)
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660664.html
Copyright © 2011-2022 走看看