zoukankan      html  css  js  c++  java
  • RabbitMQ核心组件及应用场景

    一、适用场景

    1.解耦
    2.最终一致性
    3.广播
    4.错峰与流控(秒杀业务用于流量削峰场景)

    秒杀场景

    二、核心组件,关键点(交换器、队列、绑定)

    AMPQ消息路由必要三部分:交换器、队列、绑定。

    Java核心组件:ConnectionFactory、Connection、Channel、Delivery、DeliverCallback、CancelCallback

    队列

    1. 建立连接
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("127.0.0.1");
    factory.setPort(5672);
    factory.setUsername("admin");
    factory.setPassword("admin");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    
    2. 声明队列

    如果在同一条信道上订阅了另一个队列,那就不能再声明队列,必须先取消订阅。

    Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
    

    queue:需要指定队列名称,如果不指定,MQ会随机分配一个并在queue.declare命令中返回,

    durable:队列将在服务器重启后存在。

    exclusive:为true时,队列变成私有的。

    autoDelete: 为true时,当最后一个消费者取消订阅时,队列自动移除。

    3. 消费者通过AMQP的basic.consume命令订阅消息,将信道置为接收模式。

    Java代码Channel:

    String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;
    
    4. 当有多个消费者存在时,队列里的消息将以循环的方式发送给消费者。消费者接收到消息后必须进行确认,可通过basic.ack显示确认:

    Java代码Channel:

    void basicAck(long deliveryTag, boolean multiple) throws IOException;
    
    

    上面的手动确认,第二个参数为true,批量确认;如果为false,会一次确认一条。当有耗时任务时,可以利用手动确认延迟确认消息,防止消息大量涌入应用导致过载。
    也可以在订阅队列时就将basicConsume方法的autoAck参数设置为true,开启自动确认。确认成功后rabbitmq会从队列中删除消息。

    5. 如果在确认过程中和rabbitmq服务器断链,那么这条消息就会发送给下一个消费者。可以使用basic.reject拒绝消息:

    Java代码Channel:

    void basicReject(long deliveryTag, boolean requeue) throws IOException;
    

    第二个参数requeue设置为true,消息会重新排队并发送给下一个消费者;为false则会丢弃该条消息。可以利用此性质丢弃错误格式的消息。

    6. 发布消息
    void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
    

    exchange:第一个参数是交换的名称。空字符串表示默认或无名交换,消息通过routingKey路由到指定队列。

    交换器和绑定

    1. 交换器一共有四种类型:fanout、direct 、topic、headers。

    Java中Channel申明交换器:

    Exchange.DeclareOk exchangeDeclare(String exchange, BuiltinExchangeType type) throws IOException;
    

    BuiltinExchangeType对应有四种枚举类型:

    DIRECT("direct"), FANOUT("fanout"), TOPIC("topic"), HEADERS("headers");
    
    2. 队列通过路由键(routing key)绑定到交换器。
    channel.queueBind(String queue, String exchange, String routingKey)
    

    RabbitMQ中消息传递模型的核心思想是生产者永远不会将任何消息直接发送到队列。实际上,生产者甚至不知道消息是否会被传递到哪个队列。

    不指定队列名时,通过服务器随机生成队列名称:

    String queueName = channel.queueDeclare().getQueue();
    

    不传参数时,queueDeclare生成一个非持久的,独占的自动删除队列

    在linux服务器上可以通过命令查看所有交换器:

    rabbitmqctl list_exchanges
    
    3. fanout广播方式

    广播方式会将消息投递给所有附加在此交换器的队列。

    4. direct模式

    direct类型在绑定时设定一个routing_key,消息的routing_key匹配时, 才会被交换器投递到绑定的队列中去.

    5. topic模式

    按规则投递,通过通配符#和*组合

    *(星号)可以替代一个单词。

    (hash)可以替换零个或多个单词。

    持久化

    将队列和交换器的durable属性设置为true

    申明队列时:

    channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) throws IOException;
    

    申明交换器时:

    channel.exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable) throws IOException;
    

    投递消息时将投递模式(delivery mode)设置为2,Java代码中MessageProperties.PERSISTENT_TEXT_PLAIN来设置:

    channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8"));
    

    在MessageProperties源码中如下所示:

    /** Content-type "text/plain", deliveryMode 2 (persistent), priority zero */
    public static final BasicProperties PERSISTENT_TEXT_PLAIN =
        new BasicProperties("text/plain",
                            null,
                            null,
                            2,
                            0, null, null, null,
                            null, null, null, null,
                            null, null);
    

    实现事务功能(发送方确认)

    使用事务会使rabbitmq的性能大大降低,为了避免这个问题,rabbitmq支持:发送方确认模式。通过这个模式来保证消息的投递。当生产者P投递消息后会等待消费者C发送确认,P收到确认后可以调用回调函数处理相关业务。在生产者P等待确认的同时也可以继续发送下一条消息。

    服务器管理

    1、虚拟主机vhost

    rabbitmq支持创建虚拟主机,默认的虚拟主机为“/”,默认用户guest;当在rabbitmq集群中创建虚拟主机时,整个集群都会创建。

    2、错误日志查看

    rabbitmq的日志文件在/var/log/rabbitmq/下的rabbit@[localhost].log

    3、rabbitmq配置文件

    配置文件在rpm安装/usr/share/doc/rabbitmq-server-3.6.5/rabbitmq.config.example,复制一份:

    cp /usr/share/doc/rabbitmq-server-3.5.3/rabbitmq.config.example /etc/rabbitmq.config
    

    参考官网配置:https://www.rabbitmq.com/configure.html#configuration-file

    三、底层原理,主要实现

    应用程序和rabbitmq服务器之间建立一条tcp连接,tcp连接打开后,应用程序就可以和rabbitmq创建多条AMQP信道,信道是建立在tcp连接上的虚拟连接。

    四、同类技术产品比较

    1.ActiveMQ

    优点

    单机吞吐量:万级
    topic数量都吞吐量的影响:
    时效性:ms级
    可用性:高,基于主从架构实现高可用性
    消息可靠性:有较低的概率丢失数据
    功能支持:MQ领域的功能极其完备
    缺点:

    官方社区现在对ActiveMQ 5.x维护越来越少,较少在大规模吞吐的场景中使用。

    2.Kafka

    号称大数据的杀手锏,谈到大数据领域内的消息传输,则绕不开Kafka,这款为大数据而生的消息中间件,以其百万级TPS的吞吐量名声大噪,迅速成为大数据领域的宠儿,在数据采集、传输、存储的过程中发挥着举足轻重的作用。

    Apache Kafka它最初由LinkedIn公司基于独特的设计实现为一个分布式的提交日志系统( a distributed commit log),之后成为Apache项目的一部分。

    目前已经被LinkedIn,Uber, Twitter, Netflix等大公司所采纳。

    优点

    性能卓越,单机写入TPS约在百万条/秒,最大的优点,就是吞吐量高。
    时效性:ms级
    可用性:非常高,kafka是分布式的,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
    消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次;
    有优秀的第三方Kafka Web管理界面Kafka-Manager;
    在日志领域比较成熟,被多家公司和多个开源项目使用;
    功能支持:功能较为简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用
    缺点:

    Kafka单机超过64个队列/分区,Load会发生明显的飙高现象,队列越多,load越高,发送消息响应时间变长
    使用短轮询方式,实时性取决于轮询间隔时间;
    消费失败不支持重试;
    支持消息顺序,但是一台代理宕机后,就会产生消息乱序;
    社区更新较慢;

    3.RabbitMQ

    RabbitMQ 2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最主流的消息中间件之一。

    RabbitMQ优点:

    由于erlang语言的特性,mq 性能较好,高并发;
    吞吐量到万级,MQ功能比较完备
    健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
    开源提供的管理界面非常棒,用起来很好用
    社区活跃度高;
    RabbitMQ缺点:

    erlang开发,很难去看懂源码,基本职能依赖于开源社区的快速维护和修复bug,不利于做二次开发和维护。
    RabbitMQ确实吞吐量会低一些,这是因为他做的实现机制比较重。
    需要学习比较复杂的接口和协议,学习和维护成本较高。

    4.RocketMQ

    RocketMQ出自 阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并做出了自己的一些改进。

    RocketMQ在阿里集团被广泛应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

    RocketMQ优点:

    单机吞吐量:十万级
    可用性:非常高,分布式架构
    消息可靠性:经过参数优化配置,消息可以做到0丢失
    功能支持:MQ功能较为完善,还是分布式的,扩展性好
    支持10亿级别的消息堆积,不会因为堆积导致性能下降
    源码是java,我们可以自己阅读源码,定制自己公司的MQ,可以掌控
    RocketMQ缺点:

    支持的客户端语言不多,目前是java及c++,其中c++不成熟;
    社区活跃度一般
    没有在 mq 核心中去实现JMS等接口,有些系统要迁移需要修改大量代码

    github代码

    https://github.com/LighterTang/RabbitMQUtils/tree/master/rabbitmq_base/src/main/java/com/mq/mqutils/mqdoc

    参考资料

    https://www.rabbitmq.com

    http://youzhixueyuan.com/comparison-of-kafka-rocketmq-rabbitmq.html

  • 相关阅读:
    Spring boot unable to determine jdbc url from datasouce
    Unable to create initial connections of pool. spring boot mysql
    spring boot MySQL Public Key Retrieval is not allowed
    spring boot no identifier specified for entity
    Establishing SSL connection without server's identity verification is not recommended
    eclipse unable to start within 45 seconds
    Oracle 数据库,远程访问 ora-12541:TNS:无监听程序
    macOS 下安装tomcat
    在macOS 上添加 JAVA_HOME 环境变量
    Maven2: Missing artifact but jars are in place
  • 原文地址:https://www.cnblogs.com/monkjavaer/p/10914547.html
Copyright © 2011-2022 走看看