zoukankan      html  css  js  c++  java
  • rocketmq 事务消息

    1 配置 生产者

      和普通的生成者不同,需要使用事物消息的生产者,并且需要指定  TransactionListenerImpl ,TransactionListenerImpl 

      

    	/**
    	 * 初始化生产者
    	 * 
    	 * @param rockterMQConfig
    	 * @return
    	 * @throws MQClientException
    	 */
    	@Bean
    	public DefaultMQProducer defaultMQProducer(RockterMQConfig rockterMQConfig) throws MQClientException {
    		TransactionMQProducer producer = new TransactionMQProducer(rockterMQConfig.getProducerGroup());
    		producer.setNamesrvAddr(rockterMQConfig.getNamesrvAddr());
    		producer.setInstanceName(rockterMQConfig.getInstanceName());
    		producer.setTransactionListener( new TransactionListenerImpl() );
    		producer.start();
    		return producer;
    	}
    

      

    2 实现 TransactionListener  ,TransactionListener   里面有两个方法,分别描述怎么持久化这个消息发送记录,和 提供会查,以供决定未被确认的消息应该丢弃,还是确认,或者重复回查

      

    package com.xyebank.hzx.message.listener;
    
    import java.util.concurrent.ConcurrentHashMap;
    
    import org.apache.rocketmq.client.producer.LocalTransactionState;
    import org.apache.rocketmq.client.producer.TransactionListener;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    
    
    public class TransactionListenerImpl implements TransactionListener  {
    
    	// 存储事务状态信息 key:事务id value:当前事务执行的状态
    	private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    
    	// 执行本地事务
    	@Override
    	public LocalTransactionState executeLocalTransaction(Message message, Object o) {
    		// 事务id
    		String transactionId = message.getTransactionId();
    		// 0:执行中,状态未知 1:执行成功 2:执行失败
    		localTrans.put(transactionId, 0);
    		// 业务执行,本地事务,service
    		System.out.println("hello-demo-transaction");
    		try {
    			System.out.println("正在执行本地事务---");
    			Thread.sleep( 60*1000 );
    			System.out.println("本地事务执行成功---");
    			localTrans.put(transactionId, 1);
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    			localTrans.put(transactionId, 2);
    			return LocalTransactionState.ROLLBACK_MESSAGE;
    		}
    		return LocalTransactionState.COMMIT_MESSAGE;
    	}
    
    	// 消息回查
    	@Override
    	public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
    		// 获取对应事务的状态信息
    		String transactionId = messageExt.getTransactionId();
    		// 获取对应事务id执行状态
    		Integer status = localTrans.get(transactionId);
    		// 消息回查
    		System.out.println("消息回查---transactionId:" + transactionId + "状态:" + status);
    		switch (status) {
    		case 0:
    			return LocalTransactionState.UNKNOW;
    		case 1:
    			return LocalTransactionState.COMMIT_MESSAGE;
    		case 2:
    			return LocalTransactionState.ROLLBACK_MESSAGE;
    		}
    		return LocalTransactionState.UNKNOW;
    	}
    
    }
    

      

    备注:正常我们肯定不会这么干,因为这样重启一次就丢了。最好的做法是和本地数据库使用同一套事务环境,持久到本地数据库中。写到每次写入都持久化的redis也是可以的。

    3 事务消息的发送

      

    	@Override
    	@Transactional
    	public void send(MessageInfo mqMessage) throws Exception  {
    		
    		if (mqMessage == null) {
    			log.debug("无效的消息");
    			return;
    		}
    
    		String body = com.alibaba.fastjson.JSONObject.toJSONString(mqMessage);
    		Message message = new Message("log-topic", "user-tag", body.getBytes());
    		producer.sendMessageInTransaction(message, "args2");
    		log.debug("发送了消息:{}", body);
    		
    		
    	}
    

      

      备注:这里是发送事务消息,不是普通消息,第二个参数是可以随便填写,可以在TransactionListener 里面获取到

    4 接收者那边就正常接收了。没啥特别的,只有在消息发

    原理,发送的事务消息会在 mq中存着,不会投递出去,TransactionListener 鉴定到 事务提交,并且做好记录以后,会确认这个消息,然后消息会被投递到消费者,如果长时间没有收到确认消息,mq会调用回查接口,然后决定应该重发,删除,还是确认这个消息。

    关于rocktermq 的 两篇博客

    https://blog.csdn.net/u012921921/article/details/104331412


    https://blog.51cto.com/qiangsh/1937184

    能耍的时候就一定要耍,不能耍的时候一定要学。
  • 相关阅读:
    函数的返回值与调用
    函数的定义
    文件的高级应用
    文件三种打开模式
    c++0x11新特性:delete删除函数
    网络研发工程师
    cannot find -lGL
    webSocket 使用 HttpSession 的数据配置与写法
    websocket 使用 spring 的service层 ,进而调用里面的 dao层 来操作数据库 ,包括redis、mysql等通用
    redis 重启服务丢失 密码设置 现象 与 解决过程
  • 原文地址:https://www.cnblogs.com/cxygg/p/15030337.html
Copyright © 2011-2022 走看看