Python高阶编程:异步编程
学习目标
- 理解异步编程的概念和优势
- 掌握async/await语法
- 学会创建和使用协程
- 理解事件循环的工作原理
- 掌握异步I/O操作
- 学会使用异步上下文管理器
- 掌握异步编程的最佳实践
1. 异步编程基础
1.1 什么是异步编程
异步编程是一种编程范式,允许程序在等待某些操作(通常是I/O操作)完成时继续执行其他任务,而不是阻塞等待。
1.2 同步 vs 异步
import time
import asyncio
import aiohttp
# Synchronous version
def sync_fetch_data(url, delay):
    """Pretend to fetch ``url`` synchronously, blocking for ``delay`` seconds.

    Args:
        url: Label of the resource; only printed and echoed back.
        delay: Seconds to block with ``time.sleep`` (simulated network latency).

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Sync: Fetching from {url}")
    # time.sleep blocks the whole thread: nothing else can run meanwhile
    time.sleep(delay)
    print(f"Sync: Finished fetching from {url}")
    payload = f"Data from {url}"
    return payload
def sync_main():
    """Fetch three mock URLs one after another and print the elapsed time.

    Every ``sync_fetch_data`` call blocks, so the total time is the sum of
    the individual delays.

    Returns:
        List of payload strings in URL order.
    """
    began = time.time()
    pairs = zip(["url1", "url2", "url3"], [1, 2, 1])
    results = [sync_fetch_data(url, delay) for url, delay in pairs]
    finished = time.time()
    print(f"Sync total time: {finished - began:.2f}s")
    return results
# Asynchronous version
async def async_fetch_data(url, delay):
    """Coroutine that pretends to fetch ``url`` without blocking the event loop.

    Args:
        url: Label of the resource; only printed and echoed back.
        delay: Seconds to wait with ``asyncio.sleep``; other tasks may run meanwhile.

    Returns:
        The string ``"Data from <url>"``.
    """
    print(f"Async: Fetching from {url}")
    # Suspends only this coroutine, handing control back to the event loop
    await asyncio.sleep(delay)
    print(f"Async: Finished fetching from {url}")
    payload = f"Data from {url}"
    return payload
async def async_main():
    """Fetch three mock URLs concurrently and print the elapsed time.

    All coroutines are handed to ``asyncio.gather`` together, so the total
    time is about the longest delay rather than the sum.

    Returns:
        List of payload strings in URL order (gather preserves argument order).
    """
    began = time.time()
    urls = ["url1", "url2", "url3"]
    delays = [1, 2, 1]
    # Build all coroutine objects first, then run them concurrently
    coroutines = [async_fetch_data(url, delay) for url, delay in zip(urls, delays)]
    results = await asyncio.gather(*coroutines)
    finished = time.time()
    print(f"Async total time: {finished - began:.2f}s")
    return results
# Run both versions and compare their results
print("=== Synchronous vs Asynchronous ===")
print("Running synchronous version:")
sync_results = sync_main()
# Leading "\n" puts a blank line between the two runs' output
print("\nRunning asynchronous version:")
async_results = asyncio.run(async_main())
print(f"\nResults equal: {sync_results == async_results}")
2. 协程基础
2.1 创建和运行协程
import asyncio
# Basic coroutine
async def hello_world():
    """Minimal coroutine: print "Hello", await a one-second sleep, then print "World"."""
    print("Hello")
    await asyncio.sleep(1)  # non-blocking pause
    print("World")
# Run the hello_world coroutine to completion
asyncio.run(hello_world())
# Coroutines are objects too
async def simple_coroutine():
    """Coroutine that returns the integer 42 without awaiting anything."""
    answer = 42
    return answer
# Create a coroutine object: calling the coroutine function does not run its body yet
coro = simple_coroutine()
print(f"Coroutine object: {coro}")
print(f"Type: {type(coro)}")
# Run the coroutine object and collect its return value
result = asyncio.run(coro)
print(f"Result: {result}")
# Multiple coroutines
async def greet(name, delay):
    """Say hello to ``name``, wait ``delay`` seconds, then say goodbye.

    Returns:
        The string ``"Greeted <name>"``.
    """
    print(f"Hello, {name}!")
    await asyncio.sleep(delay)
    print(f"Goodbye, {name}!")
    summary = f"Greeted {name}"
    return summary
async def main():
    """Contrast sequential awaits with concurrent execution via ``asyncio.gather``."""
    # Sequential: Bob is greeted only after Alice's greeting finishes (about 2s total)
    print("=== Sequential Execution ===")
    result1 = await greet("Alice", 1)
    result2 = await greet("Bob", 1)
    print(f"Results: {result1}, {result2}")
    # Concurrent: gather runs both greetings at the same time (about 1s total)
    print("\n=== Concurrent Execution ===")
    task1 = greet("Charlie", 1)
    task2 = greet("David", 1)
    results = await asyncio.gather(task1, task2)
    print(f"Results: {results}")


asyncio.run(main())
2.2 任务(Task)和Future
import asyncio
import time
async def slow_operation(name, delay):
    """Announce ``name``, wait ``delay`` seconds, and return ``"Result of <name>"``."""
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} completed after {delay}s")
    outcome = f"Result of {name}"
    return outcome
async def task_demo():
    """Schedule two Tasks and show their ``done()`` state before and after awaiting."""
    print("=== Task Demo ===")
    # create_task schedules both coroutines to run concurrently
    first = asyncio.create_task(slow_operation("Task 1", 2))
    second = asyncio.create_task(slow_operation("Task 2", 1))
    labelled = (("Task 1", first), ("Task 2", second))
    for label, task in labelled:
        print(f"{label} done: {task.done()}")
    # Both tasks are already running, so awaiting them takes about 2s, not 3s
    result1 = await first
    result2 = await second
    for label, task in labelled:
        print(f"{label} done: {task.done()}")
    print(f"Results: {result1}, {result2}")
async def task_cancellation():
    """Start a long-running task, cancel it after three seconds, and report the cancellation."""
    print("\n=== Task Cancellation ===")

    async def long_task():
        # Ten one-second steps; cancel() interrupts whichever sleep is in progress
        try:
            for i in range(10):
                await asyncio.sleep(1)
                print(f"Long task step {i}")
            return "Completed"
        except asyncio.CancelledError:
            print("Long task was cancelled")
            # Re-raise so the task really ends up in the cancelled state
            raise

    task = asyncio.create_task(long_task())
    # Let a few steps run, then request cancellation
    await asyncio.sleep(3)
    task.cancel()
    try:
        result = await task
        print(f"Task result: {result}")
    except asyncio.CancelledError:
        print("Task was cancelled")
async def task_timeout():
    """Show ``asyncio.wait_for`` abandoning a 10-second task after a 2-second timeout."""
    print("\n=== Task Timeout ===")

    async def very_slow_task():
        await asyncio.sleep(10)
        return "Very slow result"

    try:
        # wait_for raises TimeoutError if the coroutine does not finish in time
        result = await asyncio.wait_for(very_slow_task(), timeout=2)
        print(f"Task completed: {result}")
    except asyncio.TimeoutError:
        print("Task timed out")
# Custom Future
async def custom_future_demo():
    """Resolve a Future by hand, then await a Task (a Future subclass) that finishes at once."""
    print("\n=== Custom Future Demo ===")
    # Create the Future through the running loop instead of instantiating asyncio.Future directly
    future = asyncio.get_running_loop().create_future()
    # Set the result manually; awaiting an already-resolved future returns immediately
    future.set_result("Custom future result")
    result = await future
    print(f"Future result: {result}")
    # A Task wrapping sleep(0) completes after a single yield to the event loop
    completed_future = asyncio.create_task(asyncio.sleep(0))
    await completed_future
    print(f"Completed future: {completed_future.done()}")
# Run every Task/Future demo in order
async def run_all_demos():
    """Await each demo coroutine one after another."""
    for demo in (task_demo, task_cancellation, task_timeout, custom_future_demo):
        await demo()


asyncio.run(run_all_demos())
2.3 并发执行模式
import asyncio
import time
async def fetch_data(source, delay):
    """Simulate fetching from ``source`` for ``delay`` seconds.

    Returns:
        The string ``"Data from <source>"``.
    """
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)
    print(f"Finished fetching from {source}")
    payload = f"Data from {source}"
    return payload
async def gather_demo():
    """Run three fetches concurrently with ``asyncio.gather`` and print the result list."""
    print("=== asyncio.gather Demo ===")
    jobs = [
        fetch_data("API 1", 1),
        fetch_data("API 2", 2),
        fetch_data("API 3", 1),
    ]
    # return_exceptions=True puts exceptions into the result list instead of raising
    results = await asyncio.gather(*jobs, return_exceptions=True)
    print(f"All results: {results}")
async def wait_demo():
    """Use ``asyncio.wait(ALL_COMPLETED)`` and inspect each finished task's outcome."""
    print("\n=== asyncio.wait Demo ===")
    tasks = [
        asyncio.create_task(fetch_data("Source A", 1)),
        asyncio.create_task(fetch_data("Source B", 2)),
        asyncio.create_task(fetch_data("Source C", 3))
    ]
    # Returns two sets: finished tasks and those still pending (none, with ALL_COMPLETED)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    print(f"Completed: {len(done)}")
    print(f"Pending: {len(pending)}")
    # ``done`` is a set, so results come out in no particular order
    for task in done:
        try:
            result = task.result()  # re-raises the task's exception, if any
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")
async def wait_for_any():
    """Take the first task to finish with ``FIRST_COMPLETED`` and cancel the rest."""
    print("\n=== wait with FIRST_COMPLETED ===")
    tasks = [
        asyncio.create_task(fetch_data("Fast", 1)),
        asyncio.create_task(fetch_data("Medium", 2)),
        asyncio.create_task(fetch_data("Slow", 3))
    ]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"First completed: {done.pop().result()}")
    print(f"Still running: {len(pending)}")
    # Cancel the stragglers, then await them so the cancellation is fully processed
    for task in pending:
        task.cancel()
    await asyncio.gather(*pending, return_exceptions=True)
