AI大模型教程
一起来学习

Python-12高阶编程之异步编程

Python高阶编程:异步编程

学习目标

  • 理解异步编程的概念和优势
  • 掌握async/await语法
  • 学会创建和使用协程
  • 理解事件循环的工作原理
  • 掌握异步I/O操作
  • 学会使用异步上下文管理器
  • 掌握异步编程的最佳实践

1. 异步编程基础

1.1 什么是异步编程

异步编程是一种编程范式,允许程序在等待某些操作(通常是I/O操作)完成时继续执行其他任务,而不是阻塞等待。

1.2 同步 vs 异步

import time
import asyncio
import aiohttp

# 同步版本
def sync_fetch_data(url, delay):
    """同步获取数据"""
    print(f"Sync: Fetching from {url}")
    time.sleep(delay)  # 模拟网络延迟
    print(f"Sync: Finished fetching from {url}")
    return f"Data from {url}"

def sync_main():
    """同步主函数"""
    start_time = time.time()
    
    urls = ["url1", "url2", "url3"]
    delays = [1, 2, 1]
    
    results = []
    for url, delay in zip(urls, delays):
        result = sync_fetch_data(url, delay)
        results.append(result)
    
    end_time = time.time()
    print(f"Sync total time: {end_time - start_time:.2f}s")
    return results

# 异步版本
async def async_fetch_data(url, delay):
    """异步获取数据"""
    print(f"Async: Fetching from {url}")
    await asyncio.sleep(delay)  # 异步等待
    print(f"Async: Finished fetching from {url}")
    return f"Data from {url}"

async def async_main():
    """异步主函数"""
    start_time = time.time()
    
    urls = ["url1", "url2", "url3"]
    delays = [1, 2, 1]
    
    # 创建任务
    tasks = []
    for url, delay in zip(urls, delays):
        task = async_fetch_data(url, delay)
        tasks.append(task)
    
    # 并发执行
    results = await asyncio.gather(*tasks)
    
    end_time = time.time()
    print(f"Async total time: {end_time - start_time:.2f}s")
    return results

# 运行对比
print("=== Synchronous vs Asynchronous ===")
print("Running synchronous version:")
sync_results = sync_main()

print("nRunning asynchronous version:")
async_results = asyncio.run(async_main())

print(f"nResults equal: {sync_results == async_results}")

2. 协程基础

2.1 创建和运行协程

import asyncio

# 基本协程
async def hello_world():
    """最简单的协程"""
    print("Hello")
    await asyncio.sleep(1)
    print("World")

# 运行协程
asyncio.run(hello_world())

# 协程也是对象
async def simple_coroutine():
    return 42

# 创建协程对象
coro = simple_coroutine()
print(f"Coroutine object: {coro}")
print(f"Type: {type(coro)}")

# 运行协程
result = asyncio.run(coro)
print(f"Result: {result}")

# 多个协程
async def greet(name, delay):
    """带参数的协程"""
    print(f"Hello, {name}!")
    await asyncio.sleep(delay)
    print(f"Goodbye, {name}!")
    return f"Greeted {name}"

async def main():
    """主协程"""
    # 顺序执行
    print("=== Sequential Execution ===")
    result1 = await greet("Alice", 1)
    result2 = await greet("Bob", 1)
    print(f"Results: {result1}, {result2}")
    
    # 并发执行
    print("n=== Concurrent Execution ===")
    task1 = greet("Charlie", 1)
    task2 = greet("David", 1)
    results = await asyncio.gather(task1, task2)
    print(f"Results: {results}")

asyncio.run(main())

2.2 任务(Task)和Future

import asyncio
import time

async def slow_operation(name, delay):
    """慢操作协程"""
    print(f"{name} started")
    await asyncio.sleep(delay)
    print(f"{name} completed after {delay}s")
    return f"Result of {name}"

async def task_demo():
    """任务演示"""
    print("=== Task Demo ===")
    
    # 创建任务
    task1 = asyncio.create_task(slow_operation("Task 1", 2))
    task2 = asyncio.create_task(slow_operation("Task 2", 1))
    
    print(f"Task 1 done: {task1.done()}")
    print(f"Task 2 done: {task2.done()}")
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    
    print(f"Task 1 done: {task1.done()}")
    print(f"Task 2 done: {task2.done()}")
    print(f"Results: {result1}, {result2}")

async def task_cancellation():
    """任务取消演示"""
    print("n=== Task Cancellation ===")
    
    async def long_task():
        try:
            for i in range(10):
                await asyncio.sleep(1)
                print(f"Long task step {i}")
            return "Completed"
        except asyncio.CancelledError:
            print("Long task was cancelled")
            raise
    
    # 创建长时间运行的任务
    task = asyncio.create_task(long_task())
    
    # 等待一段时间后取消
    await asyncio.sleep(3)
    task.cancel()
    
    try:
        result = await task
        print(f"Task result: {result}")
    except asyncio.CancelledError:
        print("Task was cancelled")

async def task_timeout():
    """任务超时演示"""
    print("n=== Task Timeout ===")
    
    async def very_slow_task():
        await asyncio.sleep(10)
        return "Very slow result"
    
    try:
        # 设置超时
        result = await asyncio.wait_for(very_slow_task(), timeout=2)
        print(f"Task completed: {result}")
    except asyncio.TimeoutError:
        print("Task timed out")

# 自定义Future
async def custom_future_demo():
    """自定义Future演示"""
    print("n=== Custom Future Demo ===")
    
    # 创建Future
    future = asyncio.Future()
    
    # 设置结果
    future.set_result("Custom future result")
    
    # 获取结果
    result = await future
    print(f"Future result: {result}")
    
    # 创建已完成的Future
    completed_future = asyncio.create_task(asyncio.sleep(0))
    await completed_future
    print(f"Completed future: {completed_future.done()}")

# 运行所有演示
async def run_all_demos():
    await task_demo()
    await task_cancellation()
    await task_timeout()
    await custom_future_demo()

asyncio.run(run_all_demos())

2.3 并发执行模式

import asyncio
import time

async def fetch_data(source, delay):
    """模拟获取数据"""
    print(f"Fetching from {source}...")
    await asyncio.sleep(delay)
    print(f"Finished fetching from {source}")
    return f"Data from {source}"

