zoukankan      html  css  js  c++  java
  • spring整合rabbitmq

    1、依赖
    <!-- https://mvnrepository.com/artifact/org.springframework.amqp/spring-rabbit -->
    <dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
    <version>1.7.4.RELEASE</version>
    </dependency>
    2、初始化配置

    #RabbitMQ消息队列
    rabbitmq.host=10.0.0.236
    rabbitmq.port=5672
    rabbitmq.username=root
    rabbitmq.password=123456


    package com.lv.qggz.man.mq;

    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Scope;

    import java.util.HashMap;
    import java.util.Map;

    /**
    * @Author: sh
    * @Description: RabbitConfig
    * @Date: 14:46 2019/11/5
    */
    @Configuration
    public class RabbitConfig {

    @Value("${rabbitmq.host}")
    private String host;

    @Value("${rabbitmq.port}")
    private int port;

    @Value("${rabbitmq.username}")
    private String username;

    @Value("${rabbitmq.password}")
    private String password;


    public static final String QUEUE = "gs_queue";

    public static final int ALIVETIME = 50000;

    @Bean
    public CachingConnectionFactory connectionFactory() {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
    connectionFactory.setUsername(username);
    connectionFactory.setPassword(password);
    connectionFactory.setVirtualHost("/");
    connectionFactory.setPublisherConfirms(true);
    return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    return template;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public AmqpTemplate myMqpTemplate() {
    RabbitTemplate template = new RabbitTemplate(connectionFactory());
    return template;
    }

    @Bean
    public Queue cretaeQueue(){
    // return new Queue(QUEUE,true);
    Map<String, Object> argMap = new HashMap<>();
    // 设置消息存活时间
    argMap.put("x-message-ttl",ALIVETIME);
    return new Queue(QUEUE, true, false, false, argMap);

    }
    }
    3、生产
    package com.lv.qggz.man.mq;

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;

    /**
    * @Author: sh
    * @Description: MqSender
    * @Date: 10:34 2019/11/4
    */
    @Slf4j
    @Service("mqSenderService")
    public class MqSenderService {

    @Resource
    AmqpTemplate myMqpTemplate;

    public String sendMsgToQueue(Object message) {
    try {
    log.info("sendMsgToQueue--messgae:" + message);
    //amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    return myMqpTemplate.convertSendAndReceive(RabbitConfig.QUEUE, message).toString();
    } catch (AmqpException e) {
    return null;
    }

    }

    public void sendMsg(Object message) {
    log.info("send message:" + message);
    myMqpTemplate.convertAndSend(RabbitConfig.QUEUE, message);
    log.info("sendMsg()---消息发送成功!");

    }

    }
    4、消费
    package dhht.seal.hn.gsgate.rabbitmq;

    import dhht.seal.hn.gsgate.service.CropQueryService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;

    /**
    * @Author: sh
    * @Description: MqReceiver
    * @Date: 10:40 2019/11/4
    */
    @Service
    public class MqReceiverService {

    private static Logger log = LoggerFactory.getLogger(MqReceiverService.class);

    @Resource
    private CropQueryService cropQueryService;

    @RabbitListener(queues = RabbitMqConfig.QUEUE)
    @SendTo(RabbitMqConfig.QUEUE)
    public String receiveQueueMsg(String message) {
    log.info("接收到队列消息:" + message);
    // 业务处理代码,工商拉取入库
    String resJson = cropQueryService.crropQuery(message);
    return resJson;
    }
    }

  • 相关阅读:
    Linux常见问题解决
    (转)CoreDNS:Kubernetes内部域名解析原理、弊端及优化方式
    (转)Go sync.WaitGroup的用法
    (转)5个维度对 Kubernetes 集群优化及压测方案
    使用 Alpine 作为基础镜像时可能会遇到的常见问题的解决方法
    提前预防K8s集群资源不足的处理方式配置
    docker runc升级
    Nginx常见问题解决
    DNS泛域名解析应用(nip.io)
    使用Velero备份Kubernetes集群
  • 原文地址:https://www.cnblogs.com/sung1024/p/11818104.html
Copyright © 2011-2022 走看看