深入淺出了解生成模型-5:diffuser/accelerate庫學(xué)習(xí)及其數(shù)據(jù)合成
本文不定期更新,最新內(nèi)容:https://www.big-yellow-j.top/posts/2025/06/25/accelerate-diffusers.html
工欲善其事,必先利其器。即便介紹了再多生成模型,沒有趁手的工具也難以施展才華。因此,本文將重點(diǎn)介紹幾個(gè)在生成模型開發(fā)中常用的 Python 庫,著重講解 Diffusers 和 Accelerate 的基本使用。感謝 Hugging Face 為無數(shù)算法工程師提供了強(qiáng)大的開源支持!需要注意的是,官方文檔對(duì)這兩個(gè)庫已有詳盡的說明,本文僅作為一篇簡明的使用筆記,拋磚引玉,供參考和交流。
accelerate
推薦直接閱讀官方文檔:https://huggingface.co/docs/accelerate/index
pip install accelerate
介紹之前了解一下這個(gè)庫是干什么的:這個(gè)庫主要提供一個(gè)快速的分布式訓(xùn)練(避免了直接用torch進(jìn)行手搓)并且支持各類加速方法:混合精度訓(xùn)練、Deepspeed、梯度累計(jì)等
一個(gè)基本使用場(chǎng)景
一般任務(wù)中一個(gè)常見的應(yīng)用場(chǎng)景是:需要實(shí)現(xiàn)一個(gè)多顯卡(這里假設(shè)為雙顯卡)分布式訓(xùn)練,并且使用梯度累計(jì)、混合精度訓(xùn)練,并且訓(xùn)練得到的結(jié)果通過tensorboard/wandb進(jìn)行記錄,除此之外還需要使用warm-up學(xué)習(xí)率調(diào)整策略,并且我的模型不同模塊使用的學(xué)習(xí)率不同,訓(xùn)練完成之后所有的模型權(quán)重要進(jìn)行保存/讀取權(quán)重進(jìn)行測(cè)試。那么可以直接通過下面代碼進(jìn)行實(shí)現(xiàn)(部分庫的導(dǎo)入以及一些參數(shù)比如說config直接忽略)
from accelerate import Accelerator
kwargs_handlers=[DistributedDataParallelKwargs(find_unused_parameters=True)] # 不是必須的
# Step-1 首先初始化 accelerate
accelerator = Accelerator(mixed_precision= 'fp16',
gradient_accumulation_steps= 2,
log_with= ['tensorboard', 'wandb'], # 一般來說用一個(gè)即可
project_dir=os.path.join(config.output_dir, "logs"),
kwargs_handlers= kwargs_handlers
)
# 僅在主線程上創(chuàng)建文件夾
if accelerator.is_main_process:
os.makedirs(config.output_dir, exist_ok=True)
# 初始化一個(gè)實(shí)驗(yàn)記錄器(此處內(nèi)容需要注意?)
# accelerator.init_trackers(f"Train-{config.training}")
log_name = 'Model-Test'
accelerator.init_trackers(
project_name= f"Page-Layout-Analysis-{config.pred_heads}",
init_kwargs={
"wandb": {
"name": log_name,
"dir": os.path.join(config.output_dir, "logs"),
"config": vars(config)
}
}
)
# Step-2 初始化完成之后可以直接將我們需要的內(nèi)容通過 accelerator.prepare 進(jìn)行處理
optimizer = torch.optim.AdamW([
{'params': model.image_model.parameters(), 'lr': 2e-5, 'weight_decay': 1e-4},
{'params': model.text_model.parameters(), 'lr': 4e-5},
{'params': [p for n, p in model.named_parameters()
if 'image_model' not in n and 'text_model' not in n],
'lr': config.learning_rate, 'weight_decay': 1e-4},
])
total_steps = config.epochs * len(train_dataloader)
warmup_steps = int(0.15 * total_steps)
# Warmup 調(diào)度器:從 0.1*lr 線性增加到 lr
warmup_scheduler = torch.optim.lr_scheduler.LinearLR(optimizer,
start_factor=0.1,
total_iters=warmup_steps
)
# 余弦退火調(diào)度器:添加 eta_min 防止學(xué)習(xí)率過低
cosine_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,
T_max=total_steps - warmup_steps,
eta_min=1e-6
)
cosine_scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer,
T_max=total_steps - warmup_steps)
lr_scheduler = torch.optim.lr_scheduler.SequentialLR(
optimizer,
schedulers=[warmup_scheduler, cosine_scheduler],
milestones=[warmup_steps]
)
dataloader, model, optimizer, scheduler = accelerator.prepare(dataloader, model, optimizer, scheduler)
# Step-3 模型訓(xùn)練以及模型優(yōu)化
total_data = len(dataloader)
for i, batch in enumerate(dataloader):
with accelerator.accumulate(model): # 梯度累計(jì)
inputs, targets = batch
# 下面兩句可以不用,但是習(xí)慣還是直接使用
inputs = inputs.to(accelerator.device)
targets = targets.to(accelerator.device)
outputs = model(inputs)
loss = loss_function(outputs, targets)
accelerator.backward(loss)
if accelerator.sync_gradients: # 進(jìn)行梯度裁剪
accelerator.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
scheduler.step()
optimizer.zero_grad()
# 記錄一下實(shí)驗(yàn)結(jié)果
logs = {
"Train/loss": loss.item(),
"Train/lr": optimizer.param_groups[0]['lr'], # 這里是假設(shè)模型使用的優(yōu)化學(xué)習(xí)率不同 或者直接使用 scheduler.get_last_lr()[0]
"Train/ACC": acc,
}
progress_bar.set_postfix(
loss=loss.item(),
acc=acc, f1=f1)
accelerator.log(logs, step= epoch* total_data+ i)
# Step-3 同步不同進(jìn)程
accelerator.wait_for_everyone()
if accelerator.is_main_process:
model = accelerator.unwrap_model(model)
model.save_pretrained(os.path.join(args.output_dir, "model"))
accelerator.end_training()
不過對(duì)于上面的代碼需要注意如下幾個(gè)內(nèi)容
1、追蹤器使用:一般多顯卡使用過程中通過使用 accelerator.end_training() 去結(jié)束 tracker
2、tqdm使用:一般只需要主進(jìn)程進(jìn)行顯示進(jìn)度條,因此一般直接:tqdm(..., disable=not accelerator.is_local_main_process)
diffuser
推薦直接閱讀官方文檔:https://huggingface.co/docs/diffusers/main/en/index
pip install git+https://github.com/huggingface/diffusers
基本使用
對(duì)于Diffusion Model原理理解可以參考,以及直接通過下面訓(xùn)練一個(gè)Diffusion Model代碼(代碼不一定很規(guī)范)進(jìn)行解釋。
from diffusers import DDPMScheduler
noise_scheduler = DDPMScheduler(num_train_timesteps= config.num_train_timesteps,
beta_start= config.beta_start, # 兩個(gè)beta代表加噪權(quán)重
beta_end= config.beta_end,
beta_schedule= 'scaled_linear')
...
# training
for epoch in range(config.epochs):
for i, batch in enumerate(train_dataloader):
image = batch["images"]
...
timesteps = torch.randint(0, noise_scheduler.config.num_train_timesteps,
(image.shape[0],),
device=image.device,
dtype=torch.int64)
noise = torch.randn(image.shape, device= accelerator.device)
noise_image = noise_scheduler.add_noise(image, noise, timesteps)
...
noise_pred = model(noise_image, timesteps)
loss = F.mse_loss(noise_pred, noise)
...
# eva
def evaluate(..., noise_scheduler, ):
...
noise = torch.randn((config.eval_batch_size, config.channel, config.image_size, config.image_size)) # 可以選擇固定隨機(jī)數(shù)種子
for t in noise_scheduler.timesteps:
t_tensor = torch.full((noise.shape[0],),
t,
dtype=torch.long,
device= device)
predicted_noise = model(noise, t_tensor, text_label)
noise = noise_scheduler.step(predicted_noise, t, noise).prev_sample
images = (noise.clamp(-1, 1) + 1) / 2
...
訓(xùn)練過程
1、加噪處理:通過選擇使用DDPM/DDIM而后將生成的"確定的噪聲"添加到圖片上 noise_scheduler.add_noise(image, noise, timesteps)