async def semaphore_demo():
    """Limit five one-second fetches to at most two at a time with a semaphore."""
    print("\n=== Semaphore Demo ===")
    semaphore = asyncio.Semaphore(2)  # at most 2 holders at once

    async def limited_fetch(source, delay):
        # Acquired on entry, released on exit, even if the body raises
        async with semaphore:
            print(f"Fetching {source} (semaphore acquired)")
            await asyncio.sleep(delay)
            print(f"Finished {source} (semaphore released)")
            return f"Data from {source}"

    tasks = [
        limited_fetch(f"Resource {i}", 1) for i in range(5)
    ]
    results = await asyncio.gather(*tasks)
    print(f"All results: {results}")
# Run the concurrency demos, one asyncio.run call per demo
asyncio.run(gather_demo())
asyncio.run(wait_demo())
asyncio.run(wait_for_any())
asyncio.run(semaphore_demo())
3. 异步I/O操作
3.1 异步文件操作
import asyncio
import aiofiles
import os
async def read_file_async(filename):
    """Read a whole text file asynchronously with aiofiles.

    Returns:
        The file contents, or ``None`` (after printing a message) if the file
        does not exist.
    """
    try:
        async with aiofiles.open(filename, 'r') as handle:
            text = await handle.read()
            print(f"Read {len(text)} characters from {filename}")
            return text
    except FileNotFoundError:
        print(f"File {filename} not found")
    return None
async def write_file_async(filename, content):
    """Overwrite ``filename`` with the text ``content`` using aiofiles."""
    async with aiofiles.open(filename, 'w') as handle:
        await handle.write(content)
        written = len(content)
        print(f"Written {written} characters to {filename}")
async def process_large_file(filename, chunk_size=1024):
    """Read ``filename`` in binary chunks of ``chunk_size`` bytes, "processing" each one."""
    async with aiofiles.open(filename, 'rb') as handle:
        count = 0
        # An empty bytes object signals end of file
        while chunk := await handle.read(chunk_size):
            count += 1
            await asyncio.sleep(0.01)  # stand-in for real per-chunk work
            print(f"Processed chunk {count} ({len(chunk)} bytes)")
        print(f"Finished processing {count} chunks")
async def file_operations_demo():
    """Write, read back, and chunk-process scratch files concurrently, then delete them."""
    print("=== Async File Operations ===")
    test_files = ['test1.txt', 'test2.txt', 'test3.txt']
    contents = ['Hello World', 'Python Async', 'File Operations']
    large_file = 'large_file.txt'
    try:
        # Write the small files concurrently
        write_tasks = [
            write_file_async(filename, content)
            for filename, content in zip(test_files, contents)
        ]
        await asyncio.gather(*write_tasks)
        # Read them back concurrently; gather keeps results in argument order
        read_tasks = [
            read_file_async(filename)
            for filename in test_files
        ]
        results = await asyncio.gather(*read_tasks)
        for filename, content in zip(test_files, results):
            if content:
                print(f"{filename}: {content[:20]}...")
        # Create a larger file and process it chunk by chunk
        large_content = "x" * 10000
        await write_file_async(large_file, large_content)
        await process_large_file(large_file, chunk_size=1000)
    finally:
        # Remove the scratch files even if one of the steps above failed
        for filename in test_files + [large_file]:
            if os.path.exists(filename):
                os.remove(filename)
                print(f"Removed {filename}")


# Run the file-operations demo
asyncio.run(file_operations_demo())
3.2 异步网络请求
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
    """GET ``url`` through ``session`` and report how long the body text is.

    Returns:
        ``(url, body_length)``, or ``(url, 0)`` if any error occurred (the
        error is printed, not raised).
    """
    try:
        async with session.get(url) as response:
            body = await response.text()
            size = len(body)
            print(f"Fetched {url}: {size} characters")
            return url, size
    except Exception as e:
        print(f"Error fetching {url}: {e}")
    return url, 0
async def fetch_multiple_urls(urls):
    """Fetch every URL in ``urls`` concurrently over one shared ``aiohttp.ClientSession``.

    Returns:
        A list in ``urls`` order. Each item is ``fetch_url``'s result or,
        because of ``return_exceptions=True``, an exception object.
    """
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(fetch_url(session, url) for url in urls),
            return_exceptions=True,
        )
async def fetch_with_timeout(session, url, timeout=5):
    """GET ``url`` but give up once ``timeout`` seconds have passed.

    Returns:
        ``(url, http_status, body_length)`` on success,
        ``(url, "timeout", 0)`` if the deadline passed, or
        ``(url, "error", 0)`` for any other failure.
    """
    try:
        # asyncio.timeout (Python 3.11+) bounds everything inside this block
        async with asyncio.timeout(timeout):
            async with session.get(url) as response:
                body = await response.text()
                return url, response.status, len(body)
    except asyncio.TimeoutError:
        print(f"Timeout for {url}")
        outcome = (url, "timeout", 0)
    except Exception as e:
        print(f"Error for {url}: {e}")
        outcome = (url, "error", 0)
    return outcome
async def fetch_with_semaphore(session, url, semaphore):
    """GET ``url`` while holding ``semaphore``, which bounds how many requests run at once.

    Returns:
        ``(url, body_length)``. Errors are not caught here.
    """
    async with semaphore:
        async with session.get(url) as response:
            body = await response.text()
            await asyncio.sleep(0.1)  # simulated processing time
            return url, len(body)
