RPA 应用函数分享之 ---FTP 文件上传下载
在做 RPA 流程时候,数据源的获取是多种多样的:共享盘获取,FTP 获取,邮件附件获取,这里整理了 FTP 获取数据源需要用到的一些函数:
可以获取单个,也可以拷贝获取整个文件夹的数据。
# coding=utf-8
import os
from ftplib import FTP
#创建文件夹
def mkdirs(path):
path = path.strip().rstrip('\\')
isExists = os.path.exists(path)
if not isExists:
os.makedirs(path)
#ftp建立连接
def ftp_connect():
ftp=FTP()
ftp.set_debuglevel(2)
ftp.connect('IP','PORT')
ftp.login('loginname', 'passwd')
return ftp
#ftp下载文件(入参为文件路径)
#source_path=isrpa/need_download/aaa.txt
#local_path=d:\isrpa\download\bbb.txt
def download_file(ftp,local_path,source_path):
file_writer = open(local_path,'wb')
ftp.retrbinary('RETR %s' % source_path,file_writer.write,8192)
file_writer.close
#ftp上传文件(入参为文件路径)
def upload_file(ftp,local_path,source_path):
if os.path.exists(local_path):
file_writer = open(local_path,'rb')
ftp.storbinary('STOR %s' % source_path,file_writer,8192)
file_writer.close
#递归下载ftp上的文件夹(入参为文件夹路径)
#source_path=isrpa/need_download
#local_path=d:\isrpa\download
def download_ftp_dir(source_path, local_path):
ftp= ftp_connect()
ftp.cwd(source_path)
mkdirs(local_path)
os.chdir(local_path)
download_dir(ftp,'')
ftp.quit
def download_dir(ftp,next_dir):
ftp_curr_dir = ftp.pwd()
local_curr_dir = os.getcwd()
if next_dir != '':
ftp.cwd(next_dir)
try:
os.mkdir(next_dir)
except:
pass
os.chdir(next_dir)
files,dirs = get_ftp_dirs_files(ftp)
for f in files:
download_file(ftp,f,f)
for d in dirs:
ftp.cwd(ftp_curr_dir)
os.chdir(local_curr_dir)
download_dir(ftp,d)
def get_ftp_dirs_files(ftp):
dir_res = []
ftp.dir(dir_res.append)
files = [f.split(None,8)[-1] for f in dir_res if f.find('<DIR>')==-1]
dirs = [f.split(None,8)[-1] for f in dir_res if f.find('<DIR>')>-1]
return (files,dirs)
#递归上传文件夹至ftp(入参为文件夹路径)
def upload_local_dir(local_path, source_path):
ftp= ftp_connect()
ftp.cwd(source_path)
os.chdir(local_path)
upload_dir(ftp,'')
ftp.quit
def upload_dir(ftp,next_dir):
ftp_curr_dir = ftp.pwd()
local_curr_dir = os.getcwd()
if next_dir != '':
os.chdir(next_dir)
try:
ftp.mkd(next_dir)
except:
pass
ftp.cwd(next_dir)
files,dirs = get_local_dirs_files()
for f in files:
upload_file(ftp,f,f)
for d in dirs:
ftp.cwd(ftp_curr_dir)
os.chdir(local_curr_dir)
uoload_dir(ftp,d)
def get_local_dirs_files():
files = []
dirs = []
for file in os.listdir():
file_path = os.path.join(os.getcwd(), file)
if os.path.isdir(file_path):
dirs.append(file)
else:
files.append(file)
return (files,dirs)
#递归删除文件夹及文件
def remove_dir(path):
try:
for file in os.listdir(path):
file_path = os.path.join(path, file)
if not os.path.isdir(file_path):
os.remove(file_path)
else:
remove_dir(file_path)
os.rmdir(path)
print('文件删除成功')
except Exception as e:
print(e,'文件删除失败')
客户这边需要用 key,代码过不去
不需要的,咱们自带
赞一下,ftplib 需要另外下载吗?