AI大模型教程
一起来学习

stable-diffusion-v1-5模型训练简易教程(含代码)

Stable Diffusion v1.5 是一种基于扩散模型的生成模型,广泛用于图像生成任务。

训练参考代码:

import os
import torch
import json
import numpy as np
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
from diffusers import AutoencoderKL, UNet2DConditionModel, LMSDiscreteScheduler, DDPMScheduler, PNDMScheduler
from transformers import CLIPTextModel, CLIPTokenizer
from accelerate import Accelerator
from tqdm import tqdm
import torch.nn.functional as F
from diffusers.optimization import get_scheduler
import itertools

# ==== 1. 配置参数 ====
pretrained_model_name_or_path = "runwayml/stable-diffusion-v1-5"
jsonl_path = "train/metadata.jsonl"
image_folder = "train"       # 图片目录a
output_dir = "finetuned_sd15"
logs_dir = os.path.join(output_dir, "logs")  # 添加日志目录
samples_dir = os.path.join(output_dir, "samples")  # 添加采样图片目录
os.makedirs(logs_dir, exist_ok=True)  # 创建日志目录
os.makedirs(samples_dir, exist_ok=True)  # 创建采样图片目录
resolution = 512
train_batch_size = 1
max_train_steps = 200   #
save_samples_every = 10  # 每多少步生成一次样本图片
save_model_every = 30   # 每多少步保存一次模型

# 学习率参数
learning_rate = 1e-5
vae_lr = 1e-6  # VAE通常需要更小的学习率
text_encoder_lr = 5e-6  # text encoder学习率
lr_scheduler_type = "cosine"  # 余弦退火学习率
lr_warmup_steps = 50  # 学习率预热步数
lr_num_cycles = 3     # 余弦退火周期数

# 训练参数
train_vae = True        # 是否训练VAE
train_text_encoder = True  # 是否训练文本编码器

# 生成采样图片参数
num_inference_steps = 150  # 推理时的采样步数
guidance_scale = 8     # 分类器引导系数
sample_prompts = [
    "A Chinese figure painting with ink leaves and gourds as the background depicts an old people resting.",
    "This is a Chinese figure painting featuring ink leaves and gourds as the backdrop, depicting an elderly man at rest."
]  # 用于生成样本的提示词

seed = 1234
torch.manual_seed(seed)
np.random.seed(seed)

# ==== 2. 数据集准备 ====
class JsonlPromptImageDataset(Dataset):
    def __init__(self, jsonl_path, image_folder, size=512):
        # 读取jsonl
        self.samples = []
        with open(jsonl_path, "r", encoding="utf8") as f:
            for line in f:
                line = line.strip()
                if line:
                    obj = json.loads(line)
                    img_path = os.path.join(image_folder, obj["file_name"])
                    prompt = obj["text"]
                    self.samples.append({"image": img_path, "prompt": prompt})
        self.transform = transforms.Compose([
            transforms.Resize((size, size), interpolation=transforms.InterpolationMode.BILINEAR),
            transforms.ToTensor(),
            transforms.Normalize([0.5], [0.5]),
        ])
    def __len__(self):
        return len(self.samples)
    def __getitem__(self, idx):
        d = self.samples[idx]
        image = Image.open(d["image"]).convert("RGB")
        image = self.transform(image)
        return {"pixel_values": image, "prompt": d["prompt"]}

dataset = JsonlPromptImageDataset(jsonl_path, image_folder, size=resolution)
dataloader = DataLoader(dataset, batch_size=train_batch_size, shuffle=True)
print(f"数据集样本数: {len(dataset)}")

# ==== 3. 加载模型组件 ====
tokenizer = CLIPTokenizer.from_pretrained(pretrained_model_name_or_path, subfolder="tokenizer")
text_encoder = CLIPTextModel.from_pretrained(pretrained_model_name_or_path, subfolder="text_encoder")
vae = AutoencoderKL.from_pretrained(pretrained_model_name_or_path, subfolder="vae")
unet = UNet2DConditionModel.from_pretrained(pretrained_model_name_or_path, subfolder="unet")
noise_scheduler = LMSDiscreteScheduler.from_pretrained(pretrained_model_name_or_path, subfolder="scheduler")

