Compare commits

..

1 Commits

Author SHA1 Message Date
Artiprocher
59f512b574 add acestep models 2026-04-02 10:58:45 +08:00
8 changed files with 3169 additions and 1 deletions

View File

@@ -884,4 +884,40 @@ mova_series = [
"model_class": "diffsynth.models.mova_dual_tower_bridge.DualTowerConditionalBridge", "model_class": "diffsynth.models.mova_dual_tower_bridge.DualTowerConditionalBridge",
}, },
] ]
MODEL_CONFIGS = qwen_image_series + wan_series + flux_series + flux2_series + z_image_series + ltx2_series + anima_series + mova_series
# Registry entries for the ACE-Step 1.5 checkpoints: text encoder, audio VAE and DiT.
# Each entry names the class to instantiate, plus an optional state-dict converter
# and optional constructor keyword arguments.
ace_step_series = [
    {
        # Text encoder.
        # Example: ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="Qwen3-Embedding-0.6B/model.safetensors")
        "model_hash": "3509bea17b0e8cffc3dd4a15cc7899d0",
        "model_name": "ace_step_text_encoder",
        "model_class": "diffsynth.models.ace_step_text_encoder.AceStepTextEncoder",
        "state_dict_converter": "diffsynth.utils.state_dict_converters.ace_step_text_encoder.AceStepTextEncoderStateDictConverter",
    },
    {
        # Audio VAE; no converter is registered for it.
        # Example: ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="vae/diffusion_pytorch_model.safetensors")
        "model_hash": "51420834e54474986a7f4be0e4d6f687",
        "model_name": "ace_step_vae",
        "model_class": "diffsynth.models.ace_step_vae.AceStepVAE",
        "extra_kwargs": {
            "encoder_hidden_size": 128,
            # Product of the ratios is 1920 samples per latent frame.
            "downsampling_ratios": [2, 4, 4, 6, 10],
            "channel_multiples": [1, 2, 4, 8, 16],
            "decoder_channels": 128,
            "decoder_input_channels": 64,
            "audio_channels": 2,
            "sampling_rate": 48000,
        },
    },
    {
        # DiT, loaded through a wrapper class.
        # Example: ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="acestep-v15-turbo/model.safetensors")
        "model_hash": "ba29d8bddbb6ace65675f6a757a13c00",
        "model_name": "ace_step_dit",
        "model_class": "diffsynth.models.ace_step_dit.AceStepConditionGenerationModelWrapper",
        "state_dict_converter": "diffsynth.utils.state_dict_converters.ace_step_dit.AceStepDiTStateDictConverter",
        "extra_kwargs": {
            # NOTE(review): relative path; presumably resolved against the
            # working directory where the checkpoint was downloaded - confirm.
            "config_path": "models/ACE-Step/Ace-Step1.5/acestep-v15-turbo",
        },
    },
]
# Combined registry: every per-family config list concatenated in order, with the ACE-Step entries appended last.
MODEL_CONFIGS = qwen_image_series + wan_series + flux_series + flux2_series + z_image_series + ltx2_series + anima_series + mova_series + ace_step_series

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,38 @@
from transformers import Qwen3Model, Qwen3Config
import torch
class AceStepTextEncoder(torch.nn.Module):
    """Thin wrapper around a ``Qwen3Model`` built from a hard-coded configuration.

    The network lives under ``self.model``, so its parameters appear in the
    state dict with a ``model.`` prefix.
    """

    def __init__(self):
        super().__init__()
        # The architecture is fixed here rather than read from a config file.
        qwen3_config = Qwen3Config(
            architectures=["Qwen3Model"],
            attention_bias=False,
            attention_dropout=0.0,
            bos_token_id=151643,
            eos_token_id=151643,
            head_dim=128,
            hidden_act="silu",
            hidden_size=1024,
            initializer_range=0.02,
            intermediate_size=3072,
            max_position_embeddings=32768,
            max_window_layers=28,
            model_type="qwen3",
            num_attention_heads=16,
            num_hidden_layers=28,
            num_key_value_heads=8,
            rms_norm_eps=1e-06,
            rope_scaling=None,
            rope_theta=1000000,
            sliding_window=None,
            tie_word_embeddings=True,
            torch_dtype="bfloat16",
            use_cache=True,
            use_sliding_window=False,
            vocab_size=151669,
        )
        self.model = Qwen3Model(qwen3_config)

    def forward(self, *args, **kwargs):
        """Forward every positional and keyword argument to the wrapped model and return its output."""
        return self.model(*args, **kwargs)

