import math
import torch
from torch import nn, einsum
import torch.nn.functional as F
from inspect import isfunction
from functools import partial
import numpy as np
from tqdm import tqdm
def _warmup_beta(linear_start, linear_end, n_timestep, warmup_frac):
betas = linear_end * np.ones(n_timestep, dtype=np.float64)
warmup_time = int(n_timestep * warmup_frac)
betas[:warmup_time] = np.linspace(
linear_start, linear_end, warmup_time, dtype=np.float64)
return betas
def make_beta_schedule(schedule, n_timestep, linear_start=1e-4, linear_end=2e-2, cosine_s=8e-3):
if schedule == 'quad':
betas = np.linspace(linear_start ** 0.5, linear_end ** 0.5,
n_timestep, dtype=np.float64) ** 2
elif schedule == 'linear':
betas = np.linspace(linear_start, linear_end,
n_timestep, dtype=np.float64)
elif schedule == 'warmup10':
betas = _warmup_beta(linear_start, linear_end,
n_timestep, 0.1)
elif schedule == 'warmup50':
betas = _warmup_beta(linear_start, linear_end,
n_timestep, 0.5)
elif schedule == 'const':
betas = linear_end * np.ones(n_timestep, dtype=np.float64)
elif schedule == 'jsd': # 1/T, 1/(T-1), 1/(T-2), ..., 1
betas = 1. / np.linspace(n_timestep,
1, n_timestep, dtype=np.float64)
elif schedule == "cosine":
timesteps = (
torch.arange(n_timestep + 1, dtype=torch.float64) /
n_timestep + cosine_s
)
alphas = timesteps / (1 + cosine_s) * math.pi / 2
alphas = torch.cos(alphas).pow(2)
alphas = alphas / alphas[0]
betas = 1 - alphas[1:] / alphas[:-1]
betas = betas.clamp(max=0.999)
else:
raise NotImplementedError(schedule)
return betas
# gaussian diffusion trainer class
def exists(x):
return x is not None
def default(val, d):
if exists(val):
return val
return d() if isfunction(d) else d
def extract(a, t, x_shape):
b, *_ = t.shape
out = a.gather(-1, t)
return out.reshape(b, *((1,) * (len(x_shape) - 1)))
def noise_like(shape, device, repeat=False):
def repeat_noise(): return torch.randn(
(1, *shape[1:]), device=device).repeat(shape[0], *((1,) * (len(shape) - 1)))
def noise(): return torch.randn(shape, device=device)
return repeat_noise() if repeat else noise()
class GaussianDiffusion(nn.Module):
def __init__(
self,
denoise_fn,
image_size,
channels=3,
loss_type='l1',
conditional=True,
schedule_opt=None
):
super().__init__()
self.channels = channels
self.image_size = image_size
self.denoise_fn = denoise_fn
self.conditional = conditional
self.loss_type = loss_type
if schedule_opt is not None:
pass
# self.set_new_noise_schedule(schedule_opt)
def set_loss(self, device):
if self.loss_type == 'l1':
self.loss_func = nn.L1Loss(reduction='sum').to(device)
elif self.loss_type == 'l2':
self.loss_func = nn.MSELoss(reduction='sum').to(device)
else:
raise NotImplementedError()
def set_new_noise_schedule(self, schedule_opt, device):
to_torch = partial(torch.tensor, dtype=torch.float32, device=device)
betas = make_beta_schedule(
schedule=schedule_opt['schedule'],
n_timestep=schedule_opt['n_timestep'],
linear_start=schedule_opt['linear_start'],
linear_end=schedule_opt['linear_end'])
betas = betas.detach().cpu().numpy() if isinstance(betas, torch.Tensor) else betas
alphas = 1. - betas
alphas_cumprod = np.cumprod(alphas, axis=0)
alphas_cumprod_prev = np.append(1., alphas_cumprod[:-1])
timesteps, = betas.shape
self.num_timesteps = int(timesteps)
self.register_buffer('betas', to_torch(betas))
self.register_buffer('alphas_cumprod', to_torch(alphas_cumprod))
self.register_buffer('alphas_cumprod_prev',
to_torch(alphas_cumprod_prev))
# calculations for diffusion q(x_t | x_{t-1}) and others
self.register_buffer('sqrt_alphas_cumprod',
to_torch(np.sqrt(alphas_cumprod)))
self.register_buffer('sqrt_one_minus_alphas_cumprod',
to_torch(np.sqrt(1. - alphas_cumprod)))
self.register_buffer('log_one_minus_alphas_cumprod',
to_torch(np.log(1. - alphas_cumprod)))
self.register_buffer('sqrt_recip_alphas_cumprod',
to_torch(np.sqrt(1. / alphas_cumprod)))
self.register_buffer('sqrt_recipm1_alphas_cumprod',
to_torch(np.sqrt(1. / alphas_cumprod - 1)))
# calculations for posterior q(x_{t-1} | x_t, x_0)
posterior_variance = betas * \
(1. - alphas_cumprod_prev) / (1. - alphas_cumprod)
# above: equal to 1. / (1. / (1. - alpha_cumprod_tm1) + alpha_t / beta_t)
self.register_buffer('posterior_variance',
to_torch(posterior_variance))
# below: log calculation clipped because the posterior variance is 0 at the beginning of the diffusion chain
self.register_buffer('posterior_log_variance_clipped', to_torch(
np.log(np.maximum(posterior_variance, 1e-20))))
self.register_buffer('posterior_mean_coef1', to_torch(
betas * np.sqrt(alphas_cumprod_prev) / (1. - alphas_cumprod)))
self.register_buffer('posterior_mean_coef2', to_torch(
(1. - alphas_cumprod_prev) * np.sqrt(alphas) / (1. - alphas_cumprod)))
def q_mean_variance(self, x_start, t):
mean = extract(self.sqrt_alphas_cumprod, t, x_start.shape) * x_start
variance = extract(1. - self.alphas_cumprod, t, x_start.shape)
log_variance = extract(
self.log_one_minus_alphas_cumprod, t, x_start.shape)
return mean, variance, log_variance
def predict_start_from_noise(self, x_t, t, noise):
return (
extract(self.sqrt_recip_alphas_cumprod, t, x_t.shape) * x_t -
extract(self.sqrt_recipm1_alphas_cumprod, t, x_t.shape) * noise
)
def q_posterior(self, x_start, x_t, t):
posterior_mean = (
extract(self.posterior_mean_coef1, t, x_t.shape) * x_start +
extract(self.posterior_mean_coef2, t, x_t.shape) * x_t
)
posterior_variance = extract(self.posterior_variance, t, x_t.shape)
posterior_log_variance_clipped = extract(
self.posterior_log_variance_clipped, t, x_t.shape)
return posterior_mean, posterior_variance, posterior_log_variance_clipped
def p_mean_variance(self, x, t, clip_denoised: bool, condition_x=None):
if condition_x is not None:
x_recon = self.predict_start_from_noise(
x, t=t, noise=self.denoise_fn(torch.cat([condition_x, x], dim=1), t))
else:
x_recon = self.predict_start_from_noise(
x, t=t, noise=self.denoise_fn(x, t))
if clip_denoised:
x_recon.clamp_(-1., 1.)
model_mean, posterior_variance, posterior_log_variance = self.q_posterior(
x_start=x_recon, x_t=x, t=t)
return model_mean, posterior_variance, posterior_log_variance
@torch.no_grad()
def p_sample(self, x, t, clip_denoised=True, repeat_noise=False, condition_x=None):
b, *_, device = *x.shape, x.device
model_mean, _, model_log_variance = self.p_mean_variance(
x=x, t=t, clip_denoised=clip_denoised, condition_x=condition_x)
noise = noise_like(x.shape, device, repeat_noise)
# no noise when t == 0
nonzero_mask = (1 - (t == 0).float()).reshape(b,
没有合适的资源?快使用搜索试试~ 我知道了~
温馨提示
1.源代码中包括大量的小文件,不适合初学者调试和使用。 2.我根据其核心代码进行简化,将一些不必要的代码文件和代码块删除,并在Rain13K (就是MPRNet、Restormer等方法使用的去雨数据集) 上进行了实验,目前取得不错的表现。 3. 其他的任务应该也是可以直接使用的,只需要修改一下配置文件中的数据集路径即可。 4.大概将源代码中的文件简化一半,并写了一些关键注释。 5.经济有限可以私聊我发邮箱。 6.里边没有README,需要的可以私聊我。
资源推荐
资源详情
资源评论
收起资源包目录
SR3_IR.rar (27个子文件)
SR3_IR
main.py 7KB
data
__init__.py 906B
util.py 2KB
dataset.py 2KB
__pycache__
util.cpython-38.pyc 3KB
dataset.cpython-38.pyc 2KB
__init__.cpython-38.pyc 924B
core
metrics.py 3KB
logger.py 3KB
__pycache__
metrics.cpython-38.pyc 3KB
logger.cpython-38.pyc 4KB
model
__init__.py 209B
networks.py 4KB
model.py 6KB
base_model.py 1KB
sr3_modules
diffusion.py 9KB
unet.py 8KB
__pycache__
unet.cpython-38.pyc 8KB
diffusion.cpython-38.pyc 7KB
__pycache__
model.cpython-38.pyc 5KB
base_model.cpython-38.pyc 2KB
networks.cpython-38.pyc 3KB
__init__.cpython-38.pyc 420B
ddpm_modules
diffusion.py 12KB
unet.py 8KB
test.py 3KB
config
config.json 3KB
共 27 条
- 1
听风、
- 粉丝: 1w+
- 资源: 43
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功
- 1
- 2
- 3
- 4
- 5
- 6
前往页