Python数据流优化
2026/6/1 10:32:56 网站建设 项目流程

===== Python 数据流优化 =====
===== 使用生成器、迭代器和惰性求值处理大规模数据 =====

"""
数据流优化是一种内存高效的数据处理方式。
通过生成器管道、迭代器链和惰性求值,
可以处理远超内存容量的数据集。
"""

import itertools
import time
import sys


# ===== 1. 生成器基础:内存效率对比 =====

def load_all_data(n: int) -> list:
"""
传统方式:一次性将所有数据加载到内存。
当 n 很大时会导致 MemoryError。
"""
return [i ** 2 for i in range(n)]


def generate_data(n: int):
"""
生成器方式:逐个产生数据,几乎不占内存。
使用 yield 关键字创建生成器函数。
"""
for i in range(n):
yield i ** 2


def compare_memory():
"""
对比列表和生成器的内存占用。
生成器只保存当前状态,内存开销恒定。
"""
n = 100000

# 列表方式
start_mem = sys.getsizeof([])
list_data = load_all_data(n)
list_mem = sys.getsizeof(list_data)
print(f"列表占用内存: {list_mem / 1024:.2f} KB")

# 生成器方式
gen = generate_data(n)
gen_mem = sys.getsizeof(gen)
print(f"生成器占用内存: {gen_mem} bytes")

print(f"内存差异: {list_mem / max(gen_mem, 1):.0f}x 倍")


# ===== 2. 生成器管道:链式数据处理 =====

def read_lines(filename: str):
"""
管道第一步:逐行读取文件。
即使文件有几十 GB,也只会占用一行内存。
"""
with open(filename, 'r', encoding='utf-8') as f:
for line in f:
yield line.strip()


def filter_lines(lines, keyword: str):
"""
管道第二步:过滤包含关键字的行。
接收生成器,返回生成器,构成惰性管道。
"""
for line in lines:
if keyword in line:
yield line


def transform_lines(lines):
"""
管道第三步:对每行进行变换。
每个管道步骤都是独立的生成器函数。
"""
for line in lines:
# 转换为大写并添加前缀
yield f"[PROCESSED] {line.upper()}"


def count_words_in_lines(lines):
"""
管道第四步:统计每行的单词数。
管道的末端通常是一个消费数据的循环。
"""
for line in lines:
word_count = len(line.split())
yield (line, word_count)


def demo_pipeline():
"""
演示完整的生成器管道。
数据在管道中流动,每个步骤只处理一个元素。
"""
print("===== 生成器管道示例 =====")

# 模拟数据(实际应用中可能是大文件)
test_data = [
"apple banana cherry",
"dog elephant fox",
"python is great",
"apple pie is delicious",
"zip zap zop",
]

# 保存测试文件
with open('_test_pipeline.txt', 'w') as f:
for line in test_data:
f.write(line + '\n')

# 构建处理管道
lines = read_lines('_test_pipeline.txt')
filtered = filter_lines(lines, 'apple')
transformed = transform_lines(filtered)
counted = count_words_in_lines(transformed)

# 消费管道中的数据
for line, count in counted:
print(f" [{count} 词] {line}")


# ===== 3. itertools 工具集 =====

def demo_itertools():
"""
itertools 提供了丰富的流式数据处理工具。
所有函数都返回迭代器,惰性求值。
"""
print("===== itertools 流式处理 =====")

# chain: 将多个迭代器串联
data1 = [1, 2, 3]
data2 = [4, 5, 6]
chained = itertools.chain(data1, data2)
print(f" chain: {list(chained)}")

# islice: 对无限生成器切片
def counter():
i = 0
while True:
yield i
i += 1

sliced = itertools.islice(counter(), 5, 10)
print(f" islice(5..10): {list(sliced)}")

# takewhile / dropwhile: 条件截取
data = [1, 3, 5, 7, 2, 4, 6, 8]
taken = itertools.takewhile(lambda x: x < 5, data)
dropped = itertools.dropwhile(lambda x: x < 5, data)
print(f" takewhile(<5): {list(taken)}")
print(f" dropwhile(<5): {list(dropped)}")

# groupby: 流式分组(要求数据已排序)
data = [(1, 'a'), (1, 'b'), (2, 'c'), (2, 'd')]
for key, group in itertools.groupby(data, key=lambda x: x[0]):
print(f" groupby({key}): {list(group)}")

# zip_longest: 不等长压缩
a = [1, 2, 3]
b = ['x', 'y']
zipped = itertools.zip_longest(a, b, fillvalue=None)
print(f" zip_longest: {list(zipped)}")

# accumulate: 累积计算
values = [1, 2, 3, 4, 5]
accumulated = itertools.accumulate(values)
print(f" accumulate: {list(accumulated)}")

# product: 笛卡尔积(可替代嵌套循环)
colors = ['red', 'blue']
sizes = ['S', 'M', 'L']
products = itertools.product(colors, sizes)
print(f" product: {list(products)}")