View File

@@ -0,0 +1,416 @@
# Copyright 2025 The HuggingFace Team. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import math
from dataclasses import dataclass
import numpy as np
import torch
import torch.nn as nn
from torch.nn.utils import weight_norm
class Snake1d(nn.Module):
    """
    Snake activation with one learnable ``alpha`` and ``beta`` per channel.

    Computes ``x + sin(alpha * x) ** 2 / (beta + 1e-9)``. When ``logscale`` is
    true the stored parameters are exponentiated before use, so the
    zero-initialised parameters correspond to ``alpha = beta = 1``.
    """

    def __init__(self, hidden_dim, logscale=True):
        super().__init__()
        # Shaped (1, hidden_dim, 1) so they broadcast over dims 0 and 2 of the input.
        self.alpha = nn.Parameter(torch.zeros(1, hidden_dim, 1))
        self.beta = nn.Parameter(torch.zeros(1, hidden_dim, 1))
        self.alpha.requires_grad = True
        self.beta.requires_grad = True
        self.logscale = logscale

    def forward(self, hidden_states):
        """Apply the activation; the output has the same shape as the input."""
        original_shape = hidden_states.shape
        if self.logscale:
            alpha = torch.exp(self.alpha)
            beta = torch.exp(self.beta)
        else:
            alpha = self.alpha
            beta = self.beta
        # Collapse all trailing dims so the (1, C, 1) parameters broadcast cleanly.
        flat = hidden_states.reshape(original_shape[0], original_shape[1], -1)
        flat = flat + (beta + 1e-9).reciprocal() * torch.sin(alpha * flat).pow(2)
        return flat.reshape(original_shape)
class OobleckResidualUnit(nn.Module):
    """
    Residual branch of Snake1d -> dilated conv (kernel 7) -> Snake1d -> pointwise conv,
    added back onto the input. Both convolutions are weight-normalized.
    """

    def __init__(self, dimension: int = 16, dilation: int = 1):
        super().__init__()
        kernel_size = 7
        # Padding chosen so the dilated convolution keeps the sequence length.
        same_padding = ((kernel_size - 1) * dilation) // 2

        self.snake1 = Snake1d(dimension)
        self.conv1 = weight_norm(
            nn.Conv1d(dimension, dimension, kernel_size=kernel_size, dilation=dilation, padding=same_padding)
        )
        self.snake2 = Snake1d(dimension)
        self.conv2 = weight_norm(nn.Conv1d(dimension, dimension, kernel_size=1))

    def forward(self, hidden_state):
        """Return ``hidden_state`` plus the convolutional branch applied to it."""
        skip = hidden_state
        branch = self.conv1(self.snake1(hidden_state))
        branch = self.conv2(self.snake2(branch))

        # If the branch came out shorter, centre-crop the skip path to match.
        trim = (skip.shape[-1] - branch.shape[-1]) // 2
        if trim > 0:
            skip = skip[..., trim:-trim]
        return skip + branch
class OobleckEncoderBlock(nn.Module):
    """Three residual units (dilations 1, 3, 9), a Snake1d, then a strided downsampling convolution."""

    def __init__(self, input_dim, output_dim, stride: int = 1):
        super().__init__()

        self.res_unit1 = OobleckResidualUnit(input_dim, dilation=1)
        self.res_unit2 = OobleckResidualUnit(input_dim, dilation=3)
        self.res_unit3 = OobleckResidualUnit(input_dim, dilation=9)
        self.snake1 = Snake1d(input_dim)
        # Kernel is twice the stride; this layer changes both channel count and length.
        downsample = nn.Conv1d(
            input_dim,
            output_dim,
            kernel_size=2 * stride,
            stride=stride,
            padding=math.ceil(stride / 2),
        )
        self.conv1 = weight_norm(downsample)

    def forward(self, hidden_state):
        """Run the residual stack, then activate and downsample."""
        for residual_unit in (self.res_unit1, self.res_unit2, self.res_unit3):
            hidden_state = residual_unit(hidden_state)
        return self.conv1(self.snake1(hidden_state))