async def gather_demo():
    """gather演示"""
    print("=== asyncio.gather Demo ===")
    
    # 并发执行多个协程
    results = await asyncio.gather(
        fetch_data("API 1", 1),
        fetch_data("API 2", 2),
        fetch_data("API 3", 1),
        return_exceptions=True  # 返回异常而不是抛出
    )
    
    print(f"All results: {results}")

async def wait_demo():
    """wait演示"""
    print("n=== asyncio.wait Demo ===")
    
    tasks = [
        asyncio.create_task(fetch_data("Source A", 1)),
        asyncio.create_task(fetch_data("Source B", 2)),
        asyncio.create_task(fetch_data("Source C", 3))
    ]
    
    # 等待所有任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
    
    print(f"Completed: {len(done)}")
    print(f"Pending: {len(pending)}")
    
    for task in done:
        try:
            result = task.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Error: {e}")

async def wait_for_any():
    """等待任意任务完成"""
    print("n=== wait with FIRST_COMPLETED ===")
    
    tasks = [
        asyncio.create_task(fetch_data("Fast", 1)),
        asyncio.create_task(fetch_data("Medium", 2)),
        asyncio.create_task(fetch_data("Slow", 3))
    ]
    
    # 等待任意一个任务完成
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    
    print(f"First completed: {done.pop().result()}")
    print(f"Still running: {len(pending)}")
    
    # 取消剩余任务
    for task in pending:
        task.cancel()

async def semaphore_demo():
    """信号量演示"""
    print("n=== Semaphore Demo ===")
    
    semaphore = asyncio.Semaphore(2)  # 最多2个并发
    
    async def limited_fetch(source, delay):
        async with semaphore:
            print(f"Fetching {source} (semaphore acquired)")
            await asyncio.sleep(delay)
            print(f"Finished {source} (semaphore released)")
            return f"Data from {source}"
    
    tasks = [
        limited_fetch(f"Resource {i}", 1) for i in range(5)
    ]
    
    results = await asyncio.gather(*tasks)
    print(f"All results: {results}")

# 运行演示
asyncio.run(gather_demo())
asyncio.run(wait_demo())
asyncio.run(wait_for_any())
asyncio.run(semaphore_demo())

3. 异步I/O操作

3.1 异步文件操作

import asyncio
import aiofiles
import os

async def read_file_async(filename):
    """异步读取文件"""
    try:
        async with aiofiles.open(filename, 'r') as file:
            content = await file.read()
            print(f"Read {len(content)} characters from {filename}")
            return content
    except FileNotFoundError:
        print(f"File {filename} not found")
        return None

async def write_file_async(filename, content):
    """异步写入文件"""
    async with aiofiles.open(filename, 'w') as file:
        await file.write(content)
        print(f"Written {len(content)} characters to {filename}")

async def process_large_file(filename, chunk_size=1024):
    """异步处理大文件"""
    async with aiofiles.open(filename, 'rb') as file:
        chunk_num = 0
        while True:
            chunk = await file.read(chunk_size)
            if not chunk:
                break
            chunk_num += 1
            # 模拟处理
            await asyncio.sleep(0.01)
            print(f"Processed chunk {chunk_num} ({len(chunk)} bytes)")
        
        print(f"Finished processing {chunk_num} chunks")

async def file_operations_demo():
    """文件操作演示"""
    print("=== Async File Operations ===")
    
    # 创建测试文件
    test_files = ['test1.txt', 'test2.txt', 'test3.txt']
    contents = ['Hello World', 'Python Async', 'File Operations']
    
    # 并行写入文件
    write_tasks = [
        write_file_async(filename, content)
        for filename, content in zip(test_files, contents)
    ]
    await asyncio.gather(*write_tasks)
    
    # 并行读取文件
    read_tasks = [
        read_file_async(filename)
        for filename in test_files
    ]
    results = await asyncio.gather(*read_tasks)
    
    for filename, content in zip(test_files, results):
        if content:
            print(f"{filename}: {content[:20]}...")
    
    # 创建大文件并处理
    large_file = 'large_file.txt'
    large_content = "x" * 10000
    await write_file_async(large_file, large_content)
    
    await process_large_file(large_file, chunk_size=1000)
    
    # 清理
    for filename in test_files + [large_file]:
        if os.path.exists(filename):
            os.remove(filename)
            print(f"Removed {filename}")

# 运行文件操作演示
asyncio.run(file_operations_demo())

3.2 异步网络请求

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """异步获取URL内容"""
    try:
        async with session.get(url) as response:
            content = await response.text()
            print(f"Fetched {url}: {len(content)} characters")
            return url, len(content)
    except Exception as e:
        print(f"Error fetching {url}: {e}")
        return url, 0

