zoukankan      html  css  js  c++  java
  • BlockingQueue解决物联网多设备数据整合问题

    环境:

      硬件设备N个采集器将数据以json形式发送至主机,主机将数据以mqtt协议发送至服务端,服务端接收到数据,发布异步事件,进行数据存储,报警等任务。

    问题:我们认为不同的传感器在同一个主机上传的数据是同一个设备,数据整合逻辑是,设置一个上传数据间隔

    (此间隔认为是,网络延迟+其他延迟),查询语句(select id from  xx where deviceid=? and monitortime between ? and ?),如果有记录,更新,没有记录存储,一次达到合并数据的目的,

    此时问题出现,如果在查询时候,语句未走到判断数据存在,那么会出现并发问题,导致数据合并失败。

    解决方案:

      1.并发同步锁

      2.redis队列

      3.BlockingQueue

      4.......

    最后选择BlockingQueue解决了这个问题,

    主要代码实现:

    1.接收到数据,将数据整理成服务端数据格式

    2.将BQ给Spring管理

    @Configuration
    public class QueueAutoConfig {
        @Bean(initMethod = "run")
        public MessageQueueConsume messageQueueConsume(){
            return new MessageQueueConsume(LocalQueue.getMessageQueue());
        }
    }

    3.BQ实例化

    @Slf4j
    public class LocalQueue {
        private static BlockingQueue<SaveRecordEvent> messageQueue;
        static {
            messageQueue = new LinkedBlockingQueue<SaveRecordEvent>();
            log.info("queue initialization finished");
        }
    
        public static BlockingQueue<SaveRecordEvent> getMessageQueue() {
            return messageQueue;
        }
    }

    4.数据存储逻辑

    @Slf4j
    public class MessageQueueConsume {
        @Autowired
        IDeviceMonitorRecordService deviceMonitorRecordService;
        @Autowired
        ThreadPoolTaskExecutor asyncServiceExecutor;
        private Long dataRangeTime=90L;
        private BlockingQueue<SaveRecordEvent> blQueue;
    
        public void run() {
            asyncServiceExecutor.execute(()->{
                while (true) {
                    try {
                        SaveRecordEvent saveRecordEvent = blQueue.take();
                        saveRecord(saveRecordEvent);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
        private void saveRecord(SaveRecordEvent saveRecordEvent){
            // TODO: 2021/1/7 这里写具体实现 向队列写数据LocalQueue.getMessageQueue().add();
            saveOrUpdate(saveRecordEvent.getEquipmentId(),saveRecordEvent.getDataBean());
        }
        public MessageQueueConsume(BlockingQueue queue) {
            this.blQueue = queue;
        }
    
        public static void main(String[] args) {
            System.err.println(new Date());
            LocalDateTime now = LocalDateTime.now();
            LocalDateTime localDateTime = now.plusMinutes(1);
            System.err.println(localDateTime);
        }
        public  void saveOrUpdate(String equipmentId, DeviceMonitorRecordDTO bean) {
            LocalDateTime monitorTime = bean.getMonitorTime();
            LambdaQueryWrapper<DeviceMonitorRecordEntity> between = Wrappers.lambdaQuery(DeviceMonitorRecordEntity.class)
                    .eq(DeviceMonitorRecordEntity::getEquipmentId, equipmentId)
                    .between(DeviceMonitorRecordEntity::getMonitorTime, monitorTime.minusSeconds(dataRangeTime),monitorTime.plusSeconds(dataRangeTime) ).last("limit 1");
            DeviceMonitorRecordEntity one = deviceMonitorRecordService.getOne(between,false);
            // 5.转换db
            DeviceMonitorRecordEntity dbBean = new DeviceMonitorRecordEntity();
            BeanUtil.copyProperties(bean, dbBean);
            // 5.存储逻辑
            if (one == null) {
                // 不存在 保存_
                deviceMonitorRecordService.save(dbBean);
                log.info(monitorTime+"==数据存储成功==");
            } else {
                // 存在更具设别id和时间更新
    //            LambdaQueryWrapper<DeviceMonitorRecordEntity> update = Wrappers.lambdaQuery(DeviceMonitorRecordEntity.class)
    //                    .eq(DeviceMonitorRecordEntity::getEquipmentId, equipmentId)
    //                    .eq(DeviceMonitorRecordEntity::getMonitorTime, one.getMonitorTime());
    //            deviceMonitorRecordService.update(dbBean, update);
                dbBean.setId(one.getId());
                deviceMonitorRecordService.updateById(dbBean);
                log.info(monitorTime+"==数据更新成功==");
            }
        }
    
    }

    5.发布到BlockingQueue

            //发布阻塞队列之前,将时间更新
            Date date = new Date();
            String time = bean.getTime();
            if (StringUtils.isNotBlank(time)){
                //补数据字段不为空说明此数据是数据补传数据,时间使用此字段
                DateTime parse = DateUtil.parse(time);
                date=parse;
            }
            LocalDateTime localDateTime = DateUtil.toLocalDateTime(date);
            bean.setMonitorTime(localDateTime);
            LocalQueue.getMessageQueue().add(saverecordevent);

    备注:数据量巨大,已经准备转向mongDb

  • 相关阅读:
    Python实现决策树ID3算法
    ML——决策树模型
    Linux下用matplotlib画决策树
    RedHat7.2安装matplotlib——之Python.h:没有那个文件或目录
    没想到这么简单——滚雪球
    pyspark中使用累加器Accumulator统计指标
    pscp多线程传输文件
    [笔记] 使用numpy手写k-means算法
    [LeetCode in Python] 309 (M) best time to buy and sell stock with cooldown 最佳买卖股票时机含冷冻期
    [LeetCode in Python] 75 (M) sort colors 颜色分类
  • 原文地址:https://www.cnblogs.com/xyzxy/p/14249982.html
Copyright © 2011-2022 走看看