zoukankan      html  css  js  c++  java
  • springboot整合activeMQ

      消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。消息形式支持点对点和订阅-发布。

    ActiveMQ是什么
        1、ActiveMQ是消息队列技术,为解决高并发问题而生
        2、ActiveMQ生产者消费者模型(生产者和消费者可以跨平台、跨系统)
        3、ActiveMQ支持如下两种消息传输方式

        • 点对点模式,生产者生产了一个消息,只能由一个消费者进行消费

        • 发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费

    1、安装activeMQ

    • 下载地址
    • 解压
    • 进入解压后的目录运行 ./bin/activemq start
    • 启动后activemq会启动两个端口:
      • 8161是activemq的管理页面,默认的账号密码都是admin
      • 61616是程序连接activemq的通讯地址

    2、项目结构

    3、引入依赖

           <!-- activemq依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            
            <!--消息队列连接池-->
            <!-- 使用springboot2.0+及以下版本时候 -->
            <!-- <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.0</version>
            </dependency> -->
            <!-- 使用springboot2.1+时候 -->
            <dependency>
                <groupId>org.messaginghub</groupId>
                <artifactId>pooled-jms</artifactId>
            </dependency>

    3、修改application.properties

    #服务端口号
    server.port=8080
    server.servlet.context-path=/activemqsb
    
    #activemq配置
    #ActiveMQ通讯地址
    spring.activemq.broker-url=tcp://localhost:61616
    #用户名
    spring.activemq.user=admin
    #密码
    spring.activemq.password=admin
    #是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
    spring.activemq.in-memory=false
    #信任所有的包
    spring.activemq.packages.trust-all=true
    #是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
    spring.activemq.pool.enabled=false

    4、配置activeMQ

    @Configuration
    @EnableJms
    public class ActiveMQConfig {
        @Bean
        public Queue queue() {
            return new ActiveMQQueue("springboot.queue") ;
        }
    
        //springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置该bean
        @Bean
        public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //这里必须设置为true,false则表示是queue类型
            factory.setPubSubDomain(true);
            return factory;
        }
    
        @Bean
        public Topic topic() {
            return new ActiveMQTopic("springboot.topic") ;
        }
    }

    5、创建消费者

    @Service
    //消费者
    public class Consumer {
        //接收queue类型消息
        //destination对应配置类中ActiveMQQueue("springboot.queue")设置的名字
        @JmsListener(destination="springboot.queue")
        public void ListenQueue(String msg){
            System.out.println("接收到queue消息:" + msg);
        }
    
        //接收topic类型消息
        //destination对应配置类中ActiveMQTopic("springboot.topic")设置的名字
        //containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
        @JmsListener(destination="springboot.topic", containerFactory = "jmsTopicListenerContainerFactory")
        public void ListenTopic(String msg){
            System.out.println("接收到topic消息:" + msg);
        }
    }

    6、创建生产者

    @RestController
    //生产者
    public class Producer {
        
        @Autowired
        private JmsMessagingTemplate jmsTemplate;
    
        @Autowired
        private Queue queue;
    
        @Autowired
        private Topic topic;
    
        //发送queue类型消息
        @GetMapping("/queue")
        public void sendQueueMsg(String msg){
            jmsTemplate.convertAndSend(queue, msg);
        }
    
        //发送topic类型消息
        @GetMapping("/topic")
        public void sendTopicMsg(String msg){
            jmsTemplate.convertAndSend(topic, msg);
        }
    }

    7、启动类

    @SpringBootApplication
    @EnableJms //启动消息队列
    public class ActivemqsbApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ActivemqsbApplication.class, args);
        }
    
    }

    8、启动程序测试

    • 浏览器中输入http://localhost:8080/activemqsb/queue?msg=hello

      控制台:接收到queue消息:hello

    • 浏览器中输入http://localhost:8080/activemqsb/topic?msg=hello
      控制台:接收到topic消息:hello

    参考链接:
  • 相关阅读:
    IE无法打开internet网站已终止操作的解决的方法
    欢乐暑假-高校俱乐部暑期线上编程竞赛奖励机制
    pojAGTC(LCS,DP)
    ExtJs自学教程(1):一切从API開始
    Java实现BASE64编解码
    IOS框架概览
    DeviceIoControl的使用说明
    strcpy_s与strcpy的比較
    第三章 引擎的核心:渲染框架
    具体解释VB中连接access数据库的几种方法
  • 原文地址:https://www.cnblogs.com/qiantao/p/12846573.html
Copyright © 2011-2022 走看看