async def fetch_multiple_urls(urls):
    """并发获取多个URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

async def fetch_with_timeout(session, url, timeout=5):
    """带超时的异步获取"""
    try:
        async with asyncio.timeout(timeout):
            async with session.get(url) as response:
                content = await response.text()
                return url, response.status, len(content)
    except asyncio.TimeoutError:
        print(f"Timeout for {url}")
        return url, "timeout", 0
    except Exception as e:
        print(f"Error for {url}: {e}")
        return url, "error", 0

async def fetch_with_semaphore(session, url, semaphore):
    """使用信号量限制的异步获取"""
    async with semaphore:
        async with session.get(url) as response:
            content = await response.text()
            await asyncio.sleep(0.1)  # 模拟处理时间
            return url, len(content)

async def network_demo():
    """网络请求演示"""
    print("=== Async Network Requests ===")
    
    # 测试URLs
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/delay/1"
    ]
    
    # 并发获取
    start_time = time.time()
    results = await fetch_multiple_urls(urls)
    end_time = time.time()
    
    print(f"nFetched {len(results)} URLs in {end_time - start_time:.2f}s")
    for url, size in results:
        print(f"  {url}: {size} bytes")
    
    # 带超时的获取
    print("n--- With Timeout ---")
    async with aiohttp.ClientSession() as session:
        timeout_urls = [
            "https://httpbin.org/delay/1",
            "https://httpbin.org/delay/10",  # 这个会超时
            "https://httpbin.org/delay/2"
        ]
        
        start_time = time.time()
        results = await asyncio.gather(
            *[fetch_with_timeout(session, url, timeout=3) for url in timeout_urls]
        )
        end_time = time.time()
        
        print(f"Timeout fetch completed in {end_time - start_time:.2f}s")
        for url, status, size in results:
            print(f"  {url}: {status}, {size} bytes")
    
    # 使用信号量限制并发
    print("n--- With Semaphore (max 2 concurrent) ---")
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(2)
        
        start_time = time.time()
        results = await asyncio.gather(
            *[fetch_with_semaphore(session, url, semaphore) for url in urls]
        )
        end_time = time.time()
        
        print(f"Semaphore fetch completed in {end_time - start_time:.2f}s")
        for url, size in results:
            print(f"  {url}: {size} bytes")

# 运行网络演示
asyncio.run(network_demo())

3.3 异步数据库操作

import asyncio
import asyncpg
import aiomysql

async def postgres_demo():
    """PostgreSQL异步操作演示"""
    print("=== Async PostgreSQL ===")
    
    try:
        # 连接到PostgreSQL
        conn = await asyncpg.connect(
            host="localhost",
            port=5432,
            user="postgres",
            password="password",
            database="testdb"
        )
        
        # 创建表
        await conn.execute("""
            CREATE TABLE IF NOT EXISTS users (
                id SERIAL PRIMARY KEY,
                name VARCHAR(100),
                email VARCHAR(100)
            )
        """)
        
        # 插入数据
        await conn.execute(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            "Alice", "alice@example.com"
        )
        
        # 查询数据
        rows = await conn.fetch("SELECT * FROM users")
        print(f"Users: {rows}")
        
        # 批量插入
        users = [
            ("Bob", "bob@example.com"),
            ("Charlie", "charlie@example.com"),
            ("David", "david@example.com")
        ]
        
        await conn.executemany(
            "INSERT INTO users (name, email) VALUES ($1, $2)",
            users
        )
        
        # 查询所有用户
        all_users = await conn.fetch("SELECT * FROM users ORDER BY id")
        print(f"All users ({len(all_users)}):")
        for user in all_users:
            print(f"  {user['id']}: {user['name']} - {user['email']}")
        
        # 关闭连接
        await conn.close()
        
    except Exception as e:
        print(f"PostgreSQL error: {e}")
        print("Note: This demo requires a running PostgreSQL server")

async def mysql_demo():
    """MySQL异步操作演示"""
    print("n=== Async MySQL ===")
    
    try:
        # 连接到MySQL
        conn = await aiomysql.connect(
            host="localhost",
            port=3306,
            user="root",
            password="password",
            db="testdb"
        )
        
        async with conn.cursor() as cursor:
            # 创建表
            await cursor.execute("""
                CREATE TABLE IF NOT EXISTS products (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    name VARCHAR(100),
                    price DECIMAL(10, 2)
                )
            """)
            
            # 插入数据
            await cursor.execute(
                "INSERT INTO products (name, price) VALUES (%s, %s)",
                ("Laptop", 999.99)
            )
            
            # 查询数据
            await cursor.execute("SELECT * FROM products")
            result = await cursor.fetchall()
            print(f"Products: {result}")
            
            # 批量插入
            products = [
                ("Mouse", 29.99),
                ("Keyboard", 79.99),
                ("Monitor", 299.99)
            ]
            
            await cursor.executemany(
                "INSERT INTO products (name, price) VALUES (%s, %s)",
                products
            )
            
            # 查询所有产品
            await cursor.execute("SELECT * FROM products ORDER BY id")
            all_products = await cursor.fetchall()
            print(f"All products ({len(all_products)}):")
            for product in all_products:
                print(f"  {product[0]}: {product[1]} - ${product[2]}")
        
        # 关闭连接
        conn.close()
        
    except Exception as e:
        print(f"MySQL error: {e}")
        print("Note: This demo requires a running MySQL server")

# 模拟数据库操作(不需要真实数据库)
async def simulated_db_demo():
    """模拟数据库操作演示"""
    print("n=== Simulated Database Operations ===")
    
    class SimulatedDB:
        def __init__(self):
            self.data = {}
            self.next_id = 1
        
        async def execute(self, query, *params):
            """模拟执行查询"""
            await asyncio.sleep(0.1)  # 模拟网络延迟
            
            if "CREATE TABLE" in query:
                table_name = query.split("TABLE")[1].split()[0].strip("IF NOT EXISTS ")
                self.data[table_name] = []
                return f"Table {table_name} created"
            
            elif "INSERT INTO" in query:
                table_name = query.split("INTO")[1].split()[0].strip()
                if table_name not in self.data:
                    self.data[table_name] = []
                
                # 提取值
                if "users" in table_name:
                    name, email = params
                    record = {
                        'id': self.next_id,
                        'name': name,
                        'email': email
                    }
                else:  # products
                    name, price = params
                    record = {
                        'id': self.next_id,
                        'name': name,
                        'price': float(price)
                    }
                
                self.data[table_name].append(record)
                self.next_id += 1
                return f"Inserted 1 record"
            
            elif "SELECT" in query:
                table_name = query.split("FROM")[1].split()[0].strip()
                if "ORDER BY" in query:
                    # 简单排序
                    key = query.split("ORDER BY")[1].split()[0].strip()
                    return sorted(self.data.get(table_name, []), key=lambda x: x[key])
                return self.data.get(table_name, [])
    
    # 使用模拟数据库
    db = SimulatedDB()
    
    # 创建表
    await db.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id SERIAL PRIMARY KEY,
            name VARCHAR(100),
            email VARCHAR(100)
        )
    """)
    
    # 插入数据
    await db.execute(
        "INSERT INTO users (name, email) VALUES (%s, %s)",
        "Alice", "alice@example.com"
    )
    
    # 查询数据
    users = await db.execute("SELECT * FROM users")
    print(f"Users: {users}")
    
    # 批量操作
    tasks = []
    for i in range(5):
        tasks.append(db.execute(
            "INSERT INTO users (name, email) VALUES (%s, %s)",
            f"User{i}", f"user{i}@example.com"
        ))
    
    await asyncio.gather(*tasks)
    
    # 查询所有用户
    all_users = await db.execute("SELECT * FROM users ORDER BY id")
    print(f"All users ({len(all_users)}):")
    for user in all_users:
        print(f"  {user['id']}: {user['name']} - {user['email']}")