class OobleckDecoderBlock(nn.Module):
    """A Snake1d and a strided transposed convolution, followed by three residual units (dilations 1, 3, 9)."""

    def __init__(self, input_dim, output_dim, stride: int = 1):
        super().__init__()

        self.snake1 = Snake1d(input_dim)
        # Mirror of the encoder's downsampling conv: same kernel/stride/padding recipe.
        upsample = nn.ConvTranspose1d(
            input_dim,
            output_dim,
            kernel_size=2 * stride,
            stride=stride,
            padding=math.ceil(stride / 2),
        )
        self.conv_t1 = weight_norm(upsample)
        self.res_unit1 = OobleckResidualUnit(output_dim, dilation=1)
        self.res_unit2 = OobleckResidualUnit(output_dim, dilation=3)
        self.res_unit3 = OobleckResidualUnit(output_dim, dilation=9)

    def forward(self, hidden_state):
        """Activate and upsample, then run the residual stack."""
        hidden_state = self.conv_t1(self.snake1(hidden_state))
        for residual_unit in (self.res_unit1, self.res_unit2, self.res_unit3):
            hidden_state = residual_unit(hidden_state)
        return hidden_state
class OobleckDiagonalGaussianDistribution(object):
    """Diagonal Gaussian described by a mean and a softplus-activated scale.

    Args:
        parameters: Tensor that is split into two equal halves along dim 1;
            the first half is the mean, the second the raw (pre-softplus) scale.
        deterministic: When true, `kl` returns zero. `sample` is not affected
            by this flag.
    """

    def __init__(self, parameters: torch.Tensor, deterministic: bool = False):
        self.parameters = parameters
        self.mean, self.scale = parameters.chunk(2, dim=1)
        # softplus keeps the standard deviation positive; the offset keeps it away from zero.
        self.std = nn.functional.softplus(self.scale) + 1e-4
        self.var = self.std * self.std
        self.logvar = torch.log(self.var)
        self.deterministic = deterministic

    def sample(self, generator: torch.Generator = None) -> torch.Tensor:
        """Draw ``mean + std * eps`` with ``eps`` standard normal, optionally seeded by `generator`."""
        noise = torch.randn(
            self.mean.shape,
            generator=generator,
            device=self.parameters.device,
            dtype=self.parameters.dtype,
        )
        return self.mean + self.std * noise

    def kl(self, other: "OobleckDiagonalGaussianDistribution" = None) -> torch.Tensor:
        """Return the KL term from this distribution to `other`.

        With ``other=None`` the reference is the standard normal. The value is
        summed over dim 1 and averaged over the remaining dims. Neither branch
        applies the conventional 1/2 factor, so the two stay mutually consistent.

        Returns:
            A scalar tensor, or a one-element zero tensor when the
            distribution is deterministic.
        """
        if self.deterministic:
            # Created on the parameters' device/dtype so it can be combined with other losses.
            return torch.zeros(1, device=self.parameters.device, dtype=self.parameters.dtype)

        if other is None:
            return (self.mean * self.mean + self.var - self.logvar - 1.0).sum(1).mean()

        normalized_diff = torch.pow(self.mean - other.mean, 2) / other.var
        var_ratio = self.var / other.var
        # KL(p || q) contains log(var_q / var_p) = logvar_q - logvar_p. The previous
        # code used the opposite sign, which disagreed with the `other is None`
        # branch and could produce negative values.
        logvar_diff = other.logvar - self.logvar
        kl = normalized_diff + var_ratio + logvar_diff - 1
        return kl.sum(1).mean()

    def mode(self) -> torch.Tensor:
        """Return the mean, which is the mode of a Gaussian."""
        return self.mean
@dataclass
class AutoencoderOobleckOutput:
    """
    Container for the result of the autoencoder's encoding step.

    Args:
        latent_dist (`OobleckDiagonalGaussianDistribution`):
            Posterior produced by the encoder. Latents are obtained from it
            by sampling or by taking its mode.
    """

    latent_dist: "OobleckDiagonalGaussianDistribution"
@dataclass
class OobleckDecoderOutput:
    r"""
    Container for the result of the autoencoder's decoding step.

    Args:
        sample (`torch.Tensor` of shape `(batch_size, audio_channels, sequence_length)`):
            Waveform produced by the final layer of the decoder.
    """

    sample: torch.Tensor
