Untitled
unknown
plain_text
4 years ago
18 kB
7
Indexable
import numpy as np import torch import torch.nn as nn import torch.nn.functional as F from torch.autograd import Variable class SIRD(nn.Module): def __init__(self, days): super().__init__() self.days = days def forward(self, inputs, params): if self.training: std = torch.cat([params[:, 6]] * self.days, dim = 1) rand = torch.normal(torch.zeros(inputs.shape[0], self.days), std) beta = params[:, 6] + rand * params[:, 5] else: beta = torch.cat([params[:, 6]] * self.days, dim = 1) S = inputs[..., 0] - beta * inputs[..., 0] * inputs[..., 1] + params[:, 3] * inputs[..., 3] - params[:, 4] * inputs[..., 0] E = inputs[..., 1] + beta * inputs[..., 0] * inputs[..., 1] - params[:, 2] * inputs[..., 1] I = inputs[..., 2] + params[:, 4] * inputs[..., 0] - params[:, 1] * inputs[..., 2] + params[:, 2] * inputs[..., 1] - params[:, 0] * inputs[..., 2] R = inputs[..., 3] + params[:, 0] * inputs[..., 2] - params[:, 3] * inputs[..., 3] D = inputs[..., 4] + params[:, 1] * inputs[..., 2] N = inputs[..., -1] return torch.stack((S, E, I, R, D, beta, N), dim = -1) class SEIRDTest(nn.Module): def __init__(self, initN, days): super().__init__() self.days = days self.initN = initN def forward(self, inputs, params): # inputs: [Sn (0), E (1), St (2), I (3), R (4), D (5), beta, N] # params [sigma (0), beta_bar (1), tau (2), rho (3), xi (4), delta (5), gamma (6), mu (7)] params[:,7] = params[:,7] /100 if self.training: std = torch.cat([params[:, 1]] * self.days, dim = 1) rand = torch.normal(torch.zeros(inputs.shape[0], self.days), std) beta = params[:, 1] + rand * params[:, 0] else: beta = torch.cat([params[:, 1]] * self.days, dim = 1) log_Sn = torch.log(inputs[..., 0] * self.initN / 1000 + 2) log_E = torch.log(inputs[..., 1] * self.initN / 1000 + 2) tau_s = params[:, 2] * log_Sn / (log_Sn + log_E) tau_e = params[:, 2] * log_E / (log_Sn + log_E) Sn = inputs[..., 0] - beta * inputs[..., 0] * inputs[..., 1] - tau_s * inputs[..., 0] + params[:, 3] * inputs[..., 2] + params[:, 4] * inputs[..., 4] E = inputs[..., 1] + beta * inputs[..., 0] * inputs[..., 1] - tau_e * inputs[..., 1] St = inputs[..., 2] + tau_s * inputs[..., 0] - params[:, 5] * inputs[..., 2] - params[:, 3] * inputs[..., 2] I = inputs[..., 3] + tau_e * inputs[..., 1] + params[:, 5] * inputs[..., 2] - params[:, 6] * inputs[..., 3] - params[:, 7] * inputs[..., 3] R = inputs[..., 4] + params[:, 6] * inputs[..., 3] - params[:, 4] * inputs[..., 4] D = inputs[..., 5] + params[:, 7] * inputs[..., 3] N = inputs[..., -1] return torch.stack((Sn, E, St, I, R, D, beta, N), dim = -1) class WeightedMSELoss(nn.Module): def __init__(self, weight = None, reduction='mean'): super().__init__() self.weight = torch.Tensor(weight) self.reduction = reduction def forward(self, inputs, targets): if self.weight is None: return F.mse_loss(inputs, targets, reduction = self.reduction) loss = torch.sum(self.weight * ((inputs - targets) ** 2), dim = 1) if self.reduction == 'mean': return loss.mean() elif self.reduction == 'sum': return loss.sum() return loss class CBAMBlock(nn.Module): def __init__(self, channels, ratio, kernel_size = 3): super().__init__() self.avg_pool = nn.AdaptiveAvgPool1d(1) self.max_pool = nn.AdaptiveMaxPool1d(1) self.fc1 = nn.Conv1d(channels, channels // ratio, kernel_size = 1) self.fc2 = nn.Conv1d(channels // ratio, channels, kernel_size = 1) self.relu = nn.ReLU(inplace = True) self.sigmoid = nn.Sigmoid() self.conv = nn.Conv1d(2, 1, kernel_size = kernel_size, padding = kernel_size // 2) def forward(self, x): out_avg_pool = self.avg_pool(x) out_max_pool = self.max_pool(x) out_avg_fc1 = self.fc1(out_avg_pool) out_max_fc1 = self.fc1(out_max_pool) out_avg_relu = self.relu(out_avg_fc1) out_max_relu = self.relu(out_max_fc1) out_avg_fc2 = self.fc2(out_avg_relu) out_max_fc2 = self.fc2(out_max_relu) # N, C, H, W out = out_avg_fc2 + out_max_fc2 out_sigmoid = self.sigmoid(out) out_channel = x * out_sigmoid out_avg = torch.mean(out_channel, 1, True) out_max, _ = torch.max(out_channel, 1, True) out = torch.cat((out_avg, out_max), dim = 1) out = self.conv(out) out = self.sigmoid(out) return out * out_channel + x class AlphaLayer(nn.Module): def __init__(self): super().__init__() self.net = nn.Sequential( nn.Conv1d(1, 32, kernel_size = 2), nn.ReLU(), CBAMBlock(32, 8), nn.Dropout(0.4), nn.Conv1d(32, 32, kernel_size = 2, dilation = 2), nn.ReLU(), CBAMBlock(32, 8), nn.Dropout(0.4), nn.Conv1d(32, 64, kernel_size = 2, dilation = 4), nn.ReLU(), CBAMBlock(64, 8), nn.Dropout(0.4), nn.Flatten(), nn.Linear(192, 64), nn.ReLU(inplace = True), nn.Dropout(0.4), nn.Linear(64, 1), nn.Sigmoid() ) def forward(self, input): return self.net(input) class SIRD_T(nn.Module): def forward(self, inputs, params): # inputs: NUK, PU, PK, R, D, V, EC # params: tau - 0, zeta - 1, beta - 2, eta1 - 3, mu - 4, gamma_1 - 5, theta - 6, gamma_2 - 7 xi = 1 / 180 tau, zeta, beta, eta_1, mu, gamma_1,theta, gamma_2 = params[:,0], params[:,1], params[:,2], params[:,3], params[:,4], params[:, 5], params[:, 6], params[:, 7] NUK, PU, PK, R, D, V, EC = inputs[..., 0], inputs[..., 1], inputs[..., 2], inputs[..., 3], inputs[..., 4], inputs[..., 5], inputs[..., 6] NUK = NUK - tau*NUK - beta*NUK*PU + xi*R PU = PU + beta*NUK*PU + zeta*beta*V*PU - eta_1*PU PK = PK + eta_1*PU - theta*PK - gamma_1*PK EC = EC + theta*PK - gamma_2*EC - mu*EC R = R + gamma_1*PK + gamma_2*EC - xi*R D = D + mu*EC V = V + tau*NUK - zeta*beta*V*PU return torch.stack((NUK,PU,PK,R,D,V,EC),dim=-1) @staticmethod def apply_constraints(input, param): xi = 1/180 tau, zeta, beta, eta_1, mu, gamma_1,theta, gamma_2 = param[:,0], param[:,1], param[:,2], param[:,3], param[:,4], param[:, 5], param[:, 6], param[:, 7] NUK, PU, PK, R, D, V, EC = input[..., 0], input[..., 1], input[..., 2], input[..., 3], input[..., 4], input[..., 5], input[..., 6] return torch.relu(xi*R - gamma_1*PK - gamma_2*EC).sum(dim=1).mean() + torch.relu(theta+gamma_1-1).mean() + torch.relu(mu+gamma_2-1).mean() + torch.relu(beta-0.01).mean() + torch.relu(zeta-0.05).mean() class SEIRD_LVEC(nn.Module): def forward(self, inputs, params): # inputs: NUK, PU, PK, R, D, V, EC, L # params: tau - 0, zeta - 1, beta - 2, eta1 - 3, mu - 4, gamma_1 - 5, theta - 6, gamma_2 - 7, rho - 8, delta - 9, rho_hat - 10 xi = 1 / 180 # print(inputs) tau, zeta, beta, eta_1, mu, gamma_1,theta, gamma_2, rho, delta, rho_hat = params[:,0], params[:,1], params[:,2], params[:,3], params[:,4], params[:, 5], params[:, 6], params[:, 7], params[:, 8], params[:, 9], params[:, 10] NUK, PU, PK, R, D, V, EC, L = inputs[..., 0], inputs[..., 1], inputs[..., 2], inputs[..., 3], inputs[..., 4], inputs[..., 5], inputs[..., 6], inputs[..., 7] _NUK = NUK - tau*NUK - beta*NUK*PU + xi*R - rho*NUK + rho_hat*L _PU = PU + beta*NUK*PU + zeta*beta*V*PU - eta_1*PU _PK = PK + eta_1*PU - theta*PK - gamma_1*PK + delta*L _R = R + gamma_1*PK + gamma_2*EC - xi * R _D = D + mu*EC _V = V + tau*NUK - zeta*beta*V*PU _EC = EC + theta*PK - gamma_2*EC - mu*EC _L = L + rho*NUK - (delta + rho_hat)*L return torch.stack((_NUK,_PU,_PK,_R,_D,_V,_EC,_L),dim=-1) @staticmethod def apply_constraints(input, param): xi = 1 / 180 tau, zeta, beta, eta_1, mu, gamma_1,theta, gamma_2, rho, delta, rho_hat = param[:,0], param[:,1], param[:,2], param[:,3], param[:,4], param[:, 5], param[:, 6], param[:, 7], param[:, 8], param[:, 9], param[:, 10] NUK, PU, PK, R, D, V, EC, L = input[..., 0], input[..., 1], input[..., 2], input[..., 3], input[..., 4], input[..., 5], input[..., 6], input[..., 7] return torch.relu(xi*R - gamma_1*PK - gamma_2*EC).sum(dim=1).mean() + torch.relu(theta+gamma_1-1).mean() + torch.relu(mu+gamma_2-1).mean() + torch.relu(beta-0.001).mean() + torch.relu(zeta-0.5).mean() + torch.relu(delta + rho_hat - 1).mean() class SIRD_T_v2(nn.Module): """ 2 steps """ def forward(self, inputs, params): # inputs: NUK, PU, PK, R, D, V # params: tau - 0, zeta - 1, beta - 2, eta1 - 3, mu - 4, gamma - 5, xi - 6 xi = 1 / 180 #N, t+1 -> t+11, 6 NUK, PU, PK, R, D, V = inputs[..., 0], inputs[..., 1], inputs[..., 2], inputs[..., 3], inputs[..., 4], inputs[..., 5] NUK1 = NUK - params[:, 0] * NUK - params[:, 2] * NUK * PU + xi * R PU1 = PU + params[:, 2] * NUK * PU + params[:, 2] * (params[:, 1]) * V * PU - params[:, 3] * PU PK1 = PK + params[:, 3] * PU - (params[:, 4] + params[:, 5]) * PK R1 = R + params[:, 5] * PK - xi * R D1 = D + params[:, 4] * PK V1 = V + params[:, 0] * NUK - params[:, 2] * (params[:, 1]) * V * PU #N, t+2 -> t+12, 6 NUK2 = NUK1 - params[:, 0] * NUK1 - params[:, 2] * NUK1 * PU1 + xi * R1 PU2 = PU1 + params[:, 2] * NUK1 * PU1 + params[:, 2] * (params[:, 1]) * V1 * PU1 - params[:, 3] * PU1 PK2 = PK1 + params[:, 3] * PU1 - (params[:, 4] + params[:, 5]) * PK1 R2 = R1 + params[:, 5] * PK1 - xi * R1 D2 = D1 + params[:, 4] * PK1 V2 = V1 + params[:, 0] * NUK1 - params[:, 2] * (params[:, 1]) * V1 * PU1 return torch.stack((NUK, PU, PK, R, D, V), dim = -1) @staticmethod def apply_constraints(input, param): return torch.relu(1 / 180 * input[..., 3] - param[:, 5] * input[..., 2]).mean() def ODEFunc(params): class SEIRDV(nn.Module): def forward(self, t, y): # inputs: NUK, PU, PK, R, D, V # params: tau - 0, zeta - 1, beta - 2, eta1 - 3, mu - 4, gamma - 5, xi - 6 # params[:, 6] = 1 / 180 + params[:, 6] xi = 1 / 180 NUK, PU, PK, R, D, V = y[..., 0], y[..., 1], y[..., 2], y[..., 3], y[..., 4], y[..., 5] NUK = - params[:, 0] * NUK - params[:, 2] * NUK * PU + xi * R PU = params[:, 2] * NUK * PU + params[:, 2] * (params[:, 1]) * V * PU - params[:, 3] * PU PK = params[:, 3] * PU - (params[:, 4] + params[:, 5]) * PK R = params[:, 5] * PK - xi * R D = params[:, 4] * PK V = params[:, 0] * NUK - params[:, 2] * (params[:, 1]) * V * PU return torch.stack((NUK, PU, PK, R, D, V), dim = -1) return SEIRDV() class CNNBlock(nn.Module): def __init__(self, in_channels, out_channels, cardinality, kernel_size = 3, dilation = 1, padding = 1, residual = False, norm = False): super().__init__() if residual: if norm: self.res_block = nn.Sequential( nn.Conv1d(in_channels, out_channels, kernel_size = kernel_size, padding = padding, dilation = dilation), # nn.LayerNorm(out_channels), DAIN_Layer(out_channels), nn.ReLU(inplace = True) ) else: self.res_block = nn.Sequential( nn.Conv1d(in_channels, out_channels, kernel_size = kernel_size, padding = padding, dilation = dilation), nn.ReLU(inplace = True) ) if norm: self.block1 = nn.Sequential( nn.Conv1d(in_channels, cardinality * 4, kernel_size = 1), DAIN_Layer(cardinality * 4), nn.ReLU(inplace = True), nn.Conv1d(cardinality * 4, cardinality * 4, kernel_size = kernel_size, padding = padding, dilation = dilation, groups = cardinality), DAIN_Layer(cardinality * 4), nn.ReLU(inplace = True), nn.Conv1d(cardinality * 4, out_channels, kernel_size = 1), DAIN_Layer(out_channels), nn.ReLU(inplace = True) ) else: self.block1 = nn.Sequential( nn.Conv1d(in_channels, cardinality * 4, kernel_size = 1), nn.ReLU(inplace = True), nn.Conv1d(cardinality * 4, cardinality * 4, kernel_size = kernel_size, padding = padding, dilation = dilation, groups = cardinality), nn.ReLU(inplace = True), nn.Conv1d(cardinality * 4, out_channels, kernel_size = 1), nn.ReLU(inplace = True) ) self.residual = residual def forward(self, input): output = self.block1(input) if self.residual: output = output + self.res_block(input) return output class AttnLSTM(nn.Module): def __init__(self, input_dim, hidden_dim, time_steps): super().__init__() self.fc = nn.Linear(2 * hidden_dim + time_steps, 1) self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers = 1, batch_first = True) self.softmax = nn.Softmax(dim = 1) self.input_dim = input_dim self.hidden_dim = hidden_dim self.time_steps = time_steps def forward(self, input): device = input.device h_t, c_t = Variable(torch.zeros(1, input.size(0), self.hidden_dim, device = device)), Variable(torch.zeros(1, input.size(0), self.hidden_dim, device = device)) output = Variable(torch.zeros(input.size(0), input.size(1), self.hidden_dim, device = device)) for i in range(self.time_steps): x = torch.cat((h_t.repeat(self.input_dim, 1, 1).permute(1, 0, 2), # 1, batch_size, hidden_dim -> batch_size, input_dim, hidden_dim c_t.repeat(self.input_dim, 1, 1).permute(1, 0, 2), input.permute(0, 2, 1)), dim = 2) # input -> batch_size, input_dim, time_steps e_t = self.fc(x.view(-1, 2 * self.hidden_dim + self.time_steps)) # batch_size, input_dim a_t = self.softmax(e_t.view(-1, self.input_dim)) w_input = torch.mul(a_t, input[:, i]) # batch_size, input_dim x batch_size, input_dim -> batch_size, input_dim _, (h_t, c_t) = self.lstm(w_input.unsqueeze(1), (h_t, c_t)) output[:, i] = h_t.squeeze(0) return output, (h_t, c_t) class DAIN_Layer(nn.Module): def __init__(self, input_dim=144, mode='adaptive_avg', mean_lr=0.00001, gate_lr=0.001, scale_lr=0.00001): super(DAIN_Layer, self).__init__() self.mode = mode self.mean_lr = mean_lr self.gate_lr = gate_lr self.scale_lr = scale_lr # Parameters for adaptive average self.mean_layer = nn.Linear(input_dim, input_dim, bias=False) self.mean_layer.weight.data = torch.FloatTensor(data=np.eye(input_dim, input_dim)) # Parameters for adaptive std self.scaling_layer = nn.Linear(input_dim, input_dim, bias=False) self.scaling_layer.weight.data = torch.FloatTensor(data=np.eye(input_dim, input_dim)) # Parameters for adaptive scaling self.gating_layer = nn.Linear(input_dim, input_dim) self.eps = 1e-8 def forward(self, x): # Expecting (n_samples, dim, n_feature_vectors) # Nothing to normalize if self.mode == None: pass # Do simple average normalization elif self.mode == 'avg': avg = torch.mean(x, 2) avg = avg.view(avg.size(0), avg.size(1), 1) x = x - avg # Perform only the first step (adaptive averaging) elif self.mode == 'adaptive_avg': avg = torch.mean(x, 2) adaptive_avg = self.mean_layer(avg) adaptive_avg = adaptive_avg.view(adaptive_avg.size(0), adaptive_avg.size(1), 1) x = x - adaptive_avg # Perform the first + second step (adaptive averaging + adaptive scaling ) elif self.mode == 'adaptive_scale': # Step 1: avg = torch.mean(x, 2) adaptive_avg = self.mean_layer(avg) adaptive_avg = adaptive_avg.view(adaptive_avg.size(0), adaptive_avg.size(1), 1) x = x - adaptive_avg # Step 2: std = torch.mean(x ** 2, 2) std = torch.sqrt(std + self.eps) adaptive_std = self.scaling_layer(std) adaptive_std[adaptive_std <= self.eps] = 1 adaptive_std = adaptive_std.view(adaptive_std.size(0), adaptive_std.size(1), 1) x = x / (adaptive_std) elif self.mode == 'full': # Step 1: avg = torch.mean(x, 2) adaptive_avg = self.mean_layer(avg) adaptive_avg = adaptive_avg.view(adaptive_avg.size(0), adaptive_avg.size(1), 1) x = x - adaptive_avg # # Step 2: std = torch.mean(x ** 2, 2) std = torch.sqrt(std + self.eps) adaptive_std = self.scaling_layer(std) adaptive_std[adaptive_std <= self.eps] = 1 adaptive_std = adaptive_std.view(adaptive_std.size(0), adaptive_std.size(1), 1) x = x / adaptive_std # Step 3: avg = torch.mean(x, 2) gate = F.sigmoid(self.gating_layer(avg)) gate = gate.view(gate.size(0), gate.size(1), 1) x = x * gate else: assert False return x
Editor is loading...