人工智能训练师认证教程(3)Pandas数据世界的军刀
2026/6/6 20:26:19 网站建设 项目流程

目录

一、高级数据加载与预处理

1.1 高效读取大文件

1.2 处理缺失值的进阶技巧

二、高效数据转换与计算

2.1 向量化操作与性能优化

2.2 高级分组与聚合

三、时间序列数据处理

3.1 高级时间序列操作


一、高级数据加载与预处理

1.1 高效读取大文件

python

import pandas as pd import numpy as np from pathlib import Path # 分块读取大文件 def read_large_file_in_chunks(filepath, chunk_size=10000): """分块读取大文件,节省内存""" chunks = [] for chunk in pd.read_csv(filepath, chunksize=chunk_size, low_memory=False): # 在读取时进行初步处理 chunk = chunk.dropna(subset=['important_column']) chunks.append(chunk) return pd.concat(chunks, ignore_index=True) # 指定数据类型以减少内存使用 dtype_mapping = { 'user_id': 'int32', 'price': 'float32', 'category': 'category', 'date': 'str' } df = pd.read_csv('large_data.csv', dtype=dtype_mapping, parse_dates=['date'])

1.2 处理缺失值的进阶技巧

python

# 创建示例数据 df = pd.DataFrame({ 'A': [1, 2, np.nan, 4, 5], 'B': [np.nan, 2, 3, np.nan, 5], 'C': [1, np.nan, np.nan, np.nan, 5], 'D': ['a', 'b', None, 'd', 'e'] }) # 1. 基于统计的填充 df['A_filled'] = df['A'].fillna(df['A'].mean()) df['B_filled'] = df['B'].fillna(df['B'].median()) # 2. 前向/后向填充(时间序列) df['C_ffill'] = df['C'].ffill() # 前向填充 df['C_bfill'] = df['C'].bfill() # 后向填充 # 3. 插值法填充 df['A_interpolated'] = df['A'].interpolate(method='linear') # 4. 使用模型预测填充(简单示例) from sklearn.ensemble import RandomForestRegressor def model_based_imputation(df, target_col): """使用随机森林预测缺失值""" # 分离有值和无值的数据 df_train = df[df[target_col].notna()] df_missing = df[df[target_col].isna()] if len(df_missing) == 0: return df # 准备特征(排除目标列和其他高缺失率列) features = [col for col in df.columns if col != target_col and df[col].notna().sum() > len(df)*0.7] X_train = df_train[features] y_train = df_train[target_col] X_missing = df_missing[features] # 训练模型并预测 model = RandomForestRegressor(n_estimators=100, random_state=42) model.fit(X_train, y_train) predictions = model.predict(X_missing) # 填充预测值 df.loc[df[target_col].isna(), target_col] = predictions return df # 5. 创建缺失值指示器 df['A_is_missing'] = df['A'].isna().astype(int)

二、高效数据转换与计算

2.1 向量化操作与性能优化

python

import pandas as pd import numpy as np from numba import jit import swifter # pip install swifter # 创建测试数据 np.random.seed(42) df = pd.DataFrame({ 'x': np.random.randn(1000000), 'y': np.random.randn(1000000), 'category': np.random.choice(['A', 'B', 'C'], 1000000) }) # 1. 避免循环,使用向量化操作 # ❌ 慢:使用循环 def slow_calculation(df): result = [] for i in range(len(df)): result.append(df['x'].iloc[i] ** 2 + df['y'].iloc[i] ** 2) return result # ✅ 快:使用向量化 df['vectorized'] = df['x'] ** 2 + df['y'] ** 2 # 2. 使用Numba加速(针对复杂计算) @jit(nopython=True) def numba_calc(x, y): n = len(x) result = np.zeros(n) for i in range(n): result[i] = np.sqrt(x[i]**2 + y[i]**2) return result df['numba_calc'] = numba_calc(df['x'].values, df['y'].values) # 3. 使用swifter自动选择最佳计算方式 df['swifter_calc'] = df.swifter.apply(lambda row: row['x'] * row['y'], axis=1) # 4. 内存优化技巧 def optimize_memory(df): """优化DataFrame内存使用""" start_mem = df.memory_usage().sum() / 1024**2 for col in df.columns: col_type = df[col].dtype if col_type != object: c_min = df[col].min() c_max = df[col].max() if str(col_type)[:3] == 'int': if c_min > np.iinfo(np.int8).min and c_max < np.iinfo(np.int8).max: df[col] = df[col].astype(np.int8) elif c_min > np.iinfo(np.int16).min and c_max < np.iinfo(np.int16).max: df[col] = df[col].astype(np.int16) elif c_min > np.iinfo(np.int32).min and c_max < np.iinfo(np.int32).max: df[col] = df[col].astype(np.int32) else: df[col] = df[col].astype(np.int64) else: if c_min > np.finfo(np.float16).min and c_max < np.finfo(np.float16).max: df[col] = df[col].astype(np.float16) elif c_min > np.finfo(np.float32).min and c_max < np.finfo(np.float32).max: df[col] = df[col].astype(np.float32) else: df[col] = df[col].astype(np.float64) else: # 对象类型转换为分类数据 unique_count = df[col].nunique() total_count = len(df[col]) if unique_count / total_count < 0.5: df[col] = df[col].astype('category') end_mem = df.memory_usage().sum() / 1024**2 print(f'内存使用减少: {100 * (start_mem - end_mem) / start_mem:.1f}%') return df df = optimize_memory(df)