class OobleckEncoder(nn.Module):
    """Oobleck encoder: input conv, one downsampling block per ratio, then Snake1d and an output conv."""

    def __init__(self, encoder_hidden_size, audio_channels, downsampling_ratios, channel_multiples):
        super().__init__()

        # Prepending 1 makes widths[i] / widths[i + 1] the input / output multiplier of stage i.
        widths = [1] + channel_multiples

        # Stem convolution from raw audio channels to the base width.
        self.conv1 = weight_norm(nn.Conv1d(audio_channels, encoder_hidden_size, kernel_size=7, padding=3))

        # One block per ratio; each downsamples by its stride and moves to the next width.
        stages = [
            OobleckEncoderBlock(
                input_dim=encoder_hidden_size * widths[stage],
                output_dim=encoder_hidden_size * widths[stage + 1],
                stride=ratio,
            )
            for stage, ratio in enumerate(downsampling_ratios)
        ]
        self.block = nn.ModuleList(stages)

        final_width = encoder_hidden_size * widths[-1]
        self.snake1 = Snake1d(final_width)
        self.conv2 = weight_norm(nn.Conv1d(final_width, encoder_hidden_size, kernel_size=3, padding=1))

    def forward(self, hidden_state):
        """Encode `hidden_state` and return a tensor with `encoder_hidden_size` channels."""
        hidden_state = self.conv1(hidden_state)
        for stage in self.block:
            hidden_state = stage(hidden_state)
        return self.conv2(self.snake1(hidden_state))
class OobleckDecoder(nn.Module):
    """Oobleck decoder: input conv, one upsampling block per ratio, then Snake1d and a bias-free output conv."""

    def __init__(self, channels, input_channels, audio_channels, upsampling_ratios, channel_multiples):
        super().__init__()

        # Prepending 1 lets the last stage land exactly on the base width `channels`.
        widths = [1] + channel_multiples
        num_stages = len(upsampling_ratios)

        # Stem convolution from latent channels to the widest feature map.
        self.conv1 = weight_norm(nn.Conv1d(input_channels, channels * widths[-1], kernel_size=7, padding=3))

        # Walk the width table backwards: each stage upsamples and narrows by one step.
        stages = [
            OobleckDecoderBlock(
                input_dim=channels * widths[num_stages - stage],
                output_dim=channels * widths[num_stages - stage - 1],
                stride=ratio,
            )
            for stage, ratio in enumerate(upsampling_ratios)
        ]
        self.block = nn.ModuleList(stages)

        self.snake1 = Snake1d(channels)
        self.conv2 = weight_norm(nn.Conv1d(channels, audio_channels, kernel_size=7, padding=3, bias=False))

    def forward(self, hidden_state):
        """Decode `hidden_state` and return a tensor with `audio_channels` channels."""
        hidden_state = self.conv1(hidden_state)
        for stage in self.block:
            hidden_state = stage(hidden_state)
        return self.conv2(self.snake1(hidden_state))
