Test case code:
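import lombok.extern.slf4j.Slf4j;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
// Project-specific imports (RiskCaseBaseDictionaryService, HttpMessageResult) omitted.

@RunWith(SpringRunner.class)
@SpringBootTest
@Slf4j
public class RiskCaseBaseDictionaryServiceTest {

    @Autowired
    private RiskCaseBaseDictionaryService riskCaseBaseDictionaryService;

    @Test
    public void test() {
        log.info("-------test---------");
        HttpMessageResult<?> ret = riskCaseBaseDictionaryService.getDictionaryInfo();
        // JUnit 4 takes the expected value first, then the actual value.
        Assert.assertEquals(200, ret.getCode());
    }
}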
After starting the test, I found that execution never reached the test method.
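Cause: starting a unit test first loads the Spring context, which includes running every bean that implements the CommandLineRunner interface, and the test cases execute only after the context has finished loading. My Kafka consumer implements CommandLineRunner, and its run method calls poll inside an infinite loop, so run never returns and startup never completes. The fix is to do the polling on a new thread, as follows: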
Original:
@Slf4j
@Component
public class CaseBaseKafkaConsumer implements CommandLineRunner {

    @Value("${risk.analysis.casebase.kafka.server}")
    private String servers;

    @Value("${risk.analysis.casebase.kafka.pay.topic}")
    private String topic;

    @Value("${risk.analysis.casebase.kafka.group}")
    private String GROUPID;

    @Autowired
    private RiskCaseBaseService riskCaseBaseService;

    @Override
    public void run(String... args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList(topic));
        try {
            // This loop never exits, so run() never returns to the caller.
            for (;;) {
                System.out.println("==========kafkaConsumer=============");
                ConsumerRecords<String, String> msgList = consumer.poll(1000);
                if (null != msgList && msgList.count() > 0) {
                    for (ConsumerRecord<String, String> record : msgList) {
                        log.info("Consumed message from topic, message={}", record.value());
                        JSONObject json = JSONObject.parseObject(record.value());
                        RiskCaseBaseCreateDTO riskCaseBaseCreateDTO =
                                JSONObject.toJavaObject(json, RiskCaseBaseCreateDTO.class);
                        if (Objects.isNull(riskCaseBaseCreateDTO)) {
                            log.error("[Payment case consumer] Data received from MQ is empty!");
                            continue; // skip the empty record instead of passing null to create()
                        }
                        riskCaseBaseService.create(riskCaseBaseCreateDTO);
                    }
                } else {
                    Thread.sleep(1000);
                }
            }
        } catch (InterruptedException e) {
            log.error("Consumer error={}", e.getMessage());
        } finally {
            consumer.close();
        }
    }
}
After moving the loop to a new thread:
@Slf4j
@Component
public class CaseBaseKafkaConsumer implements CommandLineRunner {

    @Value("${risk.analysis.casebase.kafka.server}")
    private String servers;

    @Value("${risk.analysis.casebase.kafka.pay.topic}")
    private String topic;

    @Value("${risk.analysis.casebase.kafka.group}")
    private String GROUPID;

    @Autowired
    private RiskCaseBaseService riskCaseBaseService;

    @Override
    public void run(String... args) throws Exception {
        // Start the consumer loop on its own thread so that run() returns immediately.
        new Thread() {
            @Override
            public void run() {
                Properties props = new Properties();
                props.put("bootstrap.servers", servers);
                props.put("group.id", GROUPID);
                props.put("enable.auto.commit", "true");
                props.put("auto.commit.interval.ms", "1000");
                props.put("session.timeout.ms", "30000");
                props.put("auto.offset.reset", "earliest");
                props.put("key.deserializer", StringDeserializer.class.getName());
                props.put("value.deserializer", StringDeserializer.class.getName());

                KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
                consumer.subscribe(Arrays.asList(topic));
                try {
                    for (;;) {
                        System.out.println("==========kafkaConsumer=============");
                        ConsumerRecords<String, String> msgList = consumer.poll(1000);
                        if (null != msgList && msgList.count() > 0) {
                            for (ConsumerRecord<String, String> record : msgList) {
                                log.info("Consumed message from topic, message={}", record.value());
                                JSONObject json = JSONObject.parseObject(record.value());
                                RiskCaseBaseCreateDTO riskCaseBaseCreateDTO =
                                        JSONObject.toJavaObject(json, RiskCaseBaseCreateDTO.class);
                                if (Objects.isNull(riskCaseBaseCreateDTO)) {
                                    log.error("[Payment case consumer] Data received from MQ is empty!");
                                    continue; // skip the empty record instead of passing null to create()
                                }
                                riskCaseBaseService.create(riskCaseBaseCreateDTO);
                            }
                        } else {
                            Thread.sleep(1000);
                        }
                    }
                } catch (InterruptedException e) {
                    log.error("Consumer error={}", e.getMessage());
                    Thread.currentThread().interrupt(); // preserve the interrupt status
                } finally {
                    consumer.close();
                }
            }
        }.start();
    }
}
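With the loop running on a new thread, its blocking no longer affects the main thread, so the main thread finishes initialization and the test cases can then run.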
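The same idea can be tried out without Spring or Kafka. The sketch below is only an illustration under my own assumptions: the class name BackgroundPollerSketch, its methods, and the in-memory BlockingQueue standing in for the Kafka topic are all hypothetical and not part of the project above. It shows a blocking poll loop running on a separate thread while the calling thread carries on. It also adds two things the version above does not have: the worker is marked as a daemon thread, and a flag allows the loop to be stopped.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the Kafka consumer: an in-memory queue plays the topic.
public class BackgroundPollerSketch {
    private final BlockingQueue<String> source = new LinkedBlockingQueue<>();
    private final List<String> handled = new CopyOnWriteArrayList<>();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private Thread worker;

    // Starts the poll loop on its own thread and returns immediately.
    public void start() {
        worker = new Thread(() -> {
            try {
                while (running.get()) {
                    // Blocks for up to 100 ms, like a consumer poll with a timeout.
                    String msg = source.poll(100, TimeUnit.MILLISECONDS);
                    if (msg != null) {
                        handled.add(msg);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt status
            }
        }, "poller-sketch");
        worker.setDaemon(true); // a daemon thread does not keep the JVM alive
        worker.start();
    }

    public void publish(String msg) {
        source.add(msg);
    }

    public List<String> handled() {
        return handled;
    }

    // Waits until at least `count` messages were handled, or the timeout passes.
    public boolean awaitHandled(int count, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (handled.size() < count) {
            if (System.currentTimeMillis() > deadline) {
                return false;
            }
            Thread.sleep(10);
        }
        return true;
    }

    // Asks the loop to finish and waits briefly for the worker to exit.
    public void stop() throws InterruptedException {
        running.set(false);
        worker.join(2000);
    }

    public boolean isAlive() {
        return worker != null && worker.isAlive();
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundPollerSketch poller = new BackgroundPollerSketch();
        poller.start(); // does not block the main thread
        poller.publish("hello");
        System.out.println("handled in time: " + poller.awaitHandled(1, 2000));
        poller.stop();
    }
}
```

In the real consumer, a stop flag like this would let the loop exit cleanly on shutdown so that consumer.close() in the finally block actually runs. Whether that is worth adding depends on how the application is shut down.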