  • Spring Integration SFTP: the SFTP Outbound Gateway

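    This post covers the SFTP outbound gateway in Spring Integration's SFTP support. In essence, the SFTP outbound gateway provides a set of commands (see Figure 1) for interacting with files on a remote server: listing (as file objects or file names), uploading (single or multiple files), downloading (single or multiple files), deleting, and moving. It can be configured in several ways, such as XML or Spring Boot. After introducing the gateway, this post puts it into practice with Spring Boot.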

    1. Commands
    1.1 ls

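    This command lists remote files. Depending on the options, the result contains file objects or file path names:

    • -1: return a list of file names; by default a list of FileInfo objects is returned;
    • -a: include all files, including those whose names start with a dot;
    • -f: do not sort the result;
    • -dirs: include directories (excluded by default);
    • -links: include symbolic links (excluded by default);
    • -R: list the remote directory recursively (not recursive by default).

    In addition, a file name filter can be configured.
    Return value: the message payload produced by ls is a list of file names or a list of FileInfo objects; the objects carry the modification time, permissions, and other details of each file.

    The remote directory that ls acted on is provided in the file_remoteDirectory header.

    Note: with the recursive option -R, each file name includes its subdirectory, that is, the path relative to the listed directory. If -dirs is also set, every subdirectory found during the recursion is returned as an element too. In that case, avoid -1, because a plain list of names cannot tell files from directories; return FileInfo objects instead.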
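    The advice above can be sketched in plain Java. This is only an illustration: the `Entry` class is a hypothetical stand-in for the entries returned by ls (the real FileInfo type exposes similar accessors such as getFilename() and isDirectory()), and the sample listing is invented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class; Entry is a stand-in, not the real FileInfo type.
final class ListingDemo {

    static final class Entry {
        final String filename;
        final boolean directory;

        Entry(String filename, boolean directory) {
            this.filename = filename;
            this.directory = directory;
        }
    }

    // Returns the names of regular files only, skipping directory entries.
    static List<String> fileNamesOnly(List<Entry> entries) {
        List<String> names = new ArrayList<>();
        for (Entry e : entries) {
            if (!e.directory) {
                names.add(e.filename);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        // A recursive listing with directories included, as with "-R -dirs".
        List<Entry> listing = Arrays.asList(
                new Entry("sub", true),
                new Entry("sub/a.txt", false),
                new Entry("b.txt", false));
        System.out.println(fileNamesOnly(listing));
    }
}
```

    With a plain list of names (the -1 option) the directory flag is not available, so this kind of separation would not be possible.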

    A development example follows:

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel2")
    public MessageHandler handler2() {
        // Specify the session factory and the command
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
        sftpOutboundGateway.setOptions("-dirs"); // command option
        return sftpOutboundGateway;
    }
    // Trigger the command through a messaging gateway
    @MessagingGateway
    public interface MessageGateway {
        @Gateway(requestChannel = "sftpChannel2")
        List<FileInfo> listFileName(String dir); // the remote directory
    }
    
    1.2 nlst

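    This command retrieves remote file names and is equivalent to ls -1. It supports the following option:

    • -f: do not sort the file names.

    The remote directory that nlst acted on is provided in the file_remoteDirectory header.

    Return value: the message payload produced by nlst is a list of file names.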

    1.3 get

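    This command retrieves a single remote file. It supports the following options:

    • -P: after the download, preserve the remote file's timestamp on the local file;
    • -stream: retrieve the remote file as a stream;
    • -D: delete the remote file after a successful transfer; the remote file is not deleted if the transfer is ignored because FileExistsMode is IGNORE and the local file already exists.

    The file_remoteDirectory header holds the remote directory, and the file_remoteFile header holds the file name.

    Return value: the message payload produced by get is a File object; with -stream, the payload is an InputStream.

    For text files, a common use case is to combine this command with a file splitter or a stream transformer. When a remote file is consumed as a stream, the Session must be closed promptly once the stream has been consumed. The Session is available in the closeableResource header, and IntegrationMessageHeaderAccessor gives access to this closeable resource.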
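    The "consume the stream, then close the session" rule can be sketched with plain java.io types. This is a minimal sketch, not the gateway's own code: the class and method names are invented, and the `session` parameter stands for whatever Closeable the closeableResource header carries in a real flow.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; not part of Spring Integration.
final class StreamConsumeDemo {

    // Reads every line from the stream, then closes the stream and the session.
    static List<String> readLinesAndClose(InputStream in, Closeable session) {
        List<String> lines = new ArrayList<>();
        // try-with-resources closes in reverse order: the reader first, the session last,
        // and it does so even if reading fails part-way through.
        try (Closeable s = session;
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(in, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        // In-memory stand-ins for the remote stream and the session.
        InputStream in = new ByteArrayInputStream("a\nb\n".getBytes(StandardCharsets.UTF_8));
        Closeable session = () -> System.out.println("session closed");
        System.out.println(readLinesAndClose(in, session));
    }
}
```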

    1.4 mget

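    This command retrieves multiple files that match a file pattern. It supports the following options:

    • -P: preserve the timestamps of the remote files;
    • -R: recursively download all matching files;
    • -x: throw an exception if no file matches the pattern (otherwise an empty list is returned);
    • -D: delete each remote file after a successful transfer; a remote file is not deleted if FileExistsMode is IGNORE and the local file already exists.

    The message payload is a List<File>, with one File per retrieved file.

    Note:
    Since version 5.0, if FileExistsMode is IGNORE, the payload no longer contains files that were skipped because they already exist locally.

    The expression for the remote path should produce a result that ends with *, for example myfiles/*, which retrieves the complete tree under myfiles.

    Also since version 5.0, a recursive MGET combined with FileExistsMode.REPLACE_IF_MODIFIED can synchronize an entire remote directory tree locally. In this mode the last-modified timestamp of each local file is set to the remote timestamp, regardless of the -P option.

    With -R, the entire remote tree is retrieved by default. A FileListFilter can be supplied to filter files or directories in the tree, either through the filename-pattern or the filename-regex attribute. For example, filename-regex="(subDir|.*1.txt)" retrieves all files ending in 1.txt in the remote directory and in the subdirectory subDir.
    Typically, the #remoteDirectory variable is used in local-directory-expression so that the remote directory structure is preserved locally.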
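    To see which names the filename-regex example would accept, the pattern can be tried in plain Java. This is a minimal sketch, assuming the filter requires the whole file name to match the pattern; the class name and the sample file names are invented for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical demo class; not part of Spring Integration.
final class RegexFilterDemo {

    // The pattern from the text. The unescaped '.' matches any character.
    static final Pattern PATTERN = Pattern.compile("(subDir|.*1.txt)");

    // Keeps only the names that match the whole pattern (assumed filter semantics).
    static List<String> accepted(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (PATTERN.matcher(name).matches()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("subDir", "otherDir", "report1.txt", "report2.txt");
        System.out.println(accepted(names)); // prints [subDir, report1.txt]
    }
}
```

    The directory name subDir has to match as well, otherwise the recursion would never descend into it. To match a literal dot before the extension, the pattern could be written as `.*1\\.txt` in a Java string.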

    A development example follows:

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel3")
    public MessageHandler handler3() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mget","payload");
        sftpOutboundGateway.setOptions("-R");
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
        sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir")); // backslash must be escaped in a Java string
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);  
        return sftpOutboundGateway;
    }
    @MessagingGateway
    public interface MessageGateway {
    	@Gateway(requestChannel = "sftpChannel3")
    	List<File> listFile(String dir);
    }
    
    1.5 put

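    This command sends a single file to the remote server.
    The message payload can be a File object, a byte[] array, or a String.
    remote-filename-generator names the remote file. Other attributes include remote-directory, temporary-remote-directory, and so on.
    Return value: the message payload produced by put is a String containing the full path of the file on the server after the transfer.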

    1.6 mput

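    This command sends multiple files to the server. It supports the following option:

    • -R: recursively send all files in the directory and its subdirectories.

    The message payload must be a File or a String path that represents a local directory; since version 5.1, a collection of File or String is also supported.
    The put attributes apply to mput as well. In addition, mput-pattern, mput-regex, and mput-filter are available for filtering files.
    Since version 4.3, the permissions of the transferred files can be set.

    Return value: the message payload produced by mput is a List containing the remote paths of the transferred files.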
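    A file filter for mput takes the local files and returns the subset to send. The sketch below shows that shape in plain Java, assuming a rule that keeps only .txt files; the class name and the rule are invented for illustration and are not taken from the library.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class; it mirrors the "array in, accepted list out" shape of a file list filter.
final class TxtOnlyFilterDemo {

    // Returns the files whose names end with ".txt"; never returns null.
    static List<File> filterFiles(File[] files) {
        List<File> accepted = new ArrayList<>();
        if (files == null) {
            return accepted; // an unreadable directory yields no files
        }
        for (File f : files) {
            if (f.getName().endsWith(".txt")) {
                accepted.add(f);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // The files do not need to exist; only their names are inspected.
        File[] candidates = { new File("a.txt"), new File("b.log"), new File("c.txt") };
        System.out.println(filterFiles(candidates));
    }
}
```

    Returning an empty list rather than null keeps the caller free of null checks when nothing matches.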

    A development example follows:

        // !important: the put command relies on an SftpRemoteFileTemplate.
        // The source code shows that the outbound gateway has several constructors.
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel4")
        public MessageHandler handler4(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"put","payload");
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel5")
        public MessageHandler handler5(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));        
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"mput","payload");
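           // Configure the filter
            sftpOutboundGateway.setMputFilter(new FileListFilter<File>() {
                @Override
                public List<File> filterFiles(File[] files) {
                    // Placeholder: add the real filtering logic here.
                    // Return a list (never null); this sketch accepts every file.
                    return java.util.Arrays.asList(files);
                }
            });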
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
    1.7 rm

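    This command deletes a remote file.
    If the deletion succeeds, the message payload is Boolean.TRUE; otherwise it is Boolean.FALSE.
    The file_remoteDirectory header holds the remote directory of the file.

    A development example follows: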

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel6")
    public MessageHandler handler6(){
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"rm","payload");
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }
    
    1.8 mv

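    This command moves a file to another location on the remote server.
    Return value: true if the move succeeds, false otherwise.

    A development example follows: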

        @Bean
        @ServiceActivator(inputChannel = "sftpChannel7")
        public MessageHandler handler7(){
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mv","'send/22.TXT'");
            sftpOutboundGateway.setRenameExpression(new LiteralExpression("send1/22.TXT"));
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    

     The complete working example (test case) follows.

    First, the POM file:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.flower.springintegration</groupId>
        <artifactId>spring-integration-samples</artifactId>
    
        <version>v0.0.1</version>
    
        <name>SpringIntegrationExamples</name>
    
        <description>Spring Integration Samples</description>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
           <!-- <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
            </dependency>-->
    
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-sftp</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.2.1</version>
            </dependency>
    
    
            <dependency>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP</artifactId>
            </dependency>
    
            <!-- Test dependencies: versions are omitted so that the Spring Boot parent manages them consistently -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

     Next, the yml configuration:

    spring:
      datasource:
        type: com.zaxxer.hikari.HikariDataSource
        url: jdbc:mysql://localhost:3306/springbatchexample?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
        username: root
        password: root
    
    sftp:
        host: 127.0.0.1
        port: 23
        user: 47gamer
        password: wdnmd
        filePath:
            send: /send
            achieve: /achieve
            localPath: /sftp_tmp_dir
    

     Then the SFTP gateway configuration class, SftpConfig.java

    package com.flower.integration.sftp;
    
    import com.jcraft.jsch.ChannelSftp.LsEntry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.expression.common.LiteralExpression;
    import org.springframework.integration.annotation.*;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
    import org.springframework.integration.file.filters.FileListFilter;
    import org.springframework.integration.file.remote.FileInfo;
    import org.springframework.integration.file.remote.session.CachingSessionFactory;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.support.FileExistsMode;
    import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
    import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
    import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
    import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
    import org.springframework.integration.sftp.outbound.SftpMessageHandler;
    import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
    import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    import javax.annotation.Resource;
    import java.io.File;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * Sftp configuration.
     *
     * @author 47Gamer
     * @Date 2019-01-18
     */
    @Configuration
    @DependsOn("sftpProperty")
    public class SftpConfig {
    
        @Resource(name = "sftpProperty")
        private SftpProperty sftpProperty;
    
        private static Logger log = LoggerFactory.getLogger(SftpConfig.class);
    
    
        @Value("${sftp.host}")
        private String sftpHost;
    
        @Value("${sftp.port:23}")
        private int sftpPort;
    
        @Value("${sftp.user}")
        private String sftpUser;
    
        @Value("${sftp.privateKey:#{null}}")
        private org.springframework.core.io.Resource sftpPrivateKey;
    
        @Value("${sftp.privateKeyPassphrase:}")
        private String sftpPrivateKeyPassphrase;
    
        @Value("${sftp.password}")
        private String sftpPassword;
    
       /* @Bean
        public SessionFactory<LsEntry> sftpSessionFactory() {
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
            factory.setHost(sftpProperty.getHost());
            factory.setPort(sftpProperty.getPort());
            factory.setUser(sftpProperty.getUser());
            Properties jschProps = new Properties();
            //!important: PreferredAuthentications must be set, otherwise the console prompts for user name and password.
            jschProps.put("StrictHostKeyChecking", "no");
            jschProps.put("PreferredAuthentications",
                    "password,gssapi-with-mic,publickey,keyboard-interactive");
    
            factory.setSessionConfig(jschProps);
    
          //  if (sftpPassword != null) {
                factory.setPassword(sftpProperty.getPassword());
    //        } else {
    //            factory.setPrivateKey(sftpPrivateKey);
    //            factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    //        }
    
            factory.setAllowUnknownKeys(true);
            //        // Set the cache properties: pool size and session wait timeout.
            CachingSessionFactory<LsEntry> cachingSessionFactory =
                    new CachingSessionFactory<LsEntry>(factory);
            cachingSessionFactory.setPoolSize(10);
    //        cachingSessionFactory.setSessionWaitTimeout(1000);
    
            return cachingSessionFactory;
    //        return new CachingSessionFactory<LsEntry>(factory);
        }*/
    
        /**
         * Creates the spring-integration-sftp session factory.
         * Avoids creating sessions through the native JSch API.
         *
         * @return SessionFactory<LsEntry>
         */
        @Bean
        public SessionFactory<LsEntry> sftpSessionFactory(){
            System.out.println("######################################################");
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
            factory.setUser(sftpProperty.getUser());
            factory.setHost(sftpProperty.getHost());
            factory.setPort(sftpProperty.getPort());
            factory.setPassword(sftpProperty.getPassword());
    
            Properties jschProps = new Properties();
            //!important: PreferredAuthentications must be set, otherwise the console prompts for user name and password.
            jschProps.put("StrictHostKeyChecking", "no");
            jschProps.put("PreferredAuthentications",
                    "password,gssapi-with-mic,publickey,keyboard-interactive");
    
            factory.setSessionConfig(jschProps);
            factory.setAllowUnknownKeys(true);
    
            // Set the cache properties: pool size and session wait timeout.
            CachingSessionFactory<LsEntry> cachingSessionFactory =
                    new CachingSessionFactory<LsEntry>(factory);
    //        cachingSessionFactory.setPoolSize(2000);
    
    
            return  cachingSessionFactory;
        }
    
        /**
         * Configures the Outbound Channel Adapter.
         *
         * In essence it is a MessageHandler that receives the messages sent on the message channel.
         * @return MessageHandler
         */
        @ServiceActivator(inputChannel = "fileInChannel")
        @Bean
        public SftpMessageHandler sftpMessageHandler(){
            SftpMessageHandler sftpMsgHandler = new SftpMessageHandler(sftpSessionFactory());
    
            sftpMsgHandler.setRemoteDirectoryExpression(
                    new LiteralExpression(sftpProperty.getSftpAchievePath()));
            sftpMsgHandler.setAutoCreateDirectory(true);
            sftpMsgHandler.setCharset("UTF-8");
            return sftpMsgHandler;
        }
    
    
        /**
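         * Configures the Inbound Channel Adapter.
         *
         * Monitors the files on the SFTP server. As soon as a matching file appears, it is synchronized to the local server.
         * Requirements: the inboundFileChannel bean; a polling mechanism; the file synchronizer bean, SftpInboundFileSynchronizer.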
         */
        @Bean
        @InboundChannelAdapter(value = "inboundFileChannel",
                poller = @Poller(cron = "0 1/10 * * * *", maxMessagesPerPoll = "1"))
        public MessageSource<File> fileMessageSource() {
            System.out.println("=========================================================");
    
            // Create the SftpInboundFileSynchronizer and bind it to the message source.
            SftpInboundFileSynchronizingMessageSource source =
                    new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    
            // Create the local directory automatically
            source.setAutoCreateLocalDirectory(true);
            source.setLocalDirectory(new File(sftpProperty.getLocalTempDir()));
    
            // Set the local file filter
            source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    
            return source;
    
        }
    
        /**
         * Provides the channel bean for the inbound channel adapter.
         */
        @Bean
        public DirectChannel inboundFileChannel() {
            return new DirectChannel();
        }
    
        /**
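         * SftpInboundFileSynchronizer.
         *
         *  Synchronizes SFTP files to the local server.
         *      <1> It can be injected into a service and used through its synchronizeToLocalDirectory method;
         *      <2> or it can be used by the inbound channel adapter to monitor changes on the file server.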
         *
         * @return SftpInboundFileSynchronizer
         */
        @Bean(name = "synFileChannel")
        public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){
    
            SftpInboundFileSynchronizer fileSynchronize =
                    new SftpInboundFileSynchronizer(sftpSessionFactory());
            fileSynchronize.setDeleteRemoteFiles(true);
            fileSynchronize.setPreserveTimestamp(true);
    
            //!important
            fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath());
            fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*"));
            //fileSynchronize.setLocalFilenameGeneratorExpression( );
            return fileSynchronize;
        }
    
        ///////////////////////////////////////////////////////////////////////
    
        /**
         * Configures the SFTP Outbound Gateway.
         *
         * @return MessageHandler
         */
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel")
        public MessageHandler handler() {
    
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
    //        MessageChannel message = sftpOutboundGateway.getOutputChannel();
    
            sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
            sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel2")
        public MessageHandler handler2() {
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
            sftpOutboundGateway.setOptions("-dirs");
            sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
            sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
    
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel3")
        public MessageHandler handler3() {
            System.out.println("=========================         3         ================================");
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mget","payload");
            sftpOutboundGateway.setOptions("-R");
            sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
            sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
            sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
    
            return sftpOutboundGateway;
        }
    
        @Autowired
        private BeanFactory beanFactory;
    
        // Outbound gateway: the put command relies on an SftpRemoteFileTemplate.
        // The source code shows that the outbound gateway has several constructors.
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel4")
        public MessageHandler handler4(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"put","payload");
    //        sftpOutboundGateway.setLocalDirectoryExpressionString("/get/");
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel5")
        public MessageHandler handler5(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
    
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"mput","payload");
    //        sftpOutboundGateway.setLocalDirectoryExpressionString("/get/");
    //        sftpOutboundGateway.setOptions("-R");
            sftpOutboundGateway.setMputFilter(new FileListFilter<File>() {
                @Override
                public List<File> filterFiles(File[] files) {
                    // Return a list (never null); this placeholder accepts every file.
                    return java.util.Arrays.asList(files);
                }
            });
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel6")
        public MessageHandler handler6(){
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"rm","payload");
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel7")
        public MessageHandler handler7(){
    
    //        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
    //        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
    
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mv","'send/22.TXT'");
    //        sftpOutboundGateway.setRenameExpression(new LiteralExpression("/send1"));
    //        sftpOutboundGateway.setChmod(777);
    //        sftpOutboundGateway.setRenameExpressionString("send1");
    
            sftpOutboundGateway.setRenameExpression(new LiteralExpression("send1/22.TXT"));
    //        sftpOutboundGateway.setAutoCreateLocalDirectory(true);
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
    
        @MessagingGateway
        public interface UploadGateway {
    
            @Gateway(requestChannel = "sftpChannel")
            List<FileInfo> listFileInfo(String dir);
    
            @Gateway(requestChannel = "sftpChannel2")
            List<FileInfo> listFileName(String dir);
    
            @Gateway(requestChannel = "sftpChannel3")
            List<File> listFile(String dir);
    
            @Gateway(requestChannel = "sftpChannel4")
            String putFile(File source);
    
            @Gateway(requestChannel = "sftpChannel5")
            List<String> mputFile(File directory);
    
            @Gateway(requestChannel = "sftpChannel6")
            boolean removeFile(String file);
    
            @Gateway(requestChannel = "sftpChannel7")
            boolean moveFile(String file);
    
        }
    
    }
    

    The entity that maps the sftp settings in the yml file, SftpProperty.java

    package com.flower.integration.sftp;
    
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component("sftpProperty")
    @ConfigurationProperties(prefix = "sftp")
    public class SftpProperty {
    
    
        private String host;
    
        private Integer port;
    
        private String user;
    
        private String password;
    
        private Map<String,String> filePath;
    
        ////////////////////////////////////////////////////
        public String getSftpSendPath(){
            return filePath.get("send");
        }
    
        public String getSftpAchievePath(){
            return filePath.get("achieve");
        }
    
        public String getLocalTempDir(){
            return filePath.get("localPath");
        }
    
        ///////////////////////////////////////////////////
        public String getHost() {
            return host;
        }
    
        public void setHost(String host) {
            this.host = host;
        }
    
        public Integer getPort() {
            return port;
        }
    
        public void setPort(Integer port) {
            this.port = port;
        }
    
        public String getUser() {
            return user;
        }
    
        public void setUser(String user) {
            this.user = user;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
        public Map<String, String> getFilePath() {
            return filePath;
        }
    
        public void setFilePath(Map<String, String> filePath) {
            this.filePath = filePath;
        }
    }
    

     Service layer: SftpService.java

    package com.flower.integration.sftp;
    
    import com.jcraft.jsch.ChannelSftp;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.expression.common.LiteralExpression;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.support.FileExistsMode;
    import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
    import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.io.File;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;
    
    @Service("sftpService")
    public class SftpService {
    
    
        private Logger log = LoggerFactory.getLogger(this.getClass());
    
        @Resource(name = "fileInChannel")
        protected MessageChannel messageChannel;
    
        @Autowired
        private SftpProperty sftpProperty;
    
        @Autowired
        private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;
    
        /**
         * Sends a file to the SFTP server through the MessageChannel.
         *
         * @param localFilePath file local path.
         */
        public void sendFileToSftp(String localFilePath) {
    
            Path filePath = Paths.get(localFilePath);
            if (filePath.toFile().exists()) {
                Message<File> fileMessage = MessageBuilder.withPayload(filePath.toFile()).build();
                boolean result = messageChannel.send(fileMessage);
                String resultMsg = result ? "Success" : "Failure";
                log.info("File send to sftp {}, File: {}.", resultMsg, filePath.getFileName());
            } else {
                log.warn("File not found: {}", filePath.getFileName());
            }
        }
    
        /**
         * Deletes a file on the SFTP server.
         *
         * @param sessionFactory  sftp server.
         * @param remoteDirectory file directory.
         * @param fileName        file
         * @return true if the file was removed, false otherwise.
         */
        public boolean removeSftpRemoteFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String remoteDirectory, String fileName) {
            SftpRemoteFileTemplate sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sessionFactory);
    
            boolean direCheck = remoteDirectory.endsWith(sftpRemoteFileTemplate.getRemoteFileSeparator());
            if (!direCheck) {
                remoteDirectory += sftpRemoteFileTemplate.getRemoteFileSeparator();
            }
            boolean fileExist = sftpRemoteFileTemplate.exists(remoteDirectory + fileName);
            if (fileExist) {
                return sftpRemoteFileTemplate.remove(remoteDirectory + fileName);
            } else {
                log.warn("File not found in directory {}.", remoteDirectory);
                return false;
            }
        }
    
        /**
         * Renames a file on the SFTP server.
         *
         * @param sessionFactory  sftp server
         * @param remoteDirectory file directory path.
         * @param sourceFileName  source file name
         * @param targetFileName  rename target name
         */
        public void renameSftpRemoteFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String remoteDirectory,
                                         String sourceFileName, String targetFileName) {
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
    
            boolean direCheck = remoteDirectory.endsWith(fileTemplate.getRemoteFileSeparator());
            if (!direCheck) {
                remoteDirectory += fileTemplate.getRemoteFileSeparator();
            }
            boolean fileExist = fileTemplate.exists(remoteDirectory + sourceFileName);
            if (fileExist) {
                fileTemplate.rename(remoteDirectory + sourceFileName, remoteDirectory + targetFileName);
            } else {
                log.warn("File not found in directory {}.", remoteDirectory);
            }
        }
    
        /**
         * Checks whether a file exists on the SFTP server.
         *
         * @param sessionFactory sftp server
         * @param directory      file directory
         * @param fileName       file name
         * @return true if file exist, or false.
         */
        public boolean fileExist(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String directory, String fileName) {
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
            boolean fileNameCheck = directory.endsWith(fileTemplate.getRemoteFileSeparator());
            if (!fileNameCheck) {
                directory += fileTemplate.getRemoteFileSeparator();
            }
    
            return fileTemplate.exists(directory + fileName);
        }
    
    
        /**
         * List the files in a remote SFTP directory whose names match a filter (directories are skipped).
         *
         * @param sessionFactory sftp server
         * @param directory      file directory
         * @param fileNameFilter file name filter
         * @return file name list match filter
         */
        public List<String> lsFileOfDirectory(SessionFactory<ChannelSftp.LsEntry> sessionFactory,
                                              String directory, String fileNameFilter) {
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
    
            if (!directory.endsWith(fileTemplate.getRemoteFileSeparator())) {
                directory += fileTemplate.getRemoteFileSeparator();
            }
            ChannelSftp.LsEntry[] files = fileTemplate.list(directory + fileNameFilter);
            List<String> fileNames = new ArrayList<>();
            for (ChannelSftp.LsEntry lsEntry : files) {
                boolean isDir = lsEntry.getAttrs().isDir();
                if (!isDir) {
                    fileNames.add(lsEntry.getFilename());
                }
            }
            return fileNames;
        }
    
        @Autowired
        private BeanFactory beanFactory;
    
        /**
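         * Send a local file to the SFTP server.
         *
         * @param sessionFactory sftp server
         * @param filePath file local path
         * @param targetPath target directory
         * @param mode FileExistsMode
         *             null: default, replace the existing file;
         *             APPEND: append to the remote file if it already exists;
         *             REPLACE: replace the existing file;
         *             APPEND_NO_FLUSH: append without flushing after each write;
         *             FAIL: throw an exception if the file already exists;
         *             IGNORE: leave the existing file untouched and skip the transfer.
         */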
        public void sendSftpFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory,
                                 String filePath, String targetPath, FileExistsMode mode){
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
            try {
                // Configure the template for the remote SFTP server
                fileTemplate.setRemoteDirectoryExpression(new LiteralExpression(targetPath));
                fileTemplate.setAutoCreateDirectory(true);
                fileTemplate.setCharset("UTF-8");
                fileTemplate.setBeanFactory(beanFactory);
                fileTemplate.afterPropertiesSet();
            } catch (Exception e){
                log.warn("Failed to initialize SftpRemoteFileTemplate", e);
            }
    
            Path file = Paths.get(filePath);
            if (file.toFile().exists()){
                Message<File> message = MessageBuilder.withPayload(file.toFile()).build();
                if (null == mode){
                    fileTemplate.send(message);
                } else {
                    //fileTemplate.setFileNameGenerator(new DefaultFileNameGenerator());
                    if (fileTemplate.isUseTemporaryFileName()){
                        fileTemplate.setUseTemporaryFileName(false);
                    }
                    fileTemplate.send(message, mode);
                }
            } else {
                log.warn("Local file {} does not exist; nothing was sent.", filePath);
            }
        }
    
    
        @Resource(name = "synFileChannel")
        private SftpInboundFileSynchronizer sftpInboundFileSynchronizer;
    
        public void synchronizedFileToLocal(String localDir){
            File dir = Paths.get(localDir).toFile();
            sftpInboundFileSynchronizer.synchronizeToLocalDirectory(dir);
        }
    
    }
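
The service methods above each repeat the same step: make sure the remote directory ends with the separator before appending the file name. A minimal sketch of how that step could be factored out is shown below. `RemotePaths` and `join` are hypothetical names that do not appear in the original code, and the separator is passed in explicitly (in the service it would come from `getRemoteFileSeparator()`).

```java
// Hypothetical helper, not part of the original service class.
final class RemotePaths {

    private RemotePaths() {
    }

    /**
     * Joins a remote directory and a file name so that exactly one
     * separator sits between them, whether or not the directory
     * already ends with the separator.
     */
    static String join(String directory, String fileName, String separator) {
        if (directory.endsWith(separator)) {
            return directory + fileName;
        }
        return directory + separator + fileName;
    }
}
```

With such a helper, a call like `fileTemplate.exists(RemotePaths.join(directory, fileName, fileTemplate.getRemoteFileSeparator()))` could replace the repeated `endsWith` checks.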
    

    Controller layer: used to exercise the service-layer methods

    package com.flower.integration.sftp;
    
    
    import com.jcraft.jsch.ChannelSftp;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.integration.file.remote.FileInfo;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.io.File;
    import java.util.List;
    
    
    @RestController
    public class TestController {
    
    
        @Autowired
        private SftpService sftpService;
    
        @Autowired
        private SftpConfig.UploadGateway uploadGateway;
    
        @GetMapping("/sftp")
        public void testSftpSpringBatch() {
    
            List<FileInfo> fileList = uploadGateway.listFileInfo("/send");
    
            for (FileInfo file : fileList) {
                String fileName = file.getFilename();
                String filePath = file.getRemoteDirectory();
                ChannelSftp.LsEntry fileInfo = (ChannelSftp.LsEntry) file.getFileInfo();
                boolean isDir = file.isDirectory();
                boolean isLink = file.isLink();
                long modifyTime = file.getModified();
                System.out.println("=============================  " + fileName);
                System.out.println("==================  " + filePath);
                System.out.println("==================  " + fileInfo.getFilename());
                System.out.println("==================  " + isDir);
                System.out.println("==================  " + isLink);
                System.out.println("==================  " + modifyTime);
            }
        }
    
        @GetMapping("/sftp2")
        public void testSftpSpringBatch2() {
    
            List<FileInfo> fileNameList = uploadGateway.listFileName("/send");
    
            for (FileInfo fileName : fileNameList) {
    
                System.out.println("=============================  " + fileName);
            }
        }
    
    
        @GetMapping("/sftp3")
        public void testSftpSpringBatch3() throws InterruptedException {
    
            List<File> fileNameList = uploadGateway.listFile("/send");
    
            for (File fileName : fileNameList) {
                System.out.println("=============================  " + fileName);
            }
        }
    
        @GetMapping("/sftp4")
        public void testSftpSpringBatch4() throws InterruptedException {
    
            String result = uploadGateway.putFile(new File("G:\\Redis.pdf"));
    
            System.out.println("=============================  " + result);
        }
    
        @GetMapping("/sftp5")
        public void testSftpSpringBatch5() throws InterruptedException {
    
            List<String> result = uploadGateway.mputFile(new File("G:\\js"));
    
    
            for (String fileName : result) {
                System.out.println("=============================  " + fileName);
            }
        }
    
        @GetMapping("/sftp6")
        public void testSftpSpringBatch6() throws InterruptedException {
    
            boolean result = uploadGateway.removeFile("/send/2.txt");
    
            System.out.println("=============================  " + result);
        }
    
        @GetMapping("/sftp7")
        public void testSftpSpringBatch7() throws InterruptedException {
    
            boolean result = uploadGateway.moveFile("/22.TXT");
    
            System.out.println("=============================  " + result);
        }
    }
    

    SpringIntegrationApp.java, the application entry class

    package com.flower.integration;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    @SpringBootApplication
    //@EnableScheduling
    public class SpringIntegrationApp {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringIntegrationApp.class, args);
            System.out.println("Spring-Integration application start success.");
        }
    
    }
    

    JUnit test class: create SpringIntegrationExamplesApplicationTests.java under the test folder and run it

    package com.flower.integration.sftp;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringIntegrationExamplesApplicationTests {
    
        @Test
        public void contextLoads() {
        }
    
    }

    Unit tests for the service layer: SftpServiceTest.java

    package com.flower.integration.sftp;
    
    import com.flower.integration.SpringIntegrationApp;
    import com.jcraft.jsch.ChannelSftp;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.support.FileExistsMode;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import java.util.List;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = SpringIntegrationApp.class)
    @EnableIntegration
    public class SftpServiceTest {
    
        @Autowired
        private SftpService sftpService;
    
        @Autowired
        private SftpProperty sftpProperty;
    
        @Autowired
        private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;
    
        @Before
        public void before (){
            System.out.println("00000000000000000000000000000000000000000000000000000");
        }
    
        @After
        public void after(){
    
        }
    
        @Test
        public void sendFileToSftp() {
    
        //sftpService.sendFileToSftp();
        }
    
        @Test
        public void testRemoveSftpRemoteFile(){
            boolean result = sftpService.removeSftpRemoteFile(
                    sftpSessionFactory, sftpProperty.getSftpSendPath(),"user333.csv");
    
            System.out.println("=======" + result);
        }
    
        @Test
        public void testRenameSftpRemoteFile(){
            sftpService.renameSftpRemoteFile(sftpSessionFactory, sftpProperty.getSftpSendPath(),"user.csv",
                    "user111.csv");
        }
    
        @Test
        public void testfileExist(){
            boolean result = sftpService.fileExist(sftpSessionFactory, sftpProperty.getSftpSendPath(),"user111.csv");
            System.out.println("++++++++++++" + result);
        }
    
        @Test
        public void testlsFileOfDirectory(){
            List<String> result = sftpService.lsFileOfDirectory(sftpSessionFactory,
                    sftpProperty.getSftpSendPath(),"*TXT");
            System.out.println("-------------------" + result.toString());
        }
    
        @Test
        public void testSendSftpFile() throws Exception {
            sftpService.sendSftpFile(sftpSessionFactory,
                    "G:\\jquery.txt", sftpProperty.getSftpAchievePath(), FileExistsMode.REPLACE);
        }
    
        @Test
        public void testSynchronizedFileToLocal(){
            sftpService.synchronizedFileToLocal(sftpProperty.getLocalTempDir());
        }
    }
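
The `testlsFileOfDirectory` test passes `"*TXT"` as the file name filter. As a rough local illustration of how such a wildcard selects names, the sketch below converts a pattern containing `*` and `?` into a regular expression and filters a list with it. `WildcardFilter` is a hypothetical class, not part of the original project, and it only approximates the matching that the SFTP client performs when listing; the real behaviour (case sensitivity, for example) may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical illustration only; the real filtering happens in the SFTP client.
final class WildcardFilter {

    private WildcardFilter() {
    }

    /** Translates '*' to ".*" and '?' to "." and quotes every other character. */
    static Pattern toRegex(String wildcard) {
        StringBuilder regex = new StringBuilder();
        for (char c : wildcard.toCharArray()) {
            if (c == '*') {
                regex.append(".*");
            } else if (c == '?') {
                regex.append('.');
            } else {
                regex.append(Pattern.quote(String.valueOf(c)));
            }
        }
        return Pattern.compile(regex.toString());
    }

    /** Returns the names that match the wildcard in full, in their original order. */
    static List<String> filter(List<String> names, String wildcard) {
        Pattern pattern = toRegex(wildcard);
        List<String> matched = new ArrayList<>();
        for (String name : names) {
            // matches() requires the whole name to match, not just a substring
            if (pattern.matcher(name).matches()) {
                matched.add(name);
            }
        }
        return matched;
    }
}
```

Under these assumptions, filtering the names `22.TXT`, `user.csv` and `a.txt` with `*TXT` keeps only `22.TXT`, because this sketch compares case-sensitively.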
    
  • Original article: https://www.cnblogs.com/47Gamer/p/13746064.html