zoukankan      html  css  js  c++  java
  • springboot rabbitmq快速入门上手(实用)

    1.添加依赖

    <!-- Spring Boot AMQP starter, pinned here to 2.3.11.RELEASE.
         NOTE(review): if the project's parent POM/BOM already manages Spring Boot versions,
         the explicit <version> may be redundant - confirm against the actual pom.xml. -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
      <version>2.3.11.RELEASE</version>
    </dependency>

    2.添加一个直连型交换机的配置类  DirectRabbitConfig

    
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
    * 直连型交换机 * @author 执笔coding * @version 1.0 * @date 2021/6/9 15:03 */ @Configuration public class DirectRabbitConfig { /**队列 起名:TestDirectQueue*/ @Bean public Queue TestDirectQueue() { // durable:是否持久化,默认是false,持久化队列:会被存储在磁盘上,当消息代理重启时仍然存在,暂存队列:当前连接有效 // exclusive:默认也是false,只能被当前创建的连接使用,而且当连接关闭后队列即被删除。此参考优先级高于durable // autoDelete:是否自动删除,当没有生产者或者消费者使用此队列,该队列会自动删除。 // return new Queue("TestDirectQueue",true,true,false); //一般设置一下队列的持久化就好,其余两个就是默认false return new Queue("TestDirectQueue",true); } /**Direct交换机 起名:TestDirectExchange*/ @Bean DirectExchange TestDirectExchange() { // return new DirectExchange("TestDirectExchange",true,true); return new DirectExchange("TestDirectExchange",true,false); } /**绑定 将队列和交换机绑定, 并设置用于匹配键:TestDirectRouting*/ @Bean Binding bindingDirect() { return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("TestDirectRouting"); } @Bean DirectExchange lonelyDirectExchange() { return new DirectExchange("lonelyDirectExchange"); } }

    3.新建一个controller用来测试发送消息和接收消息

    
    
    import cn.ushowtime.admin.service.message.IRabbitmqService;
    import cn.ushowtime.core.util.object.JsonUtil;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    import javax.annotation.Resource;
    import java.io.IOException;
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.TimeoutException;

    /**
    * @author 执笔coding * @version 1.0 * @date 2021/6/9 15:50 */ @RestController @RequestMapping("/rabbit") public class RabbitmqController { /**使用RabbitTemplate,这提供了接收/发送等等方法*/ @Resource private RabbitTemplate rabbitTemplate; @Resource private IRabbitmqService rabbitmqService; /** * rabbitmq消息发送测试 * @author songmin * @date 2021/6/9 15:07 * @return: java.lang.String */ @GetMapping("/send") public String sendDirectMessage(String msg) { String createTime = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")); Map<String,Object> map=new HashMap<>(); map.put("messageData",msg); map.put("createTime",createTime); //将消息携带绑定键值:TestDirectRouting 发送到交换机TestDirectExchange rabbitTemplate.convertAndSend("TestDirectExchange", "TestDirectRouting", JsonUtil.map2Json(map)); return "ok"; } /** * 获取rabbitmq消息 * @author songmin * @date 2021/6/9 15:55 * @return: java.lang.String */ @GetMapping("/get") public String getDirectMessage() throws IOException, TimeoutException { return rabbitmqService.getRabbitmq(); } }

    4. service和实现

    IRabbitmqService
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Service contract for reading a message from the RabbitMQ message queue.
     *
     * @author 执笔coding
     * @version 1.0
     * @date 2021/6/9 15:36
     */
    public interface IRabbitmqService {
    
        /**
         * Gets a message from the message queue.
         *
         * @return the message as a string
         * @throws IOException      declared by the contract; presumably raised on broker
         *                          I/O failures - confirm against the implementation
         * @throws TimeoutException declared by the contract; presumably raised when a
         *                          broker operation times out - confirm against the implementation
         * @author songmin
         * @date 2021/6/9 15:49
         */
        String getRabbitmq() throws IOException, TimeoutException;
    
    }
    RabbitmqServiceImpl
    import cn.ushowtime.admin.service.message.IRabbitmqService;
    import cn.ushowtime.core.exception.OriginalException;
    import cn.ushowtime.core.util.logger.LoggerFactory;
    import cn.ushowtime.core.util.logger.ShowLogger;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.GetResponse;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Service;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;
    
    /**
     * {@link IRabbitmqService} implementation that pulls a single message from
     * {@code TestDirectQueue} using the RabbitMQ Java client directly.
     *
     * @author 执笔coding
     * @version 1.0
     * @date 2021/6/9 15:39
     */
    @Service("rabbitmqService")
    public class RabbitmqServiceImpl implements IRabbitmqService {

        /** Broker host, injected from {@code spring.rabbitmq.host}. */
        @Value("${spring.rabbitmq.host}")
        String host;

        /** Broker port, injected from {@code spring.rabbitmq.port}. */
        @Value("${spring.rabbitmq.port}")
        Integer port;

        /** Login user, injected from {@code spring.rabbitmq.username}. */
        @Value("${spring.rabbitmq.username}")
        String username;

        /** Login password, injected from {@code spring.rabbitmq.password}. */
        @Value("${spring.rabbitmq.password}")
        String password;

        /** Queue this service reads from. */
        private static final String QUEUE_NAME = "TestDirectQueue";

        private static final ShowLogger logger = LoggerFactory.getLogger(RabbitmqServiceImpl.class);

        /**
         * Opens a connection to the broker, declares {@code TestDirectQueue}, and fetches one
         * message from it with automatic acknowledgement.
         *
         * <p>The connection and channel are opened per call and are always closed before this
         * method returns or throws.
         *
         * @return the message body decoded as UTF-8
         * @throws IOException       if connecting, declaring the queue, fetching, or closing fails
         * @throws TimeoutException  if a broker operation times out
         * @throws OriginalException (code {@code "200"}) if the queue currently holds no message
         */
        @Override
        public String getRabbitmq() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            // Broker address and login credentials.
            factory.setHost(host);
            factory.setPort(port);
            factory.setUsername(username);
            factory.setPassword(password);

            // try-with-resources closes the channel first, then the connection, on every exit
            // path; previously neither was closed, leaking one broker connection per call.
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                // Queue definition: durable, not exclusive, not auto-deleted when unused.
                boolean durable = true;
                boolean exclusive = false;
                boolean autoDelete = false;
                channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);

                // Second argument 'true' = auto-ack: the message is acknowledged as soon as it is fetched.
                GetResponse resp = channel.basicGet(QUEUE_NAME, true);
                if (resp == null) {
                    throw new OriginalException("200", QUEUE_NAME + " 队列无消息");
                }

                String message = new String(resp.getBody(), StandardCharsets.UTF_8);
                logger.info("取到消息 {}", message);
                return message;
            }
        }

    }

    5.测试

    5.1 发送消息

    5.2 接收消息

     

     6. 消息监听自动接收

    import cn.ushowtime.core.util.logger.LoggerFactory;
    import cn.ushowtime.core.util.logger.ShowLogger;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * Listener that receives messages from the queue {@code TestDirectQueue} and logs them.
     *
     * @author 执笔coding
     * @version 1.0
     * @date 2021/6/9 15:12
     */
    @Component
    @RabbitListener(queues = "TestDirectQueue")
    public class DirectReceiver {

        private static final ShowLogger logger = LoggerFactory.getLogger(DirectReceiver.class);

        /**
         * Handles a message whose converted payload is a {@link Map} by logging it.
         *
         * <p>NOTE(review): the handler is chosen by payload type. If the producer publishes a
         * JSON {@code String} rather than a map, this handler will presumably not match -
         * confirm against the sender and the configured message converter.
         *
         * @param testMessage the received message payload
         */
        @RabbitHandler
        public void process(Map<?, ?> testMessage) {
            logger.info("DirectReceiver消费者收到消息  : {}" ,testMessage);
        }
    }

    这个类会在消息发送到 TestDirectQueue 之后立即监听到并消费该消息。注意:它与第 4 步中手动拉取(basicGet)的方式消费的是同一个队列,因此启用监听后,/rabbit/get 接口通常会取不到消息(提示"队列无消息")。

  • 相关阅读:
    天梯赛 社交集群(并查集)
    蓝桥杯 正则问题(dfs)
    天梯赛L3-001. 凑零钱(01背包记录物品)
    天梯赛/PAT 二叉树总结
    GPLT天梯赛 L2-022. 重排链表
    蓝桥杯 2的次幂表示(递归)
    排列与组合的一些定理
    卡特兰数
    洛谷P1349 广义斐波那契数列(矩阵快速幂)
    Manacher's Algorithm 马拉车算法(最长回文串)
  • 原文地址:https://www.cnblogs.com/ushowtime/p/14867578.html
Copyright © 2011-2022 走看看