zoukankan      html  css  js  c++  java
  • 【并发】5、多线程并发解析单文件大数据了量解析入库,1800万数据8线程5分钟入库

    1.首先机器要求8核,不然可能会慢点

    2.数据库建表的时候,最后建那种nologging类型的表,不然归档日志满了,数据库入库会很慢,甚至丢数据,因为数据量很大,我们不可能一次性提交所有数据,只能分批提交

    package com.ztesoft.interfaces.predeal.util;
    
    import com.ztesoft.interfaces.predeal.bl.IHandle;
    
    import java.io.ByteArrayOutputStream;
    import java.io.RandomAccessFile;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io.bigData
     * @ClassName: CsvBigTask
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/3/8 11:03
     * @Version: 1.0
     */
    public class CsvBigTask implements Runnable {
    
        //拦截器
        private CyclicBarrier cyclicBarrier;
        private AtomicLong atomicLong; //查询统计个数
        private long start; //文件读取起始位置
        private long totalSize; //结束位置
        private int buffSize;
        private byte[] buff; //读取缓冲大小
        private RandomAccessFile randomAccessFile; //随机读取文件
        private IHandle iHandle; //接口对象,用来实现业务逻辑
        private List tempData;
    
        public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, long start, long totalSize, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
            this.cyclicBarrier = cyclicBarrier;
            this.atomicLong = atomicLong;
            this.start = start;
            this.totalSize = totalSize;
            this.buffSize = buffSize;
            this.buff = new byte[buffSize];
            this.randomAccessFile = randomAccessFile;
            this.iHandle = iHandle;
        }
    
        public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, IHandle iHandle) {
            this.cyclicBarrier = cyclicBarrier;
            this.atomicLong = atomicLong;
            this.start = partitionPair.getStart();
            this.totalSize = partitionPair.getEnd() - partitionPair.getStart() + 1;
            this.buffSize = buffSize;
            this.buff = new byte[buffSize];
            this.randomAccessFile = randomAccessFile;
            this.iHandle = iHandle;
        }
    
        public CsvBigTask(CyclicBarrier cyclicBarrier, AtomicLong atomicLong, PartitionPair partitionPair, int buffSize, RandomAccessFile randomAccessFile, List tempData, IHandle iHandle) {
            this.cyclicBarrier = cyclicBarrier;
            this.atomicLong = atomicLong;
            this.start = partitionPair.getStart();
            this.totalSize = partitionPair.getEnd() - partitionPair.getStart() + 1;
            this.buffSize = buffSize;
            this.buff = new byte[buffSize];
            this.randomAccessFile = randomAccessFile;
            this.iHandle = iHandle;
            this.tempData = tempData;
        }
    
        @Override
        public void run() {
    
            MappedByteBuffer mappedByteBuffer = null;
            //1.读取文件映射到内存中
            try {
                //只读模式,不需要加锁,因为不涉及到资源的共享
                mappedByteBuffer = randomAccessFile.getChannel().map(FileChannel.MapMode.READ_ONLY, start, this.totalSize);
                //2.读取指定内存大小,并判断是否有进行换行
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                //依次循环读取一定缓存的数据量
                for(int i = 0; i < totalSize; i += buffSize) {
                    //确定会读取的数据
                    int realReadLength = 0;
                    if(i + buffSize > totalSize) {
                        //如果超出范围
                        realReadLength = (int) (totalSize - i);
                    } else {
                        realReadLength = buffSize;
                    }
                    //3.如果进行了换行了,那么就清空一次输出,输出一行数据
                    //读取一次数据,这里为0的原因是randomAccessFile会进行seek位置起始的索引
                    //并且get之后,buffer会更新当前位置索引
                    mappedByteBuffer.get(buff, 0, realReadLength);
                    //遍历这个buf,确定是否需要进行调用业务逻辑
                    for(int j = 0; j < realReadLength; ++j) {
                        //遍历每一个字符,判断是不是换行,如果遍历到了换行符,那么就进行处理
                        byte temp = buff[j];
                        if(temp == '
    ' || temp == '
    ') {
                            //这里要进行非空校验
                            String result = byteArrayOutputStream.toString("gbk");
                            if(result != null && !result.equals("")) {
                                iHandle.handle(result, false, tempData);
                                atomicLong.incrementAndGet();
                            }
                            //输出之后,置空文件
                            byteArrayOutputStream.reset();
                        } else if (temp == 0) {
                            break;
                        } else {
                            //如果不是换行符那么就把这个数据放入输出流缓存中
                            byteArrayOutputStream.write(temp);
                        }
                    }
                }
    
                //4.最后清空一次缓冲数据,循环结束之后,如果output对象中还有数据没有清空,说明那就是最后一行
                if(byteArrayOutputStream.size() > 0) {
                    String result = byteArrayOutputStream.toString("gbk");
                    if(result != null && !result.equals("")) {
                        iHandle.handle(result, true, tempData);
                        atomicLong.incrementAndGet();
                    }
                    //输出之后,置空文件
                    byteArrayOutputStream.reset();
                } else {
                    //通知最后一行,如果为空
                    iHandle.handle("", true, tempData);
                }
                //5.栅栏最后拦截完成
                cyclicBarrier.await();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        }
    }

    业务逻辑实现接口类

    package com.ztesoft.interfaces.predeal.bl;
    
    import java.util.List;
    
    /**
     *
     * @program:
     * @description: 
     * @auther: xiaof
     * @date: 2019/3/1 18:08
     */
    public interface IHandle {
    
        void handle(String line, boolean lastLine, List list);
    
    }

    一些辅助类,可要可不要,看业务逻辑

    package com.ztesoft.interfaces.predeal.util;
    
    import java.text.DateFormat;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io.util
     * @ClassName: ConcurrentDateUtil
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/2/27 11:30
     * @Version: 1.0
     */
    public class ConcurrentDateUtil {
    
        private static ThreadLocal<DateFormat> threadLocal = new ThreadLocal<DateFormat>(){
            @Override
            protected DateFormat initialValue() {
                //("yyyy/MM/dd HH:mm:ss");
                return new SimpleDateFormat("yyyyMMdd");
            }
        };
    
        private static ThreadLocal<DateFormat> threadLocalDateDir = new ThreadLocal<DateFormat>(){
            @Override
            protected DateFormat initialValue() {
                //("yyyy/MM/dd HH:mm:ss");
                return new SimpleDateFormat("yyyy/MM/dd");
            }
        };
    
        private static ThreadLocal<DateFormat> threadDatabase = new ThreadLocal<DateFormat>(){
            @Override
            protected DateFormat initialValue() {
                //("yyyy/MM/dd HH:mm:ss");
                return new SimpleDateFormat("yyyyMMddHHmmss");
            }
        };
    
        private static ThreadLocal<DateFormat> threadResourceFile = new ThreadLocal<DateFormat>(){
            @Override
            protected DateFormat initialValue() {
                //("yyyy/MM/dd HH:mm:ss");
                return new SimpleDateFormat("yyyyMMdd000000");
            }
        };
    
        public static Date parse(String dateStr) throws ParseException {
            return threadLocal.get().parse(dateStr);
        }
    
        public static Date parseDatabase(String dateStr) throws ParseException {
            return threadDatabase.get().parse(dateStr);
        }
    
        public static String format(Date date) {
            return threadLocal.get().format(date);
        }
    
        public static String formatDateDir(Date date) {
            return threadLocalDateDir.get().format(date);
        }
    
        public static Date parseDateDir(String dateStr) throws ParseException {
            return threadLocalDateDir.get().parse(dateStr);
        }
    
        public static String formatResourceFile(Date date) {
            return threadResourceFile.get().format(date);
        }
    }
    package com.ztesoft.interfaces.predeal.util;
    
    import java.io.*;
    import java.nio.charset.Charset;
    import java.util.Enumeration;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipFile;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io.util
     * @ClassName: CreateFileUtil
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/3/1 17:23
     * @Version: 1.0
     */
    public class CreateFileUtil {
    
        public static boolean createFile(File file) throws IOException {
    
            if(file.getParentFile().exists()) {
                //如果上级存在,那么直接创建
                return file.createNewFile();
            } else {
                file.getParentFile().mkdir();
                return createFile(file);
            }
        }
    
        public static void unZipFile() {
    
        }
    
    
        /**
         *
         * @program: cn.cutter.common.util.ZipUtil
         * @description: 解压单个文件到当前目录
         * @auther: xiaof
         * @date: 2019/3/3 13:33
         */
        public static String unZipSingleFileCurrentDir(File zipFile) throws IOException {
            //1.获取解压文件
            ZipFile zipFile1 = new ZipFile(zipFile);
            String fileName = "";
            //2.循环压缩文件中的文件内容
            for(Enumeration enumeration = zipFile1.entries(); enumeration.hasMoreElements();) {
                //3.获取输出路径,也即是文件的父级目录
                ZipEntry entry = (ZipEntry) enumeration.nextElement();
                fileName = entry.getName();
                // 判断路径是否存在,不存在则创建文件路径
                InputStream in = zipFile1.getInputStream(entry);
                String outPath = zipFile.getParentFile().getPath();
                // 判断路径是否存在,不存在则创建文件路径
                File fileDir = zipFile.getParentFile();
                if (!fileDir.exists()) {
                    fileDir.mkdirs();
                }
    
                //4.输出文件信息到当前目录
                FileOutputStream out = new FileOutputStream((outPath + "/" + fileName).replaceAll("\*", "/"));
                byte[] buf1 = new byte[1024];
                int len;
                while ((len = in.read(buf1)) > 0) {
                    out.write(buf1, 0, len);
                }
                in.close();
                out.close();
            }
    
            return fileName;
        }
    
    
    }
    package com.ztesoft.interfaces.predeal.util;
    
    import org.apache.log4j.Logger;
    
    import java.io.*;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.sql.Date;
    import java.text.ParseException;
    import java.util.HashSet;
    import java.util.Set;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io.util
     * @ClassName: DataUtil
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/2/21 14:38
     * @Version: 1.0
     */
    public class DataUtil {
    
        private final static Logger logger = Logger.getLogger(DataUtil.class);
    
        /**
         *
         * @program: io.util.DataUtil
         * @description: 对数据进行分区
         * @auther: xiaof
         * @date: 2019/2/21 14:39
         */
        public static void partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile, Set partitionPairs) throws IOException {
            if(start > totalSize - 1) {
                return;
            }
    
            //每次获取length长度,并判断这个位置是否是换行符
            PartitionPair partitionPair = new PartitionPair();
            partitionPair.setStart(start);
            //判断这个length是否是换行符
            long index = start + length;
    
            //递归终止条件
            if(index > totalSize - 1) {
                //最后一个递归终止
                partitionPair.setEnd(totalSize - 1);
                partitionPairs.add(partitionPair);
    
            } else {
                //设置位置并读取一个字节
                randomAccessFile.seek(index);
                byte oneByte = randomAccessFile.readByte();
                //判断是否是换行符号,如果不是换行符,那么读取到换行符为止
                while(oneByte != '
    ' && oneByte != '
    ') {
                    //不能越界
                    if(++index > totalSize - 1) {
                        index = totalSize-1;
                        break;
                    }
    
                    randomAccessFile.seek(index);
                    oneByte = randomAccessFile.readByte();
                }
    
                partitionPair.setEnd(index);
                //递归下一个位置
                partitionPairs.add(partitionPair);
    
                partition(index + 1, length, totalSize, randomAccessFile, partitionPairs);
            }
    
        }
    
    
        /**
         *
         * @program: io.util.DataUtil
         * @description: 分片数据
         * @auther: xiaof
         * @date: 2019/2/22 15:20
         */
        public static Set partition(long start, long length, long totalSize, RandomAccessFile randomAccessFile) throws IOException {
            if(start > totalSize - 1) {
                return null;
            }
    
            //每次获取length长度,并判断这个位置是否是换行符
            Set partitionPairs = new HashSet();
            PartitionPair partitionPair = new PartitionPair();
            partitionPair.setStart(start);
            //判断这个length是否是换行符
            long index = start + length;
    
            //递归终止条件
            if(index > totalSize - 1) {
                //最后一个递归终止
                partitionPair.setEnd(totalSize - 1);
                partitionPairs.add(partitionPair);
                return partitionPairs;
    
            } else {
                //设置位置并读取一个字节
                randomAccessFile.seek(index);
                byte oneByte = randomAccessFile.readByte();
                //判断是否是换行符号,如果不是换行符,那么读取到换行符为止
                while(oneByte != '
    ' && oneByte != '
    ') {
                    //不能越界
                    if(++index > totalSize - 1) {
                        index = totalSize-1;
                        break;
                    }
    
                    randomAccessFile.seek(index);
                    oneByte = randomAccessFile.readByte();
                }
    
                partitionPair.setEnd(index);
                //递归下一个位置
                partitionPairs.add(partitionPair);
    
                partitionPairs.addAll(partition(index + 1, length, totalSize, randomAccessFile));
            }
    
            return partitionPairs;
        }
    
        public static Date getSQLDateThreadSafe(String dateStr) throws ParseException {
            return new Date(ConcurrentDateUtil.parse(dateStr).getTime());
        }
    
        /**
         *  复制单个文件,这里考虑使用文件锁,保证线程安全
         *  @param  oldPath  String  原文件路径  如:c:/fqf.txt
         *  @param  newPath  String  复制后路径  如:f:/fqf.txt
         *  @return  boolean
         */
        public static void copyFile(String  oldPath,  String  newPath) throws Exception {
    //        int  byteread  =  0;
            File oldfile  =  new  File(oldPath);
            if  (oldfile.exists())  {  //文件存在时
                //对文件枷锁,然后进行复制操作,
                InputStream inStream  =  new  FileInputStream(oldPath);  //读入原文件
                FileOutputStream fs  =  new  FileOutputStream(newPath);
                FileChannel fileChannel = fs.getChannel();
                //开始加锁
                FileLock fileLock = null;
                try {
                    while (true) {
                        fileLock = fileChannel.lock(); //直接上锁
                        if(fileLock != null) {
                            break;
                        } else {
                            //文件无法被锁定,1s后尝试
                            logger.warn(oldPath + " 文件无法被锁定,1s后尝试");
                            Thread.sleep(1000);
                        }
                    }
    
                    //开始拷贝数据
                    byte[] buf = new byte[2048];
                    int len = 0;
                    while((len = inStream.read(buf)) != -1) {
                        fs.write(buf, 0, len);
                    }
    
                    //刷新
                    fileLock.release();
                    fs.flush();
                    fs.close();
                    inStream.close();
    
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                } finally {
                    if(fileLock.isValid()) {
    
                        fileLock.release();
                    }
                }
            }
    
        }
    
        /**
         *
         * @program: com.ztesoft.interfaces.predeal.util.DataUtil
         * @description: 删除文件
         * @auther: xiaof
         * @date: 2019/3/5 18:08
         */
        public static void deletFile(String filePath) {
            File file = new File(filePath);
            file.delete();
        }
    
    }
    package com.ztesoft.interfaces.predeal.util;
    
    /**
     * @ProjectName: cutter-point
     * @Package: io.util
     * @ClassName: PartitionPair
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/2/21 14:36
     * @Version: 1.0
     */
    public class PartitionPair {
        private long start;
        private long end;
    
        @Override
        public String toString() {
            return "start="+start+";end="+end;
        }
    
        @Override
        public int hashCode() {
            final int prime = 31;
            int result = 1;
            result = prime * result + (int) (end ^ (end >>> 32));
            result = prime * result + (int) (start ^ (start >>> 32));
            return result;
        }
    
        @Override
        public boolean equals(Object obj) {
            if (this == obj)
                return true;
            if (obj == null)
                return false;
            if (getClass() != obj.getClass())
                return false;
            PartitionPair other = (PartitionPair) obj;
            if (end != other.end)
                return false;
            return start == other.start;
        }
    
        public long getStart() {
            return start;
        }
    
        public void setStart(long start) {
            this.start = start;
        }
    
        public long getEnd() {
            return end;
        }
    
        public void setEnd(long end) {
            this.end = end;
        }
    }

    这里开始,我们实战使用这个方法解析入库

    package com.ztesoft.interfaces.predeal.bl;
    
    import com.ztesoft.interfaces.common.bll.CommonQueuePool;
    import com.ztesoft.interfaces.common.vo.CommonQueueVo;
    import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
    import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
    import com.ztesoft.isa.service.common.util.SpringContextUtil;
    import org.apache.commons.collections.MapUtils;
    import org.apache.log4j.Logger;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @ProjectName: 湖北移动智慧装维支撑系统
     * @Package: com.ztesoft.interfaces.predeal.bl
     * @ClassName: PreDealResourceServer
     * @Author: xiaof
     * @Description: 预处理综资ftp服务数据同步
     * @Date: 2019/3/8 16:21
     * @Version: 1.0
     */
    public class PreDealResourceServer extends Thread {
    
        private Logger logger = Logger.getLogger(PreDealResourceServer.class);
    
        private static Map<String, Object> config = new HashMap<>();
    
        //获取dao层操作类
        private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");
    
        private CommonQueuePool commonQueuePool;
    
        //配置线程池数量
        private ScheduledExecutorService service;
    
        @Override
        public void run() {
            //        1.启动定时线程,设置定时启动时间(通过定时器),要求定时可配置,最好实时生效(这里目前考虑取消定时器,重新生成定时器)
            Map paramMap = new HashMap();
            paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            paramMap.put("pkey", PreDealResourceConstrant.SERVER_KEY_INFO);
            paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap);
    //            codea-线程池数量
    //            codeb-队列长度
            service = Executors.newScheduledThreadPool(MapUtils.getInteger(paramMap, "CODEA", 8));
            commonQueuePool = new CommonQueuePool<CommonQueueVo>(MapUtils.getInteger(paramMap, "CODEB", 3000));
    
            //启动资源信息生产者
            this.startResourceProducer();
            //启动资源信息消费者
            this.startResourceConsum();
        }
    
        private void startResourceProducer() {
            //设置生产线程&消费线程
            Map producerMap = new HashMap();
            producerMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            producerMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_THREAD_KEY_INFO);
            producerMap = preDealResourceDao.qryPreDealResourceConfig(producerMap);
            //获取启动时间,和获取间隔时间
            int initialDelay = MapUtils.getInteger(producerMap, "CODEA", 30);
            int period = MapUtils.getInteger(producerMap, "CODEB", 86400);
            PreDealResourceProducer preDealResourceProducer = new PreDealResourceProducer(commonQueuePool);
            Future preDealResourceUserFuture = service.scheduleAtFixedRate(preDealResourceProducer, initialDelay, period, TimeUnit.SECONDS);
    
            //吧结果存放进入map中,以便后面更新间隔时间
            List preDealAAAUserFutureList = new ArrayList();
            preDealAAAUserFutureList.add(preDealResourceUserFuture);
            config.put(MapUtils.getString(producerMap, "CODEC", "ProducerResourceThread"), preDealAAAUserFutureList);
        }
    
        private void startResourceConsum() {
            //启动消费者
            Map consumMap = new HashMap();
            consumMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
            consumMap.put("pkey", PreDealResourceConstrant.CONSUM_RESOURCE_THREAD_KEY_INFO);
            consumMap = preDealResourceDao.qryPreDealResourceConfig(consumMap);
            int initialDelay = MapUtils.getInteger(consumMap, "CODEA", 30);
            int threadNum = MapUtils.getInteger(consumMap, "CODEB", 3);
            List preDealAAAUserFutureList = new ArrayList();
            for(int i = 0; i < threadNum; ++i) {
                PreDealResourceConsum preDealResourceConsum = new PreDealResourceConsum(commonQueuePool);
                Future future = service.schedule(preDealResourceConsum, initialDelay, TimeUnit.SECONDS);
                //吧结果存放进入map中,以便后面更新间隔时间
                preDealAAAUserFutureList.add(future);
            }
            config.put(MapUtils.getString(consumMap, "CODEC", "ConsumResourceThread"), preDealAAAUserFutureList);
        }
    
        /**
         * 启动,多态方式,避免重复启动
         */
        public void start(String... args) {
            this.start();
        }
    
        public static Map<String, Object> getConfig() {
            return config;
        }
    
        public static void setConfig(Map<String, Object> config) {
            PreDealResourceServer.config = config;
        }
    
    }
    package com.ztesoft.interfaces.predeal.bl;
    
    import com.ztesoft.interfaces.common.bll.CommonQueuePool;
    import com.ztesoft.interfaces.common.invoke.FtpInvoke;
    import com.ztesoft.interfaces.common.util.FtpTemplate;
    import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
    import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
    import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
    import com.ztesoft.interfaces.predeal.util.ConcurrentDateUtil;
    import com.ztesoft.interfaces.predeal.util.DataUtil;
    import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
    import com.ztesoft.isa.service.common.util.SpringContextUtil;
    import com.ztesoft.services.common.CommonHelper;
    import org.apache.commons.collections.MapUtils;
    import org.apache.commons.net.ftp.FTPClient;
    import org.apache.commons.net.ftp.FTPFile;
    import org.apache.log4j.Logger;
    
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;
    import java.util.*;
    
    /**
     * @ProjectName: 湖北移动智慧装维支撑系统
     * @Package: com.ztesoft.interfaces.predeal.bl
     * @ClassName: PreDealResourceProducer
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/3/10 15:03
     * @Version: 1.0
     */
    public class PreDealResourceProducer implements Runnable {
    
    
        private final static Logger logger = Logger.getLogger(PreDealResourceProducer.class);
    
        //这个是要用来控制多线的队列
        private final CommonQueuePool commonQueuePool;
    
        //获取dao层操作类
        private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");
    
        public PreDealResourceProducer(CommonQueuePool commonQueuePool) {
            this.commonQueuePool = commonQueuePool;
        }
    
        @Override
        public void run() {
    //        2.定时启动之后,扫描ftp文件,下载到本地(并记录数据),(要求本地上传如果有文件,要能直接开始入库,不用通过远程服务器)
            try {
                //1.ftp连接服务器,获取AAAftp服务器
                Map paramMap = new HashMap();
                paramMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
                paramMap.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_KEY);
                paramMap = preDealResourceDao.qryPreDealResourceConfig(paramMap);
                String resourceIp = MapUtils.getString(paramMap, "CODEA", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_IP);
                int resourcePort = MapUtils.getInteger(paramMap, "CODEB", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PORT);
                String resourceUserName = MapUtils.getString(paramMap, "CODEC", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_USERNAME);
                String resourcePasswd = MapUtils.getString(paramMap, "CODED", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_PASSWD);
                String resourceRemoteDir = MapUtils.getString(paramMap, "CODEE", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_REMOTEDIR);
                String resourceLocalDir = MapUtils.getString(paramMap, "CODEH", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_LOCALDIR);
                String resourceDeleteMark = MapUtils.getString(paramMap, "CODEG", "0");
                //PRODUCER_RESOURCE_CONSUM_LOCALDIR
                String resourceConsumLocalDir = MapUtils.getString(paramMap, "CODEI", PreDealResourceConstrant.PRODUCER_RESOURCE_CONSUM_LOCALDIR);
    
    //            resourceLocalDir = "D:\湖北移动\任务\预处理\综资\ftp文件\sync";
    //            resourceConsumLocalDir = resourceLocalDir + "\consum";
    
                //获取需要下载的文件目录,不在包含的文件里面的,那么就不用下载
                Map paramMapFile = new HashMap();
                paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
                paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
                paramMapFile = preDealResourceDao.qryPreDealResourceConfig(paramMapFile);
                String fileNameContain = MapUtils.getString(paramMapFile, "CODEA", "");
                String endWith = MapUtils.getString(paramMapFile, "CODEB", "");
    
                //1.获取指定时间
                FtpTemplate ftpTemplate = new FtpTemplate(resourceIp, resourcePort, resourceUserName, resourcePasswd);
                //2.下载文件进入本地服务器,并删除服务器上文件(删除操作,我们做个开关,并且只有所有数据下载完成才能删除)
                try {
                    ftpTemplate.operatorByPathAndName(resourceRemoteDir, resourceLocalDir, new FtpInvoke() {
                        @Override
                        public void doOperator(FTPClient ftp, String remoteDir, String localDir) throws Exception {
                            // 转移到FTP服务器目录至指定的目录下
                            //1.获取远程文件,并存放进入队列汇总等待处理
                            //2.存放进入本地路径
                            //设置被动模式,服务端开端口给我用
                            ftp.enterLocalPassiveMode();
                            //获取所有文件
                            FTPFile[] ftpFile = ftp.listFiles(remoteDir);
                            //建立输出到本地文件的输出流
                            OutputStream outputStream = null;
    
                            for(int i = 0; i < ftpFile.length; ++i) {
                                String ftpFileName = ftpFile[i].getName();
                                //判断文件过滤规则
                                String localFilePath = localDir + "/" + ftpFileName;
                                File file = new File(localFilePath);
                                //判断本地是否已经存在,如果存在,那么也不用获取,并且只获取
                                if(fileNameContain.contains(ftpFileName.substring(0, ftpFileName.lastIndexOf("_")))
                                        && !file.exists() && ftpFileName.contains(endWith)) { //判断是话单文件
                                    String filePath = remoteDir + "/" + ftpFileName;
                                    //下载文件
                                    outputStream = new FileOutputStream(localDir + "/" + ftpFileName);
                                    ftp.retrieveFile(filePath, outputStream);
                                    logger.info("下载文件:" + ftpFileName + " 完成");
                                    if(outputStream != null) {
                                        outputStream.flush();
                                        outputStream.close();
                                        outputStream = null;
                                    }
                                }
                            }
    
                            if(outputStream != null) {
                                outputStream.flush();
                                outputStream.close();
                                outputStream = null;
                            }
    
    
                        }
                    });
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
                //3.下载到本地之后,我们遍历本地所有文件
                File localDir = new File(resourceLocalDir);
                //获取本地文件的所有文件
                if(!localDir.exists()) {
                    localDir.mkdirs();
                }
                //4.获取所有文件(不是目录)之后,解析文件名,并依次吧数据送入队列中,
                File[] recFiles = localDir.listFiles();
                for(int i = 0; i < recFiles.length; ++i) {
                    if(recFiles[i].isDirectory()) {
                        continue; //目录不操作
                    }
                    //移动文件数据到相应的日期目录
                    String fileName = recFiles[i].getName();
                    String filePath = recFiles[i].getPath();
                    Calendar calendar = Calendar.getInstance();//获取当前日期
                    String consumLocalDir = resourceConsumLocalDir + "/" + ConcurrentDateUtil.formatDateDir(calendar.getTime());
                    File file = new File(consumLocalDir);
                    if(!file.exists()) {
                        file.mkdirs();
                    }
                    //5.记录同步日志记录
                    //入库存放一条记录
                    PreDealResourceVo preDealResourceVo = new PreDealResourceVo();
                    String desPath = consumLocalDir + "/" + fileName;
                    long omPredealSyncLogSeq = CommonHelper.getCommonDAO().getSeqNextVal(PreDealResourceConstrant.OM_PREDEAL_SYNC_LOG_SEQ);
                    OmPredealSyncLogDto omPredealSyncLogDto = new OmPredealSyncLogDto();
                    omPredealSyncLogDto.setId(String.valueOf(omPredealSyncLogSeq));
                    omPredealSyncLogDto.setCreateTime(new Date());
                    omPredealSyncLogDto.setUpdateTime(new Date());
                    omPredealSyncLogDto.setState("0");
                    omPredealSyncLogDto.setTimes("0");
                    omPredealSyncLogDto.setLogName(recFiles[i].getName() + "资源信息同步");
                    omPredealSyncLogDto.setLogType(PreDealResourceConstrant.PRE_DEAL_RESOURCE_LOG_TYPE);
                    omPredealSyncLogDto.setRemark(filePath);
                    omPredealSyncLogDto.add();
                    //先交给消费者消费,只有当消费完毕,我们才移动和删除文件
                    preDealResourceVo.setOmPredealSyncLogDto(omPredealSyncLogDto);
                    preDealResourceVo.setFileName(recFiles[i].getName());
                    preDealResourceVo.setFileFullPath(desPath);
    
                    //移动数据进入对应的消费目录
                    DataUtil.copyFile(filePath, desPath);
                    DataUtil.deletFile(filePath);
    
                    commonQueuePool.put(preDealResourceVo);
                }
                //更新配置信息,由于文件是生成前就会删除,所以这个逻辑就不做了,有文件就全下载下来
                paramMapFile.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
                paramMapFile.put("pkey", PreDealResourceConstrant.PRODUCER_RESOURCE_FTP_RULE);
                //更新为今天
                paramMapFile.put("codeb", ConcurrentDateUtil.formatResourceFile(new Date()));
                preDealResourceDao.updateConfig(paramMapFile);
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
            }
        }
    }
    package com.ztesoft.interfaces.predeal.bl;
    
    import com.ztesoft.interfaces.common.bll.CommonQueuePool;
    import com.ztesoft.interfaces.common.bll.CommonQueueWork;
    import com.ztesoft.interfaces.common.vo.CommonQueueVo;
    import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
    import org.apache.log4j.Logger;
    
    /**
     * @ProjectName: 湖北移动智慧装维支撑系统
     * @Package: com.ztesoft.interfaces.predeal.bl
     * @ClassName: PreDealResourceConsum
     * @Author: xiaof
     * @Description: ${description}
     * @Date: 2019/3/10 14:44
     * @Version: 1.0
     */
    public class PreDealResourceConsum implements Runnable {
    
        private Logger logger = Logger.getLogger(PreDealResourceConsum.class);
    
        //这个是要用来控制多线的队列
        private final CommonQueuePool commonQueuePool;
        public PreDealResourceConsum(CommonQueuePool commonQueuePool) {
            this.commonQueuePool = commonQueuePool;
        }
    
        @Override
        public void run() {
            //只要不为空,那么就可以一直取
            while(true) {
                //3.删除远端路径文件(考虑延后处理,比如解析完成之后删除),那么这里我们放消费者中处理
                try {
                    CommonQueueVo commonQueueVo = (CommonQueueVo) commonQueuePool.take();
                    CommonQueueWork commonQueueWork = null;
                    //这里进行区分类型
                    if(commonQueueVo instanceof PreDealResourceVo) {
                        commonQueueWork = new PreDealResourceWork();
                    }
    
                    if(commonQueueWork != null) {
                        commonQueueWork.doWork(commonQueueVo);
                    }
                } catch (InterruptedException e) {
    //                e.printStackTrace();
                    logger.error(e.getMessage(), e);
                }
    
            }
        }
    }

    最后核心解析类实现

    package com.ztesoft.interfaces.predeal.bl;
    
    import com.ztesoft.interfaces.common.bll.CommonQueueWork;
    import com.ztesoft.interfaces.common.vo.CommonQueueVo;
    import com.ztesoft.interfaces.predeal.constrant.PreDealResourceConstrant;
    import com.ztesoft.interfaces.predeal.dao.PreDealResourceDao;
    import com.ztesoft.interfaces.predeal.dto.OmPredealSyncLogDto;
    import com.ztesoft.interfaces.predeal.util.CreateFileUtil;
    import com.ztesoft.interfaces.predeal.util.CsvBigTask;
    import com.ztesoft.interfaces.predeal.util.DataUtil;
    import com.ztesoft.interfaces.predeal.util.PartitionPair;
    import com.ztesoft.interfaces.predeal.vo.PreDealResourceVo;
    import com.ztesoft.isa.service.common.util.SpringContextUtil;
    import org.apache.commons.collections.MapUtils;
    import org.apache.log4j.Logger;
    
    import java.io.ByteArrayOutputStream;
    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.*;
    import java.util.concurrent.CyclicBarrier;
    import java.util.concurrent.atomic.AtomicLong;
    
    /**
     * @ProjectName: 湖北移动智慧装维支撑系统
     * @Package: com.ztesoft.interfaces.predeal.bl
     * @ClassName: PreDealResourceWork
     * @Author: xiaof
     * @Description: 解析资源ftp文件,并准备入库
     * @Date: 2019/3/10 15:00
     * @Version: 1.0
     */
    public class PreDealResourceWork implements CommonQueueWork {
    
        private final static Logger logger = Logger.getLogger(PreDealResourceWork.class);
    
        private PreDealResourceDao preDealResourceDao = (PreDealResourceDao) SpringContextUtil.getBean("preDealResourceDao");
    
        private static ThreadLocal<List> tempDataThreadLocal = new ThreadLocal<List>() {
            @Override
            protected List initialValue() {
                return new ArrayList();
            }
        };
    
        @Override
        public void doWork(CommonQueueVo commonQueueVo) {
    
            PreDealResourceVo preDealResourceVo = (PreDealResourceVo) commonQueueVo;
            OmPredealSyncLogDto omPredealSyncLogDto = preDealResourceVo.getOmPredealSyncLogDto();
            //1.获取文件
            logger.info("===========1开始解压文件=======" + System.currentTimeMillis());
            File zipFile = new File(preDealResourceVo.getFileFullPath());
            //解压文件
            try {
                String fileName = CreateFileUtil.unZipSingleFileCurrentDir(zipFile);
                String tableName = this.getTableName(fileName);
                //2.判断文件是否存在
                logger.info("===========2判断文件是否存在=======" + System.currentTimeMillis());
                File resourceFile = new File(zipFile.getParent() + "/" + fileName);
                if(!resourceFile.exists()) {
                    omPredealSyncLogDto.setState("0");
                    omPredealSyncLogDto.setUpdateTime(new Date());
                    omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                    omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "
    " + resourceFile.getPath() +  " 文件不存在");
                    return;
                }
                //3.获取随机文件索引
                logger.info("===========3获取随机文件索引=======" + System.currentTimeMillis());
                RandomAccessFile randomAccessFile = new RandomAccessFile(resourceFile, "r");
                long length = resourceFile.length();
                //文件大小1G,获取处理器核心数
                int availProcessors = Runtime.getRuntime().availableProcessors();
                logger.info("===========3。1可使用线程=======" + availProcessors);
                long blockLength = length / availProcessors;
                byte b = '0';
                int index = 0;
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                while(b != '
    ' && b != '
    ') {
                    b = randomAccessFile.readByte();
                    byteArrayOutputStream.write(b);
                    ++index;
                }
                //获取首行
                String[] firstLine = byteArrayOutputStream.toString().split("	");
                //文件分片
                Set pairSets = DataUtil.partition(index, blockLength, length, randomAccessFile);
                //4.数据分片之后,分别启动处理线程
                logger.info("===========4数据分片之后,分别启动处理线程=======" + pairSets + " 时间:" + System.currentTimeMillis());
                final long startTime = System.currentTimeMillis();
                AtomicLong atomicLong = new AtomicLong(0);
                //6.数据入库,这里做成可配置模式
                logger.info("===========5数据入库=======" + System.currentTimeMillis());
                Map workConfigMap = new HashMap();
                workConfigMap.put("stype", PreDealResourceConstrant.CONFIG_KEY_TYPE);
                workConfigMap.put("pkey", PreDealResourceConstrant.PRE_DEAL_RESOURCE_WORK);
                workConfigMap = preDealResourceDao.qryPreDealResourceConfig(workConfigMap);
                int mapSize = MapUtils.getInteger(workConfigMap, "CODEA", 1000);
                int bufSize = MapUtils.getInteger(workConfigMap, "CODEB", 3) * 1024 * 1024;
                String splitMark = MapUtils.getString(workConfigMap, "CODEC", "	");
                //先清空表数据,备份数据放在文件中
                initTable(tableName, omPredealSyncLogDto);
    
                CyclicBarrier cyclicBarrier = new CyclicBarrier(pairSets.size(), new Runnable() {
                    @Override
                    public void run() {
                        //吧最后的数据提交上去
                        logger.info("===========5数据入库结束=======" + System.currentTimeMillis());
                        omPredealSyncLogDto.setUpdateTime(new Date());
                        omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                        omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "
    " + resourceFile.getPath() +  " 入库完毕 use time: "+(System.currentTimeMillis()-startTime));
                        omPredealSyncLogDto.setState("1");
                        omPredealSyncLogDto.setNewTotal(String.valueOf(atomicLong.get()));
                        omPredealSyncLogDto.update();
                    }
                });
    
                for(Object pair : pairSets) {
    //                List tempData = new ArrayList<>();
                    PartitionPair partitionPair = (PartitionPair) pair;
                    CsvBigTask csvBigTask = new CsvBigTask(cyclicBarrier, atomicLong, partitionPair, bufSize, randomAccessFile, new ArrayList(), new IHandle() {
                        @Override
                        public void handle(String line, boolean lastLine, List tempData) {
                            try {
                                //转换数据为map
    //                            logger.info("===========5读取数据=======" + line + System.currentTimeMillis());
                                if(!line.equals("")) {
                                    String[] elements = line.split(splitMark);
                                    tempData.add(elements);
                                }
    
                                //每个3000条一提交
                                if((tempData.size() % mapSize == 0 && tempData.size() >= mapSize) || lastLine) {
                                    preDealResourceDao.addResourceBatch(tableName, firstLine, tempData);
                                    tempData.clear();
    //                                logger.info("===========6批量提交数据=======" + System.currentTimeMillis());
                                }
                            } catch (Exception e) {
                                logger.error(line + e.getMessage(), e);
                            }
                        }
                    });
                    Thread thread = new Thread(csvBigTask);
                    thread.start();
                }
    
    
            } catch (IOException e) {
                omPredealSyncLogDto.setState("0");
                omPredealSyncLogDto.setUpdateTime(new Date());
                omPredealSyncLogDto.setTimes(Integer.valueOf(omPredealSyncLogDto.getTimes()) + 1 + "");
                omPredealSyncLogDto.setRemark(omPredealSyncLogDto.getRemark() + "
    " + preDealResourceVo.getFileFullPath() + e.getMessage());
                logger.error(e.getMessage(), e);
            } finally {
                //7.最后更新日志
                logger.info("===========7最后更新日志=======" + System.currentTimeMillis());
                omPredealSyncLogDto.update();
            }
        }
    
        private String getTableName(String fileName) {
            String tableName = "";
            tableName = PreDealResourceConstrant.PRE_RESOURCE_TABLE + "_" + fileName.substring(0, fileName.lastIndexOf("_"));
            return tableName;
        }
    
        private void initTable(String tableName, OmPredealSyncLogDto omPredealSyncLogDto) {
            preDealResourceDao.truncateResourceTable(omPredealSyncLogDto, tableName);
        }
    
    }

    最后公布一小段入库代码,这个就不全给了

    PreDealResourceDaoImpl
    @Override
        public void addResourceBatch(String tableName, String[] fields, List params) {
            try {
    //            String sql = "insert into PRE_AAA_STAFF (domain_name,internet_account,brasid,band_info,registTime,FirstUseTime,lastLoginTime,status,bandwidth_M,broadband_account,CREATE_DATE,UPDATE_DATE)
    " +
    //                    "values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?,sysdate,sysdate) ";
                //组装sql
                StringBuffer stringBuffer = new StringBuffer(" insert /*+ append */ into " + tableName + " (");
                StringBuffer stringValue = new StringBuffer(" values (");
    //            Set<String> keySet = tableMap.keySet();
                for(int i = 0; i < fields.length; ++i) {
                    stringBuffer.append(fields[i] + ",");
                    stringValue.append("?,");
                }
    
                //添加时间
                stringBuffer.append("create_date, update_date,");
                stringValue.append("sysdate,sysdate,");
    
                stringBuffer = stringBuffer.deleteCharAt(stringBuffer.length() - 1).append(" ) ");
                stringValue = stringValue.deleteCharAt(stringValue.length() - 1).append(")");
                String sql = stringBuffer.append(stringValue).toString();
    
                super.getJdbcOperations().batchUpdate(sql, new BatchPreparedStatementSetter() {
                    @Override
                    public void setValues(PreparedStatement ps, int i) throws SQLException {
                        Object[] values = (Object[]) params.get(i);
                        for(int j = 0; j < fields.length; ++j) {
                            if(j < values.length) {
                                ps.setString(j + 1, String.valueOf(values[j]));
                            } else {
                                ps.setObject(j + 1, null);
                            }
                        }
                    }
    
                    @Override
                    public int getBatchSize() {
                        return params.size();
                    }
                });
    
                logger.info("====" + tableName + "入库 " + params.size() + " 条");
    
            } catch (DataAccessException e) {
                logger.error(e.getMessage(), e);
            }
        }
  • 相关阅读:
    PowerDesigner反向工程操作步骤 以PowerDesigner15为例
    RegularExpressionValidator控件中正则表达式用法
    在C#中进行数据纵向不定行转横向列,多条信息成一行,例如员工薪资信息
    Oracle常见等待事件说明
    ORACLE 绑定变量用法总结(转)
    Configure Oracle 11gR2 RAC 一节点执行root.sh脚本报错
    ORACLE ASH/AWR
    db file sequential read 事件的优化(一)
    Redo Log Buffer的大小设置转载
    Oracle 判断 并 手动收集 统计信息 脚本
  • 原文地址:https://www.cnblogs.com/cutter-point/p/10560324.html
Copyright © 2011-2022 走看看