2、模型預(yù)測(cè):通過模型去預(yù)測(cè)所添加的噪聲并且計(jì)算loss
生成過程
3、逐步解噪:訓(xùn)練好的模型逐步預(yù)測(cè)噪聲之后將其從噪聲圖片中將噪聲剝離出來
1、Scheduler
https://huggingface.co/docs/diffusers/api/schedulers/overview
更加詳細(xì)的描述:https://www.big-yellow-j.top/posts/2025/07/06/DFscheduler.html
以DDPMScheduler為例主要使用兩個(gè)功能:
1、add_noise(輸入:sample、noise、timesteps):這個(gè)比較簡單就是直接:\(x=\sqrt{\alpha}x+ \sqrt{1-\alpha}\epsilon\)
2、step(輸入:model_output、timestep、sample):step做的就是將上面的add_noise進(jìn)行逆操作。具體代碼處理
- Step-1 首先計(jì)算幾個(gè)參數(shù):\(\alpha、\beta\)
alpha_prod_t = self.alphas_cumprod[t]
alpha_prod_t_prev = self.alphas_cumprod[prev_t] if prev_t >= 0 else self.one
beta_prod_t = 1 - alpha_prod_t
beta_prod_t_prev = 1 - alpha_prod_t_prev
current_alpha_t = alpha_prod_t / alpha_prod_t_prev
current_beta_t = 1 - current_alpha_t
- Step-2 根據(jù)計(jì)算得到參數(shù)反推\(t-1\)的計(jì)算結(jié)果(提供3種類,介紹“epsilon”)\(x_0=\frac{x_T- \sqrt{1- \alpha_t}\epsilon}{\alpha_t}\)
pred_original_sample = (sample - beta_prod_t ** (0.5) * model_output) / alpha_prod_t ** (0.5)
pred_original_sample_coeff = (alpha_prod_t_prev ** (0.5) * current_beta_t) / beta_prod_t
current_sample_coeff = current_alpha_t ** (0.5) * beta_prod_t_prev / beta_prod_t
pred_prev_sample = pred_original_sample_coeff * pred_original_sample + current_sample_coeff * sample
區(qū)別DDIM的處理過程將DDPM的馬爾科夫鏈替換為非馬爾科夫鏈過程而后進(jìn)行采樣,這樣我們就可以每次迭代中跨多個(gè)step,從而減少推理迭代次數(shù)和時(shí)間:
std_dev_t = eta * variance ** (0.5)
pred_sample_direction = (1 - alpha_prod_t_prev - std_dev_t**2) ** (0.5) * pred_epsilon
prev_sample = alpha_prod_t_prev ** (0.5) * pred_original_sample + pred_sample_direction
prev_sample = prev_sample + variance
2、pipeline
所有支持的pipeline:Diffusers Pipelines
一般來說很多論文里面提出的模型,基本都是基于SD(StableDiffusion)等模型進(jìn)行“微調(diào)”的,因此很多改進(jìn)也都是去爭對(duì)輸入模型的參數(shù)進(jìn)行調(diào)整(換言之就是搭積木講故事),比如說改變輸入圖片內(nèi)容、改變SD中條件等。除此之外分析一個(gè)pipeline直接通過分析里面的__call__即可,基本使用:
from diffusers import StableDiffusionPipeline
import torch
model_id = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(model_id, torch_dtype=torch.float16)
if torch.cuda.is_available():
pipe = pipe.to("cuda")
prompt = "A futuristic city at sunset, cyberpunk style, highly detailed, cinematic lighting"
image = pipe(prompt, num_inference_steps=50, guidance_scale=7.5).images[0]
image.save("output.png")
2.1 StableDiffusionPipeline
https://huggingface.co/docs/diffusers/v0.34.0/en/api/pipelines/overview#diffusers.DiffusionPipeline
很多論文里面基本都是直接去微調(diào)訓(xùn)練好的模型比如說StableDiffusion等,使用別人訓(xùn)練后的就少不了看到 pipeline的影子,直接介紹StableDiffusionPipeline的構(gòu)建(文生圖pipeline)。在代碼中主要使用到的基礎(chǔ)模型如下幾個(gè):1、VAE(AutoencoderKL);2、CLIP(用于文本編碼,CLIPTextModel、CLIPTokenizer);3、Unet(模型骨架,UNet2DConditionModel)
Step-1:對(duì)輸入文本進(jìn)行編碼(文生圖直接輸入文本)通過正、負(fù)編碼對(duì)生成圖像進(jìn)行指導(dǎo):
def encode_prompt(..., prompt, do_classifier_free_guidance,...,):
# 1、判斷文本編碼器是否lora微調(diào)
if lora_scale is not None and isinstance(self, StableDiffusionLoraLoaderMixin):
self._lora_scale = lora_scale
if not USE_PEFT_BACKEND:
adjust_lora_scale_text_encoder(self.text_encoder, lora_scale)
else:
scale_lora_layers(self.text_encoder, lora_scale)
# 2、通過prompt來確定需要生成多少圖片
...
# 3、對(duì)文本進(jìn)行編碼
if prompt_embeds is None:
...
text_inputs = self.tokenizer(...)
text_input_ids = text_inputs.input_ids
untruncated_ids = self.tokenizer(prompt, padding="longest", return_tensors="pt").input_ids
...
# 會(huì)顯示一個(gè)過長截?cái)嗑? ...
# 選擇clip中倒數(shù)第幾層作為文本編碼輸出
if clip_skip is None:
# 默認(rèn)直接最后一層
prompt_embeds = self.text_encoder(text_input_ids.to(device), attention_mask=attention_mask)
prompt_embeds = prompt_embeds[0]
else:
# 倒數(shù)層
prompt_embeds = self.text_encoder(
text_input_ids.to(device), attention_mask=attention_mask, output_hidden_states=True
)
prompt_embeds = prompt_embeds[-1][-(clip_skip + 1)]
prompt_embeds = self.text_encoder.text_model.final_layer_norm(prompt_embeds)
# 改變形狀得到 batch_size(對(duì)應(yīng)prompt數(shù)量), 77, 748 CLIP: CLIP-ViT-L
...
prompt_embeds = prompt_embeds.view(bs_embed * num_images_per_prompt, seq_len, -1)
if do_classifier_free_guidance and negative_prompt_embeds is None:
# 此部分和上面正常的編碼處理方式相似直接對(duì)negative_prompt進(jìn)行編碼
...
...
return prompt_embeds, negative_prompt_embeds
Step-2:獲取推理時(shí)間步以及生成latent變量
Step-3:模型處理
# 首先通過unet逐步進(jìn)行解碼圖像
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
...
noise_pred = self.unet(...)[0]
...
# 通過step來從t反推t-1
latents = self.scheduler.step(noise_pred, t, latents, **extra_step_kwargs, return_dict=False)[0]
...
# classifier_free_guidance
if not output_type == "latent":
# 圖片返回
image = self.vae.decode(latents / self.vae.config.scaling_factor, return_dict=False,generator=generator)[0]
image, has_nsfw_concept = self.run_safety_checker(image, device, prompt_embeds.dtype)
else:
# 直接返回沒被vae處理的結(jié)果
image = latents
has_nsfw_concept = None
if has_nsfw_concept is None:
do_denormalize = [True] * image.shape[0]
else:
do_denormalize = [not has_nsfw for has_nsfw in has_nsfw_concept]
image = self.image_processor.postprocess(image, output_type=output_type, do_denormalize=do_denormalize)
...
if not return_dict:
return (image, has_nsfw_concept)
return StableDiffusionPipelineOutput(images=image, nsfw_content_detected=has_nsfw_concept)
補(bǔ)充-1:
classifier_free_guidance(CFG) 以及classifier_guidance(CG)
classifier_guidance[1]:通過一個(gè)分類器來引導(dǎo)模型生成的方向,也就是使得模型按類進(jìn)行生成。數(shù)學(xué)上描述為[2]:\(\nabla p(x_t\vert y)=\nabla \log p(x_t)+ \nabla \log p(y \vert x_t)\) 也就是說前面部分代表unconditional score后面部分代表分類器的梯度,也就是添加一個(gè)分類器梯度來“指導(dǎo)”模型生成方向。
classifier_free_guidance[3]:對(duì)上面的改進(jìn)版本,上面過程中會(huì)額外訓(xùn)練一個(gè)分類器進(jìn)而增加訓(xùn)練成本。因此對(duì)于上面計(jì)算公式中:\(\nabla \log p(y \vert x_t)= \nabla p(x_t\vert y)- \nabla \log p(x_t)= -\frac{1}{\sqrt{1- \alpha_t}}(\epsilon_\theta(x_t, t, y)- \epsilon_\theta(x_t, t))\) 最后得到梯度過程為: \((w+1)\epsilon_\theta(x_t, t, y)- w\epsilon_\theta(x_t, t)\)
回到代碼中,代碼中具體操作過程為:1、文本編碼過程中,這部分比較簡單直接根據(jù)對(duì)negative_prompt進(jìn)行CLIP text encoder處理即可(如果沒有輸入negative_prompt默認(rèn)就是直接用空字符進(jìn)行替代)如果進(jìn)行CFG那么直接將兩部分進(jìn)行拼接(torch.cat([negative_prompt_embeds, prompt_embeds])) prompt_embeds;2、模型解碼過程中,這部分處理過程比較粗暴,如果要進(jìn)行CFG那么直接將latent擴(kuò)展為兩份(Uncond+Cond各一份)對(duì)應(yīng)的text輸出也是兩份,通過一個(gè)模型處理之后再通過chunk分出無條件輸出、有條件輸出,最后計(jì)算兩部分組合:\(\epsilon(x,t)+ w(\epsilon(x,t,y)- \epsilon(x,t))\)
if self.do_classifier_free_guidance:
prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds])
...
latent_model_input = torch.cat([latents] * 2) if self.do_classifier_free_guidance else latents
...
noise_pred = self.unet(latent_model_input, t, prompt_embeds, ...)[0]
if self.do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)
if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:
noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=self.guidance_rescale)
2.2 StableDiffusionXLInpaintPipeline
對(duì)于圖像消除任務(wù)而言使用較多的也是此類pipeline(SDXL開源可以商用)具體使用代碼如下:
from diffusers import StableDiffusionXLInpaintPipeline
from diffusers.utils import load_image, make_image_grid
import torch
from PIL import Image
device = "cuda" if torch.cuda.is_available() else "cpu"
# 加載基礎(chǔ)模型
base = StableDiffusionXLInpaintPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-base-1.0",
torch_dtype=torch.float16, # 使用半精度浮點(diǎn)數(shù)以減少顯存占用
variant="fp16", # 使用 fp16 變體以優(yōu)化性能
use_safetensors=True # 使用 safetensors 格式以提高加載速度
).to(device)
# 加載優(yōu)化模型(refiner model)
refiner = StableDiffusionXLInpaintPipeline.from_pretrained(
"stabilityai/stable-diffusion-xl-refiner-1.0",
text_encoder_2=base.text_encoder_2, # 共享基礎(chǔ)模型的第二個(gè)文本編碼器
vae=base.vae, # 共享基礎(chǔ)模型的變分自編碼器
torch_dtype=torch.float16,
use_safetensors=True,
variant="fp16",
).to(device)
img_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo.png"
mask_url = "https://raw.githubusercontent.com/CompVis/latent-diffusion/main/data/inpainting_examples/overture-creations-5sI6fQgYIuo_mask.png"
init_image = load_image(img_url)
mask_image = load_image(mask_url)
prompt = "A majestic tiger sitting on a bench"
negative_prompt = "distorted, blurry, low quality"
num_inference_steps = 75
high_noise_frac = 0.7
# 使用基礎(chǔ)模型進(jìn)行初步去噪(輸出潛在表示)
base_output = base(
prompt=prompt,
negative_prompt=negative_prompt,
image=init_image,
mask_image=mask_image,
num_inference_steps=num_inference_steps,
denoising_end=high_noise_frac, # 基礎(chǔ)模型處理高噪聲階段
output_type="latent" # 輸出潛在表示以供優(yōu)化模型使用
).images
# 使用優(yōu)化模型進(jìn)行細(xì)節(jié)增強(qiáng)
refined_image = refiner(
prompt=prompt,
negative_prompt=negative_prompt,
image=base_output,
mask_image=mask_image,
num_inference_steps=num_inference_steps,
denoising_start=high_noise_frac, # 優(yōu)化模型處理低噪聲階段
).images[0]
# 可視化結(jié)果
grid = make_image_grid([init_image, mask_image, refined_image.resize((512, 512))], rows=1, cols=3)
grid.save("inpainting_result.png")
refined_image.save("refined_image.png")
首先模型輸入主要為如下幾個(gè)部分:1、文本輸入;2、圖片輸入(正常圖片以及mask圖片)。首先對(duì)于文本編碼。對(duì)于SDXL模型而言文本會(huì)通過兩個(gè)clip的文本編碼器進(jìn)行編碼(OpenCLIP-ViT/G:1280、CLIP-ViT/L:768)對(duì)于兩個(gè)編碼器代碼處理思路為:
...
tokenizers = [self.tokenizer, self.tokenizer_2] if self.tokenizer is not None else [self.tokenizer_2]
text_encoders = (
[self.text_encoder, self.text_encoder_2] if self.text_encoder is not None else [self.text_encoder_2]
)
if prompt_embeds is None:
prompt_2 = prompt_2 or prompt
prompt_2 = [prompt_2] if isinstance(prompt_2, str) else prompt_2
prompt_embeds_list = []
prompts = [prompt, prompt_2]
for prompt, tokenizer, text_encoder in zip(prompts, tokenizers, text_encoders):
...
text_inputs = tokenizer(prompt,...)
text_input_ids = text_inputs.input_ids
untruncated_ids = tokenizer(prompt, ...).input_ids
...
prompt_embeds = text_encoder(text_input_ids.to(device), output_hidden_states=True)
...
prompt_embeds_list.append(prompt_embeds)
prompt_embeds = torch.concat(prompt_embeds_list, dim=-1)
...
if self.do_classifier_free_guidance:
prompt_embeds = torch.cat([negative_prompt_embeds, prompt_embeds], dim=0)
...
prompt_embeds = prompt_embeds.to(device)
最后得到的prompt_embeds為:[1, 77, 2048](由[1, 77, 768] 和 [1, 77, 1280])拼接得到,如果要使用CFG的話就需要輸入negative_prompt以及參數(shù)guidance_scale,對(duì)于negative_prompt的處理方式和上面相同。除此之外再代碼中會(huì)有added_cond_kwargs = {"text_embeds": add_text_embeds, "time_ids": add_time_ids}這個(gè)參數(shù),一般作用是:作為一個(gè)“額外”的條件添加到時(shí)間編碼中(emb = emb + aug_emb if aug_emb is not None else emb)。不過值得注意的是,很多論文里面都使用:將圖像和文本編碼組合作為“文本編碼”輸入(objectclear)如果要實(shí)現(xiàn)這個(gè)(objectclear)功能偽代碼如下:
...
masked_image = init_image
# masked_image = init_image * (mask < 0.5)
obj_only = init_image * (mask > 0.5)
obj_only = obj_only.to(device=device)
object_embeds = self.image_prompt_encoder(obj_only)
prompt_embeds = self.postfuse_module(prompt_embeds, object_embeds, 5)
其中prompt_embeds就是正常的文本編碼,self.image_prompt_encoder一般就是使用clip image的文本編碼器這樣一來就會(huì)將文本和圖片編碼成向量,self.postfuse_module一般就是將兩個(gè)向量進(jìn)行融合(這個(gè)一般就是通過mlp對(duì)齊維度之后直接拼接即可)
而后再圖像編碼。這部分比較容易直接通過vae去編碼即可
...
masked_image = init_image * (mask < 0.5)
...
mask = torch.cat([mask] * 2) if do_classifier_free_guidance else mask
masked_image_latents = self._encode_vae_image(masked_image, generator=generator)
對(duì)于圖片一般做法是直接masked_image = init_image * (mask < 0.5)但是論文里面有些直接使用masked_image = init_image。在文本以及圖像都編碼之后就是模型處理,只不過如果使用CFG:
if self.do_classifier_free_guidance:
noise_pred_uncond, noise_pred_text = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + self.guidance_scale * (noise_pred_text - noise_pred_uncond)
if self.do_classifier_free_guidance and self.guidance_rescale > 0.0:
# Based on 3.4. in https://arxiv.org/pdf/2305.08891.pdf
noise_pred = rescale_noise_cfg(noise_pred, noise_pred_text, guidance_rescale=self.guidance_rescale)
補(bǔ)充一點(diǎn):如果要做CFG一般會(huì)將文本的prompt:negative_prompt_embeds(如果沒有輸入negative_prompt會(huì)直接用0代替), prompt_embeds直接拼接起來,而后其他的就直接“拼接本體”
3、Lora微調(diào)
和大語言模型的處理方式相似,通過peft去微調(diào)模型,簡單了解一下peft里面微調(diào)的處理思路(值得注意的是,使用peft來微調(diào)只適用于基于transformer庫來搭建的模型對(duì)于自己的模型可能沒那么好的適應(yīng)性):
unet = UNet2DConditionModel.from_pretrained(
"stable-diffusion-v1-5/stable-diffusion-inpainting",
subfolder="unet",
cache_dir= '/data/huangjie',
)
unet.requires_grad_(False)
print(unet.down_blocks[0])
unet_lora_config = LoraConfig(
r=2,
lora_alpha=2,
lora_dropout=0.2,
init_lora_weights="gaussian",
target_modules=["to_k", "to_q", "to_v", "to_out.0", "add_k_proj", "add_v_proj"],
)
unet.add_adapter(unet_lora_config)
print("after Lora Model:", unet.down_blocks[0])
上面兩個(gè)過程模型變化為:

