背景
依赖
创建工具类
SFTP链接池化
SFTP链接池的使用
集成到SpringBoot中
配置
java Bean注入
背景在项目开发中,一般文件存储很少再使用SFTP服务,但是也不排除合作伙伴使用SFTP来存储项目中的文件或者通过SFTP来实现文件数据的交互。
我遇到的项目中,就有银行和保险公司等合作伙伴通过SFTP服务来实现与我们项目的文件数据的交互。
为了能够顺利地完成与友商的SFTP服务的连通,我们需要在自己的项目中实现一套SFTP客户端工具。一般我们会采用Jsch来实现SFTP客户端。
依赖<!--执行远程操作-->
<dependency>
<groupId>com.jcraft</groupId>
<artifactId>jsch</artifactId>
<version>0.1.55</version>
</dependency>
<!--链接池-->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
<version>2.11.1</version>
</dependency>
首先我们一定要引入jsch
依赖,这个是我们实现SFTP客户端的基石;其次我们引入了链接池工具,为了避免每次执行SFTP命令都要重新创建链接,我们使用池化的方式优化了比较消耗资源的创建操作。
为了更好的使用SFTP工具,我们把jsch
中关于SFTP的相关功能提炼出来,做了一次简单的封装,做成了我们可以直接使用的工具类。
里面只有两类方法:
1.创建Session与开启Session;
session创建好后,还不能创建channel,需要开启session后才能创建channel;
2.创建channel与开启channel;
channel也是一样,创建好的channel需要开启后才能真正地执行命令;
public class JschUtil {
/**
* 创建session
*
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @param privateKeyFile 密钥文件
* @param passphrase 口令
* @return
* @throws AwesomeException
*/
public static Session createSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException {
return createSession(new JSch(), userName, password, host, port, privateKeyFile, passphrase);
}
/**
* 创建session
*
* @param jSch
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @param privateKeyFile 密钥
* @param passphrase 口令
* @return
* @throws AwesomeException
*/
public static Session createSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase) throws AwesomeException {
try {
if (!StringUtils.isEmpty(privateKeyFile)) {
// 使用密钥验证方式,密钥可以是有口令的密钥,也可以是没有口令的密钥
if (!StringUtils.isEmpty(passphrase)) {
jSch.addIdentity(privateKeyFile, passphrase);
} else {
jSch.addIdentity(privateKeyFile);
}
}
// 获取session
Session session = jSch.getSession(userName, host, port);
if (!StringUtils.isEmpty(password)) {
session.setPassword(password);
}
// 不校验域名
session.setConfig("StrictHostKeyChecking", "no");
return session;
} catch (Exception e) {
throw new AwesomeException(500, "create session fail");
}
}
/**
* 创建session
*
* @param jSch
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @return
* @throws AwesomeException
*/
public static Session createSession(JSch jSch, String userName, String password, String host, int port) throws AwesomeException {
return createSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
}
/**
* 创建session
*
* @param jSch
* @param userName 用户名
* @param host 域名
* @param port 端口
* @return
* @throws AwesomeException
*/
private Session createSession(JSch jSch, String userName, String host, int port) throws AwesomeException {
return createSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY);
}
/**
* 开启session链接
*
* @param jSch
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @param privateKeyFile 密钥
* @param passphrase 口令
* @param timeout 链接超时时间
* @return
* @throws AwesomeException
*/
public static Session openSession(JSch jSch, String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException {
Session session = createSession(jSch, userName, password, host, port, privateKeyFile, passphrase);
try {
if (timeout >= 0) {
session.connect(timeout);
} else {
session.connect();
}
return session;
} catch (Exception e) {
throw new AwesomeException(500, "session connect fail");
}
}
/**
* 开启session链接
*
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @param privateKeyFile 密钥
* @param passphrase 口令
* @param timeout 链接超时时间
* @return
* @throws AwesomeException
*/
public static Session openSession(String userName, String password, String host, int port, String privateKeyFile, String passphrase, int timeout) throws AwesomeException {
Session session = createSession(userName, password, host, port, privateKeyFile, passphrase);
try {
if (timeout >= 0) {
session.connect(timeout);
} else {
session.connect();
}
return session;
} catch (Exception e) {
throw new AwesomeException(500, "session connect fail");
}
}
/**
* 开启session链接
*
* @param jSch
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @param timeout 链接超时时间
* @return
* @throws AwesomeException
*/
public static Session openSession(JSch jSch, String userName, String password, String host, int port, int timeout) throws AwesomeException {
return openSession(jSch, userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
}
/**
* 开启session链接
*
* @param userName 用户名
* @param password 密码
* @param host 域名
* @param port 端口
* @param timeout 链接超时时间
* @return
* @throws AwesomeException
*/
public static Session openSession(String userName, String password, String host, int port, int timeout) throws AwesomeException {
return openSession(userName, password, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
}
/**
* 开启session链接
*
* @param jSch
* @param userName 用户名
* @param host 域名
* @param port 端口
* @param timeout 链接超时时间
* @return
* @throws AwesomeException
*/
public static Session openSession(JSch jSch, String userName, String host, int port, int timeout) throws AwesomeException {
return openSession(jSch, userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
}
/**
* 开启session链接
*
* @param userName 用户名
* @param host 域名
* @param port 端口
* @param timeout 链接超时时间
* @return
* @throws AwesomeException
*/
public static Session openSession(String userName, String host, int port, int timeout) throws AwesomeException {
return openSession(userName, StringUtils.EMPTY, host, port, StringUtils.EMPTY, StringUtils.EMPTY, timeout);
}
/**
* 创建指定通道
*
* @param session
* @param channelType
* @return
* @throws AwesomeException
*/
public static Channel createChannel(Session session, ChannelType channelType) throws AwesomeException {
try {
if (!session.isConnected()) {
session.connect();
}
return session.openChannel(channelType.getValue());
} catch (Exception e) {
throw new AwesomeException(500, "open channel fail");
}
}
/**
* 创建sftp通道
*
* @param session
* @return
* @throws AwesomeException
*/
public static ChannelSftp createSftp(Session session) throws AwesomeException {
return (ChannelSftp) createChannel(session, ChannelType.SFTP);
}
/**
* 创建shell通道
*
* @param session
* @return
* @throws AwesomeException
*/
public static ChannelShell createShell(Session session) throws AwesomeException {
return (ChannelShell) createChannel(session, ChannelType.SHELL);
}
/**
* 开启通道
*
* @param session
* @param channelType
* @param timeout
* @return
* @throws AwesomeException
*/
public static Channel openChannel(Session session, ChannelType channelType, int timeout) throws AwesomeException {
Channel channel = createChannel(session, channelType);
try {
if (timeout >= 0) {
channel.connect(timeout);
} else {
channel.connect();
}
return channel;
} catch (Exception e) {
throw new AwesomeException(500, "connect channel fail");
}
}
/**
* 开启sftp通道
*
* @param session
* @param timeout
* @return
* @throws AwesomeException
*/
public static ChannelSftp openSftpChannel(Session session, int timeout) throws AwesomeException {
return (ChannelSftp) openChannel(session, ChannelType.SFTP, timeout);
}
/**
* 开启shell通道
*
* @param session
* @param timeout
* @return
* @throws AwesomeException
*/
public static ChannelShell openShellChannel(Session session, int timeout) throws AwesomeException {
return (ChannelShell) openChannel(session, ChannelType.SHELL, timeout);
}
enum ChannelType {
SESSION("session"),
SHELL("shell"),
EXEC("exec"),
X11("x11"),
AGENT_FORWARDING("auth-agent@openssh.com"),
DIRECT_TCPIP("direct-tcpip"),
FORWARDED_TCPIP("forwarded-tcpip"),
SFTP("sftp"),
SUBSYSTEM("subsystem");
private final String value;
ChannelType(String value) {
this.value = value;
}
public String getValue() {
return this.value;
}
}
}
SFTP链接池化
我们通过实现BasePooledObjectFactory
类来池化通道ChannelSftp
。这并不是真正池化的代码,下面的代码只是告知池化管理器如何创建对象和销毁对象。
static class SftpFactory extends BasePooledObjectFactory<ChannelSftp> implements AutoCloseable {
private Session session;
private SftpProperties properties;
// 初始化SftpFactory
// 里面主要是创建目标session,后续可用通过这个session不断地创建ChannelSftp。
SftpFactory(SftpProperties properties) throws AwesomeException {
this.properties = properties;
String username = properties.getUsername();
String password = properties.getPassword();
String host = properties.getHost();
int port = properties.getPort();
String privateKeyFile = properties.getPrivateKeyFile();
String passphrase = properties.getPassphrase();
session = JschUtil.createSession(username, password, host, port, privateKeyFile, passphrase);
}
// 销毁对象,主要是销毁ChannelSftp
@Override
public void destroyObject(PooledObject<ChannelSftp> p) throws Exception {
p.getObject().disconnect();
}
// 创建对象ChannelSftp
@Override
public ChannelSftp create() throws Exception {
int timeout = properties.getTimeout();
return JschUtil.openSftpChannel(this.session, timeout);
}
// 包装创建出来的对象
@Override
public PooledObject<ChannelSftp> wrap(ChannelSftp channelSftp) {
return new DefaultPooledObject<>(channelSftp);
}
// 验证对象是否可用
@Override
public boolean validateObject(PooledObject<ChannelSftp> p) {
return p.getObject().isConnected();
}
// 销毁资源,关闭session
@Override
public void close() throws Exception {
if (Objects.nonNull(session)) {
if (session.isConnected()) {
session.disconnect();
}
session = null;
}
}
}
为了实现真正的池化操作,我们还需要以下代码:
1.我们需要在SftpClient对象中创建一个GenericObjectPool
对象池,这个才是真正的池子,它负责创建和存储所有的对象。
2.我们还需要提供资源销毁的功能,也就是实现AutoCloseable
,在服务停止时,需要把相关的资源销毁。
public class SftpClient implements AutoCloseable {
private SftpFactory sftpFactory;
GenericObjectPool<ChannelSftp> objectPool;
// 构造方法1
public SftpClient(SftpProperties properties, GenericObjectPoolConfig<ChannelSftp> poolConfig) throws AwesomeException {
this.sftpFactory = new SftpFactory(properties);
objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
}
// 构造方法2
public SftpClient(SftpProperties properties) throws AwesomeException {
this.sftpFactory = new SftpFactory(properties);
SftpProperties.PoolConfig config = properties.getPool();
// 默认池化配置
if (Objects.isNull(config)) {
objectPool = new GenericObjectPool<>(this.sftpFactory);
} else {
// 自定义池化配置
GenericObjectPoolConfig<ChannelSftp> poolConfig = new GenericObjectPoolConfig<>();
poolConfig.setMaxIdle(config.getMaxIdle());
poolConfig.setMaxTotal(config.getMaxTotal());
poolConfig.setMinIdle(config.getMinIdle());
poolConfig.setTestOnBorrow(config.isTestOnBorrow());
poolConfig.setTestOnCreate(config.isTestOnCreate());
poolConfig.setTestOnReturn(config.isTestOnReturn());
poolConfig.setTestWhileIdle(config.isTestWhileIdle());
poolConfig.setBlockWhenExhausted(config.isBlockWhenExhausted());
poolConfig.setMaxWait(Duration.ofMillis(config.getMaxWaitMillis()));
poolConfig.setTimeBetweenEvictionRuns(Duration.ofMillis(config.getTimeBetweenEvictionRunsMillis()));
objectPool = new GenericObjectPool<>(this.sftpFactory, poolConfig);
}
}
// 销毁资源
@Override
public void close() throws Exception {
// 销毁链接池
if (Objects.nonNull(this.objectPool)) {
if (!this.objectPool.isClosed()) {
this.objectPool.close();
}
}
this.objectPool = null;
// 销毁sftpFactory
if (Objects.nonNull(this.sftpFactory)) {
this.sftpFactory.close();
}
}
}
SFTP链接池的使用
我们已经对链接池进行了初始化,下面我们就可以从链接池中获取我们需要的ChannelSftp
来实现文件的上传下载了。
下面实现了多种文件上传和下载的方式:
1.直接把本地文件上传到SFTP服务器的指定路径;
2.把InputStream输入流提交到SFTP服务器指定路径中;
3.可以针对以上两种上传方式进行进度的监测;
4.把SFTP服务器中的指定文件下载到本地机器上;
5.把SFTP服务器˙中的文件写入指定的输出流;
6.针对以上两种下载方式,监测下载进度;
/**
* 上传文件
*
* @param srcFilePath
* @param targetDir
* @param targetFileName
* @return
* @throws AwesomeException
*/
public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName) throws AwesomeException {
return uploadFile(srcFilePath, targetDir, targetFileName, null);
}
/**
* 上传文件
*
* @param srcFilePath
* @param targetDir
* @param targetFileName
* @param monitor
* @return
* @throws AwesomeException
*/
public boolean uploadFile(String srcFilePath, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException {
ChannelSftp channelSftp = null;
try {
// 从链接池获取对象
channelSftp = this.objectPool.borrowObject();
// 如果不存在目标文件夹
if (!exist(channelSftp, targetDir)) {
mkdirs(channelSftp, targetDir);
}
channelSftp.cd(targetDir);
// 上传文件
if (Objects.nonNull(monitor)) {
channelSftp.put(srcFilePath, targetFileName, monitor);
} else {
channelSftp.put(srcFilePath, targetFileName);
}
return true;
} catch (Exception e) {
throw new AwesomeException(500, "upload file fail");
} finally {
if (Objects.nonNull(channelSftp)) {
// 返还对象给链接池
this.objectPool.returnObject(channelSftp);
}
}
}
/**
* 上传文件到目标文件夹
*
* @param in
* @param targetDir
* @param targetFileName
* @return
* @throws AwesomeException
*/
public boolean uploadFile(InputStream in, String targetDir, String targetFileName) throws AwesomeException {
return uploadFile(in, targetDir, targetFileName, null);
}
/**
* 上传文件,添加进度监视器
*
* @param in
* @param targetDir
* @param targetFileName
* @param monitor
* @return
* @throws AwesomeException
*/
public boolean uploadFile(InputStream in, String targetDir, String targetFileName, SftpProgressMonitor monitor) throws AwesomeException {
ChannelSftp channelSftp = null;
try {
channelSftp = this.objectPool.borrowObject();
// 如果不存在目标文件夹
if (!exist(channelSftp, targetDir)) {
mkdirs(channelSftp, targetDir);
}
channelSftp.cd(targetDir);
if (Objects.nonNull(monitor)) {
channelSftp.put(in, targetFileName, monitor);
} else {
channelSftp.put(in, targetFileName);
}
return true;
} catch (Exception e) {
throw new AwesomeException(500, "upload file fail");
} finally {
if (Objects.nonNull(channelSftp)) {
this.objectPool.returnObject(channelSftp);
}
}
}
/**
* 下载文件
*
* @param remoteFile
* @param targetFilePath
* @return
* @throws AwesomeException
*/
public boolean downloadFile(String remoteFile, String targetFilePath) throws AwesomeException {
return downloadFile(remoteFile, targetFilePath, null);
}
/**
* 下载目标文件到本地
*
* @param remoteFile
* @param targetFilePath
* @return
* @throws AwesomeException
*/
public boolean downloadFile(String remoteFile, String targetFilePath, SftpProgressMonitor monitor) throws AwesomeException {
ChannelSftp channelSftp = null;
try {
channelSftp = this.objectPool.borrowObject();
// 如果不存在目标文件夹
if (!exist(channelSftp, remoteFile)) {
// 不用下载了
return false;
}
File targetFile = new File(targetFilePath);
try (FileOutputStream outputStream = new FileOutputStream(targetFile)) {
if (Objects.nonNull(monitor)) {
channelSftp.get(remoteFile, outputStream, monitor);
} else {
channelSftp.get(remoteFile, outputStream);
}
}
return true;
} catch (Exception e) {
throw new AwesomeException(500, "upload file fail");
} finally {
if (Objects.nonNull(channelSftp)) {
this.objectPool.returnObject(channelSftp);
}
}
}
/**
* 下载文件
*
* @param remoteFile
* @param outputStream
* @return
* @throws AwesomeException
*/
public boolean downloadFile(String remoteFile, OutputStream outputStream) throws AwesomeException {
return downloadFile(remoteFile, outputStream, null);
}
/**
* 下载文件
*
* @param remoteFile
* @param outputStream
* @param monitor
* @return
* @throws AwesomeException
*/
public boolean downloadFile(String remoteFile, OutputStream outputStream, SftpProgressMonitor monitor) throws AwesomeException {
ChannelSftp channelSftp = null;
try {
channelSftp = this.objectPool.borrowObject();
// 如果不存在目标文件夹
if (!exist(channelSftp, remoteFile)) {
// 不用下载了
return false;
}
if (Objects.nonNull(monitor)) {
channelSftp.get(remoteFile, outputStream, monitor);
} else {
channelSftp.get(remoteFile, outputStream);
}
return true;
} catch (Exception e) {
throw new AwesomeException(500, "upload file fail");
} finally {
if (Objects.nonNull(channelSftp)) {
this.objectPool.returnObject(channelSftp);
}
}
}
/**
* 创建文件夹
*
* @param channelSftp
* @param dir
* @return
*/
protected boolean mkdirs(ChannelSftp channelSftp, String dir) {
try {
String pwd = channelSftp.pwd();
if (StringUtils.contains(pwd, dir)) {
return true;
}
String relativePath = StringUtils.substringAfter(dir, pwd);
String[] dirs = StringUtils.splitByWholeSeparatorPreserveAllTokens(relativePath, "/");
for (String path : dirs) {
if (StringUtils.isBlank(path)) {
continue;
}
try {
channelSftp.cd(path);
} catch (SftpException e) {
channelSftp.mkdir(path);
channelSftp.cd(path);
}
}
return true;
} catch (Exception e) {
return false;
}
}
/**
* 判断文件夹是否存在
*
* @param channelSftp
* @param dir
* @return
*/
protected boolean exist(ChannelSftp channelSftp, String dir) {
try {
channelSftp.lstat(dir);
return true;
} catch (Exception e) {
return false;
}
}
集成到SpringBoot中
我们可以通过java config
的方式,把我们已经实现好的SftpClient
类实例化到Spring IOC
容器中来管理,以便让开发人员在整个项目中通过@Autowired
的方式就可以直接使用。
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;
/**
* @author zouwei
* @className SftpProperties
* @date: 2022/8/19 下午12:12
* @description:
*/
@Data
@Configuration
@ConfigurationProperties(prefix = "sftp.config")
public class SftpProperties {
// 用户名
private String username;
// 密码
private String password;
// 主机名
private String host;
// 端口
private int port;
// 密钥
private String privateKeyFile;
// 口令
private String passphrase;
// 通道链接超时时间
private int timeout;
// 链接池配置
private PoolConfig pool;
@Data
public static class PoolConfig {
//最大空闲实例数,空闲超过此值将会被销毁淘汰
private int maxIdle;
// 最小空闲实例数,对象池将至少保留2个空闲对象
private int minIdle;
//最大对象数量,包含借出去的和空闲的
private int maxTotal;
//对象池满了,是否阻塞获取(false则借不到直接抛异常)
private boolean blockWhenExhausted;
// BlockWhenExhausted为true时生效,对象池满了阻塞获取超时,不设置则阻塞获取不超时,也可在borrowObject方法传递第二个参数指定本次的超时时间
private long maxWaitMillis;
// 创建对象后是否验证对象,调用objectFactory#validateObject
private boolean testOnCreate;
// 借用对象后是否验证对象 validateObject
private boolean testOnBorrow;
// 归还对象后是否验证对象 validateObject
private boolean testOnReturn;
// 定时检查期间是否验证对象 validateObject
private boolean testWhileIdle;
//定时检查淘汰多余的对象, 启用单独的线程处理
private long timeBetweenEvictionRunsMillis;
//jmx监控,和springboot自带的jmx冲突,可以选择关闭此配置或关闭springboot的jmx配置
private boolean jmxEnabled;
}
}
java Bean注入
import com.example.awesomespring.exception.AwesomeException;
import com.example.awesomespring.sftp.SftpClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author zouwei
* @className SftpConfig
* @date: 2022/8/19 下午12:12
* @description:
*/
@Configuration
public class SftpConfig {
@Autowired
private SftpProperties properties;
// 创建SftpClient对象
@Bean(destroyMethod = "close")
@ConditionalOnProperty(prefix = "sftp.config")
public SftpClient sftpClient() throws AwesomeException {
return new SftpClient(properties);
}
}
通过以上代码,我们就可以在项目的任何地方直接使用SFTP客户端来上传和下载文件了。
更多关于SpringBoot SFTP文件上传下载的资料请关注易知道(ezd.cc)其它相关文章!