Stop Merging TS Files by Hand! Automatically Download, Merge, and Upload m3u8 Video Streams to Cloudflare R2 with Python + Flask
2026/5/31 4:44:04

Automating m3u8 Video Streams with Python + Flask: A One-Stop Solution from Download to Merge to Cloudflare R2 Storage

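Facing a pile of scattered .ts files after every m3u8 download is a headache. Merging them by hand is slow and error-prone. This article builds an automated system with Python and Flask that covers the whole flow: downloading the m3u8 video stream, merging the segments, and uploading the result straight to Cloudflare R2.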

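1. System Architecture

The goal is a complete automated processing pipeline built from three core modules:

  1. m3u8 parsing and download module: parses the m3u8 index file and downloads every .ts segment
  2. Video merge module: automatically merges the .ts files into a single MP4, in memory or in a temporary directory
  3. Cloudflare R2 upload module: uploads the final video file directly to R2 storage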
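
As a rough illustration of how the three modules fit together, the sketch below wires them into one function. The name `run_pipeline` and the three stage callables (`download`, `merge`, `upload`) are hypothetical placeholders chosen for this example, not part of any library; the concrete implementations follow in section 3.

```python
from typing import Callable, List


def run_pipeline(
    m3u8_url: str,
    download: Callable[[str], List[str]],
    merge: Callable[[List[str]], str],
    upload: Callable[[str], bool],
) -> bool:
    """Run download -> merge -> upload and report whether the upload succeeded."""
    segment_paths = download(m3u8_url)
    if not segment_paths:
        # Nothing to merge, so stop before calling the later stages
        return False
    merged_path = merge(segment_paths)
    return upload(merged_path)


# Usage with stand-in stages (no network or FFmpeg involved)
ok = run_pipeline(
    "https://example.com/video/index.m3u8",
    download=lambda url: ["seg0.ts", "seg1.ts"],
    merge=lambda paths: "index.mp4",
    upload=lambda path: path.endswith(".mp4"),
)
```

Passing the stages in as callables keeps each module testable on its own, which is one possible design and not the only one.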

Technology Stack

  • Python 3.8+
  • Flask 2.0+ (lightweight web framework)
  • requests (HTTP requests)
  • ffmpeg-python (video processing)
  • boto3 (AWS SDK; works with R2 through its S3-compatible API)

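2. Environment Setup and Dependencies

First make sure FFmpeg is installed on the system; it is the core tool for video processing:

    # Ubuntu/Debian
    sudo apt-get install ffmpeg

    # macOS
    brew install ffmpeg

    # Windows
    choco install ffmpeg

Then install the Python dependencies:

    pip install flask requests ffmpeg-python boto3 python-dotenv

Tip: use a virtual environment to isolate the project's dependencies and avoid version conflicts.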

3. Core Implementation

3.1 m3u8 Parsing and Download

We start by parsing the m3u8 index file and downloading the .ts segments in parallel:

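    import concurrent.futures
    import os
    from urllib.parse import urljoin, urlparse

    import requests


    def download_ts_segment(ts_url, output_dir, headers=None):
        """Download one .ts segment; return its local path, or None on failure."""
        try:
            response = requests.get(ts_url, headers=headers, stream=True, timeout=30)
            response.raise_for_status()
            # Name the file after the URL path so a query string does not leak in
            filename = os.path.basename(urlparse(ts_url).path)
            filepath = os.path.join(output_dir, filename)
            with open(filepath, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
            return filepath
        except Exception as e:
            print(f"Download failed {ts_url}: {e}")
            return None


    def download_m3u8_playlist(m3u8_url, output_dir, max_workers=5):
        """Download all segments of a playlist; return local paths in playlist order."""
        response = requests.get(m3u8_url, timeout=30)
        response.raise_for_status()

        # Lines that are not tags or comments and whose path ends in .ts are segments
        lines = [line.strip() for line in response.text.splitlines()]
        ts_urls = [line for line in lines
                   if line and not line.startswith('#')
                   and urlparse(line).path.endswith('.ts')]
        # urljoin resolves relative segment URIs against the playlist URL
        absolute_ts_urls = [urljoin(m3u8_url, ts) for ts in ts_urls]

        os.makedirs(output_dir, exist_ok=True)
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # executor.map preserves playlist order, which the merge step relies on
            results = list(executor.map(
                lambda url: download_ts_segment(url, output_dir), absolute_ts_urls))

        for path in results:
            if path:
                print(f"Downloaded: {path}")
        # Failed segments are skipped; the caller receives the paths that succeeded
        return [path for path in results if path]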
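
The parsing step can also be pulled out into a pure function, which makes it easy to check without any network access. The sketch below is a minimal version under the assumption that the input is a media playlist (a list of segments), not a master playlist that points at other playlists; the function name `parse_media_playlist` and the sample URLs are made up for illustration.

```python
from typing import List
from urllib.parse import urljoin


def parse_media_playlist(playlist_text: str, playlist_url: str) -> List[str]:
    """Return absolute segment URLs in playlist order."""
    segment_urls = []
    for raw_line in playlist_text.splitlines():
        line = raw_line.strip()
        # Skip blank lines, tags (#EXT...), and comments
        if not line or line.startswith("#"):
            continue
        # Relative and root-relative URIs are resolved against the playlist URL
        segment_urls.append(urljoin(playlist_url, line))
    return segment_urls


sample = """#EXTM3U
#EXT-X-TARGETDURATION:10
#EXTINF:9.8,
seg0.ts
#EXTINF:9.8,
/cdn/seg1.ts?token=abc
#EXT-X-ENDLIST
"""
urls = parse_media_playlist(sample, "https://example.com/video/index.m3u8")
```

Unlike a filter on the `.ts` suffix, this version keeps every non-tag line, so segments with query strings or other extensions are not silently dropped.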

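3.2 Merging the TS Files

The traditional approach downloads every .ts file to disk and then merges them by hand. Here the merge is automated: FFmpeg joins the downloaded segments through a temporary intermediate file and then writes the result as MP4. Note that, despite the function name, the segments are read from disk and are not held in memory.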

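    import os
    import tempfile

    import ffmpeg


    def merge_ts_in_memory(ts_file_paths, output_filename):
        """
        Merge TS files into a single MP4.

        Despite the name, the segments are read from disk and joined through a
        temporary intermediate file.

        :param ts_file_paths: list of TS file paths, in playback order
        :param output_filename: output file name
        :return: path of the merged file
        """
        with tempfile.NamedTemporaryFile(suffix='.ts', delete=False) as temp_merged:
            temp_merged_path = temp_merged.name

        try:
            # Join the segments with FFmpeg's concat protocol (stream copy).
            # The temp file already exists, so FFmpeg must be allowed to overwrite it.
            (
                ffmpeg
                .input('concat:' + '|'.join(ts_file_paths))
                .output(temp_merged_path, c='copy')
                .run(quiet=True, overwrite_output=True)
            )

            # Convert to MP4. c='copy' remuxes without re-encoding and assumes
            # MP4-compatible codecs (typically H.264/AAC); drop it to re-encode.
            output_path = output_filename if output_filename.endswith('.mp4') else f"{output_filename}.mp4"
            (
                ffmpeg
                .input(temp_merged_path)
                .output(output_path, format='mp4', c='copy', movflags='faststart')
                .run(quiet=True, overwrite_output=True)
            )
            return output_path
        finally:
            if os.path.exists(temp_merged_path):
                os.unlink(temp_merged_path)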
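
With a large number of segments, the `concat:` protocol string can become very long. A common alternative is FFmpeg's concat demuxer, which reads the segment list from a text file. The helper below is a minimal sketch that only builds the text of such a list file; the name `build_concat_list` and the sample paths are assumptions for illustration.

```python
from typing import List


def build_concat_list(ts_file_paths: List[str]) -> str:
    """Build the text of an FFmpeg concat-demuxer list file."""
    lines = []
    for path in ts_file_paths:
        # Inside single quotes, a literal ' is written as '\''
        escaped = path.replace("'", "'\\''")
        lines.append(f"file '{escaped}'")
    return "\n".join(lines) + "\n"


listing = build_concat_list(["/tmp/v/seg0.ts", "/tmp/v/it's.ts"])
```

After writing the text to a file such as `list.txt`, the merge can be run with `ffmpeg -f concat -safe 0 -i list.txt -c copy output.mp4`.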

3.3 Cloudflare R2 Upload Integration

Configure Cloudflare R2 storage and implement the upload:

    import os

    import boto3
    from boto3.exceptions import S3UploadFailedError
    from botocore.exceptions import ClientError


    def init_r2_client():
        """Create an S3-compatible client that points at the R2 endpoint."""
        return boto3.client(
            's3',
            endpoint_url=os.getenv('R2_ENDPOINT'),
            aws_access_key_id=os.getenv('R2_ACCESS_KEY'),
            aws_secret_access_key=os.getenv('R2_SECRET_KEY'),
            region_name='auto'
        )


    def upload_to_r2(file_path, bucket_name, object_name=None):
        """Upload a file to Cloudflare R2 storage."""
        if object_name is None:
            object_name = os.path.basename(file_path)

        s3_client = init_r2_client()
        try:
            s3_client.upload_file(file_path, bucket_name, object_name)
            return True
        except (ClientError, S3UploadFailedError) as e:
            # upload_file reports a failed transfer as S3UploadFailedError
            print(f"Upload failed: {e}")
            return False
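
Objects uploaded without a content type may be served with a generic binary type, which can prevent in-browser playback. One way to handle this is to pass a `ContentType` through the `ExtraArgs` parameter of `upload_file`. The helper below is a small sketch; the name `build_upload_args` is hypothetical, and the fallback type is an assumption.

```python
import mimetypes
from typing import Dict


def build_upload_args(file_path: str) -> Dict[str, str]:
    """Guess the MIME type from the file extension for use as ExtraArgs."""
    content_type, _encoding = mimetypes.guess_type(file_path)
    # Fall back to a generic binary type when the extension is unknown
    return {"ContentType": content_type or "application/octet-stream"}


args = build_upload_args("index.mp4")
# Intended use with the client from this section:
# s3_client.upload_file(file_path, bucket_name, object_name, ExtraArgs=args)
```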

4. Flask Application Integration

Integrate the functions above into a Flask application that exposes a RESTful API:

    import os
    from urllib.parse import urlparse

    from flask import Flask, request, jsonify
    from werkzeug.utils import secure_filename

    app = Flask(__name__)


    @app.route('/api/process_m3u8', methods=['POST'])
    def process_m3u8():
        # silent=True yields None for a missing or invalid JSON body
        data = request.get_json(silent=True) or {}
        m3u8_url = data.get('m3u8_url')
        # Caution: output_dir comes from the client; restrict it in production
        output_dir = data.get('output_dir', 'temp_videos')
        bucket_name = data.get('bucket_name', 'my-video-bucket')

        if not m3u8_url:
            return jsonify({'error': 'm3u8_url is required'}), 400

        try:
            # 1. Download all TS segments
            ts_files = download_m3u8_playlist(m3u8_url, output_dir)

            # 2. Merge the TS files; name the output after the playlist,
            #    ignoring any query string in the URL
            playlist_name = os.path.basename(urlparse(m3u8_url).path)
            output_filename = secure_filename(playlist_name.replace('.m3u8', '.mp4'))
            merged_path = merge_ts_in_memory(ts_files, output_filename)

            # 3. Upload to R2
            if upload_to_r2(merged_path, bucket_name):
                # Clean up temporary files
                for ts_file in ts_files:
                    os.unlink(ts_file)
                os.unlink(merged_path)

                return jsonify({
                    'status': 'success',
                    'message': 'Video processed and uploaded successfully',
                    'object_name': output_filename
                })
            else:
                return jsonify({'error': 'Failed to upload to R2'}), 500

        except Exception as e:
            return jsonify({'error': str(e)}), 500


    if __name__ == '__main__':
        # debug=True is for local development only
        app.run(debug=True)
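
Deriving the object name from the playlist URL deserves some care, because signed URLs often carry a query string and some URLs have no file name at all. The sketch below shows one possible approach using only the standard library; the function name `object_name_from_url` and the `video` fallback are assumptions for this example. The result should still be passed through `secure_filename` before touching the file system.

```python
import posixpath
from urllib.parse import urlparse


def object_name_from_url(m3u8_url: str) -> str:
    """Derive '<playlist stem>.mp4' from the URL path, ignoring the query string."""
    path = urlparse(m3u8_url).path
    stem, _ext = posixpath.splitext(posixpath.basename(path))
    # Fall back to a fixed stem when the URL path has no file name
    return f"{stem or 'video'}.mp4"


name = object_name_from_url("https://example.com/v/index.m3u8?token=abc")
```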

5. Advanced Optimization and Error Handling

5.1 Resumable Downloads

For large video files, implement resumable downloads:

    import os

    import requests


    def download_with_resume(ts_url, output_path, headers=None):
        temp_path = output_path + '.temp'

        # Already complete: nothing to do
        if os.path.exists(output_path):
            return output_path

        # Copy so the caller's dict is not modified
        headers = dict(headers or {})
        if os.path.exists(temp_path):
            # Ask the server for the bytes that are still missing
            downloaded_size = os.path.getsize(temp_path)
            headers['Range'] = f'bytes={downloaded_size}-'

        try:
            response = requests.get(ts_url, headers=headers, stream=True, timeout=30)
            if response.status_code == 206:  # Partial Content: append
                mode = 'ab'
            elif response.status_code == 200:  # Full content: start over
                mode = 'wb'
            else:
                raise Exception(f"Unexpected status code: {response.status_code}")

            with open(temp_path, mode) as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)

            # The body was read to the end, so promote the temp file
            # (the original only did this for fresh downloads, never after a resume)
            os.replace(temp_path, output_path)
            return output_path
        except Exception as e:
            print(f"Download failed {ts_url}: {e}")
            return None

5.2 Video Quality Check

Check the integrity of the video automatically after merging:

    import ffmpeg


    def check_video_integrity(video_path):
        """Return True if ffprobe can read the file and reports a positive duration."""
        try:
            probe = ffmpeg.probe(video_path)
            duration = float(probe['format']['duration'])
            return duration > 0
        except Exception as e:
            print(f"Video check failed: {e}")
            return False
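
A positive duration only shows that the file is readable. A stricter check, sketched below as one possible extension, compares the probed duration with the total of the `#EXTINF` durations declared in the playlist, which can reveal missing segments. The function names and the one-second tolerance are assumptions for illustration.

```python
def expected_duration(playlist_text: str) -> float:
    """Sum the segment durations declared by #EXTINF tags."""
    total = 0.0
    for raw_line in playlist_text.splitlines():
        line = raw_line.strip()
        if line.startswith("#EXTINF:"):
            # Tag format: #EXTINF:<duration>,[<title>]
            value = line[len("#EXTINF:"):].split(",", 1)[0]
            total += float(value)
    return total


def duration_matches(expected: float, actual: float, tolerance: float = 1.0) -> bool:
    """Return True when the two durations differ by at most `tolerance` seconds."""
    return abs(expected - actual) <= tolerance


sample = "#EXTM3U\n#EXTINF:9.5,\nseg0.ts\n#EXTINF:10.5,title\nseg1.ts\n#EXT-X-ENDLIST\n"
total = expected_duration(sample)
```

The `actual` value would come from the `duration` field that `ffmpeg.probe` returns for the merged file.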

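5.3 Performance Tips

Parallel Processing

  1. Use a thread pool to cap the number of concurrent downloads
  2. Sort the TS files by size and download the largest first
  3. Throttle the download rate to avoid getting blocked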

    from concurrent.futures import ThreadPoolExecutor, as_completed

    import requests


    def optimized_download(ts_urls, output_dir, max_workers=3):
        # Fetch the file sizes first (one HEAD request per segment)
        ts_sizes = []
        for url in ts_urls:
            try:
                head = requests.head(url, timeout=10)
                size = int(head.headers.get('content-length', 0))
                ts_sizes.append((url, size))
            except (requests.RequestException, ValueError):
                ts_sizes.append((url, 0))

        # Sort by size, largest first. This changes only the download order;
        # the files must still be merged in playlist order afterwards.
        ts_sizes.sort(key=lambda x: x[1], reverse=True)

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # download_ts_segment is the function defined in section 3.1
            futures = {executor.submit(download_ts_segment, url, output_dir): url
                       for url, _ in ts_sizes}

            for future in as_completed(futures):
                url = futures[future]
                try:
                    result = future.result()
                    if result:
                        print(f"Download finished: {url}")
                except Exception as e:
                    print(f"Download failed {url}: {e}")
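
The third item in the list, rate limiting, is not covered by the code above. One simple way to approach it is a shared limiter that spaces out request starts across all worker threads. The class below is a minimal sketch; the name `MinIntervalLimiter` is made up for this example, and the injectable `clock` and `sleep` parameters exist only to make it testable.

```python
import threading
import time
from typing import Callable


class MinIntervalLimiter:
    """Allow at most one request start per `min_interval` seconds across threads."""

    def __init__(self, min_interval: float,
                 clock: Callable[[], float] = time.monotonic,
                 sleep: Callable[[float], None] = time.sleep) -> None:
        self._min_interval = min_interval
        self._clock = clock
        self._sleep = sleep
        self._lock = threading.Lock()
        self._next_allowed = 0.0

    def wait(self) -> float:
        """Block until the caller may start a request; return the seconds slept."""
        with self._lock:
            now = self._clock()
            delay = max(0.0, self._next_allowed - now)
            # Reserve the next slot before releasing the lock
            self._next_allowed = max(now, self._next_allowed) + self._min_interval
        if delay > 0:
            # Sleep outside the lock so other threads can reserve their slots
            self._sleep(delay)
        return delay
```

A worker would call `limiter.wait()` immediately before each `requests.get`, with a single limiter instance shared by the whole thread pool.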

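6. Security and Best Practices

6.1 Security Considerations

  1. Input validation: strictly validate every URL supplied as input
  2. Temporary file cleanup: make sure all temporary files are deleted once processing finishes
  3. Error isolation: a single failed TS download should not abort the whole process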

    import re
    from urllib.parse import urlparse


    def validate_m3u8_url(url):
        """Check that an m3u8 URL is well-formed."""
        parsed = urlparse(url)
        if parsed.scheme not in ('http', 'https'):
            raise ValueError("Only HTTP/HTTPS is supported")

        if not re.match(r'^https?://[^/]+/.+\.m3u8(?:\?.*)?$', url):
            raise ValueError("Invalid m3u8 URL format")

        return True
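
For the temporary file cleanup item, one option is to run each job inside `tempfile.TemporaryDirectory`, which removes the directory and its contents even when the job raises. The wrapper below is a minimal sketch; `with_work_dir` and `fake_job` are hypothetical names, and the simulated failure is there only to show the cleanup.

```python
import os
import tempfile
from typing import Callable, TypeVar

T = TypeVar("T")


def with_work_dir(job: Callable[[str], T]) -> T:
    """Run `job` inside a fresh temporary directory that is always removed."""
    with tempfile.TemporaryDirectory(prefix="m3u8_") as work_dir:
        return job(work_dir)


seen = {}


def fake_job(work_dir: str) -> None:
    # Record the directory, write a stand-in segment, then fail on purpose
    seen["dir"] = work_dir
    with open(os.path.join(work_dir, "seg0.ts"), "wb") as f:
        f.write(b"\x47")
    raise RuntimeError("simulated failure")


try:
    with_work_dir(fake_job)
except RuntimeError:
    pass

cleaned_up = not os.path.exists(seen["dir"])
```

With this pattern the merged MP4 has to be uploaded, or moved elsewhere, before the job returns, because everything inside the directory is deleted on exit.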

6.2 Configuration Management

Use environment variables to manage sensitive information:

    # Example .env file
    R2_ENDPOINT=https://your-account-id.r2.cloudflarestorage.com
    R2_ACCESS_KEY=your-access-key
    R2_SECRET_KEY=your-secret-key
    MAX_WORKERS=5
    TEMP_DIR=./temp_videos

Load the configuration in Python:

    import os

    from dotenv import load_dotenv

    # Read the .env file into the process environment
    load_dotenv()

    MAX_WORKERS = int(os.getenv('MAX_WORKERS', '3'))
    TEMP_DIR = os.getenv('TEMP_DIR', './temp_videos')
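
Since `os.getenv` returns `None` for a missing variable, a misconfigured deployment would otherwise fail only at upload time. The sketch below checks the three R2 settings up front; the function name `load_r2_settings` is hypothetical, and taking the environment as a parameter is a choice made here to keep the check testable.

```python
import os
from typing import Dict, Mapping

REQUIRED_KEYS = ("R2_ENDPOINT", "R2_ACCESS_KEY", "R2_SECRET_KEY")


def load_r2_settings(env: Mapping[str, str] = os.environ) -> Dict[str, str]:
    """Return the R2 settings, or raise if any is missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not env.get(key)]
    if missing:
        raise RuntimeError("Missing required settings: " + ", ".join(missing))
    return {key: env[key] for key in REQUIRED_KEYS}


settings = load_r2_settings({
    "R2_ENDPOINT": "https://your-account-id.r2.cloudflarestorage.com",
    "R2_ACCESS_KEY": "your-access-key",
    "R2_SECRET_KEY": "your-secret-key",
})
```

Calling it once at startup, after `load_dotenv()`, surfaces configuration mistakes before any video is downloaded.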

7. Deployment and Extensions

7.1 Containerized Deployment with Docker

Create a Dockerfile for one-step deployment:

    FROM python:3.9-slim

    WORKDIR /app

    RUN apt-get update && apt-get install -y \
        ffmpeg \
        && rm -rf /var/lib/apt/lists/*

    # requirements.txt must include gunicorn for the CMD below to work
    COPY requirements.txt .
    RUN pip install --no-cache-dir -r requirements.txt

    COPY . .

    ENV FLASK_APP=app.py
    ENV FLASK_ENV=production

    CMD ["gunicorn", "--bind", "0.0.0.0:5000", "app:app"]

7.2 Performance Monitoring

Integrate Prometheus metrics (this needs the additional prometheus-client package):

    import time

    from flask import jsonify
    from prometheus_client import start_http_server, Counter, Gauge

    # Define the metrics
    VIDEOS_PROCESSED = Counter('videos_processed_total', 'Total videos processed')
    PROCESSING_TIME = Gauge('video_processing_seconds', 'Time spent processing video')
    FAILED_DOWNLOADS = Counter('ts_download_failures_total', 'Total TS download failures')


    # `app` is the Flask app from section 4; this version replaces the earlier route
    @app.route('/api/process_m3u8', methods=['POST'])
    def process_m3u8():
        start_time = time.time()
        VIDEOS_PROCESSED.inc()

        try:
            # ...existing processing logic...

            processing_time = time.time() - start_time
            PROCESSING_TIME.set(processing_time)

            return jsonify({'status': 'success'})
        except Exception as e:
            # Incremented on any processing failure, not only failed downloads
            FAILED_DOWNLOADS.inc()
            return jsonify({'error': str(e)}), 500


    if __name__ == '__main__':
        # Metrics are served on port 8000, the API on port 5000
        start_http_server(8000)
        app.run(host='0.0.0.0', port=5000)

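7.3 Ideas for Extension

  1. Distributed processing: use Celery to distribute tasks across multiple worker nodes
  2. Progress tracking: report processing progress in real time over WebSocket
  3. Format conversion: support more output formats, such as WebM and AV1 encoding
  4. CDN integration: purge the CDN cache automatically after upload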

    from celery import Celery

    celery = Celery('tasks', broker='redis://localhost:6379/0')


    @celery.task(bind=True)
    def process_m3u8_task(self, m3u8_url, output_dir, bucket_name):
        # Implement the asynchronous processing logic here
        pass