# 运行数据库演示
async def db_demo_main():
    await postgres_demo()
    await mysql_demo()
    await simulated_db_demo()

asyncio.run(db_demo_main())

4. 异步上下文管理器

4.1 创建异步上下文管理器

import asyncio
from contextlib import asynccontextmanager

class AsyncDatabaseConnection:
    """异步数据库连接"""
    
    def __init__(self, db_name):
        self.db_name = db_name
        self.connected = False
    
    async def connect(self):
        """连接数据库"""
        print(f"Connecting to {self.db_name}...")
        await asyncio.sleep(0.5)  # 模拟连接时间
        self.connected = True
        print(f"Connected to {self.db_name}")
    
    async def disconnect(self):
        """断开连接"""
        if self.connected:
            print(f"Disconnecting from {self.db_name}...")
            await asyncio.sleep(0.2)  # 模拟断开时间
            self.connected = False
            print(f"Disconnected from {self.db_name}")
    
    async def execute(self, query):
        """执行查询"""
        if not self.connected:
            raise RuntimeError("Not connected to database")
        
        print(f"Executing: {query}")
        await asyncio.sleep(0.3)  # 模拟查询时间
        return f"Result of: {query}"
    
    async def __aenter__(self):
        """异步上下文管理器入口"""
        await self.connect()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """异步上下文管理器出口"""
        await self.disconnect()

# 使用异步上下文管理器
async def use_async_context_manager():
    """使用异步上下文管理器"""
    print("=== Async Context Manager ===")
    
    async with AsyncDatabaseConnection("mydb") as db:
        result1 = await db.execute("SELECT * FROM users")
        result2 = await db.execute("INSERT INTO users VALUES (1, 'Alice')")
        
        print(f"Results: {result1}, {result2}")

# 使用装饰器创建异步上下文管理器
@asynccontextmanager
async def async_file_manager(filename, mode):
    """异步文件管理器"""
    print(f"Opening {filename}...")
    file = await aiofiles.open(filename, mode)
    
    try:
        yield file
    finally:
        print(f"Closing {filename}...")
        await file.close()

async def use_async_context_manager_decorator():
    """使用装饰器版本的异步上下文管理器"""
    print("n=== Async Context Manager Decorator ===")
    
    async with async_file_manager("test.txt", "w") as f:
        await f.write("Hello, Async World!")
    
    async with async_file_manager("test.txt", "r") as f:
        content = await f.read()
        print(f"Read: {content}")
    
    # 清理
    import os
    if os.path.exists("test.txt"):
        os.remove("test.txt")

# 嵌套异步上下文管理器
@asynccontextmanager
async def transaction_manager(db_name):
    """事务管理器"""
    print(f"Starting transaction on {db_name}")
    await asyncio.sleep(0.1)
    
    try:
        yield db_name
        print(f"Committing transaction on {db_name}")
        await asyncio.sleep(0.1)
    except Exception as e:
        print(f"Rolling back transaction on {db_name}: {e}")
        await asyncio.sleep(0.1)
        raise

async def nested_async_context_managers():
    """嵌套异步上下文管理器"""
    print("n=== Nested Async Context Managers ===")
    
    async with AsyncDatabaseConnection("main_db") as db:
        async with transaction_manager("main_db") as tx:
            result = await db.execute("UPDATE users SET active = true")
            print(f"Transaction result: {result}")
            
            # 模拟错误
            # raise ValueError("Simulated error")

