zoukankan      html  css  js  c++  java
  • 消息队列 概念 配合SpringBoot使用Demo

    http://www.jianshu.com/p/048e954dab40 

    概念: 

    分布式消息队列

    ‘分布式消息队列’包含两个概念

    一是‘消息队列’,二是‘分布式’

    那么就先看下消息队列的概念,和为什么需要分布式


    消息队列的定义

    消息”指进程间传送的数据

    队列”是在消息的传输过程中保存消息的容器

    消息被发送到队列中,消息队列充当中间人,将消息从源发送给目标

    当系统中出现“生产“和“消费“的速度或稳定性等因素不一致时,就需要消息队列,作为抽象层,弥合双方的差异

    例如

    (1)服务员点菜快,厨师做菜慢,服务员只需要下单给厨师,然后就可以继续去服务顾客,不需要等待厨师把菜做完

    点菜单就相当于消息,放单子的位置就相当于队列

    (2)业务系统需要发短信,但短信发送模块速度跟不上,业务系统就可以把发送短信的相关信息封装为一个消息,放入队列,短信发送模块从队列中获取消息进行处理

    消息队列的好处

    (1)提高系统响应速度

    使用了消息队列,生产者一方,把消息往队列里一扔,就可以立马返回响应用户了,无需等待处理结果

    (2)保证消息的传递

    如果发送消息时接收者不可用,消息队列会保留消息,直到成功地传递它

    (3)解耦

    只要消息格式不变,即使接收者的接口、位置、或者配置改变,也不会给发送者带来任何改变

    消息发送者无需知道消息接收者是谁,使得系统设计更清晰


    为什么需要分布式消息队列

    (1)多系统协作需要分布式

    例如消息队列中的数据需要在多个系统间共享,所以需要提供分布式通信机制、协同机制

    (2)可靠

    消息会被持久化到分布式存储中,这样避免了单台机器存储的消息由于机器问题导致消息的丢失

    (3)可扩展

    分布式消息队列,会随着访问量的增加而方便的增加处理服务器


    2. 消息队列技术是分布式应用间交换信息的一种技术

     现在电商网站某个抢购活动,并发怎么办?消息队列

    AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。有玩RabbitMq的哥们,多多交流!

    用户请求会post到后台一些信息如(用户信息,商品信息)到消息队列中。

    消息队列通过Windows服务去处理解析过来的信息,单线程处理(多线程可能会出现问题,你懂得)!

    成功的话,插入到本次活动的成功记录表里面;失败的话,插入到有意购买表(方便业务人员销售)!

    怎样通知用户?

    1,ajax异步去请求(隔两分钟去请求一次成功记录,如果不请求库的话我们会用Redis缓存)

    2,长连接的方式(websocket,signalr之类的)


    在整套的微服务架构中, 消息队列是不可或缺的部分, 它能够起到线程内同步或者异步调用无法达到的作用,优缺点分别是:
    优点:

    1. 解耦
      i. 只依赖消息的格式, 而不依赖发送者的ip和端口
      ii. 多消费者的情况下, 发送者不需要关注消费者的任何信息
    2. 路由
      不能互相访问的网络之间可以消息队列实现访问, 可以减少对现有网络的修改。
    3. 消息可靠性
      当消费者发生故障时, 消息可以被有效保存下来, 等待恢复后继续访问.
    4. 异步调用
      发送者异步发送消息, 不等待消息ack,不会对发送者本身产生响应速度的影响, 当然异步调用也是可以实现的。
    5. 方便扩展
      集群部署消息队列, 当流量增大和减小是可以通过调整部署来实现和发送方, 消费方无关。

    缺点:
    多出一个环节,需要保证消息队列的可用性。

    二. 常用消息队列

    目前常用的消息队列大概有三种类型,RabbitMQ等AMQP系列, Kafka, Redis等kev value系列,它们的使用场景分别是:
    1.RabbitMQ: 相对重量级高并发的情况,比如数据的异步处理 任务的串行执行等.
    2.Kafka: 基于Pull的模式来处理,具体很高的吞吐量,一般用来进行 日志的存储和收集.
    3.Redis: 轻量级高并发,实时性要求高的情况,比如缓存,秒杀,及时的数据分析(ELK日志分析框架,使用的就是Redis).

    三. SpingBoot 集成消息队列

    在SpingBoot中对这个三种都有支持

    1. Redis
      请参照 其他作者的 文章 http://www.jianshu.com/p/a2ab17707eff

    2. RabbitMQ
      i. 配置

       <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
       </dependency>

      ii.实现方式 RabbitMQDemoConfigration.java

      import org.springframework.amqp.core.*;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Qualifier;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      /**
      * RabbitMQDemo
      *
      * @author: sunjie
      * @date: 16/01/10
      */
      @Configuration
      @SuppressWarnings("SpringJavaAutowiringInspection")
      public class RabbitMQDemoConfigration {
      @Bean
       public CachingConnectionFactory myConnectionFactory() {
           CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
           connectionFactory.setUsername("guest");
           connectionFactory.setPassword("guest");
           connectionFactory.setHost("192.168.1.1");
           connectionFactory.setPort(5672);
           connectionFactory.setVirtualHost("/myHost");
           return connectionFactory;
       }
       //  生成CachingConnectionFactory  也可以使用下面的方式,在application.properties
       //  中定义好属性即可
       //     @Autowired
       //    ConnectionFactory connectionFactory;
      @Bean
       public DirectExchange myExchange() {
           return new DirectExchange("myExchangeDemo", true, false);
       }
       @Bean
       public Queue myQueue() {
           return new Queue("myQueueDemo", true);
       }
       @Bean
       public Binding myExchangeBinding(@Qualifier("myExchange") DirectExchange directExchange,
                                        @Qualifier("myQueue") Queue queue) {
           return BindingBuilder.bind(queue).to(directExchange).with("routeDemo");
       }
       @Bean
       public RabbitTemplate myExchangeTemlate() {
           RabbitTemplate r = new RabbitTemplate(myConnectionFactory());
           r.setExchange("myExchangeDemo");
           r.setRoutingKey("routeDemo");
           return r;
       }
      /**
        *  发送消息,工业使用需要自己做个性化实现
        */
       @Bean
       public void sendMessage(RabbitTemplate myExchangeTemlate) {
           String string = "Hello RabbitmQ";
           myExchangeTemlate.convertAndSend(string);
       }
       /**
        *  接受消息,工业使用时需要在监听类中实现process逻辑
        */
       @RabbitListener(queues = "myQueueDemo")
       public void process(Message message) {
           System.out.println("__________" + message.getBody().toString() + "__________");
           try {
               this.wait(1000);
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
    3. Kafka 使用
      i. pom.xml 配置

       <dependency>
         <groupId>org.apache.kafka</groupId>
         <artifactId>kafka_2.10</artifactId>
        </dependency>
        <dependency>
          <groupId>org.springframework.integration</groupId>
          <artifactId>spring-integration-kafka</artifactId>
        </dependency>

      ii. spring-integration-kafka.xml 也可以研究通过bean方式来实现

      <?xml version="1.0" encoding="UTF-8"?>
      <beans xmlns="http://www.springframework.org/schema/beans"
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xmlns:int="http://www.springframework.org/schema/integration"
          xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka"
          xmlns:task="http://www.springframework.org/schema/task"
          xsi:schemaLocation="http://www.springframework.org/schema/integration/kafka http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd
       http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
       http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
       http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
       <int:channel id="inputToKafka">
           <int:queue/>
       </int:channel>
       <int-kafka:outbound-channel-adapter
               id="kafkaOutboundChannelAdapter"
               kafka-producer-context-ref="kafkaProducerContext"
               channel="inputToKafka">
           <int:poller fixed-delay="1000" time-unit="MILLISECONDS" receive-timeout="0" task-executor="taskExecutor"/>
       </int-kafka:outbound-channel-adapter>
       <task:executor id="taskExecutor" pool-size="5" keep-alive="120" queue-capacity="50000"/>
       <int-kafka:producer-context id="kafkaProducerContext">
           <int-kafka:producer-configurations>
               <int-kafka:producer-configuration broker-list="192.168.2.2:9092"
                                                 key-class-type="java.lang.String"
                                                 value-class-type="java.lang.String"
                                                 topic="1_service"
                                                 value-encoder="kafkaEncoder"
                                                 key-encoder="kafkaEncoder"/>
           </int-kafka:producer-configurations>
       </int-kafka:producer-context>
       <bean id="kafkaEncoder" class="org.springframework.integration.kafka.serializer.common.StringEncoder"/>
      </beans>

      iii. 客户端实现代码:

      @Autowired
       MessageChannel             inputToKafka;
       String value = "Hello Kafka";
       Message<String> mess = MessageBuilder.withPayload(value)
               .setHeader(KafkaHeaders.TOPIC, appSetting.getStrValue("topic")).build();
       inputToKafka.send(mess);

    四. 后记

    消息中间件有很多,实际使用中往往是根据架构师的技术栈相关,做到了解使用场景和基本原理,在项目中提升自己细节能力,做到个人和公司共同成长,才是好的方式.

  • 相关阅读:
    windows pm2 开机启动
    微信小程序自定义组件封装及父子间组件传值
    cloc 统计代码行数工具
    小程序获取当前页面路径url
    小程序navigator点击有时候会闪一下
    Centos7安装python3与Python2共存
    Docker配置国内官方镜像
    Centos7下Docker的安装与使用
    Centos7yum源修改为国内阿里源
    Vim下一键运行python代码
  • 原文地址:https://www.cnblogs.com/xmanblue/p/6393474.html
Copyright © 2011-2022 走看看