zoukankan      html  css  js  c++  java
  • 微服务中使用MQ——RabbitMQ

    概念

    什么是消息

    • 消息是指在两个独立的系统间传递的数据。这两个系统可以是两台计算机,也可以是两个进程。
    • 消息是平台无关和语言无关的!

    什么是队列

    • 队列是一种数据结构,内部是用数组或链表实现的,
    • 队列的特点是只能队尾放入,队头取出,即先入先出【FIFO】
    • 队列的操作有入队和出队
      也就是你有一个程序在产生内容然后入队(生产者)
      另一个程序读取内容,内容出队(消费者)

    什么是消息队列

    • 简单的理解就是:在消息的传输过程中使用队列作为保存消息的容器。
      队列是在消息的传输过程中的通道,是保存消息的容器,
      根据不同的情形,可以有先进先出,优先级队列等区别 。

    为什么要使用消息队列呢

    解耦

    消息队列能够将业务逻辑解耦,调用方只需要下达命令而不用等待整个逻辑执行完毕!

    比如说:注册的时候需要调用三个服务,这三个服务可以各自独立放在三个服务器中,执行到哪一步直接发送消息即可实现异步调用。注册的效率就快多了
    调用邮件服务:发送带有验证链接的注册邮件,
    调用第三方验证服务:验证身份证信息真假,
    调用用户的服务:对用户进行注册。

    同步转异步

    可以把同步的处理变成异步进行处理
    将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度

    下完订单直接返回给用户结果,只需要耗时50ms,然后再通知MQ做后续的事情。

    削峰

    在高并发场景下【平滑短时间内大量的服务请求】
    分流:将突发大量请求转换为后端能承受的队列请求。

    什么时候使用消息队列呢

    关注下游执行执行结果,用RPC/REST
    不关注下游执行结果,用MQ,不用RPC/REST
    对于需要强事务保证而且延迟敏感的,RPC是优于消息队列的。
    比如:
    你的服务器一秒能处理100个订单,但秒杀活动1秒进来1000个订单,持续10秒,在后端能力无法增加的情况下,
    你可以用消息队列将总共10000个请求压在队列里,后台consumer按原有能力处理,100秒后处理完所有请求(而不是直接宕机丢失订单数据)。

    注意

    mq关心的是“通知”,而非“处理

    简单的说:MQ只能保证消息按照顺序通知给consumer,不能保证consumer处理逻辑,比如:是不是按照顺序执行。
    假设有三个消息: M1(发短信),M2(发邮件),M3(站内推送)
    在队列中的顺序为:M3,M2,M1 MQ能保证消息在消费的时候是按照这个顺序,
    但是不能保证consumer,必须先发送站内推送,再发邮件,最后发短信,
    因为这三个consumer接受到消息执行的业务时间很可能不相同的。

    安装Rabbit MQ

    安装ErLang

    Erlang(['ə:læŋ])是一种通用的面向并发的编程语言,它由瑞典电信设备制造商爱立信所辖的CS-Lab开发,目的是创造一种可以应对大规模并发活动的编程语言和运行环境。

    rpm --import https://packages.erlang-solutions.com/rpm/erlang_solutions.asc

    vi /etc/yum.repos.d/xxx (xxx是目录中的任意一个已有的yum列表文件)
    在文件中增加下述内容:

    [erlang-solutions]
    name=Centos $releasever - $basearch - Erlang Solutions
    baseurl=https://packages.erlang-solutions.com/rpm/centos/$releasever/$basearch
    gpgcheck=1
    gpgkey=https://packages.erlang-solutions.com/rpm/erlang_solutions.asc
    enabled=1
    

    生成yum缓存信息
    yum makecache

    安装ErLang
    yum -y install erlang

    检查安装结果,查看ErLang版本
    erl -version

    安装Rabbit Mq

    报错可以参考:
    安装Rabbit MQ

    启动 Rabbit MQ

    配置为守护进程随系统自动启动,root权限下执行:
    chkconfig rabbitmq-server on
    启动RabbitMQ服务
    service rabbitmq-server start

    检查RabbitMQ服务状态
    service rabbitmq-server status

    安装RabbitMQ的WEB管理界面

    rabbitmq-plugins enable rabbitmq_management

    设置RabbitMQ用户及授予权限

    创建账号
    rabbitmqctl add_user test 123456
    设置用户角色
    rabbitmqctl set_user_tags test administrator
    设置用户权限
    rabbitmqctl set_permissions -p "/" test "." "." ".*"
    设置完成后可以查看当前用户和角色(需要开启服务)
    rabbitmqctl list_users

    浏览器访问WEB管理界面

    http://rabbitmq-server-ip:15672
    rabbitmq-server-ip就是RabbitMQ按照所在物理机的IP。
    RabbitMQ提供的WEB管理界面端口为15672

    RabbitMQ的原理

    原理图

    Message

    有两部分: Header和Body。
    Header是由Producer添加上的各种属性的集合,
    这些属性有控制Message是否可被缓存,接收的queue是哪个,优先级是多少等。
    Body是真正需要传送的数据,它是对Broker不可见的二进制数据流,在传输过程中不应该受到影响。
    (在rabbitMQ中,存储消息可以是任意的java类型的对象,必须实现序列化(serializable))

    Publisher 消息的生产者

    也是一个向交换器发布消息的客户端应用程序

    Consumer 消息的消费者

    表示一个从消息队列中取得消息的客户端应用程序。

    Exchange 交换器。

    用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
    三种常用的交换器类型
    direct(发布与订阅 完全匹配)
    fanout(广播)
    topic(主题,规则匹配)

    Routing-key 路由键

    RabbitMQ决定消息该投递到哪个队列的规则。
    队列通过路由键绑定到交换器。
    消息发送到MQ服务器时,消息将拥有一个路由键,即便是空的,RabbitMQ也会将其和绑定使用的路由键进行匹配。
    如果相匹配,消息将会投递到该队列。
    如果不匹配,消息将会进入黑洞。

    Binding 绑定

    用于【消息队列】和【交换器】之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。

    Queue 消息队列。

    用来保存消息直到发送给消费者。
    它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。
    消息一直在队列里面,等待消费者链接到这个队列将其取走。

    Connection

    指rabbit服务器和服务建立的TCP链接。

    Channel

    信道,是TCP里面的虚拟链接。一条TCP连接上可以创建多条信道。
    TCP一旦打开,就会创建AMQP信道。无论是发布消息、接收消息、订阅队列,这些动作都是通过信道完成的

    Virtual Host

    表示一组交换器,消息队列和相关对象。
    个vhost本质上就是一个mini版的RabbitMQ服务器,拥有自己的队列、交换器、绑定和权限机制。
    类似一个mysql里面有N个数据库一样。

    Borker

    表示消息队列服务器实体。就是RabbitMQ整体应用。

    交换器和队列的关系

    交换器是通过路由键和队列绑定在一起的,如果消息拥有的路由键跟队列和交换器的路由键匹配,那么消息就会被路由到该绑定的队列中。
    也就是说,消息到队列的过程中,消息首先会经过交换器,接下来交换器在通过路由键匹配分发消息到具体的队列中。
    路由键可以理解为匹配的规则。

    RabbitMQ为什么需要信道?为什么不是TCP直接通信?

    TCP的创建和销毁开销特别大。
    创建需要3次握手,销毁需要4次分手。
    如果不用信道,那应用程序就会以TCP链接Rabbit,高峰时每秒成千上万条链接会造成资源巨大的浪费,而且操作系统每秒处理TCP链接数也是有限制的,必定造成性能瓶颈。
    信道的原理是一条线程一条通道,多条线程多条通道同用一条TCP链接。一条TCP链接可以容纳无限的信道,即使每秒成千上万的请求也不会成为性能的瓶颈。

    大致流程

    consumer注册队列监听器到Broker(RabbitMQ)

    Consumer首先注册一个队列监听器,来监听队列的状态,当队列状态变化时消费消息,
    注册队列监听的时候需要提供:

    • Exchange(交换器)信息:
      交换类型(Dircet直连 ,Topic主题 ,Fanout广播),交换器名称,是否自动删除等
    • Queue(队列)信息,
      名称,是否自动删除等
    • 以及Routing Key(路由键)信息。
      自定义的一个key值,这个值是连接Exchange和Queue的标识。

    producer 发送消息到队列

    producer 发送消息给RabbitMQ,需要在消息头中指定Exchange(交换器)信息,Routing Key(路由键)信息

    Broker(RabbitMQ) 匹配

    RebbitMQ通过Producer指定的Exchange名称找到交换器,然后通过指定的Routing key找到对应的队列,将消息放入队列中。
    队列状态发生变化,Consumer就会通过监听器得到消息并消费。

    consumer做一个集群是如何消费消息的

    假设我的一个短信发送服务,为了保证短信发送的稳定,做了一个短信发送服务的集群,这个时候MQ的消息是如何被消费的。

    Exchange

    它的作用:用来接收生产者发送的消息并将这些消息路由给服务器中的队列。

    Exchange是通过Routing Key来匹配对应的Queue的。

    我们要知道在RabbitMQ中Exchange的类型以及Queue,还有Routing key都是由consumer端提供的,
    producer只是提供Exchange和Routing key,broker根据producer提供的Exchange名字找到对应的交换器,然后再
    根据路由键去匹配对应的队列,放入消息到队列中。

    有好几种类型的Exchange:
    Direct类型的Exchange的Routing key就是全匹配。
    Topic类型的Exchange的Routing key就是部分匹配或者是模糊匹配。
    Fanout类型的Exchange的Routing key就是放弃匹配。
    匹配肯定都是限制在同一个Exchange中的,也就是相同的Exchange进行匹配。

    消息的可靠性处理

    消息持久化

    保证消息在MQ中不丢失。

    消息丢失的情况

    • consumer未启动,而producer发送了消息,则消息会丢失。
    • 当所有的consumer宕机的时候,queue会auto-delete,消息仍旧会丢失

    消息确认机制

    必要性

    consumer收到消息,在消费的过程中程序出现异常或者网络中断,如果没有ack的话,MQ就把消息删除了,就造成了数据丢失。

    过程

    RabbitMQ把消息推送给Consumer,RabbitMQ就会把这个消息进行锁定,在锁定状态的消息不会被重复推送也就是二次消费。
    其他consumer可以继续消费下一个消息,当消息的consumer确认消费完成之后发送一个ack给RabbitMQ,RabbitMQ会将这个消息删除。
    如果超过一定时间RabbitMQ没有收到consumer的ack,就会把这个消息进行解锁,重新放入队列头,保证消息的顺序。

    内存泄露的可能

    如果Consumer没有处理消息确认,将导致严重后果。
    假设所有的Consumer都没有正常反馈确认信息,并退出监听状态,那么这些消息则会永久保存,并处于锁定状态,直到消息被正常消费为止。
    而消息的Producer继续持续发送消息到RabbitMQ,那么消息将会堆积,持续占用RabbitMQ所在服务器的内存,导致“内存泄漏”问题。

    解决方案:

    • 配置消息的重试次数。
      通过全局配置文件,开启消息消费重试机制,配置重试次数。
      当RabbitMQ未收到Consumer的确认反馈时,会根据配置来决定重试推送消息的次数,当重试次数使用完毕,无论是否收到确认反馈,RabbitMQ都会删除消息,避免内存泄漏的可能。
      在consumer端具体配置如下:
    #开启重试
    spring.rabbitmq.listener.retry.enabled=true
    #重试次数,默认为3次
    spring.rabbitmq.listener.retry.max-attempts=5
    
    • 编码异常处理
      通过编码处理异常的方式,保证消息确认机制正常执行。
      如:catch代码块中,将未处理成功的消息,重新发送给MQ。
      如:catch代码中,本地逻辑的重试(使用定时线程池重复执行任务3次。)
      如:catch代码中,将异常消息存储到DB,然后使用定时任务去清除消息。

    重复消费

    • 假设RabbitMQ等待ack的超时时间为1s,而consumer消费消息需要2s,那么这个消息就会出现ack等待超时,重新放入队列,这就出现了重复消费。
    • consumer收到消息之后中断了Connection,消息也会被重新放入队列中,也会出现重复消费。
    • 假设consumer端处理消息的时候出现了系统异常,无法发送确认机制。

    【解决方法】

    • 测试consumer的执行时长,并合理限定MQ的ack超时时长。
    • 为消息添加版本或者时间戳,或者根据业务id进行判重。
      如果不强制要求不能出现重复消费,最好还是不要判断。

    RabbitMQ默认是开启消息确认的,不建议关闭。

    Direct 交换器

    就是点对点(point to point)实现【发布/订阅】标准的交换器。这里的交换器就是(Exchange)。

    业务场景

    producer端的代码实现

    pom依赖

    继承spring-boot-starter-parent
    引入rabbitMq:spring-boot-starter-amqp
    rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。
    spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>com.bjsxt</groupId>
    	<artifactId>rabbitmq-direct-producer</artifactId>
    	<version>0.0.1-SNAPSHOT</version>
    	<packaging>jar</packaging>
    
    	<name>rabbitmq-direct-producer</name>
    	<description>Demo project for Spring Boot</description>
    
    	<parent>
    		<groupId>org.springframework.boot</groupId>
    		<artifactId>spring-boot-starter-parent</artifactId>
    		<version>1.5.13.RELEASE</version>
    		<relativePath /> <!-- lookup parent from repository -->
    	</parent>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    		<java.version>1.8</java.version>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-web</artifactId>
    		</dependency>
    
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-test</artifactId>
    			<scope>test</scope>
    		</dependency>
    		
    		<!-- rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。
    			spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。
    		 -->
    		<dependency>
    			<groupId>org.springframework.boot</groupId>
    			<artifactId>spring-boot-starter-amqp</artifactId>
    		</dependency>
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<groupId>org.springframework.boot</groupId>
    				<artifactId>spring-boot-maven-plugin</artifactId>
    			</plugin>
    		</plugins>
    	</build>
    </project>
    
    

    配置RabbitMQ

    spring.application.name=direct-producer
    
    server.port=8082
    
    # 必要配置
    # 配置rabbitmq链接相关信息。key都是固定的。是springboot要求的。
    # rabbitmq安装位置
    spring.rabbitmq.host=192.168.1.122
    # rabbitmq的端口
    spring.rabbitmq.port=5672
    # rabbitmq的用户名
    spring.rabbitmq.username=test
    # rabbitmq的用户密码
    spring.rabbitmq.password=123456
    

    创建消息载体对象

    • 对象必须实现序列化接口。
      这里把getter和setter方法省略了。
    /**
     * 消息内容载体,在rabbitmq中,存储的消息可以是任意的java类型的对象。
     * 强制要求,作为消息数据载体的类型,必须是Serializable的。
     * 如果消息数据载体类型未实现Serializable,在收发消息的时候,都会有异常发生。
     */
    public class LogMessage implements Serializable {
    
    	private Long id;
    	private String msg;
    	private String logLevel;
    	private String serviceType;
    	private Date createTime;
    	private Long userId;
    	public LogMessage() {
    		super();
    	}
    	public LogMessage(Long id, String msg, String logLevel, String serviceType, Date createTime, Long userId) {
    		super();
    		this.id = id;
    		this.msg = msg;
    		this.logLevel = logLevel;
    		this.serviceType = serviceType;
    		this.createTime = createTime;
    		this.userId = userId;
    	}
    	@Override
    	public String toString() {
    		return "LogMessage [id=" + id + ", msg=" + msg + ", logLevel=" + logLevel + ", serviceType=" + serviceType
    				+ ", createTime=" + createTime + ", userId=" + userId + "]";
    	}
    }
    

    编写测试类

    使用spring boot提供的【AmqpTemplate】接口RabbitMQ的默认实现R【RabbitTemplate】对象发送消息。
    其中convertAndSend方法可以发送消息:
    这个方法是将传入的普通java对象,转换为rabbitmq中需要的message类型对象,并发送消息到rabbitmq中。
    参数一:交换器名称。 类型是String
    参数二:路由键。 类型是String
    参数三:消息,是要发送的消息内容对象。类型是Object

    /**
     * Direct交换器
     * Producer测试。
     * 注意:
     * 在rabbitmq中,consumer都是listener监听模式消费消息的。
     * 一般来说,在开发的时候,都是先启动consumer,确定有什么exchange、queue、routing-key。
     * 然后再启动producer发送消息。
     */
    @RunWith(SpringRunner.class)
    @SpringBootTest(classes=SpringbootServerApplication.class)
    public class QueueTest {
    
    	@Autowired
    	private AmqpTemplate rabbitAmqpTemplate;
    	/*
    	 * 测试消息队列
    	 */
    	@Test
    	public void testSendInfo()throws Exception{
    		Long id = 1L;
    		while(true){
    			Thread.sleep(1000);
    			final LogMessage logMessage = new LogMessage(id, "test log", "info", "订单服务", new Date(), id);
                               
    			this.rabbitAmqpTemplate.convertAndSend("log.direct", "log.error.routing.key", logMessage);
    			id++;
    		}
    	}
    	/*
    	 * 测试消息队列
    	 */
    	@Test
    	public void testSendError()throws Exception{
    		Long id = 1L;
    		while(true){
    			Thread.sleep(1000);
    			final LogMessage logMessage = new LogMessage(id, "test log", "info", "订单服务", new Date(), id);
    			this.rabbitAmqpTemplate.convertAndSend("log.direct", "log.info.routing.key", logMessage);
    			id++;
    		}
    	}
    }
    

    consumer端的实现

    pom

    和producer端一样

    info级别的日志消费代码的编写

    @Component
    @RabbitListener(
    			bindings=@QueueBinding(
    					value=@Queue(value="log.error",autoDelete="false"),
    					exchange=@Exchange(value="log.direct",type=ExchangeTypes.DIRECT),
    					key="log.error.routing.key"
    			)
    		)
    public class ErrorReceiver {
    
    	/**
    	 * 消费消息的方法。采用消息队列监听机制
    	 * @RabbitHandler - 代表当前方法是监听队列状态的方法,就是队列状态发生变化后,执行的消费消息的方法。
    	 * 方法参数。就是处理的消息的数据载体类型。
    	 */
    	@RabbitHandler
    	public void process(LogMessage msg){
    		System.out.println("Error..........receiver: "+msg);
    	}
    }
    

    @RabbitListener

     可以注解类和方法,
        注解类:当表当前类的对象是一个rabbit listener。监听逻辑明确,可以由更好的方法定义规范。 必须配合@RabbitHandler才能实现rabbit消息消费能力。
        注解方法:代表当前方法是一个rabbit listener处理逻辑。方便开发,一个类中可以定义若干个listener逻辑。方法定义规范可能不合理。
        代表当前类型是一个rabbitmq的监听器。
         bindings:绑定队列
    

    @QueueBinding

        @RabbitListener.bindings属性的类型。绑定一个队列。
         value:绑定队列, Queue类型。
         exchange:配置交换器, Exchange类型。
         key:路由键,字符串类型。
    

    @Queue - 队列。

        value:队列名称
         autoDelete:是否是一个临时队列(也就是所有的consumer关闭后是否删除队列)
             true : 删除
             false:如果queue中有消息未消费,无论是否有consumer,都保存queue。
    

    @Exchange - 交换器

    value:为交换器起个名称
        type:指定具体的交换器类型
    

    @RabbitHandler

      代表当前方法是监听队列状态的方法,就是队列状态发生变化后,执行的消费消息的方法。
    

    Error级别的日志消费代码编写

    @Component
    @RabbitListener(
    			bindings=@QueueBinding(
    					value=@Queue(value="log.info",autoDelete="false"),
    					exchange=@Exchange(value="log.direct",type=ExchangeTypes.DIRECT),
    					key="log.info.routing.key"
    			)
    		)
    public class InfoReceiver {
    
    	@RabbitHandler
    	public void process(LogMessage msg){
    		System.out.println("Info........receiver: "+msg);
    	}
    }
    

    Topic 交换器

    场景

    现在有用户服务,订单服务,商品服务三个服务,每个服务都会有日志,日志都分info,error等级别,可以使用MQ实现日志的收集。
    使用Direct交换器,就需要定义至少六个队列。

    如果使用Topic交换器可以简化consumer端的开发:

    实现

    • pom的依赖和上面一样。
    • consumer端主要修改了Exchange的类型以及对应的Routing key的规则

    consumer端

    处理Error日志的消费者

    @Component
    @RabbitListener(
    			bindings=@QueueBinding(
    					value=@Queue(value="${mq.config.queue.error}",autoDelete="true"),
    					exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
    					key="*.log.error"
    			)
    		)
    public class ErrorReceiver {
    
    	@RabbitHandler
    	public void process(String msg){
    		System.out.println("......Error........receiver: "+msg);
    	}
    }
    

    处理Info日志的消费者

    @Component
    @RabbitListener(
    			bindings=@QueueBinding(
    					value=@Queue(value="${mq.config.queue.info}",autoDelete="true"),
    					exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.TOPIC),
    					key="*.log.info"
    			)
    		)
    public class InfoReceiver {
    	@RabbitHandler
    	public void process(String msg){
    		System.out.println("......Info........receiver: "+msg);
    	}
    }
    

    producer端

    商品发送日志信息

    @Component
    public class ProductSender {
    
    	@Autowired
    	private AmqpTemplate rabbitAmqpTemplate;
            /*
    	 * 发送消息的方法
    	 */
    	public void send(String msg){
    		//向消息队列发送消息
    		//参数一:交换器名称。
    		//参数二:路由键
    		//参数三:消息
    		this.rabbitAmqpTemplate.convertAndSend("log.topic","product.log.info", "product.log.info....."+msg);
    		this.rabbitAmqpTemplate.convertAndSend("log.topic","product.log.error", "product.log.error....."+msg);
    	}
    }
    

    用户发送信息

    @Component
    public class UserSender {
    
    	@Autowired
    	private AmqpTemplate rabbitAmqpTemplate;
    	
    	/*
    	 * 发送消息的方法
    	 */
    	public void send(String msg){
    		//向消息队列发送消息
    		//参数一:交换器名称。
    		//参数二:路由键
    		//参数三:消息
    		this.rabbitAmqpTemplate.convertAndSend("log.topic","user.log.info", "user.log.info....."+msg);
    		this.rabbitAmqpTemplate.convertAndSend("log.topic","user.log.error", "user.log.error....."+msg);
    	}
    }
    

    订单代码一样,省略。

    Fanout 交换器

    这个更简单,直接在producer和consumer端不需要配置Routing key就行了。

  • 相关阅读:
    HDU 3572 Task Schedule(拆点+最大流dinic)
    POJ 1236 Network of Schools(Tarjan缩点)
    HDU 3605 Escape(状压+最大流)
    HDU 1166 敌兵布阵(分块)
    Leetcode 223 Rectangle Area
    Leetcode 219 Contains Duplicate II STL
    Leetcode 36 Valid Sudoku
    Leetcode 88 Merge Sorted Array STL
    Leetcode 160 Intersection of Two Linked Lists 单向链表
    Leetcode 111 Minimum Depth of Binary Tree 二叉树
  • 原文地址:https://www.cnblogs.com/wangsen/p/11057714.html
Copyright © 2011-2022 走看看