# 异步上下文管理器组合
class AsyncContextManagerGroup:
    """异步上下文管理器组合"""
    
    def __init__(self, *managers):
        self.managers = managers
        self.entered = []
    
    async def __aenter__(self):
        """进入所有上下文"""
        for manager in self.managers:
            entered = await manager.__aenter__()
            self.entered.append((manager, entered))
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出所有上下文(逆序)"""
        for manager, _ in reversed(self.entered):
            await manager.__aexit__(exc_type, exc_val, exc_tb)

async def combined_async_context_managers():
    """组合异步上下文管理器"""
    print("n=== Combined Async Context Managers ===")
    
    db1 = AsyncDatabaseConnection("db1")
    db2 = AsyncDatabaseConnection("db2")
    
    async with AsyncContextManagerGroup(db1, db2) as group:
        result1 = await db1.execute("SELECT 1")
        result2 = await db2.execute("SELECT 2")
        print(f"Results: {result1}, {result2}")

# 运行所有演示
async def run_all_context_manager_demos():
    await use_async_context_manager()
    await use_async_context_manager_decorator()
    await nested_async_context_managers()
    await combined_async_context_managers()

# 注意:需要安装aiofiles
# pip install aiofiles
# asyncio.run(run_all_context_manager_demos())

5. 高级异步模式

5.1 异步迭代器

import asyncio

class AsyncRange:
    """异步范围迭代器"""
    
    def __init__(self, start, stop=None, step=1):
        if stop is None:
            start, stop = 0, start
        self.start = start
        self.stop = stop
        self.step = step
        self.current = start
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if (self.step > 0 and self.current >= self.stop) or 
           (self.step  0 and self.current  self.stop):
            raise StopAsyncIteration
        
        value = self.current
        self.current += self.step
        
        # 模拟异步操作
        await asyncio.sleep(0.1)
        return value

class AsyncFileReader:
    """异步文件读取器"""
    
    def __init__(self, filename, chunk_size=1024):
        self.filename = filename
        self.chunk_size = chunk_size
    
    async def __aenter__(self):
        self.file = await aiofiles.open(self.filename, 'r')
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.file.close()
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        chunk = await self.file.read(self.chunk_size)
        if not chunk:
            raise StopAsyncIteration
        return chunk

# 使用异步迭代器
async def async_iterator_demo():
    """异步迭代器演示"""
    print("=== Async Iterator Demo ===")
    
    # 使用异步范围
    print("AsyncRange:")
    async for i in AsyncRange(5):
        print(f"  {i}")
    
    # 创建测试文件
    test_content = "Line 1nLine 2nLine 3nLine 4nLine 5"
    async with aiofiles.open("test_lines.txt", "w") as f:
        await f.write(test_content)
    
    # 异步逐行读取
    print("nAsync file reading:")
    async with aiofiles.open("test_lines.txt", "r") as f:
        async for line in f:
            print(f"  {line.strip()}")
    
    # 清理
    import os
    if os.path.exists("test_lines.txt"):
        os.remove("test_lines.txt")

# 自定义异步可迭代对象
class AsyncDataPipeline:
    """异步数据管道"""
    
    def __init__(self, data_sources):
        self.data_sources = data_sources
    
    def __aiter__(self):
        return self.process_data()
    
    async def process_data(self):
        """处理数据的异步生成器"""
        for source in self.data_sources:
            print(f"Processing source: {source}")
            # 模拟异步数据处理
            await asyncio.sleep(0.5)
            yield f"Processed data from {source}"

async def use_async_pipeline():
    """使用异步数据管道"""
    print("n=== Async Data Pipeline ===")
    
    sources = ["API 1", "Database", "File System", "Cache"]
    pipeline = AsyncDataPipeline(sources)
    
    async for data in pipeline:
        print(f"Received: {data}")

# 运行异步迭代器演示
asyncio.run(async_iterator_demo())
asyncio.run(use_async_pipeline())

5.2 异步生成器

import asyncio
import random

async def async_counter(start, stop, delay):
    """异步计数器生成器"""
    for i in range(start, stop):
        await asyncio.sleep(delay)
        yield i

async def async_fibonacci(n):
    """异步斐波那契生成器"""
    a, b = 0, 1
    for _ in range(n):
        await asyncio.sleep(0.1)
        yield a
        a, b = b, a + b

async def async_random_numbers(count, min_val, max_val):
    """异步随机数生成器"""
    for _ in range(count):
        await asyncio.sleep(random.uniform(0.1, 0.5))
        yield random.randint(min_val, max_val)

# 异步过滤和映射
async def async_filter(predicate, async_iterable):
    """异步过滤"""
    async for item in async_iterable:
        if await predicate(item) if asyncio.iscoroutine(predicate(item)) else predicate(item):
            yield item

async def async_map(function, async_iterable):
    """异步映射"""
    async for item in async_iterable:
        result = await function(item) if asyncio.iscoroutine(function(item)) else function(item)
        yield result

# 使用异步生成器
async def async_generator_demo():
    """异步生成器演示"""
    print("=== Async Generator Demo ===")
    
    # 基本异步生成器
    print("Async counter:")
    async for num in async_counter(0, 5, 0.2):
        print(f"  {num}")
    
    # 异步斐波那契
    print("nAsync Fibonacci:")
    async for fib in async_fibonacci(10):
        print(f"  {fib}", end=" ")
    print()
    
    # 异步随机数
    print("nAsync random numbers:")
    async for num in async_random_numbers(5, 1, 100):
        print(f"  {num}")

# 异步数据处理管道
async def async_data_pipeline():
    """异步数据处理管道"""
    print("n=== Async Data Pipeline ===")
    
    # 数据源
    async def data_source():
        for i in range(10):
            await asyncio.sleep(0.1)
            yield i
    
    # 异步过滤(偶数)
    async def is_even(x):
        await asyncio.sleep(0.05)
        return x % 2 == 0
    
    # 异步映射(平方)
    async def square(x):
        await asyncio.sleep(0.05)
        return x ** 2
    
    # 构建管道
    source = data_source()
    filtered = async_filter(is_even, source)
    transformed = async_map(square, filtered)
    
    # 处理结果
    results = []
    async for result in transformed:
        results.append(result)
        print(f"  {result}")
    
    print(f"Pipeline results: {results}")

# 异步批处理
async def async_batch_processor(batch_size=3):
    """异步批处理器"""
    print("n=== Async Batch Processor ===")
    
    async def data_generator():
        for i in range(10):
            yield f"item-{i}"
    
    batch = []
    async for item in data_generator():
        batch.append(item)
        
        if len(batch) >= batch_size:
            # 处理批次
            print(f"Processing batch: {batch}")
            await asyncio.sleep(0.5)  # 模拟批处理
            yield batch
            batch = []
    
    # 处理剩余项
    if batch:
        print(f"Processing final batch: {batch}")
        yield batch

async def use_batch_processor():
    """使用批处理器"""
    async for batch in async_batch_processor(batch_size=3):
        print(f"Processed batch: {batch}")

# 运行所有演示
async def run_all_async_generator_demos():
    await async_generator_demo()
    await async_data_pipeline()
    await use_batch_processor()

asyncio.run(run_all_async_generator_demos())

5.3 异步任务队列

import asyncio
import uuid
from datetime import datetime

class AsyncTaskQueue:
    """异步任务队列"""
    
    def __init__(self, max_workers=3):
        self.max_workers = max_workers
        self.tasks = asyncio.Queue()
        self.results = {}
        self.workers = []
        self.running = False
    
    async def start(self):
        """启动任务队列"""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
        print(f"Started {self.max_workers} workers")
    
    async def stop(self):
        """停止任务队列"""
        self.running = False
        
        # 等待所有任务完成
        await self.tasks.join()
        
        # 取消所有worker
        for worker in self.workers:
            worker.cancel()
        
        # 等待worker完成
        await asyncio.gather(*self.workers, return_exceptions=True)
        print("All workers stopped")
    
    async def add_task(self, task_func, *args, **kwargs):
        """添加任务"""
        task_id = str(uuid.uuid4())
        task_data = {
            'id': task_id,
            'func': task_func,
            'args': args,
            'kwargs': kwargs,
            'created_at': datetime.now()
        }
        
        await self.tasks.put(task_data)
        return task_id
    
    async def worker(self, worker_id):
        """工作进程"""
        while self.running:
            try:
                # 获取任务
                task_data = await asyncio.wait_for(self.tasks.get(), timeout=1.0)
                
                print(f"Worker {worker_id} processing task {task_data['id']}")
                
                # 执行任务
                result = await task_data['func'](
                    *task_data['args'],
                    **task_data['kwargs']
                )
                
                # 存储结果
                self.results[task_data['id']] = {
                    'result': result,
                    'completed_at': datetime.now()
                }
                
                # 标记任务完成
                self.tasks.task_done()
                
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
                self.tasks.task_done()
    
    async def get_result(self, task_id, timeout=None):
        """获取任务结果"""
        start_time = asyncio.get_event_loop().time()
        
        while task_id not in self.results:
            if timeout and (asyncio.get_event_loop().time() - start_time) > timeout:
                raise TimeoutError(f"Task {task_id} timeout")
            
            await asyncio.sleep(0.1)
        
        return self.results.pop(task_id)

# 示例任务函数
async def simulate_work(task_name, duration):
    """模拟工作任务"""
    print(f"Starting {task_name} (duration: {duration}s)")
    await asyncio.sleep(duration)
    print(f"Completed {task_name}")
    return f"Result of {task_name}"

async def cpu_intensive_task(task_name, iterations):
    """CPU密集型任务"""
    print(f"Starting CPU task {task_name}")
    result = 0
    for i in range(iterations):
        result += i ** 2
        # 让出控制权
        if i % 10000 == 0:
            await asyncio.sleep(0)
    print(f"Completed CPU task {task_name}")
    return f"CPU result: {result}"

# 使用异步任务队列
async def task_queue_demo():
    """任务队列演示"""
    print("=== Async Task Queue ===")
    
    # 创建任务队列
    queue = AsyncTaskQueue(max_workers=3)
    await queue.start()
    
    # 添加任务
    task_ids = []
    
    # I/O密集型任务
    for i in range(5):
        task_id = await queue.add_task(simulate_work, f"Task-{i}", random.uniform(0.5, 2.0))
        task_ids.append(task_id)
    
    # CPU密集型任务
    for i in range(3):
        task_id = await queue.add_task(cpu_intensive_task, f"CPUTask-{i}", 50000)
        task_ids.append(task_id)
    
    # 等待所有任务完成
    await queue.tasks.join()
    
    # 获取结果
    print("nTask results:")
    for task_id in task_ids:
        try:
            result = await queue.get_result(task_id)
            print(f"  {task_id}: {result['result']}")
        except Exception as e:
            print(f"  {task_id}: Error - {e}")
    
    # 停止队列
    await queue.stop()

# 优先级任务队列
class PriorityAsyncTaskQueue:
    """优先级异步任务队列"""
    
    def __init__(self, max_workers=3):
        self.max_workers = max_workers
        self.tasks = asyncio.PriorityQueue()
        self.results = {}
        self.workers = []
        self.running = False
        self.task_counter = 0
    
    async def add_task(self, priority, task_func, *args, **kwargs):
        """添加优先级任务"""
        self.task_counter += 1
        task_data = {
            'id': str(uuid.uuid4()),
            'priority': priority,
            'func': task_func,
            'args': args,
            'kwargs': kwargs,
            'sequence': self.task_counter
        }
        
        # 优先级队列使用 (priority, sequence) 作为键
        await self.tasks.put((priority, self.task_counter, task_data))
        return task_data['id']
    
    async def start(self):
        """启动队列"""
        self.running = True
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.max_workers)
        ]
    
    async def stop(self):
        """停止队列"""
        self.running = False
        
        # 添加停止信号
        for _ in range(self.max_workers):
            await self.tasks.put((999, 999, None))
        
        await asyncio.gather(*self.workers, return_exceptions=True)
    
    async def worker(self, worker_id):
        """工作进程"""
        while self.running:
            try:
                priority, sequence, task_data = await self.tasks.get()
                
                if task_data is None:  # 停止信号
                    break
                
                print(f"Worker {worker_id} processing priority {priority} task {task_data['id']}")
                
                # 执行任务
                result = await task_data['func'](
                    *task_data['args'],
                    **task_data['kwargs']
                )
                
                # 存储结果
                self.results[task_data['id']] = result
                
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")

async def priority_queue_demo():
    """优先级队列演示"""
    print("n=== Priority Task Queue ===")
    
    queue = PriorityAsyncTaskQueue(max_workers=2)
    await queue.start()
    
    # 添加不同优先级的任务
    tasks = [
        (3, simulate_work, "Low Priority", 1.0),
        (1, simulate_work, "High Priority", 1.0),
        (2, simulate_work, "Medium Priority", 1.0),
        (1, simulate_work, "Another High", 1.0),
        (3, simulate_work, "Another Low", 1.0),
    ]
    
    task_ids = []
    for priority, func, *args in tasks:
        task_id = await queue.add_task(priority, func, *args)
        task_ids.append(task_id)
    
    # 等待完成
    await asyncio.sleep(3)
    
    # 停止队列
    await queue.stop()
    
    print("nPriority task results:")
    for task_id in task_ids:
        if task_id in queue.results:
            print(f"  {task_id}: {queue.results[task_id]}")

# 运行所有任务队列演示
async def run_all_task_queue_demos():
    await task_queue_demo()
    await priority_queue_demo()

asyncio.run(run_all_task_queue_demos())

6. 异步最佳实践

6.1 错误处理

import asyncio
import logging

# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

async def risky_operation(fail=False):
    """可能失败的操作"""
    await asyncio.sleep(0.1)
    if fail:
        raise ValueError("Operation failed")
    return "Success"

async def error_handling_demo():
    """错误处理演示"""
    print("=== Error Handling Demo ===")
    
    # 基本错误处理
    try:
        result = await risky_operation(fail=True)
        print(f"Result: {result}")
    except ValueError as e:
        print(f"Caught error: {e}")
    
    # 在gather中处理错误
    print("n--- Error Handling in gather ---")
    results = await asyncio.gather(
        risky_operation(),
        risky_operation(fail=True),
        risky_operation(),
        return_exceptions=True
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} succeeded: {result}")
    
    # 超时处理
    print("n--- Timeout Handling ---")
    try:
        result = await asyncio.wait_for(risky_operation(), timeout=0.05)
        print(f"Completed: {result}")
    except asyncio.TimeoutError:
        print("Operation timed out")
    
    # 取消处理
    print("n--- Cancellation Handling ---")
    async def long_task():
        try:
            for i in range(5):
                await asyncio.sleep(1)
                print(f"Working... {i+1}/5")
            return "Completed"
        except asyncio.CancelledError:
            print("Task was cancelled, cleaning up...")
            # 执行清理操作
            await asyncio.sleep(0.1)
            print("Cleanup done")
            raise
    
    task = asyncio.create_task(long_task())
    
    # 等待一段时间后取消
    await asyncio.sleep(2.5)
    task.cancel()
    
    try:
        result = await task
        print(f"Task result: {result}")
    except asyncio.CancelledError:
        print("Task was successfully cancelled")

# 运行错误处理演示
asyncio.run(error_handling_demo())

6.2 性能优化

import asyncio
import time
import aiohttp

# 连接池
async def connection_pool_demo():
    """连接池演示"""
    print("=== Connection Pool Demo ===")
    
    # 创建会话(自动管理连接池)
    connector = aiohttp.TCPConnector(
        limit=100,          # 总连接数限制
        limit_per_host=30,  # 每个主机的连接数限制
        ttl_dns_cache=300,  # DNS缓存时间
        use_dns_cache=True,
    )
    
    timeout = aiohttp.ClientTimeout(
        total=30,           # 总超时时间
        connect=10,         # 连接超时时间
        sock_read=10        # 读取超时时间
    )
    
    async with aiohttp.ClientSession(
        connector=connector,
        timeout=timeout
    ) as session:
        urls = [f"https://httpbin.org/delay/1" for _ in range(10)]
        
        start_time = time.time()
        
        tasks = [fetch_with_session(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        
        end_time = time.time()
        
        print(f"Fetched {len(results)} URLs in {end_time - start_time:.2f}s")
        print(f"Average time per request: {(end_time - start_time) / len(results):.2f}s")

async def fetch_with_session(session, url):
    """使用会话获取URL"""
    async with session.get(url) as response:
        content = await response.text()
        return len(content)

# 批量处理优化
async def batch_processing_demo():
    """批量处理演示"""
    print("n=== Batch Processing Demo ===")
    
    async def process_item(item):
        """处理单个项目"""
        await asyncio.sleep(0.1)  # 模拟处理时间
        return item * 2
    
    async def process_batch(items):
        """批量处理"""
        return await asyncio.gather(*[process_item(item) for item in items])
    
    # 大数据集
    large_dataset = list(range(1000))
    batch_size = 100
    
    start_time = time.time()
    
    # 分批处理
    results = []
    for i in range(0, len(large_dataset), batch_size):
        batch = large_dataset[i:i + batch_size]
        batch_results = await process_batch(batch)
        results.extend(batch_results)
    
    end_time = time.time()
    
    print(f"Processed {len(results)} items in {end_time - start_time:.2f}s")
    print(f"Sample results: {results[:10]}")

# 缓存优化
async def cache_optimization_demo():
    """缓存优化演示"""
    print("n=== Cache Optimization Demo ===")
    
    class AsyncLRUCache:
        """异步LRU缓存"""
        
        def __init__(self, max_size=100, ttl=60):
            self.max_size = max_size
            self.ttl = ttl
            self.cache = {}
            self.access_order = []
            self.lock = asyncio.Lock()
        
        async def get(self, key):
            async with self.lock:
                if key in self.cache:
                    # 移动到访问顺序末尾
                    self.access_order.remove(key)
                    self.access_order.append(key)
                    return self.cache[key]['value']
                return None
        
        async def set(self, key, value):
            async with self.lock:
                current_time = time.time()
                
                # 清理过期项
                self._clean_expired(current_time)
                
                # 如果已满,移除最久未使用的
                if len(self.cache) >= self.max_size and key not in self.cache:
                    oldest_key = self.access_order.pop(0)
                    del self.cache[oldest_key]
                
                # 设置新值
                self.cache[key] = {
                    'value': value,
                    'timestamp': current_time
                }
                
                if key in self.access_order:
                    self.access_order.remove(key)
                self.access_order.append(key)
        
        def _clean_expired(self, current_time):
            """清理过期项"""
            expired_keys = []
            for key, item in self.cache.items():
                if current_time - item['timestamp'] > self.ttl:
                    expired_keys.append(key)
            
            for key in expired_keys:
                del self.cache[key]
                if key in self.access_order:
                    self.access_order.remove(key)
    
    # 使用异步缓存
    cache = AsyncLRUCache(max_size=5, ttl=2)
    
    async def expensive_operation(key):
        """昂贵的操作"""
        await asyncio.sleep(0.5)  # 模拟耗时操作
        return f"Result for {key}"
    
    async def cached_operation(key):
        """带缓存的操作"""
        # 检查缓存
        cached_result = await cache.get(key)
        if cached_result is not None:
            print(f"Cache hit for {key}")
            return cached_result
        
        # 执行昂贵操作
        print(f"Cache miss for {key}")
        result = await expensive_operation(key)
        
        # 缓存结果
        await cache.set(key, result)
        return result
    
    # 测试缓存
    keys = ["key1", "key2", "key3", "key1", "key2"]  # 重复键测试缓存
    
    start_time = time.time()
    results = await asyncio.gather(*[cached_operation(key) for key in keys])
    end_time = time.time()
    
    print(f"Results: {results}")
    print(f"Total time: {end_time - start_time:.2f}s")
    
    # 等待缓存过期
    await asyncio.sleep(3)
    
    # 再次测试(应该全部缓存未命中)
    start_time = time.time()
    results = await asyncio.gather(*[cached_operation(key) for key in keys])
    end_time = time.time()
    
    print(f"After expiration - Results: {results}")
    print(f"Total time: {end_time - start_time:.2f}s")

# 运行性能优化演示
async def run_performance_demos():
    await connection_pool_demo()
    await batch_processing_demo()
    await cache_optimization_demo()

# 注意:需要安装aiohttp
# pip install aiohttp
# asyncio.run(run_performance_demos())

6.3 调试和监控

import asyncio
import time
import logging
from contextlib import asynccontextmanager

# 配置日志
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

# 异步任务调试装饰器
def debug_async_task(func):
    """调试异步任务的装饰器"""
    async def wrapper(*args, **kwargs):
        task_name = func.__name__
        task_id = id(asyncio.current_task())
        
        logging.info(f"[{task_id}] Starting {task_name}")
        start_time = time.time()
        
        try:
            result = await func(*args, **kwargs)
            end_time = time.time()
            logging.info(f"[{task_id}] Completed {task_name} in {end_time - start_time:.3f}s")
            return result
        except Exception as e:
            end_time = time.time()
            logging.error(f"[{task_id}] Failed {task_name} in {end_time - start_time:.3f}s: {e}")
            raise
    
    return wrapper

# 性能监控上下文管理器
@asynccontextmanager
async def performance_monitor(name):
    """性能监控上下文管理器"""
    start_time = time.time()
    task_id = id(asyncio.current_task())
    
    logging.info(f"[{task_id}] Entering {name}")
    
    try:
        yield
    finally:
        end_time = time.time()
        logging.info(f"[{task_id}] Exiting {name} (duration: {end_time - start_time:.3f}s)")

# 任务追踪器
class AsyncTaskTracker:
    """异步任务追踪器"""
    
    def __init__(self):
        self.tasks = {}
        self.lock = asyncio.Lock()
    
    async def track_task(self, coro, name=None):
        """追踪任务"""
        if name is None:
            name = coro.__name__ if hasattr(coro, '__name__') else 'Unknown'
        
        task = asyncio.create_task(coro)
        task_id = id(task)
        
        async with self.lock:
            self.tasks[task_id] = {
                'name': name,
                'start_time': time.time(),
                'status': 'running'
            }
        
        # 添加完成回调
        task.add_done_callback(
            lambda t: asyncio.create_task(self._update_task_status(t))
        )
        
        return task
    
    async def _update_task_status(self, task):
        """更新任务状态"""
        task_id = id(task)
        
        async with self.lock:
            if task_id in self.tasks:
                task_info = self.tasks[task_id]
                task_info['end_time'] = time.time()
                task_info['duration'] = task_info['end_time'] - task_info['start_time']
                
                if task.cancelled():
                    task_info['status'] = 'cancelled'
                elif task.exception():
                    task_info['status'] = 'failed'
                    task_info['error'] = str(task.exception())
                else:
                    task_info['status'] = 'completed'
    
    def get_stats(self):
        """获取统计信息"""
        stats = {
            'total': len(self.tasks),
            'running': 0,
            'completed': 0,
            'failed': 0,
            'cancelled': 0
        }
        
        for task_info in self.tasks.values():
            stats[task_info['status']] += 1
        
        return stats
    
    def print_summary(self):
        """打印摘要"""
        stats = self.get_stats()
        print(f"nTask Summary:")
        print(f"  Total: {stats['total']}")
        print(f"  Running: {stats['running']}")
        print(f"  Completed: {stats['completed']}")
        print(f"  Failed: {stats['failed']}")
        print(f"  Cancelled: {stats['cancelled']}")
        
        # 打印详细信息
        print("nTask Details:")
        for task_id, info in self.tasks.items():
            status = info['status']
            name = info['name']
            
            if status == 'completed' and 'duration' in info:
                print(f"  {name}: {status} ({info['duration']:.3f}s)")
            elif status == 'failed' and 'error' in info:
                print(f"  {name}: {status} - {info['error']}")
            else:
                print(f"  {name}: {status}")

# 调试演示
async def debugging_demo():
    """调试演示"""
    print("=== Async Debugging and Monitoring ===")
    
    # 使用调试装饰器
    @debug_async_task
    async def sample_task(name, duration):
        await asyncio.sleep(duration)
        return f"Task {name} completed"
    
    # 使用性能监控
    async def monitored_task():
        async with performance_monitor("Database Query"):
            await asyncio.sleep(0.5)
        
        async with performance_monitor("API Call"):
            await asyncio.sleep(0.3)
    
    # 使用任务追踪器
    tracker = AsyncTaskTracker()
    
    # 创建多个任务
    tasks = []
    for i in range(5):
        task = await tracker.track_task(
            sample_task(f"Task-{i}", random.uniform(0.1, 0.5)),
            name=f"SampleTask-{i}"
        )
        tasks.append(task)
    
    # 添加监控任务
    monitor_task = await tracker.track_task(
        monitored_task(),
        name="MonitoredTask"
    )
    tasks.append(monitor_task)
    
    # 添加一个会失败的任务
    async def failing_task():
        await asyncio.sleep(0.2)
        raise ValueError("Simulated failure")
    
    fail_task = await tracker.track_task(
        failing_task(),
        name="FailingTask"
    )
    tasks.append(fail_task)
    
    # 等待所有任务完成
    await asyncio.gather(*tasks, return_exceptions=True)
    
    # 打印摘要
    tracker.print_summary()

# 运行调试演示
asyncio.run(debugging_demo())

总结

本章深入介绍了Python异步编程的核心概念:

  • 异步编程的基础概念和优势
  • 协程的创建和使用
  • 任务(Task)和Future的使用
  • 并发执行模式(gather、wait等)
  • 异步I/O操作(文件、网络、数据库)
  • 异步上下文管理器
  • 高级异步模式(迭代器、生成器、任务队列)
  • 异步编程的最佳实践(错误处理、性能优化、调试监控)

掌握异步编程能够让你编写高性能的I/O密集型应用,特别适合网络服务、Web应用和数据处理等场景。

文章来源于互联网:Python-12高阶编程之异步编程

相关推荐: 百度文心一言应用:L1B3RT45中文提示词构造技巧

百度文心一言应用:L1B3RT45中文提示词构造技巧 【免费下载链接】L1B3RT45 J41LBR34K PR0MPT5 项目地址: https://gitcode.com/GitHub_Trending/l1/L1B3RT45 引言:中文AI交互的痛点与突…

赞(0)
未经允许不得转载:5bei.cn大模型教程网 » Python-12高阶编程之异步编程
分享到: 更多 (0)

AI大模型,我们的未来

小欢软考联系我们