zoukankan      html  css  js  c++  java
  • 消息中间件activemq的使用场景介绍(结合springboot的示例)

    一、消息队列概述

    消息队列中间件是分布式系统中重要的组件,主要解决应用耦合,异步消息,流量削锋等问题。实现高性能,高可用,可伸缩和最终一致性架构。是大型分布式系统不可缺少的中间件。

    目前在生产环境,使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ等。

    二、消息队列应用场景

    以下介绍消息队列在实际应用中常用的使用场景。异步处理,应用解耦,流量削锋和消息通讯四个场景。本篇使用ActiveMQ+SpringBoot来模拟这四个场景。

    2.1异步处理

    场景说明:汽车触发围栏报警后,需要发送报警邮件和报警短信。传统的做法有两种1.串行的方式;2.并行方式。

    (1)串行方式:将报警信息写入数据库成功后,发送报警邮件,再发送报警短信。以上三个任务全部完成后,该报警信息加入统计列表。

    (2)并行方式:报警信息写入数据库成功后,同时发送报警邮件和短信。

    假设三个业务节点每个使用50毫秒钟,不考虑网络等其他开销,则串行方式的时间是150毫秒,并行的时间可能是100毫秒。

    因为CPU在单位时间内处理的请求数是一定的,假设CPU1秒内吞吐量是100次。则串行方式1秒内CPU可处理的请求量是7次(1000/150)。并行方式处理的请求量是10次(1000/100)。

    小结:如以上案例描述,传统的方式系统的性能(并发量,吞吐量,响应时间)会有瓶颈。如何解决这个问题呢?

    引入消息队列,将不是必须的业务逻辑,异步处理。改造后的架构如下:

    代码示例

    ①在pom文件中引入activemq依赖

    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <version>1.5.6.RELEASE</version>
        </dependency>

    ②在配置文件中加上activemq的配置

    复制代码
    spring.activemq.broker-url=tcp://127.0.0.1:61616
    # 在考虑结束之前等待的时间
    #spring.activemq.close-timeout=15s 
    # 默认代理URL是否应该在内存中。如果指定了显式代理,则忽略此值。
    spring.activemq.in-memory=true 
    # 是否在回滚回滚消息之前停止消息传递。这意味着当启用此命令时,消息顺序不会被保留。
    spring.activemq.non-blocking-redelivery=false
    # 密码
    spring.activemq.password=123456
    # 等待消息发送响应的时间。设置为0等待永远。
    spring.activemq.send-timeout=0
    spring.activemq.user=haha
    # 是否信任所有包
    #spring.activemq.packages.trust-all=
    # 要信任的特定包的逗号分隔列表(当不信任所有包时)
    #spring.activemq.packages.trusted=
    # 当连接请求和池满时是否阻塞。设置false会抛“JMSException异常”。
    #spring.activemq.pool.block-if-full=true
    # 如果池仍然满,则在抛出异常前阻塞时间。
    #spring.activemq.pool.block-if-full-timeout=-1ms
    # 是否在启动时创建连接。可以在启动时用于加热池。
    #spring.activemq.pool.create-connection-on-startup=true
    # 是否用Pooledconnectionfactory代替普通的ConnectionFactory。
    #spring.activemq.pool.enabled=false 
    # 连接过期超时。
    #spring.activemq.pool.expiry-timeout=0ms
    # 连接空闲超时
    #spring.activemq.pool.idle-timeout=30s
    # 连接池最大连接数
    #spring.activemq.pool.max-connections=1
    # 每个连接的有效会话的最大数目。
    #spring.activemq.pool.maximum-active-session-per-connection=500
    # 当有"JMSException"时尝试重新连接
    #spring.activemq.pool.reconnect-on-exception=true
    # 在空闲连接清除线程之间运行的时间。当为负数时,没有空闲连接驱逐线程运行。
    #spring.activemq.pool.time-between-expiration-check=-1ms
    # 是否只使用一个MessageProducer
    #spring.activemq.pool.use-anonymous-producers=true
    复制代码

    ③消息生产者

    复制代码
    import java.util.Map;
    

    import javax.jms.Destination;
    import javax.jms.Queue;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsMessagingTemplate;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessagePostProcessor;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    import org.springframework.stereotype.Component;

    /**

    • 报警消息Producer
    • @author ko

    */
    @Component
    //@EnableScheduling
    public class AlarmProducer {

    </span><span style="color: #008000;">//</span><span style="color: #008000;"> 也可以注入JmsTemplate,JmsMessagingTemplate对JmsTemplate进行了封装</span>
    

    @Autowired
    private JmsTemplate jmsTemplate;
    // private JmsMessagingTemplate jmsTemplate;

    // @Autowired
    // private Queue queue;

    // @Scheduled(fixedDelay=5000) // 5s执行一次 只有无参的方法才能用该注解
    public void sendMessage(Destination destination, String message){
    // jmsTemplate.convertAndSend(destinationName, payload, messagePostProcessor);
    this.jmsTemplate.convertAndSend(destination, message);
    }

      // 双向队列

        // @JmsListener(destination="out.queue")
        //   public void consumerMessage(String text){
        //   System.out.println("从out.queue队列收到的回复报文为:"+text);
        // }

    }
    复制代码

    ④消息消费者

    复制代码
    import org.apache.commons.lang.StringUtils;
    import org.springframework.jms.annotation.JmsListener;
    import org.springframework.stereotype.Component;
    

    /**

    • 围栏报警Consumer
    • @author ko

    */
    @Component
    public class AlarmConsumer {

    </span><span style="color: #008000;">//</span><span style="color: #008000;"> 使用JmsListener配置消费者监听的队列,其中text是接收到的消息  </span>
    @JmsListener(destination = "mytest.queue"<span style="color: #000000;">)  <br>   // <span class="annotation">@SendTo(<span class="string">"out.queue")&nbsp;为了实现双向队列</span></span>
    </span><span style="color: #0000ff;">public</span> <span style="color: #0000ff;">void</span><span style="color: #000000;"> receiveQueue(String text) {  
        </span><span style="color: #0000ff;">if</span><span style="color: #000000;">(StringUtils.isNotBlank(text)){
            System.out.println(</span>"AlarmConsumer收到的报文为:"+<span style="color: #000000;">text);  
            System.out.println(</span>"把报警信息["+text+"]发送邮件给xxx"<span style="color: #000000;">);
            System.out.println(</span>"把报警信息["+text+"]发送短信给xxx"<span style="color: #000000;">);
            System.out.println(</span>""<span style="color: #000000;">);
        }
    } 
    

    }

    复制代码

    ⑤controller里写上测试接口

    复制代码
    @Autowired
        private AlarmProducer alarmProducer;
    
    @RequestMapping(value</span>="/chufabaojing",method=<span style="color: #000000;">RequestMethod.GET)
    @ApiOperation(value</span>="触发报警", notes="触发报警"<span style="color: #000000;">)
    @ApiImplicitParams({
        @ApiImplicitParam(name </span>= "devicename", value = "name",example = "xxxx", required = <span style="color: #0000ff;">true</span>, dataType = "string",paramType="query"<span style="color: #000000;">),
    })
    </span><span style="color: #0000ff;">public</span><span style="color: #000000;"> String chufabaojing(String devicename){
        
        List</span>&lt;String&gt; alarmStrList = <span style="color: #0000ff;">new</span> ArrayList&lt;&gt;<span style="color: #000000;">();
        alarmStrList.add(devicename</span>+"out fence01"<span style="color: #000000;">);
        alarmStrList.add(devicename</span>+"out fence02"<span style="color: #000000;">);
        alarmStrList.add(devicename</span>+"in fence01"<span style="color: #000000;">);
        alarmStrList.add(devicename</span>+"in fence02"<span style="color: #000000;">);
        
        System.out.println(</span>"设备"+devicename+"出围栏报警"<span style="color: #000000;">);
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 报警信息写入数据库</span>
        System.out.println("报警数据写入数据库。。。"<span style="color: #000000;">);
        
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 写入消息队列</span>
        Destination destination = <span style="color: #0000ff;">new</span> ActiveMQQueue("mytest.queue"<span style="color: #000000;">);
        </span><span style="color: #0000ff;">for</span><span style="color: #000000;"> (String alarmStr : alarmStrList) {
            alarmProducer.sendMessage(destination, alarmStr);
        }
        
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 消息写进消息队列里就不管了
        
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 下面两步骤移到activemq消费者里
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 发送邮件
        </span><span style="color: #008000;">//</span><span style="color: #008000;"> 发送短信</span>
        
        <span style="color: #0000ff;">return</span> "success"<span style="color: #000000;">;
    }</span></pre>
    
    复制代码

     2.2 应用解耦

    场景介绍,在spring cloud分布式微服务项目中,工单管理和设备管理分别是两个微服务,如果A工单被张三接单了,那么工单状态要设为已派单,检验员设为张三,设备状态要置为在检。

    传统的做法是,先调用工单管理的工单更新接口,成功之后再调用设备管理的设备更新接口,成功之后再返回操作提示给用户。这样做的缺点是应用耦合,如果在派单操作的时候正好设备管理微服务挂了或者阻塞了,那么派单操作就会失败或者要等待很长时间无反馈。另外如果设备管理的接口有变动,那么工单管理里面的代码也要改动。

    引入消息中间件,派单的时候,工单管理的工单更新接口处理好后把信息写入消息队列,然后直接返回操作反馈给用户。不管工单管理服务正不正常,正常就从消息队列里订阅消息处理,不正常就等待回复正常后再订阅消息处理。

    2.3 流量削峰

    场景介绍,XX公司的系统原来是针对A地区的客户开发的,现在为了抢占市场,拿下了B和C两个地区的客户,那么新系统上线,就存在如何把B和C的基础数据导入XX公司的系统中来的问题,短时间内要把庞大的旧数据改造适合新系统再导入进来,这很容易使系统挂掉,另外每天还有增量数据产生。

    这时可以引入消息中间件,B和C的客户只要负责把数据规则放到消息队列里就好了,XX公司可以有条不紊的从消息队列里订阅数据,可以有效缓解短时间内的高流量压力,但是这也对消息中间件的可靠性提出了要求。

    如果遇到activemq的瓶颈,可以看看activemq集群方案,这篇文章 http://blog.csdn.net/shuangzh115/article/details/50989182

    2.4 点对点通讯

    类似聊天室的功能。

    原文地址:https://www.cnblogs.com/shamo89/p/8010660.html
  • 相关阅读:
    2015年中国500强企业
    汇编语言
    oracle数据库学习路线
    OI生涯回忆录
    NOIP 2020游记
    CF223B Two Strings 题解
    CSP-S 2020游记
    CSP/NOIP 注意事项(2020)
    Luogu P6583 回首过去 题解
    Luogu P2210 Haywire 题解
  • 原文地址:https://www.cnblogs.com/jpfss/p/11008690.html
Copyright © 2011-2022 走看看