# 创建用于推理的调度器
inference_scheduler = PNDMScheduler.from_pretrained(pretrained_model_name_or_path, subfolder="scheduler")

# ==== 4. Accelerator ====
accelerator = Accelerator(mixed_precision="fp16")  # 如无A卡或不支持fp16,则为"no"
vae, unet, text_encoder, dataloader = accelerator.prepare(vae, unet, text_encoder, dataloader)

# 设置模型训练状态
vae.train() if train_vae else vae.eval()
text_encoder.train() if train_text_encoder else text_encoder.eval()
unet.train()

# 设置参数梯度要求
for p in vae.parameters():
    p.requires_grad = train_vae
for p in text_encoder.parameters():
    p.requires_grad = train_text_encoder
for p in unet.parameters():
    p.requires_grad = True

# 创建优化器,每个组件可以有不同的学习率
optimizer_grouped_params = [
    {"params": unet.parameters(), "lr": learning_rate},
]
if train_vae:
    optimizer_grouped_params.append({"params": vae.parameters(), "lr": vae_lr})
if train_text_encoder:
    optimizer_grouped_params.append({"params": text_encoder.parameters(), "lr": text_encoder_lr})

optimizer = torch.optim.AdamW(optimizer_grouped_params)

# 创建学习率调度器
lr_scheduler = get_scheduler(
    lr_scheduler_type,
    optimizer=optimizer,
    num_warmup_steps=lr_warmup_steps,
    num_training_steps=max_train_steps,
    num_cycles=lr_num_cycles,
)

# 准备优化器
optimizer, lr_scheduler = accelerator.prepare(optimizer, lr_scheduler)