仔細(xì)分析一下LoraConfig里面的具體原理,因?yàn)楹芏嗄P停ɑ赼ttention)基本就是q、k、v三個(gè),因此通過target_modules指定哪些模塊的參數(shù)是需要通過lora進(jìn)行調(diào)整的模塊。init_lora_weights代表lora初始化參數(shù)分布策略,參數(shù)r以及 lora_alpha代表的含義是:
經(jīng)典問題:1、lora里面參數(shù)里面之所以初始化為0是因?yàn)閷?duì)于我們的llm/DF模型一般都是“優(yōu)秀”的,而對(duì)于“陌生”的數(shù)據(jù)通過零初始化確保一切干凈,從 0 開始穩(wěn)步適配(在訓(xùn)練初期引入噪聲,可能導(dǎo)致不穩(wěn)定,尤其在微調(diào)少步數(shù)、低學(xué)習(xí)率時(shí),收斂更慢)2、多個(gè)lora模型同時(shí)作用于一個(gè)SD模型,并配置他們的各自權(quán)重,并且不同lora參數(shù)對(duì)模型生成的影響[4]:

4、Adapters使用
lora也是Adapters(可以簡單理解為對(duì)訓(xùn)練好的模型再去添加一個(gè)插件,通過這個(gè)插件讓SD去生成其他的樣式的圖片)一種,具體見:深入淺出了解生成模型-6:常用基礎(chǔ)模型與 Adapters等解析
5、自注意力技術(shù)(AttnProcessor、AttnProcessor2_0)
https://huggingface.co/docs/diffusers/v0.30.1/en/api/attnprocessor
- 1、AttnProcessor
此部分就是非常常規(guī)的注意力計(jì)算方式
- 2、AttnProcessor2_0
它調(diào)用了 PyTorch 2.0 起啟用的算子 F.scaled_dot_product_attention 代替手動(dòng)實(shí)現(xiàn)的注意力計(jì)算。這個(gè)算子更加高效,如果你確定 PyTorch 版本至少為 2.0,就可以用 AttnProcessor2_0 代替
參考知乎[5]中的描述,如何將自注意力進(jìn)行修改,比如說如下代碼:
from diffusers.models.attention_processor import (Attention,AttnProcessor,AttnProcessor2_0)
unet = UNet2DConditionModel()
for name, module in unet.named_modules():
if isinstance(module, Attention) and "attn2" in name:
print(f'name: {name}')
print("*"*20)
break
那么就會(huì)得到一個(gè)比如說:down_blocks.0.attentions.0.transformer_blocks.0.attn2比如說如果我需要將這個(gè)替換那么處理方式為:
for name, module in unet.named_modules():
if isinstance(module, Attention) and "attn2" in name:
print(f'raw name: {name} \n raw module: {module.processor}')
print("*"*20)
if isinstance(module.processor, AttnProcessor2_0):
module.set_processor(AttnProcessor())
print(f"change name: {name} \n change module: {module.processor}")
print("*"*20)
break
這樣一來有最開始的:<diffusers.models.attention_processor.AttnProcessor2_0 object at 0x7ff392734eb0> 替換為<diffusers.models.attention_processor.AttnProcessor object at 0x7ff5b776bc40>。或者直接改成自定義的處理方式:
class CustonAttnProcessor(AttnProcessor):
def __call__(self, attn, hidden_states, encoder_hidden_states=None, attention_mask=None):
query = attn.to_q(hidden_states)
encoder_states = hidden_states if encoder_hidden_states is None else encoder_hidden_states
key = attn.to_k(encoder_states)
value = attn.to_v(encoder_states)
attn_scores = torch.baddbmm(
torch.empty(query.shape[0], query.shape[1], key.shape[1], device=query.device),
query,
key.transpose(-1, -2),
beta=0,
alpha=attn.scale,
)
# 比如說對(duì) attn_scores 取log
attn_probs = torch.log(attn_scores)
attn_probs = attn_scores.softmax(dim=-1)
hidden_states = torch.bmm(attn_probs, value)
hidden_states = attn.to_out[0](hidden_states)
hidden_states = attn.to_out[1](hidden_states)
return hidden_states
attn_processor_dict = {}
for k in unet.attn_processors.keys():
if "attn2" in k:
attn_processor_dict[k] = CustonAttnProcessor()
else:
attn_processor_dict[k] = unet.attn_processors[k]
unet.set_attn_processor(attn_processor_dict)
for name, processor in unet.attn_processors.items():
print(name, "=>", type(processor))
總的來說如果要去修改注意力處理方式,直接去便利unet.attn_processors.keys()然后去找到需要修改的層將其替換即可,只不過關(guān)鍵在于CustonAttnProcessor的定義方式。

浙公網(wǎng)安備 33010602011771號(hào)