一 sftp搭建
略
这里简单说一下为什么使用sftp。ftp和sftp各有优点,差别并不是太大。sftp安全性好,性能比ftp低。ftp对于java来说并不复杂,效率也高。之所以使用sftp主要是可以使用spring-boot+apache-camel。camel框架将文件传输分为filter,prcessor,和路由,定时器等组件,模块化开发,将可随意将这些组件进行组合,耦合性低,开发较为灵活。可以将更多的精力放到业务层面。
二使用apache-camel来定时从sftp服务器下载文件
2.1 pom依赖
<dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-spring-boot-starter</artifactId> <version>2.18.0</version> </dependency> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.19.4</version> </dependency>
2.2 applicatin.properties配置
ftp.server.uri=sftp://${ftp.url} ?username=${ftp.username} &password=${ftp.password} &useUserKnownHostsFile=false &localWorkDirectory=${ftp.local.work.directory} &delay=5m &filter=#ftpDownloadFileFilter &stepwise=false &recursive=true ftp.url=192.168.20.162:22/ ftp.username=test ftp.password=123456 #文件服务器目录 ftp.local.work.directory=/ # 文件拉取到本地存储的文件 ftp.local.data.dir=E://test/
其中
readLock=rename 是否重命名,防止读取文件服务器正在写入的文件
recursive=true 是否递归读取
#有些地方说这里需要显式指定后台运行
camel.springboot.main-run-controller=true
2.3 过滤器
自定义规则判断哪些文件需要下载,哪些文件不需要下载
package com.test.comm; import org.apache.camel.component.file.GenericFile; import org.apache.camel.component.file.GenericFileFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.test.util.RedisTemplateUtil; @Component public class FtpDownloadFileFilter implements GenericFileFilter<Object> { private static Logger logger = LoggerFactory.getLogger(FtpDownloadFileFilter.class); @Value("${ftp.local.data.dir}") private String localDir; @Autowired private RedisTemplateUtil redisTemplateUtil; /** * 过滤下载文件 * * @author sunk */ @Override public boolean accept(GenericFile<Object> file) { try { return isDownloaded(file); } catch (Exception e) { logger.error("ftp download file filter error !", e); return false; } } /** * 根据时间戳来判断是否下载过 * * @param fileName * * @return */ public boolean isDownloaded(GenericFile<Object> file) { String fileName = file.getFileName(); if (file.isDirectory()) { return true; } boolean bool = false; if (fileName.contains("_")) { long time = Long.parseLong(fileName.split("_")[3]); // 从redis中获取上次的时间,当前文件时间大于当前时间则获取,否则不获取 Object preTime = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME); if (preTime == null) { bool = true; } else { if (Long.parseLong(preTime.toString()) < time) { bool = true; } } } return bool; } }
2.4 路由
自定义路由规则,一般是告诉程序,从哪里读文件,并搬运到哪里去
package com.test.comm; import java.net.InetAddress; import org.apache.camel.LoggingLevel; import org.apache.camel.builder.RouteBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class FtpDownloadRoute extends RouteBuilder { private static Logger logger = LoggerFactory.getLogger(FtpDownloadRoute.class); @Value("${ftp.server.uri}") private String ftpUri; @Value("${ftp.local.data.dir}") private String localDir; @Autowired LocationFileProcessor locationFileProcessor; @Override public void configure() throws Exception { logger.debug("开始连接 " + ftpUri); from(ftpUri).to("file:" + localDir).process(locationFileProcessor).log(LoggingLevel.INFO, logger, "download file ${file:name} complete."); logger.debug("连接成功"); } }
2.5 其它自定义进程
除了文件搬运之外,允许自定义对文件的其它操作,比如入库等等
,自定义的类,可添加在路由中
package com.test.comm; import java.io.RandomAccessFile; import java.util.HashMap; import org.apache.camel.Exchange; import org.apache.camel.Processor; import org.apache.camel.component.file.GenericFileMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import com.google.gson.Gson; import com.test.config.ApplicationStartup; import com.test.model.Device; import com.test.model.Pictrue; import com.test.util.DateUtil; import com.test.util.ESRepository; import com.test.util.FileUtil; import com.test.util.RedisTemplateUtil; /** * camel 业务类 * * <p> * Title:LocationFileProcessor * </p> * <p> * Description:TODO * </p> * <p> * Copyright:Copyright(c)2005 * </p> * <p> * Company:stest * </p> * * @author * @date 2018年11月15日 上午9:02:29 */ @Component public class LocationFileProcessor implements Processor { private static Logger logger = LoggerFactory.getLogger(LocationFileProcessor.class); @Autowired private RedisTemplateUtil redisTemplateUtil; @Autowired private FastDFSClient fastDFSClient; @Value("${ftp.local.data.dir}") private String localDir; @Autowired private ESRepository eSRepository; @Value("${elasticsearch.index}") private String esIndex; @Value("${elasticsearch.type}") private String esType; @Autowired private ApplicationStartup applicationStartup; @Override public void process(Exchange exchange) throws Exception { @SuppressWarnings("unchecked") GenericFileMessage<RandomAccessFile> inFileMessage = (GenericFileMessage<RandomAccessFile>) exchange.getIn(); String fileName = inFileMessage.getGenericFile().getFileName();// 文件名 logger.info(fileName);// 文件的绝对路径 String subfileName = fileName.substring(fileName.lastIndexOf("/") + 1); long time = Long.parseLong(fileName.split("_")[3]); // 上传到fastdfs String path = upload(fileName); // 将图片地址等信息保存到es saveEs(subfileName, path); // 获取当前redis里面保存的时间,如果为空直接存入,如果不为空且当前文件时间大于redis时间,那覆盖 saveRedis(time); } /** * 将最后获取图片的时间标记保存至redis * * @param time */ private void saveRedis(long time) { Object redisKey = redisTemplateUtil.get(0, Constants.reids.YP_PICTRUE_TIME); if (redisKey == null || (redisKey != null && Long.parseLong(redisKey.toString()) < time)) { redisTemplateUtil.set(Constants.reids.YP_PICTRUE_TIME, time, 0); } } /** * 保存es * * @param subfileName * @param path */ private void saveEs(String subfileName, String path) { String[] fileNames = subfileName.split("_"); String deviceId = fileNames[0]; String plate = fileNames[2].substring(1); String captrue = fileNames[3]; String type = fileNames[4].split("\.")[0]; String times = DateUtil.transForDate1(Integer.parseInt(captrue)); captrue = captrue + "000"; // 根据deviceId获取经纬度 HashMap<Integer, Device> devices = applicationStartup.getDevices(); Device device = devices.get(Integer.parseInt(deviceId)); double latitude = 0; double longitude = 0; if (device != null) { latitude = device.getLat(); longitude = device.getLon(); } String deviceName = device.getDeviceName(); String address = device.getDeviceAddress(); Pictrue pictrue = new Pictrue(deviceId, plate, captrue, type, path, times, latitude, longitude, deviceName, address, "视频数据"); Gson gson = new Gson(); eSRepository.addTargetDataALL(gson.toJson(pictrue), esIndex, esType, null); } /** * 上传fastdfs * * @param fileName * @return * @throws Exception */ private String upload(String fileName) throws Exception { String path = fastDFSClient.uploadFile(FileUtil.getBytes(localDir + fileName), fileName); return path; } }