zoukankan      html  css  js  c++  java
  • RocketMQ and kafka

    MQ 入门实践

     

    MQ

    业界产品

     ActiveMQRabbitMQRocketMQkafka
    单机吞吐量 万级 万级 10 万级 10 万级
    可用性 非常高 非常高
    可靠性 较低概率丢失消息 基本不丢 可以做到 0 丢失 可以做到 0 丢失
    功能支持 较为完善 基于 erlang,并发强,性能好,延时低 分布式,拓展性好,支持分布式事务 较为简单,主要应用与大数据实时计算,日志采集等
    社区活跃度

    Message Queue,消息队列,FIFO 结构。

    例如电商平台,在用户支付订单后执行对应的操作;

    优点:

    • 异步
    • 削峰
    • 解耦

    缺点

    • 增加系统复杂性
    • 数据一致性
    • 可用性

    JMS

    Java Message Service,Java消息服务,类似 JDBC 提供了访问数据库的标准,JMS 也制定了一套系统间消息通信的规范;

    区别于 JDBC,JDK 原生包中并未定义 JMS 相关接口。

    1. ConnectionFactory

    2. Connection

    3. Destination

    4. Session

    5. MessageConsumer

    6. MessageProducer

    7. Message

    协作方式图示为;

    业界产品

     ActiveMQRabbitMQRocketMQkafka
    单机吞吐量 万级 万级 10 万级 10 万级
    可用性 非常高 非常高
    可靠性 较低概率丢失消息 基本不丢 可以做到 0 丢失 可以做到 0 丢失
    功能支持 较为完善 基于 erlang,并发强,性能好,延时低 分布式,拓展性好,支持分布式事务 较为简单,主要应用与大数据实时计算,日志采集等
    社区活跃度

    ActiveMQ

    作为 Apache 下的开源项目,完全支持 JMS 规范。并且 Spring Boot 内置了 ActiveMQ 的自动化配置,作为入门再适合不过。

    快速开始

    添加依赖;

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-core</artifactId>
        <version>5.7.0</version>
    </dependency>
    

    消息发送;

    // 1. 创建连接工厂
    ConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
    // 2. 工厂创建连接
    Connection connection = factory.createConnection();
    // 3. 启动连接
    connection.start();
    // 4. 创建连接会话session,第一个参数为是否在事务中处理,第二个参数为应答模式
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 5. 根据session创建消息队列目的地
    Destination queue = session.createQueue("test-queue");
    // 6. 根据session和目的地queue创建生产者
    MessageProducer producer = session.createProducer(queue);
    // 7. 根据session创建消息实体
    Message message = session.createTextMessage("hello world!");
    // 8. 通过生产者producer发送消息实体
    producer.send(message);
    // 9. 关闭连接
    connection.close();
    

    Spring Boot 集成

    自动注入参考:org.springframework.boot.autoconfigure.jms.activemq.ActiveMQConnectionFactoryConfiguration.SimpleConnectionFactoryConfiguration

    添加依赖;

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    

    添加 yaml 配置;

    spring:
      activemq:
        broker-url: tcp://localhost:61616
      jms:
        #消息模式 true:广播(Topic),false:队列(Queue),默认时false
        pub-sub-domain: true
    

    收发消息;

    @Autowired
    private JmsTemplate jmsTemplate;
    
    // 接收消息
    @JmsListener(destination = "test")
    public void receiveMsg(String msg) {
        System.out.println(msg);
    }
    
    // 发送消息
    public void sendMsg(String destination, String msg) {
        jmsTemplate.convertAndSend(destination, msg);
    }
    

    高可用

    基于 zookeeper 实现主从架构,修改 activemq.xml 节点 persistenceAdapter 配置;

    <persistenceAdapter>
        <replicatedLevelDB
            directory="${activemq.data}/levelDB"
            replicas="3"
            bind="tcp://0.0.0.0:0"
            zkAddress="172.17.0.4:2181,172.17.0.4:2182,172.17.0.4:2183"
            zkPath="/activemq/leveldb-stores"
            hostname="localhost"
        />
    </persistenceAdapter>
    

    broker 地址为:failover:(tcp://192.168.4.19:61616,tcp://192.168.4.19:61617,tcp://192.168.4.19:61618)?randomize=false

    负载均衡

    在高可用集群节点 activemq.xml 添加节点 networkConnectors;

    <networkConnectors>
        <networkConnector uri="static:(tcp://192.168.0.103:61616,tcp://192.168.0.103:61617,tcp://192.168.0.103:61618)" duplex="false"/>
    </networkConnectors>
    

    更多详细信息可参考:https://blog.csdn.net/haoyuyang/article/details/53931710

    集群消费

    由于发布订阅模式,所有订阅者都会接收到消息,在生产环境,消费者集群会产生消息重复消费问题。

    ActiveMQ 提供 VirtualTopic 功能,解决多消费端接收同一条消息的问题。于生产者而言,VirtualTopic 就是一个 topic,对消费而言则是 queue。

    在 activemq.xml 添加节点 destinationInterceptors;

    <destinationInterceptors> 
        <virtualDestinationInterceptor> 
            <virtualDestinations> 
                <virtualTopic name="testTopic" prefix="consumer.*." selectorAware="false"/>    
            </virtualDestinations>
        </virtualDestinationInterceptor> 
    </destinationInterceptors>
    

    生产者正常往 testTopic 中发送消息,订阅者可修改订阅主题为类似 consumer.A.testTopic 这样来消费。

    更多详细信息可参考:https://blog.csdn.net/java_collect/article/details/82154829

    RocketMQ

    是一个队列模型的消息中间件,具有高性能、高可靠、高实时、分布式特点。

    架构图示

    1. Name Server

      名称服务器,类似于 Zookeeper 注册中心,提供 Broker 发现;

    2. Broker

      RocketMQ 的核心组件,绝大部分工作都在 Broker 中完成,接收请求,处理消费,消息持久化等;

    3. Producer

      消息生产方;

    4. Consumer

      消息消费方;

    快速开始

    安装后,依次启动 nameserver 和 broker,可以用 mqadmin 管理主题、集群和 broker 等信息;

    https://segmentfault.com/a/1190000017841402

    添加依赖;

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.5.2</version>
    </dependency>
    

    消息发送;

    DefaultMQProducer producer = new DefaultMQProducer("producer-group");
    producer.setNamesrvAddr("127.0.0.1:9876");
    producer.setInstanceName("producer");
    producer.start();
    Message msg = new Message(
        "producer-topic",
        "msg",
        "hello world".getBytes()
    );
    //msg.setDelayTimeLevel(1);
    SendResult sendResult = producer.send(msg);
    System.out.println(sendResult.toString());
    producer.shutdown();
    

    delayLevel 从 1 开始默认依次是:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h。

    参考 org.apache.rocketmq.store.schedule.ScheduleMessageService#parseDelayLevel。

    消息接收;

    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group");
    consumer.setNamesrvAddr("127.0.0.1:9876");
    consumer.setInstanceName("consumer");
    consumer.subscribe("producer-topic", "msg");
    consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
        for (MessageExt msg : list) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    consumer.start();
    

    .mqadmin.cmd sendMessage -t producer-topic -c msg -p "hello rocketmq" -n localhost:9876

    Spring Boot 集成

    添加依赖;

    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-spring-boot-starter</artifactId>
        <version>2.0.4</version>
    </dependency>
    

    添加 yaml 配置;

    rocketmq:
      name-server: 127.0.0.1:9876
      producer:
        group: producer
    

    发送消息;

    @Autowired
    private RocketMQTemplate mqTemplate;
    
    public void sendMessage(String topic, String tag, String message) {
        SendResult result = mqTemplate.syncSend(topic + ":" + tag, message);
        System.out.println(JSON.toJSONString(result));
    }
    

    接收消息;

    @Component
    @RocketMQMessageListener(consumerGroup = "consumer", topic = "topic-test", selectorExpression = "tag-test")
    public class MsgListener implements RocketMQListener<String> {
    
        @Override
        public void onMessage(String message) {
            System.out.println(message);
        }
    }
    

    Console 控制台

    RocketMQ 拓展包提供了管理控制台;

    https://github.com/apache/rocketmq-externals/tree/master/rocketmq-console

    重复消费

    产生原因:

    1. 生产者重复投递;
    2. 消息队列异常;
    3. 消费者异常消费;

    怎么解决重复消费的问题,换句话怎么保证消息消费的幂等性

    通常基于本地消息表的方案实现,消息处理过便不再处理。

    顺序消息

    消息错乱的原因:

    1. 一个消息队列 queue,多个 consumer 消费;
    2. 一个 queue 对应一个 consumer,但是 consumer 多线程消费;

    要保证消息的顺序消费,有三个关键点:

    1. 消息顺序发送
    2. 消息顺序存储
    3. 消息顺序消费

    参考 RocketMq 中的 MessageQueueSelector 和 MessageListenerOrderly。

    分布式事务

    在分布式系统中,一个事务由多个本地事务组成。这里介绍一个基于 MQ 的分布式事务解决方案。

    通过 broker 的 HA 高可用,和定时回查 prepare 消息的状态,来保证最终一致性。

    作者:Leo_wl
             
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
    版权信息
  • 相关阅读:
    迭代器和生成器
    案例:复制大文件
    案例:使用seek倒查获取日志文件的最后一行
    Leetcode165. Compare Version Numbers比较版本号
    Leetcode137. Single Number II只出现一次的数字2
    Leetcode129. Sum Root to Leaf Numbers求根到叶子节点数字之和
    Leetcode116. Populating Next Right Pointers in Each Node填充同一层的兄弟节点
    Leetcode114. Flatten Binary Tree to Linked List二叉树展开为链表
    Leetcode113. Path Sum II路径总和2
    C++stl中vector的几种常用构造方法
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/14763245.html
Copyright © 2011-2022 走看看