zoukankan      html  css  js  c++  java
  • springboot rabbitmq消息同步用作接口调用

    1、引入依赖

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>

    2、配置文件

    spring:
      rabbitmq:
        addresses: 10.0.0.203
        port: 5672
        username: root
        password: 123456
        virtual-host: /
        listener:
          simple:
            concurrency: 10
            max-concurrency: 10
            prefetch: 1
            auto-startup: true
            default-requeue-rejected: true
        template:
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 3
            max-interval: 10000
            multiplier: 1

    3、注解配置

    package dhht.seal.hn.gsgate.rabbitmq;

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitAdmin;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Spring configuration declaring the RabbitMQ beans used by this package: the
     * connection factory, the admin, the template, one durable queue, one fanout
     * exchange and the binding between that queue and exchange.
     *
     * @Author: sh
     * @Date: 10:33 2019/11/4
     */
    @Configuration
    public class RabbitMqConfig {

        /** Name of the queue declared by {@link #cretaeQueue()}. */
        public static final String QUEUE = "hnyz_gs_queue";

        /** Name of the fanout exchange declared by {@link #getGsExchange()}. */
        public static final String GS_EXCHANGE = "gs_exchange";

        // NOTE(review): the value of spring.rabbitmq.addresses is handed to the
        // connection factory as a plain hostname, so it presumably has to hold a
        // single host without a port or a comma-separated list - confirm.
        @Value("${spring.rabbitmq.addresses}")
        private String host;

        @Value("${spring.rabbitmq.port}")
        private int port;

        @Value("${spring.rabbitmq.username}")
        private String username;

        @Value("${spring.rabbitmq.password}")
        private String password;

        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;

        /**
         * Builds the connection factory from the injected host, port, credentials
         * and virtual host, with publisher confirms switched on.
         *
         * @return the configured caching connection factory
         */
        @Bean
        public CachingConnectionFactory connectionFactory() {
            CachingConnectionFactory factory = new CachingConnectionFactory(host, port);
            factory.setUsername(username);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
            factory.setPublisherConfirms(true);
            return factory;
        }

        /**
         * Creates the admin bean with auto-startup enabled.
         *
         * @param connectionFactory factory the admin connects through
         * @return the admin bean
         */
        @Bean
        public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
            RabbitAdmin admin = new RabbitAdmin(connectionFactory);
            admin.setAutoStartup(true);
            return admin;
        }

        /**
         * Creates the template used for request/reply messaging.
         *
         * @param connectionFactory factory the template connects through
         * @return the template bean
         */
        @Bean
        public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
            RabbitTemplate template = new RabbitTemplate(connectionFactory);
            template.setUseDirectReplyToContainer(false);
            // A negative reply timeout makes sendAndReceive wait indefinitely, so a
            // caller blocks until a consumer actually replies.
            template.setReplyTimeout(-1);
            return template;
        }

        /**
         * Declares the durable queue named {@link #QUEUE}.
         *
         * @return the queue definition
         */
        @Bean
        public Queue cretaeQueue() {
            return new Queue(QUEUE, true);
        }

        /**
         * Declares the durable, non-auto-delete fanout exchange named
         * {@link #GS_EXCHANGE}; the bean is registered under the same name.
         *
         * @return the exchange definition
         */
        @Bean(name = GS_EXCHANGE)
        public FanoutExchange getGsExchange() {
            return new FanoutExchange(GS_EXCHANGE, true, false, null);
        }

        /**
         * Binds the queue to the fanout exchange.
         *
         * @return the binding definition
         */
        @Bean
        public Binding getFauoutBinding() {
            return BindingBuilder.bind(cretaeQueue()).to(getGsExchange());
        }
    }

    package dhht.seal.hn.gsgate.rabbitmq;

    import com.alibaba.fastjson.JSON;
    import dhht.seal.hn.gsgate.model.pojo.CropQueryVO;
    import dhht.seal.hn.gsgate.service.CropQueryService;
    import dhht.seal.hn.gsgate.service.impl.CropServiceImpl;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.core.ParameterizedTypeReference;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;
    import java.io.UnsupportedEncodingException;
    import java.lang.reflect.ParameterizedType;

    /**
    * @Author: sh
    * @Description: MqSender
    * @Date: 10:34 2019/11/4
    */
    @Slf4j
    @Service
    public class MqSenderService {

    @Autowired
    AmqpTemplate amqpTemplate;

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Resource
    private CropServiceImpl cropImplService;

    public String sendMsgToQueue(Object message) {
    try {
    String msg = beanToString(message);
    log.info("【发送的消息-社会信用代码】:" + msg);
    Message mm = rabbitTemplate.sendAndReceive(RabbitMqConfig.QUEUE, new Message(msg.getBytes("UTF-8"),new MessageProperties()));
    String msgResult = new String(mm.getBody());
    log.info("【同步消息返回结果-msgResult】:"+msgResult);
    CropQueryVO cropQueryVO = cropImplService.cropQueryVO(msg);
    log.info("【消息发送后-sql查询结果-CropQueryVO】:"+JSON.toJSONString(cropQueryVO));
    return JSON.toJSONString(cropQueryVO);
    //amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    //ParameterizedTypeReference param = new ParameterizedTypeReference<Object>(){};
    //return amqpTemplate.convertSendAndReceiveAsType(RabbitMqConfig.QUEUE, beanToString(message),param).toString();
    //return amqpTemplate.convertSendAndReceive(RabbitMqConfig.QUEUE, message).toString();
    } catch (UnsupportedEncodingException e) {
    log.info(e.getMessage());
    return null;
    }

    }

    public void sendMsg(Object message) {
    String msg = beanToString(message);
    log.info("send message:" + msg);
    amqpTemplate.convertAndSend(RabbitMqConfig.QUEUE, msg);
    log.info("sendMsg()---消息发送成功!");

    }

    public static <T> String beanToString(T value) {
    if (value == null) {
    return null;
    }
    Class<?> clazz = value.getClass();
    if (clazz == int.class || clazz == Integer.class) {
    return "" + value;
    } else if (clazz == String.class) {
    return (String) value;
    } else if (clazz == long.class || clazz == Long.class) {
    return "" + value;
    } else {
    return JSON.toJSONString(value);
    }
    }

    }

    package dhht.seal.hn.gsgate.rabbitmq;

    import dhht.seal.hn.gsgate.service.CropQueryService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.messaging.handler.annotation.SendTo;
    import org.springframework.stereotype.Service;

    import javax.annotation.Resource;
    import java.io.UnsupportedEncodingException;

    /**
     * Consumes messages from {@link RabbitMqConfig#QUEUE}, passes the body to
     * {@code CropQueryService} and returns that service's result as the reply.
     *
     * @Author: sh
     * @Date: 10:40 2019/11/4
     */
    @Service
    public class MqReceiverService {

        private static final Logger log = LoggerFactory.getLogger(MqReceiverService.class);

        @Resource
        private CropQueryService cropQueryService;

        /**
         * Handles one message from the queue.
         *
         * <p>NOTE(review): {@code @SendTo} names the same queue this listener
         * consumes. It is presumably only consulted when the request carries no
         * reply-to address, in which case a non-null return value would be routed
         * back into this queue and consumed again - confirm, and point it at a
         * dedicated reply destination if so.
         *
         * @param message raw AMQP message whose body is read as UTF-8 text
         * @return the string returned by {@code cropQueryService.crropQuery}, or
         *         {@code null} if the body could not be decoded
         */
        @RabbitListener(queues = RabbitMqConfig.QUEUE)
        @SendTo(RabbitMqConfig.QUEUE)
        public String receiveQueueMsg(Message message) {
            try {
                // Decode once with an explicit charset so the logged text and the
                // processed text are the same string on every platform.
                String payload = new String(message.getBody(), "UTF-8");
                log.info("接收到队列消息:{}", payload);
                // Business processing: pull the business-registration data and store it.
                return cropQueryService.crropQuery(payload);
            } catch (UnsupportedEncodingException e) {
                log.error("Failed to decode the message body as UTF-8", e);
                return null;
            }
        }
    }






  • 相关阅读:
    iOS开发--UIPickerView(选择器控件) 省份和城市的做法
    UITableView左滑设置更多的按钮
    UITableView的增,删,改例子
    UITableView的简单用法
    Block传值原理
    UIToolbar的简单用法
    用UIScrollView,UIPageControl来实现滚动视图。
    用UIPickerView来显示省和市
    如何设计好的UI控件
    UITextfield属性用法
  • 原文地址:https://www.cnblogs.com/sung1024/p/11944915.html
Copyright © 2011-2022 走看看