2.2 高级分组与聚合

python

# 创建示例数据 sales_data = pd.DataFrame({ 'date': pd.date_range('2023-01-01', '2023-12-31', freq='D'), 'region': np.random.choice(['North', 'South', 'East', 'West'], 365), 'product': np.random.choice(['A', 'B', 'C', 'D'], 365), 'sales': np.random.randint(100, 1000, 365), 'customers': np.random.randint(10, 100, 365) }) # 1. 多级分组聚合 grouped = sales_data.groupby(['region', 'product']).agg({ 'sales': ['sum', 'mean', 'std', 'count'], 'customers': ['sum', lambda x: x.quantile(0.8)] # 自定义聚合 }) # 重命名列 grouped.columns = ['_'.join(col).strip() for col in grouped.columns.values] # 2. 使用transform进行组内标准化 sales_data['sales_normalized'] = sales_data.groupby('region')['sales'].transform( lambda x: (x - x.mean()) / x.std() ) # 3. 使用filter筛选分组 high_sales_regions = sales_data.groupby('region').filter( lambda x: x['sales'].sum() > 100000 ) # 4. 分组应用复杂函数 def calculate_metrics(group): """计算分组的多维度指标""" result = pd.Series({ 'total_sales': group['sales'].sum(), 'avg_sale_per_customer': group['sales'].sum() / group['customers'].sum(), 'peak_day': group.loc[group['sales'].idxmax(), 'date'], 'sales_growth': (group['sales'].iloc[-1] - group['sales'].iloc[0]) / group['sales'].iloc[0] if len(group) > 1 else 0, 'unique_products': group['product'].nunique() }) return result region_metrics = sales_data.groupby('region').apply(calculate_metrics) # 5. 滚动窗口分组计算 sales_data['rolling_7d_sales'] = sales_data.groupby('region')['sales'].transform( lambda x: x.rolling(window=7, min_periods=1).mean() )

三、时间序列数据处理

3.1 高级时间序列操作

python

# 创建时间序列数据 date_rng = pd.date_range('2023-01-01', '2023-12-31', freq='H') ts_data = pd.DataFrame(date_rng, columns=['timestamp']) ts_data['value'] = np.random.randn(len(date_rng)) * 10 + 50 ts_data['category'] = np.random.choice(['A', 'B', 'C'], len(date_rng)) # 设置为索引 ts_data = ts_data.set_index('timestamp') # 1. 重采样与降采样 # 按天重采样,计算每天的平均值 daily_data = ts_data['value'].resample('D').agg(['mean', 'min', 'max', 'std']) # 按周重采样,计算每周总和 weekly_data = ts_data['value'].resample('W-MON').sum() # 2. 滚动窗口统计 ts_data['7d_rolling_mean'] = ts_data['value'].rolling(window=7*24).mean() # 7天滚动平均 ts_data['24h_rolling_std'] = ts_data['value'].rolling(window=24).std() # 24小时滚动标准差 # 3. 扩展窗口(累计)统计 ts_data['expanding_mean'] = ts_data['value'].expanding().mean() ts_data['expanding_max'] = ts_data['value'].expanding().max() # 4. 时间序列分解(趋势、季节性、残差) from statsmodels.tsa.seasonal import seasonal_decompose # 需要按天或按月的数据 daily_series = ts_data['value'].resample('D').mean() decomposition = seasonal_decompose(daily_series.dropna(), model='additive', period=30) # 5. 时间序列特征工程 def create_time_features(df, datetime_index): """创建时间相关特征""" df = df.copy() df['hour'] = datetime_index.hour df['dayofweek'] = datetime_index.dayofweek df['quarter'] = datetime_index.quarter df['month'] = datetime_index.month df['year'] = datetime_index.year df['dayofyear'] = datetime_index.dayofyear df['weekofyear'] = datetime_index.isocalendar().week # 是否是周末 df['is_weekend'] = datetime_index.dayofweek >= 5 # 时间周期特征 df['sin_hour'] = np.sin(2 * np.pi * df['hour']/24) df['cos_hour'] = np.cos(2 * np.pi * df['hour']/24) return df ts_data = create_time_features(ts_data, ts_data.index) # 6. 滞后特征(lag features) for lag in [1, 2, 3, 7, 30]: ts_data[f'lag_{lag}_hour'] = ts_data['value'].shift(lag) # 7. 滑动窗口统计特征 ts_data['rolling_mean_6h'] = ts_data['value'].rolling('6h').mean() ts_data['rolling_std_12h'] = ts_data['value'].rolling('12h').std() ts_data['rolling_max_24h'] = ts_data['value'].rolling('24h').max()

最佳实践:

1、需要根据数据规模和使用对象选择合适的写出格式,小规模、通用性交换可选 CSV,大规模分析型数据应使用 Parquet,而涉及人工核对时才使用 Excel。

2、在写出前应明确数据的使用场景,并对关键字段、异常值和重复记录进行必要检查,避免将问题数据固化到生产系统中。同时,要特别注意索引的持久化、时间索引的时区信息保留以及缺失值的统一表示方式,确保数据在不同环境和系统中读取结果一致。

3、通过合理的命名、版本控制和元信息记录,保证数据结果可追溯、可复现,从而满足生产环境对数据“可上线、可维护、可回滚”的基本要求。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询