环境:
硬件设备N个采集器将数据以json形式发送至主机,主机将数据以mqtt协议发送至服务端,服务端接收到数据,发布异步事件,进行数据存储,报警等任务。
问题:我们认为不同的传感器在同一个主机上传的数据是同一个设备,数据整合逻辑是,设置一个上传数据间隔
(此间隔认为是,网络延迟+其他延迟),查询语句(select id from xx where deviceid=? and monitortime between ? and ?),如果有记录,更新,没有记录存储,一次达到合并数据的目的,
此时问题出现,如果在查询时候,语句未走到判断数据存在,那么会出现并发问题,导致数据合并失败。
解决方案:
1.并发同步锁
2.redis队列
3.BlockingQueue
4.......
最后选择BlockingQueue解决了这个问题,
主要代码实现:
1.接收到数据,将数据整理成服务端数据格式
2.将BQ给Spring管理
@Configuration public class QueueAutoConfig { @Bean(initMethod = "run") public MessageQueueConsume messageQueueConsume(){ return new MessageQueueConsume(LocalQueue.getMessageQueue()); } }
3.BQ实例化
@Slf4j public class LocalQueue { private static BlockingQueue<SaveRecordEvent> messageQueue; static { messageQueue = new LinkedBlockingQueue<SaveRecordEvent>(); log.info("queue initialization finished"); } public static BlockingQueue<SaveRecordEvent> getMessageQueue() { return messageQueue; } }
4.数据存储逻辑
@Slf4j public class MessageQueueConsume { @Autowired IDeviceMonitorRecordService deviceMonitorRecordService; @Autowired ThreadPoolTaskExecutor asyncServiceExecutor; private Long dataRangeTime=90L; private BlockingQueue<SaveRecordEvent> blQueue; public void run() { asyncServiceExecutor.execute(()->{ while (true) { try { SaveRecordEvent saveRecordEvent = blQueue.take(); saveRecord(saveRecordEvent); } catch (InterruptedException e) { e.printStackTrace(); } } }); } private void saveRecord(SaveRecordEvent saveRecordEvent){ // TODO: 2021/1/7 这里写具体实现 向队列写数据LocalQueue.getMessageQueue().add(); saveOrUpdate(saveRecordEvent.getEquipmentId(),saveRecordEvent.getDataBean()); } public MessageQueueConsume(BlockingQueue queue) { this.blQueue = queue; } public static void main(String[] args) { System.err.println(new Date()); LocalDateTime now = LocalDateTime.now(); LocalDateTime localDateTime = now.plusMinutes(1); System.err.println(localDateTime); } public void saveOrUpdate(String equipmentId, DeviceMonitorRecordDTO bean) { LocalDateTime monitorTime = bean.getMonitorTime(); LambdaQueryWrapper<DeviceMonitorRecordEntity> between = Wrappers.lambdaQuery(DeviceMonitorRecordEntity.class) .eq(DeviceMonitorRecordEntity::getEquipmentId, equipmentId) .between(DeviceMonitorRecordEntity::getMonitorTime, monitorTime.minusSeconds(dataRangeTime),monitorTime.plusSeconds(dataRangeTime) ).last("limit 1"); DeviceMonitorRecordEntity one = deviceMonitorRecordService.getOne(between,false); // 5.转换db DeviceMonitorRecordEntity dbBean = new DeviceMonitorRecordEntity(); BeanUtil.copyProperties(bean, dbBean); // 5.存储逻辑 if (one == null) { // 不存在 保存_ deviceMonitorRecordService.save(dbBean); log.info(monitorTime+"==数据存储成功=="); } else { // 存在更具设别id和时间更新 // LambdaQueryWrapper<DeviceMonitorRecordEntity> update = Wrappers.lambdaQuery(DeviceMonitorRecordEntity.class) // .eq(DeviceMonitorRecordEntity::getEquipmentId, equipmentId) // .eq(DeviceMonitorRecordEntity::getMonitorTime, one.getMonitorTime()); // deviceMonitorRecordService.update(dbBean, update); dbBean.setId(one.getId()); deviceMonitorRecordService.updateById(dbBean); log.info(monitorTime+"==数据更新成功=="); } } }
5.发布到BlockingQueue
//发布阻塞队列之前,将时间更新 Date date = new Date(); String time = bean.getTime(); if (StringUtils.isNotBlank(time)){ //补数据字段不为空说明此数据是数据补传数据,时间使用此字段 DateTime parse = DateUtil.parse(time); date=parse; } LocalDateTime localDateTime = DateUtil.toLocalDateTime(date); bean.setMonitorTime(localDateTime); LocalQueue.getMessageQueue().add(saverecordevent);
备注:数据量巨大,已经准备转向mongDb