zoukankan      html  css  js  c++  java
  • 简单记录下springboot+jms+activemq

    1. 安装ActiveMQ

    到Apache官方网站下载最新的ActiveMQ安装包,解压到本地目录后,运行bin目录下的启动脚本(如 bin/activemq start)启动服务

    2. 在pom.xml中引入ActiveMQ相关依赖(如spring-boot-starter-activemq),并在springboot配置文件中填写相关配置

     3.创建生产者

    @Component

    public class JmsProducer {

    private static final Logger LOG = LoggerFactory.getLogger(JmsProducer.class);

    @Autowired
    private JmsMessagingTemplate jmsMessagingTemplate;

    /**
    * 一对一发送
    *
    * @param queueName
    * @param message
    */
    public void send(String queueName, String message) {
    jmsMessagingTemplate.convertAndSend(queueName, message);
    }

    /**
    * 一对多发送
    *
    * @param topicName
    * @param message
    */
    public void multicast(String topicName, String message) {
    LOG.debug("发送广播消息到{},内容是: {}", topicName, message);
    Destination destination = new ActiveMQTopic(topicName);
    jmsMessagingTemplate.convertAndSend(destination, message);
    }
    }


    4.创建消费者
    @Component
    @Profile({"test", "prod"})
    @Slf4j
    public class PushConsumer {

    @Autowired
    PushServiceImpl pushService;

    /**
    * 聊天消息推送
    * @param message 消息体
    */
    @JmsListener(destination = MqConfig.PUSH_NAME, containerFactory = "jmsListenerContainerTopic")
    public void push(String message){
    log.debug("从消息队列{}中收到消息:{}",MqConfig.PUSH_NAME,message);
    try {
    MessageModel messageModel = JSONArray.parseObject(message,MessageModel.class);
    pushService.push(messageModel.getEnumDeviceType(),
    messageModel.getTitle(),
    messageModel.getMessage(),
    messageModel.getAliasList(),
    messageModel.getParm());
    }catch (Exception e){
    log.error("推送失败");
    throw new BusinessException("推送失败",e);
    }
    }
    }

    5.mqConfig
    /**
     * Declares the JMS listener container factories: one for topics
     * (pub-sub enabled) and one for queues.
     */
    @Configuration
    public class MqConfig {

        /** Destination name used for push messages. */
        public static final String PUSH_NAME = "topic.push.name";

        /**
         * Listener container factory for topics. Topics need their own
         * factory because pub-sub must be switched on explicitly.
         *
         * @param activeMQConnectionFactory connection factory used by the listeners
         * @return a factory with the pub-sub domain enabled
         */
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
            DefaultJmsListenerContainerFactory topicFactory = newContainerFactory(activeMQConnectionFactory);
            topicFactory.setPubSubDomain(true);
            return topicFactory;
        }

        /**
         * Listener container factory for queues; the pub-sub flag is left
         * untouched.
         *
         * @param activeMQConnectionFactory connection factory used by the listeners
         * @return a factory configured only with the connection factory
         */
        @Bean
        public JmsListenerContainerFactory<?> jmsListenerContainerQueue(ConnectionFactory activeMQConnectionFactory) {
            return newContainerFactory(activeMQConnectionFactory);
        }

        /** Creates a container factory bound to the given connection factory. */
        private static DefaultJmsListenerContainerFactory newContainerFactory(ConnectionFactory connectionFactory) {
            DefaultJmsListenerContainerFactory containerFactory = new DefaultJmsListenerContainerFactory();
            containerFactory.setConnectionFactory(connectionFactory);
            return containerFactory;
        }
    }
    6.编写测试类


  • 相关阅读:
    idea 快捷键
    上传代码
    maven 打包
    mysql 通过测试'for update',深入了解行锁、表锁、索引
    mysql中,手动提交事务
    java 发送邮件
    zk脑裂
    malloc,free和new,delete之间的区别
    sizeof和strlen区别
    字符串常量问题
  • 原文地址:https://www.cnblogs.com/xiaokangk/p/11663194.html
Copyright © 2011-2022 走看看