# 接入 Pipeline 在[将 Pipeline 所需的模型接入](./Integrating_Your_Model.md)之后,还需构建 `Pipeline` 用于模型推理,本文档提供 `Pipeline` 构建的标准化流程,开发者也可参考现有的 `Pipeline` 进行构建。 `Pipeline` 的实现位于 `diffsynth/pipelines`,每个 `Pipeline` 包含以下必要的关键组件: * `__init__` * `from_pretrained` * `__call__` * `units` * `model_fn` ## `__init__` 在 `__init__` 中,`Pipeline` 进行初始化,以下是一个简易的实现: ```python import torch from PIL import Image from typing import Union from tqdm import tqdm from ..diffusion import FlowMatchScheduler from ..core import ModelConfig from ..diffusion.base_pipeline import BasePipeline, PipelineUnit from ..models.new_models import XXX_Model, YYY_Model, ZZZ_Model class NewDiffSynthPipeline(BasePipeline): def __init__(self, device="cuda", torch_dtype=torch.bfloat16): super().__init__(device=device, torch_dtype=torch_dtype) self.scheduler = FlowMatchScheduler() self.text_encoder: XXX_Model = None self.dit: YYY_Model = None self.vae: ZZZ_Model = None self.in_iteration_models = ("dit",) self.units = [ NewDiffSynthPipelineUnit_xxx(), ... ] self.model_fn = model_fn_new ``` 其中包括以下几部分 * `scheduler`: 调度器,用于控制推理的迭代公式中的系数,控制每一步的噪声含量。 * `text_encoder`、`dit`、`vae`: 模型,自 [Latent Diffusion](https://arxiv.org/abs/2112.10752) 被提出以来,这种三段式模型架构已成为主流的 Diffusion 模型架构,但这并不是一成不变的,`Pipeline` 中可添加任意多个模型。 * `in_iteration_models`: 迭代中模型,这个元组标注了在迭代中会调用哪些模型。 * `units`: 模型迭代的前处理单元,详见[`units`](#units)。 * `model_fn`: 迭代中去噪模型的 `forward` 函数,详见[`model_fn`](#model_fn)。 > Q: 模型加载并不发生在 `__init__`,为什么这里仍要将每个模型初始化为 `None`? > > A: 在这里标注每个模型的类型后,代码编辑器就可以根据每个模型提供代码补全提示,便于后续的开发。 ## `from_pretrained` `from_pretrained` 负责加载所需的模型,让 `Pipeline` 变成可调用的状态。以下是一个简易的实现: ```python @staticmethod def from_pretrained( torch_dtype: torch.dtype = torch.bfloat16, device: Union[str, torch.device] = "cuda", model_configs: list[ModelConfig] = [], vram_limit: float = None, ): # Initialize pipeline pipe = NewDiffSynthPipeline(device=device, torch_dtype=torch_dtype) model_pool = pipe.download_and_load_models(model_configs, vram_limit) # Fetch models pipe.text_encoder = model_pool.fetch_model("xxx_text_encoder") pipe.dit = model_pool.fetch_model("yyy_dit") pipe.vae = model_pool.fetch_model("zzz_vae") # If necessary, load tokenizers here. # VRAM Management pipe.vram_management_enabled = pipe.check_vram_management_state() return pipe ``` 开发者需要实现其中获取模型的逻辑,对应的模型名称即为[模型接入时填写的模型 Config](Integrating_Your_Model.md#step-3-编写模型-config) 中的 `"model_name"`。 部分模型还需要加载 `tokenizer`,可根据需要在 `from_pretrained` 上添加额外的 `tokenizer_config` 参数并在获取模型后实现这部分。 ## `__call__` `__call__` 实现了整个 Pipeline 的生成过程,以下是常见的生成过程模板,开发者可根据需要在此基础上修改。 ```python @torch.no_grad() def __call__( self, prompt: str, negative_prompt: str = "", cfg_scale: float = 4.0, input_image: Image.Image = None, denoising_strength: float = 1.0, height: int = 1328, width: int = 1328, seed: int = None, rand_device: str = "cpu", num_inference_steps: int = 30, progress_bar_cmd = tqdm, ): # Scheduler self.scheduler.set_timesteps( num_inference_steps, denoising_strength=denoising_strength ) # Parameters inputs_posi = { "prompt": prompt, } inputs_nega = { "negative_prompt": negative_prompt, } inputs_shared = { "cfg_scale": cfg_scale, "input_image": input_image, "denoising_strength": denoising_strength, "height": height, "width": width, "seed": seed, "rand_device": rand_device, "num_inference_steps": num_inference_steps, } for unit in self.units: inputs_shared, inputs_posi, inputs_nega = self.unit_runner(unit, self, inputs_shared, inputs_posi, inputs_nega) # Denoise self.load_models_to_device(self.in_iteration_models) models = {name: getattr(self, name) for name in self.in_iteration_models} for progress_id, timestep in enumerate(progress_bar_cmd(self.scheduler.timesteps)): timestep = timestep.unsqueeze(0).to(dtype=self.torch_dtype, device=self.device) # Inference noise_pred_posi = self.model_fn(**models, **inputs_shared, **inputs_posi, timestep=timestep, progress_id=progress_id) if cfg_scale != 1.0: noise_pred_nega = self.model_fn(**models, **inputs_shared, **inputs_nega, timestep=timestep, progress_id=progress_id) noise_pred = noise_pred_nega + cfg_scale * (noise_pred_posi - noise_pred_nega) else: noise_pred = noise_pred_posi # Scheduler inputs_shared["latents"] = self.step(self.scheduler, progress_id=progress_id, noise_pred=noise_pred, **inputs_shared) # Decode self.load_models_to_device(['vae']) image = self.vae.decode(inputs_shared["latents"], device=self.device) image = self.vae_output_to_image(image) self.load_models_to_device([]) return image ``` ## `units` `units` 包含了所有的前处理过程,例如:宽高检查、提示词编码、初始噪声生成等。在整个模型前处理过程中,数据被抽象为了互斥的三部分,分别存储在对应的字典中: * `inputs_shard`: 共享输入,与 [Classifier-Free Guidance](https://arxiv.org/abs/2207.12598)(简称 CFG)无关的参数。 * `inputs_posi`: Classifier-Free Guidance 的 Positive 侧输入,包含与正向提示词相关的内容。 * `inputs_nega`: Classifier-Free Guidance 的 Negative 侧输入,包含与负向提示词相关的内容。 Pipeline Unit 的实现包括三种:直接模式、CFG 分离模式、接管模式。 如果某些计算与 CFG 无关,可采用直接模式,例如 Qwen-Image 的随机噪声初始化: ```python class QwenImageUnit_NoiseInitializer(PipelineUnit): def __init__(self): super().__init__( input_params=("height", "width", "seed", "rand_device"), output_params=("noise",), ) def process(self, pipe: QwenImagePipeline, height, width, seed, rand_device): noise = pipe.generate_noise((1, 16, height//8, width//8), seed=seed, rand_device=rand_device, rand_torch_dtype=pipe.torch_dtype) return {"noise": noise} ``` 如果某些计算与 CFG 有关,需分别处理正向和负向提示词,但两侧的输入参数是相同的,可采用 CFG 分离模式,例如 Qwen-image 的提示词编码: ```python class QwenImageUnit_PromptEmbedder(PipelineUnit): def __init__(self): super().__init__( seperate_cfg=True, input_params_posi={"prompt": "prompt"}, input_params_nega={"prompt": "negative_prompt"}, input_params=("edit_image",), output_params=("prompt_emb", "prompt_emb_mask"), onload_model_names=("text_encoder",) ) def process(self, pipe: QwenImagePipeline, prompt, edit_image=None) -> dict: pipe.load_models_to_device(self.onload_model_names) # Do something return {"prompt_emb": prompt_embeds, "prompt_emb_mask": encoder_attention_mask} ``` 如果某些计算需要全局的信息,则需要接管模式,例如 Qwen-Image 的实体分区控制: ```python class QwenImageUnit_EntityControl(PipelineUnit): def __init__(self): super().__init__( take_over=True, input_params=("eligen_entity_prompts", "width", "height", "eligen_enable_on_negative", "cfg_scale"), output_params=("entity_prompt_emb", "entity_masks", "entity_prompt_emb_mask"), onload_model_names=("text_encoder",) ) def process(self, pipe: QwenImagePipeline, inputs_shared, inputs_posi, inputs_nega): # Do something return inputs_shared, inputs_posi, inputs_nega ``` 以下是 Pipeline Unit 所需的参数配置: * `seperate_cfg`: 是否启用 CFG 分离模式 * `take_over`: 是否启用接管模式 * `input_params`: 共享输入参数 * `output_params`: 输出参数 * `input_params_posi`: Positive 侧输入参数 * `input_params_nega`: Negative 侧输入参数 * `onload_model_names`: 需调用的模型组件名 > Q: 部分参数并未在推理过程中调用,例如 `output_params`,是否仍有必要配置? > > A: 这些参数不会影响推理过程,但会影响一些实验性功能,因此我们建议将其配置好。例如“拆分训练”,我们可以将训练中的前处理离线完成,但部分需要梯度回传的模型计算无法拆分,这些参数用于构建计算图从而推断哪些计算是可以拆分的。 ## `model_fn` `model_fn` 是迭代中的统一 `forward` 接口,对于开源模型生态尚未形成的模型,直接沿用去噪模型的 `forward` 即可,例如: ```python def model_fn_new(dit=None, latents=None, timestep=None, prompt_emb=None, **kwargs): return dit(latents, prompt_emb, timestep) ``` 对于开源生态丰富的模型,`model_fn` 通常包含复杂且混乱的跨模型推理,以 `diffsynth/pipelines/qwen_image.py` 为例,这个函数中实现的额外计算包括:实体分区控制、三种 ControlNet、Gradient Checkpointing 等,开发者在实现这一部分时要格外小心,避免模块功能之间的冲突。