问题:续接上一篇。说干咱就干呀,勤勤恳恳写程序呀!
目标:此篇开始进入正题了。为实现我们整个项目功能而开始实现各个子模块功能。首先实现第一篇列出的分步功能模块的第一步:
1、python访问ftp,下载所有文件到本地文件夹DownloadData
解决方案:查找python访问ftp并下载文件的相关资料,编写涉及的功能函数,并将其封装在一个通用的FTPHelper类中。这样方便使用和维护。
具体相关项分析:
1、ftp对象信息:ftp主机名称或者ip地址,端口(默认是21),登录名称,登录密码。
2、成功连接上ftp后的操作:某指定的路径是否是文件路径,某指定的路径是否是文件夹路径,获取某指定的路径下的所有文件名,下载某指定的路径下的所有文件。
3、python实现操纵ftp需要引用什么模块?ftplib。
python是依赖功能包来实现一些常见的功能的,因此安装和引用功能包是使用python编程的常见步骤。
具体实践:
1、新建了一个类: FTPHelper.py。
首行添加代码:import ftplib ,引用ftplib包。定义conn 连接对象,类初始化时连接ftp,定义login登录函数。
conn = ftplib.FTP()
def __init__(self, host, port=21):
self.conn.connect(host, port)
def login(self, username, password):
self.conn.login(username, password)
self.conn.set_pasv(True)
print("FTPHelper init")
# print(self.conn.welcome)
2、定义相关功能函数
def _is_ftp_dir(self, ftp_path): # 私有函数:是否是ftp路径
ftp_path = ftp_path.rstrip('/')
ftp_path_parent = os.path.dirname(ftp_path)
self.ftp_dir_name = os.path.basename(ftp_path)
self._is_dir = False
if ftp_path == '.' or ftp_path == './' or ftp_path == '':
self._is_dir = True
else:
# this use callback function ,that will change _is_dir value
try:
self.conn.retrlines('LIST %s' % ftp_path_parent, self._ftp_list)
except ftplib.error_perm as e:
return self._is_dir
return self._is_dir
def _ftp_list(self, line):
list = line.split(' ')
if self.ftp_dir_name == list[-1] and list[0].startswith('d'):
self._is_dir = True
def _is_ftp_file(self, ftp_path): # 私有函数:是否是ftp文件
try:
if ftp_path in self.conn.nlst(os.path.dirname(ftp_path)):
return True
else:
return False
except ftplib.error_perm as e:
return False
def download_files_from_dir(self, ftp_path, local_path='.', begin=False):
try:
ftp_path = ftp_path.rstrip('/')
# 当ftp目录存在时下载
if self._is_ftp_dir(ftp_path):
# 将指定路径ftp_path文件夹中的文件下载到本地。
# begin=True,则将ftp文件夹名作为本地保存文件夹名。
# 如果本地目录不存在,则创建目录。
if begin:
if not os.path.isdir(local_path):
os.makedirs(local_path)
local_path = os.path.join(local_path, os.path.basename(ftp_path))
# 如果本地目录不存在,则创建目录
if not os.path.isdir(local_path):
os.makedirs(local_path)
# 进入ftp目录,开始递归查询
self.conn.cwd(ftp_path)
ftp_files = self.conn.nlst()
for file in ftp_files:
local_file = os.path.join(local_path, file)
# 如果file ftp路径是文件则直接上传文件
if not self._is_ftp_dir(file):
self.get_file(file, local_file)
# else:
# print(file + " is dir.")
# 如果当前ftp目录文件已经遍历完毕返回上一层目录
self.conn.cwd("..")
return
else:
print('ERROR:The dir:%s is not exist' % ftp_path)
return
except Exception as e:
print(e)
def get_files_name_from_dir(self, ftp_path):
try:
ftp_path = ftp_path.rstrip('/')
# 当ftp目录存在时下载
if self._is_ftp_dir(ftp_path):
# 将指定路径ftp_path文件夹中的文件名返回。
# 返回:list。
# 进入ftp目录,开始递归查询
self.conn.cwd(ftp_path)
ftp_files = self.conn.nlst()
array_data = []
for file in ftp_files:
# 如果file ftp路径是文件则收录
if not self._is_ftp_dir(file):
array_data.append(file)
# else:
# print(file + " is dir.")
# 如果当前ftp目录文件已经遍历完毕返回上一层目录
self.conn.cwd("..")
return array_data
else:
print('ERROR:The dir:%s is not exist' % ftp_path)
LogHelper.LogHelper.write_msg2log_file('ERROR:The dir:%s is not exist' % ftp_path, self.PROJECT_NAME)
return []
except Exception as e:
print(e)
LogHelper.LogHelper.write_msg2log_file("[" + sys._getframe().f_code.co_name + "]" + e, self.PROJECT_NAME)
return []
3、定义测试函数test并添加到主函数执行
def test():
ftp = FTPHelper('xxx')
ftp.login('xxx', 'xxx')
# 下载文件夹到本地的文件夹
local_path = os.path.join(os.path.dirname(sys.argv[0]), "DownloadFiles")
ftp.download_files_from_dir('A/B', local_path)
print(ftp.get_files_name_from_dir('A'))
if __name__ == '__main__':
test()
参考地址:http://www.jb51.net/article/67196.htm