zoukankan      html  css  js  c++  java
  • 利用CountDownLatch检查大数据抽取过程中数据是否完整

    数据完整性

    1.数据格式完整

    2.数据大小完整

    package com.dwz.utils;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Demonstrates using {@link CountDownLatch} as a completion counter while checking that
     * extracted data is complete.
     *
     * <p>Every {@link Event} yields a list of {@link Table}s. For each table two independent
     * checks run on a thread pool: one for the column schema and one for the record count.
     * A {@link TaskBatch} counts the checks of one table, and a {@link TaskGroup} counts the
     * tables of one event. When the last check of a table finishes the batch notifies the
     * group, and when the last table finishes the group reports the event as done.
     */
    public class CountDownLatchExample6 {
        /** Source of the simulated work durations; shared by all worker tasks. */
        private static final Random random = new Random(System.currentTimeMillis());

        /** A unit of extraction work, identified only by a numeric id. */
        static class Event {
            final int id;

            public Event(int id) {
                this.id = id;
            }
        }

        /** Callback invoked when one piece of work concerning a table has finished. */
        interface Watcher {
            /**
             * Reports that one piece of work for the given table is complete.
             *
             * @param table the table the finished work belongs to
             */
            void done(Table table);
        }

        /**
         * Counts the checks belonging to a single table and notifies the owning
         * {@link TaskGroup} exactly once, when the last check reports in.
         */
        static class TaskBatch implements Watcher {
            private final CountDownLatch countDownLatch;
            private final TaskGroup taskGroup;

            /**
             * @param size      number of checks that must call {@link #done(Table)}
             * @param taskGroup group to notify once all checks have finished
             */
            public TaskBatch(int size, TaskGroup taskGroup) {
                this.countDownLatch = new CountDownLatch(size);
                this.taskGroup = taskGroup;
            }

            /**
             * Records one finished check. The call that brings the count to zero prints the
             * table summary and forwards the notification to the task group; calls made after
             * that point are ignored.
             *
             * @param table the table whose check finished
             */
            @Override
            public void done(Table table) {
                boolean lastCheck;
                // countDown() followed by getCount() is not atomic on its own: two workers
                // finishing together could both observe zero and both fire the completion.
                // Doing the pair under the monitor lets exactly one caller see the transition.
                synchronized (this) {
                    if (countDownLatch.getCount() == 0) {
                        return;
                    }
                    countDownLatch.countDown();
                    lastCheck = countDownLatch.getCount() == 0;
                }
                // Notify outside the lock so the group callback never runs while it is held.
                if (lastCheck) {
                    System.out.println("The table " + table.tableName + " finished work, [" + table + "]");
                    taskGroup.done(table);
                }
            }
        }

        /**
         * Counts the tables belonging to a single event and reports once, when the last
         * table has finished.
         */
        static class TaskGroup implements Watcher {
            private final CountDownLatch countDownLatch;
            private final Event event;

            /**
             * @param size  number of tables that must call {@link #done(Table)}
             * @param event the event these tables belong to
             */
            public TaskGroup(int size, Event event) {
                this.countDownLatch = new CountDownLatch(size);
                this.event = event;
            }

            /**
             * Records one finished table. The call that brings the count to zero prints the
             * event summary; calls made after that point are ignored.
             *
             * @param table the table that finished (not inspected here)
             */
            @Override
            public void done(Table table) {
                boolean lastTable;
                // Same atomic decrement-and-test as TaskBatch, so the summary prints once.
                synchronized (this) {
                    if (countDownLatch.getCount() == 0) {
                        return;
                    }
                    countDownLatch.countDown();
                    lastTable = countDownLatch.getCount() == 0;
                }
                if (lastTable) {
                    System.out.println("======All of table done in event : " + event.id);
                }
            }
        }

        /**
         * Source and target metadata of one table. The target fields start empty and are
         * filled in by the check tasks.
         */
        static class Table {
            final String tableName;
            final long sourceRecordCount;
            long targetCount;
            String sourceColumnSchema = "<table name = 'a'><column name = 'coll' type = 'varchar2'></table>";
            String targetColumnSchema = "";

            /**
             * @param tableName         display name of the table
             * @param sourceRecordCount number of records on the source side
             */
            public Table(String tableName, long sourceRecordCount) {
                this.tableName = tableName;
                this.sourceRecordCount = sourceRecordCount;
            }

            @Override
            public String toString() {
                return "Table [tableName=" + tableName + ", sourceRecordCount=" + sourceRecordCount + ", targetCount="
                        + targetCount + ", sourceColumnSchema=" + sourceColumnSchema + ", targetColumnSchema="
                        + targetColumnSchema + "]";
            }
        }

        /**
         * Builds the ten sample tables of an event, named {@code table-<eventId>-<i>} with a
         * source record count of {@code i * 1000}.
         *
         * @param event the event to build tables for
         * @return a new mutable list of ten tables
         */
        private static List<Table> capture(Event event) {
            List<Table> list = new ArrayList<Table>(10);
            for (int i = 0; i < 10; i++) {
                list.add(new Table("table-" + event.id + "-" + i, i * 1000));
            }
            return list;
        }

        /**
         * Sleeps for a random time below five seconds to stand in for real work.
         *
         * @return {@code true} if the whole delay elapsed; {@code false} if the thread was
         *         interrupted, in which case the interrupt flag has been set again
         */
        private static boolean simulateWork() {
            try {
                Thread.sleep(random.nextInt(5000));
                return true;
            } catch (InterruptedException e) {
                // Keep the interruption visible to the caller and to the executor.
                Thread.currentThread().interrupt();
                return false;
            }
        }

        /**
         * Submits a schema check and a record-count check for every table of two sample
         * events to a pool of ten threads, then initiates an orderly pool shutdown.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            Event[] events = {new Event(1), new Event(2)};
            ExecutorService executorService = Executors.newFixedThreadPool(10);

            try {
                for (Event event : events) {
                    List<Table> tables = capture(event);
                    TaskGroup taskGroup = new TaskGroup(tables.size(), event);
                    for (Table table : tables) {
                        // Two checks per table: column schema and record count.
                        TaskBatch taskBatch = new TaskBatch(2, taskGroup);
                        TrustSourceColumns columnsRunnable = new TrustSourceColumns(table, taskBatch);
                        TrustSourceRecordCount recordCountRunnable = new TrustSourceRecordCount(table, taskBatch);

                        executorService.submit(columnsRunnable);
                        executorService.submit(recordCountRunnable);
                    }
                }
            } finally {
                // shutdown() lets already-submitted tasks finish; it is in finally so the
                // pool threads are released even if submission fails part-way.
                executorService.shutdown();
            }
        }

        /** Check task that copies the source record count to the target side of a table. */
        static class TrustSourceRecordCount implements Runnable {
            private final Table table;
            private final TaskBatch taskBatch;

            /**
             * @param table     table to update
             * @param taskBatch batch to notify when the check finishes
             */
            public TrustSourceRecordCount(Table table, TaskBatch taskBatch) {
                this.table = table;
                this.taskBatch = taskBatch;
            }

            /**
             * Waits for the simulated work, then sets the target count and reports the check
             * as done. If interrupted, nothing is updated or reported.
             */
            @Override
            public void run() {
                if (!simulateWork()) {
                    // An interrupted check did not complete, so it must not be reported as verified.
                    return;
                }
                table.targetCount = table.sourceRecordCount;
                taskBatch.done(table);
            }
        }

        /** Check task that copies the source column schema to the target side of a table. */
        static class TrustSourceColumns implements Runnable {
            private final Table table;
            private final TaskBatch taskBatch;

            /**
             * @param table     table to update
             * @param taskBatch batch to notify when the check finishes
             */
            public TrustSourceColumns(Table table, TaskBatch taskBatch) {
                this.table = table;
                this.taskBatch = taskBatch;
            }

            /**
             * Waits for the simulated work, then sets the target schema and reports the check
             * as done. If interrupted, nothing is updated or reported.
             */
            @Override
            public void run() {
                if (!simulateWork()) {
                    // An interrupted check did not complete, so it must not be reported as verified.
                    return;
                }
                table.targetColumnSchema = table.sourceColumnSchema;
                taskBatch.done(table);
            }
        }
    }
  • 相关阅读:
    Unity Ply网格读取
    LoadLibrary加载dll失败
    Anaconda引起cuda MSB3721 with return error code 1
    STL 如何对STL进行扩展,简单小结
    集群环境准备(Centos7)
    MySQL中的常见函数
    Mysql优化_第十三篇(HashJoin篇)
    docker创建和使用mysql
    JNI相关笔记 [TOC]
    选择排序
  • 原文地址:https://www.cnblogs.com/zheaven/p/13214180.html
Copyright © 2011-2022 走看看