zoukankan      html  css  js  c++  java
  • RocketMq的事务消息发送方法,消息零丢失的实现方式,代码流程讲解,干货分享

    1.消息发送mq不丢失实现方式
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.client.producer.TransactionMQProducer;
    import org.apache.rocketmq.client.producer.TransactionSendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    import java.io.UnsupportedEncodingException;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadFactory;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @description RocketMQ事务消息发送
     */
    public class TransactionProducer {
        public static void main(String[] args) throws MQClientException, UnsupportedEncodingException {
            //用来接收RocketMQ回调的监听接口
            //这里是我们自己定义的实现执行本地事务,commit.rollback,回调查询等逻辑
            TransactionListener transactionListener = new TransactionListenerImpl();
            //下面就是创建一个支持事务消息的Producer
            //对这个Producer指定一个生产者分组
            TransactionMQProducer producer = new TransactionMQProducer("test");
    
            //下面指定了一个线程池,里面包含一些线程
            //这个线程就是用来处理RocketMQ的回调函数
            ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(
                    1,//核心线程数
                    2,//线程池最大线程数
                    1000,//超时时间
                    TimeUnit.SECONDS,//时间的单位
                    new ArrayBlockingQueue<Runnable>(2000),//存放阻塞线程的列表
                    new ThreadFactory() {//线程创建工厂
                        @Override
                        public Thread newThread(Runnable r) {
                            Thread thread = new Thread(r);
                            thread.setName("TestThread");
                            return thread;
                        }
                    }
            );
            //给事务消息生产者设置对应的线程池,负责执行RocketMQ的回调请求
            producer.setExecutorService(poolExecutor);
            //给生产者设置对应的回调函数
            producer.setTransactionListener(transactionListener);
            //启动消息生产者
            producer.start();
            //虚拟一条成功的消息
            Message message = new Message(
                    "successTopic"
                    , "tag"
                    , "key",
                    ("成功的消息").getBytes(RemotingHelper.DEFAULT_CHARSET));
            //这里存放发送的half消息到内存或者磁盘文件中,后台开启一个线程,扫描这个文件,如果超过一定时间没有收到响应,就回滚业务
            //save(message)
            //将消息作为half消息的模式发送出去,如果发送失败,则会收到一个异常,我们捕获异常进行对应的异常处理即可
            try {
                TransactionSendResult sendResult = producer.sendMessageInTransaction(message, null);
                //这里可以更新或者删除存放在本地内存或者磁盘文件的消息记录
                //delete(message)
            }catch (Exception e){
                //half消息发送失败
                //本地系统执行业务回滚,更新数据库信息等操作
            }
        }
    }

    上面是发送RocketMq的事务消息发送方法

    下面是RocketMq的事务消息发送方法的回调函数的实现类

    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    /**
     * @description 事务监听实现类
     */
    public class TransactionListenerImpl implements TransactionListener {
    
        //如果half消息发送成功了就会回调这个函数,就可以执行本地事务
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object o) {
            //执行本地事务
            //根据本地一连串的事务执行结果,去选择commit或者rollback
            try{
                //如果本地事务都执行成功了,返回commit
                return LocalTransactionState.COMMIT_MESSAGE;
            }catch (Exception e){
                //本地事务都失败了,回滚所有的执行过的操作
                //返回rollback,标记half消息无效
                return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
    
        //如果因为各种原因生产者没有返回commit或者rollback给Broker,
        //broker会定时扫描没有回应的half消息,然后回调这个函数
        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            //查询本地事务,是否都执行成功
            Integer status = 0;
            //Integer status = localTrans.get(messageExt.getTransactionId());
            //根据本地事务情况选择执行commit或者rollback
            if (null != status){
                switch(status){
                    case 0:return LocalTransactionState.UNKNOW;
                    case 1:return LocalTransactionState.COMMIT_MESSAGE;
                    case 2:return LocalTransactionState.ROLLBACK_MESSAGE;
                }
            }
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }

     以上就是RocketMq的事务消息发送方法,可以实现发送消息的零丢失,以事务的方式确保消息一定可以发送到RocketMQ

    2.mq消息不丢失实现方式

      以上仅仅只能保证消息发送到mq成功,但是一定能保证消息不丢失吗?显然是不行的;

      假设1:消息发送到了mq,就一定进入到了磁盘文件了吗?rocketmq是先存入os cache中,也就是内存,如果这个时候机器宕机,内存上的数据也就全部丢失了,显然消息也会丢失

      解决方案:rocketmq默认是异步刷盘的模式,保证了数据的高吞吐量,但是有可能出现消息丢失的情况,可以改为同步刷盘策略,通过修改配置文件broker.config中flushDiskType参数为SYNC_FLUSH即可,这样,只要mq告诉我们half消息响应成功了,就代表成功写入了磁盘中了;

      假设2:消息写入磁盘就一定不会丢失吗?显然也不能;如果磁盘损坏,那么消息也会丢失

      解决方案:基于Dledger和Raft协议的rocketmq主从架构,只要消息写入master成功了,那么就一定会基于raft协议同步给其他的broker,就算master机器磁盘损坏,那么一定有broker存储了同样的消息,可以确保消息在mq上不会丢失;

  • 相关阅读:
    Validate US Telephone Numbers
    7月份总结
    Arguments Optional
    Everything Be True
    手机开发网页模板(20140124)
    整站开发初始化
    switch滑动开关
    js 面向对象
    Bootstrap 导航栏
    Bootstrap 标签页
  • 原文地址:https://www.cnblogs.com/bin-zhao/p/13595226.html
Copyright © 2011-2022 走看看