zoukankan      html  css  js  c++  java
  • springboot整合activeMQ

      消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。消息形式支持点对点和订阅-发布。

    ActiveMQ是什么
        1、ActiveMQ是消息队列技术,为解决高并发问题而生
        2、ActiveMQ生产者消费者模型(生产者和消费者可以跨平台、跨系统)
        3、ActiveMQ支持如下两种消息传输方式

        • 点对点模式,生产者生产了一个消息,只能由一个消费者进行消费

        • 发布/订阅模式,生产者生产了一个消息,可以由多个消费者进行消费

    1、安装activeMQ

    • 下载地址
    • 解压
    • 进入解压后的目录运行 ./bin/activemq start
    • 启动后activemq会启动两个端口:
      • 8161是activemq的管理页面,默认的账号密码都是admin
      • 61616是程序连接activemq的通讯地址

    2、项目结构

    3、引入依赖

           <!-- activemq依赖 -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-activemq</artifactId>
            </dependency>
            
            <!--消息队列连接池-->
            <!-- 使用springboot2.0+及以下版本时候 -->
            <!-- <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.15.0</version>
            </dependency> -->
            <!-- 使用springboot2.1+时候 -->
            <dependency>
                <groupId>org.messaginghub</groupId>
                <artifactId>pooled-jms</artifactId>
            </dependency>

    3、修改application.properties

    #服务端口号
    server.port=8080
    server.servlet.context-path=/activemqsb
    
    #activemq配置
    #ActiveMQ通讯地址
    spring.activemq.broker-url=tcp://localhost:61616
    #用户名
    spring.activemq.user=admin
    #密码
    spring.activemq.password=admin
    #是否启用内存模式(就是不安装MQ,项目启动时同时启动一个MQ实例)
    spring.activemq.in-memory=false
    #信任所有的包
    spring.activemq.packages.trust-all=true
    #是否替换默认的连接池,使用ActiveMQ的连接池需引入的依赖
    spring.activemq.pool.enabled=false

    4、配置activeMQ

    @Configuration
    @EnableJms
    public class ActiveMQConfig {
        @Bean
        public Queue queue() {
            return new ActiveMQQueue("springboot.queue") ;
        }
    
        //springboot默认只配置queue类型消息,如果要使用topic类型的消息,则需要配置该bean
        @Bean
        public JmsListenerContainerFactory jmsTopicListenerContainerFactory(ConnectionFactory connectionFactory){
            DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            //这里必须设置为true,false则表示是queue类型
            factory.setPubSubDomain(true);
            return factory;
        }
    
        @Bean
        public Topic topic() {
            return new ActiveMQTopic("springboot.topic") ;
        }
    }

    5、创建消费者

    @Service
    //消费者
    public class Consumer {
        //接收queue类型消息
        //destination对应配置类中ActiveMQQueue("springboot.queue")设置的名字
        @JmsListener(destination="springboot.queue")
        public void ListenQueue(String msg){
            System.out.println("接收到queue消息:" + msg);
        }
    
        //接收topic类型消息
        //destination对应配置类中ActiveMQTopic("springboot.topic")设置的名字
        //containerFactory对应配置类中注册JmsListenerContainerFactory的bean名称
        @JmsListener(destination="springboot.topic", containerFactory = "jmsTopicListenerContainerFactory")
        public void ListenTopic(String msg){
            System.out.println("接收到topic消息:" + msg);
        }
    }

    6、创建生产者

    @RestController
    //生产者
    public class Producer {
        
        @Autowired
        private JmsMessagingTemplate jmsTemplate;
    
        @Autowired
        private Queue queue;
    
        @Autowired
        private Topic topic;
    
        //发送queue类型消息
        @GetMapping("/queue")
        public void sendQueueMsg(String msg){
            jmsTemplate.convertAndSend(queue, msg);
        }
    
        //发送topic类型消息
        @GetMapping("/topic")
        public void sendTopicMsg(String msg){
            jmsTemplate.convertAndSend(topic, msg);
        }
    }

    7、启动类

    @SpringBootApplication
    @EnableJms //启动消息队列
    public class ActivemqsbApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(ActivemqsbApplication.class, args);
        }
    
    }

    8、启动程序测试

    • 浏览器中输入http://localhost:8080/activemqsb/queue?msg=hello

      控制台:接收到queue消息:hello

    • 浏览器中输入http://localhost:8080/activemqsb/topic?msg=hello
      控制台:接收到topic消息:hello

    参考链接:
  • 相关阅读:
    HDU 1058 Humble Numbers
    HDU 1160 FatMouse's Speed
    HDU 1087 Super Jumping! Jumping! Jumping!
    HDU 1003 Max Sum
    HDU 1297 Children’s Queue
    UVA1584环状序列 Circular Sequence
    UVA442 矩阵链乘 Matrix Chain Multiplication
    DjangoModels修改后出现You are trying to add a non-nullable field 'download' to book without a default; we can't do that (the database needs something to populate existing rows). Please select a fix:
    opencv做的简单播放器
    c++文件流输入输出
  • 原文地址:https://www.cnblogs.com/qiantao/p/12846573.html
Copyright © 2011-2022 走看看