先说下需求吧,我们的系统概括的讲就是一个接口系统,对外的方式无外乎三种,MQ、WEBSERVICE以及FTP了。因为FTP的业务是前人留下来东西,而它恰好一直不出问题,逻辑也比较复杂,所以一直都懒得看里面的内容,只是初步的知道是用的Apache Camel的ftp的路由。
一、Apache Camel是什么东西?
Apache Camel 的官网是http://camel.apache.org 。它是一个通用的基于已知的开源集成框架企业集成模式。简单的讲,Apache Camel就是集成现有的一系列中间件架构,如HTTP, ActiveMQ, JMS, JBI, SCA, MINA or CXF等,实现传输协议和消息格式的转换等等。看起来功能还挺强大,不过大部分可能在垂直领域有更好的解决方案。本文只说一下FTP的这个路由组件。(这段话可能说的不是太清楚,我自己也是迷迷糊糊,有能力的同学自己看下官网的解释吧)
二、官网FTP API 翻译整理
2.1 关于jar包
<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.13.1</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.apache.camel/camel-ftp --> <dependency> <groupId>org.apache.camel</groupId> <artifactId>camel-ftp</artifactId> <version>2.19.2</version> </dependency>
2.2 URI的格式
其中directoryname表示文件夹目录。目录名称是一个相对路径。不支持绝对路径。相对路径可以包含嵌套文件夹,例如 /inbox/us。
如果没有提供端口号,Camel将根据协议提供缺省值(ftp sftp = ftp = 21日22日= 2222)。
你可以添加以下格式的URI查询配置, ?option=value&option=value&... 如上文我的测试链接。&是我在pom文件中转译使用的。
这个配置使用两个不同的库操作FTP。FTP和FTPS使用Apache Commons Net,而SFTP使用JCraft JSCH。
2.3 URI的各种配置选项
Name |
Default Value |
Description |
指定要使用的用户名登录远程服务器。 |
指定的密码用来登录到远程服务器。 |
null |
Camel 2.15.2: 指定的帐户用于登录到远程FTP服务器(只对FTP和FTPS) |
指定文件传输模式,二进制或ASCII。默认是ASCII(false)。 |
Camel 2.2: 使用后是否要断开远程FTP服务器。可用于消费者和生产者。只会断开断开当前连接到FTP服务器。如果你有一个你想要停止消费,那么你需要停止消费者路由。 |
在使用时,可以使用本地工作目录将远程文件内容直接存储在本地文件中,以避免将内容加载到内存中。这个是有好处的,如果你下载一个非常大的远程文件,可以节省内存。详情见下文。 |
FTP and FTPS only: 指定是否使用被动模式连接。默认是false。 |
Camel 2.11: FTP消费者是否应该下载该文件。如果将此选项设置为false,那么消息体将null,但消费者仍将触发一个Camel Exchange获得文件的详细信息,如文件名,文件大小,等等。只是不能下载的文件。 |
streamDownload |
false |
Camel 2.11: 消费者是否应该下载整个文件前,默认的行为,或者是否应该通过InputStream从路由远程资源读取而不是从内存中的Camel Exchange数组获取。如果下载失败的或是本地目录提供,这个选项可以忽略。此选项对于处理大型远程文件非常有用。 |
Camel 2.4: FTPS only: 默认情况下,如果没有禁用安全数据通道默认值,则使用选项p。可能的值是: |
Camel 2.4: FTPS only: 此选项指定安全数据通道的缓冲区大小。如果选择useSecureDataChannel,但是没有被显式设置,就没有使用价值了。 |
Camel 2.8.2, 2.9: 如果将此选项设置为true,camel-ftp将直接使用列表文件检查文件是否存在。由于一些FTP服务器可能不支持直接列出文件,如果选项是错误的,camel-ftp将使用旧的方法列出目录并检查文件是否存在。注意从Camel 2.10.1起这个选项也影响readLock=changed控制是否执行一个快速检查更新文件信息。如果FTP服务器有大量文件,则可以使用它来加快进程。 |
Delay in millis Camel will wait before performing a reconnect attempt. |
Camel 2.4: Is the connect timeout in millis. This corresponds to using |
FTP and FTPS Only: Camel 2.4: Is the Also SFTP from Camel 2.14.3/2.15.3/2.16 onwards. From Camel 2.16 onwards the default is 300000 (300 sec). |
FTP and FTPS Only: Camel 2.4: Is the data timeout in millis. This corresponds to using |
SFTP Producer Only: Camel 2.9: 允许你设置chmod存储文件。例如 |
32768 |
FTP/FTPS Only: Camel 2.15.1: 下载文件的缓冲区大小。默认大小是32 kb。 |
FTP and FTPS Only: Camel 2.1: Allows you to use a custom |
disconnectOnBatchComplete |
false | Camel 2.18: 是否在批处理完成后从远程FTP服务器断开连接。可用于消费者和生产者。断开连接只会断开与FTP服务器的当前连接。如果你有一个想要停止的消费者,那么你需要停止消费者路由 |
activePortRange |
Camel 2.18: 在主动模式设置客户端端口范围。语法是:minPort-maxPort。端口号都是包括的,如10000 - 19999包括所有xxxx端口。 |
2.4 Camel默认的一些配置
/** * Apache Camel FTP Demo * @author 小卖铺的老爷爷 */ public class HelloWorld extends RouteBuilder { //启动FTP路由,实际项目中初始化应该是单独的一个类 public static void main(String[] args) throws Exception { // 这是camel上下文对象,整个路由的驱动全靠它了。 ModelCamelContext camelContext = new DefaultCamelContext(); // 启动route camelContext.start(); // 将我们的路由处理加入到上下文中 camelContext.addRoutes(new HelloWorld()); } @Override public void configure() throws Exception { //从FTP上下载文件到本地目录,相关参数的意义,参考我上文贴出的API,实际项目中这些地址一般写在配置文件中 from("") //自定义的处理器,可以做各种逻辑处理,如文件名匹配下载等 .process(new HttpProcessor()) .to("file:d:/wms-fe/inFile"); } }
上文的demo是每隔一分钟扫描ftp服务器目录上是否有新文件,如果有匹配文件名复核条件的下载到本地,并将服务器上的文件删除。当然我们也可以不删除,将文件移到一个固定目录备份,这里可以用move=XXX参数。.process(new HttpProcessor()) 其中这句是可以删除的,删除后就是只有有文件,Camel就会下载到本地了。
/** * * 此类描述的是:FTP下载、加压、解析处理
* */ public class FTPMheRoute extends RouteBuilder { private String ftpDownURI; private String downDir; private String unpackDir; private String ftpFileCharset; private void initialize() { ftpDownURI = WmsFEUtil.getSysConfigValue("camel.ftp.download.uri").trim(); downDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.dir").trim(); unpackDir = WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir").trim(); ftpFileCharset = WmsFEUtil.getSysConfigValue("camel.ftp.file.charset"); } // @Override public void configure() throws Exception { initialize(); downRoute(); unpackRoute(); parseRoute(); } /** * 此方法描述的是:下载路由 */ private void downRoute() { from(ftpDownURI).to(downDir).process(new MHEDownloadedProcessor()); // from(ftpDownURI) // .choice() // .when(new FTPFilter(ftpFileCharset, FTPFilter.TYPE_FTP_FILE_NORMAL)) // .process(new FTPProcessor(ftpFileCharset)) // .to(downDir); //.process(new MHEDownloadedProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ " when find the zip file it will download to "+downDir ); } /** * 此方法描述的是:解压路由 */ private void unpackRoute() { from(downDir).process(new MHEDownUnpackProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ "camel listner @ "+ downDir , " when find the zip file it will unpack to "+unpackDir ); } /** * 此方法描述的是:解析路由 */ private void parseRoute() { from(unpackDir).process(new MHEDownExecuteProcessor()); OutUtil.log( FTPMheRoute.class,"Register Route :"+ "camel listner @ "+ unpackDir , " when find the txt file it will parse to DB." ); } }
最后说一下Process,它用于接收从控制端点、路由选择条件又或者另一个处理器的Exchange中传来的消息信息,并进行处理。这里process(Exchange exchange)方法是必须进行实现的。
/** * * 此类描述的是:FTP解压处理 */ public class MHEDownUnpackProcessor implements Processor { private static final String TXT_PATTERN = ".*?\.txt"; private static final String ZIP_PATTERN = ".*?\.zip"; private static final String BUSINESSTYPE_FTPMHE = "FTPMHE"; @Override public void process(Exchange exchange) throws Exception { Message message = exchange.getIn(); GenericFile<?> gf = (GenericFile<?>)message.getBody(); File zipFile = (File) gf.getFile(); unzip(zipFile); } /** * 此方法描述的是:解压缩文件 * @throws IOException */ private List<File> unzip(File zipFile) throws IOException { String zipName = zipFile.getName(); if (Pattern.matches(ZIP_PATTERN, zipName)){ List<File> list = ZipUtil.unzip(zipFile, getDownUnpackDir(), TXT_PATTERN); if (list == null){ BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解压失败,文件内容为空"); ParseFileTools.move2Dir(zipFile, "fail"); }else{ //FileUtil.deleteFile(zipFile); ParseFileTools.move2Dir(zipFile, "success"); } return list; }else{ BaseLogUtil.addMsgLog(BUSINESSTYPE_FTPMHE, zipName, "", BaseLogUtil.SOURCE_OTHER, BaseLogUtil.TO_SYS_OTHER, BaseLogUtil.CHANNEL_FTP, BaseLogUtil.STATUS_FAIL, "", "解压失败,压缩包为非法格式"); ParseFileTools.move2Dir(zipFile, "fail"); } return null; } /** * 此方法描述的是:获得解压临时目录 */ public static String getDownUnpackDir() { return getURI(WmsFEUtil.getSysConfigValue("camel.ftp.download.zip.unpack.dir")); } /** * 此方法描述的是:获得文件路径 */ public static String getURI(String fileURI) { String uri = fileURI.replaceFirst("file://", ""); int lastParam = uri.lastIndexOf("?"); if (lastParam != -1){ uri = uri.substring(0, lastParam); } return uri; } }
public class ZipUtil { /** * 此方法描述的是:ZIP压缩 * @param source 源文件 * @param dest 压缩文件 * @version: 2015年3月2日 上午9:17:26 */ public static String zip(File source, File dest) { ZipOutputStream out = null; BufferedOutputStream bo = null; try{ File zipParent = dest.getParentFile(); if (!zipParent.exists()){ zipParent.mkdirs(); } out = new ZipOutputStream(new FileOutputStream(dest)); bo = new BufferedOutputStream(out); zip(out, source, source.getName(), bo); return dest.getAbsolutePath(); }catch(Exception e){ OutUtil.error(e,e.getMessage()); }finally{ if (bo!=null){ try { bo.close(); } catch (IOException e) { } } if (out!=null){ try { out.close(); // 输出流关闭 } catch (IOException e) { } } } return null; } public static void zip(ZipOutputStream out, File f, String base,BufferedOutputStream bo) throws Exception { // 方法重载 out.putNextEntry(new ZipEntry(base)); // 创建zip压缩进入点base FileInputStream in = new FileInputStream(f); BufferedInputStream bi = new BufferedInputStream(in); int b; try{ while ((b = bi.read()) != -1) { bo.write(b); // 将字节流写入当前zip目录 } }finally{ bi.close(); in.close(); // 输入流关闭 bo.close(); } } /** 解压缩(压缩文件中包含多个文件)可代替上面的方法使用。 * ZipInputStream类 * 当我们需要解压缩多个文件的时候,ZipEntry就无法使用了, * 如果想操作更加复杂的压缩文件,我们就必须使用ZipInputStream类 * */ public static List<File> unzip(File sourceFile ,String outPath,String fileNameRegexp){ List<File> listDestFile = new ArrayList<File>(); File outFile = null; ZipFile zipFile = null; FileInputStream sourceInput = null; ZipInputStream zipInput = null; ZipEntry entry = null; InputStream input = null; OutputStream output = null; String sourceName = sourceFile.getName(); try { zipFile = new ZipFile(sourceFile); sourceInput = new FileInputStream(sourceFile); zipInput = new ZipInputStream(sourceInput); while((entry = zipInput.getNextEntry()) != null){ String entryName = entry.getName(); if (!Pattern.matches(fileNameRegexp, entryName)){ continue ; } outFile = new File(outPath + File.separator + sourceName+"-"+entryName); if(!outFile.getParentFile().exists()){ outFile.getParentFile().mkdir(); } if(!outFile.exists()){ outFile.createNewFile(); } input = zipFile.getInputStream(entry); output = new FileOutputStream(outFile); int temp = 0; while((temp = input.read()) != -1){ output.write(temp); } input.close(); output.close(); listDestFile.add(outFile); } return listDestFile; } catch (Exception e) { e.printStackTrace(); } finally{ try { if (input!=null) input.close(); if (output!=null) output.close(); if (zipInput!=null) zipInput.close(); if (sourceInput!=null) sourceInput.close(); if (zipFile!=null) zipFile.close(); } catch (IOException e) {} } return null; } /** * 此方法描述的是:获得ZIP文件同名解压目录 * @version: 2015年3月5日 下午2:10:41 */ public static String getUnpackForder(File zipFile) { String filePath = zipFile.getAbsolutePath(); return filePath.substring(0, filePath.lastIndexOf(".")); } /** * 此方法描述的是:获得ZIP文件指定解压目录 * @version: 2015年3月6日 下午10:28:36 */ public static String getUnpackForder(File zipFile, String subDir) { return zipFile.getParent()+File.separator+subDir; } }