zoukankan      html  css  js  c++  java
  • 【SpringBoot】息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ

    ========================13、消息队列介绍和SpringBoot2.x整合RockketMQ、ActiveMQ =======================


    1、JMS介绍和使用场景及基础编程模型
    简介:讲解什么是小写队列,JMS的基础知识和使用场景
    1、什么是JMS: Java消息服务(Java Message Service),Java平台中关于面向消息中间件的接口

    2、JMS是一种与厂商无关的 API,用来访问消息收发系统消息,它类似于JDBC(Java Database Connectivity)。这里,JDBC 是可以用来访问许多不同关系数据库的 API

    3、使用场景:
    1)跨平台
    2)多语言
    3)多项目
    4)解耦
    5)分布式事务

    6)流量控制
    7)最终一致性
    8)RPC调用
    上下游对接,数据源变动->通知下属
    4、概念
    JMS提供者:Apache ActiveMQ、RabbitMQ、Kafka、Notify、MetaQ、RocketMQ
    JMS生产者(Message Producer)
    JMS消费者(Message Consumer)
    JMS消息
    JMS队列
    JMS主题

    JMS消息通常有两种类型:点对点(Point-to-Point)、发布/订阅(Publish/Subscribe)

    5、编程模型
    MQ中需要用的一些类
    ConnectionFactory :连接工厂,JMS 用它创建连接
    Connection :JMS 客户端到JMS Provider 的连接
    Session: 一个发送或接收消息的线程
    Destination :消息的目的地;消息发送给谁.
    MessageConsumer / MessageProducer: 消息接收者,消费者


    2、ActiveMQ5.x消息队列基础介绍和安装

    简介:介绍ActiveMQ5.x消息队列基础特性和本地快速安装
    特点:
    1)支持来自Java,C,C ++,C#,Ruby,Perl,Python,PHP的各种跨语言客户端和协议
    2)支持许多高级功能,如消息组,虚拟目标,通配符和复合目标
    3) 完全支持JMS 1.1和J2EE 1.4,支持瞬态,持久,事务和XA消息
    4) Spring支持,ActiveMQ可以轻松嵌入到Spring应用程序中,并使用Spring的XML配置机制进行配置
    5) 支持在流行的J2EE服务器(如TomEE,Geronimo,JBoss,GlassFish和WebLogic)中进行测试
    6) 使用JDBC和高性能日志支持非常快速的持久化
    ...

    1、下载地址:http://activemq.apache.org/activemq-5153-release.html
    2、快速开始:http://activemq.apache.org/getting-started.html
    3、如果我们是32位的机器,就双击win32目录下的activemq.bat,如果是64位机器,则双击win64目录下的activemq.bat
    4、bin目录里面启动 选择对应的系统版本和位数,activeMQ start 启动
    5、启动后访问路径http://127.0.0.1:8161/

    6、用户名和密码默认都是admin


    7、官方案例集合
    https://github.com/spring-projects/spring-boot/tree/master/spring-boot-samples
    面板:
    Name:队列名称。
    Number Of Pending Messages:等待消费的消息个数。
    Number Of Consumers:当前连接的消费者数目
    Messages Enqueued:进入队列的消息总个数,包括出队列的和待消费的,这个数量只增不减。
    Messages Dequeued:已经消费的消息数量。

    3、SpringBoot2.x整合ActiveMQ实战之点对点消息(p2p)

    简介:SpringBoot2.x整合ActiveMQ实战之点对点消息

    1、官网地址:https://docs.spring.io/spring-boot/docs/2.1.0.BUILD-SNAPSHOT/reference/htmlsingle/#boot-features-activemq

    2、加入依赖
    <!-- 整合消息队列ActiveMQ -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>

    <!-- 如果配置线程池则加入 -->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    </dependency>

    3、application.properties配置文件配置
    #整合jms测试,安装在别的机器,防火墙和端口号记得开放
    spring.activemq.broker-url=tcp://127.0.0.1:61616

    #集群配置
    #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)

    spring.activemq.user=admin
    spring.activemq.password=admin
    #下列配置要增加依赖
    spring.activemq.pool.enabled=true
    spring.activemq.pool.max-connections=100

    4、springboot启动类 @EnableJms,开启支持jms

    5、模拟请求
    localhost:8080/api/v1/order?msg=12312321321312

    6、消费者:实时监听对应的队列
    @JmsListener(destination = "order.queue")

    流程:

     pom加入依赖

    application.properties加入配置

    package com.atguigu.springboot;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.web.servlet.MultipartConfigFactory;
    import org.springframework.boot.web.servlet.ServletComponentScan;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import javax.jms.Queue;
    import javax.servlet.MultipartConfigElement;
    
    @SpringBootApplication
    //@ServletComponentScan
    @MapperScan("com.atguigu.springboot.mapper")
    @EnableScheduling //定时任务
    @EnableAsync  //开启异步任务
    @EnableJms //开启支持jms消息
    public class SpringBoot04WebRestfulcurdApplication {
        
        public static void main(String[] args) {
            SpringApplication.run(SpringBoot04WebRestfulcurdApplication.class, args);
        }
        @Bean // @Bean configuration类注解方法 ,注册到context,添加的bean的id为方法名,交给spring进行管理 方便后续进行注入
        public Queue queue(){
            return new ActiveMQQueue("common.queue");
        }
        
        @Bean
        public MultipartConfigElement multipartConfigElement() {
            MultipartConfigFactory  factory = new MultipartConfigFactory();
    //单个文件最大
            factory.setMaxFileSize("10240KB"); //KB,MB
    /* / 设置总上传数据总大小 */
            factory.setMaxRequestSize("1024000KB");
            return factory.createMultipartConfig();
        }
        
    }
    main
    package com.atguigu.springboot.service;
    
    import javax.jms.Destination;
    
    public interface ProducerService {
        /**
         * 
         * @param destination 指定消息队列
         * @param message
         */
    public void sendMessage(Destination destination, final String message);
        
        /**
         * 功能描述:使用默认消息队列, 发送消息
         * @param message
         */
        public void sendMessage( final String message);
        
        
        /**
         * 功能描述:消息发布者
         * @param msg
         */
        public void publish(String msg);
    }
    producerServiceInterface
    package com.atguigu.springboot.service.impl;
    
    
    import javax.jms.Destination;
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import com.atguigu.springboot.service.ProducerService;
    
    /**
     * 功能描述:消息生产者
     *
     * <p> 创建时间:May 2, 2018 11:29:47 PM </p> 
     *
     *@作者 小D课堂  小D
     */
    @Service
    public class ProducerServiceImpl implements ProducerService{
    
        @Autowired
        private Queue queue;
        
        @Autowired
        private JmsMessagingTemplate jmsTemplate; //用来发送消息到broker的对象
        
        public void sendMessage(final String message){
            jmsTemplate.convertAndSend(this.queue,message);
        }
        
        //发送消息,destination是发送到的队列,message是待发送的消息
        @Override
        public void sendMessage(Destination destination, String message) {
            
            jmsTemplate.convertAndSend(destination, message);
            
        }
    
        @Override
        public void publish(String msg) {
        }
    
        
    //    //发送消息,destination是发送到的队列,message是待发送的消息
    //    @Override
    //    public void sendMessage(final String message) {
    //        jmsTemplate.convertAndSend( message);
    //        
    //    }
    
        //=======发布订阅相关代码=========
        
    //    @Autowired
    //    private Topic topic;
    //    
    //    
    //     @Override
    //    public void publish(String msg) {
    //        this.jmsTemplate.convertAndSend(this.topic, msg);
    //        
    //    }
    
             
    }
    produceServiceImpl
    package com.atguigu.springboot.controller;
    
    import javax.jms.Destination;
    
    
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.atguigu.springboot.controller.domain.JsonData;
    import com.atguigu.springboot.service.ProducerService;
    
    /**
     * 功能描述:模拟微信支付回调
     *
     * <p> 创建时间:May 3, 2018 9:53:14 PM </p> 
     *
     *@作者 小D课堂  小D
     */
    @RestController
    @RequestMapping("/api/v1")
    public class OrderController {
        
        @Autowired
        private ProducerService producerService;
        /**
         * 功能描述:微信支付回调接口
         * @param msg 支付信息
         * @return
         */
        @GetMapping("order")
        public Object order(String msg){
            
            Destination destination = new ActiveMQQueue("order.queue");
            
            producerService.sendMessage(destination, msg);
        
           return JsonData.buildSuccess();
        }
        
        
        
        @GetMapping("common")
        public Object common(String msg){
            producerService.sendMessage(msg);    
           return JsonData.buildSuccess();
        }
        
    }    
        
        
        
        
        
        
        
        
        
        
    controller
    package com.atguigu.springboot.jms;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    @Component
    public class OrderConsumer {
    
        @JmsListener(destination="order.queue")
        public void receiveQueue(String text){
            System.out.println("OrderConsumer收到的报文为:"+text);
        }
    }
    consumer

     http://127.0.0.1:8080/api/v1/order?msg=testsdssfs111

     

     

     


    4、SpringBoot整合ActiveMQ实战之发布订阅模式(pub/sub)
    简介:SpringBoot整合ActiveMQ实战之发布订阅模式(pub/sub),及同时支持点对点和发布订阅模型

    1、需要加入配置文件,支持发布订阅模型,默认只支持点对点
    #default point to point
    spring.jms.pub-sub-domain=true

    注意点:

    1、默认消费者并不会消费订阅发布类型的消息,这是由于springboot默认采用的是p2p模式进行消息的监听

    默认不能同时支持两种模式
    修改配置:spring.jms.pub-sub-domain=true

    2、@JmsListener如果不指定独立的containerFactory的话是只能消费queue消息  设置属性支持两种模式
    修改订阅者container:containerFactory="jmsListenerContainerTopic"

    //需要给topic定义独立的JmsListenerContainer   在main中添加
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
    DefaultJmsListenerContainerFactory bean = new DefaultJmsListenerContainerFactory();
    bean.setPubSubDomain(true);
    bean.setConnectionFactory(activeMQConnectionFactory);
    return bean;
    }

    在配置文件里面,注释掉 #spring.jms.pub-sub-domain=true

    支持两种模式

    #整合jms测试,安装在别的机器,防火墙和端口号记得开放
    spring.activemq.broker-url=tcp://127.0.0.1:61616
    
    #集群配置
    #spring.activemq.broker-url=failover:(tcp://localhost:61616,tcp://localhost:61617)
    
    spring.activemq.user=admin
    spring.activemq.password=admin
    #下列配置要增加依赖
    spring.activemq.pool.enabled=false
    spring.activemq.in-memory=true
    spring.activemq.pool.max-connections=100
    #default point to point
    spring.jms.pub-sub-domain=true
    application.properties

     

    package com.atguigu.springboot;
    
    import org.apache.activemq.command.ActiveMQQueue;
    import org.apache.activemq.command.ActiveMQTopic;
    import org.mybatis.spring.annotation.MapperScan;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.web.servlet.MultipartConfigFactory;
    import org.springframework.boot.web.servlet.ServletComponentScan;
    import org.springframework.context.annotation.Bean;
    import org.springframework.jms.annotation.EnableJms;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    import javax.jms.Queue;
    import javax.jms.Topic;
    import javax.servlet.MultipartConfigElement;
    
    @SpringBootApplication
    //@ServletComponentScan
    @MapperScan("com.atguigu.springboot.mapper")
    @EnableScheduling //定时任务
    @EnableAsync  //开启异步任务
    @EnableJms //开启支持jms消息
    public class SpringBoot04WebRestfulcurdApplication {
        
        public static void main(String[] args) {
            SpringApplication.run(SpringBoot04WebRestfulcurdApplication.class, args);
        }
        @Bean // @Bean configuration类注解方法 ,注册到context,添加的bean的id为方法名,交给spring进行管理 方便后续进行注入
        public Queue queue(){
            return new ActiveMQQueue("common.queue");
        }
        @Bean
        public Topic topic(){
            return new ActiveMQTopic("video.topic");
        }
        
        @Bean
        public MultipartConfigElement multipartConfigElement() {
            MultipartConfigFactory  factory = new MultipartConfigFactory();
    //单个文件最大
            factory.setMaxFileSize("10240KB"); //KB,MB
    /* / 设置总上传数据总大小 */
            factory.setMaxRequestSize("1024000KB");
            return factory.createMultipartConfig();
        }
        
    }
    main
    package com.atguigu.springboot.service;
    
    import javax.jms.Destination;
    
    public interface ProducerService {
        /**
         * 
         * @param destination 指定消息队列
         * @param message
         */
    public void sendMessage(Destination destination, final String message);
        
        /**
         * 功能描述:使用默认消息队列, 发送消息
         * @param message
         */
        public void sendMessage( final String message);
        
        
        /**
         * 功能描述:消息发布者
         * @param msg
         */
        public void publish(String msg);
    }
    producer_pulish方法 interface
    package com.atguigu.springboot.service.impl;
    
    
    import javax.jms.Destination;
    import javax.jms.Queue;
    import javax.jms.Topic;
    
    
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.stereotype.Service;
    
    import com.atguigu.springboot.service.ProducerService;
    
    /**
     * 功能描述:消息生产者
     *
     * <p> 创建时间:May 2, 2018 11:29:47 PM </p> 
     *
     *@作者 小D课堂  小D
     */
    @Service
    public class ProducerServiceImpl implements ProducerService{
    
        @Autowired
        private Queue queue;
        
        @Autowired
        private JmsMessagingTemplate jmsTemplate; //用来发送消息到broker的对象
        
        public void sendMessage(final String message){
            jmsTemplate.convertAndSend(this.queue,message);
        }
        
        //发送消息,destination是发送到的队列,message是待发送的消息
        @Override
        public void sendMessage(Destination destination, String message) {
            
            jmsTemplate.convertAndSend(destination, message);
            
        }
        
        //=======发布订阅相关代码=========
        @Autowired
        private Topic topic;
        @Override
        public void publish(String msg) {
            this.jmsTemplate.convertAndSend(this.topic,msg);
        }
    }
    producer_pulishi实现 impl
    package com.atguigu.springboot.jms;
    
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    @Component
    public class TopicConsumer {
        @JmsListener(destination="video.topic")
        public void receive1(String text){
            System.out.println("video.topic 消费者 receive1= "+text);
        }
        @JmsListener(destination="video.topic")
        public void receive2(String text){
            System.out.println("video.topic 消费者 receive2= "+text);
        }
        @JmsListener(destination="video.topic")
        public void receive3(String text){
            System.out.println("video.topic 消费者 receive3= "+text);
        }
    }
    topicComsumer

     以上代码只支持订阅者模式


    5、RocketMQ4.x消息队列介绍
    简介:阿里开源消息队列 RocketMQ4.x介绍和新概念讲解

    1、Apache RocketMQ作为阿里开源的一款高性能、高吞吐量的分布式消息中间件
    2、特点
    1)在高压下1毫秒内响应延迟超过99.6%。
    2)适合金融类业务,高可用性跟踪和审计功能。
    3)支持发布订阅模型,和点对点
    4)支持拉pull和推push两种消息模式
    5)单一队列百万消息
    6)支持单master节点,多master节点,多master多slave节点
    ...
    3、概念
    Producer:消息生产者
    Producer Group:消息生产者组,发送同类消息的一个消息生产组

    Consumer:消费者
    Consumer Group:消费同个消息的多个实例

    Tag:标签,子主题(二级分类),用于区分同一个主题下的不同业务的消息

    Topic:主题
    Message:消息
    Broker:MQ程序,接收生产的消息,提供给消费者消费的程序
    Name Server:给生产和消费者提供路由信息,提供轻量级的服务发现和路由

    3、官网地址:http://rocketmq.apache.org/

    学习资源:
    1)http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/
    2)https://www.jianshu.com/p/453c6e7ff81c


    6、RocketMQ4.x本地快速部署
    简介:RocketMQ4.x本地快速部署

    1、安装前提条件(推荐)
    64bit OS, Linux/Unix/Mac
    64bit JDK 1.8+;

    2、快速开始 http://rocketmq.apache.org/docs/quick-start/
    下载安装包:https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.2.0/rocketmq-all-4.2.0-bin-release.zip

    路径:/Users/jack/Desktop/person/springboot/资料/第13章/第5课/rocketmq-all-4.2.0-bin-release/bin

    3、解压压缩包
    1)进入bin目录,启动namesrv
    nohup sh mqnamesrv &


    2) 查看日志 tail -f nohup.out
    结尾:The Name Server boot success. serializeType=JSON 表示启动成功


    3、启动broker
    nohup sh mqbroker -n 127.0.0.1:9876 &

    4)、关闭nameserver broker执行的命令
    sh mqshutdown namesrv
    sh mqshutdown broker

    7、RoekerMQ4.x可视化控制台讲解
    简介:RoekerMQ4.x可视化控制台讲解

    1、下载 https://github.com/apache/rocketmq-externals
    2、编译打包 mvn clean package -Dmaven.test.skip=true
    3、target目录 通过java -jar的方式运行



    4、无法连接获取broker信息
    1)修改配置文件,名称路由地址为 namesrvAddr,例如我本机为
    2)src/main/resources/application.properties
    rocketmq.config.namesrvAddr=192.168.0.101:9876

    5、默认端口 localhost:8080

    6、注意:
    在阿里云,腾讯云或者虚拟机,记得检查端口号和防火墙是否启动

    8、Springboot2.x整合RocketMQ4.x实战上集
    简介:Springboot2.x整合RocketMQ4.x实战,加入相关依赖,开发生产者代码

    启动nameser和broker

    1、加入相关依赖
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>${rocketmq.version}</version>
    </dependency>
    <dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-common</artifactId>
    <version>${rocketmq.version}</version>
    </dependency>


    2、application.properties加入配置文件
    # 消费者的组名
    apache.rocketmq.consumer.PushConsumer=orderConsumer
    # 生产者的组名
    apache.rocketmq.producer.producerGroup=Producer
    # NameServer地址
    apache.rocketmq.namesrvAddr=127.0.0.1:9876

    3、开发MsgProducer
    /**
    * 生产者的组名
    */
    @Value("${apache.rocketmq.producer.producerGroup}")
    private String producerGroup;

    /**
    * NameServer 地址
    */
    @Value("${apache.rocketmq.namesrvAddr}")
    private String namesrvAddr;

    private DefaultMQProducer producer ;


    public DefaultMQProducer getProducer(){
    return this.producer;
    }

    @PostConstruct
    public void defaultMQProducer() {
    //生产者的组名
    producer = new DefaultMQProducer(producerGroup);
    //指定NameServer地址,多个地址以 ; 隔开
    //如 producer.setNamesrvAddr("192.168.100.141:9876;192.168.100.142:9876;192.168.100.149:9876");
    producer.setNamesrvAddr(namesrvAddr);
    producer.setVipChannelEnabled(false);

    try {
    /**
    * Producer对象在使用之前必须要调用start初始化,只能初始化一次
    */
    producer.start();

    } catch (Exception e) {
    e.printStackTrace();
    }

    // producer.shutdown(); 一般在应用上下文,关闭的时候进行关闭,用上下文监听器

    }

    9、Springboot2.x整合RocketMQ4.x实战下集
    简介:Springboot2.x整合RocketMQ4.x实战,开发消费者代码,常见问题处理

    1、创建消费者


    问题:
    1、Caused by: org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.42.1:10911> failed

    2、com.alibaba.rocketmq.client.exception.MQClientException: Send [1] times, still failed, cost [1647]ms, Topic: TopicTest1, BrokersSent: [broker-a, null, null]

    3、org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [497]ms, Topic: TopicTest, BrokersSent: [chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local, chenyaowudeMacBook-Air.local]
    解决:多网卡问题处理
    1、设置producer: producer.setVipChannelEnabled(false);
    2、编辑ROCKETMQ 配置文件:broker.conf(下列ip为自己的ip)
    namesrvAddr = 192.168.0.101:9876
    brokerIP1 = 192.168.0.101

    4、DESC: service not available now, maybe disk full, CL:
    解决:修改启动脚本runbroker.sh,在里面增加一句话即可:
    JAVA_OPT="${JAVA_OPT} -Drocketmq.broker.diskSpaceWarningLevelRatio=0.98"
    (磁盘保护的百分比设置成98%,只有磁盘空间使用率达到98%时才拒绝接收producer消息)


    常见问题处理:
    https://blog.csdn.net/sqzhao/article/details/54834761
    https://blog.csdn.net/mayifan0/article/details/67633729
    https://blog.csdn.net/a906423355/article/details/78192828

  • 相关阅读:
    centos7 安装中文编码
    docker一些命令
    bash: ifconfig: command not found
    sublime3的licence(update 2016-04-14)
    Mac下更改python版本为3.5
    BigDecimal的equals
    cvc-elt.1: 找不到元素 'beans' 的声明
    Configuration problem: Only one AsyncAnnotationBeanPostProcessor may exist within the context.
    You need tcl 8.5 or newer in order to run the Redis test
    自定义的类型转换器中怎样自定义错误消息?(待解答)
  • 原文地址:https://www.cnblogs.com/hellowq/p/10538485.html
Copyright © 2011-2022 走看看