mirror of
https://github.com/modelscope/DiffSynth-Studio.git
synced 2026-03-18 22:08:13 +00:00
* add conf docs * add conf docs * add index * add index * update ref * test root * add en * test relative * redirect relative * add document * test_document * test_document
251 lines
11 KiB
Markdown
251 lines
11 KiB
Markdown
# 接入 Pipeline
|
||
|
||
在[将 Pipeline 所需的模型接入](../Developer_Guide/Integrating_Your_Model.md)之后,还需构建 `Pipeline` 用于模型推理,本文档提供 `Pipeline` 构建的标准化流程,开发者也可参考现有的 `Pipeline` 进行构建。
|
||
|
||
`Pipeline` 的实现位于 `diffsynth/pipelines`,每个 `Pipeline` 包含以下必要的关键组件:
|
||
|
||
* `__init__`
|
||
* `from_pretrained`
|
||
* `__call__`
|
||
* `units`
|
||
* `model_fn`
|
||
|
||
## `__init__`
|
||
|
||
在 `__init__` 中,`Pipeline` 进行初始化,以下是一个简易的实现:
|
||
|
||
```python
|
||
import torch
|
||
from PIL import Image
|
||
from typing import Union
|
||
from tqdm import tqdm
|
||
from ..diffusion import FlowMatchScheduler
|
||
from ..core import ModelConfig
|
||
from ..diffusion.base_pipeline import BasePipeline, PipelineUnit
|
||
from ..models.new_models import XXX_Model, YYY_Model, ZZZ_Model
|
||
|
||
class NewDiffSynthPipeline(BasePipeline):
|
||
|
||
def __init__(self, device="cuda", torch_dtype=torch.bfloat16):
|
||
super().__init__(device=device, torch_dtype=torch_dtype)
|
||
self.scheduler = FlowMatchScheduler()
|
||
self.text_encoder: XXX_Model = None
|
||
self.dit: YYY_Model = None
|
||
self.vae: ZZZ_Model = None
|
||
self.in_iteration_models = ("dit",)
|
||
self.units = [
|
||
NewDiffSynthPipelineUnit_xxx(),
|
||
...
|
||
]
|
||
self.model_fn = model_fn_new
|
||
```
|
||
|
||
其中包括以下几部分
|
||
|
||
* `scheduler`: 调度器,用于控制推理的迭代公式中的系数,控制每一步的噪声含量。
|
||
* `text_encoder`、`dit`、`vae`: 模型,自 [Latent Diffusion](https://arxiv.org/abs/2112.10752) 被提出以来,这种三段式模型架构已成为主流的 Diffusion 模型架构,但这并不是一成不变的,`Pipeline` 中可添加任意多个模型。
|
||
* `in_iteration_models`: 迭代中模型,这个元组标注了在迭代中会调用哪些模型。
|
||
* `units`: 模型迭代的前处理单元,详见[`units`](#units)。
|
||
* `model_fn`: 迭代中去噪模型的 `forward` 函数,详见[`model_fn`](#model_fn)。
|
||
|
||
> Q: 模型加载并不发生在 `__init__`,为什么这里仍要将每个模型初始化为 `None`?
|
||
>
|
||
> A: 在这里标注每个模型的类型后,代码编辑器就可以根据每个模型提供代码补全提示,便于后续的开发。
|
||
|
||
## `from_pretrained`
|
||
|
||
`from_pretrained` 负责加载所需的模型,让 `Pipeline` 变成可调用的状态。以下是一个简易的实现:
|
||
|
||
```python
|
||
@staticmethod
|
||
def from_pretrained(
|
||
torch_dtype: torch.dtype = torch.bfloat16,
|
||
device: Union[str, torch.device] = "cuda",
|
||
model_configs: list[ModelConfig] = [],
|
||
vram_limit: float = None,
|
||
):
|
||
# Initialize pipeline
|
||
pipe = NewDiffSynthPipeline(device=device, torch_dtype=torch_dtype)
|
||
model_pool = pipe.download_and_load_models(model_configs, vram_limit)
|
||
|
||
# Fetch models
|
||
pipe.text_encoder = model_pool.fetch_model("xxx_text_encoder")
|
||
pipe.dit = model_pool.fetch_model("yyy_dit")
|
||
pipe.vae = model_pool.fetch_model("zzz_vae")
|
||
# If necessary, load tokenizers here.
|
||
|
||
# VRAM Management
|
||
pipe.vram_management_enabled = pipe.check_vram_management_state()
|
||
return pipe
|
||
```
|
||
|
||
开发者需要实现其中获取模型的逻辑,对应的模型名称即为[模型接入时填写的模型 Config](../Developer_Guide/Integrating_Your_Model.md#step-3-编写模型-config) 中的 `"model_name"`。
|
||
|
||
部分模型还需要加载 `tokenizer`,可根据需要在 `from_pretrained` 上添加额外的 `tokenizer_config` 参数并在获取模型后实现这部分。
|
||
|
||
## `__call__`
|
||
|
||
`__call__` 实现了整个 Pipeline 的生成过程,以下是常见的生成过程模板,开发者可根据需要在此基础上修改。
|
||
|
||
```python
|
||
@torch.no_grad()
|
||
def __call__(
|
||
self,
|
||
prompt: str,
|
||
negative_prompt: str = "",
|
||
cfg_scale: float = 4.0,
|
||
input_image: Image.Image = None,
|
||
denoising_strength: float = 1.0,
|
||
height: int = 1328,
|
||
width: int = 1328,
|
||
seed: int = None,
|
||
rand_device: str = "cpu",
|
||
num_inference_steps: int = 30,
|
||
progress_bar_cmd = tqdm,
|
||
):
|
||
# Scheduler
|
||
self.scheduler.set_timesteps(
|
||
num_inference_steps,
|
||
denoising_strength=denoising_strength
|
||
)
|
||
|
||
# Parameters
|
||
inputs_posi = {
|
||
"prompt": prompt,
|
||
}
|
||
inputs_nega = {
|
||
"negative_prompt": negative_prompt,
|
||
}
|
||
inputs_shared = {
|
||
"cfg_scale": cfg_scale,
|
||
"input_image": input_image,
|
||
"denoising_strength": denoising_strength,
|
||
"height": height,
|
||
"width": width,
|
||
"seed": seed,
|
||
"rand_device": rand_device,
|
||
"num_inference_steps": num_inference_steps,
|
||
}
|
||
for unit in self.units:
|
||
inputs_shared, inputs_posi, inputs_nega = self.unit_runner(unit, self, inputs_shared, inputs_posi, inputs_nega)
|
||
|
||
# Denoise
|
||
self.load_models_to_device(self.in_iteration_models)
|
||
models = {name: getattr(self, name) for name in self.in_iteration_models}
|
||
for progress_id, timestep in enumerate(progress_bar_cmd(self.scheduler.timesteps)):
|
||
timestep = timestep.unsqueeze(0).to(dtype=self.torch_dtype, device=self.device)
|
||
|
||
# Inference
|
||
noise_pred_posi = self.model_fn(**models, **inputs_shared, **inputs_posi, timestep=timestep, progress_id=progress_id)
|
||
if cfg_scale != 1.0:
|
||
noise_pred_nega = self.model_fn(**models, **inputs_shared, **inputs_nega, timestep=timestep, progress_id=progress_id)
|
||
noise_pred = noise_pred_nega + cfg_scale * (noise_pred_posi - noise_pred_nega)
|
||
else:
|
||
noise_pred = noise_pred_posi
|
||
|
||
# Scheduler
|
||
inputs_shared["latents"] = self.step(self.scheduler, progress_id=progress_id, noise_pred=noise_pred, **inputs_shared)
|
||
|
||
# Decode
|
||
self.load_models_to_device(['vae'])
|
||
image = self.vae.decode(inputs_shared["latents"], device=self.device)
|
||
image = self.vae_output_to_image(image)
|
||
self.load_models_to_device([])
|
||
|
||
return image
|
||
```
|
||
|
||
## `units`
|
||
|
||
`units` 包含了所有的前处理过程,例如:宽高检查、提示词编码、初始噪声生成等。在整个模型前处理过程中,数据被抽象为了互斥的三部分,分别存储在对应的字典中:
|
||
|
||
* `inputs_shard`: 共享输入,与 [Classifier-Free Guidance](https://arxiv.org/abs/2207.12598)(简称 CFG)无关的参数。
|
||
* `inputs_posi`: Classifier-Free Guidance 的 Positive 侧输入,包含与正向提示词相关的内容。
|
||
* `inputs_nega`: Classifier-Free Guidance 的 Negative 侧输入,包含与负向提示词相关的内容。
|
||
|
||
Pipeline Unit 的实现包括三种:直接模式、CFG 分离模式、接管模式。
|
||
|
||
如果某些计算与 CFG 无关,可采用直接模式,例如 Qwen-Image 的随机噪声初始化:
|
||
|
||
```python
|
||
class QwenImageUnit_NoiseInitializer(PipelineUnit):
|
||
def __init__(self):
|
||
super().__init__(
|
||
input_params=("height", "width", "seed", "rand_device"),
|
||
output_params=("noise",),
|
||
)
|
||
|
||
def process(self, pipe: QwenImagePipeline, height, width, seed, rand_device):
|
||
noise = pipe.generate_noise((1, 16, height//8, width//8), seed=seed, rand_device=rand_device, rand_torch_dtype=pipe.torch_dtype)
|
||
return {"noise": noise}
|
||
```
|
||
|
||
如果某些计算与 CFG 有关,需分别处理正向和负向提示词,但两侧的输入参数是相同的,可采用 CFG 分离模式,例如 Qwen-image 的提示词编码:
|
||
|
||
```python
|
||
class QwenImageUnit_PromptEmbedder(PipelineUnit):
|
||
def __init__(self):
|
||
super().__init__(
|
||
seperate_cfg=True,
|
||
input_params_posi={"prompt": "prompt"},
|
||
input_params_nega={"prompt": "negative_prompt"},
|
||
input_params=("edit_image",),
|
||
output_params=("prompt_emb", "prompt_emb_mask"),
|
||
onload_model_names=("text_encoder",)
|
||
)
|
||
|
||
def process(self, pipe: QwenImagePipeline, prompt, edit_image=None) -> dict:
|
||
pipe.load_models_to_device(self.onload_model_names)
|
||
# Do something
|
||
return {"prompt_emb": prompt_embeds, "prompt_emb_mask": encoder_attention_mask}
|
||
```
|
||
|
||
如果某些计算需要全局的信息,则需要接管模式,例如 Qwen-Image 的实体分区控制:
|
||
|
||
```python
|
||
class QwenImageUnit_EntityControl(PipelineUnit):
|
||
def __init__(self):
|
||
super().__init__(
|
||
take_over=True,
|
||
input_params=("eligen_entity_prompts", "width", "height", "eligen_enable_on_negative", "cfg_scale"),
|
||
output_params=("entity_prompt_emb", "entity_masks", "entity_prompt_emb_mask"),
|
||
onload_model_names=("text_encoder",)
|
||
)
|
||
|
||
def process(self, pipe: QwenImagePipeline, inputs_shared, inputs_posi, inputs_nega):
|
||
# Do something
|
||
return inputs_shared, inputs_posi, inputs_nega
|
||
```
|
||
|
||
以下是 Pipeline Unit 所需的参数配置:
|
||
|
||
* `seperate_cfg`: 是否启用 CFG 分离模式
|
||
* `take_over`: 是否启用接管模式
|
||
* `input_params`: 共享输入参数
|
||
* `output_params`: 输出参数
|
||
* `input_params_posi`: Positive 侧输入参数
|
||
* `input_params_nega`: Negative 侧输入参数
|
||
* `onload_model_names`: 需调用的模型组件名
|
||
|
||
在设计 `unit` 时请尽量按照以下原则进行:
|
||
|
||
* 缺省兜底:可选功能的 `unit` 输入参数默认为 `None`,而不是 `False` 或其他数值,请对此默认值进行兜底处理。
|
||
* 参数触发:部分 Adapter 模型可能是未被加载的,例如 ControlNet,对应的 `unit` 应当以参数输入是否为 `None` 来控制触发,而不是以模型是否被加载来控制触发。例如当用户输入了 `controlnet_image` 但没有加载 ControlNet 模型时,代码应当给出报错,而不是忽略这些输入参数继续执行。
|
||
* 简洁优先:尽可能使用直接模式,仅当功能无法实现时,使用接管模式。
|
||
* 显存高效:在 `unit` 中调用模型时,请使用 `pipe.load_models_to_device(self.onload_model_names)` 激活对应的模型,请不要调用 `onload_model_names` 之外的其他模型,`unit` 计算完成后,请不要使用 `pipe.load_models_to_device([])` 手动释放显存。
|
||
|
||
> Q: 部分参数并未在推理过程中调用,例如 `output_params`,是否仍有必要配置?
|
||
>
|
||
> A: 这些参数不会影响推理过程,但会影响一些实验性功能,因此我们建议将其配置好。例如“拆分训练”,我们可以将训练中的前处理离线完成,但部分需要梯度回传的模型计算无法拆分,这些参数用于构建计算图从而推断哪些计算是可以拆分的。
|
||
|
||
## `model_fn`
|
||
|
||
`model_fn` 是迭代中的统一 `forward` 接口,对于开源模型生态尚未形成的模型,直接沿用去噪模型的 `forward` 即可,例如:
|
||
|
||
```python
|
||
def model_fn_new(dit=None, latents=None, timestep=None, prompt_emb=None, **kwargs):
|
||
return dit(latents, prompt_emb, timestep)
|
||
```
|
||
|
||
对于开源生态丰富的模型,`model_fn` 通常包含复杂且混乱的跨模型推理,以 `diffsynth/pipelines/qwen_image.py` 为例,这个函数中实现的额外计算包括:实体分区控制、三种 ControlNet、Gradient Checkpointing 等,开发者在实现这一部分时要格外小心,避免模块功能之间的冲突。
|