掌握如何流式读取大文件

几乎所有人都知道,在 Python 里读取文件有一种“标准做法”:首先使用 withopen(fine_name) 上下文管理器的方式获得一个文件对象,然后使用 for 循环迭代它,逐行获取文件里的内容。

下面是一个使用这种“标准做法”的简单示例函数:

def count_nine(fname):
"""计算文件里包含多少个数字 '9'"""
    count =  0
    with open(fname)  as file:
	    for line in file:`
            count += line.count('9')
        return count
		

假如我们有一个文件 small_file.txt,那么使用这个函数可以轻松计算出 9 的数量。

# small_file.txt
feiowe9322nasd9233rl
aoeijfiowejf8322kaf9a

# OUTPUT: 3
print(count_nine('small_file.txt'))

为什么这种文件读取方式会成为标准?这是因为它有两个好处:

  1. with 上下文管理器会自动关闭打开的文件描述符
  2. 在迭代文件对象时,内容是一行一行返回的,不会占用太多内存

标准做法的缺点:

但这套标准做法并非没有缺点。如果被读取的文件里,根本就没有任何换行符,那么上面的第二个好处就不成立了。** 当代码执行到 forlineinfile 时,line 将会变成一个非常巨大的字符串对象,消耗掉非常可观的内存。

让我们来做个试验:有一个 5GB 大的文件 big_file.txt,它里面装满了和 small_file.txt一样的随机字符串。只不过它存储内容的方式稍有不同,所有的文本都被放在了同一行里:

# FILE: big_file.txt
df2if283rkwefh...  <剩余  5GB  大小>  ...`

如果我们继续使用前面的 count_nine 函数去统计这个大文件里 9 的个数。那么在我的笔记本上,这个过程会足足花掉 65 秒,并在执行过程中吃掉机器 2GB 内存

使用 read 方法分块读取

为了解决这个问题,我们需要暂时把这个“标准做法”放到一边,使用更底层的 file.read() 方法。与直接循环迭代文件对象不同,每次调用 file.read(chunk_size) 会直接返回从当前位置往后读取 chunk_size 大小的文件内容,不必等待任何换行符出现。

所以,如果使用 file.read() 方法,我们的函数可以改写成这样:

def count_nine_v2(fname):
"""计算文件里包含多少个数字 '9',每次读取 8kb"""
	count =  0
	block_size =  1024  *  8
	with open(fname)  as fp:
		while  True:
		    chunk = fp.read(block_size)
# 当文件没有更多内容时,read 调用将会返回空字符串
			if  not chunk:
				break`
			count += chunk.count('9')
	return count

在新函数中,我们使用了一个 while 循环来读取文件内容,每次最多读取 8kb 大小,这样可以避免之前需要拼接一个巨大字符串的过程,把内存占用降低非常多。

利用生成器解耦代码

假如我们在讨论的不是 Python,而是其他编程语言。那么可以说上面的代码已经很好了。但是如果你认真分析一下 count_nine_v2 函数,你会发现在循环体内部,存在着两个独立的逻辑:数据生成(read 调用与 chunk 判断)数据消费。而这两个独立逻辑被耦合在了一起。

正如在《编写地道循环》里所提到的,为了提升复用能力,我们可以定义一个新的 chunked_file_reader 生成器函数,由它来负责所有与“数据生成”相关的逻辑。这样 count_nine_v3 里面的主循环就只需要负责计数即可。

def chunked_file_reader(fp, block_size=1024  *  8):
"""生成器函数:分块读取文件内容"""
	while  True:
		chunk = fp.read(block_size)
# 当文件没有更多内容时,read 调用将会返回空字符串
		if  not chunk:
			break
		yield chunk


def count_nine_v3(fname):
	count =  0
	with open(fname)  as fp:
		for chunk in chunked_file_reader(fp):
			count += chunk.count('9')
	return count

进行到这一步,代码似乎已经没有优化的空间了,但其实不然。iter(iterable) 是一个用来构造迭代器的内建函数,但它还有一个更少人知道的用法。当我们使用 iter(callable,sentinel) 的方式调用它时,会返回一个特殊的对象,迭代它将不断产生可调用对象 callable 的调用结果,直到结果为 setinel 时,迭代终止。

def chunked_file_reader(file, block_size=1024  *  8):
	"""生成器函数:分块读取文件内容,使用 iter 函数"""
	# 首先使用 partial(fp.read, block_size) 构造一个新的无需参数的函数
	# 循环将不断返回 fp.read(block_size) 调用结果,直到其为 '' 时终止
	for chunk in iter(partial(file.read, block_size),  ''):
		yield chunk

最终,只需要两行代码,我们就完成了一个可复用的分块文件读取函数。那么,这个函数在性能方面的表现如何呢?

和一开始的 2GB 内存 / 耗时 65 秒 相比,使用生成器的版本只需要 7MB 内存 / 12 秒 就能完成计算。效率提升了接近 4 倍,内存占用更是不到原来的 1%。