class AceStepVAE(nn.Module):
    r"""
    An autoencoder for encoding waveforms into latents and decoding latent representations into waveforms. First
    introduced in Stable Audio.

    Parameters:
        encoder_hidden_size (`int`, *optional*, defaults to 128):
            Intermediate representation dimension for the encoder.
        downsampling_ratios (`list[int]`, *optional*, defaults to `[2, 4, 4, 8, 8]`):
            Ratios for downsampling in the encoder. These are used in reverse order for upsampling in the decoder.
        channel_multiples (`list[int]`, *optional*, defaults to `[1, 2, 4, 8, 16]`):
            Multiples used to determine the hidden sizes of the hidden layers.
        decoder_channels (`int`, *optional*, defaults to 128):
            Intermediate representation dimension for the decoder.
        decoder_input_channels (`int`, *optional*, defaults to 64):
            Input dimension for the decoder. Corresponds to the latent dimension.
        audio_channels (`int`, *optional*, defaults to 2):
            Number of channels in the audio data. Either 1 for mono or 2 for stereo.
        sampling_rate (`int`, *optional*, defaults to 44100):
            The sampling rate at which the audio waveform should be digitalized expressed in hertz (Hz).
    """

    def __init__(
        self,
        encoder_hidden_size=128,
        downsampling_ratios=None,
        channel_multiples=None,
        decoder_channels=128,
        decoder_input_channels=64,
        audio_channels=2,
        sampling_rate=44100,
    ):
        super().__init__()
        # Defaults are materialised per instance: the ratios are stored on `self`,
        # so a shared list default could be mutated through one instance and
        # leak into every later one. Copying also accepts tuples from callers.
        downsampling_ratios = [2, 4, 4, 8, 8] if downsampling_ratios is None else list(downsampling_ratios)
        channel_multiples = [1, 2, 4, 8, 16] if channel_multiples is None else list(channel_multiples)

        self.encoder_hidden_size = encoder_hidden_size
        self.downsampling_ratios = downsampling_ratios
        self.decoder_channels = decoder_channels
        self.upsampling_ratios = downsampling_ratios[::-1]
        # Number of waveform samples represented by one latent frame.
        self.hop_length = int(np.prod(downsampling_ratios))
        self.sampling_rate = sampling_rate

        self.encoder = OobleckEncoder(
            encoder_hidden_size=encoder_hidden_size,
            audio_channels=audio_channels,
            downsampling_ratios=downsampling_ratios,
            channel_multiples=channel_multiples,
        )
        self.decoder = OobleckDecoder(
            channels=decoder_channels,
            input_channels=decoder_input_channels,
            audio_channels=audio_channels,
            upsampling_ratios=self.upsampling_ratios,
            channel_multiples=channel_multiples,
        )
        # When true, batches are encoded/decoded one item at a time.
        self.use_slicing = False

    def encode(self, x: torch.Tensor, return_dict: bool = True):
        """
        Encode a batch of waveforms into a latent posterior.

        Args:
            x (`torch.Tensor`): Input batch of waveforms, channels on dim 1 (it is fed to a `Conv1d`).
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether to return an [`AutoencoderOobleckOutput`] instead of a plain tuple.

        Returns:
            An [`AutoencoderOobleckOutput`] holding the posterior as `latent_dist` if `return_dict` is True,
            otherwise a one-element `tuple` containing the posterior.
        """
        if self.use_slicing and x.shape[0] > 1:
            h = torch.cat([self.encoder(x_slice) for x_slice in x.split(1)])
        else:
            h = self.encoder(x)

        posterior = OobleckDiagonalGaussianDistribution(h)
        if not return_dict:
            return (posterior,)
        return AutoencoderOobleckOutput(latent_dist=posterior)

    def _decode(self, z: torch.Tensor, return_dict: bool = True):
        """Run the decoder once on `z`, without slicing."""
        dec = self.decoder(z)
        if not return_dict:
            return (dec,)
        return OobleckDecoderOutput(sample=dec)

    def decode(self, z: torch.FloatTensor, return_dict: bool = True, generator=None):
        """
        Decode a batch of latents into waveforms.

        Args:
            z (`torch.Tensor`):
                Input batch of latents with the latent channels on dim 1, i.e.
                `(batch, decoder_input_channels, frames)`; the first decoder layer is a `Conv1d`.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether to return an [`OobleckDecoderOutput`] instead of a plain tuple.
            generator: Accepted for interface compatibility; decoding does not use it.

        Returns:
            [`OobleckDecoderOutput`] or `tuple`:
                If return_dict is True, an [`OobleckDecoderOutput`] is returned, otherwise a plain `tuple`
                is returned.
        """
        if self.use_slicing and z.shape[0] > 1:
            decoded = torch.cat([self._decode(z_slice).sample for z_slice in z.split(1)])
        else:
            decoded = self._decode(z).sample

        if not return_dict:
            return (decoded,)
        return OobleckDecoderOutput(sample=decoded)

    def forward(
        self,
        sample: torch.Tensor,
        sample_posterior: bool = False,
        return_dict: bool = True,
        generator: torch.Generator = None,
    ):
        r"""
        Encode `sample`, pick a latent from the posterior, and decode it back.

        Args:
            sample (`torch.Tensor`): Input sample.
            sample_posterior (`bool`, *optional*, defaults to `False`):
                Whether to sample from the posterior. When False the posterior mode is used.
            return_dict (`bool`, *optional*, defaults to `True`):
                Whether or not to return a [`OobleckDecoderOutput`] instead of a plain tuple.
            generator (`torch.Generator`, *optional*):
                Random generator used only when `sample_posterior` is True.
        """
        posterior = self.encode(sample).latent_dist
        if sample_posterior:
            z = posterior.sample(generator=generator)
        else:
            z = posterior.mode()
        dec = self.decode(z).sample

        if not return_dict:
            return (dec,)
        return OobleckDecoderOutput(sample=dec)

View File

