zoukankan      html  css  js  c++  java
  • RocketMQ读书笔记2——生产者

    【生产者的不同写入策略】

    生产者向消息队列里写入数据,不同的业务需要生产者采用不同的写入策略:

    同步发送、异步发送、延迟发送、发送事务消息等。

    【DefaultMQProduce示例】

    public class ProducerQuickStart {
    
        public static void main(String[] args) throws MQClientException,InterruptedException {
            /**1.设置Producer的GroupName**/
            DefaultMQProducer producer = new DefaultMQProducer("GROUP_B");
            /**2.设置Instance**/
            producer.setInstanceName("instanceB");
            /**3.设置发送失败的重试次数**/
            producer.setRetryTimesWhenSendFailed(3);
            /**4.设置NameServer的地址**/
            producer.setNamesrvAddr("127.0.0.1:9876;127.0.0.2:9876");
            /**5.启动Producer**/
            producer.start();
            for (int i = 0; i < 10; i++) {
                try{
                    /**6.组装消息并发送**/
                    Message msg = new Message("TopicTest","TagA",
                            ("Hello HigginCui:"+i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                    producer.send(msg, new SendCallback() {
                        @Override
                        public void onSuccess(SendResult sendResult) {
                            System.out.println("sendResultStatus:" + sendResult.getSendStatus());
                        }
    
                        @Override
                        public void onException(Throwable e) {
                            e.printStackTrace();
                        }
                    });
                }catch (Exception e){
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
            producer.shutdown();
    
        }
    }

     [ 提示:设置Instance ]

    当一个JVM需要启动多个Producer的时候,通过设置不同的InstanceName来区分,不设置的话系统使用默认名称“DEFAULT”。

    [ 提示:设置发送失败的重试次数 ]

    当网络出现异常的时候,这个次数影响消息的重复投递次数。想保证消息不丢失,可以设置多重试几次。

    【消息发送的返回值】

    FLUSH_DISK_TIMEOUT
    刷盘超时(需要Broker设置为SYNC_FLUSH同步刷盘才会报这个错)
    FLUSH_SLAVE_TIMEOUT
    主从同步超时(在主备方式,且Broker设置为SYNC_MASTER情况下)
    SLAVE_NOT_AVALIABLE
    没有找到被设置成SLAVE的Broker。(在主备方式,且Broker设置成SYNC_MASTER的情况下)
    SEND_OK
    发送成功(需要结合所配置的 刷盘策略、主从策略来定)

    【延迟消息】

    RocketMQ支持延迟消息,Broker收到这类消息后,延迟一段时间再处理,使消息在规定的一段时间内生效。

    延迟消息使用方法:

    在创建Message对象时,调用setDelayTimeLevel(int level)方法设置延迟时间,然后再把这个新消息发送出去。

    目前延迟消息不支持任意设置,仅支持预设值的时间长度(1s/5s/10s/30s/1m/2m/3m/4m/5m/6m/7m/8m/9m/10m/20m/30m/1h/2h)。如setDelayTimeLevel(3)表示延迟10s。

     【自定义消息发送规则】

      一个Topic下会有多个MessageQueue,如果使用Producer的默认配置,这个Producer会轮流向各个MessageQueue发送消息。Consumer消费的时候,会根据负载均衡策略,消费分配到的MessageQueue。不经过特定设置,某条消息发往哪个MessageQueue,被哪个Consumer消费都是未知的,

    [ 如果把同一类型的消息发往相同的MessageQueue? ]

    想把同一类型的消息发往相同的MessageQueue,可以用MessageQueueSelector。

    代码示例:

    public class OrderMessageQueueSelector implements MessageQueueSelector {
    
        /**
         * 根据订单的id值平均分配对应的MessageQueue
         * @param mqs 消息队列
         * @param msg 消息
         * @param orderKey
         * @return
         */
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object orderKey) {
            int id = Integer.parseInt(orderKey.toString());
            int idMainIndex = id/100;
            int size = mqs.size(); //MessageQueue的总数
            int index= idMainIndex /size ;
            return mqs.get(index);  //返回选中的MessageQueue
        }
    }

    在发送消息的时候,把MessageQueueSelector的对象作为参数,使用

    MQProducer接口的自定义发送方法:
    SendResult send(final Message msg, final MessageQueueSelector selector, final Object arg)

    在MessageQueueSelector的实现中,根据传入的Object参数,或者根据Message消息内容确定把消息发往哪个MessageQueue,返回被选中的MessageQueue。

    【事务消息】

    RocketMQ的事务消息:发送消息事件和其他事件需要同时成功或失败。

    如转账操作:A账户转账1W到B账户,A发送"B账户加1W"的消息,要和"A账户扣除1W"的这个操作同时成功或失败。

    [ 关键词 ]

    两阶段提交。

    [ 大致流程 ]

    RocketMQ采用两阶段提交的方式实现事务消息。

    TransactionMQProducer处理流程如下:

    1.先发一个"B账户增加1W"的待确认消息。

    2.发送成功后做"A账户扣除1W"的操作

    3.根据"A账户扣除1W"的操作成功与否,决定之前"B账户增加1W"的消息是commit还是rollback。

    [ 具体流程 ]

    1.Producer向MQ发送"B账户增加1W"的待确认消息。

    2.RocketMQ将这个待确认消息持久化成功后,向Producer回复消息发送成功,此时第一阶段消息发送完成。

    3.执行本地事件逻辑,即"A账户扣除1W"的操作。

    4.Producer根据本地事件执行结果向RocketMQ发送二次确认(Commit或RollBack)消息:

    如果收到Commit状态则将第一阶段的待确认消息标记为“可投递”,Consumer将收到该消息;

    如果收到RocketBack状态则删除第一阶段的待确认消息,Consumer无法收到该消息。

    5.若中途出现异常,步骤4提交的二次确认最终未到达RocketMQ,服务器在经过固定的时间会对“待确认”消息发起回查请求。

    6.Producer收到回查请求后,通过检查本地对应消息的本地事件执行结果返回Commit或RockBack状态(如果发送第一阶段待确认消息的Producer不能工作,回查请求将被发送到和Producer在同一个Group里的其他Producer)。

    【为什么RocketMQ4.x版本删除事务消息】

    虽然上述的方案很好地实现了事务消息功能,也是RocketMQ之前的版本实现事务消息的逻辑,因为RocketMQ依赖将数据顺序写到磁盘的这个特征来提高性能,步骤4需要更改第一阶段待确认消息的状态,这样会导致磁盘Catch的脏页过多,降低了系统性能,所以RocketMQ在4.x版本将这部分功能去除了。

  • 相关阅读:
    flash中网页跳转总结
    as3自定义事件
    mouseChildren启示
    flash拖动条移出flash无法拖动
    需要一个策略文件,但在加载此媒体时未设置checkPolicyFile标志
    Teach Yourself SQL in 10 Minutes
    电子书本地转换软件 Calibre
    Teach Yourself SQL in 10 Minutes
    Teach Yourself SQL in 10 Minutes
    Teach Yourself SQL in 10 Minutes – Page 31 练习
  • 原文地址:https://www.cnblogs.com/HigginCui/p/10030586.html
Copyright © 2011-2022 走看看