async def network_demo():
    """Compare plain concurrent fetching, per-request timeouts, and semaphore-limited fetching."""
    print("=== Async Network Requests ===")
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/delay/1"
    ]

    # 1) Plain concurrent fetch over one shared session
    start_time = time.time()
    results = await fetch_multiple_urls(urls)
    end_time = time.time()
    print(f"\nFetched {len(results)} URLs in {end_time - start_time:.2f}s")
    for item in results:
        # gather(return_exceptions=True) may hand back an exception instead of a tuple
        if isinstance(item, BaseException):
            print(f" Error: {item}")
            continue
        url, size = item
        print(f" {url}: {size} bytes")

    # 2) Per-request timeout: the /delay/10 request is meant to exceed the 3s limit
    print("\n--- With Timeout ---")
    async with aiohttp.ClientSession() as session:
        timeout_urls = [
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/10",
            "https://httpbin.org/delay/2"
        ]
        start_time = time.time()
        results = await asyncio.gather(
            *[fetch_with_timeout(session, url, timeout=3) for url in timeout_urls]
        )
        end_time = time.time()
        print(f"Timeout fetch completed in {end_time - start_time:.2f}s")
        for url, status, size in results:
            print(f" {url}: {status}, {size} bytes")

    # 3) At most two requests in flight at any time
    print("\n--- With Semaphore (max 2 concurrent) ---")
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(2)
        start_time = time.time()
        # fetch_with_semaphore does not catch errors; collect them rather than abort the demo
        results = await asyncio.gather(
            *[fetch_with_semaphore(session, url, semaphore) for url in urls],
            return_exceptions=True,
        )
        end_time = time.time()
        print(f"Semaphore fetch completed in {end_time - start_time:.2f}s")
        for item in results:
            if isinstance(item, BaseException):
                print(f" Error: {item}")
                continue
            url, size = item
            print(f" {url}: {size} bytes")


# Run the network demo
asyncio.run(network_demo())
3.3 异步数据库操作
import asyncio
import asyncpg
import aiomysql
async def postgres_demo():
    """Exercise asyncpg: create ``users``, insert a row and a batch, then list all rows.

    Needs a PostgreSQL server at localhost:5432 with the demo credentials
    below; any failure is printed instead of raised.
    """
    print("=== Async PostgreSQL ===")
    conn = None
    try:
        conn = await asyncpg.connect(
            host="localhost",
            port=5432,
            user="postgres",
            password="password",
            database="testdb"
        )
        await conn.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
)
""")
        # Single insert with $1/$2 placeholders (values are sent separately, not string-formatted)
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "Alice", "alice@example.com"
        )
        rows = await conn.fetch("SELECT * FROM users")
        print(f"Users: {rows}")
        # Batch insert
        users = [
            ("Bob", "bob@example.com"),
            ("Charlie", "charlie@example.com"),
            ("David", "david@example.com")
        ]
        await conn.executemany(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            users
        )
        all_users = await conn.fetch("SELECT * FROM users ORDER BY id")
        print(f"All users ({len(all_users)}):")
        for user in all_users:
            print(f" {user['id']}: {user['name']} - {user['email']}")
    except Exception as e:
        print(f"PostgreSQL error: {e}")
        print("Note: This demo requires a running PostgreSQL server")
    finally:
        # Close the connection even when a query failed part-way through
        if conn is not None:
            await conn.close()
async def mysql_demo():
    """Exercise aiomysql: create ``products``, insert a row and a batch, then list all rows.

    Needs a MySQL server at localhost:3306 with the demo credentials below;
    any failure is printed instead of raised.
    """
    print("\n=== Async MySQL ===")
    conn = None
    try:
        conn = await aiomysql.connect(
            host="localhost",
            port=3306,
            user="root",
            password="password",
            db="testdb"
        )
        async with conn.cursor() as cursor:
            await cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100),
price DECIMAL(10, 2)
)
""")
            await cursor.execute(
                "INSERT INTO products (name, price) VALUES (%s, %s)",
                ("Laptop", 999.99)
            )
            await cursor.execute("SELECT * FROM products")
            result = await cursor.fetchall()
            print(f"Products: {result}")
            # Batch insert
            products = [
                ("Mouse", 29.99),
                ("Keyboard", 79.99),
                ("Monitor", 299.99)
            ]
            await cursor.executemany(
                "INSERT INTO products (name, price) VALUES (%s, %s)",
                products
            )
            # aiomysql connections are not in autocommit mode by default; persist the inserts
            await conn.commit()
            await cursor.execute("SELECT * FROM products ORDER BY id")
            all_products = await cursor.fetchall()
            print(f"All products ({len(all_products)}):")
            for product in all_products:
                # Rows are tuples: (id, name, price)
                print(f" {product[0]}: {product[1]} - ${product[2]}")
    except Exception as e:
        print(f"MySQL error: {e}")
        print("Note: This demo requires a running MySQL server")
    finally:
        # Release the connection even when a statement failed
        if conn is not None:
            conn.close()
# Simulated database operations (no real database required)
async def simulated_db_demo():
    """Replay the SQL demos against an in-memory ``SimulatedDB`` so no server is needed."""
    print("\n=== Simulated Database Operations ===")

    class SimulatedDB:
        """Tiny in-memory stand-in understanding CREATE TABLE, INSERT INTO and SELECT."""

        def __init__(self):
            self.data = {}  # table name -> list of row dicts
            self.next_id = 1  # one auto-increment counter shared by all tables

        async def execute(self, query, *params):
            """Pretend to run ``query`` with positional ``params`` after 0.1s of fake latency.

            Returns a status string for CREATE/INSERT, a list of row dicts for
            SELECT, and ``None`` for anything else.
            """
            await asyncio.sleep(0.1)
            if "CREATE TABLE" in query:
                words = query.split("TABLE", 1)[1].split()
                # Skip the optional "IF NOT EXISTS" clause as whole words; str.strip() would
                # remove a *set of characters* and turn the name into an empty string.
                if_not_exists = words[:3] == ["IF", "NOT", "EXISTS"]
                if if_not_exists:
                    words = words[3:]
                # The name may be glued to the column list, e.g. "users("
                table_name = words[0].split("(")[0]
                if if_not_exists:
                    self.data.setdefault(table_name, [])
                else:
                    self.data[table_name] = []
                return f"Table {table_name} created"
            elif "INSERT INTO" in query:
                table_name = query.split("INTO")[1].split()[0].strip()
                rows = self.data.setdefault(table_name, [])
                if "users" in table_name:
                    name, email = params
                    record = {
                        'id': self.next_id,
                        'name': name,
                        'email': email
                    }
                else:  # products
                    name, price = params
                    record = {
                        'id': self.next_id,
                        'name': name,
                        'price': float(price)
                    }
                rows.append(record)
                self.next_id += 1
                return "Inserted 1 record"
            elif "SELECT" in query:
                table_name = query.split("FROM")[1].split()[0].strip()
                rows = self.data.get(table_name, [])
                if "ORDER BY" in query:
                    # Only a single sort column is supported
                    key = query.split("ORDER BY")[1].split()[0].strip()
                    return sorted(rows, key=lambda row: row[key])
                return rows

    db = SimulatedDB()
    await db.execute("""
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name VARCHAR(100),
email VARCHAR(100)
)
""")
    await db.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        "Alice", "alice@example.com"
    )
    users = await db.execute("SELECT * FROM users")
    print(f"Users: {users}")
    # Concurrent inserts: each coroutine sleeps, then appends without further awaits
    tasks = []
    for i in range(5):
        tasks.append(db.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            f"User{i}", f"user{i}@example.com"
        ))
    await asyncio.gather(*tasks)
    all_users = await db.execute("SELECT * FROM users ORDER BY id")
    print(f"All users ({len(all_users)}):")
    for user in all_users:
        print(f" {user['id']}: {user['name']} - {user['email']}")
# Run the database demos
async def db_demo_main():
    """Run the PostgreSQL, MySQL, and simulated database demos in sequence."""
    for demo in (postgres_demo, mysql_demo, simulated_db_demo):
        await demo()