@@ -0,0 +1,217 @@
import torch, math
from PIL import Image
from typing import Union
from tqdm import tqdm
from einops import rearrange
import numpy as np
from math import prod
from transformers import AutoTokenizer
from ..core.device.npu_compatible_device import get_device_type
from ..diffusion import FlowMatchScheduler
from ..core import ModelConfig, gradient_checkpoint_forward
from ..diffusion.base_pipeline import BasePipeline, PipelineUnit, ControlNetInput
from ..utils.lora.merge import merge_lora
from ..core.device.npu_compatible_device import get_device_type
from ..core import ModelConfig
from ..diffusion.base_pipeline import BasePipeline
from ..models.ace_step_text_encoder import AceStepTextEncoder
from ..models.ace_step_vae import AceStepVAE
from ..models.ace_step_dit import AceStepConditionGenerationModelWrapper
class AceStepAudioPipeline(BasePipeline):
    """Text-to-music pipeline combining the ACE-Step text encoder, DiT and audio VAE.

    Attributes set after construction:
        silence_latent: Optional tensor used as the source latents for
            generation. It is indexed as ``(batch, frames, channels)``. When
            left as ``None`` random latents are used instead.
        src_latent_channels: Channel count of the source latents.
    """

    def __init__(self, device=get_device_type(), torch_dtype=torch.bfloat16):
        super().__init__(device=device, torch_dtype=torch_dtype)
        self.text_encoder: AceStepTextEncoder = None
        self.dit: AceStepConditionGenerationModelWrapper = None
        self.vae: AceStepVAE = None
        self.scheduler = FlowMatchScheduler()
        self.tokenizer: AutoTokenizer = None
        self.in_iteration_models = ("dit",)
        self.units = []
        # `__call__` reads both attributes; they were never initialised before,
        # so every call raised AttributeError. 64 matches the latent width used
        # elsewhere in this pipeline (reference-audio placeholder, VAE input).
        self.silence_latent = None
        self.src_latent_channels = 64

    @staticmethod
    def from_pretrained(
        torch_dtype: torch.dtype = torch.bfloat16,
        device: Union[str, torch.device] = get_device_type(),
        model_configs: list[ModelConfig] = [],
        tokenizer_config: ModelConfig = ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="Qwen3-Embedding-0.6B"),
        vram_limit: float = None,
    ):
        """Build a pipeline, load the configured models and, optionally, the tokenizer.

        Args:
            torch_dtype: Computation dtype handed to the pipeline.
            device: Target device handed to the pipeline.
            model_configs: Checkpoints to download and load.
            tokenizer_config: Location of the tokenizer; pass ``None`` to skip loading it.
            vram_limit: Forwarded to ``download_and_load_models``.

        Returns:
            The initialised ``AceStepAudioPipeline``.
        """
        # Initialize pipeline
        pipe = AceStepAudioPipeline(device=device, torch_dtype=torch_dtype)
        model_pool = pipe.download_and_load_models(model_configs, vram_limit)

        # Fetch models
        pipe.text_encoder = model_pool.fetch_model("ace_step_text_encoder")
        pipe.dit = model_pool.fetch_model("ace_step_dit")
        pipe.vae = model_pool.fetch_model("ace_step_vae")
        if tokenizer_config is not None:
            tokenizer_config.download_if_necessary()
            pipe.tokenizer = AutoTokenizer.from_pretrained(tokenizer_config.path)

        # VRAM Management
        pipe.vram_management_enabled = pipe.check_vram_management_state()
        return pipe

    @torch.no_grad()
    def __call__(
        self,
        caption: str,
        lyrics: str = "",
        duration: float = 160,
        bpm: int = None,
        keyscale: str = "",
        timesignature: str = "",
        vocal_language: str = "zh",
        instrumental: bool = False,
        inference_steps: int = 8,
        guidance_scale: float = 3.0,
        seed: int = None,
    ):
        """Generate audio from a caption and optional lyrics.

        Args:
            caption: Free-text description of the music.
            lyrics: Lyrics; empty means instrumental.
            duration: Target length in seconds.
            bpm, keyscale, timesignature: Optional metadata added to the prompt.
            vocal_language: Language tag written into the lyrics prompt.
            instrumental: Force the instrumental lyrics placeholder.
            inference_steps: Passed to the DiT as ``fix_nfe``.
            guidance_scale: Passed to the DiT as ``shift`` (see note in the body).
            seed: Random seed; ``None`` falls back to 42.

        Returns:
            A float32 CPU tensor holding the peak-normalised waveform.

        Raises:
            ValueError: If no tokenizer has been loaded.
        """
        if self.tokenizer is None:
            raise ValueError("AceStepAudioPipeline requires a tokenizer; pass `tokenizer_config` to `from_pretrained`.")

        # Format text prompt with metadata
        text_prompt = self._format_text_prompt(caption, bpm, keyscale, timesignature, duration)
        lyrics_text = self._format_lyrics(lyrics, vocal_language, instrumental)

        # Tokenize and encode text and lyrics
        self.load_models_to_device(["text_encoder"])
        text_inputs, text_hidden_states = self._encode_prompt(text_prompt, max_length=512)
        lyrics_inputs, lyric_hidden_states = self._encode_prompt(lyrics_text, max_length=2048)

        # Prepare generation parameters
        latent_frames = int(duration * self._latent_frame_rate())

        # For text2music the source latents are the silence latent (or a random fallback).
        src_latents = self._prepare_src_latents(latent_frames)
        attention_mask = torch.ones(1, latent_frames, device=self.device, dtype=self.torch_dtype)
        silence_latent = src_latents

        # Chunk masks and is_covers are all zeros for text2music.
        # chunk_masks shape: [batch, latent_frames, 1]
        chunk_masks = torch.zeros(1, latent_frames, 1, device=self.device, dtype=self.torch_dtype)
        is_covers = torch.zeros(1, device=self.device, dtype=self.torch_dtype)

        # Empty reference audio for text2music.
        # refer_audio_acoustic_hidden_states_packed: [batch, num_segments, hidden_dim]
        # refer_audio_order_mask: [num_segments] - which batch item each segment belongs to
        refer_audio_acoustic_hidden_states_packed = torch.zeros(
            1, 1, self.src_latent_channels, device=self.device, dtype=self.torch_dtype
        )
        refer_audio_order_mask = torch.zeros(1, device=self.device, dtype=torch.long)

        # Generate audio latents using DiT model
        self.load_models_to_device(self.in_iteration_models)
        generation_result = self.dit.model.generate_audio(
            text_hidden_states=text_hidden_states,
            text_attention_mask=text_inputs["attention_mask"],
            lyric_hidden_states=lyric_hidden_states,
            lyric_attention_mask=lyrics_inputs["attention_mask"],
            refer_audio_acoustic_hidden_states_packed=refer_audio_acoustic_hidden_states_packed,
            refer_audio_order_mask=refer_audio_order_mask,
            src_latents=src_latents,
            chunk_masks=chunk_masks,
            is_covers=is_covers,
            silence_latent=silence_latent,
            attention_mask=attention_mask,
            seed=seed if seed is not None else 42,
            fix_nfe=inference_steps,
            # NOTE(review): `guidance_scale` is forwarded as `shift`; these look like
            # different concepts - confirm against `generate_audio` before renaming.
            shift=guidance_scale,
        )
        generated_latents = generation_result["target_latents"]

        # Decode latents to audio. The latents are [batch, latent_frames, channels],
        # but the VAE decoder starts with a Conv1d and therefore needs
        # [batch, channels, latent_frames].
        self.load_models_to_device(["vae"])
        audio = self.vae.decode(generated_latents.transpose(1, 2), return_dict=True).sample

        # Post-process audio
        audio = self._postprocess_audio(audio)
        self.load_models_to_device([])
        return audio

    def _encode_prompt(self, prompt, max_length):
        """Tokenize `prompt` and return ``(tokenizer_output, last_hidden_state)``."""
        inputs = self.tokenizer(
            prompt,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=max_length,
        ).to(self.device)
        outputs = self.text_encoder(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"],
        )
        return inputs, outputs.last_hidden_state

    def _latent_frame_rate(self):
        """Return latent frames per second of audio, derived from the loaded VAE.

        One latent frame covers ``hop_length`` samples, so the rate is
        ``sampling_rate / hop_length``. Falls back to 25 Hz (48000 / 1920, the
        registered ACE-Step VAE configuration) when the VAE does not expose
        those attributes.
        """
        hop_length = getattr(self.vae, "hop_length", None)
        sampling_rate = getattr(self.vae, "sampling_rate", None)
        if hop_length and sampling_rate:
            return sampling_rate / hop_length
        return 25.0

    def _prepare_src_latents(self, latent_frames):
        """Return source latents of exactly `latent_frames` frames on the pipeline device/dtype."""
        if self.silence_latent is None:
            # Fallback: random latents when no silence latent has been provided.
            return torch.randn(
                1, latent_frames, self.src_latent_channels, device=self.device, dtype=self.torch_dtype
            )

        silence = self.silence_latent.to(device=self.device, dtype=self.torch_dtype)
        available_frames = silence.shape[1]
        if available_frames >= latent_frames:
            return silence[:, :latent_frames, :]

        # Silence latent is too short: pad the tail with zeros.
        padding = torch.zeros(
            silence.shape[0],
            latent_frames - available_frames,
            silence.shape[2],
            device=self.device,
            dtype=self.torch_dtype,
        )
        return torch.cat([silence, padding], dim=1)

    def _format_text_prompt(self, caption, bpm, keyscale, timesignature, duration):
        """Format text prompt with metadata; empty/zero metadata fields are omitted."""
        prompt = "# Instruction\nFill the audio semantic mask based on the given conditions:\n\n"
        prompt += f"# Caption\n{caption}\n\n"
        prompt += "# Metas\n"
        if bpm:
            prompt += f"- bpm: {bpm}\n"
        if timesignature:
            prompt += f"- timesignature: {timesignature}\n"
        if keyscale:
            prompt += f"- keyscale: {keyscale}\n"
        prompt += f"- duration: {int(duration)} seconds\n"
        prompt += "<|endoftext|>"
        return prompt

    def _format_lyrics(self, lyrics, vocal_language, instrumental):
        """Format lyrics with language; instrumental or empty lyrics become a placeholder."""
        if instrumental or not lyrics:
            lyrics = "[Instrumental]"
        lyrics_text = f"# Languages\n{vocal_language}\n\n# Lyric\n{lyrics}<|endoftext|>"
        return lyrics_text

    def _postprocess_audio(self, audio):
        """Move audio to CPU float32 and scale it so the peak magnitude is 1."""
        audio = audio.to(device="cpu", dtype=torch.float32)
        max_val = torch.abs(audio).max()
        # Leave an all-zero signal untouched to avoid dividing by zero.
        if max_val > 0:
            audio = audio / max_val
        return audio

