  • Spring Integration SFTP: The SFTP Outbound Gateway

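    This post covers the SFTP outbound gateway in Spring Integration's SFTP support. In essence, the gateway exposes a set of commands (see Figure 1) for working with files on a remote server: listing (as file objects or file names), uploading (one file or many), downloading (one file or many), deleting, and moving. It can be configured in several ways, for example with XML or with Spring Boot Java configuration. After introducing the gateway, this post walks through a working setup built on Spring Boot.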

    1. Commands
    1.1 ls

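    This command lists remote files. Depending on the options, the result is a list of file names or a list of file objects:

    • -1: return a list of file names; the default is a list of FileInfo objects;
    • -a: include all files, including those whose names start with a dot;
    • -f: do not sort the result;
    • -dirs: include directories (excluded by default);
    • -links: include symbolic links (excluded by default);
    • -R: list the remote directory recursively (not recursive by default).

    A file name filter can be configured as well.
    Return value: the message payload produced by ls is a list of file names or a list of FileInfo objects; each object carries the file's modification time, permissions, and other details.

    The remote directory that ls acted on is provided in the file_remoteDirectory header.

    Tip: with the recursive -R option, each file name includes its subdirectory, that is, the path relative to the listed directory. With -dirs as well, every subdirectory visited is itself returned as an element. In that case avoid -1, because a bare name cannot tell you whether an element is a file or a directory; return FileInfo objects instead.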

    Example:

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel2")
    public MessageHandler handler2() {
        // Session factory, command, and the expression that yields the remote directory
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
        sftpOutboundGateway.setOptions("-dirs"); // command options
        return sftpOutboundGateway;
    }
    // Messaging gateway that triggers the command
    @MessagingGateway
    public interface MessageGateway {
        @Gateway(requestChannel = "sftpChannel2")
        List<FileInfo> listFileName(String dir); // remote directory to list
    }
    
    1.2 nlst

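    This command returns remote file names only; it is equivalent to ls -1. It supports one option:

    • -f: do not sort the file names.

    The remote directory that nlst acted on is provided in the file_remoteDirectory header.

    Return value: the message payload produced by nlst is a list of file names.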
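
    The post gives no example for nlst, so the following is a minimal sketch in the same style as the ls example. The class name NlstGatewayConfig, the channel name sftpNlstChannel, and the gateway interface are hypothetical names chosen for illustration; the session factory is assumed to be the sftpSessionFactory bean from the configuration class shown later in this post.

```java
import java.util.List;

import com.jcraft.jsch.ChannelSftp.LsEntry;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.Gateway;
import org.springframework.integration.annotation.MessagingGateway;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.file.remote.session.SessionFactory;
import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
import org.springframework.messaging.MessageHandler;

@Configuration
public class NlstGatewayConfig {

    // "sftpNlstChannel" is a hypothetical channel name used only in this sketch.
    @Bean
    @ServiceActivator(inputChannel = "sftpNlstChannel")
    public MessageHandler nlstHandler(SessionFactory<LsEntry> sftpSessionFactory) {
        // The "payload" expression means the message payload is the remote directory.
        SftpOutboundGateway gateway = new SftpOutboundGateway(sftpSessionFactory, "nlst", "payload");
        gateway.setOptions("-f"); // optional: leave the names unsorted
        return gateway;
    }

    @MessagingGateway
    public interface NlstGateway {
        // The reply payload of nlst is a list of file names.
        @Gateway(requestChannel = "sftpNlstChannel")
        List<String> listNames(String dir);
    }
}
```

    Calling listNames("/send") on the injected NlstGateway would then return the names found in that remote directory.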

    1.3 get

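    This command retrieves a single remote file. It supports the following options:

    • -P: after the download, keep the local file's timestamp the same as the remote file's;
    • -stream: retrieve the remote file as a stream;
    • -D: delete the remote file after a successful transfer. The remote file is not deleted if the transfer was skipped because FileExistsMode is IGNORE and the local file already exists.

    The file_remoteDirectory header contains the file's remote directory, and the file_remoteFile header contains the file name.

    Return value: the message payload produced by get is a File object; with -stream, the payload is an InputStream instead.

    For text files, a common pattern is to combine get with a file splitter or a stream transformer. When a remote file is consumed as a stream, you are responsible for closing the Session once the stream has been read. The Session is provided in the closeableResource header, and IntegrationMessageHeaderAccessor offers a convenience method, getCloseableResource(), for getting at it.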
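
    The closing rule for streamed downloads can be sketched with plain JDK types. This is an illustration under assumptions, not code from the post: StreamedGetSketch and readLinesAndClose are hypothetical names, and the Closeable argument stands in for the Session that a real flow would obtain with new IntegrationMessageHeaderAccessor(message).getCloseableResource().

```java
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

final class StreamedGetSketch {

    /**
     * Reads every line from a streamed payload, then closes the stream and the session.
     * The sessionResource parameter is a stand-in for the closeableResource header value.
     */
    static List<String> readLinesAndClose(InputStream payload, Closeable sessionResource)
            throws IOException {
        List<String> lines = new ArrayList<>();
        // try-with-resources closes in reverse order: the reader first, then the session.
        // Both are closed even if reading fails; a null session is simply skipped.
        try (Closeable session = sessionResource;
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(payload, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }
}
```

    Tying the session to the same try block as the stream means a read failure cannot leave the SFTP session open.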

    1.4 mget

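    This command retrieves multiple remote files that match a pattern. It supports the following options:

    • -P: preserve the timestamps of the remote files;
    • -R: retrieve the whole directory tree recursively;
    • -x: throw an exception if no files match the pattern; otherwise an empty list is returned;
    • -D: delete each remote file after a successful transfer. A remote file is not deleted if its transfer was skipped because FileExistsMode is IGNORE and the local file already exists.

    The message payload is a List<File>, one File per retrieved file.

    Note:
    Since version 5.0, if FileExistsMode is IGNORE, the payload no longer contains files that were not fetched because they already existed locally.

    The expression that determines the remote path should produce a result ending in *; for example, myfiles/* fetches the complete tree under myfiles.

    Also since version 5.0, a recursive mget combined with FileExistsMode.REPLACE_IF_MODIFIED can be used to synchronize an entire remote directory tree locally. In this mode the local file's last-modified timestamp is set to the remote file's timestamp, regardless of the -P option.

    With -R, the entire remote tree is retrieved by default. You can narrow this with a FileListFilter, which applies to directories as well as files. The filter can be supplied by reference or through the filename-pattern or filename-regex attribute. For example, filename-regex="(subDir|.*1.txt)" retrieves all files ending in 1.txt in the remote directory and in its subDir child directory.
    Typically you would use the #remoteDirectory variable in local-directory-expression so that the remote directory structure is preserved locally.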
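
    To see which entries the regex above lets through, here is a small JDK-only sketch. It assumes the filter applies the regex to the bare entry name and requires the whole name to match; MgetRegexSketch and the sample names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

final class MgetRegexSketch {

    // The regex from the text. "subDir" must match so that recursion can enter that directory.
    static final Pattern FILTER = Pattern.compile("(subDir|.*1.txt)");

    /** Returns the names that the regex accepts as a full match, in input order. */
    static List<String> accepted(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (FILTER.matcher(name).matches()) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample directory and file names, made up for this demonstration.
        List<String> names = Arrays.asList("subDir", "otherDir", "a1.txt", "a2.txt", "report1.txt");
        System.out.println(accepted(names)); // prints [subDir, a1.txt, report1.txt]
    }
}
```

    Note that the dot in 1.txt is unescaped, so it matches any character; writing .*1\.txt would be stricter.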

    Example:

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel3")
    public MessageHandler handler3() {
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mget","payload");
        sftpOutboundGateway.setOptions("-R");
        sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
        sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
        sftpOutboundGateway.setAutoCreateLocalDirectory(true);  
        return sftpOutboundGateway;
    }
    @MessagingGateway
    public interface MessageGateway {
    	@Gateway(requestChannel = "sftpChannel3")
    	List<File> listFile(String dir);
    }
    
    1.5 put

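    This command sends a single file to the remote server.
    The message payload can be a File, a byte[], or a String.
    A remote-filename-generator is used to name the remote file. Other available attributes include remote-directory and temporary-remote-directory.
    Return value: the message payload produced by put is a String containing the full path of the file on the server after the transfer.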

    1.6 mput

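    This command sends multiple files to the server. It supports one option:

    • -R: recursive; send all files in the directory and its subdirectories.

    The message payload must be a File, or a String path, that represents a local directory. Since version 5.1, a collection of File or String is supported as well.
    The attributes available for put also apply to mput. In addition, mput-pattern, mput-regex, and mput-filter let you filter which files are sent.
    Since version 4.3, the permissions of the uploaded files can be set.

    Return value: the message payload produced by mput is a List containing the remote paths of the transferred files.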

    Example:

        // Important: put needs an SftpRemoteFileTemplate, which carries the remote directory.
        // SftpOutboundGateway has several constructors; see its source.
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel4")
        public MessageHandler handler4(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"put","payload");
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel5")
        public MessageHandler handler5(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));        
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"mput","payload");
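            // Configure the filter
            sftpOutboundGateway.setMputFilter(new FileListFilter<File>() {
                @Override
                public List<File> filterFiles(File[] files) {
                    // Illustrative rule: send only regular files; put your own condition here.
                    // Always return a list (possibly empty), never null.
                    List<File> accepted = new java.util.ArrayList<>();
                    for (File file : files) {
                        if (file.isFile()) {
                            accepted.add(file);
                        }
                    }
                    return accepted;
                }
            });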
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
    1.7 rm

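    This command deletes a remote file.
    The message payload is Boolean.TRUE if the file was removed and Boolean.FALSE otherwise.
    The file_remoteDirectory header provides the remote directory, and the file_remoteFile header provides the file name.

    Example: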

    @Bean
    @ServiceActivator(inputChannel = "sftpChannel6")
    public MessageHandler handler6(){
        SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"rm","payload");
        sftpOutboundGateway.setBeanFactory(beanFactory);
        return sftpOutboundGateway;
    }
    
    1.8 mv

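    This command moves a file to another location on the remote server. The gateway's expression gives the source path, and the rename expression gives the destination path.
    Return value: the message payload is Boolean.TRUE when the move succeeds; a failed move normally surfaces as an exception rather than a false payload.

    Example: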

        @Bean
        @ServiceActivator(inputChannel = "sftpChannel7")
        public MessageHandler handler7(){
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mv","'send/22.TXT'");
            sftpOutboundGateway.setRenameExpression(new LiteralExpression("send1/22.TXT"));
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    

     The complete test project follows.

    First, the POM file:

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.flower.springintegration</groupId>
        <artifactId>spring-integration-samples</artifactId>
    
        <version>v0.0.1</version>
    
        <name>SpringIntegrationExamples</name>
    
        <description>Spring Integration Samples</description>
    
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
        </properties>
    
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.1.RELEASE</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
    
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
           <!-- <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-batch</artifactId>
            </dependency>-->
    
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-sftp</artifactId>
            </dependency>
    
            <!-- https://mvnrepository.com/artifact/org.quartz-scheduler/quartz -->
            <dependency>
                <groupId>org.quartz-scheduler</groupId>
                <artifactId>quartz</artifactId>
                <version>2.2.1</version>
            </dependency>
    
    
            <dependency>
                <groupId>com.zaxxer</groupId>
                <artifactId>HikariCP</artifactId>
            </dependency>
    
            <!-- Test dependencies: versions are managed by spring-boot-starter-parent.
                 Pinning spring-boot-test 1.4.0.RELEASE and spring-test 4.3.2.RELEASE
                 would mix Spring 4.x test support into a Spring Boot 2.1.1 build. -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-test</artifactId>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <scope>test</scope>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <scope>test</scope>
            </dependency>
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

     Next, the YAML configuration:

    spring:
      datasource:
        type: com.zaxxer.hikari.HikariDataSource
        url: jdbc:mysql://localhost:3306/springbatchexample?serverTimezone=GMT%2B8&useUnicode=true&characterEncoding=utf-8
        username: root
        password: root
    
    sftp:
        host: 127.0.0.1
        port: 23
        user: 47gamer
        password: wdnmd
        filePath:
            send: /send
            achieve: /achieve
            localPath: /sftp_tmp_dir
    

     Then the SFTP gateway configuration class, SftpConfig.java:

    package com.flower.integration.sftp;
    
    import com.jcraft.jsch.ChannelSftp.LsEntry;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.expression.common.LiteralExpression;
    import org.springframework.integration.annotation.*;
    import org.springframework.integration.channel.DirectChannel;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.core.MessageSource;
    import org.springframework.integration.file.filters.AcceptOnceFileListFilter;
    import org.springframework.integration.file.filters.FileListFilter;
    import org.springframework.integration.file.remote.FileInfo;
    import org.springframework.integration.file.remote.session.CachingSessionFactory;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.support.FileExistsMode;
    import org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter;
    import org.springframework.integration.sftp.gateway.SftpOutboundGateway;
    import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
    import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizingMessageSource;
    import org.springframework.integration.sftp.outbound.SftpMessageHandler;
    import org.springframework.integration.sftp.session.DefaultSftpSessionFactory;
    import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.messaging.MessageHandler;
    
    import javax.annotation.Resource;
    import java.io.File;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * Sftp configuration.
     *
     * @author 47Gamer
     * @Date 2019-01-18
     */
    @Configuration
    @DependsOn("sftpProperty")
    public class SftpConfig {
    
        @Resource(name = "sftpProperty")
        private SftpProperty sftpProperty;
    
        private static Logger log = LoggerFactory.getLogger(SftpConfig.class);
    
    
        @Value("${sftp.host}")
        private String sftpHost;
    
        @Value("${sftp.port:23}")
        private int sftpPort;
    
        @Value("${sftp.user}")
        private String sftpUser;
    
        @Value("${sftp.privateKey:#{null}}")
        private org.springframework.core.io.Resource sftpPrivateKey;
    
        @Value("${sftp.privateKeyPassphrase:}")
        private String sftpPrivateKeyPassphrase;
    
        @Value("${sftp.password}")
        private String sftpPassword;
    
       /* @Bean
        public SessionFactory<LsEntry> sftpSessionFactory() {
            System.out.println("@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(false);
            factory.setHost(sftpProperty.getHost());
            factory.setPort(sftpProperty.getPort());
            factory.setUser(sftpProperty.getUser());
            Properties jschProps = new Properties();
            //!important PreferredAuthentications must be set, otherwise the console prompts for the user name and password.
            jschProps.put("StrictHostKeyChecking", "no");
            jschProps.put("PreferredAuthentications",
                    "password,gssapi-with-mic,publickey,keyboard-interactive");
    
            factory.setSessionConfig(jschProps);
    
          //  if (sftpPassword != null) {
                factory.setPassword(sftpProperty.getPassword());
    //        } else {
    //            factory.setPrivateKey(sftpPrivateKey);
    //            factory.setPrivateKeyPassphrase(sftpPrivateKeyPassphrase);
    //        }
    
            factory.setAllowUnknownKeys(true);
            //        // Cache settings: pool size and session wait timeout.
            CachingSessionFactory<LsEntry> cachingSessionFactory =
                    new CachingSessionFactory<LsEntry>(factory);
            cachingSessionFactory.setPoolSize(10);
    //        cachingSessionFactory.setSessionWaitTimeout(1000);
    
            return cachingSessionFactory;
    //        return new CachingSessionFactory<LsEntry>(factory);
        }*/
    
        /**
         * Creates the spring-integration-sftp session factory,
         * so that sessions are not created through the raw JSch API.
         *
         * @return SessionFactory<LsEntry>
         */
        @Bean
        public SessionFactory<LsEntry> sftpSessionFactory(){
            System.out.println("######################################################");
            DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
            factory.setUser(sftpProperty.getUser());
            factory.setHost(sftpProperty.getHost());
            factory.setPort(sftpProperty.getPort());
            factory.setPassword(sftpProperty.getPassword());
    
            Properties jschProps = new Properties();
            //!important PreferredAuthentications must be set, otherwise the console prompts for the user name and password.
            jschProps.put("StrictHostKeyChecking", "no");
            jschProps.put("PreferredAuthentications",
                    "password,gssapi-with-mic,publickey,keyboard-interactive");
    
            factory.setSessionConfig(jschProps);
            factory.setAllowUnknownKeys(true);
    
            // Cache settings: pool size and session wait timeout.
            CachingSessionFactory<LsEntry> cachingSessionFactory =
                    new CachingSessionFactory<LsEntry>(factory);
    //        cachingSessionFactory.setPoolSize(2000);
    
    
            return  cachingSessionFactory;
        }
    
        /**
         * Configures the outbound channel adapter.
         *
         * In essence it is a MessageHandler that receives the messages sent to the message channel.
         * @return MessageHandler
         */
        @ServiceActivator(inputChannel = "fileInChannel")
        @Bean
        public SftpMessageHandler sftpMessageHandler(){
            SftpMessageHandler sftpMsgHandler = new SftpMessageHandler(sftpSessionFactory());
    
            sftpMsgHandler.setRemoteDirectoryExpression(
                    new LiteralExpression(sftpProperty.getSftpAchievePath()));
            sftpMsgHandler.setAutoCreateDirectory(true);
            sftpMsgHandler.setCharset("UTF-8");
            return sftpMsgHandler;
        }
    
    
        /**
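         * Configures the inbound channel adapter.
         *
         * Monitors the files on the SFTP server. As soon as a matching file appears, it is synchronized to the local machine.
         * Requirements: the inboundFileChannel bean, a poller, and a file synchronizer bean (SftpInboundFileSynchronizer).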
         */
        @Bean
        @InboundChannelAdapter(value = "inboundFileChannel",
                poller = @Poller(cron = "0 1/10 * * * *", maxMessagesPerPoll = "1"))
        public MessageSource<File> fileMessageSource() {
            System.out.println("=========================================================");
    
            // Create the SftpInboundFileSynchronizer and bind it to the message source.
            SftpInboundFileSynchronizingMessageSource source =
                    new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
    
            // Create the local directory automatically
            source.setAutoCreateLocalDirectory(true);
            source.setLocalDirectory(new File(sftpProperty.getLocalTempDir()));
    
            // Local file filter
            source.setLocalFilter(new AcceptOnceFileListFilter<File>());
    
            return source;
    
        }
    
        /**
         * Channel bean used by the inbound channel adapter.
         */
        @Bean
        public DirectChannel inboundFileChannel() {
            return new DirectChannel();
        }
    
        /**
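         * SftpInboundFileSynchronizer.
         *
         *  Synchronizes files from the SFTP server to the local machine.
         *      <1> It can be injected into a service and used directly through synchronizeToLocalDirectory;
         *      <2> or it can back an inbound channel adapter that watches the file server for changes.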
         *
         * @return SftpInboundFileSynchronizer
         */
        @Bean(name = "synFileChannel")
        public SftpInboundFileSynchronizer sftpInboundFileSynchronizer (){
    
            SftpInboundFileSynchronizer fileSynchronize =
                    new SftpInboundFileSynchronizer(sftpSessionFactory());
            fileSynchronize.setDeleteRemoteFiles(true);
            fileSynchronize.setPreserveTimestamp(true);
    
            //!important
            fileSynchronize.setRemoteDirectory(sftpProperty.getSftpSendPath());
            fileSynchronize.setFilter(new SftpSimplePatternFileListFilter("*.*"));
            //fileSynchronize.setLocalFilenameGeneratorExpression( );
            fileSynchronize.setPreserveTimestamp(true);
            return fileSynchronize;
        }
    
        ///////////////////////////////////////////////////////////////////////
    
        /**
         * Configures the SFTP outbound gateway.
         *
         * @return MessageHandler
         */
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel")
        public MessageHandler handler() {
    
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
    //        MessageChannel message = sftpOutboundGateway.getOutputChannel();
    
            sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
            sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel2")
        public MessageHandler handler2() {
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"ls","payload");
            sftpOutboundGateway.setOptions("-dirs");
            sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
            sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
    
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel3")
        public MessageHandler handler3() {
            System.out.println("=========================         3         ================================");
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mget","payload");
            sftpOutboundGateway.setOptions("-R");
            sftpOutboundGateway.setFileExistsMode(FileExistsMode.REPLACE_IF_MODIFIED);
            sftpOutboundGateway.setLocalDirectory(new File("E:\\sftp_tmp_dir"));
            sftpOutboundGateway.setAutoCreateLocalDirectory(true);  // TODO dynamic path
    
            return sftpOutboundGateway;
        }
    
        @Autowired
        private BeanFactory beanFactory;
    
        // Outbound gateway: put needs an SftpRemoteFileTemplate, which carries the remote directory.
        // SftpOutboundGateway has several constructors; see its source.
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel4")
        public MessageHandler handler4(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"put","payload");
    //        sftpOutboundGateway.setLocalDirectoryExpressionString("/get/");
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel5")
        public MessageHandler handler5(){
            SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
            sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
    
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpRemoteFileTemplate,"mput","payload");
    //        sftpOutboundGateway.setLocalDirectoryExpressionString("/get/");
    //        sftpOutboundGateway.setOptions("-R");
            sftpOutboundGateway.setMputFilter(new FileListFilter<File>() {
                @Override
                public List<File> filterFiles(File[] files) {
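                    // Accept every file; returning null here would likely fail with a NullPointerException.
                    return java.util.Arrays.asList(files);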
                }
            });
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel6")
        public MessageHandler handler6(){
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"rm","payload");
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
        @Bean
        @ServiceActivator(inputChannel = "sftpChannel7")
        public MessageHandler handler7(){
    
    //        SftpRemoteFileTemplate  sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sftpSessionFactory());
    //        sftpRemoteFileTemplate.setRemoteDirectoryExpression(new LiteralExpression("/send"));
    
    
            SftpOutboundGateway sftpOutboundGateway = new SftpOutboundGateway(sftpSessionFactory(),"mv","'send/22.TXT'");
    //        sftpOutboundGateway.setRenameExpression(new LiteralExpression("/send1"));
    //        sftpOutboundGateway.setChmod(777);
    //        sftpOutboundGateway.setRenameExpressionString("send1");
    
            sftpOutboundGateway.setRenameExpression(new LiteralExpression("send1/22.TXT"));
    //        sftpOutboundGateway.setAutoCreateLocalDirectory(true);
            sftpOutboundGateway.setBeanFactory(beanFactory);
            return sftpOutboundGateway;
        }
    
    
        @MessagingGateway
        public interface UploadGateway {
    
            @Gateway(requestChannel = "sftpChannel")
            List<FileInfo> listFileInfo(String dir);
    
            @Gateway(requestChannel = "sftpChannel2")
            List<FileInfo> listFileName(String dir);
    
            @Gateway(requestChannel = "sftpChannel3")
            List<File> listFile(String dir);
    
            @Gateway(requestChannel = "sftpChannel4")
            String putFile(File source);
    
            @Gateway(requestChannel = "sftpChannel5")
            List<String> mputFile(File directory);
    
            @Gateway(requestChannel = "sftpChannel6")
            boolean removeFile(String file);
    
            @Gateway(requestChannel = "sftpChannel7")
            boolean moveFile(String file);
    
        }
    
    }
    

    The SftpProperty.java class maps the sftp settings from the YAML file:

    package com.flower.integration.sftp;
    
    
    import org.springframework.boot.context.properties.ConfigurationProperties;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    @Component("sftpProperty")
    @ConfigurationProperties(prefix = "sftp")
    public class SftpProperty {
    
    
        private String host;
    
        private Integer port;
    
        private String user;
    
        private String password;
    
        private Map<String,String> filePath;
    
        ////////////////////////////////////////////////////
        public String getSftpSendPath(){
            return filePath.get("send");
        }
    
        public String getSftpAchievePath(){
            return filePath.get("achieve");
        }
    
        public String getLocalTempDir(){
            return filePath.get("localPath");
        }
    
        ///////////////////////////////////////////////////
        public String getHost() {
            return host;
        }
    
        public void setHost(String host) {
            this.host = host;
        }
    
        public Integer getPort() {
            return port;
        }
    
        public void setPort(Integer port) {
            this.port = port;
        }
    
        public String getUser() {
            return user;
        }
    
        public void setUser(String user) {
            this.user = user;
        }
    
        public String getPassword() {
            return password;
        }
    
        public void setPassword(String password) {
            this.password = password;
        }
    
        public Map<String, String> getFilePath() {
            return filePath;
        }
    
        public void setFilePath(Map<String, String> filePath) {
            this.filePath = filePath;
        }
    }
    

     Service layer: SftpService.java

    package com.flower.integration.sftp;
    
    import com.jcraft.jsch.ChannelSftp;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.BeanFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.expression.common.LiteralExpression;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.support.FileExistsMode;
    import org.springframework.integration.sftp.inbound.SftpInboundFileSynchronizer;
    import org.springframework.integration.sftp.session.SftpRemoteFileTemplate;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.MessageChannel;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import java.io.File;
    import java.nio.file.Path;
    import java.nio.file.Paths;
    import java.util.ArrayList;
    import java.util.List;
    
    @Service("sftpService")
    public class SftpService {
    
    
        private Logger log = LoggerFactory.getLogger(this.getClass());
    
        @Resource(name = "fileInChannel")
        protected MessageChannel messageChannel;
    
        @Autowired
        private SftpProperty sftpProperty;
    
        @Autowired
        private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;
    
        /**
         * Sends a file to the SFTP server through the MessageChannel.
         *
         * @param localFilePath file local path.
         */
        public void sendFileToSftp(String localFilePath) {
    
            Path filePath = Paths.get(localFilePath);
            if (filePath.toFile().exists()) {
                Message<File> fileMessage = MessageBuilder.withPayload(filePath.toFile()).build();
                boolean result = messageChannel.send(fileMessage);
                String resultMsg = result ? "Success" : "Failure";
                log.info("File send to sftp {}, File: {}.", resultMsg, filePath.getFileName());
            } else {
                log.warn("File not found: {}", filePath.getFileName());
            }
        }
    
        /**
         * Deletes a file on the SFTP server.
         *
         * @param sessionFactory  sftp server.
         * @param remoteDirectory file directory.
         * @param fileName        file
         * @return true if the file was removed, false otherwise.
         */
        public boolean removeSftpRemoteFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String remoteDirectory, String fileName) {
            SftpRemoteFileTemplate sftpRemoteFileTemplate = new SftpRemoteFileTemplate(sessionFactory);
    
            boolean direCheck = remoteDirectory.endsWith(sftpRemoteFileTemplate.getRemoteFileSeparator());
            if (!direCheck) {
                remoteDirectory += sftpRemoteFileTemplate.getRemoteFileSeparator();
            }
            boolean fileExist = sftpRemoteFileTemplate.exists(remoteDirectory + fileName);
            if (fileExist) {
                return sftpRemoteFileTemplate.remove(remoteDirectory + fileName);
            } else {
                log.warn("File not found in directory {}.", remoteDirectory);
                return false;
            }
        }
    
        /**
         * Renames a file on the SFTP server.
         *
         * @param sessionFactory  sftp server
         * @param remoteDirectory file directory path.
         * @param sourceFileName  source file name
         * @param targetFileName  rename target name
         */
        public void renameSftpRemoteFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String remoteDirectory,
                                         String sourceFileName, String targetFileName) {
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
    
            boolean direCheck = remoteDirectory.endsWith(fileTemplate.getRemoteFileSeparator());
            if (!direCheck) {
                remoteDirectory += fileTemplate.getRemoteFileSeparator();
            }
            boolean fileExist = fileTemplate.exists(remoteDirectory + sourceFileName);
            if (fileExist) {
                fileTemplate.rename(remoteDirectory + sourceFileName, remoteDirectory + targetFileName);
            } else {
                log.warn("File not found in directory {}.", remoteDirectory);
            }
        }
    
        /**
         * Checks whether a file exists on the SFTP server.
         *
         * @param sessionFactory sftp server
         * @param directory      file directory
         * @param fileName       file name
         * @return true if the file exists, false otherwise.
         */
        public boolean fileExist(SessionFactory<ChannelSftp.LsEntry> sessionFactory, String directory, String fileName) {
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
            boolean fileNameCheck = directory.endsWith(fileTemplate.getRemoteFileSeparator());
            if (!fileNameCheck) {
                directory += fileTemplate.getRemoteFileSeparator();
            }
    
            return fileTemplate.exists(directory + fileName);
        }
    
    
        /**
         * List the files in a remote SFTP directory that match a name filter.
         *
         * @param sessionFactory sftp server
         * @param directory      file directory
         * @param fileNameFilter file name filter
         * @return file name list match filter
         */
        public List<String> lsFileOfDirectory(SessionFactory<ChannelSftp.LsEntry> sessionFactory,
                                              String directory, String fileNameFilter) {
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
    
            if (!directory.endsWith(fileTemplate.getRemoteFileSeparator())) {
                directory += fileTemplate.getRemoteFileSeparator();
            }
            ChannelSftp.LsEntry[] files = fileTemplate.list(directory + fileNameFilter);
            List<String> fileNames = new ArrayList<>();
            for (ChannelSftp.LsEntry lsEntry : files) {
                boolean isDir = lsEntry.getAttrs().isDir();
                if (!isDir) {
                    fileNames.add(lsEntry.getFilename());
                }
            }
            return fileNames;
        }
    
        @Autowired
        private BeanFactory beanFactory;
    
        /**
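         * Send a local file to the SFTP server.
         *
         * @param sessionFactory sftp server
         * @param filePath file local path
         * @param targetPath target directory
         * @param mode FileExistsMode
         *             null: default, replace the file;
         *             APPEND: append to the file if it already exists;
         *             REPLACE: replace the file;
         *             APPEND_NO_FLUSH:
         *             FAIL: raise an exception if the file already exists;
         *             IGNORE: skip the transfer if the file already exists.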
         */
        public void sendSftpFile(SessionFactory<ChannelSftp.LsEntry> sessionFactory,
                                 String filePath, String targetPath, FileExistsMode mode){
            SftpRemoteFileTemplate fileTemplate = new SftpRemoteFileTemplate(sessionFactory);
            try {
                //Configure the template for the remote SFTP server
                fileTemplate.setRemoteDirectoryExpression(new LiteralExpression(targetPath));
                fileTemplate.setAutoCreateDirectory(true);
                fileTemplate.setCharset("UTF-8");
                fileTemplate.setBeanFactory(beanFactory);
                fileTemplate.afterPropertiesSet();
            } catch (Exception e){
                log.warn(e.getMessage());
            }
    
            Path file = Paths.get(filePath);
            if (file.toFile().exists()){
                Message<File> message = MessageBuilder.withPayload(file.toFile()).build();
                if (null == mode){
                    fileTemplate.send(message);
                } else {
                    //fileTemplate.setFileNameGenerator(new DefaultFileNameGenerator());
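                    //Write directly under the final name; APPEND in particular cannot be combined with a temporary file name
                    if (fileTemplate.isUseTemporaryFileName()){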
                        fileTemplate.setUseTemporaryFileName(false);
                    }
                    fileTemplate.send(message, mode);
                }
            }
        }
    
    
        @Resource(name = "synFileChannel")
        private SftpInboundFileSynchronizer sftpInboundFileSynchronizer;
    
        public void synchronizedFileToLocal(String localDir){
            File dir = Paths.get(localDir).toFile();
            sftpInboundFileSynchronizer.synchronizeToLocalDirectory(dir);
        }
    
    }
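
    The FileExistsMode argument of sendSftpFile decides what happens when the target name is already taken on the server. As a rough illustration, the sketch below models a remote directory as an in-memory map and applies four of the modes to it. The class ExistsModeSketch, its Mode enum and the send/read methods are hypothetical names made up for this example and are not part of Spring Integration; APPEND_NO_FLUSH is left out, and the behaviour shown for FAIL and IGNORE is an assumption based on how those modes are usually described.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of "what to do if the remote file exists".
final class ExistsModeSketch {

    // Mirrors four of the FileExistsMode names; not the real enum.
    enum Mode { REPLACE, APPEND, FAIL, IGNORE }

    // In-memory stand-in for a remote directory: file name -> content.
    private final Map<String, String> remote = new HashMap<>();

    /** Returns true if content was written, false if the send was skipped. */
    boolean send(String name, String content, Mode mode) {
        String existing = remote.get(name);
        if (existing == null || mode == Mode.REPLACE) {
            // No conflict, or the caller asked to overwrite.
            remote.put(name, content);
            return true;
        }
        switch (mode) {
            case APPEND:
                remote.put(name, existing + content);
                return true;
            case IGNORE:
                // Leave the existing file untouched.
                return false;
            default:
                // FAIL: refuse to touch an existing file.
                throw new IllegalStateException("File already exists: " + name);
        }
    }

    String read(String name) {
        return remote.get(name);
    }

    public static void main(String[] args) {
        ExistsModeSketch store = new ExistsModeSketch();
        store.send("user.csv", "a", Mode.REPLACE);
        store.send("user.csv", "b", Mode.APPEND);
        System.out.println(store.read("user.csv"));
    }
}
```

    With the real template the same choice is made by passing a FileExistsMode value as the second argument of fileTemplate.send(message, mode), as sendSftpFile does.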
    

    Controller layer: used to exercise the service-layer methods

    package com.flower.integration.sftp;
    
    
    import com.jcraft.jsch.ChannelSftp;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.integration.file.remote.FileInfo;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    import java.io.File;
    import java.util.List;
    
    
    @RestController
    public class TestController {
    
    
        @Autowired
        private SftpService sftpService;
    
        @Autowired
        private SftpConfig.UploadGateway uploadGateway;
    
        @GetMapping("/sftp")
        public void testSftpSpringBatch() {
    
            List<FileInfo> fileList = uploadGateway.listFileInfo("/send");
    
            for (FileInfo file : fileList) {
                String fileName = file.getFilename();
                String filePath = file.getRemoteDirectory();
                ChannelSftp.LsEntry fileInfo = (ChannelSftp.LsEntry) file.getFileInfo();
                boolean isDir = file.isDirectory();
                boolean isLink = file.isLink();
                long modifyTime = file.getModified();
                System.out.println("=============================  " + fileName);
                System.out.println("==================  " + filePath);
                System.out.println("==================  " + fileInfo.getFilename());
                System.out.println("==================  " + isDir);
                System.out.println("==================  " + isLink);
                System.out.println("==================  " + modifyTime);
            }
        }
    
        @GetMapping("/sftp2")
        public void testSftpSpringBatch2() {
    
            List<FileInfo> fileNameList = uploadGateway.listFileName("/send");
    
            for (FileInfo fileName : fileNameList) {
    
                System.out.println("=============================  " + fileName);
            }
        }
    
    
        @GetMapping("/sftp3")
        public void testSftpSpringBatch3() throws InterruptedException {
    
            List<File> fileNameList = uploadGateway.listFile("/send");
    
            for (File fileName : fileNameList) {
                System.out.println("=============================  " + fileName);
            }
        }
    
        @GetMapping("/sftp4")
        public void testSftpSpringBatch4() throws InterruptedException {
    
            String result = uploadGateway.putFile(new File("G:\\Redis.pdf"));
    
            System.out.println("=============================  " + result);
        }
    
        @GetMapping("/sftp5")
        public void testSftpSpringBatch5() throws InterruptedException {
    
            List<String> result = uploadGateway.mputFile(new File("G:\\js"));
    
    
            for (String fileName : result) {
                System.out.println("=============================  " + fileName);
            }
        }
    
        @GetMapping("/sftp6")
        public void testSftpSpringBatch6() throws InterruptedException {
    
            boolean result = uploadGateway.removeFile("/send/2.txt");
    
            System.out.println("=============================  " + result);
    
        }
    
        @GetMapping("/sftp7")
        public void testSftpSpringBatch7() throws InterruptedException {
    
            boolean result = uploadGateway.moveFile("/22.TXT");
    
            System.out.println("=============================  " + result);
    
        }
    }
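
    The upload endpoints above pass Windows paths on drive G:. In a Java string literal a backslash starts an escape sequence, so a path separator has to be written as a doubled backslash; a forward slash is a common alternative, and java.io.File accepts it on Windows as well. A minimal sketch, where WindowsPathLiterals and toForwardSlashes are made-up names used only for illustration:

```java
// Hypothetical helper showing two ways to write a Windows path in Java source.
final class WindowsPathLiterals {

    // Backslashes must be escaped inside a string literal.
    static final String ESCAPED = "G:\\Redis.pdf";

    // Forward slashes need no escaping.
    static final String FORWARD = "G:/Redis.pdf";

    /** Rewrites backslash separators as forward slashes. */
    static String toForwardSlashes(String path) {
        return path.replace('\\', '/');
    }

    public static void main(String[] args) {
        // The escaped literal contains a single backslash at run time.
        System.out.println(ESCAPED);
        System.out.println(toForwardSlashes(ESCAPED));
    }
}
```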
    

    SpringIntegrationApp.java: the application entry class

    package com.flower.integration;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.boot.autoconfigure.security.servlet.SecurityAutoConfiguration;
    import org.springframework.scheduling.annotation.EnableScheduling;
    
    @SpringBootApplication
    //@EnableScheduling
    public class SpringIntegrationApp {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringIntegrationApp.class, args);
            System.out.println("Spring-Integration application start success.");
        }
    
    }
    

    JUnit test class: create SpringIntegrationExamplesApplicationTests.java under the test folder and run it yourself

    package com.flower.integration.sftp;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    
    @RunWith(SpringRunner.class)
    @SpringBootTest
    public class SpringIntegrationExamplesApplicationTests {
    
        @Test
        public void contextLoads() {
        }
    
    }

    Unit tests for the service layer: SftpServiceTest.java

    package com.flower.integration.sftp;
    
    import com.flower.integration.SpringIntegrationApp;
    import com.jcraft.jsch.ChannelSftp;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.integration.config.EnableIntegration;
    import org.springframework.integration.file.remote.session.SessionFactory;
    import org.springframework.integration.file.support.FileExistsMode;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    import java.util.List;
    
    @RunWith(SpringJUnit4ClassRunner.class)
    @SpringBootTest(classes = SpringIntegrationApp.class)
    @EnableIntegration
    public class SftpServiceTest {
    
        @Autowired
        private SftpService sftpService;
    
        @Autowired
        private SftpProperty sftpProperty;
    
        @Autowired
        private SessionFactory<ChannelSftp.LsEntry> sftpSessionFactory;
    
        @Before
        public void before (){
            System.out.println("00000000000000000000000000000000000000000000000000000");
        }
    
        @After
        public void after(){
    
        }
    
        @Test
        public void sendFileToSftp() {
    
        //sftpService.sendFileToSftp();
        }
    
        @Test
        public void testRemoveSftpRemoteFile(){
            boolean result = sftpService.removeSftpRemoteFile(
                    sftpSessionFactory, sftpProperty.getSftpSendPath(),"user333.csv");
    
            System.out.println("=======" + result);
        }
    
        @Test
        public void testRenameSftpRemoteFile(){
            sftpService.renameSftpRemoteFile(sftpSessionFactory, sftpProperty.getSftpSendPath(),"user.csv",
                    "user111.csv");
        }
    
        @Test
        public void testfileExist(){
            boolean result = sftpService.fileExist(sftpSessionFactory, sftpProperty.getSftpSendPath(),"user111.csv");
            System.out.println("++++++++++++" + result);
        }
    
        @Test
        public void testlsFileOfDirectory(){
            List<String> result = sftpService.lsFileOfDirectory(sftpSessionFactory,
                    sftpProperty.getSftpSendPath(),"*TXT");
            System.out.println("-------------------" + result.toString());
        }
    
        @Test
        public void testSendSftpFile() throws Exception {
            sftpService.sendSftpFile(sftpSessionFactory,
                    "G:\\jquery.txt", sftpProperty.getSftpAchievePath(), FileExistsMode.REPLACE);
        }
    
        @Test
        public void testSynchronizedFileToLocal(){
            sftpService.synchronizedFileToLocal(sftpProperty.getLocalTempDir());
        }
    }
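
    testlsFileOfDirectory passes the wildcard *TXT as the file name filter. To get a feel for which names such a pattern selects, the sketch below applies the same pattern locally with the JDK's glob support. GlobFilterSketch and filter are hypothetical names; the real listing goes through fileTemplate.list, whose wildcard handling may differ in details such as case sensitivity.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical local illustration of a file name wildcard filter.
final class GlobFilterSketch {

    /** Returns the names that match the given glob pattern, in input order. */
    static List<String> filter(List<String> names, String glob) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<String> matched = new ArrayList<>();
        for (String name : names) {
            // Each name is treated as a single path element.
            if (matcher.matches(Paths.get(name))) {
                matched.add(name);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("22.TXT", "user.csv", "report.TXT");
        System.out.println(filter(names, "*TXT"));
    }
}
```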
    
  • Original article: https://www.cnblogs.com/47Gamer/p/13746064.html