zoukankan      html  css  js  c++  java
  • rocketMq学习

    Mq的作用

    1.系统解耦
    2.流量削峰
    3.数据分发

    安装目录说明:

    bin:启动脚本,包括shell脚本和CMD脚本
    conf:实例配置文件,包括broker配置文件,logback配置文件
    lib:依赖jar包,包括Netty,commons-lang,FastJson等

    启动/停止方式:

    1. 启动rocketMq

    #a.启动NameServer
    nohup sh bin/mqnamesrv &
    #b.查看启动日志
    tail -f ~/log/rocketmqlogs/namesrv.log

    2. 启动broker

    #a.启动broker
    nohup sh bin/mqbroker -n localhost:9876 &
    #b.查看启动日志
    tail -f ~/log/rocketmqlogs/broker.log``

     **启动broker时,默认设置的内存较大。可能会发生启动失败的情况。这时需要编辑以下两个文件,修改jvm内存大小

    #编辑runbroker.sh和runserver.sh默认修改JVM大小
    vi runbroker.sh
    vi runserver.sh

    关闭rocketMq

    #1.关闭nameServer
    sh bin/myshutdown namesrv
    #2.关闭broker
    sh bin/myshutdown broker

    可以用jps可以查看进程 || 查看日志。看namesrv和broker是否启动成功

    测试rocketMq发送和接收消息

    1.发送消息
    #1.设置环境变量
    export NAMESRV_ADDR=localhost:9876
    #2.使用安装包的Demo发送消息
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Product
    
    2.接收消息
    #1.设置环境变量
    export NAMESRV_ADDR=localhost:9876
    #2.接收消息
    sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer

    rocketMq集群搭建

    product:消息的发送者;举例:发信者

    Consumer:消息的接收者;举例:收信者

    Broker:暂存和传输消息;举例:邮局

    NameServer:管理Broker;举例:邮局的管理机构

    Topic:区分消息的种类;一个发送者可以发送给一个或者多个Topic;一个消息的接收者可以订阅一个或者多个Topic消息

    Message Queue:相当于topic的分区;用于并行发送和接收消息

    集群特点:

    NameServer:无状态的节点,启动多个即为集群部署,节点之间没有信息同步

    broker:部署相对复杂,Broker分为Master和Slave。一个Master对应多个Slave。但是一个Slave只能对应一个master

    Master和Slave的对应关系通过指定相同的的BrokerName,不同的BrokerId来指定。brokerId为0表示为master。不为0表示为slave。

    Master也可以部署多个,每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer

    Product与NameServer集群中的一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息。并向提供Topic服务的Master建立长连接,且定时向Master发送心跳。product完全无状态,可集群部署

    Consumer与NameServer集群中的一个节点(随机选择)建立长连接,定期从NameServer取Topic路由信息。并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息,订购规则由Broker配置决定

    消息:

    结构:

    1.消息主体Topic
    2.消息Tag
    3.消息内容

    基本方式:

    生产者发送:

    1.同步--send

    2.异步--异步send(有回调函数)

    3.发送单向消息(只管发 不管结果)--sendOneway

    消息者消费:

    consumer.subscribe(Tpoic, subExpression);//消息的订阅

    设置消息的回调函数,处理消息

    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) ->{
        for(MessageExt msg: msgs) {
            System.out.println("消费消息:"+new String (msg.getBody()));
        }
        return ConsumeOrderlyStatus.SUCCESS;
    })

    consumer.setMessageModel 可以设置消息的消费模式。不设置的话默认是负载均衡模式

    广播的消费模式:rocketMq有N条数据。每个消费者都消费N条

    负载均衡的消费模式(默认):rocketMq有N条数据。所有消费者加起来共消费N条

    顺序消息:

    生产者发送(增加了一个消息队列的选择器):

    /**
    * 实现将同样orderId的消息发送到同一个队列里面
    * message:要发送的消息
    * MessageQueueSelector:消息队列选择器,规则结果相同的 返回同一个队列
    * orderId:队列选择逻辑参数。用来当规则的入参
    */

    SendResult sendResult = producer.send(message, new MessageQueueSelector() {   @Override   public MessageQueue select (List<MessageQueue> mqs, Message msg, Object arg) {     long orderId = (long) arg;     long index = orderId % mqs.size();     return mqs.get((int) index);   } }, orderId);

    消费者消费(增加了顺序性的消息监听器): 

    /**
    * 消费者订阅完消息后,注册消息监听器
    */
    
    consumer.registerMessageListener(new MessageListenerOrderly() {
      @Override
      public ConsumeOrderlyStatus consumeMessage (List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for(MessageExt msg: msgs) {
                System.out.println("消费消息:"+new String (msg.getBody()));
            }
        return ConsumeOrderlyStatus.SUCCESS;
      }
    });
    
    //启动消费者
    consumer.start();

    延迟消息:

    消费消息的速度 比 存储的消息延迟一段时间

     生产者:

    增加对消息的延迟设置

    message.setDelayTimeLevel(2);//设置消息延迟的等级。

    其他上述的生产者

    消费者和上述几个消费者一样

    批量消息:

    生产者发送消息是 不用for循环 逐条发送,直接发送一个List

    消费者没变

    数据不能超过4m。如果超过 需要分割

    过滤消息:

    1.使用tag进行过滤

    2.使用sql语法进行过滤--使用消息选择器

    生产者:message.putUserProperty("i", String.valueof(i));

    消费者:consumer.subscribe(Tpoic, MessageSelector.bySql());//使用消息选择器进行消息的订阅

    事务消息:

    事务消息有3种状态:提交状态,回滚状态,中间状态

    消息的存储结构

    rocketMq将消息数据存在磁盘文件中,通过安装时的配置文件,可以知道消息存储的文件位置。

    1.commitLog: 存储消息的元数据,每个1G。包括Topic、QueueId、Message

    2.ConsumerQueue: 存储消息在CommitLog的索引

    3.IndexFile: 为了消息查询提供了一种通过key或者时间区间来查询消息的方法。通过这种IndexFile来查找消息的方法不影响发送和消费消息的主流程

    刷盘机制

    1.同步刷盘:生产者发送消息过来时,先将消息写到内存中,然后阻塞生产者响应,唤醒刷盘线程,等刷盘完成后,唤醒生产者响应,将响应结果告知生产者

    2.异步刷盘:生产者发送消息过来时,将消息写到内存中,然后立即返回生产者响应,等数据积累一定量后,唤醒刷盘线程,进行刷盘保存。

    通过Broker配置文件中的flushDiskType参数设置刷盘机制的模式。这个参数被配置成SYNC_FLUSH或者ASYNC_FLUSH

    建议:一般刷盘设置成异步刷盘的。broker设置为主从同步复制

    生产者|消费者的负载均衡

    product:

    在发送消息时,默认会才用Roundbin轮询所有的messageQueue进行发送。让消息平均落在不同的Queue上。而由于Queue可以散落在不同的broker上。所以消息可以发送到不同的broker节点上。

    consumer:

    负载均衡模式(1条消息只被消费1次):采用AllocateMessageQueueAveragely或者AllocateMessageQueueAveragelybyCircle分配算法。每个consumer平均分配consumer queue。如果消费者个数>consumer queue个数。多出来的消费者将分配不到消息队列。此时多出来的消费者是没有用的。

    广播模式(1条消息被所有消费者消费):由于广播模式的特性,所以每个consumer分配到了所有的consumer queue

    消息重试

    顺序消息的重试:

    对于顺序消息,当消费者消费失败后,消息队列会不断进行消息重试(1s1次),此时。容易造成应用程序的消息阻塞。因此在使用顺序消息时,要保证应用能够及时监控并处理消费失败的情况。避免阻塞现象的发生。

    无序消息的重试:

    包括普通、定时、延时、事务消息。当无序消息失败时。可以通过设置返回状态达到消息重试的结果。无序消息的重试只是针对负载均衡的消费模式。若是广播模式,失败的消息不进行重试,继续消费新的消息。

    最多可以进行16次消息重试。需要4小时46分钟。若是超过16次都重试失败,则该条消息不再重试投递,进入死信队列。一条消息无论重试多少次,这些重试消息的MessageId不会改变。

     

    死信队列

    多次重试失败的消息进入死信队列

    特征:

    1.不会再被消费者正常消费

    2.有效期和正常消息一样 都是3天。3天后自动删除

    3.1个死信队列对应一个Group Id ,而不是对应单个消费者实例

    可以在RocketMq的控制台查看并重新发送该消息。让消费者重新消费

    消息幂等性

    消息重复场景:

    1.发送时消息重复

    2.投递时消息重复

    3.负载均衡时消息重复

  • 相关阅读:
    Hibernate学习笔记
    Oracle12c 在windonServer2012中安装的步骤
    提升tomcat服务器性能的七条经验
    zyUpload---照片上传并显示效果
    js的隐含参数(arguments,callee,caller)使用方法
    js中callback.call()和callback()的区别
    理解javascript中的回调函数(callback)
    Spring MVC 流程图
    JDBC在springMvc等框架中使用的方式
    为IE8添加EventListener系列方法支持
  • 原文地址:https://www.cnblogs.com/linhongwenBlog/p/12368137.html
Copyright © 2011-2022 走看看