zoukankan      html  css  js  c++  java
  • Kafka提升——接收-处理-发送事务

    接收-处理-发送事务

    在消费者接收到数据后,对数据进行处理,然后进行发送到队列。

    例如:
    用户注册成功后,获得注册优惠券。

    当用户注册成功后,需要向用户表插入数据。同时需要向优惠券表插入新的优惠券信息。若在单体应用中,事务的实现非常容易实现,但是在分布式的服务中,事务的实现就需要进行研究了。

    分布式的事务暂时不表,此处就讨论消息队列如何完成这里的实现。

    假设,目前有用户服务A,优惠券服务B,通过消息队列进行消息传递。

    当用户注册时,首先向A发送消息,完成用户的注册表的插入。然后向B发送信息,通知插入优惠券信息。(loger 代替数据库插入,肯定满足事务,不需要多考虑)

    	@KafkaListener(topics = "registry", containerFactory = "stringKafkaListenerContainerFactory2")
    	@KafkaListener(topics = "youhuiquanhuidiao", containerFactory = "stringKafkaListenerContainerFactory2")
    	@Transactional
    	public void receiveStringRegistry(String message,
    									  @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    		if("youhuiquanhuidiao".equals(topic)){
    			logger.info("优惠券给他了已经!!");
    		}else {
    				logger.info("收到用户信息:"+message);
    				logger.info("用户信息插入");
    				logger.info("用户信息插入完成");
    				transactionalTemplate.send("youhuiquan","新注册用户,给他个优惠券!");
    		}
    	}
    	
    

    B收到消息后,对优惠券信息进行插入,插入完成后,发送信息回调再做另外处理。

    	
    	@KafkaListener(topics = "youhuiquan", containerFactory = "stringKafkaListenerContainerFactory2")
    	@Transactional
    	public void receiveStringYouhuiquan(String message,
    										@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
    		try{
    			logger.info("收到信息:"+message);
    
    			logger.info("优惠券插入");
    
    			logger.info("优惠券插入完成");
    
    			int i = 1/0;
    
    			transactionalTemplate.send("youhuiquanhuidiao","优惠券给他了已经!!");
    		}catch (Exception e){
    			transactionalTemplate.send("youhuiquanhuidiao","发生错误了,优惠券没给成!");
    		}
    	}
    

    controller

    	@RequestMapping("/testSendMsg4")
    	@ResponseBody
    	@Transactional
    	//@KafkaListener(topics = "topic1", containerFactory = "stringKafkaListenerContainerFactory2")
    	public String testSendMsg5(){ //事务发送
    		Message message = new Message();
    		message.setId(1);
    		message.setMsg("我是小明,我来注册");
    		message.setSendTime(new Date());
    		logger.info("发送消息(事务发送) ----->>>>>  message = {}", gson.toJson(message));
    		transactionalTemplate.send("registry", gson.toJson(message)).addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
    			@Override
    			public void onFailure(Throwable throwable) {
    				logger.error("发生错误,消息发送失败!");
    			}
    
    			@Override
    			public void onSuccess(SendResult<String, String> stringStringSendResult) {
    				logger.info("消息发送成功!");
    			}
    		});
    		//int i = 1/0;
    		return "testSendMsg5";
    	}
    

    现在 我们把业务系统想的复杂一些,用户注册完成后还需要进行其他方面的操作,也就是需要向其他服务发出信息。此时若一个发生错误,其他都不在发送(发送错误的处理先不表,可以看下后续的分布式事务的处理。)

    可以看到,添加事务后,发生错误后全都不在进行发送。

  • 相关阅读:
    Ajax
    模型层补充
    Django models.py 模型层(单表多表查询)
    Django 模板层
    Django views.py 视图层
    Django urls.py 路由层
    Browser Security-同源策略、伪URL的域
    Browser Security-css、javascript
    Browser Security-基本概念
    exp2:// 一次存储型XSS从易到难的挖掘过程
  • 原文地址:https://www.cnblogs.com/luckyhui28/p/12518679.html
Copyright © 2011-2022 走看看