技术交流群:233513714
1 @Slf4j 2 @RestController 3 @Component 4 public class VouchersReceiverController implements MessageListener { 5 6 @Autowired 7 private VouchersService vouchersService; 8 9 String MerchantCode = PropertyReader.getValue("MerchantCode"); 10 11 /** 12 * 启动监听 13 */ 14 public void ReceiverVouchersStart() { 15 new ClassPathXmlApplicationContext("spring/rabbitmq-consumer-resources.xml");17 } 18 19 /** 20 * 监听MQ消息队列 21 * @param message 22 */ 23 public void onMessage(Message message) { 24 String phoneNo = ""; 25 String orderNo = ""; 26 String faceValue = ""; 27 String voucherNo = ""; 28 try { 29 String str = new String(message.getBody()); 30 log.info("监听MQ中的信息{}" + str); 31 VouchersResponse vouchers = JsonUtil.fromJson(str, VouchersResponse.class); 32 if ("000000".equals(vouchers.getRESPONSECODE())) { 33 List<VouchersResponseResult> result = vouchers.getRESULTMAP().getRESULT(); 34 for (VouchersResponseResult list : result) { 35 phoneNo = list.getLoginId(); 36 orderNo = list.getAcceptTransSeqNo().substring(0, 20); 37 faceValue = list.getRebateAmt(); 38 voucherNo = list.getVoucherNo(); 39 } 40 } 41 } catch (Exception e) { 42 log.info("监听出现异常{}" + e); 43 } 44 VoucherReqInfoObj obj = new VoucherReqInfoObj(); 45 obj.setProductNo(phoneNo); 46 obj.setOrderTotalAmount(faceValue); 47 obj.setMerchantCode(MerchantCode); 48 obj.setOrderChannel("08"); 49 TInsuranceUserTicket tInsuranceUserTicket = new TInsuranceUserTicket(); 50 tInsuranceUserTicket.setOrderNo(orderNo); 51 tInsuranceUserTicket.setVoucherUseStatus("1"); 52 log.info("状态修改参数:{}" + tInsuranceUserTicket); 53 vouchersService.updateUserTicketStatus(tInsuranceUserTicket); 54 log.info("状态修改完成"); 55 } 56 }
以上代码就是我们要展开讨论的部分。当代码发到服务器上我们向MQ中推一条消息后就会收到一个回调,消费这个回调信息的地方就在onMessage方法中,这就是这个MQ监听的工作流程。正常情况下当监听到消息后会消费下一条消息,如果没有消息则不会再进行消费。然而上面的代码会出现一个很奇怪的问题,当一条消息进来之后会出现重复消费的现象,查看服务日志之后发现程序在走到53行时不再向下执行而是又返回到28行执行,起初考虑是不是因为For循环造成了死循环,但是这个疑问很快被打消,如果是在For循环出现了死循环程序会打不到第30行的日志。
通过在网上谷歌,我将catch的位置进行了调整,代码如下:
1 /** 2 * 监听MQ消息队列 3 * @param message 4 */ 5 public void onMessage(Message message) { 6 String phoneNo = ""; 7 String orderNo = ""; 8 String faceValue = ""; 9 String voucherNo = ""; 10 try { 11 String str = new String(message.getBody()); 12 log.info("监听MQ中的信息{}" + str); 13 VouchersResponse vouchers = JsonUtil.fromJson(str, VouchersResponse.class); 14 if ("000000".equals(vouchers.getRESPONSECODE())) { 15 List<VouchersResponseResult> result = vouchers.getRESULTMAP().getRESULT(); 16 for (VouchersResponseResult list : result) { 17 phoneNo = list.getLoginId(); 18 orderNo = list.getAcceptTransSeqNo().substring(0, 20); 19 faceValue = list.getRebateAmt(); 20 voucherNo = list.getVoucherNo(); 21 } 22 } 23 VoucherReqInfoObj obj = new VoucherReqInfoObj(); 24 obj.setProductNo(phoneNo); 25 obj.setOrderTotalAmount(faceValue); 26 obj.setMerchantCode(MerchantCode); 27 obj.setOrderChannel("08"); 28 TInsuranceUserTicket tInsuranceUserTicket = new TInsuranceUserTicket(); 29 tInsuranceUserTicket.setOrderNo(orderNo); 30 tInsuranceUserTicket.setVoucherUseStatus("1"); 31 log.info("状态修改参数:{}" + tInsuranceUserTicket); 32 vouchersService.updateUserTicketStatus(tInsuranceUserTicket); 33 log.info("状态修改完成"); 34 } catch (Exception e) { 35 log.info("监听出现异常{}" + e); 36 } 37 }
以上就是调整后的代码,死循环的问题终于被解决了。但是新的问题出现了,程序在走到32行时就会报出NullPointerException,没猜错这个是因为没有注入VouchersService造成的,然后在选择注入方式的时候发现RabbitMQ已经整合到spring中了。所以只能通过spring注入VouchersService注入了,经过试用发现构造方法的方式注入会出问题,所以最后选择了属性注入的方式。Controller层、Service层、配置文件依次如下:
Controller层:
1 @Slf4j 2 @RestController 3 @Component 4 public class VouchersReceiverController implements MessageListener { 5 6 private VouchersService vouchersService; 7 8 public VouchersService getVouchersService() { 9 return vouchersService; 10 } 11 12 public void setVouchersService(VouchersService vouchersService) { 13 this.vouchersService = vouchersService; 14 } 15 16 String MerchantCode = PropertyReader.getValue("MerchantCode"); 17 18 /** 19 * 启动监听 20 */ 21 public void ReceiverVouchersStart() { 22 new ClassPathXmlApplicationContext("spring/rabbitmq-consumer-resources.xml"); 23 log.info("保险前置开启对代金券营销平台的MQ监听"); 24 } 25 26 /** 27 * 监听MQ消息队列 28 * @param message 29 */ 30 public void onMessage(Message message) { 31 String phoneNo = ""; 32 String orderNo = ""; 33 String faceValue = ""; 34 String voucherNo = ""; 35 try { 36 String str = new String(message.getBody()); 37 log.info("监听MQ中的信息{}" + str); 38 VouchersResponse vouchers = JsonUtil.fromJson(str, VouchersResponse.class); 39 if ("000000".equals(vouchers.getRESPONSECODE())) { 40 List<VouchersResponseResult> result = vouchers.getRESULTMAP().getRESULT(); 41 for (VouchersResponseResult list : result) { 42 phoneNo = list.getLoginId(); 43 orderNo = list.getAcceptTransSeqNo().substring(0, 20); 44 faceValue = list.getRebateAmt(); 45 voucherNo = list.getVoucherNo(); 46 } 47 } 48 VoucherReqInfoObj obj = new VoucherReqInfoObj(); 49 obj.setProductNo(phoneNo); 50 obj.setOrderTotalAmount(faceValue); 51 obj.setMerchantCode(MerchantCode); 52 obj.setOrderChannel("08"); 53 TInsuranceUserTicket tInsuranceUserTicket = new TInsuranceUserTicket(); 54 tInsuranceUserTicket.setOrderNo(orderNo); 55 tInsuranceUserTicket.setVoucherUseStatus("1"); 56 log.info("状态修改参数:{}" + tInsuranceUserTicket); 57 getVouchersService().updateUserTicketStatus(tInsuranceUserTicket); 58 log.info("状态修改完成"); 59 } catch (Exception e) { 60 log.info("监听出现异常{}" + e); 61 } 62 } 63 }
Service层:
1 @Service 2 @Slf4j 3 @Component 4 public class VouchersService { 5 @Autowired 6 private TInsuranceUserTicketMapper tInsuranceUserTicketMapper; 7 8 public void updateUserTicketStatus(TInsuranceUserTicket tInsuranceUserTicket) { 9 tInsuranceUserTicket.setUpdatedTime(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())); 10 log.info("代金券使用状态修改参数{}" + tInsuranceUserTicket); 11 try { 12 tInsuranceUserTicketMapper.updateUserTicketStatus(tInsuranceUserTicket); 13 log.info("代金券使用状态已经修改{}" + tInsuranceUserTicket); 14 } catch (Exception e) { 15 log.info("代金券使用状态修改出现异常{}" + e); 16 } 17 } 18 }
配置文件(rabbitmq-consumer-resources.xml):
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/rabbit 9 http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd 10 http://www.springframework.org/schema/context 11 http://www.springframework.org/schema/context/spring-context.xsd"> 12 13 <context:property-placeholder location="classpath:/properties/test.properties"/> 14 15 <!-- 配置连接工厂 --> 16 <rabbit:connection-factory id="connectionFactory" 17 host="${rabbit.host}" 18 port="${rabbit.port}" 19 username="${rabbit.consumer.username}" 20 password="${rabbit.consumer.password}" 21 virtual-host="${rabbit.vhost}" 22 connection-factory="refConnectionFactory"/> 23 24 <!-- 配置心跳、超时、自动恢复--> 25 <bean id="refConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> 26 <property name="requestedHeartbeat" value="240"></property> 27 <property name="connectionTimeout" value="10000"></property> 28 <property name="automaticRecoveryEnabled" value="true"></property> 29 </bean> 30 31 <!-- 配置消费者监听器,指定队列名及监听类 --> 32 <rabbit:listener-container connection-factory="connectionFactory" prefetch="1" concurrency="10" acknowledge="auto"> 33 <rabbit:listener queue-names="${rabbit.queueName.reply}" ref="vouchersReceiverController"/> 34 </rabbit:listener-container> 35 36 <!-- 监听类 --> 37 <bean id="vouchersReceiverController" class="com.bestpay.insurance.dal.controller.vouchers.VouchersReceiverController"> 38 <!--注入Service--> 39 <property name="vouchersService" ref="vouchersService"/> 40 </bean> 41 42 <bean id="vouchersService" class="com.bestpay.insurance.service.vouchers.VouchersService"/> 43 </beans>
经过了一番这样配置之后项目再一次启动起来,心中窃喜应该不会再有什么问题了吧。果然Service层可以注入进来了,并且程序顺利的走入到了Service层的updateUserTicketStatus方法中。问题又一次出现了Service层中第10行的日志可以被打出来,但是之后又会报出NullPointerException的异常,这回问题显而易见,应该是TInsuranceUserTicketMapper接口没有被注入进来,考虑到mapper接口的注入方式是和一般的bean注入是不一样的,所以经过了一番谷歌发现的确是不一样的。以下给出调整后的Service层(Service注入mapper的方式依然采用属性注入)、Mapper接口、调整后的配置文件(rabbitmq-consumer-resources.xml):
Service层:
1 @Service 2 @Slf4j 3 @Component 4 public class VouchersService { 5 6 private TInsuranceVouchersMapper tInsuranceVouchersMapper; 7 8 private TInsuranceUserTicketMapper tInsuranceUserTicketMapper; 9 10 public TInsuranceVouchersMapper getTInsuranceVouchersMapper() { 11 return tInsuranceVouchersMapper; 12 } 13 14 public void setTInsuranceVouchersMapper(TInsuranceVouchersMapper tInsuranceVouchersMapper) { 15 this.tInsuranceVouchersMapper = tInsuranceVouchersMapper; 16 } 17 18 public TInsuranceUserTicketMapper getTInsuranceUserTicketMapper() { 19 return tInsuranceUserTicketMapper; 20 } 21 22 public void setTInsuranceUserTicketMapper(TInsuranceUserTicketMapper tInsuranceUserTicketMapper) { 23 this.tInsuranceUserTicketMapper = tInsuranceUserTicketMapper; 24 } 25 26 public void updateUserTicketStatus(TInsuranceUserTicket tInsuranceUserTicket) { 27 tInsuranceUserTicket.setUpdatedTime(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format(new Date())); 28 log.info("代金券使用状态修改参数{}" + tInsuranceUserTicket); 29 try { 30 getTInsuranceUserTicketMapper().updateUserTicketStatus(tInsuranceUserTicket); 31 log.info("代金券使用状态已经修改{}" + tInsuranceUserTicket); 32 } catch (Exception e) { 33 log.info("代金券使用状态修改出现异常{}" + e); 34 } 35 } 36 }
Mapper接口:
1 @Component 2 public interface TInsuranceUserTicketMapper { 3 int updateUserTicketStatus(TInsuranceUserTicket record); 4 }
配置文件(rabbitmq-consumer-resources.xml):
1 <?xml version="1.0" encoding="UTF-8"?> 2 <beans xmlns="http://www.springframework.org/schema/beans" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 5 xmlns:context="http://www.springframework.org/schema/context" 6 xsi:schemaLocation="http://www.springframework.org/schema/beans 7 http://www.springframework.org/schema/beans/spring-beans.xsd 8 http://www.springframework.org/schema/rabbit 9 http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd 10 http://www.springframework.org/schema/context 11 http://www.springframework.org/schema/context/spring-context.xsd"> 12 13 <context:property-placeholder location="classpath:/properties/test.properties"/> 14 15 <import resource="classpath:spring/spring-datasource.xml"/> 16 17 <!-- 配置连接工厂 --> 18 <rabbit:connection-factory id="connectionFactory" 19 host="${rabbit.host}" 20 port="${rabbit.port}" 21 username="${rabbit.consumer.username}" 22 password="${rabbit.consumer.password}" 23 virtual-host="${rabbit.vhost}" 24 connection-factory="refConnectionFactory"/> 25 26 <!-- 配置心跳、超时、自动恢复--> 27 <bean id="refConnectionFactory" class="com.rabbitmq.client.ConnectionFactory"> 28 <property name="requestedHeartbeat" value="240"></property> 29 <property name="connectionTimeout" value="10000"></property> 30 <property name="automaticRecoveryEnabled" value="true"></property> 31 </bean> 32 33 <!-- 配置消费者监听器,指定队列名及监听类 --> 34 <rabbit:listener-container connection-factory="connectionFactory" prefetch="1" concurrency="10" acknowledge="auto"> 35 <rabbit:listener queue-names="${rabbit.queueName.reply}" ref="vouchersReceiverController"/> 36 </rabbit:listener-container> 37 38 <!-- 监听类 --> 39 <bean id="vouchersReceiverController" class="com.bestpay.insurance.dal.controller.vouchers.VouchersReceiverController"> 40 <!--注入Service--> 41 <property name="vouchersService" ref="vouchersService"/> 42 </bean> 43 44 <bean id="vouchersService" class="com.bestpay.insurance.service.vouchers.VouchersService"> 45 <!--注入Mapper--> 46 <property name="TInsuranceUserTicketMapper" ref="tInsuranceUserTicketMapper"/> 47 </bean> 48 49 <bean id="tInsuranceUserTicketMapper" class="org.mybatis.spring.mapper.MapperFactoryBean"> 50 <property name="mapperInterface" value="com.bestpay.insurance.dal.mapper.TInsuranceUserTicketMapper"/> 51 <property name="sqlSessionFactory" ref="sqlSessionFactory"/> 52 </bean> 53 54 <bean id="sqlSessionFactory" class="org.mybatis.spring.SqlSessionFactoryBean"> 55 <property name="dataSource" ref="anteaterDs"/> 56 <property name="configLocation" value="classpath:spring/mybatis.xml"/> 57 <property name="mapperLocations"> 58 <list> 59 <value>classpath:mapper/*Mapper.xml</value> 60 </list> 61 </property> 62 </bean> 63 64 <bean id="anteaterDs" class="org.apache.commons.dbcp.BasicDataSource"> 65 <property name="driverClassName" value="${jdbc.driverClassName}"/> 66 <property name="url" value="${jdbc.url}"/> 67 <property name="username" value="${jdbc.username}"/> 68 <property name="password" value="${jdbc.password}"/> 69 <property name="initialSize" value="${jdbc.initialSize}"/> 70 <property name="minIdle" value="${jdbc.minIdle}"/> 71 <property name="maxIdle" value="${jdbc.maxIdle}"/> 72 <property name="maxActive" value="${jdbc.maxActive}"/> 73 <property name="maxWait" value="${jdbc.maxWait}"/> 74 <property name="testOnBorrow" value="${jdbc.testOnBorrow}"/> 75 <property name="testWhileIdle" value="${jdbc.testWhileIdle}"/> 76 <property name="timeBetweenEvictionRunsMillis" value="${jdbc.timeBetweenEvictionRunsMillis}"/> 77 <property name="numTestsPerEvictionRun" value="${jdbc.numTestsPerEvictionRun}"/> 78 <property name="minEvictableIdleTimeMillis" value="${jdbc.minEvictableIdleTimeMillis}"/> 79 <property name="validationQuery" value="SELECT 1 FROM DUAL"/> 80 <property name="removeAbandonedTimeout" value="60"/> 81 <property name="removeAbandoned" value="true"/> 82 </bean> 83 </beans>
通过以上修改之后,再启动项目之后发现问题已经解决。这个问题解决之后回过头来细细想了一下为什么监听的方法中不能注入Service,从而导致了一个死循环的假象,这个问题需要后续不断深入的学习去探索问题的原因,但是还有一个问题可以道出问题的原委,就是监听后不能注入。因为RabbitMQ被整合到了spring中,所以项目在启动的时候会自动的注入RabbitMQ相关的东西,而如果采用@Autowired方式调用Service就会导致注入不进来,所以要在项目启动的同时注入Service和Mapper,这样就不会出现空指针的问题了。