# ===== 4. 惰性求值在大数据中的应用 =====

def lazy_evaluation_demo():
"""
惰性求值允许处理理论上"无限"的数据流。
只计算需要的那部分数据。
"""
print("===== 惰性求值示例 =====")

# 无限斐波那契数列生成器
def fibonacci():
a, b = 0, 1
while True:
yield a
a, b = b, a + b

# 获取前 20 个斐波那契数
fib = fibonacci()
first_20 = list(itertools.islice(fib, 20))
print(f" 前 20 个斐波那契数: {first_20}")

# 找到第一个大于 1000 的斐波那契数
fib = fibonacci()
for num in fib:
if num > 1000:
print(f" 第一个大于 1000 的斐波那契数: {num}")
break

# 惰性筛选大文件中的匹配行
# 以下操作不会一次性读取整个文件
def find_pattern(filename, pattern):
with open(filename) as f:
for line in f:
if pattern in line:
yield line
# 只找到前 5 个匹配就停止
yield from itertools.islice(find_pattern('_test_pipeline.txt', 'apple'), 5)

matches = list(itertools.islice(
filter_lines(read_lines('_test_pipeline.txt'), 'apple'),
5
))
print(f" 惰性匹配到 {len(matches)} 行包含 'apple'")


# ===== 5. 迭代器链式操作 =====

def demo_iterator_chain():
"""
使用函数式编程风格链式处理数据流。
避免创建中间列表,减少内存使用。
"""
print("===== 迭代器链式操作 =====")

# 传统方式:多个中间列表
numbers = range(100000)
squares = [x ** 2 for x in numbers]
even_squares = [x for x in squares if x % 2 == 0]
result_trad = sum(even_squares[:1000])

# 迭代器链:无中间列表
result_chain = sum(
itertools.islice(
(x ** 2 for x in range(100000)
if (x ** 2) % 2 == 0),
1000
)
)

print(f" 传统方式结果: {result_trad}")
print(f" 迭代器链结果: {result_chain}")

# 更复杂的链式处理
data = range(10000)
pipeline = (
(x * 2 for x in data) # 变换
if x > 5 for x in data # 筛选
)
# 上述是错误的语法,修正如下:
pipeline = (x * 2 for x in data if x > 5)
result = sum(itertools.islice(pipeline, 100))
print(f" 链式流水线结果: {result}")


# ===== 6. 实战:流式日志分析 =====

def stream_log_analysis():
"""
模拟流式分析大规模日志文件。
演示如何使用生成器管道处理实时数据流。
"""
print("===== 流式日志分析 =====")

import random
import datetime

# 模拟日志生成器
def log_generator(n: int):
levels = ['INFO', 'WARN', 'ERROR', 'DEBUG']
services = ['api', 'web', 'db', 'cache']
for _ in range(n):
timestamp = datetime.datetime.now().isoformat()
level = random.choice(levels)
service = random.choice(services)
message = f"{timestamp} [{level}] {service}: 请求处理中..."
yield message

# 实时过滤分析
def filter_errors(logs):
for log in logs:
if 'ERROR' in log:
yield log

def extract_service(logs):
for log in logs:
parts = log.split()
service = parts[3].rstrip(':')
yield service

# 流式消费
logs = log_generator(10000)
error_logs = filter_errors(logs)
services = extract_service(error_logs)

# 统计各服务错误次数
error_counts = {}
for svc in services:
error_counts[svc] = error_counts.get(svc, 0) + 1

print(f" 各服务错误统计: {error_counts}")

# 使用 collections.Counter 替代
from collections import Counter
logs = log_generator(10000)
error_services = extract_service(filter_errors(logs))
counts = Counter(error_services)
print(f" Counter 统计: {dict(counts)}")


# ===== 性能测试:比较不同方式 =====

def benchmark_streaming():
"""
对比传统列表方式和流式处理的内存和时间开销。
"""
print("===== 流式处理性能对比 =====")

n = 1000000

# 列表方式
start = time.perf_counter()
result_list = sum([x ** 2 for x in range(n)])
t_list = time.perf_counter() - start

# 生成器方式
start = time.perf_counter()
result_gen = sum(x ** 2 for x in range(n))
t_gen = time.perf_counter() - start

print(f" 列表 sum: {t_list:.4f}s (结果: {result_list})")
print(f" 生成器 sum: {t_gen:.4f}s (结果: {result_gen})")
print(f" 时间差异: {t_list / t_gen:.2f}x")


if __name__ == '__main__':
print("=" * 50)
print("Python 数据流优化")
print("=" * 50)

compare_memory()
print()
demo_pipeline()
print()
demo_itertools()
print()
lazy_evaluation_demo()
print()
demo_iterator_chain()
print()
stream_log_analysis()
print()
benchmark_streaming()

# 清理测试文件
import os
for f in ['_test_pipeline.txt']:
if os.path.exists(f):
os.remove(f)

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询