zoukankan      html  css  js  c++  java
  • 用多线程处理FTP上传

    在进入正文前,先给大家分享一款比较好用的服务器连接工具:
    IIS7服务器管理工具是一款windows全系下用于连接并操控基于windows和linux系统的VPS、VNC、FTP等远程服务器、云服务器的管理工具。
    界面简单明了,操作易上手,功能强大,支持批量导入服务器,并批量打开,多窗口化管理,除此之外,加载本地硬盘、硬盘映射、加载服务器的声音,远程声卡读取等功能也一应俱全,完全实现了各类场景使用,对于FTP连接界面,其中FTP文件的定时上传,定时下载(也可以说定时上传下载、定时备份)功能,对于经常使用FTP的小伙伴来说,也是非常适用的。
    工具支持自动更新,压缩包只有7.62M,方便简洁,一步到位。
    下载地址:http://fwqglgj.iis7.net/?tscc
    简单的使用步骤可以看下面的截图,做了详细标注:

    #########################################################################################################################

    正文

    在开发中遇到总站发送命令请求分站将某资源通过FTP上传过来,也就是总站提取分站的资源问题。并且总站实时可以获取已经提取了文件的大小的比例。

      思路:1.首先分站要将文件大小告知总站

                   2.总站收到文件大小后,根据指定路径去判断指定路径文件夹(分站的文件存储的位置)下的文件大小,然后和当前文件大小/总大小,就获取了已经上传的所占比。

      总站发送请求就不再赘述了,直接说分站接收到请求后的处理,为了防止恶意请求,我们这对每次请求都加了“盐”,那么分站接收时需要判断是否带“盐”了,如果没有,则不予处理,返回错误。

      分站接收请求contorller

        @RequestMapping("/fileSize.htm")
        @ResponseBody
        public String getFileSize(HttpServletRequest request,HttpServletResponse response,String json) {
            log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "获取资源文件大小(字节)------start");
            /*获取参数--start*/
            String checkCode = "see-P2C";//校验码
            JSONObject jsonObject = new JSONObject(json);
            String tranno =jsonObject.getString("tranno"); //交易流水号
            String isCheckcode = jsonObject.getString("checkCode"); //校验码
            checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode;
            String mD5CheckCode = MD5Util.encrypt(checkCode);   
            Long ids =jsonObject.getLong("sid"); //资源ID
            Map<String, Object>map = new HashMap<String, Object>();
            map.put("ids", ids);
            map.put("tranno", tranno);
            map.put("isCheckcode", isCheckcode);
            map.put("mD5CheckCode", mD5CheckCode);
            log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "获取资源文件大小(字节)------end");
            return this.transRecordService.fileSizeHandle(map);
        }

      TransRecordServiceImpl.java中我们处理这些请求,首先是总站第一遍请求时,分站会判断有没有这个文件,如果没有返回错误,如果有资源分站会告诉总站我的文件有多大、我会把文件放到你的FTP的哪个文件夹下(路径)。

        /**
         * @desc 获取文件总大小
         * @author zp
         */
        @Override
        public String fileSizeHandle(Map<String, Object> map) {
            // TODO Auto-generated method stub
            log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(获取视频总大小)----start");
            String tranno = map.get("tranno").toString();
            Long ids = (Long)map.get("ids");
            String mD5CheckCode = map.get("mD5CheckCode").toString();
            String isCheckcode = map.get("isCheckcode").toString();
            /*交易记录表插入信息--start*/
            TranCodeModel tranCodeModel = new TranCodeModel();
            tranCodeModel.setTrandesc("总站提取分站视频资源");
            tranCodeModel.setUrl("parentMsg/fileSizeHandle.htm");
            tranCodeModel.setProjectphase("2");
            tranCodeModel.setRspcode("00");
            tranCodeModel.setRspinfo("交易成功");
            tranCodeModel.setTranconent("提取资源(resourceid="+ids+")");
            tranCodeModel.setTranno(tranno);
            tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss"));
            /*交易记录表插入信息--end*/ 
            if(!mD5CheckCode.equals(isCheckcode)){ // 身份验证失败
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("身份验证失败");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"identifyError"}";
            }
            if(this.vertifyTranNo(tranno)>0){ // 流水号重复
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("交易流水号重复");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"trannoRepeat"}";
            }
            Map<String, Object> resourceMap = this.resourceService.findById(ids);
            if (Validator.isNull(resourceMap)||resourceMap==null) { // 资源被删除
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("资源被删除");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"delresource"}";
            }
            if ("-1".equals(resourceMap.get("isextract").toString())) { // 提取异常
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("提取异常");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"errvideo"}";
            }
            List<ResourceFileModel>rFiles = this.rFileService.findGroupByResourceId(ids);
            if (rFiles.size()==0) { // 视频资源不存在
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("nofiles");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"novideo"}";
            }
            Long filesize = (long)0;
            for (ResourceFileModel resourceFileModel : rFiles) {
                filesize += Long.parseLong(resourceFileModel.getSize());
            }
            String uuid = UUID.randomUUID().toString().replace("-", "");
            log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(获取视频总大小)----end"); 
            return "{"rspcode":"00","rspinfo":"success","filesize":""+filesize+"","uuid":""+uuid+""}"; 
        }

      总站接收到分站第一次成功返回信息之后,然后通知分站,你可以开始上传了,总站也会将文件总大小进行保存,然后开始根据分站提供的文件路径进行实时查询文件大小,然后进行比对。

      分站接收到总站回信后,开始上传文件(总站的回信也是加盐的哦)。

      分站接收总站的第二次请求contorller

    /**
         * @desc 提取资源
         * @author zp
         * @throws IOException 
         * @date 2018-3-16
         */
        @RequestMapping("/pickup.htm")
        @ResponseBody
        public String pickup(HttpServletRequest request,HttpServletResponse response,String json) throws IOException{
            log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取资源------start"); 
            
            /*获取参数--start*/
            String checkCode = "see-P2C";//校验码
            JSONObject jsonObject = new JSONObject(json);
            String tranno =jsonObject.getString("tranno"); //交易流水号
            String isCheckcode = jsonObject.getString("checkCode"); //校验码
            checkCode = tranno.substring(tranno.length()-6,tranno.length())+checkCode;
            String mD5CheckCode = MD5Util.encrypt(checkCode);   
            Long ids =jsonObject.getLong("sid"); //资源ID
            String ftpuser = jsonObject.getString("ftpuser");// ftp帐号
            String ftppwd = jsonObject.getString("ftppwd");// ftp密码
            String ftpport = jsonObject.getString("ftpport");// ftp端口
            String ftpIP = jsonObject.getString("ftpIP");// ftpIP
            String uuid = jsonObject.getString("uuid");// uuid
            Map<String, Object>map = new HashMap<String, Object>();
            map.put("ids", ids);
            map.put("tranno", tranno);
            map.put("isCheckcode", isCheckcode);
            map.put("mD5CheckCode", mD5CheckCode);
            map.put("ftpuser", ftpuser);
            map.put("ftppwd", ftppwd);
            map.put("ftpIP", ftpIP);
            map.put("ftpport", ftpport);
            map.put("uuid", uuid);
            return this.transRecordService.pickHandle(map);
        }

      TransRecordServiceImpl.java中,总站会提供自己的FTP的相关信息。然后开始上传。里边也会处理盐是否正确,是否有该文件等操作。 

    /**
         * @desc 处理总站提取请求
         * @author zp
         * @throws IOException 
         * @date 2018-3-19
         */
        @Override
        public String pickHandle(Map<String, Object>map) throws IOException {
            String tranno = map.get("tranno").toString();
            Long ids = (Long)map.get("ids");
            String mD5CheckCode = map.get("mD5CheckCode").toString();
            String isCheckcode = map.get("isCheckcode").toString();
            String ftpuser = map.get("ftpuser").toString();
            String ftppwd = map.get("ftppwd").toString();
            String ftpIP = map.get("ftpIP").toString();
            String ftpport = map.get("ftpport").toString();
            /*交易记录表插入信息--start*/
            TranCodeModel tranCodeModel = new TranCodeModel();
            tranCodeModel.setTrandesc("总站提取分站视频资源");
            tranCodeModel.setUrl("parentMsg/pickup.htm");
            tranCodeModel.setProjectphase("2");
            tranCodeModel.setRspcode("00");
            tranCodeModel.setRspinfo("交易成功");
            tranCodeModel.setTranconent("提取资源(resourceid="+ids+")");
            tranCodeModel.setTranno(tranno);
            tranCodeModel.setCreatetime(DateUtil.getCurrentDateString("YYYY-MM-dd HH:mm:ss"));
            /*交易记录表插入信息--end*/ 
            if(!mD5CheckCode.equals(isCheckcode)){ // 身份验证失败
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("身份验证失败");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"identifyError"}";
            }
            if(this.vertifyTranNo(tranno)>0){ // 流水号重复
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("交易流水号重复");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"trannoRepeat"}";
            }
            // 开始上传资源
            // 1.判断该任务是否录制完成
            Map<String, Object>  reMap = this.resourceService.findById(ids);
            if (Validator.isNull(reMap)) { // 不存在
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("资源已被删除");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"delResouces"}";
            }
            if (!"3".equals(reMap.get("isextract").toString())&&!"2".equals(reMap.get("isextract").toString())) { // 资源未被提取或已被删除
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("资源还未被提取或已被删除");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"nopick"}";
            }
            // 2.通过资源ID查找所有的资源信息 
            List<ResourceFileModel>rFiles = this.rFileService.findGroupByResourceId(ids); 
            if (rFiles.size()==0) { // 视频资源不存在
                tranCodeModel.setRspcode("01");
                tranCodeModel.setRspinfo("nofiles");
                this.insertdata(tranCodeModel);
                return "{"rspcode":"01","rspinfo":"novideo"}";
            }
            // 3.开始上传
            String filePath = "";
            String arr = "";
            String picktemp = PropertyUtil.getProperty("picktemp"); // 临时路径
            String uuid = map.get("uuid").toString();
            picktemp = picktemp+uuid+"\";
            picktemp = picktemp.replaceAll("\\", "/");
            for (ResourceFileModel resourceFileModel : rFiles) {
                filePath = resourceFileModel.getPath();
                filePath = filePath.replaceAll("\\", "/"); 
                String path = filePath.replaceAll("\\", "/"); 
                String temp = filePath.substring(0,filePath.lastIndexOf("resources/")+10);
                String temp2 =  filePath.substring(filePath.lastIndexOf("resources/")+10,filePath.length());
                temp2 = temp2.substring(0,temp2.indexOf("/")+1);
                filePath = temp + temp2;   
                path = path .substring(filePath.length(),path.length()); 
                path = picktemp + path;
                path = path.replace("/picktemp/", "/resources/");
                resourceFileModel.setPath(path); 
                String url = path.substring(path.indexOf("ResourcesManager/"),path.length()); 
                resourceFileModel.setUrl(url); 
                arr += JSONObject.fromObject(resourceFileModel)+",";
            }
            if (arr.length()>0) {
                arr = arr.substring(0, arr.length()-1);
            }
            // 将文件拷贝到指定的临时目录 
            CopyDirectory copyfile = new CopyDirectory();
            copyfile.copyDirectiory(filePath, picktemp); // filePath 源文件  picktemp 目标地址 
            log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "提取资源------end");
            try {
                String[] ip  = ftpIP.split(":");
                ftpIP = ip[0];
                this.starFtp(picktemp, ftpIP, ftpuser, ftppwd, ftpport); 
                return "{"rspcode":"00","rspinfo":"success","resource":["+arr+"]}";  
            } catch (Exception e) {
                // TODO: handle exception
                e.printStackTrace();
                return "{"rspcode":"01","rspinfo":"ftperror"}"; 
            } 
        } 
         
        /**
         * @desc 开启线程进行FTP上传
         * @author zp
         * @date 2018-3-23
         */
        public void starFtp(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport) { 
            log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "准备开启FTP上传线程------start");
            //创建一个可重用固定线程数的线程池 
            ExecutorService pool = Executors.newFixedThreadPool(5); 
            //创建实现了Runnable接口对象,Thread对象当然也实现了Runnable接口
            CreateTreadPool t1 = new CreateTreadPool(picktemp, ftpIP, ftpuser, ftppwd, ftpport);
            //将线程放入池中进行执行
            pool.execute(t1); 
            System.out.println(Thread.currentThread().getName()+"进入线程"); 
            //关闭线程池
            pool.shutdown();  
            log.info(this.getClass().getSimpleName() + "."+ Thread.currentThread().getStackTrace()[1].getMethodName()+ "准备开启FTP上传线程------end");
        } 
        

      开启线程对FTP上传进行操作,因为考虑到本次操作总站可能不关心上传过程,所以成功与否就不实时通知总站了~~~~~

    public class CreateTreadPool implements Runnable {
        private static Logger log = Logger.getLogger(CreateTreadPool.class);
        // public static ReentrantLock lock=new ReentrantLock();
        //定义信号量,只能5个线程同时访问  
        final Semaphore semaphore = new Semaphore(5); 
        public static int c=0;
        public String picktemp; //路径
        public String ftpIP; // ftpip
        public String ftpuser ; // ftp帐号
        public String ftppwd ; // ftp密码
        public String ftpport; // ftp端口
        // 通过构造方法 获取FTP相关信息。
        public CreateTreadPool(String picktemp, String ftpIP,String ftpuser,String ftppwd,String ftpport){
            this.picktemp = picktemp;
            this.ftpIP = ftpIP;
            this.ftpuser = ftpuser;
            this.ftppwd = ftppwd;
            this.ftpport = ftpport;
        }
        @Override
        public void run() {
            // TODO Auto-generated method stub
            System.out.println(Thread.currentThread().getName()+"---------上传准备中---------"); 
            try {
                //获取许可  
                semaphore.acquire();
                System.out.println(Thread.currentThread().getName()+"获得锁");
                System.out.println(Thread.currentThread().getName()+"====>"+c); 
                boolean isSuccess = this.FTPUpload(picktemp, ftpIP, ftpuser, ftppwd, ftpport);
                if (isSuccess) {
                    System.out.println(Thread.currentThread().getName()+"---------上传成功---------");  
                }else {
                    System.out.println(Thread.currentThread().getName()+"---------上传失败---------"); 
                } 
                c++;
            } catch (Exception e) { 
                System.out.println(Thread.currentThread().getName()+"---------上传失败---------");
                e.printStackTrace();
            }finally{
                 System.out.println(Thread.currentThread().getName()+"------------释放锁");
                 semaphore.release();
            }
        }
        /**
         * @desc FTP上传
         * @author zp
         * @date 2018-3-23
         */
        public boolean FTPUpload(String filePath, String serverIp,String ftpuser,String ftppwd,String ftpport) {
            log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(给总站FTP上传视频)----start");
            FtpUtil ftpTool = new FtpUtil(ftpuser, ftppwd, serverIp, ftpport); // 创建链接
            boolean linkStatus = ftpTool.connectServer(ftpuser, ftppwd, serverIp, ftpport);
            File file = new File(filePath); // 实例化
            UploadListener listener = new UploadListener(ftpTool.client); // 创建监听
            ftpTool.ftpUploadFolder(file, listener); // 上传
            ftpTool.disconnect(); // 关闭
            RemoveFilesUtil removeFilesUtil = new RemoveFilesUtil();
            removeFilesUtil.DeleteFolder(filePath); // 移除本地临时上传的文件
            log.info(this.getClass().getSimpleName() + "." + Thread.currentThread().getStackTrace()[1].getMethodName() + "(给总站FTP上传视频)----end");
            return linkStatus;
        } 
  • 相关阅读:
    [C++11新特性] weak_ptr和unique_ptr
    [C++11新特性] shared_ptr共享的智能指针
    VS2019 Qt5.15.2 开发环境搭建
    【C++11 新特性】Lambda表达式(三)
    【C++11 新特性】bind(二)
    【C++11 新特性】function(一)
    【IPC 进程间通信】有名管道的简单实现
    【IPC 进程间通信】常用进程间通信方式总结
    Qt 文件常见操作管理类
    【GitHub 开源分享】QML 在线预览工具
  • 原文地址:https://www.cnblogs.com/pengpengzhang/p/8629491.html
Copyright © 2011-2022 走看看