# ==== 5. 辅助函数 ====
def generate_samples(step):
    """生成并保存样本图像"""
    # 准备模型推理
    unet_eval = accelerator.unwrap_model(unet)
    text_encoder_eval = accelerator.unwrap_model(text_encoder)
    vae_eval = accelerator.unwrap_model(vae)
    
    # 临时保存到CPU,设置为eval模式
    unet_eval = unet_eval.to("cpu").eval()
    text_encoder_eval = text_encoder_eval.to("cpu").eval()
    vae_eval = vae_eval.to("cpu").eval()
    
    # 生成每个提示词的图像
    plt.figure(figsize=(12, 4 * len(sample_prompts)))
    
    for i, prompt in enumerate(sample_prompts):
        # 设置随机种子以便结果可比
        torch.manual_seed(seed + i)
        
        # 准备提示词
        text_input = tokenizer(
            [prompt], padding="max_length", max_length=tokenizer.model_max_length, 
            truncation=True, return_tensors="pt"
        )
        
        with torch.no_grad():
            text_embeddings = text_encoder_eval(text_input.input_ids)[0]
        
        # 无条件嵌入用于分类器引导
        uncond_input = tokenizer(
            [""], padding="max_length", max_length=tokenizer.model_max_length, 
            truncation=True, return_tensors="pt"
        )
        
        with torch.no_grad():
            uncond_embeddings = text_encoder_eval(uncond_input.input_ids)[0]
        
        # 合并嵌入以允许分类器引导
        text_embeddings = torch.cat([uncond_embeddings, text_embeddings])
        
        # 初始化随机潜在向量
        latents = torch.randn((1, 4, resolution // 8, resolution // 8))
        
        # 设置推理参数
        inference_scheduler.set_timesteps(num_inference_steps)
        
        # 去噪迭代
        latents = latents * inference_scheduler.init_noise_sigma
        for t in inference_scheduler.timesteps:
            # 扩展latent以进行分类器引导
            latent_model_input = torch.cat([latents] * 2)
            
            # 预测噪声
            with torch.no_grad():
                noise_pred = unet_eval(
                    latent_model_input, t, encoder_hidden_states=text_embeddings
                ).sample
            
            # 执行分类器引导
            noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
            noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond)
            
            # 去噪步骤
            latents = inference_scheduler.step(noise_pred, t, latents).prev_sample
        
        # 解码潜在表示
        with torch.no_grad():
            images = vae_eval.decode(latents / 0.18215).sample
        
        # 转换到PIL图像
        images = (images / 2 + 0.5).clamp(0, 1)
        images = images.permute(0, 2, 3, 1).numpy()
        
        # 保存图像
        plt.subplot(len(sample_prompts), 1, i + 1)
        plt.imshow(images[0])
        plt.title(f"Prompt: {prompt}")
        plt.axis("off")
    
    # 保存完整样本图
    plt.tight_layout()
    plt.savefig(os.path.join(samples_dir, f"sample_step_{step}.png"))
    plt.close()
    
    # 将模型送回设备,恢复训练模式
    unet_eval = unet_eval.to(accelerator.device)
    text_encoder_eval = text_encoder_eval.to(accelerator.device)
    vae_eval = vae_eval.to(accelerator.device)
    
    if train_vae:
        vae.train()
    if train_text_encoder:
        text_encoder.train()
    unet.train()

def save_models(step):
    """保存模型检查点"""
    checkpoint_dir = os.path.join(output_dir, f"checkpoint-{step}")
    os.makedirs(checkpoint_dir, exist_ok=True)
    
    # 保存UNet
    accelerator.unwrap_model(unet).save_pretrained(os.path.join(checkpoint_dir, "unet"))
    
    # 保存VAE
    if train_vae:
        accelerator.unwrap_model(vae).save_pretrained(os.path.join(checkpoint_dir, "vae"))
    
    # 保存文本编码器
    if train_text_encoder:
        accelerator.unwrap_model(text_encoder).save_pretrained(os.path.join(checkpoint_dir, "text_encoder"))
    
    print(f"模型检查点已保存至 {checkpoint_dir}")

# ==== 6. 训练循环 ====
progress_bar = tqdm(range(max_train_steps), desc="Fine-tuning (SD)")

# 添加训练日志记录
loss_log = []
lr_log = {
    "unet_lr": [],
    "vae_lr": [] if train_vae else None,
    "text_encoder_lr": [] if train_text_encoder else None
}
steps_log = []

global_step = 0
while global_step  max_train_steps:
    for batch in dataloader:
        with accelerator.accumulate(unet):
            pixel_values = batch["pixel_values"]
            prompts = batch["prompt"]

            # 1. tokenize不同prompt
            input_ids = tokenizer(list(prompts), padding="max_length", truncation=True, max_length=tokenizer.model_max_length, return_tensors="pt").input_ids
            input_ids = input_ids.to(accelerator.device)
            
            # 2. 获取文本嵌入
            if train_text_encoder:
                encoder_hidden_states = text_encoder(input_ids)[0]
            else:
                with torch.no_grad():
                    encoder_hidden_states = text_encoder(input_ids)[0]

            # 3. encode图像为latent
            if train_vae:
                latents = vae.encode(pixel_values.to(dtype=vae.dtype)).latent_dist.sample()
                latents = latents * 0.18215
            else:
                with torch.no_grad():
                    latents = vae.encode(pixel_values.to(dtype=vae.dtype)).latent_dist.sample()
                    latents = latents * 0.18215

            # 4. 添加噪声
            noise = torch.randn_like(latents)
            bsz = latents.shape[0]
            timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps, (bsz,), device=latents.device).long()
            noisy_latents = noise_scheduler.add_noise(latents, noise, timesteps)

            # 5. UNet预测噪声
            noise_pred = unet(noisy_latents, timesteps, encoder_hidden_states).sample

            # 6. 计算损失
            loss = F.mse_loss(noise_pred.float(), noise.float(), reduction="mean")

            # 7. 反向传播
            accelerator.backward(loss)
            
            # 8. 梯度裁剪(可选但推荐用于稳定训练)
            if accelerator.sync_gradients:
                accelerator.clip_grad_norm_(
                    itertools.chain(
                        unet.parameters(),
                        *([vae.parameters()] if train_vae else []),
                        *([text_encoder.parameters()] if train_text_encoder else [])
                    ),
                    max_norm=1.0
                )
            
            # 9. 优化器和学习率调度器步骤
            optimizer.step()
            lr_scheduler.step()
            optimizer.zero_grad()

            # 记录训练日志
            if accelerator.is_main_process:
                current_loss = loss.detach().item()
                
                # 记录各模块的学习率
                lrs = {
                    "unet_lr": optimizer.param_groups[0]["lr"]
                }
                if train_vae:
                    lrs["vae_lr"] = optimizer.param_groups[1]["lr"]
                if train_text_encoder:
                    idx = 2 if train_vae else 1
                    lrs["text_encoder_lr"] = optimizer.param_groups[idx]["lr"]
                
                loss_log.append(current_loss)
                steps_log.append(global_step)
                
                for key, value in lrs.items():
                    if lr_log.get(key) is not None:
                        lr_log[key].append(value)
                
                # 更新进度条
                progress_bar.update(1)
                progress_bar.set_postfix(loss=current_loss, lr=lrs["unet_lr"])
            
            # 10. 生成样本图像
            if accelerator.is_main_process and global_step > 0 and global_step % save_samples_every == 0:
                generate_samples(global_step)
            
            # 11. 保存模型检查点
            if accelerator.is_main_process and global_step > 0 and global_step % save_model_every == 0:
                save_models(global_step)
            
            global_step += 1
            if global_step >= max_train_steps:
                break