asyncio.run(db_demo_main())
4. 异步上下文管理器
4.1 创建异步上下文管理器
import asyncio
from contextlib import asynccontextmanager
class AsyncDatabaseConnection:
    """Simulated database connection that can be used with ``async with``.

    Entering connects (0.5s simulated), exiting disconnects (0.2s), and
    ``execute`` takes 0.3s and needs an open connection.
    """

    def __init__(self, db_name):
        self.db_name = db_name
        self.connected = False

    async def connect(self):
        """Simulate opening the connection."""
        print(f"Connecting to {self.db_name}...")
        await asyncio.sleep(0.5)
        self.connected = True
        print(f"Connected to {self.db_name}")

    async def disconnect(self):
        """Simulate closing the connection; does nothing if already closed."""
        if not self.connected:
            return
        print(f"Disconnecting from {self.db_name}...")
        await asyncio.sleep(0.2)
        self.connected = False
        print(f"Disconnected from {self.db_name}")

    async def execute(self, query):
        """Return ``"Result of: <query>"`` after a simulated delay.

        Raises:
            RuntimeError: If the connection is not open.
        """
        if self.connected:
            print(f"Executing: {query}")
            await asyncio.sleep(0.3)
            return f"Result of: {query}"
        raise RuntimeError("Not connected to database")

    async def __aenter__(self):
        await self.connect()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        # Always disconnect; returning None lets any exception propagate
        await self.disconnect()
# Using the async context manager
async def use_async_context_manager():
    """Open ``mydb`` with ``async with`` and run two simulated queries."""
    print("=== Async Context Manager ===")
    async with AsyncDatabaseConnection("mydb") as db:
        queries = ("SELECT * FROM users", "INSERT INTO users VALUES (1, 'Alice')")
        result1, result2 = [await db.execute(q) for q in queries]
        print(f"Results: {result1}, {result2}")
# Building an async context manager with the decorator
@asynccontextmanager
async def async_file_manager(filename, mode):
    """Open ``filename`` with aiofiles in ``mode``, yield the handle, and always close it."""
    print(f"Opening {filename}...")
    handle = await aiofiles.open(filename, mode)
    try:
        yield handle
    finally:
        # Runs on normal exit and when the with-body raises
        print(f"Closing {filename}...")
        await handle.close()
async def use_async_context_manager_decorator():
    """Write then read ``test.txt`` through ``async_file_manager`` and remove it afterwards."""
    import os

    print("\n=== Async Context Manager Decorator ===")
    try:
        async with async_file_manager("test.txt", "w") as f:
            await f.write("Hello, Async World!")
        async with async_file_manager("test.txt", "r") as f:
            content = await f.read()
            print(f"Read: {content}")
    finally:
        # Delete the scratch file even if writing or reading failed
        if os.path.exists("test.txt"):
            os.remove("test.txt")
# Nested async context managers
@asynccontextmanager
async def transaction_manager(db_name):
    """Simulated transaction scope that yields ``db_name``.

    On a clean exit it "commits"; if the body raises an ``Exception`` it
    "rolls back" and re-raises the error.
    """
    print(f"Starting transaction on {db_name}")
    await asyncio.sleep(0.1)
    try:
        yield db_name
    except Exception as e:
        print(f"Rolling back transaction on {db_name}: {e}")
        await asyncio.sleep(0.1)
        raise
    else:
        print(f"Committing transaction on {db_name}")
        await asyncio.sleep(0.1)
async def nested_async_context_managers():
    """Run an update inside a simulated transaction nested within a connection context."""
    print("\n=== Nested Async Context Managers ===")
    async with AsyncDatabaseConnection("main_db") as db:
        # Inner manager exits first, then the connection disconnects
        async with transaction_manager("main_db"):
            result = await db.execute("UPDATE users SET active = true")
            print(f"Transaction result: {result}")
            # Uncomment to see transaction_manager roll back and re-raise:
            # raise ValueError("Simulated error")
