mirror of
https://github.com/modelscope/DiffSynth-Studio.git
synced 2026-03-18 22:08:13 +00:00
141 lines
5.1 KiB
Python
141 lines
5.1 KiB
Python
import torch, math
|
|
|
|
|
|
class FlowMatchScheduler():
    """Sigma/timestep schedule plus the sampling and training helpers built on it.

    The schedule is a 1-D tensor ``sigmas`` and a matching tensor
    ``timesteps = sigmas * num_train_timesteps``. Every method that takes a
    ``timestep`` maps it to the closest entry of ``timesteps`` and works with
    the sigma stored at that index.
    """

    def __init__(
        self,
        num_inference_steps=100,
        num_train_timesteps=1000,
        shift=3.0,
        sigma_max=1.0,
        sigma_min=0.003/1.002,
        inverse_timesteps=False,
        extra_one_step=False,
        reverse_sigmas=False,
        exponential_shift=False,
        exponential_shift_mu=None,
        shift_terminal=None,
    ):
        """Store the configuration and build the initial schedule.

        Args:
            num_inference_steps: Number of schedule entries to build initially.
            num_train_timesteps: Factor that converts a sigma into a timestep.
            shift: Parameter of the rational shift
                ``shift * s / (1 + (shift - 1) * s)`` used when
                ``exponential_shift`` is False.
            sigma_max: Upper end of the linear sigma ramp (before shifting).
            sigma_min: Lower end of the linear sigma ramp (before shifting).
            inverse_timesteps: Flip the order of the ramp; also makes the
                final target sigma in ``step`` equal to 1 instead of 0.
            extra_one_step: Build ``num_inference_steps + 1`` ramp points and
                drop the last one.
            reverse_sigmas: Replace every sigma by ``1 - sigma`` after
                shifting; also makes the final target sigma in ``step`` 1.
            exponential_shift: Use the shift
                ``exp(mu) / (exp(mu) + (1 / s - 1))`` instead of the rational one.
            exponential_shift_mu: Default ``mu`` for the exponential shift.
            shift_terminal: If not None, rescale the shifted sigmas so the
                last one equals this value.
        """
        self.num_train_timesteps = num_train_timesteps
        self.shift = shift
        self.sigma_max = sigma_max
        self.sigma_min = sigma_min
        self.inverse_timesteps = inverse_timesteps
        self.extra_one_step = extra_one_step
        self.reverse_sigmas = reverse_sigmas
        self.exponential_shift = exponential_shift
        self.exponential_shift_mu = exponential_shift_mu
        self.shift_terminal = shift_terminal
        self.set_timesteps(num_inference_steps)

    def set_timesteps(self, num_inference_steps=100, denoising_strength=1.0, training=False, shift=None, dynamic_shift_len=None, exponential_shift_mu=None):
        """(Re)build ``self.sigmas`` and ``self.timesteps``.

        Args:
            num_inference_steps: Number of schedule entries.
            denoising_strength: Fraction of the ``sigma_min..sigma_max`` range
                at which the ramp starts (1.0 starts at ``sigma_max``).
            training: If True, also compute ``self.linear_timesteps_weights``.
            shift: If not None, overwrites ``self.shift`` (the new value is
                kept for later calls).
            dynamic_shift_len: Sequence length passed to ``calculate_shift``
                to derive ``mu`` when no explicit ``exponential_shift_mu``
                argument is given.
            exponential_shift_mu: Explicit ``mu`` for this call; takes
                precedence over ``dynamic_shift_len`` and the stored default.

        Raises:
            TypeError: If the exponential shift is enabled but no ``mu`` is
                available from any of the three sources.
        """
        if shift is not None:
            self.shift = shift

        # Linear ramp from the (possibly reduced) start sigma down to sigma_min.
        sigma_start = self.sigma_min + (self.sigma_max - self.sigma_min) * denoising_strength
        if self.extra_one_step:
            self.sigmas = torch.linspace(sigma_start, self.sigma_min, num_inference_steps + 1)[:-1]
        else:
            self.sigmas = torch.linspace(sigma_start, self.sigma_min, num_inference_steps)
        if self.inverse_timesteps:
            self.sigmas = torch.flip(self.sigmas, dims=[0])

        if self.exponential_shift:
            # Precedence: per-call argument > dynamic length > constructor default.
            if exponential_shift_mu is not None:
                mu = exponential_shift_mu
            elif dynamic_shift_len is not None:
                mu = self.calculate_shift(dynamic_shift_len)
            else:
                mu = self.exponential_shift_mu
            if mu is None:
                # math.exp(None) would raise an unexplained TypeError; keep the
                # exception type but say what is actually missing.
                raise TypeError(
                    "exponential_shift is enabled but no mu is available: pass "
                    "exponential_shift_mu or dynamic_shift_len to set_timesteps, "
                    "or exponential_shift_mu to the constructor."
                )
            self.sigmas = math.exp(mu) / (math.exp(mu) + (1 / self.sigmas - 1))
        else:
            self.sigmas = self.shift * self.sigmas / (1 + (self.shift - 1) * self.sigmas)

        if self.shift_terminal is not None:
            # Rescale (1 - sigma) so that the last sigma becomes shift_terminal.
            one_minus_z = 1 - self.sigmas
            scale_factor = one_minus_z[-1] / (1 - self.shift_terminal)
            self.sigmas = 1 - (one_minus_z / scale_factor)
        if self.reverse_sigmas:
            self.sigmas = 1 - self.sigmas

        self.timesteps = self.sigmas * self.num_train_timesteps

        if training:
            # Bell-shaped weights over the timestep axis, shifted so the
            # minimum is 0 and scaled so they sum to num_inference_steps.
            # NOTE(review): the curve is centred/scaled with num_inference_steps
            # while x is scaled by num_train_timesteps; the two agree only when
            # both are equal - confirm this is intended.
            x = self.timesteps
            y = torch.exp(-2 * ((x - num_inference_steps / 2) / num_inference_steps) ** 2)
            y_shifted = y - y.min()
            bsmntw_weighing = y_shifted * (num_inference_steps / y_shifted.sum())
            self.linear_timesteps_weights = bsmntw_weighing
            self.training = True
        else:
            self.training = False

    def _nearest_timestep_index(self, timestep):
        """Return the index (0-dim tensor) of the schedule entry closest to ``timestep``.

        ``timestep`` may be a tensor (moved to the device of ``self.timesteps``
        first) or a plain Python number.
        """
        if isinstance(timestep, torch.Tensor):
            timestep = timestep.to(self.timesteps.device)
        return torch.argmin((self.timesteps - timestep).abs())

    def step(self, model_output, timestep, sample, to_final=False, **kwargs):
        """Advance ``sample`` from the sigma at ``timestep`` to the next sigma.

        Computes ``sample + model_output * (sigma_next - sigma)``. When
        ``to_final`` is True, or ``timestep`` is the last schedule entry, the
        target sigma is 1 if ``inverse_timesteps`` or ``reverse_sigmas`` is
        set and 0 otherwise. Extra keyword arguments are accepted and ignored.
        """
        timestep_id = self._nearest_timestep_index(timestep)
        sigma = self.sigmas[timestep_id]
        if to_final or timestep_id + 1 >= len(self.timesteps):
            sigma_ = 1 if (self.inverse_timesteps or self.reverse_sigmas) else 0
        else:
            sigma_ = self.sigmas[timestep_id + 1]
        prev_sample = sample + model_output * (sigma_ - sigma)
        return prev_sample

    def return_to_timestep(self, timestep, sample, sample_stablized):
        """Return ``(sample - sample_stablized) / sigma`` for the sigma at ``timestep``."""
        timestep_id = self._nearest_timestep_index(timestep)
        sigma = self.sigmas[timestep_id]
        model_output = (sample - sample_stablized) / sigma
        return model_output

    def add_noise(self, original_samples, noise, timestep):
        """Blend samples and noise: ``(1 - sigma) * original_samples + sigma * noise``."""
        timestep_id = self._nearest_timestep_index(timestep)
        sigma = self.sigmas[timestep_id]
        sample = (1 - sigma) * original_samples + sigma * noise
        return sample

    def training_target(self, sample, noise, timestep):
        """Return the regression target ``noise - sample`` (``timestep`` is unused)."""
        target = noise - sample
        return target

    def training_weight(self, timestep):
        """Return the loss weight stored for the schedule entry closest to ``timestep``.

        Reads ``self.linear_timesteps_weights``, which is only created by
        ``set_timesteps(..., training=True)``. Accepts a tensor or a plain
        number, like the other timestep-based methods.
        """
        timestep_id = self._nearest_timestep_index(timestep)
        weights = self.linear_timesteps_weights[timestep_id]
        return weights

    def calculate_shift(
        self,
        image_seq_len,
        base_seq_len: int = 256,
        max_seq_len: int = 8192,
        base_shift: float = 0.5,
        max_shift: float = 0.9,
    ):
        """Linearly map a sequence length to ``mu``.

        The line passes through ``(base_seq_len, base_shift)`` and
        ``(max_seq_len, max_shift)``; values outside that range are
        extrapolated, not clamped.
        """
        m = (max_shift - base_shift) / (max_seq_len - base_seq_len)
        b = base_shift - m * base_seq_len
        mu = image_seq_len * m + b
        return mu

    def compute_empirical_mu(self, image_seq_len: int, num_steps: int) -> float:
        """Return ``mu`` from two fitted lines in ``image_seq_len``.

        For ``image_seq_len > 4300`` the result is ``a2 * image_seq_len + b2``
        regardless of ``num_steps``. Otherwise the result is linear in
        ``num_steps`` and equals the ``a1/b1`` line at 10 steps and the
        ``a2/b2`` line at 200 steps.
        """
        a1, b1 = 8.73809524e-05, 1.89833333
        a2, b2 = 0.00016927, 0.45666666

        if image_seq_len > 4300:
            mu = a2 * image_seq_len + b2
            return float(mu)

        m_200 = a2 * image_seq_len + b2
        m_10 = a1 * image_seq_len + b1

        # Straight line in num_steps through (10, m_10) and (200, m_200).
        a = (m_200 - m_10) / 190.0
        b = m_200 - 200.0 * a
        mu = a * num_steps + b

        return float(mu)