LSTM
class LSTMModel(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, dropout):
super(LSTMModel, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
self.selu = nn.SELU() # nn.ReLU()
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
batch_size = x.size(0)
h0 = Variable(torch.zeros(self.num_layers, batch_size, self.hidden_size)).to(x.device)
c0 = Variable(torch.zeros(self.num_layers, batch_size, self.hidden_size)).to(x.device)
x, _ = self.lstm(x, (h0, c0))
x = x[:, -1, :]
x = self.dropout(self.selu(x))
x = self.fc(x)
return x.unsqueeze(-1)
GRU
class GRUModel(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, dropout):
super(GRUModel, self).__init__()
self.hidden_size = hidden_size
self.num_layers = num_layers
self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
self.selu = nn.SeLU()
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size, 1)
def forward(self, x):
h0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size)).to(x.device)
x, _ = self.gru(x, h0)
x = x[:, -1, :]
x = self.dropout(self.selu(x))
x = self.fc(x)
return x.unsqueeze(-1)
CNN
class CNNModel(nn.Module):
def __init__(self, input_size, hidden_size, kernel_size, input_window, output_window, dropout=0.2):
super(CNNModel, self).__init__()
self.conv1d1 = nn.Conv1d(input_size, hidden_size, kernel_size, stride=1, padding=kernel_size // 2)
self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)
self.selu = nn.SELU()
self.dropout = nn.Dropout(dropout)
self.fc = nn.Linear(hidden_size * (input_window // 2), output_window)
def forward(self, x):
x = x.transpose(1, 2)
x = self.conv1d1(x)
x = self.selu(x)
x = self.maxpool(x)
x = x.reshape(x.size(0), -1)
x = self.dropout(self.selu(x))
x = self.fc(x)
return x.unsqueeze(-1)
CNN-LSTM
class CNN_LSTM(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, dropout, output_size, output_window):
super(CNN_LSTM, self).__init__()
self.output_size = output_size
self.output_window = output_window
self.conv1d = nn.Conv1d(in_channels=input_size, out_channels=hidden_size, kernel_size=3, stride=1, padding=1)
self.maxpool = nn.MaxPool1d(kernel_size=2)
self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size*2, num_layers=num_layers, batch_first=True)
self.dropout1 = nn.Dropout(dropout)
self.fc1 = nn.Linear(hidden_size*2, output_size * output_window)
self.selu = nn.SELU()
def forward(self, x):
x = x.transpose(1, 2)
x = self.selu(self.conv1d(x))
x = self.maxpool(x)
x = x.transpose(1, 2)
x, _ = self.lstm(x)
x = x[:, -1, :]
x = self.dropout1(self.selu(x))
x = self.fc1(x)
x = x.reshape(-1, self.output_window, self.output_size)
return x
TCN
"""
https://github.com/hyliush/deep-time-series/blob/master/models/TCN.py
"""
class Chomp1d(nn.Module):
def __init__(self, chomp_size):
super(Chomp1d, self).__init__()
self.chomp_size = chomp_size
def forward(self, x):
return x[:, :, :-self.chomp_size].contiguous()
class TemporalBlock(nn.Module):
def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
super(TemporalBlock, self).__init__()
self.conv1 = nn.utils.parametrizations.weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation))
self.chomp1 = Chomp1d(padding)
self.dropout1 = nn.Dropout(dropout)
self.conv2 = nn.utils.parametrizations.weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
stride=stride, padding=padding, dilation=dilation))
self.chomp2 = Chomp1d(padding)
self.dropout2 = nn.Dropout(dropout)
self.net = nn.Sequential(self.conv1, self.chomp1, self.dropout1,
self.conv2, self.chomp2, self.dropout2)
self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
self.init_weights()
def init_weights(self):
self.conv1.weight.data.normal_(0, 0.01)
self.conv2.weight.data.normal_(0, 0.01)
if self.downsample is not None:
self.downsample.weight.data.normal_(0, 0.01)
def forward(self, x):
out = self.net(x)
res = x if self.downsample is None else self.downsample(x)
return out + res
class TCN(nn.Module):
def __init__(self, input_size, hidden_size, num_layers, kernel_size, dropout=0.2):
super(TCN, self).__init__()
num_channels = [hidden_size] * num_layers
layers = []
num_levels = len(num_channels)
for i in range(num_levels):
dilation_size = 2 ** i
in_channels = input_size if i == 0 else num_channels[i-1]
out_channels = num_channels[i]
layers += [TemporalBlock(in_channels, out_channels, kernel_size, stride=1, dilation=dilation_size,
padding=(kernel_size-1) * dilation_size, dropout=dropout)]
self.network = nn.Sequential(*layers)
self.out_proj = nn.Linear(hidden_size, 1)
def forward(self, x):
x = x.transpose(1, 2)
x = self.network(x)
x = x[:, :, -1:]
x = self.out_proj(x.transpose(1, 2))
return x
Transformer Encoder
import math
import torch.nn.functional as F
class PositionalEncoding(nn.Module):
def __init__(self, d_model, dropout=0.1, max_len=5000):
super(PositionalEncoding, self).__init__()
self.dropout = nn.Dropout(p=dropout)
pe = torch.zeros(max_len, d_model)
position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
pe = pe.unsqueeze(0).transpose(0, 1)
self.register_buffer('pe', pe)
def forward(self, x):
x = x + self.pe[:x.size(0), :]
return self.dropout(x)
class TransformerEncoder(nn.Module):
def __init__(self, input_size, hidden_size, kernel_size, seq_len, n_head, n_layers, output_size, output_window):
super(TransformerEncoder, self).__init__()
self.conv = nn.Conv1d(input_size, hidden_size, kernel_size=kernel_size)
self.pos_encoder = PositionalEncoding(hidden_size, max_len=seq_len)
self.transformer_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=n_head)
self.transformer = nn.TransformerEncoder(self.transformer_layer, num_layers=n_layers)
self.fc = nn.Linear(hidden_size, output_size * output_window)
self.kernel_size = kernel_size
def forward(self, x):
x = x.transpose(1, 2)
x = F.pad(x, (self.kernel_size-1,0))
x = self.conv(x).permute(2, 0, 1)
x = self.pos_encoder(x)
x = self.transformer(x).transpose(0, 1)[:, -1:]
x = self.fc(x)
return x.transpose(1, 2)
D-Linear
class moving_avg(torch.nn.Module):
def __init__(self, kernel_size, stride):
super(moving_avg, self).__init__()
self.kernel_size = kernel_size
self.avg = torch.nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)
def forward(self, x):
front = x[:, 0:1, :].repeat(1, (self.kernel_size - 1) // 2, 1)
end = x[:, -1:, :].repeat(1, (self.kernel_size - 1) // 2, 1)
x = torch.cat([front, x, end], dim=1)
x = self.avg(x.permute(0, 2, 1))
x = x.permute(0, 2, 1)
return x
class series_decomp(torch.nn.Module):
def __init__(self, kernel_size):
super(series_decomp, self).__init__()
self.moving_avg = moving_avg(kernel_size, stride=1)
def forward(self, x):
moving_mean = self.moving_avg(x)
residual = x - moving_mean
return moving_mean, residual
class LTSF_DLinear(torch.nn.Module):
def __init__(self, seq_len, pred_len, kernel_size, individual, feature_size):
super(LTSF_DLinear, self).__init__()
self.seq_len = seq_len
self.pred_len = pred_len
self.decompsition = series_decomp(kernel_size)
self.individual = individual
self.channels = feature_size
if self.individual:
self.Linear_Seasonal = torch.nn.ModuleList()
self.Linear_Trend = torch.nn.ModuleList()
for i in range(self.channels):
self.Linear_Trend.append(torch.nn.Linear(self.seq_len, self.pred_len))
self.Linear_Trend[i].weight = torch.nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len, self.seq_len]))
self.Linear_Seasonal.append(torch.nn.Linear(self.seq_len, self.pred_len))
self.Linear_Seasonal[i].weight = torch.nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len, self.seq_len]))
else:
self.Linear_Trend = torch.nn.Linear(self.seq_len, self.pred_len)
self.Linear_Trend.weight = torch.nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len, self.seq_len]))
self.Linear_Seasonal = torch.nn.Linear(self.seq_len, self.pred_len)
self.Linear_Seasonal.weight = torch.nn.Parameter((1/self.seq_len)*torch.ones([self.pred_len, self.seq_len]))
self.fc = nn.Linear(feature_size, 1)
def forward(self, x):
trend_init, seasonal_init = self.decompsition(x)
trend_init, seasonal_init = trend_init.permute(0,2,1), seasonal_init.permute(0,2,1)
if self.individual:
trend_output = torch.zeros([trend_init.size(0), trend_init.size(1), self.pred_len], dtype=trend_init.dtype).to(trend_init.device)
seasonal_output = torch.zeros([seasonal_init.size(0), seasonal_init.size(1), self.pred_len], dtype=seasonal_init.dtype).to(seasonal_init.device)
for idx in range(self.channels):
trend_output[:, idx, :] = self.Linear_Trend[idx](trend_init[:, idx, :])
seasonal_output[:, idx, :] = self.Linear_Seasonal[idx](seasonal_init[:, idx, :])
else:
trend_output = self.Linear_Trend(trend_init)
seasonal_output = self.Linear_Seasonal(seasonal_init)
x = seasonal_output + trend_output
x = x.permute(0,2,1)
if x.size(2) > 1:
x = self.fc(x)
return x
return x
'ML & DL > Deep Learning' 카테고리의 다른 글
[Time Sereis Forecasting] One-Step, Multi-Step, Multi-Output (0) | 2024.11.04 |
---|---|
[Time Series Forecasting] Sliding Window Dataset (0) | 2024.10.14 |
IoU, Precision, Recall, mAP 정리 (0) | 2023.05.19 |
WBF, Ensemble for Object Detection 정리 (0) | 2023.05.19 |
NMS, Soft-NMS 정리 및 구현 (0) | 2023.05.19 |