zoukankan      html  css  js  c++  java
  • rabbitMq完整通信(二)---consumer

    application.properties:

    server.port=8090
    spring.application.name=consumer
    spring.rabbitmq.host=127.0.0.1
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    spring.rabbitmq.listener.simple.concurrency: 5

    创建队列和交换器,并进行绑定:

    package com..sender;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    //配置类,随系统启动时,创建交换器和队列
    @Configuration
    public class TopicConf {
    
            @Bean(name="orderMessage")
            public Queue queueMessage() {
                //系统启动时:创建一个topic.orderReceive的队列到rabbitMQ
                return new Queue("topic.orderReceive");
            }
    
            @Bean(name="exchange")
            public TopicExchange exchange() {
                //系统启动时:创建一个exchange的交换机到rabbitMQ
                return new TopicExchange("exchange");
            }
    
            @Bean
            Binding bindingExchangeMessage(@Qualifier("orderMessage") Queue queueMessage, TopicExchange exchange) {
                //将交换器与指定的队列绑定起来
                    System.out.println("#将交换器与指定的队列绑定起来   "+queueMessage+"    "+exchange);
                return BindingBuilder.bind(queueMessage).to(exchange).with("topic.orderReceive");
            }
    
           /* @Bean
            Binding bindingExchangeMessages(@Qualifier("messages") Queue queueMessages, TopicExchange exchange) {
                return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");//*表示一个词,#表示零个或多个词
            }*/
    }

    controller:

    package com..controller;
    
    import com..sender.RabbitSender;
    import com..service.impl.OrderServiceImpl;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestParam;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class Controller {
    
        @Autowired
        private RabbitSender sender; 
        
        //对外开放的接口,地址为:http://127.0.0.1:8090/queryOrderInfo?orderId=123456
        @RequestMapping("/queryOrderInfo")
        public String queryOrderInfo(@RequestParam(required = false) String orderId){
            try {
                //调用业务代码
                 sender.send("topic.orderReceive",orderId);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
        @RequestMapping("/queryOrderInfo.do")
        public String Server(){
    
            String keys = "keys";
            rabbitTemplate.convertAndSend("exchange","topic.orderReceive",keys);
            return "true";
        }
    }

    服务端定义队列发送的方法:

    package com..sender;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class RabbitSender {
        //注入AmqpTemplate
        @Autowired
        private AmqpTemplate template;
    
        public void send(String queueName,String orderInfo) {
            //由AmqpTemplate将数据发送到指定的队列
            System.out.println("send方法两个参数:"+queueName+" / "+orderInfo);
            template.convertAndSend(queueName,orderInfo);
        }    
    }

    定义一个接口 和 实现类::

    package com..service;
    
    import org.springframework.stereotype.Service;
    
    @Service
    public interface OrderService {
        String queryOrderInfo(String orderId) throws Exception;
    }
    
    
    
    package com..service.impl;
    
    import com..dao.OrderDao;
    import com..model.Order;
    import com..sender.RabbitSender;
    import com..service.OrderService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    @Service
    public class OrderServiceImpl implements OrderService {
    
        @Autowired
        private RabbitSender sender;
    
            public String queryOrderInfo(String orderid) {
                sender.send("orderMessage",orderid);
                return orderid+" ###进入 orderMessage 队列~~~~~";
            }
    }

    实现接收信息的处理类,由此类从MQ取相关的数据:

    package com..business;
    
    import java.util.Map;
    import java.util.UUID;
    import com..sender.RabbitSender;
    import com..service.OrderService;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class OrderBusiness {
        @Autowired
        private OrderService orderService;
        @Autowired
        private RabbitSender rabbitSender;//将处理结果发送数据到队列
        
        //监听器监听指定的Queue
        @RabbitListener(queues="queue")    
        public void processC(String orderId) {
            System.out.println("%监听queue 队列取到的 orderId===:"+orderId);
            try {
                orderService.queryOrderInfo(orderId);
                rabbitSender.send("topic.orderReceive", orderId.toString()+" &&从queue进入队列~~~~~");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //监听指定queueObject队列,获取的数据为Map对象
        @RabbitListener(queues="queueObject")   
        public void process1(Map user) {    
            System.out.println("%%监听queueObject 队列取到的 user===:"+user);
            rabbitSender.send("topic.orderReceive", user.toString()+" &&从queueObject进入队列~~~~~");
        }
        //监听指定的topic.order队列,当此队列有数据时,数据就会被取走
        @RabbitListener(queues="topic.order")    
        public void process1(String orderId) {
            System.out.println("监听topic.order 队列取到的 orderId :"+orderId);
            try {
                //业务代码
                UUID id = UUID.randomUUID();
                System.out.println("并由AmqpTemplate 发往 topic.orderReceive:"+id);
                rabbitSender.send("topic.orderReceive", id.toString()+" &&从topic.order进入队列~~~~~");
            } catch (Exception e) {
                e.printStackTrace();
            }       
        }
    }

    POM:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <artifactId>consumer</artifactId>
        <packaging>jar</packaging>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>1.5.21.RELEASE</version>
        </parent>
        <properties>
            <java.version>1.8</java.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
    
        <dependencies>
            <!--mybatis -->
            <dependency>
                <groupId>org.mybatis.spring.boot</groupId>
                <artifactId>mybatis-spring-boot-starter</artifactId>
                <version>RELEASE</version>
            </dependency>
    
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
            </dependency>
    
    
            <!--数据库驱动 -->
            <dependency>
                <groupId>com.oracle</groupId>
                <artifactId>ojdbc6</artifactId>
                <version>11.2.0.3</version>
            </dependency>
            <dependency>
                <groupId>com.oracle.tools</groupId>
                <artifactId>oracle-tools-coherence</artifactId>
                <version>1.2.2</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-jdbc</artifactId>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>druid</artifactId>
                <version>1.0.11</version>
            </dependency>
    
            <dependency>
                <groupId>com.github.miemiedev</groupId>
                <artifactId>mybatis-paginator</artifactId>
                <version>1.2.16</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-devtools</artifactId>
                <optional>true</optional>
                <scope>true</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-actuator</artifactId>
            </dependency>
            <!-- 添加springboot对amqp的支持 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-tomcat</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.tomcat.embed</groupId>
                <artifactId>tomcat-embed-jasper</artifactId>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
        </dependencies>
    </project>
  • 相关阅读:
    牛客 小乐乐和25
    codeforces 1303 D 二进制瞎搞
    codeforces 1307 D 最短路bz+贪心
    codeforces 1316 C math
    codeforces 1328E LCA
    codeforces 1335 E2 思维
    codeforces 1335 E1 思维
    codeforces 1342 D 贪心+后缀和
    codeforces 1348D (思维+贪心)
    codeforces 1362 E 进制的性质
  • 原文地址:https://www.cnblogs.com/lifan12589/p/14309021.html
Copyright © 2011-2022 走看看