zoukankan      html  css  js  c++  java
  • [知识整理] 导数据工具

             以前负责DB相关工作,经常需要写专门的工具,将原始数据经过一定逻辑处理后导入目标库。工具需要做到在不影响业务的情况下,以最快的速度完成导入。

            1. 一般数据源只有一个,而目的库有多个。DB的读取要远快于写入,因此要注意读取数据的顺序,保证能够对多个目的库同时写入。

             2. Buffer:整个处理过程分为几段,各段并行进行。为保证整个流水线在任何时刻各段都能并行工作,段与段之间需要足够大的Buffer。

              3. 多线程+同步IO操作:可控性强,可以限制同一个DB上同时执行导入导出的最大连接数,并可根据库的状态和所需的速度做合理调整。

       使用生产消费队列控制Buffer大小:

    /// <summary>
    /// Thread-safe FIFO buffer whose producers block while the buffer is over its
    /// capacity. Consumers never block: <see cref="Dequeue()"/> throws on an empty
    /// queue and <see cref="Dequeue(int)"/> returns whatever is available.
    /// </summary>
    /// <remarks>
    /// The capacity is a soft limit. A producer is admitted as soon as the number of
    /// buffered items is not greater than <see cref="Capacity"/>, so a single enqueue
    /// can leave one item above the limit and a bulk enqueue can leave the whole
    /// batch above it. Admitting whole batches keeps a batch larger than the capacity
    /// from blocking forever.
    /// </remarks>
    /// <typeparam name="T">Type of the buffered items.</typeparam>
    public class BlockingQueue<T>
        {
            private readonly Queue<T> _queue;

            // Guards _queue and _capacity; producers also wait on it for free space.
            private readonly object _sync = new object();
            private int _capacity;

            // NOTE(review): nothing in the visible code ever assigns this field, so every
            // counter update is null-guarded below. Presumably it should be created from
            // the queueName constructor argument - confirm against the real project.
            private BlockingQueuePerfCounters _counters;

            /// <summary>Creates an empty queue with the given soft capacity.</summary>
            /// <param name="queueName">Name of the queue; not used by the visible code.</param>
            /// <param name="capacity">Item count above which producers block.</param>
            public BlockingQueue(string queueName, int capacity)
            {
                _queue = new Queue<T>();
                _capacity = capacity;
            }

            /// <summary>Changes the capacity and wakes blocked producers so they re-check it.</summary>
            /// <param name="capacity">New item count above which producers block.</param>
            public void SetCapacity(int capacity)
            {
                lock (_sync)
                {
                    _capacity = capacity;

                    // A larger capacity may let waiting producers proceed.
                    Monitor.PulseAll(_sync);
                }
            }

            /// <summary>Current soft capacity.</summary>
            public int Capacity
            {
                get
                {
                    lock (_sync)
                    {
                        return _capacity;
                    }
                }
            }

            /// <summary>Number of items currently buffered.</summary>
            public int Count
            {
                get
                {
                    lock (_sync)
                    {
                        return _queue.Count;
                    }
                }
            }

            /// <summary>
            /// Appends one item, blocking while the queue holds more than
            /// <see cref="Capacity"/> items.
            /// </summary>
            /// <param name="item">Item to append.</param>
            public void Enqueue(T item)
            {
                lock (_sync)
                {
                    // Check and insert under the same lock so that concurrent producers
                    // cannot all pass the capacity check at once.
                    while (_queue.Count > _capacity)
                    {
                        Monitor.Wait(_sync);
                    }

                    _queue.Enqueue(item);
                }

                BlockingQueuePerfCounters counters = _counters;
                if (counters != null)
                {
                    counters.EnqueuePerSecond.Increment();
                    counters.QueueLength.Increment();
                    counters.EnqueueTotal.Increment();
                }
            }

            /// <summary>
            /// Appends every item of <paramref name="list"/> as one batch, blocking while
            /// the queue holds more than <see cref="Capacity"/> items. The sequence is
            /// enumerated exactly once.
            /// </summary>
            /// <param name="list">Items to append, in order.</param>
            public void Enqueue(IEnumerable<T> list)
            {
                int added = 0;

                lock (_sync)
                {
                    while (_queue.Count > _capacity)
                    {
                        Monitor.Wait(_sync);
                    }

                    // Count while inserting instead of calling Count() afterwards: a lazy
                    // sequence could yield a different number of items when re-enumerated.
                    foreach (T item in list)
                    {
                        _queue.Enqueue(item);
                        added++;
                    }
                }

                BlockingQueuePerfCounters counters = _counters;
                if (counters != null)
                {
                    counters.EnqueuePerSecond.IncrementBy(added);
                    counters.QueueLength.IncrementBy(added);
                    counters.EnqueueTotal.IncrementBy(added);
                }
            }

            /// <summary>Removes and returns the oldest item.</summary>
            /// <returns>The item at the head of the queue.</returns>
            /// <remarks>
            /// Does not wait for data: on an empty queue the underlying
            /// <see cref="Queue{T}.Dequeue"/> call throws.
            /// </remarks>
            public T Dequeue()
            {
                T val;
                lock (_sync)
                {
                    val = _queue.Dequeue();

                    // Space was freed; let blocked producers re-check the capacity.
                    Monitor.PulseAll(_sync);
                }

                BlockingQueuePerfCounters counters = _counters;
                if (counters != null)
                {
                    counters.DequeueTotal.Increment();
                    counters.DequeuePerSecond.Increment();
                    counters.QueueLength.Decrement();
                }

                return val;
            }

            /// <summary>Removes and returns up to <paramref name="count"/> of the oldest items.</summary>
            /// <param name="count">Maximum number of items to remove.</param>
            /// <returns>The removed items in FIFO order; empty when the queue is empty.</returns>
            public List<T> Dequeue(int count)
            {
                List<T> list = new List<T>();

                lock (_sync)
                {
                    while (_queue.Count > 0 && list.Count < count)
                    {
                        list.Add(_queue.Dequeue());
                    }

                    if (list.Count > 0)
                    {
                        Monitor.PulseAll(_sync);
                    }
                }

                BlockingQueuePerfCounters counters = _counters;
                if (counters != null)
                {
                    counters.DequeueTotal.IncrementBy(list.Count);
                    counters.DequeuePerSecond.IncrementBy(list.Count);
                    counters.QueueLength.IncrementBy(-list.Count);
                }

                return list;
            }

            /// <summary>Returns a snapshot copy of the buffered items without removing them.</summary>
            public List<T> ToList()
            {
                lock (_sync)
                {
                    return _queue.ToList();
                }
            }

            /// <summary>Discards every buffered item; the counters record them as dequeued.</summary>
            public void Clear()
            {
                int count = 0;

                lock (_sync)
                {
                    count = _queue.Count;
                    _queue.Clear();
                    Monitor.PulseAll(_sync);
                }

                BlockingQueuePerfCounters counters = _counters;
                if (counters != null)
                {
                    counters.DequeueTotal.IncrementBy(count);
                    counters.DequeuePerSecond.IncrementBy(count);
                    counters.QueueLength.IncrementBy(-count);
                }
            }
        }

    多线程管理:

     /// <summary>
     /// Fixed-size pool of background worker threads that drain a bounded
     /// <see cref="BlockingQueue{T}"/> in batches of up to 100 items and hand them to
     /// <see cref="ProcessDataBatch"/> or, when that is not set, item by item to
     /// <see cref="ProcessData"/>.
     /// </summary>
     /// <typeparam name="T">Type of the work items.</typeparam>
     public class MultiThread<T>
        {
            private ITracing _tracing = TracingManager.GetTracing(typeof(MultiThread<T>));
            private BlockingQueue<T> _queue;
            private Thread[] _threads;

            // Items accepted by Enqueue whose processing has not finished yet
            // (still buffered, or currently inside a handler).
            private int _realQueueLength;

            // Set by Close; workers poll it and exit on their own.
            private volatile bool _closed;

            /// <summary>Per-item handler; used only when <see cref="ProcessDataBatch"/> is null.</summary>
            public Action<T> ProcessData;

            /// <summary>Batch handler; takes precedence over <see cref="ProcessData"/>.</summary>
            public Action<List<T>> ProcessDataBatch;

            /// <summary>Creates the queue and immediately starts the worker threads.</summary>
            /// <param name="threadCount">Number of worker threads to start.</param>
            /// <param name="queueCapacity">Capacity passed to the underlying queue.</param>
            /// <param name="threadName">Queue name and prefix for the thread names ("name_index").</param>
            public MultiThread(int threadCount, int queueCapacity, string threadName)
            {
                _queue = new BlockingQueue<T>(threadName, queueCapacity);
                _threads = new Thread[threadCount];
                for (int i = 0; i < threadCount; i++)
                {
                    _threads[i] = new Thread(Proc);
                    _threads[i].IsBackground = true;
                    _threads[i].Name = string.Format("{0}_{1}", threadName, i);
                    _threads[i].Start();
                }
            }

            /// <summary>
            /// Asks every worker to stop. Returns immediately; each worker finishes the
            /// batch it is currently processing and then exits.
            /// </summary>
            /// <remarks>
            /// Cooperative shutdown replaces Thread.Abort, which can interrupt a handler
            /// in the middle of a write and is not supported on .NET Core / .NET 5+.
            /// </remarks>
            public void Close()
            {
                _closed = true;
            }

            /// <summary>Number of items currently buffered, excluding items being processed.</summary>
            public int QueueLength
            {
                get
                {
                    return _queue.Count;
                }
            }

            /// <summary>Changes the capacity of the underlying queue.</summary>
            /// <param name="capacity">New capacity.</param>
            public void SetCapacity(int capacity)
            {
                _queue.SetCapacity(capacity);
            }

            /// <summary>
            /// Blocks until every item accepted so far has been handed to a handler and
            /// the handler has returned or thrown.
            /// </summary>
            public void WaitForProcessAll()
            {
                // CompareExchange(0, 0) is an atomic read with a full fence, so the loop
                // always observes the workers' latest decrements.
                while (Interlocked.CompareExchange(ref _realQueueLength, 0, 0) > 0)
                {
                    Thread.Sleep(1);
                }
            }

            /// <summary>Queues a batch of items; may block while the queue is over capacity.</summary>
            /// <param name="list">Items to queue. Enumerated once; must not be null.</param>
            public void Enqueue(IEnumerable<T> list)
            {
                // Materialise once so the counted items are exactly the queued items.
                ICollection<T> items = list as ICollection<T> ?? list.ToList();

                // Count the items before a worker can see them. Counting afterwards lets
                // a worker take the batch while the counter still reads zero, which makes
                // WaitForProcessAll return early.
                Interlocked.Add(ref _realQueueLength, items.Count);
                try
                {
                    _queue.Enqueue(items);
                }
                catch
                {
                    Interlocked.Add(ref _realQueueLength, -items.Count);
                    throw;
                }
            }

            /// <summary>Queues a single item; may block while the queue is over capacity.</summary>
            /// <param name="item">Item to queue.</param>
            public void Enqueue(T item)
            {
                // Same ordering as the batch overload: count first, then publish.
                Interlocked.Increment(ref _realQueueLength);
                try
                {
                    _queue.Enqueue(item);
                }
                catch
                {
                    Interlocked.Decrement(ref _realQueueLength);
                    throw;
                }
            }

            /// <summary>
            /// Worker loop: drains the queue in batches of up to 100 items, sleeping 1 ms
            /// whenever it is empty, until <see cref="Close"/> is called.
            /// </summary>
            public void Proc()
            {
                while (!_closed)
                {
                    try
                    {
                        while (!_closed && this.QueueLength > 0)
                        {
                            ProcessDataList(_queue.Dequeue(100));
                        }
                    }
                    catch (Exception ex)
                    {
                        // Log and keep going: a worker that dies here would leave queued
                        // items unprocessed and WaitForProcessAll waiting forever.
                        _tracing.ErrorFmt(ex, "Proc Error");
                    }

                    Thread.Sleep(1);
                }
            }

            /// <summary>
            /// Runs the configured handler over one dequeued batch and removes the batch
            /// from the pending count whether or not the handler throws.
            /// </summary>
            /// <param name="list">Dequeued items; null or empty is ignored.</param>
            private void ProcessDataList(List<T> list)
            {
                if (list == null || list.Count == 0)
                    return;

                // Snapshot the public fields so that a concurrent reassignment cannot
                // null them between the check and the call.
                Action<List<T>> batchHandler = ProcessDataBatch;
                Action<T> itemHandler = ProcessData;

                if (batchHandler != null)
                {
                    try
                    {
                        batchHandler(list);
                    }
                    catch (Exception ex)
                    {
                        _tracing.Error(ex, "ProcessDataList Error");
                    }
                    finally
                    {
                        Interlocked.Add(ref _realQueueLength, -1 * list.Count);
                    }
                }
                else if (itemHandler != null)
                {
                    foreach (T item in list)
                    {
                        try
                        {
                            itemHandler(item);
                        }
                        catch (Exception ex)
                        {
                            _tracing.Error(ex, "ProcessDataList Error");
                        }
                        finally
                        {
                            Interlocked.Decrement(ref _realQueueLength);
                        }
                    }
                }
                else
                {
                    // No handler is configured, so the items are dropped. They still have
                    // to leave the pending count or WaitForProcessAll never returns.
                    Interlocked.Add(ref _realQueueLength, -1 * list.Count);
                }
            }
        }
  • 相关阅读:
    Ionic Android开发环境搭建 下
    Ionic Android开发环境搭建 上
    百度地图API 简单使用
    json2.js 的使用
    第三回 Bootstrap3.x 起步
    第二回 认识CDN
    WPF 实现的等待效果界面
    AutoFac使用
    SQL语句优化
    MySQL索引的使用
  • 原文地址:https://www.cnblogs.com/lulu/p/3130807.html
Copyright © 2011-2022 走看看