# ==== 7. 保存最终模型 ====
if accelerator.is_main_process:
    # 保存最终模型
    accelerator.unwrap_model(unet).save_pretrained(os.path.join(output_dir, "unet"))
    if train_vae:
        accelerator.unwrap_model(vae).save_pretrained(os.path.join(output_dir, "vae"))
    if train_text_encoder:
        accelerator.unwrap_model(text_encoder).save_pretrained(os.path.join(output_dir, "text_encoder"))
    
    # 生成最终样本
    generate_samples(global_step)
    
    # ==== 8. 可视化训练结果 ====
    plt.figure(figsize=(12, 10))
    
    # 绘制损失曲线
    plt.subplot(2, 1, 1)
    plt.plot(steps_log, loss_log)
    plt.title('Training Loss')
    plt.xlabel('Steps')
    plt.ylabel('Loss')
    plt.grid(True)
    
    # 绘制学习率曲线
    plt.subplot(2, 1, 2)
    
    # 绘制UNet学习率
    plt.plot(steps_log, lr_log["unet_lr"], label="UNet LR")
    
    # 绘制VAE学习率(如果适用)
    if train_vae and lr_log["vae_lr"]:
        plt.plot(steps_log, lr_log["vae_lr"], label="VAE LR")
    
    # 绘制文本编码器学习率(如果适用)
    if train_text_encoder and lr_log["text_encoder_lr"]:
        plt.plot(steps_log, lr_log["text_encoder_lr"], label="Text Encoder LR")
    
    plt.title('Learning Rates')
    plt.xlabel('Steps')
    plt.ylabel('Learning Rate')
    plt.legend()
    plt.grid(True)
    
    # 保存图表
    plt.tight_layout()
    plt.savefig(os.path.join(logs_dir, 'training_metrics.png'))
    
    # 保存训练日志数据
    np.savez(
        os.path.join(logs_dir, 'training_logs.npz'),
        steps=np.array(steps_log),
        loss=np.array(loss_log),
        unet_lr=np.array(lr_log["unet_lr"]),
        vae_lr=np.array(lr_log["vae_lr"]) if train_vae and lr_log["vae_lr"] else None,
        text_encoder_lr=np.array(lr_log["text_encoder_lr"]) if train_text_encoder and lr_log["text_encoder_lr"] else None
    )
    
    print(f"训练可视化结果已保存至: {logs_dir}")
    print(f"采样图像已保存至: {samples_dir}")

print("模型已保存至:", output_dir)

打个小广告,超好用的ai集成网站,谁用谁知道。

发现一个能无限白嫖Gemini 2.5 Pro的隐藏入口,推理逆天,万字长文直出,再也回不去DeepSeek-R1了。入口地址:https://askmany.cn/login?i=e4fd1c61

文章来源于互联网:stable-diffusion-v1-5模型训练简易教程(含代码)

相关推荐: 查准查全与文本生成:AI写作技术提高查准查全率

1.背景介绍 在当今的大数据时代,信息爆炸,人类生活中的数据量已经超过了人类能够理解和处理的范围。因此,如何有效地查找和获取相关信息成为了人类的一个重要需求。传统的搜索引擎虽然提供了一定的帮助,但是由于数据量的增加和信息的多样性,传统的搜索引擎在查准和查全方面…

赞(0)
未经允许不得转载:5bei.cn大模型教程网 » stable-diffusion-v1-5模型训练简易教程(含代码)
分享到: 更多 (0)

AI大模型,我们的未来

小欢软考联系我们