View File

@@ -0,0 +1,15 @@
def AceStepDiTStateDictConverter(state_dict):
    """
    Return a copy of the ACE-Step DiT state dict in which every key starts with 'model.'.

    The wrapper class stores the network as self.model = AceStepConditionGenerationModel(config),
    so checkpoint keys must carry that prefix. Keys that already have it are kept as they are.
    """
    prefix = "model."
    # Use .keys() when available; otherwise iterate the object itself for its keys.
    names = state_dict.keys() if hasattr(state_dict, 'keys') else state_dict
    return {
        (name if name.startswith(prefix) else prefix + name): state_dict[name]
        for name in names
    }

View File

@@ -0,0 +1,19 @@
def AceStepTextEncoderStateDictConverter(state_dict):
    """
    Add the "model." prefix to the ACE-Step text encoder weights.

    Args:
        state_dict: The original state dict; may be a dict or a DiskMap.

    Returns:
        A new dict in which every key starts with "model."; keys that already
        carry the prefix are left unchanged.
    """
    prefix = "model."
    # Works for both a DiskMap and a plain dict.
    names = state_dict.keys() if hasattr(state_dict, 'keys') else state_dict
    converted = {}
    for name in names:
        target = name if name.startswith(prefix) else prefix + name
        converted[target] = state_dict[name]
    return converted

View File

@@ -0,0 +1,14 @@
from diffsynth.pipelines.ace_step_audio import AceStepAudioPipeline, ModelConfig
import torch
# Build the ACE-Step pipeline on CUDA in bfloat16 from three checkpoint files
# (text encoder, DiT, VAE) and the tokenizer directory, all taken from the
# "ACE-Step/Ace-Step1.5" model id. This script only constructs the pipeline;
# it does not generate audio.
pipe = AceStepAudioPipeline.from_pretrained(
torch_dtype=torch.bfloat16,
device="cuda",
model_configs=[
ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="Qwen3-Embedding-0.6B/model.safetensors"),
ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="acestep-v15-turbo/model.safetensors"),
ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="vae/diffusion_pytorch_model.safetensors"),
],
tokenizer_config=ModelConfig(model_id="ACE-Step/Ace-Step1.5", origin_file_pattern="Qwen3-Embedding-0.6B"),
)