zoukankan      html  css  js  c++  java
  • spring boot + apache camel 传输文件

    一 sftp搭建

    这里简单说一下为什么使用sftp。ftp和sftp各有优点,差别并不是太大。sftp安全性好,性能比ftp低。ftp对于java来说并不复杂,效率也高。之所以使用sftp主要是可以使用spring-boot+apache-camel。camel框架将文件传输分为filter、processor、路由和定时器等组件,模块化开发,可以随意将这些组件进行组合,耦合性低,开发较为灵活,可以将更多的精力放到业务层面。

    二使用apache-camel来定时从sftp服务器下载文件

    2.1 pom依赖

             <!-- Keep every Camel artifact on the same release; mixing 2.18.0 and 2.19.4 can fail at runtime. -->
             <dependency>
                <groupId>org.apache.camel</groupId>
                <artifactId>camel-spring-boot-starter</artifactId>
                <version>2.19.4</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.camel</groupId>
                <artifactId>camel-ftp</artifactId>
                <version>2.19.4</version>
            </dependency>                

    2.2 application.properties配置

    # The endpoint URI is one property value. Each trailing backslash continues the value on the
    # next line; leading whitespace on a continuation line is dropped by the properties loader.
    ftp.server.uri=sftp://${ftp.url}\
                   ?username=${ftp.username}\
                   &password=${ftp.password}\
                   &useUserKnownHostsFile=false\
                   &localWorkDirectory=${ftp.local.work.directory}\
                   &delay=5m\
                   &filter=#ftpDownloadFileFilter\
                   &stepwise=false\
                   &recursive=true
    ftp.url=192.168.20.162:22/
    ftp.username=test
    ftp.password=123456
    
    # Value substituted into the localWorkDirectory option of ftp.server.uri (originally labelled "file server directory").
    # NOTE(review): in Camel, localWorkDirectory is a local scratch directory for in-progress downloads,
    # not a directory on the remote server - confirm that "/" is really intended.
    ftp.local.work.directory=/
    # Local directory the pulled files are stored in
    ftp.local.data.dir=E://test/

    其中

    readLock=rename 通过尝试重命名来获取读锁,防止读取文件服务器上正在写入的文件(注意:上面的示例 uri 中并未配置该参数,需要时请追加 &readLock=rename)
    recursive=true 是否递归读取

    #有些地方说这里需要显式指定后台运行

    camel.springboot.main-run-controller=true

    2.3 过滤器
    自定义规则判断哪些文件需要下载,哪些文件不需要下载

    package com.test.comm;
    
    import org.apache.camel.component.file.GenericFile;
    import org.apache.camel.component.file.GenericFileFilter;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import com.test.util.RedisTemplateUtil;
    
    /**
     * Camel file filter referenced from the endpoint URI ({@code filter=#ftpDownloadFileFilter})
     * that decides which remote entries the consumer should pick up.
     *
     * <p>Directories are always accepted. A regular file is accepted only when the fourth
     * {@code '_'}-separated segment of its name parses as a number greater than the marker
     * stored in Redis under {@code Constants.reids.YP_PICTRUE_TIME}, or when no marker exists.</p>
     */
    @Component
    public class FtpDownloadFileFilter implements GenericFileFilter<Object> {
    
        private static final Logger logger = LoggerFactory.getLogger(FtpDownloadFileFilter.class);
    
        /** Zero-based index of the timestamp segment in the '_'-separated file name. */
        private static final int TIMESTAMP_SEGMENT_INDEX = 3;
    
        @Value("${ftp.local.data.dir}")
        private String localDir;
    
        @Autowired
        private RedisTemplateUtil redisTemplateUtil;
    
        /**
         * Filters the files offered by the consumer.
         *
         * @param file the remote file or directory being considered
         * @return {@code true} to accept the entry; {@code false} to skip it, which is also
         *         the result when the check itself throws
         */
        @Override
        public boolean accept(GenericFile<Object> file) {
            try {
                return isDownloaded(file);
            } catch (Exception e) {
                // Best effort: a failing check (e.g. Redis unavailable) must not break the poll.
                logger.error("ftp download file filter error !", e);
                return false;
            }
        }
    
        /**
         * Decides, based on the timestamp embedded in the file name, whether the entry should
         * be fetched.
         *
         * <p>Despite its name, this method returns {@code true} when the file still NEEDS to be
         * downloaded. The name is kept because the method is public.</p>
         *
         * @param file the remote file or directory being considered
         * @return {@code true} for directories and for files whose timestamp is newer than the
         *         stored marker (or when no marker is stored); {@code false} otherwise,
         *         including names that do not match the expected format
         */
        public boolean isDownloaded(GenericFile<Object> file) {
            if (file.isDirectory()) {
                return true;
            }
            // Use the bare name: with recursive=true the relative name contains directory
            // segments, and an underscore in a directory name would shift the segment index.
            String baseName = file.getFileNameOnly();
            if (baseName == null) {
                return false;
            }
            String[] segments = baseName.split("_");
            if (segments.length <= TIMESTAMP_SEGMENT_INDEX) {
                // Not in the expected format; skip quietly instead of failing with an exception.
                return false;
            }
            long time;
            try {
                time = Long.parseLong(segments[TIMESTAMP_SEGMENT_INDEX]);
            } catch (NumberFormatException e) {
                logger.warn("skip file {}: segment {} is not a numeric timestamp", baseName,
                        TIMESTAMP_SEGMENT_INDEX);
                return false;
            }
            // Compare with the marker stored in Redis: fetch only files newer than it.
            Object preTime = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME);
            return preTime == null || Long.parseLong(preTime.toString()) < time;
        }
    
    }
    View Code

    2.4 路由
    自定义路由规则,一般是告诉程序,从哪里读文件,并搬运到哪里去

    package com.test.comm;
    
    import java.net.InetAddress;
    
    import org.apache.camel.LoggingLevel;
    import org.apache.camel.builder.RouteBuilder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    /**
     * Camel route definition: consumes from the endpoint configured in {@code ftp.server.uri},
     * writes each file below {@code ftp.local.data.dir}, then hands the exchange to
     * {@link LocationFileProcessor} and logs the file name.
     */
    @Component
    public class FtpDownloadRoute extends RouteBuilder {
    
        private static final Logger logger = LoggerFactory.getLogger(FtpDownloadRoute.class);
    
        @Value("${ftp.server.uri}")
        private String ftpUri;
    
        @Value("${ftp.local.data.dir}")
        private String localDir;
    
        @Autowired
        LocationFileProcessor locationFileProcessor;
    
        /**
         * Builds the route. This only registers the route definition; no connection to the
         * server is opened inside this method.
         */
        @Override
        public void configure() throws Exception {
            // The URI embeds the account password, so never log it verbatim.
            logger.debug("开始配置路由 {}", maskPassword(ftpUri));
            from(ftpUri).to("file:" + localDir).process(locationFileProcessor).log(LoggingLevel.INFO, logger,
                    "download file ${file:name} complete.");
            logger.debug("路由配置完成");
        }
    
        /**
         * Replaces the value of any {@code password=} query parameter with asterisks.
         *
         * @param uri endpoint URI, may be {@code null}
         * @return the URI with the password value masked, or {@code null} if {@code uri} is null
         */
        private static String maskPassword(String uri) {
            if (uri == null) {
                return null;
            }
            return uri.replaceAll("(?i)(password=)[^&]*", "$1******");
        }
    
    }
    View Code

    2.5 其它自定义处理器(Processor)
    除了文件搬运之外,允许自定义对文件的其它操作,比如入库等等
    ,自定义的类,可添加在路由中

    package com.test.comm;
    
    import java.io.RandomAccessFile;
    import java.util.HashMap;
    
    import org.apache.camel.Exchange;
    import org.apache.camel.Processor;
    import org.apache.camel.component.file.GenericFileMessage;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.stereotype.Component;
    
    import com.google.gson.Gson;
    import com.test.config.ApplicationStartup;
    import com.test.model.Device;
    import com.test.model.Pictrue;
    import com.test.util.DateUtil;
    import com.test.util.ESRepository;
    import com.test.util.FileUtil;
    import com.test.util.RedisTemplateUtil;
    
    /**
     * camel 业务类
     *
     * <p>
     * Title:LocationFileProcessor
     * </p>
     * <p>
     * Description:TODO
     * </p>
     * <p>
     * Copyright:Copyright(c)2005
     * </p>
     * <p>
     * Company:stest
     * </p>
     *
     * @author
     * @date 2018年11月15日 上午9:02:29
     */
    @Component
    public class LocationFileProcessor implements Processor {
    
        private static Logger logger = LoggerFactory.getLogger(LocationFileProcessor.class);
    
        @Autowired
        private RedisTemplateUtil redisTemplateUtil;
    
        @Autowired
        private FastDFSClient fastDFSClient;
    
        @Value("${ftp.local.data.dir}")
        private String localDir;
    
        @Autowired
        private ESRepository eSRepository;
    
        @Value("${elasticsearch.index}")
        private String esIndex;
    
        @Value("${elasticsearch.type}")
        private String esType;
    
        @Autowired
        private ApplicationStartup applicationStartup;
    
        @Override
        public void process(Exchange exchange) throws Exception {
            @SuppressWarnings("unchecked")
            GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn();
            String fileName = inFileMessage.getGenericFile().getFileName();// 文件名
            logger.info(fileName);// 文件的绝对路径
            String subfileName = fileName.substring(fileName.lastIndexOf("/") + 1);
            long time = Long.parseLong(fileName.split("_")[3]);
            // 上传到fastdfs
            String path = upload(fileName);
            // 将图片地址等信息保存到es
            saveEs(subfileName, path);
            // 获取当前redis里面保存的时间,如果为空直接存入,如果不为空且当前文件时间大于redis时间,那覆盖
            saveRedis(time);
        }
    
        /**
         * 将最后获取图片的时间标记保存至redis
         * 
         * @param time
         */
        private void saveRedis(long time) {
            Object redisKey = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME);
            if (redisKey == null || (redisKey != null && Long.parseLong(redisKey.toString()) < time)) {
                redisTemplateUtil.set(Constants.reids.YP_PICTRUE_TIME, time, 0);
            }
        }
    
        /**
         * 保存es
         * 
         * @param subfileName
         * @param path
         */
        private void saveEs(String subfileName, String path) {
            String[] fileNames = subfileName.split("_");
            String deviceId = fileNames[0];
            String plate = fileNames[2].substring(1);
            String captrue = fileNames[3];
            String type = fileNames[4].split("\.")[0];
            String times = DateUtil.transForDate1(Integer.parseInt(captrue));
            captrue = captrue + "000";
            // 根据deviceId获取经纬度
            HashMap<Integer, Device> devices = applicationStartup.getDevices();
            Device device = devices.get(Integer.parseInt(deviceId));
            double latitude = 0;
            double longitude = 0;
            if (device != null) {
                latitude = device.getLat();
                longitude = device.getLon();
            }
            String deviceName = device.getDeviceName();
            String address = device.getDeviceAddress();
            Pictrue pictrue = new Pictrue(deviceId, plate, captrue, type, path, times, latitude, longitude, deviceName,
                    address, "视频数据");
            Gson gson = new Gson();
            eSRepository.addTargetDataALL(gson.toJson(pictrue), esIndex, esType, null);
        }
    
        /**
         * 上传fastdfs
         * 
         * @param fileName
         * @return
         * @throws Exception
         */
        private String upload(String fileName) throws Exception {
            String path = fastDFSClient.uploadFile(FileUtil.getBytes(localDir + fileName), fileName);
            return path;
        }
    }
    View Code
  • 相关阅读:
    发送邮件时,报错:AttributeError: 'list' object has no attribute 'encode'
    快速统计字符出现次数
    vscode快捷键
    win7系统部署django项目
    记录一个小问题,django+Apache+win7,启动Apache后,打开网页,一直转圈圈,停不下来
    django ORM 按月分组统计
    哈希表的应用
    查找算法
    递归的应用
    递归
  • 原文地址:https://www.cnblogs.com/eryuan/p/9988621.html
Copyright © 2011-2022 走看看