Toccata in Nowhere.

Python FTP多进程下载

2022.08.02

使用 multiproceesing 进行 ftp 数据的并行下载,可以极大地提升文件下载效率。

主要思路

使用一个 FTP 对象连接获取文件列表,之后加载多进程对数据进行并行下载并保存。期间需要对异常进行处理,需要针对多级文件目录设计不同的遍历思路。

实现

引入 ftplibmultiprocessing 和其他需要用到的库:

import os
from ftplib import FTP
import ftplib
import multiprocessing
from multiprocessing import Pool

构建服务器连接函数:

def ftpConnect(ftpserver, port, username=[], password=[], timeout=400):
	ftp = FTP(timeout=timeout)
	try:
		ftp.connect(ftpserver, port)
		ftp.login(username, password)
	except:
		raise IOError('\n FTP connection failed, please check the code!')
	else:
		print(ftp.getwelcome()) # Print information from ftp server
		print('\n[INFO]ftp connection successful!!!')
		return ftp

以上函数连接服务器并打印 FTP 服务器的欢迎信息。默认不需要用户密码,如果需要可以作为参数添加。另超时时间 timeout 单位为秒,可在登陆时进行设定。

同时,需要一个退出函数,断开连接:

def ftpDisConnect(ftp):
	ftp.quit()

构建下载函数:

	
def ftpDownloadFileFromServer(ftpserver, port, ftpfile, localfile, username=[], password=[], timeout=400):
	
	while True:
		try:
			fptTemp = ftpConnect(ftpserver, port, username, password, timeout)
			bufsize =  1024*1024
			with open(localfile, 'wb') as fid:
				fptTemp.retrbinary('RETR '+ftpfile, fid.write, bufsize)
				
			ftpDisConnect(fptTemp)
			
		except:
			print('[ERROR] Retry !!!!!!!!!!!')
			continue
		
		break

在下载时首先对 FTP 服务器新建连接,之后对下载进行多次尝试。如果超时进行重新连接,并重新尝试下载。下载结束后断开 FTP 服务,该函数可以使用 multiprocessingpool.starmap 进行调用。

下载主程序:

if __name__ == '__main__':
	ftpserver = 'Server_address'
	port = 21
	userName = ''
	pwd = ''
	
	nProcess = 10
	
	ftpFilePath = '/xxx/xxx/'
	localSavePath = './Download'
	
	ftp = ftpConnect(ftpserver, port, userName, pwd)
	ftp.cwd(ftpFilePath)   # 切换到文件目录
	ftpFileList = ftp.nlst()
	
	if not os.path.exists(localSavePath):
		os.makedirs(localSavePath)
	
	downloadList = []
	
	for ftpFileName in ftpFileList:
		print(ftpFileName)
		localFullPath = os.path.join(localSavePath, ftpFileName)
		ftpFileFullPath = os.path.join(ftpFilePath, ftpFileName)
		if not os.path.exists(localFullPath):		# 判断文件是否已经存在,有即跳过
			downloadList.append((ftpserver,port, fileFullPathCata, localFullPath))
		else:
			print('[info] already download: ', localFullPath, 'skipped..')
		
		
	with Pool(nProcess) as pool:
		pool.starmap(func=ftpDownloadFileFromServer, iterable=downloadList)
    

其中,nProcess 为并行使用的进程数,可根据网速进行修改;另如果存在多层文件夹结构,可以进行多个 ftpFileList = ftp.nlst()for ftpFileName in ftpFileList: 的嵌套以实现多层级结构的逐个下载,例如:

for flight in flightList:
			
	fileFullPath = os.path.join(ftpFilePath, flight)
	
	print(fileFullPath)
	
	localPath = os.path.join(localSavePath,flight)
	print('Flight Number Path: {0}'.format(fileFullPath))
	if not os.path.exists(localPath):
		os.makedirs(localPath)
		
	while True:
		try:
			ftp.cwd(fileFullPath)
		except:
			ftp = ftpConnect(ftpserver, port)
			continue
		
		break

	for cataName in ftp.nlst():
		print(cataName)
		local = os.path.join(localPath, cataName)

		if not os.path.exists(local):
			os.makedirs(local)
			
		print('Cata Path: {0}'.format(cataName))
		
		cataFullPath = os.path.join(fileFullPath, cataName)
		
		while True:
			try:
				ftp.cwd(cataFullPath)
			except:
				ftp = ftpConnect(ftpserver, port)
				continue
			
			break

		downloadList = []
		
		for fileName in ftp.nlst():
			print(fileName)
			localFullPath = os.path.join(local, fileName)
			fileFullPathCata = os.path.join(fileFullPath, cataName, fileName)
			if not os.path.exists(localFullPath):
				
				downloadList.append((ftpserver,port, fileFullPathCata, localFullPath))
			else:
				print('already download: ', localFullPath, 'skipped..')
				
				
		with Pool(10) as pool:
			pool.starmap(func=ftpDownloadFileFromServer, iterable=downloadList)

可以实现Flight/Cata/File多层级的遍历并行下载。

进一步优化任务并行

以上代码对于每个文件夹中存在大量文件的情况可对效率有较大的提升,但如果文件夹众多而单个文件夹内文件较少的情况,则只能在单个子目录中进行多进程下载。如有这种情况,可以在所有目录遍历前构建downloadList,在目录遍历的 for 循环后,即所有下载服务器地址-本地地址列表构建完成后,使用 multiprocess pool 进行并行下载。