Python Async I/O: A Deep Dive into asyncio
2026/5/29 0:08:58


Introduction

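Asynchronous programming is a core skill in modern backend development. For I/O-bound workloads it lets a single process keep many operations in flight at once, which can substantially raise throughput and concurrency. As a backend developer who moved from Python to Rust, I have collected a set of async best practices along the way. This article takes a close look at Python's asyncio library to help you master the core techniques of asynchronous programming.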

1. Asynchronous Programming Fundamentals

1.1 Synchronous vs. Asynchronous

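| Aspect | Synchronous | Asynchronous |
|---|---|---|
| Execution | Blocks and waits for each operation to finish | Continues with other work while operations are pending |
| Resource utilization | Lower: the thread sits idle during I/O waits | Higher: the thread runs other tasks during I/O waits |
| Programming complexity | Simple | Higher |
| Best suited for | CPU-bound work | I/O-bound work |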

1.2 Coroutines

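A coroutine is a lightweight unit of concurrency. Many coroutines can run concurrently on a single thread because each one voluntarily suspends at an await point, which lets the event loop resume another. This is cooperative multitasking: coroutines interleave rather than run in parallel.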
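A minimal sketch of that single-thread interleaving. The worker coroutine and the log list are illustrative names, and asyncio.sleep(0) is used only to force a yield back to the event loop at each step.

```python
import asyncio
import inspect
import threading

async def worker(name, log):
    # illustrative coroutine: records (name, step, thread id) at each step
    for step in range(2):
        log.append((name, step, threading.get_ident()))
        await asyncio.sleep(0)  # suspend here so the event loop can run another coroutine

async def main():
    probe = worker("probe", [])
    # calling an async function only creates a coroutine object; its body has not run yet
    print(inspect.iscoroutine(probe))  # True
    probe.close()  # discard it without running, which avoids a "never awaited" warning
    log = []
    await asyncio.gather(worker("A", log), worker("B", log))
    return log

log = asyncio.run(main())
print([(name, step) for name, step, _ in log])  # [('A', 0), ('B', 0), ('A', 1), ('B', 1)]
```

Both workers take turns at every await, and every entry carries the same thread id.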

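1.3 Core asyncio Components

  • Event Loop: schedules and runs coroutines, and dispatches I/O events and callbacks
  • Coroutine: a function defined with async def that can suspend at await and resume later
  • Task: wraps a coroutine and schedules it to run on the event loop
  • Future: a low-level object representing the eventual result of an asynchronous operation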
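A small sketch that touches all four components in one place, assuming Python 3.7+ (for asyncio.run and asyncio.get_running_loop). The compute coroutine and the delays are arbitrary placeholders; loop.call_later stands in for whatever external event would normally complete a Future.

```python
import asyncio

async def compute():
    # a coroutine (placeholder work): it can suspend at await and resume later
    await asyncio.sleep(0.05)
    return 42

async def main():
    loop = asyncio.get_running_loop()  # the event loop running this coroutine
    fut = loop.create_future()  # a bare Future: a placeholder for a result that arrives later
    loop.call_later(0.1, fut.set_result, "ready")  # simulates an external event completing it
    task = asyncio.create_task(compute())  # a Task wraps the coroutine and schedules it
    print(isinstance(task, asyncio.Future))  # True: Task is a subclass of Future
    return await task, await fut

result = asyncio.run(main())
print(result)  # (42, 'ready')
```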

2. Getting Started with asyncio

2.1 Defining a Basic Coroutine

import asyncio

async def hello():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    await hello()

asyncio.run(main())

2.2 Scheduling Coroutines

import asyncio

async def task1():
    for i in range(3):
        print(f"Task 1: {i}")
        await asyncio.sleep(0.5)

async def task2():
    for i in range(3):
        print(f"Task 2: {i}")
        await asyncio.sleep(0.7)

async def main():
    # gather runs both coroutines concurrently, so their output interleaves
    await asyncio.gather(task1(), task2())

asyncio.run(main())
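To see what gather buys, here is a hedged comparison of awaiting two coroutines one after the other versus gathering them. The job coroutine and the 0.2 s delay are made up for illustration, and exact timings will vary by machine.

```python
import asyncio
import time

async def job(delay):
    # illustrative job: sleep stands in for any awaitable I/O
    await asyncio.sleep(delay)
    return delay

async def sequential():
    # the second job starts only after the first has finished
    return [await job(0.2), await job(0.2)]

async def concurrent():
    # both jobs are in flight at the same time
    return await asyncio.gather(job(0.2), job(0.2))

def timed(coro_fn):
    start = time.perf_counter()
    result = asyncio.run(coro_fn())
    return result, time.perf_counter() - start

seq_result, seq_time = timed(sequential)
con_result, con_time = timed(concurrent)
print(f"sequential: {seq_time:.2f}s, gather: {con_time:.2f}s")
```

The sequential version costs roughly the sum of the delays; the gathered version costs roughly the longest single delay.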

2.3 Creating a Task

import asyncio

async def coro():
    await asyncio.sleep(1)
    return 42

async def main():
    # create_task schedules the coroutine right away; await collects its result
    task = asyncio.create_task(coro())
    result = await task
    print(f"Result: {result}")

asyncio.run(main())
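create_task does more than wrap a coroutine: the task is handed to the event loop immediately and starts at the next opportunity, even before anything awaits it. A sketch; the events list is only there to record the ordering.

```python
import asyncio

async def coro(events):
    events.append("task started")
    await asyncio.sleep(0.1)
    events.append("task finished")
    return 42

async def main():
    events = []  # illustrative: records the order in which things happen
    task = asyncio.create_task(coro(events))  # scheduled, but not running yet
    events.append("task created")
    await asyncio.sleep(0)  # yield once; the loop now starts the task
    events.append("main doing other work")
    result = await task  # only now does main wait for the task
    return events, result

events, result = asyncio.run(main())
print(events)
```

Keep a reference to the task (here the task variable) until it is done: the event loop holds only weak references to tasks, so an unreferenced task can be garbage-collected mid-execution.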

3. Concurrency Patterns

3.1 Running Multiple Tasks Concurrently

import asyncio

async def fetch_data(url):
    print(f"Fetching {url}")
    await asyncio.sleep(1)  # simulates network latency
    return f"Data from {url}"

async def main():
    urls = ["https://example.com", "https://google.com", "https://github.com"]
    tasks = [asyncio.create_task(fetch_data(url)) for url in urls]
    # the three simulated requests overlap, so this takes about 1 second, not 3
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

3.2 Tasks with a Timeout

import asyncio

async def long_running_task():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        # wait_for cancels the task if it does not finish within 2 seconds
        result = await asyncio.wait_for(long_running_task(), timeout=2)
        print(result)
    except asyncio.TimeoutError:
        print("Task timed out")

asyncio.run(main())
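A timeout in wait_for is not just the caller giving up: the awaited task is cancelled, so it sees a CancelledError at its current await and gets a chance to clean up before TimeoutError reaches the caller. A sketch with a shortened timeout; the log list is illustrative.

```python
import asyncio

async def long_running_task(log):
    try:
        await asyncio.sleep(5)
        return "Done"
    except asyncio.CancelledError:
        log.append("inner task cancelled")  # wait_for cancelled us on timeout
        raise  # re-raise so cancellation completes normally

async def main():
    log = []  # illustrative: records the order of events
    try:
        await asyncio.wait_for(long_running_task(log), timeout=0.1)
    except asyncio.TimeoutError:
        log.append("timed out")
    return log

log = asyncio.run(main())
print(log)
```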

3.3 Cancelling a Task

import asyncio

async def task():
    try:
        while True:
            print("Running...")
            await asyncio.sleep(0.5)
    except asyncio.CancelledError:
        print("Task cancelled")
        raise  # re-raise so the task is actually marked as cancelled

async def main():
    t = asyncio.create_task(task())
    await asyncio.sleep(2)
    t.cancel()  # requests cancellation; CancelledError is raised at the task's current await
    try:
        await t
    except asyncio.CancelledError:
        print("Main caught cancelled error")

asyncio.run(main())

4. Asynchronous I/O Operations

4.1 File Operations

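import asyncio
from pathlib import Path

# asyncio has no non-blocking file API: a plain open()/read() inside a coroutine
# blocks the whole event loop, and an asyncio timeout cannot interrupt it.
# asyncio.to_thread (Python 3.9+) runs the blocking call in a worker thread instead.
async def read_file(filepath):
    # wait_for stops waiting after 10 s; the worker thread itself is not interrupted
    return await asyncio.wait_for(asyncio.to_thread(Path(filepath).read_text), timeout=10)

async def write_file(filepath, content):
    await asyncio.wait_for(asyncio.to_thread(Path(filepath).write_text, content), timeout=10)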
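Why synchronous file calls are a problem can be shown without touching the disk. In this sketch, time.sleep(0.3) stands in for a slow synchronous read (an assumption for illustration), and a heartbeat coroutine counts how often the event loop gets to run it. Requires Python 3.9+ for asyncio.to_thread.

```python
import asyncio
import time

async def heartbeat(ticks, stop):
    # ticks every 50 ms for as long as the event loop is free to run it
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.05)

async def run(offload):
    ticks, stop = [], asyncio.Event()
    hb = asyncio.create_task(heartbeat(ticks, stop))
    await asyncio.sleep(0)  # let the heartbeat take its first tick
    if offload:
        # time.sleep stands in for a blocking file read; here it runs in a worker thread
        await asyncio.to_thread(time.sleep, 0.3)
    else:
        # same blocking call on the loop's own thread: nothing else can run meanwhile
        time.sleep(0.3)
    stop.set()
    await hb
    return len(ticks)

blocked_ticks = asyncio.run(run(offload=False))
threaded_ticks = asyncio.run(run(offload=True))
print(blocked_ticks, threaded_ticks)
```

With the blocking call on the loop thread the heartbeat ticks only once; offloaded, it keeps ticking throughout.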

4.2 Network Requests

import asyncio
import aiohttp  # third-party: pip install aiohttp

async def fetch(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    # reuse one ClientSession for many requests; it pools connections
    async with aiohttp.ClientSession() as session:
        html = await fetch(session, "https://example.com")
        print(html[:100])

asyncio.run(main())

4.3 Database Operations

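import asyncio
import asyncpg  # third-party: pip install asyncpg

async def query_database():
    conn = await asyncpg.connect(user="user", password="pass", database="db", host="localhost")
    try:
        # asyncpg uses $1, $2, ... placeholders for query parameters
        return await conn.fetch("SELECT * FROM users WHERE id = $1", 1)
    finally:
        await conn.close()  # close the connection even if the query fails

values = asyncio.run(query_database())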

5. Interoperating Between Synchronous and Asynchronous Code

5.1 Sync to Async: Running Blocking Code from a Coroutine

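import asyncio
import time

def blocking_task():
    time.sleep(2)  # a blocking call that would freeze the event loop if called directly
    return "Blocking result"

async def async_wrapper():
    # get_running_loop() is the recommended way to get the loop inside a coroutine
    loop = asyncio.get_running_loop()
    # None selects the loop's default ThreadPoolExecutor
    return await loop.run_in_executor(None, blocking_task)

async def main():
    result = await async_wrapper()
    print(result)

asyncio.run(main())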
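Since Python 3.9, asyncio.to_thread is a shorter way to do the same thing: it runs a function in a separate thread, forwards positional and keyword arguments, and also propagates the current contextvars context. A sketch with a parameterized, made-up blocking_task:

```python
import asyncio
import time

def blocking_task(n):
    # illustrative blocking function; time.sleep stands in for a blocking library call
    time.sleep(0.1)
    return f"Blocking result {n}"

async def main():
    # each call runs in a worker thread, so the three sleeps overlap
    return await asyncio.gather(*(asyncio.to_thread(blocking_task, n) for n in range(3)))

results = asyncio.run(main())
print(results)
```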

5.2 Async to Sync: Running a Coroutine from Synchronous Code

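import asyncio

async def async_task():
    await asyncio.sleep(1)
    return "Async result"

def sync_wrapper():
    # asyncio.run creates a new event loop; it raises RuntimeError if a loop is
    # already running in this thread, so call it only from synchronous code
    return asyncio.run(async_task())

result = sync_wrapper()
print(result)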
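sync_wrapper works only where no event loop is running in the current thread. When synchronous code runs in a worker thread while a loop is already running elsewhere, one option is asyncio.run_coroutine_threadsafe, which submits the coroutine to that loop and returns a concurrent.futures.Future. A sketch; sync_code is an illustrative name.

```python
import asyncio

async def async_task():
    await asyncio.sleep(0.05)
    return "Async result"

def sync_code(loop):
    # illustrative sync function running in a worker thread: it blocks this thread
    # (not the event loop) until the coroutine finishes on the loop
    future = asyncio.run_coroutine_threadsafe(async_task(), loop)
    return future.result(timeout=5)

async def main():
    loop = asyncio.get_running_loop()
    # run the synchronous function in a thread so the loop stays free to run async_task
    return await asyncio.to_thread(sync_code, loop)

result = asyncio.run(main())
print(result)
```

Calling future.result() on the event loop's own thread would deadlock, because the loop could never get to run the coroutine it is waiting for.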

5.3 Using a Thread Pool

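import asyncio

def process_data(data):
    # CPU-bound work; defined at module level so it could also be pickled for a process pool
    return sum(data)

async def cpu_bound_task(data, executor=None):
    loop = asyncio.get_running_loop()
    # executor=None uses the loop's default ThreadPoolExecutor. Offloading keeps the event
    # loop responsive, but the GIL stops threads from running pure-Python CPU work in
    # parallel; pass a concurrent.futures.ProcessPoolExecutor for real parallelism.
    return await loop.run_in_executor(executor, process_data, data)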

6. Asynchronous Context Managers

6.1 A Custom Asynchronous Context Manager

import asyncio

class AsyncResource:
    def __init__(self, name):
        self.name = name

    async def __aenter__(self):
        print(f"Entering {self.name}")
        await asyncio.sleep(0.5)  # e.g. open a connection
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"Exiting {self.name}")
        await asyncio.sleep(0.5)  # e.g. close the connection
        # returning None (falsy) lets any exception from the body propagate

async def main():
    async with AsyncResource("database") as resource:
        print(f"Using {resource.name}")

asyncio.run(main())
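For simple cases, contextlib.asynccontextmanager (Python 3.7+) builds the same kind of object from an async generator: the code before yield acts as __aenter__ and the finally block as __aexit__. A sketch that mirrors AsyncResource, recording events in an illustrative log list instead of printing:

```python
import asyncio
from contextlib import asynccontextmanager

@asynccontextmanager
async def async_resource(name, log):
    log.append(f"Entering {name}")
    await asyncio.sleep(0.01)
    try:
        yield name  # the value bound by "async with ... as"
    finally:
        # runs on normal exit and when the body raises
        log.append(f"Exiting {name}")
        await asyncio.sleep(0.01)

async def main():
    log = []  # illustrative: collects events instead of printing
    async with async_resource("database", log) as resource:
        log.append(f"Using {resource}")
    return log

log = asyncio.run(main())
print(log)
```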

6.2 Asynchronous Iterators

import asyncio

class AsyncDataStream:
    def __init__(self, max_items):
        self.max_items = max_items
        self.current = 0

    def __aiter__(self):
        return self

    async def __anext__(self):
        if self.current >= self.max_items:
            raise StopAsyncIteration  # ends the async for loop
        await asyncio.sleep(0.3)
        self.current += 1
        return self.current

async def main():
    async for item in AsyncDataStream(5):
        print(f"Item: {item}")

asyncio.run(main())
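An async generator (an async def function containing yield) provides __aiter__ and __anext__ automatically, including raising StopAsyncIteration at the end. A sketch equivalent to AsyncDataStream with a shorter, illustrative delay, also showing an async comprehension:

```python
import asyncio

async def async_data_stream(max_items, delay=0.01):
    # delay is an illustrative stand-in for waiting on the next chunk of data
    for current in range(1, max_items + 1):
        await asyncio.sleep(delay)
        yield current

async def main():
    # async comprehensions consume any async iterable
    return [item async for item in async_data_stream(5)]

items = asyncio.run(main())
print(items)  # [1, 2, 3, 4, 5]
```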

7. Asynchronous Programming Patterns

7.1 The Producer-Consumer Pattern

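import asyncio

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(0.5)
        await queue.put(i)  # waits while the queue is full (maxsize=2)
        print(f"Produced: {i}")

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"Consumed: {item}")
        queue.task_done()  # tells queue.join() that this item is finished

async def main():
    queue = asyncio.Queue(maxsize=2)
    producer_task = asyncio.create_task(producer(queue))
    consumer_task = asyncio.create_task(consumer(queue))
    await producer_task
    await queue.join()  # wait until every produced item has been consumed
    consumer_task.cancel()  # the consumer loops forever, so stop it explicitly
    try:
        await consumer_task
    except asyncio.CancelledError:
        pass  # expected; without this, the CancelledError would propagate out of main()

asyncio.run(main())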
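Cancelling an endless consumer is one way to shut it down; another common approach is to send one end-of-stream marker per consumer so each exits its loop on its own. A sketch with three consumers sharing one queue; the SENTINEL object and the counts are illustrative choices.

```python
import asyncio

SENTINEL = object()  # hypothetical end-of-stream marker; never equal to a real item

async def producer(queue, n_items, n_consumers):
    for i in range(n_items):
        await queue.put(i)
    for _ in range(n_consumers):
        await queue.put(SENTINEL)  # one stop signal per consumer

async def consumer(queue, consumed):
    while True:
        item = await queue.get()
        if item is SENTINEL:
            break  # this consumer is done; the others get their own sentinel
        consumed.append(item)

async def main(n_consumers=3):
    queue = asyncio.Queue(maxsize=2)
    consumed = []
    await asyncio.gather(
        producer(queue, 10, n_consumers),
        *(consumer(queue, consumed) for _ in range(n_consumers)),
    )
    return consumed

consumed = asyncio.run(main())
print(sorted(consumed))
```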

7.2 Asynchronous Locks

import asyncio

async def update_counter(lock, counter):
    async with lock:
        # read-modify-write with an await in between: without the lock,
        # other tasks could read the same stale value
        current = counter["value"]
        await asyncio.sleep(0.1)
        counter["value"] = current + 1

async def main():
    lock = asyncio.Lock()
    counter = {"value": 0}
    tasks = [update_counter(lock, counter) for _ in range(10)]
    await asyncio.gather(*tasks)
    print(f"Final counter: {counter['value']}")  # 10

asyncio.run(main())
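Why the lock matters: the same read-modify-write without a lock loses updates, because every task reads the counter before any of them writes it back. A sketch that runs both variants; the helper names and the 0.01 s sleep are illustrative.

```python
import asyncio

async def increment(counter):
    current = counter["value"]
    await asyncio.sleep(0.01)  # other tasks run here and read the same stale value
    counter["value"] = current + 1

async def update_counter(counter, lock):
    if lock is None:
        await increment(counter)
    else:
        async with lock:  # only one task at a time inside the read-modify-write
            await increment(counter)

async def run(use_lock):
    counter = {"value": 0}
    lock = asyncio.Lock() if use_lock else None
    await asyncio.gather(*(update_counter(counter, lock) for _ in range(10)))
    return counter["value"]

without_lock = asyncio.run(run(False))
with_lock = asyncio.run(run(True))
print(without_lock, with_lock)  # 1 10
```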

7.3 Limiting Concurrency with a Semaphore

import asyncio

async def fetch_url(semaphore, url):
    async with semaphore:  # at most 3 coroutines are inside this block at once
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Done: {url}"

async def main():
    semaphore = asyncio.Semaphore(3)
    urls = [f"https://example.com/{i}" for i in range(10)]
    tasks = [fetch_url(semaphore, url) for url in urls]
    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())
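To check that the semaphore really caps concurrency, a hedged variant of fetch_url can count how many coroutines are inside the async with block at once. The stats dictionary is an illustrative addition, and no real requests are made.

```python
import asyncio

async def fetch_url(semaphore, url, stats):
    async with semaphore:
        stats["active"] += 1  # illustrative bookkeeping: coroutines currently inside
        stats["peak"] = max(stats["peak"], stats["active"])
        await asyncio.sleep(0.05)  # simulated request
        stats["active"] -= 1
        return f"Done: {url}"

async def main(limit=3):
    semaphore = asyncio.Semaphore(limit)
    stats = {"active": 0, "peak": 0}
    urls = [f"https://example.com/{i}" for i in range(10)]
    results = await asyncio.gather(*(fetch_url(semaphore, url, stats) for url in urls))
    return results, stats["peak"]

results, peak = asyncio.run(main())
print(peak)  # 3
```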

8. Case Study: An Asynchronous Web Crawler

import asyncio
import aiohttp  # third-party: pip install aiohttp
from bs4 import BeautifulSoup  # third-party: pip install beautifulsoup4

async def fetch_page(session, url):
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            if response.status == 200:
                return await response.text()
            return None
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return None

async def parse_page(html):
    if not html:
        return []
    soup = BeautifulSoup(html, "html.parser")
    # keep only absolute links; relative hrefs are skipped
    return [a["href"] for a in soup.find_all("a", href=True) if a["href"].startswith("http")]

async def crawl(session, url, max_depth=3, visited=None, semaphore=None):
    if visited is None:
        visited = set()
    if semaphore is None:
        semaphore = asyncio.Semaphore(5)  # at most 5 requests in flight
    if url in visited or max_depth <= 0:
        return []
    visited.add(url)  # no await between the check and the add, so no race
    print(f"Crawling: {url} (depth: {max_depth})")
    # hold the semaphore only for the request, so recursive crawls cannot deadlock on it
    async with semaphore:
        html = await fetch_page(session, url)
    links = await parse_page(html)
    tasks = [
        crawl(session, link, max_depth - 1, visited, semaphore)
        for link in links[:10]
        if link not in visited
    ]
    results = await asyncio.gather(*tasks)
    all_links = [url]
    for result in results:
        all_links.extend(result)
    return all_links

async def main():
    # pass the session explicitly instead of relying on a global variable
    async with aiohttp.ClientSession() as session:
        result = await crawl(session, "https://example.com", max_depth=2)
        print(f"Total links crawled: {len(result)}")

if __name__ == "__main__":
    asyncio.run(main())
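parse_page keeps only hrefs that already start with http, so relative links such as /about are silently dropped, and the same page reached via different #fragments counts as different URLs. A possible refinement, sketched as a standalone helper (normalize_links is a hypothetical name, not part of the crawler above), uses the standard library's urljoin and urldefrag; parse_page would then need the page's URL as an extra argument to use it as base_url.

```python
from urllib.parse import urldefrag, urljoin, urlparse

def normalize_links(base_url, hrefs):
    """Hypothetical helper: resolve hrefs against base_url, drop #fragments
    and non-HTTP schemes, and de-duplicate while keeping order."""
    seen, result = set(), []
    for href in hrefs:
        absolute, _fragment = urldefrag(urljoin(base_url, href))
        if urlparse(absolute).scheme in ("http", "https") and absolute not in seen:
            seen.add(absolute)
            result.append(absolute)
    return result

links = normalize_links(
    "https://example.com/docs/",
    ["intro.html", "/about#team", "https://example.com/about", "mailto:a@example.com", "#top"],
)
print(links)
```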

Summary

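Asynchronous programming is a key technique for building high-performance backend systems. After working through this article, you should be familiar with the following core points: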

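  1. Async fundamentals: the difference between synchronous and asynchronous execution, and what a coroutine is
  2. asyncio core: Event Loop, Coroutine, Task, Future
  3. Concurrency patterns: running tasks concurrently, timeouts, task cancellation
  4. Async I/O: file operations, network requests, database operations
  5. Sync/async interop: calling blocking code from coroutines, calling coroutines from synchronous code, thread pools
  6. Async context managers: async with, async iterators
  7. Async patterns: producer-consumer, async locks, semaphores
  8. Case study: an asynchronous web crawler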

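As a backend developer who moved from Python to Rust, I consider async programming essential for building highly concurrent systems. A follow-up article will take a closer look at Tokio, Rust's async runtime.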
