support flux.2 klein

This commit is contained in:
Artiprocher
2026-01-19 16:56:14 +08:00
parent ae52d93694
commit b6ccb362b9
18 changed files with 460 additions and 35 deletions

View File

@@ -14,6 +14,7 @@ from transformers import AutoProcessor, AutoTokenizer
from ..models.flux2_text_encoder import Flux2TextEncoder
from ..models.flux2_dit import Flux2DiT
from ..models.flux2_vae import Flux2VAE
from ..models.z_image_text_encoder import ZImageTextEncoder
class Flux2ImagePipeline(BasePipeline):
@@ -25,6 +26,7 @@ class Flux2ImagePipeline(BasePipeline):
)
self.scheduler = FlowMatchScheduler("FLUX.2")
self.text_encoder: Flux2TextEncoder = None
self.text_encoder_qwen3: ZImageTextEncoder = None
self.dit: Flux2DiT = None
self.vae: Flux2VAE = None
self.tokenizer: AutoProcessor = None
@@ -32,6 +34,7 @@ class Flux2ImagePipeline(BasePipeline):
self.units = [
Flux2Unit_ShapeChecker(),
Flux2Unit_PromptEmbedder(),
Flux2Unit_Qwen3PromptEmbedder(),
Flux2Unit_NoiseInitializer(),
Flux2Unit_InputImageEmbedder(),
Flux2Unit_ImageIDs(),
@@ -276,6 +279,10 @@ class Flux2Unit_PromptEmbedder(PipelineUnit):
return prompt_embeds, text_ids
def process(self, pipe: Flux2ImagePipeline, prompt):
# Skip if Qwen3 text encoder is available (handled by Qwen3PromptEmbedder)
if pipe.text_encoder_qwen3 is not None:
return {}
pipe.load_models_to_device(self.onload_model_names)
prompt_embeds, text_ids = self.encode_prompt(
pipe.text_encoder, pipe.tokenizer, prompt,
@@ -284,6 +291,136 @@ class Flux2Unit_PromptEmbedder(PipelineUnit):
return {"prompt_embeds": prompt_embeds, "text_ids": text_ids}
class Flux2Unit_Qwen3PromptEmbedder(PipelineUnit):
def __init__(self):
super().__init__(
seperate_cfg=True,
input_params_posi={"prompt": "prompt"},
input_params_nega={"prompt": "negative_prompt"},
output_params=("prompt_emb", "prompt_emb_mask"),
onload_model_names=("text_encoder_qwen3",)
)
self.hidden_states_layers = (9, 18, 27) # Qwen3 layers
def get_qwen3_prompt_embeds(
self,
text_encoder: ZImageTextEncoder,
tokenizer: AutoTokenizer,
prompt: Union[str, List[str]],
dtype: Optional[torch.dtype] = None,
device: Optional[torch.device] = None,
max_sequence_length: int = 512,
):
dtype = text_encoder.dtype if dtype is None else dtype
device = text_encoder.device if device is None else device
prompt = [prompt] if isinstance(prompt, str) else prompt
all_input_ids = []
all_attention_masks = []
for single_prompt in prompt:
messages = [{"role": "user", "content": single_prompt}]
text = tokenizer.apply_chat_template(
messages,
tokenize=False,
add_generation_prompt=True,
enable_thinking=False,
)
inputs = tokenizer(
text,
return_tensors="pt",
padding="max_length",
truncation=True,
max_length=max_sequence_length,
)
all_input_ids.append(inputs["input_ids"])
all_attention_masks.append(inputs["attention_mask"])
input_ids = torch.cat(all_input_ids, dim=0).to(device)
attention_mask = torch.cat(all_attention_masks, dim=0).to(device)
# Forward pass through the model
with torch.inference_mode():
output = text_encoder(
input_ids=input_ids,
attention_mask=attention_mask,
output_hidden_states=True,
use_cache=False,
)
# Only use outputs from intermediate layers and stack them
out = torch.stack([output.hidden_states[k] for k in self.hidden_states_layers], dim=1)
out = out.to(dtype=dtype, device=device)
batch_size, num_channels, seq_len, hidden_dim = out.shape
prompt_embeds = out.permute(0, 2, 1, 3).reshape(batch_size, seq_len, num_channels * hidden_dim)
return prompt_embeds
def prepare_text_ids(
self,
x: torch.Tensor, # (B, L, D) or (L, D)
t_coord: Optional[torch.Tensor] = None,
):
B, L, _ = x.shape
out_ids = []
for i in range(B):
t = torch.arange(1) if t_coord is None else t_coord[i]
h = torch.arange(1)
w = torch.arange(1)
l = torch.arange(L)
coords = torch.cartesian_prod(t, h, w, l)
out_ids.append(coords)
return torch.stack(out_ids)
def encode_prompt(
self,
text_encoder: ZImageTextEncoder,
tokenizer: AutoTokenizer,
prompt: Union[str, List[str]],
dtype = None,
device: Optional[torch.device] = None,
num_images_per_prompt: int = 1,
prompt_embeds: Optional[torch.Tensor] = None,
max_sequence_length: int = 512,
):
prompt = [prompt] if isinstance(prompt, str) else prompt
if prompt_embeds is None:
prompt_embeds = self.get_qwen3_prompt_embeds(
text_encoder=text_encoder,
tokenizer=tokenizer,
prompt=prompt,
dtype=dtype,
device=device,
max_sequence_length=max_sequence_length,
)
batch_size, seq_len, _ = prompt_embeds.shape
prompt_embeds = prompt_embeds.repeat(1, num_images_per_prompt, 1)
prompt_embeds = prompt_embeds.view(batch_size * num_images_per_prompt, seq_len, -1)
text_ids = self.prepare_text_ids(prompt_embeds)
text_ids = text_ids.to(device)
return prompt_embeds, text_ids
def process(self, pipe: Flux2ImagePipeline, prompt):
# Check if Qwen3 text encoder is available
if pipe.text_encoder_qwen3 is None:
return {}
pipe.load_models_to_device(self.onload_model_names)
prompt_embeds, text_ids = self.encode_prompt(
pipe.text_encoder_qwen3, pipe.tokenizer, prompt,
dtype=pipe.torch_dtype, device=pipe.device,
)
return {"prompt_embeds": prompt_embeds, "text_ids": text_ids}
class Flux2Unit_NoiseInitializer(PipelineUnit):
def __init__(self):
super().__init__(