# Combining async context managers
class AsyncContextManagerGroup:
    """Enter several async context managers as one unit and exit them in reverse order.

    If entering any manager fails, the managers already entered are exited
    before the error propagates. Every entered manager is exited even if
    another one's ``__aexit__`` raises. If an inner manager suppresses the
    exception, the group suppresses it too.
    """

    def __init__(self, *managers):
        self.managers = managers
        # (manager, value returned by its __aenter__) pairs, in entry order
        self.entered = []
        self._stack = None

    async def __aenter__(self):
        """Enter every manager in order and return the group itself."""
        from contextlib import AsyncExitStack

        self.entered = []
        # If any __aenter__ raises, leaving this block unwinds the ones already entered
        async with AsyncExitStack() as stack:
            for manager in self.managers:
                entered = await stack.enter_async_context(manager)
                self.entered.append((manager, entered))
            # Everything entered: keep the exit callbacks for __aexit__
            self._stack = stack.pop_all()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Exit all entered managers in reverse order; return True if the exception was suppressed."""
        stack, self._stack = self._stack, None
        if stack is None:
            return False
        return await stack.__aexit__(exc_type, exc_val, exc_tb)
async def combined_async_context_managers():
    """Open two connections through one ``AsyncContextManagerGroup`` and query both."""
    print("\n=== Combined Async Context Managers ===")
    db1 = AsyncDatabaseConnection("db1")
    db2 = AsyncDatabaseConnection("db2")
    async with AsyncContextManagerGroup(db1, db2):
        result1 = await db1.execute("SELECT 1")
        result2 = await db2.execute("SELECT 2")
        print(f"Results: {result1}, {result2}")
# Run all context-manager demos
async def run_all_context_manager_demos():
    """Run each async context-manager demo in order."""
    demos = (
        use_async_context_manager,
        use_async_context_manager_decorator,
        nested_async_context_managers,
        combined_async_context_managers,
    )
    for demo in demos:
        await demo()

# Note: these demos need aiofiles
# pip install aiofiles
# asyncio.run(run_all_context_manager_demos())
5. 高级异步模式
5.1 异步迭代器
import asyncio
class AsyncRange:
    """Asynchronous counterpart of ``range`` that awaits briefly before producing each value.

    ``AsyncRange(stop)`` counts from 0; ``AsyncRange(start, stop, step)``
    also supports negative steps. The object is its own iterator, so it can
    be consumed only once.

    Raises:
        ValueError: If ``step`` is zero (as with ``range``).
    """

    def __init__(self, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        if step == 0:
            # A zero step would never reach ``stop`` and iterate forever
            raise ValueError("AsyncRange() arg 3 must not be zero")
        self.start = start
        self.stop = stop
        self.step = step
        self.current = start

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Finished once ``current`` reaches or passes ``stop`` in the direction of ``step``
        if (self.step > 0 and self.current >= self.stop) or (
            self.step < 0 and self.current <= self.stop
        ):
            raise StopAsyncIteration
        value = self.current
        self.current += self.step
        await asyncio.sleep(0.1)  # simulated asynchronous work per item
        return value
class AsyncFileReader:
    """Async context manager and async iterator that reads a text file in ``chunk_size`` pieces.

    Intended use: ``async with AsyncFileReader(path) as r: async for chunk in r: ...``.
    """

    def __init__(self, filename, chunk_size=1024):
        self.filename = filename
        self.chunk_size = chunk_size

    async def __aenter__(self):
        # The file is opened only on entry and stays open until __aexit__
        self.file = await aiofiles.open(self.filename, 'r')
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.file.close()

    def __aiter__(self):
        return self

    async def __anext__(self):
        piece = await self.file.read(self.chunk_size)
        if piece:
            return piece
        raise StopAsyncIteration
# Using async iterators
async def async_iterator_demo():
    """Iterate an ``AsyncRange``, then read a scratch file line by line with aiofiles."""
    import os

    print("=== Async Iterator Demo ===")
    print("AsyncRange:")
    async for i in AsyncRange(5):
        print(f" {i}")
    # Real newline escapes so the file contains five separate lines
    test_content = "Line 1\nLine 2\nLine 3\nLine 4\nLine 5"
    try:
        async with aiofiles.open("test_lines.txt", "w") as f:
            await f.write(test_content)
        print("\nAsync file reading:")
        # aiofiles file objects support ``async for`` line iteration
        async with aiofiles.open("test_lines.txt", "r") as f:
            async for line in f:
                print(f" {line.strip()}")
    finally:
        # Remove the scratch file even if writing or reading failed
        if os.path.exists("test_lines.txt"):
            os.remove("test_lines.txt")
# Custom async iterable
class AsyncDataPipeline:
    """Async iterable over ``data_sources``; each ``async for`` starts a fresh processing pass."""

    def __init__(self, data_sources):
        self.data_sources = data_sources

    def __aiter__(self):
        # Returning a new async generator each time makes the pipeline re-iterable
        return self.process_data()

    async def process_data(self):
        """Async generator yielding one processed string per source after a 0.5s simulated delay."""
        for src in self.data_sources:
            print(f"Processing source: {src}")
            await asyncio.sleep(0.5)
            yield f"Processed data from {src}"
async def use_async_pipeline():
    """Consume an ``AsyncDataPipeline`` over four named sources."""
    print("\n=== Async Data Pipeline ===")
    sources = ["API 1", "Database", "File System", "Cache"]
    pipeline = AsyncDataPipeline(sources)
    async for data in pipeline:
        print(f"Received: {data}")


# Run the async-iterator demos
asyncio.run(async_iterator_demo())
asyncio.run(use_async_pipeline())
5.2 异步生成器
import asyncio
import random
async def async_counter(start, stop, delay):
    """Async generator over ``range(start, stop)``, waiting ``delay`` seconds before each value."""
    for value in range(start, stop):
        await asyncio.sleep(delay)
        yield value
async def async_fibonacci(n):
    """Async generator yielding the first ``n`` Fibonacci numbers (0, 1, 1, 2, ...), 0.1s apart."""
    current, following = 0, 1
    for _ in range(n):
        await asyncio.sleep(0.1)
        yield current
        current, following = following, current + following
async def async_random_numbers(count, min_val, max_val):
    """Async generator yielding ``count`` random ints in ``[min_val, max_val]`` after random 0.1-0.5s pauses."""
    for _ in range(count):
        pause = random.uniform(0.1, 0.5)
        await asyncio.sleep(pause)
        yield random.randint(min_val, max_val)
# Async filter and map
async def async_filter(predicate, async_iterable):
    """Yield the items of ``async_iterable`` for which ``predicate`` is truthy.

    ``predicate`` may be a plain function or a coroutine function. It is
    called exactly once per item, and a coroutine result is awaited.
    """
    async for item in async_iterable:
        verdict = predicate(item)
        # Calling the predicate a second time would repeat side effects and, for
        # coroutine functions, leave a never-awaited coroutine behind
        if asyncio.iscoroutine(verdict):
            verdict = await verdict
        if verdict:
            yield item
async def async_map(function, async_iterable):
    """Yield ``function(item)`` for each item of ``async_iterable``.

    ``function`` may be a plain function or a coroutine function. It is
    called exactly once per item, and a coroutine result is awaited.
    """
    async for item in async_iterable:
        result = function(item)
        # Call once: a second call would repeat side effects and orphan a coroutine
        if asyncio.iscoroutine(result):
            result = await result
        yield result
# Using async generators
async def async_generator_demo():
    """Consume the counter, Fibonacci, and random-number async generators."""
    print("=== Async Generator Demo ===")
    print("Async counter:")
    async for num in async_counter(0, 5, 0.2):
        print(f" {num}")
    print("\nAsync Fibonacci:")
    async for fib in async_fibonacci(10):
        print(f" {fib}", end=" ")
    print()  # terminate the single-line Fibonacci output
    print("\nAsync random numbers:")
    async for num in async_random_numbers(5, 1, 100):
        print(f" {num}")
# Async data-processing pipeline
async def async_data_pipeline():
    """Chain source -> async_filter(is_even) -> async_map(square) and print each result."""
    print("\n=== Async Data Pipeline ===")

    async def data_source():
        for i in range(10):
            await asyncio.sleep(0.1)
            yield i

    async def is_even(x):
        await asyncio.sleep(0.05)
        return x % 2 == 0

    async def square(x):
        await asyncio.sleep(0.05)
        return x ** 2

    # Each stage is lazy: items flow through one at a time as the consumer pulls
    source = data_source()
    filtered = async_filter(is_even, source)
    transformed = async_map(square, filtered)
    results = []
    async for result in transformed:
        results.append(result)
        print(f" {result}")
    print(f"Pipeline results: {results}")
# Async batch processing
async def async_batch_processor(batch_size=3):
    """Async generator grouping ten demo items (``"item-0"``..``"item-9"``) into lists.

    Each full batch of ``batch_size`` items waits 0.5s (simulated processing)
    before being yielded. Any leftover items are yielded as a final, shorter
    batch without the delay.
    """
    print("\n=== Async Batch Processor ===")

    async def data_generator():
        for i in range(10):
            yield f"item-{i}"

    batch = []
    async for item in data_generator():
        batch.append(item)
        if len(batch) >= batch_size:
            print(f"Processing batch: {batch}")
            await asyncio.sleep(0.5)
            yield batch
            # Start a fresh list so the batch just yielded is never mutated
            batch = []
    if batch:
        print(f"Processing final batch: {batch}")
        yield batch
async def use_batch_processor():
    """Print every batch produced by ``async_batch_processor`` with batches of three."""
    batches = async_batch_processor(batch_size=3)
    async for chunk in batches:
        print(f"Processed batch: {chunk}")
# Run every async-generator demo
async def run_all_async_generator_demos():
    """Run the async-generator demos one after another."""
    for demo in (async_generator_demo, async_data_pipeline, use_batch_processor):
        await demo()


asyncio.run(run_all_async_generator_demos())
5.3 异步任务队列
import asyncio
import uuid
from datetime import datetime
class AsyncTaskQueue:
    """Worker-pool queue that runs queued coroutine functions on ``max_workers`` workers.

    Each task's outcome is stored under its id and fetched with ``get_result``.
    """

    def __init__(self, max_workers=3):
        self.max_workers = max_workers
        self.tasks = asyncio.Queue()
        self.results = {}  # task id -> {'result' | 'error': ..., 'completed_at': datetime}
        self.workers = []
        self.running = False

    async def start(self):
        """Spawn the worker tasks."""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
        print(f"Started {self.max_workers} workers")

    async def stop(self):
        """Wait until every queued task has finished, then cancel and reap the workers."""
        # Drain first: workers leave their loop once ``running`` is False, so clearing
        # it before join() would strand queued tasks and make join() wait forever.
        await self.tasks.join()
        self.running = False
        for worker in self.workers:
            worker.cancel()
        # return_exceptions=True absorbs each worker's CancelledError
        await asyncio.gather(*self.workers, return_exceptions=True)
        print("All workers stopped")

    async def add_task(self, task_func, *args, **kwargs):
        """Queue ``task_func(*args, **kwargs)`` (a coroutine function) and return its id."""
        task_id = str(uuid.uuid4())
        task_data = {
            'id': task_id,
            'func': task_func,
            'args': args,
            'kwargs': kwargs,
            'created_at': datetime.now()
        }
        await self.tasks.put(task_data)
        return task_id

    async def worker(self, worker_id):
        """Take tasks until stopped; record each result or exception under the task id."""
        while self.running:
            try:
                # Poll with a timeout so the loop re-checks ``self.running`` regularly
                task_data = await asyncio.wait_for(self.tasks.get(), timeout=1.0)
            except asyncio.TimeoutError:
                continue
            # Separate try: a TimeoutError raised *by the task* must not be mistaken
            # for the polling timeout above (which would skip task_done()).
            try:
                print(f"Worker {worker_id} processing task {task_data['id']}")
                result = await task_data['func'](
                    *task_data['args'],
                    **task_data['kwargs']
                )
                self.results[task_data['id']] = {
                    'result': result,
                    'completed_at': datetime.now()
                }
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
                # Record the failure so get_result() raises instead of polling forever
                self.results[task_data['id']] = {
                    'error': e,
                    'completed_at': datetime.now()
                }
            finally:
                self.tasks.task_done()

    async def get_result(self, task_id, timeout=None):
        """Wait for ``task_id`` to finish and return (and remove) its result record.

        Returns:
            ``{'result': value, 'completed_at': datetime}``.

        Raises:
            TimeoutError: If ``timeout`` seconds pass without an outcome.
            Exception: Whatever the task itself raised.
        """
        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout
        while task_id not in self.results:
            if deadline is not None and loop.time() > deadline:
                raise TimeoutError(f"Task {task_id} timeout")
            await asyncio.sleep(0.1)
        entry = self.results.pop(task_id)
        if 'error' in entry:
            raise entry['error']
        return entry
# Example task functions
async def simulate_work(task_name, duration):
    """I/O-style task: sleep ``duration`` seconds and return ``"Result of <task_name>"``."""
    print(f"Starting {task_name} (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"Completed {task_name}")
    outcome = f"Result of {task_name}"
    return outcome
async def cpu_intensive_task(task_name, iterations):
    """Sum the squares of ``0 .. iterations-1``, yielding to the event loop every 10,000 steps.

    Returns:
        ``"CPU result: <sum>"``.
    """
    print(f"Starting CPU task {task_name}")
    total = 0
    for n in range(iterations):
        total += n ** 2
        # sleep(0) gives other tasks a turn without adding real delay
        if not n % 10000:
            await asyncio.sleep(0)
    print(f"Completed CPU task {task_name}")
    return f"CPU result: {total}"
# Using the async task queue
async def task_queue_demo():
    """Run I/O-bound and CPU-bound jobs through an ``AsyncTaskQueue`` and print the results."""
    # Imported here so this snippet does not depend on an earlier section's imports
    import random

    print("=== Async Task Queue ===")
    queue = AsyncTaskQueue(max_workers=3)
    await queue.start()
    task_ids = []
    # I/O-bound tasks with random durations
    for i in range(5):
        task_id = await queue.add_task(simulate_work, f"Task-{i}", random.uniform(0.5, 2.0))
        task_ids.append(task_id)
    # CPU-bound tasks that periodically yield to the loop
    for i in range(3):
        task_id = await queue.add_task(cpu_intensive_task, f"CPUTask-{i}", 50000)
        task_ids.append(task_id)
    # Wait until every queued task has been marked done
    await queue.tasks.join()
    print("\nTask results:")
    for task_id in task_ids:
        try:
            # Bounded wait: a task that never stored a result must not hang the demo
            result = await queue.get_result(task_id, timeout=5)
            print(f" {task_id}: {result['result']}")
        except Exception as e:
            print(f" {task_id}: Error - {e}")
    await queue.stop()
# Priority task queue
class PriorityAsyncTaskQueue:
    """Task queue whose workers always take the lowest ``priority`` value first.

    Equal priorities run in insertion order: each entry carries a unique,
    increasing sequence number, so the task dicts are never compared.
    """

    def __init__(self, max_workers=3):
        self.max_workers = max_workers
        self.tasks = asyncio.PriorityQueue()
        self.results = {}  # task id -> return value of the task function
        self.workers = []
        self.running = False
        self.task_counter = 0  # source of unique tie-breaking sequence numbers

    async def add_task(self, priority, task_func, *args, **kwargs):
        """Queue ``task_func(*args, **kwargs)`` with ``priority`` (lower runs first); return its id."""
        self.task_counter += 1
        task_data = {
            'id': str(uuid.uuid4()),
            'priority': priority,
            'func': task_func,
            'args': args,
            'kwargs': kwargs,
            'sequence': self.task_counter
        }
        # (priority, sequence) is unique, so heap comparisons never reach task_data
        await self.tasks.put((priority, self.task_counter, task_data))
        return task_data['id']

    async def start(self):
        """Spawn the worker tasks."""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]

    async def stop(self):
        """Send one stop sentinel per worker and wait for all workers to exit."""
        self.running = False
        for _ in range(self.max_workers):
            # Each sentinel gets its own sequence number: identical (p, s, None) entries
            # would make heapq compare None < None and raise TypeError. An infinite
            # priority keeps sentinels behind every real numeric priority.
            self.task_counter += 1
            await self.tasks.put((float('inf'), self.task_counter, None))
        await asyncio.gather(*self.workers, return_exceptions=True)

    async def worker(self, worker_id):
        """Run tasks in priority order until a stop sentinel arrives or the queue is stopped."""
        while self.running:
            priority, sequence, task_data = await self.tasks.get()
            try:
                if task_data is None:  # stop sentinel
                    break
                print(f"Worker {worker_id} processing priority {priority} task {task_data['id']}")
                result = await task_data['func'](
                    *task_data['args'],
                    **task_data['kwargs']
                )
                self.results[task_data['id']] = result
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Keeps tasks.join() usable for callers waiting on the queue
                self.tasks.task_done()
async def priority_queue_demo():
    """Queue five one-second jobs of mixed priority on two workers and print those that finished."""
    print("\n=== Priority Task Queue ===")
    queue = PriorityAsyncTaskQueue(max_workers=2)
    await queue.start()
    # Lower number = more urgent
    tasks = [
        (3, simulate_work, "Low Priority", 1.0),
        (1, simulate_work, "High Priority", 1.0),
        (2, simulate_work, "Medium Priority", 1.0),
        (1, simulate_work, "Another High", 1.0),
        (3, simulate_work, "Another Low", 1.0),
    ]
    task_ids = []
    for priority, func, *args in tasks:
        task_id = await queue.add_task(priority, func, *args)
        task_ids.append(task_id)
    # Give the two workers time to get through the 1-second jobs
    await asyncio.sleep(3)
    await queue.stop()
    print("\nPriority task results:")
    for task_id in task_ids:
        if task_id in queue.results:
            print(f" {task_id}: {queue.results[task_id]}")
# Run both task-queue demos
async def run_all_task_queue_demos():
    """Run the plain and priority task-queue demos in sequence."""
    for demo in (task_queue_demo, priority_queue_demo):
        await demo()


asyncio.run(run_all_task_queue_demos())
6. 异步最佳实践
6.1 错误处理
import asyncio
import logging
# Configure logging: root logger at INFO level, plus a logger named after this module
# NOTE(review): ``logger`` is not used by the visible demos in this section
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
async def risky_operation(fail=False):
    """Wait 0.1s, then return ``"Success"`` or raise ``ValueError`` when ``fail`` is true."""
    await asyncio.sleep(0.1)
    if not fail:
        return "Success"
    raise ValueError("Operation failed")
async def error_handling_demo():
    """Walk through try/except, gather(return_exceptions=True), timeouts, and cancellation."""
    print("=== Error Handling Demo ===")
    # Basic try/except around a single await
    try:
        result = await risky_operation(fail=True)
        print(f"Result: {result}")
    except ValueError as e:
        print(f"Caught error: {e}")

    # gather(return_exceptions=True) places exceptions in the result list instead of raising
    print("\n--- Error Handling in gather ---")
    results = await asyncio.gather(
        risky_operation(),
        risky_operation(fail=True),
        risky_operation(),
        return_exceptions=True
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")

    # risky_operation sleeps 0.1s, so this 0.05s timeout always fires
    print("\n--- Timeout Handling ---")
    try:
        result = await asyncio.wait_for(risky_operation(), timeout=0.05)
        print(f"Completed: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out")

    print("\n--- Cancellation Handling ---")

    async def long_task():
        try:
            for i in range(5):
                await asyncio.sleep(1)
                print(f"Working... {i+1}/5")
            return "Completed"
        except asyncio.CancelledError:
            print("Task was cancelled, cleaning up...")
            # Cleanup may await before the cancellation is re-raised
            await asyncio.sleep(0.1)
            print("Cleanup done")
            raise

    task = asyncio.create_task(long_task())
    # Cancel part-way through the five steps
    await asyncio.sleep(2.5)
    task.cancel()
    try:
        result = await task
        print(f"Task result: {result}")
    except asyncio.CancelledError:
        print("Task was successfully cancelled")


# Run the error-handling demo
asyncio.run(error_handling_demo())
6.2 性能优化
import asyncio
import time
import aiohttp
# Connection pooling
async def connection_pool_demo():
    """Fetch ten URLs through a session backed by a tuned ``TCPConnector`` connection pool."""
    print("=== Connection Pool Demo ===")
    # The session manages the connector's pooled connections
    pool = aiohttp.TCPConnector(
        limit=100,  # total connection cap
        limit_per_host=30,  # per-host connection cap
        ttl_dns_cache=300,  # DNS cache lifetime in seconds
        use_dns_cache=True,
    )
    limits = aiohttp.ClientTimeout(
        total=30,  # whole-request timeout
        connect=10,  # connection timeout
        sock_read=10  # socket read timeout
    )
    async with aiohttp.ClientSession(connector=pool, timeout=limits) as session:
        targets = ["https://httpbin.org/delay/1"] * 10
        began = time.time()
        sizes = await asyncio.gather(*(fetch_with_session(session, u) for u in targets))
        elapsed = time.time() - began
        print(f"Fetched {len(sizes)} URLs in {elapsed:.2f}s")
        print(f"Average time per request: {elapsed / len(sizes):.2f}s")
async def fetch_with_session(session, url):
    """GET ``url`` with ``session`` and return the length of the body text."""
    async with session.get(url) as response:
        body = await response.text()
        return len(body)
# Batch-processing optimization
async def batch_processing_demo():
    """Double 1000 numbers in concurrent batches of 100 and report the elapsed time."""
    print("\n=== Batch Processing Demo ===")

    async def process_item(item):
        await asyncio.sleep(0.1)  # simulated per-item processing time
        return item * 2

    async def process_batch(items):
        # Items in a batch run concurrently; gather keeps their order
        return await asyncio.gather(*[process_item(item) for item in items])

    large_dataset = list(range(1000))
    batch_size = 100
    start_time = time.time()
    # Batches run one after another, which bounds how many coroutines exist at once
    results = []
    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        batch_results = await process_batch(batch)
        results.extend(batch_results)
    end_time = time.time()
    print(f"Processed {len(results)} items in {end_time - start_time:.2f}s")
    print(f"Sample results: {results[:10]}")
# Caching
class AsyncLRUCache:
    """Coroutine-safe LRU cache whose entries expire ``ttl`` seconds after being set.

    Entries live in a plain dict whose insertion order doubles as the LRU
    order (least recently used first), so touching and evicting are O(1).

    Args:
        max_size: Maximum number of entries; inserting a new key into a full
            cache evicts the least recently used entry.
        ttl: Seconds an entry stays valid after ``set``.
        clock: Zero-argument callable returning the current time in seconds.
            Defaults to ``time.monotonic`` so wall-clock adjustments cannot
            expire entries early or keep them alive too long.
    """

    def __init__(self, max_size=100, ttl=60, clock=time.monotonic):
        self.max_size = max_size
        self.ttl = ttl
        self.clock = clock
        self.cache = {}  # key -> {'value': ..., 'timestamp': ...}
        self.lock = asyncio.Lock()

    async def get(self, key):
        """Return the cached value for *key*, or ``None`` if missing or expired."""
        async with self.lock:
            entry = self.cache.pop(key, None)
            if entry is None:
                return None
            if self.clock() - entry['timestamp'] > self.ttl:
                # Expired: leave it removed and report a miss.
                return None
            # Re-inserting moves the key to the most-recently-used end.
            self.cache[key] = entry
            return entry['value']

    async def set(self, key, value):
        """Store *value* under *key*, evicting the LRU entry if the cache is full."""
        async with self.lock:
            now = self.clock()
            self._clean_expired(now)
            if key in self.cache:
                # Updating an existing key never evicts; it is re-inserted below
                # at the most-recently-used end.
                del self.cache[key]
            elif self.cache and len(self.cache) >= self.max_size:
                # The first key in insertion order is the least recently used.
                del self.cache[next(iter(self.cache))]
            self.cache[key] = {'value': value, 'timestamp': now}

    def _clean_expired(self, current_time):
        """Drop every entry older than ``ttl`` at *current_time*."""
        expired_keys = [
            key for key, entry in self.cache.items()
            if current_time - entry['timestamp'] > self.ttl
        ]
        for key in expired_keys:
            del self.cache[key]


async def cache_optimization_demo():
    """Show an async TTL/LRU cache in front of a slow operation.

    The first round requests some keys twice, concurrently; duplicates share
    the single in-flight computation. After waiting past the TTL, the second
    round misses again for every distinct key.
    """
    print("\n=== Cache Optimization Demo ===")
    cache = AsyncLRUCache(max_size=5, ttl=2)
    # key -> Task currently computing that key. Without this, concurrent
    # callers for the same key would all miss the cache and each run the
    # expensive operation (cache stampede).
    in_flight = {}

    async def expensive_operation(key):
        """Simulate a slow operation."""
        await asyncio.sleep(0.5)
        return f"Result for {key}"

    async def compute_and_store(key):
        """Run the expensive operation once for *key* and cache its result."""
        try:
            result = await expensive_operation(key)
            await cache.set(key, result)
            return result
        finally:
            in_flight.pop(key, None)

    async def cached_operation(key):
        """Return the result for *key*, from the cache when possible."""
        cached_result = await cache.get(key)
        if cached_result is not None:
            print(f"Cache hit for {key}")
            return cached_result
        task = in_flight.get(key)
        if task is None:
            print(f"Cache miss for {key}")
            task = asyncio.create_task(compute_and_store(key))
            in_flight[key] = task
        else:
            print(f"Cache hit for {key} (in-flight)")
        return await task

    keys = ["key1", "key2", "key3", "key1", "key2"]  # repeated keys exercise the cache
    start_time = time.perf_counter()
    results = await asyncio.gather(*(cached_operation(key) for key in keys))
    print(f"Results: {results}")
    print(f"Total time: {time.perf_counter() - start_time:.2f}s")

    # Wait longer than the 2s TTL so every entry expires.
    await asyncio.sleep(3)

    # Second round: every distinct key misses again.
    start_time = time.perf_counter()
    results = await asyncio.gather(*(cached_operation(key) for key in keys))
    print(f"After expiration - Results: {results}")
    print(f"Total time: {time.perf_counter() - start_time:.2f}s")
# Run every performance-optimization demo, one after another.
async def run_performance_demos():
    """Await the connection-pool, batch-processing and cache demos in order."""
    demos = (connection_pool_demo, batch_processing_demo, cache_optimization_demo)
    for demo in demos:
        await demo()
# Note: this section imports aiohttp, which must be installed first:
# pip install aiohttp
# asyncio.run(run_performance_demos())
6.3 调试和监控
import asyncio
import time
import logging
from contextlib import asynccontextmanager
# Root logging setup: emit DEBUG and above as
# "timestamp - logger name - level name - message" lines.
logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', level=logging.DEBUG)
# Debugging decorator for async tasks
def debug_async_task(func):
    """Decorate a coroutine function so every call logs its start, duration and outcome.

    Log lines go through the root ``logging`` module and are tagged with
    ``id()`` of the asyncio task running the call. Failures and
    cancellations are logged and then re-raised unchanged.

    Args:
        func: The coroutine function to wrap.

    Returns:
        A coroutine function with the same name, docstring and signature
        metadata as *func*.
    """
    import functools  # local import: not among this section's imports

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        task_name = func.__name__
        task_id = id(asyncio.current_task())
        logging.info("[%s] Starting %s", task_id, task_name)
        # perf_counter is monotonic and high-resolution, the right clock for durations.
        start_time = time.perf_counter()
        try:
            result = await func(*args, **kwargs)
        except asyncio.CancelledError:
            # CancelledError derives from BaseException, so `except Exception`
            # below would silently skip it.
            elapsed = time.perf_counter() - start_time
            logging.warning("[%s] Cancelled %s after %.3fs", task_id, task_name, elapsed)
            raise
        except Exception as e:
            elapsed = time.perf_counter() - start_time
            logging.error("[%s] Failed %s in %.3fs: %s", task_id, task_name, elapsed, e)
            raise
        elapsed = time.perf_counter() - start_time
        logging.info("[%s] Completed %s in %.3fs", task_id, task_name, elapsed)
        return result

    return wrapper
# Performance-monitoring async context manager
@asynccontextmanager
async def performance_monitor(name):
    """Log entry to and exit from the block labelled *name*, with its elapsed time.

    The exit line is emitted from ``finally``, so it is logged even when the
    body raises; the exception itself propagates unchanged. Lines are tagged
    with ``id()`` of the current asyncio task.

    Args:
        name: Label used in the log lines.
    """
    task_id = id(asyncio.current_task())
    logging.info("[%s] Entering %s", task_id, name)
    # perf_counter is monotonic, so wall-clock changes cannot distort the duration.
    start_time = time.perf_counter()
    try:
        yield
    finally:
        elapsed = time.perf_counter() - start_time
        logging.info("[%s] Exiting %s (duration: %.3fs)", task_id, name, elapsed)
# Task tracker
class AsyncTaskTracker:
    """Track asyncio tasks: name, start/end time, duration and final status.

    ``self.tasks`` maps ``id(task)`` to an info dict with keys ``'name'``,
    ``'start_time'``, ``'status'`` (``'running'``, ``'completed'``,
    ``'failed'`` or ``'cancelled'``) and ``'task'``. When the task finishes
    the dict also gains ``'end_time'`` and ``'duration'``, plus ``'error'``
    (the exception's ``str``) for failures. Times come from ``time.time()``.
    """

    def __init__(self):
        self.tasks = {}

    async def track_task(self, coro, name=None):
        """Schedule *coro* as a task and start tracking it.

        Args:
            coro: Coroutine to run.
            name: Label for reports; defaults to the coroutine's ``__name__``,
                or ``'Unknown'`` if it has none.

        Returns:
            The created ``asyncio.Task``.
        """
        if name is None:
            name = getattr(coro, '__name__', 'Unknown')
        task = asyncio.create_task(coro)
        self.tasks[id(task)] = {
            'name': name,
            'start_time': time.time(),
            'status': 'running',
            # Holding a strong reference keeps the task alive (the event loop
            # only keeps weak references) and stops its id() being reused
            # while this entry exists.
            'task': task,
        }
        # A synchronous done-callback updates the entry directly. Spawning a
        # helper task from the callback would leave that helper unreferenced,
        # and its update could land only after awaiters had already resumed.
        task.add_done_callback(self._update_task_status)
        return task

    def _update_task_status(self, task):
        """Done-callback: record end time, duration and final status of *task*."""
        task_info = self.tasks.get(id(task))
        if task_info is None:
            return
        task_info['end_time'] = time.time()
        task_info['duration'] = task_info['end_time'] - task_info['start_time']
        if task.cancelled():
            task_info['status'] = 'cancelled'
            return
        error = task.exception()
        if error is None:
            task_info['status'] = 'completed'
        else:
            task_info['status'] = 'failed'
            task_info['error'] = str(error)

    def get_stats(self):
        """Return the total number of tracked tasks and the count per status."""
        stats = {
            'total': len(self.tasks),
            'running': 0,
            'completed': 0,
            'failed': 0,
            'cancelled': 0,
        }
        for task_info in self.tasks.values():
            stats[task_info['status']] += 1
        return stats

    def print_summary(self):
        """Print per-status counts followed by one line per tracked task."""
        stats = self.get_stats()
        print("\nTask Summary:")
        print(f" Total: {stats['total']}")
        print(f" Running: {stats['running']}")
        print(f" Completed: {stats['completed']}")
        print(f" Failed: {stats['failed']}")
        print(f" Cancelled: {stats['cancelled']}")
        print("\nTask Details:")
        for info in self.tasks.values():
            status = info['status']
            name = info['name']
            if status == 'completed' and 'duration' in info:
                print(f" {name}: {status} ({info['duration']:.3f}s)")
            elif status == 'failed' and 'error' in info:
                print(f" {name}: {status} - {info['error']}")
            else:
                print(f" {name}: {status}")
# Debugging demo
async def debugging_demo():
    """Exercise the debug decorator, performance monitor and task tracker together.

    Runs five sample tasks with random durations, one monitored task and one
    task that deliberately fails. All of them are tracked by an
    ``AsyncTaskTracker``, and its summary is printed once they have all
    finished.
    """
    # random is not among this section's imports; import it locally so the
    # snippet runs on its own.
    import random

    print("=== Async Debugging and Monitoring ===")

    # Task instrumented with the debugging decorator.
    @debug_async_task
    async def sample_task(name, duration):
        await asyncio.sleep(duration)
        return f"Task {name} completed"

    # Task whose phases are timed with the performance monitor.
    async def monitored_task():
        async with performance_monitor("Database Query"):
            await asyncio.sleep(0.5)
        async with performance_monitor("API Call"):
            await asyncio.sleep(0.3)

    # Task that always fails, to show failure tracking.
    async def failing_task():
        await asyncio.sleep(0.2)
        raise ValueError("Simulated failure")

    tracker = AsyncTaskTracker()
    tasks = []
    for i in range(5):
        task = await tracker.track_task(
            sample_task(f"Task-{i}", random.uniform(0.1, 0.5)),
            name=f"SampleTask-{i}"
        )
        tasks.append(task)

    tasks.append(await tracker.track_task(monitored_task(), name="MonitoredTask"))
    tasks.append(await tracker.track_task(failing_task(), name="FailingTask"))

    # return_exceptions=True: the failing task must not abort the wait for the others.
    await asyncio.gather(*tasks, return_exceptions=True)
    tracker.print_summary()
# Run the debugging/monitoring demo in a fresh event loop.
asyncio.run(debugging_demo())
总结
本章深入介绍了Python异步编程的核心概念:
- 异步编程的基础概念和优势
- 协程的创建和使用
- 任务(Task)和Future的使用
- 并发执行模式(gather、wait等)
- 异步I/O操作(文件、网络、数据库)
- 异步上下文管理器
- 高级异步模式(迭代器、生成器、任务队列)
- 异步编程的最佳实践(错误处理、性能优化、调试监控)
掌握异步编程能够让你编写高性能的I/O密集型应用,特别适合网络服务、Web应用和数据处理等场景。
文章来源于互联网:Python-12高阶编程之异步编程
相关推荐: 百度文心一言应用:L1B3RT45中文提示词构造技巧
百度文心一言应用:L1B3RT45中文提示词构造技巧 【免费下载链接】L1B3RT45 J41LBR34K PR0MPT5 项目地址: https://gitcode.com/GitHub_Trending/l1/L1B3RT45 引言:中文AI交互的痛点与突…
5bei.cn大模型教程网










