LSTM

class LSTMModel(nn.Module):
    """LSTM regressor that reduces a sequence to a single output value.

    The top-layer LSTM output at the final time step is passed through
    SELU, dropout and a linear layer with one output unit.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the LSTM hidden state.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied before the output layer
            (it is not forwarded to ``nn.LSTM``).
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)

        self.selu = nn.SELU()
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, 1, 1)``.
        """
        batch_size = x.size(0)
        # new_zeros inherits dtype and device from x, so the initial states
        # always match the input (e.g. after model.double() or on GPU).
        h0 = x.new_zeros(self.num_layers, batch_size, self.hidden_size)
        c0 = x.new_zeros(self.num_layers, batch_size, self.hidden_size)

        x, _ = self.lstm(x, (h0, c0))
        x = x[:, -1, :]  # keep only the last time step

        x = self.dropout(self.selu(x))
        x = self.fc(x)

        return x.unsqueeze(-1)

 

GRU

class GRUModel(nn.Module):
    """GRU regressor that reduces a sequence to a single output value.

    The top-layer GRU output at the final time step is passed through
    SELU, dropout and a linear layer with one output unit.

    Args:
        input_size: Number of features per time step.
        hidden_size: Size of the GRU hidden state.
        num_layers: Number of stacked GRU layers.
        dropout: Dropout probability applied before the output layer
            (it is not forwarded to ``nn.GRU``).
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.gru = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)

        # The activation class is spelled nn.SELU; nn.SeLU does not exist.
        self.selu = nn.SELU()
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, 1, 1)``.
        """
        # new_zeros inherits dtype and device from x, so the initial state
        # always matches the input.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        x, _ = self.gru(x, h0)
        x = x[:, -1, :]  # keep only the last time step

        x = self.dropout(self.selu(x))
        x = self.fc(x)

        return x.unsqueeze(-1)

 

CNN

class CNNModel(nn.Module):
    """1-D CNN forecaster: Conv1d -> SELU -> MaxPool1d -> flatten -> Linear.

    Args:
        input_size: Number of features per time step (conv input channels).
        hidden_size: Number of convolution output channels.
        kernel_size: Convolution kernel width; padding is ``kernel_size // 2``.
        input_window: Length of the input sequence.
        output_window: Number of values predicted per sample.
        dropout: Dropout probability applied before the output layer.
    """

    def __init__(self, input_size, hidden_size, kernel_size, input_window, output_window, dropout=0.2):
        super().__init__()

        padding = kernel_size // 2
        self.conv1d1 = nn.Conv1d(input_size, hidden_size, kernel_size, stride=1, padding=padding)
        self.maxpool = nn.MaxPool1d(kernel_size=2, stride=2)

        self.selu = nn.SELU()
        self.dropout = nn.Dropout(dropout)

        # Derive the flattened size from the layer arithmetic instead of
        # assuming the convolution preserves length: with an even kernel the
        # conv output is one step longer than the input, which changes the
        # pooled length when input_window is odd.
        conv_len = input_window + 2 * padding - kernel_size + 1
        pooled_len = conv_len // 2
        self.fc = nn.Linear(hidden_size * pooled_len, output_window)

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, input_window, input_size)``.

        Returns:
            Tensor of shape ``(batch, output_window, 1)``.
        """
        x = x.transpose(1, 2)  # Conv1d expects (batch, channels, length)
        x = self.conv1d1(x)
        x = self.selu(x)
        x = self.maxpool(x)

        x = x.reshape(x.size(0), -1)

        # SELU is applied a second time on the flattened features.
        x = self.dropout(self.selu(x))
        x = self.fc(x)

        return x.unsqueeze(-1)

 

CNN-LSTM

class CNN_LSTM(nn.Module):
    """Conv1d feature extractor followed by an LSTM and a linear head.

    The sequence is convolved (kernel 3, padding 1), max-pooled by 2, fed to
    an LSTM with ``hidden_size * 2`` units, and the last LSTM output is
    projected to ``output_window * output_size`` values.

    Args:
        input_size: Number of features per time step (conv input channels).
        hidden_size: Number of convolution output channels; the LSTM hidden
            state is twice this size.
        num_layers: Number of stacked LSTM layers.
        dropout: Dropout probability applied before the output layer.
        output_size: Number of values predicted per output step.
        output_window: Number of output steps.
    """

    def __init__(self, input_size, hidden_size, num_layers, dropout, output_size, output_window):
        super().__init__()

        self.output_size = output_size
        self.output_window = output_window

        self.conv1d = nn.Conv1d(in_channels=input_size, out_channels=hidden_size,
                                kernel_size=3, stride=1, padding=1)
        self.maxpool = nn.MaxPool1d(kernel_size=2)

        self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size * 2,
                            num_layers=num_layers, batch_first=True)

        self.dropout1 = nn.Dropout(dropout)
        self.fc1 = nn.Linear(hidden_size * 2, output_size * output_window)

        self.selu = nn.SELU()

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, output_window, output_size)``.
        """
        x = x.transpose(1, 2)  # Conv1d expects (batch, channels, length)

        x = self.selu(self.conv1d(x))
        x = self.maxpool(x)

        x = x.transpose(1, 2)  # back to (batch, length, channels) for the LSTM

        x, _ = self.lstm(x)
        x = x[:, -1, :]  # keep only the last time step

        x = self.dropout1(self.selu(x))
        x = self.fc1(x)
        x = x.reshape(-1, self.output_window, self.output_size)

        return x

 

TCN

"""
https://github.com/hyliush/deep-time-series/blob/master/models/TCN.py
"""

class Chomp1d(nn.Module):
    """Drop the last ``chomp_size`` steps along the final dimension.

    Args:
        chomp_size: Number of trailing steps to remove. ``0`` leaves the
            input length unchanged.
    """

    def __init__(self, chomp_size):
        super().__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """Return ``x`` without its last ``chomp_size`` steps (contiguous)."""
        # x[..., :-0] would be an empty slice, so a zero chomp (e.g. when the
        # convolution uses no padding) must be handled explicitly.
        if self.chomp_size == 0:
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()


class TemporalBlock(nn.Module):
    """Residual block of two weight-normalised dilated Conv1d layers.

    Each convolution is followed by a ``Chomp1d(padding)`` that trims the
    trailing steps and by dropout. The block output is the sum of that stack
    and a residual branch (a 1x1 convolution when the channel counts differ,
    otherwise the input itself).

    NOTE(review): no activation is applied between the convolutions or after
    the residual sum, so apart from dropout the block is linear — confirm
    this is intentional.

    Args:
        n_inputs: Number of input channels.
        n_outputs: Number of output channels.
        kernel_size: Convolution kernel width.
        stride: Convolution stride.
        dilation: Convolution dilation.
        padding: Padding added by each convolution and removed again from the
            end of its output by the following ``Chomp1d``.
        dropout: Dropout probability after each convolution.
    """

    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super().__init__()
        self.conv1 = nn.utils.parametrizations.weight_norm(
            nn.Conv1d(n_inputs, n_outputs, kernel_size,
                      stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = nn.utils.parametrizations.weight_norm(
            nn.Conv1d(n_outputs, n_outputs, kernel_size,
                      stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.dropout2 = nn.Dropout(dropout)

        self.net = nn.Sequential(self.conv1, self.chomp1, self.dropout1,
                                 self.conv2, self.chomp2, self.dropout2)
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.init_weights()

    def init_weights(self):
        """Initialise all convolution weights from N(0, 0.01)."""
        with torch.no_grad():
            for conv in (self.conv1, self.conv2):
                # Under the weight_norm parametrization `conv.weight` is
                # recomputed on every access, so an in-place edit of
                # `conv.weight.data` only touches a temporary. Assigning to
                # the attribute routes the new value through the
                # parametrization's right_inverse and updates the stored
                # magnitude/direction tensors.
                conv.weight = torch.empty_like(conv.weight).normal_(0, 0.01)
            if self.downsample is not None:
                self.downsample.weight.normal_(0, 0.01)

    def forward(self, x):
        """Apply the block to ``x`` of shape ``(batch, n_inputs, length)``."""
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return out + res

class TCN(nn.Module):
    """Stack of ``TemporalBlock`` layers with a single-value linear head.

    Layer ``i`` uses dilation ``2 ** i`` and padding
    ``(kernel_size - 1) * 2 ** i``. The features at the last time step are
    projected to one output value.

    Args:
        input_size: Number of features per time step.
        hidden_size: Number of channels in every temporal block.
        num_layers: Number of temporal blocks.
        kernel_size: Convolution kernel width used by every block.
        dropout: Dropout probability inside each block.
    """

    def __init__(self, input_size, hidden_size, num_layers, kernel_size, dropout=0.2):
        super().__init__()

        layers = []
        for level in range(num_layers):
            dilation_size = 2 ** level
            # Only the first block sees the raw feature count.
            in_channels = input_size if level == 0 else hidden_size
            layers.append(
                TemporalBlock(in_channels, hidden_size, kernel_size, stride=1,
                              dilation=dilation_size,
                              padding=(kernel_size - 1) * dilation_size,
                              dropout=dropout))

        self.network = nn.Sequential(*layers)
        self.out_proj = nn.Linear(hidden_size, 1)

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, 1, 1)``.
        """
        x = x.transpose(1, 2)  # Conv1d expects (batch, channels, length)
        x = self.network(x)
        x = x[:, :, -1:]  # keep only the last time step
        x = self.out_proj(x.transpose(1, 2))
        return x

 

Transformer Encoder

import math
import torch.nn.functional as F

class PositionalEncoding(nn.Module):
    """Add fixed sinusoidal position encodings to a sequence, then dropout.

    Even feature indices receive ``sin`` and odd indices ``cos`` of the
    position scaled by a geometric frequency progression. The table is
    stored as the non-trainable buffer ``pe`` of shape
    ``(max_len, 1, d_model)``.

    Args:
        d_model: Feature dimension of the input (even or odd).
        dropout: Dropout probability applied after adding the encoding.
        max_len: Number of positions for which encodings are precomputed.
    """

    def __init__(self, d_model, dropout=0.1, max_len=5000):
        super().__init__()
        self.dropout = nn.Dropout(p=dropout)

        pe = torch.zeros(max_len, d_model)
        position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(1)
        div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-math.log(10000.0) / d_model))
        pe[:, 0::2] = torch.sin(position * div_term)
        # For odd d_model there is one fewer odd column than even column, so
        # the frequency vector is truncated to avoid a shape mismatch.
        pe[:, 1::2] = torch.cos(position * div_term[: d_model // 2])
        pe = pe.unsqueeze(0).transpose(0, 1)
        self.register_buffer('pe', pe)

    def forward(self, x):
        """Add the encoding to ``x`` of shape ``(seq_len, batch, d_model)``.

        ``seq_len`` is expected not to exceed ``max_len``.
        """
        x = x + self.pe[:x.size(0), :]
        return self.dropout(x)


class TransformerEncoder(nn.Module):
    """Conv1d embedding + positional encoding + Transformer encoder + linear head.

    The input is left-padded by ``kernel_size - 1`` steps so the convolution
    keeps the sequence length, encoded by a stack of
    ``nn.TransformerEncoderLayer`` blocks, and the representation at the
    last time step is projected to ``output_size * output_window`` values.

    Args:
        input_size: Number of features per time step.
        hidden_size: Model dimension (conv output channels / ``d_model``).
        kernel_size: Width of the embedding convolution.
        seq_len: Number of positions precomputed by the positional encoding.
        n_head: Number of attention heads.
        n_layers: Number of encoder layers.
        output_size: Number of values predicted per output step.
        output_window: Number of output steps.
    """

    def __init__(self, input_size, hidden_size, kernel_size, seq_len, n_head, n_layers, output_size, output_window):
        super().__init__()

        self.conv = nn.Conv1d(input_size, hidden_size, kernel_size=kernel_size)
        self.pos_encoder = PositionalEncoding(hidden_size, max_len=seq_len)
        self.transformer_layer = nn.TransformerEncoderLayer(d_model=hidden_size, nhead=n_head)
        self.transformer = nn.TransformerEncoder(self.transformer_layer, num_layers=n_layers)
        self.fc = nn.Linear(hidden_size, output_size * output_window)

        self.kernel_size = kernel_size

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, seq_len, input_size)``.

        Returns:
            Tensor of shape ``(batch, output_size * output_window, 1)``.
        """
        x = x.transpose(1, 2)  # Conv1d expects (batch, channels, length)
        # Pad only on the left so the convolution preserves the length.
        x = F.pad(x, (self.kernel_size - 1, 0))
        # (batch, hidden, length) -> (length, batch, hidden) for the encoder.
        x = self.conv(x).permute(2, 0, 1)
        x = self.pos_encoder(x)
        # Back to batch-first, keeping only the last time step.
        x = self.transformer(x).transpose(0, 1)[:, -1:]
        x = self.fc(x)
        return x.transpose(1, 2)

 

D-Linear

class moving_avg(torch.nn.Module):
    """Moving average along the time axis with edge-replication padding.

    The first and last time steps are repeated so that, with ``stride=1``,
    the output has the same length as the input for both odd and even
    ``kernel_size``.

    Args:
        kernel_size: Width of the averaging window.
        stride: Stride of the averaging window.
    """

    def __init__(self, kernel_size, stride):
        super().__init__()
        self.kernel_size = kernel_size
        self.avg = torch.nn.AvgPool1d(kernel_size=kernel_size, stride=stride, padding=0)

    def forward(self, x):
        """Smooth ``x`` of shape ``(batch, seq_len, channels)`` along dim 1."""
        # Total padding must be kernel_size - 1 to preserve the length. For
        # odd kernels both sides get (kernel_size - 1) // 2; for even kernels
        # the end gets the one extra step that symmetric padding would drop.
        front_pad = (self.kernel_size - 1) // 2
        end_pad = self.kernel_size // 2
        front = x[:, 0:1, :].repeat(1, front_pad, 1)
        end = x[:, -1:, :].repeat(1, end_pad, 1)
        x = torch.cat([front, x, end], dim=1)
        # AvgPool1d pools over the last dimension, so move time there and back.
        x = self.avg(x.permute(0, 2, 1))
        x = x.permute(0, 2, 1)
        return x

class series_decomp(torch.nn.Module):
    """Split a series into its moving average and the remainder.

    Args:
        kernel_size: Window width of the stride-1 moving average.
    """

    def __init__(self, kernel_size):
        super().__init__()
        self.moving_avg = moving_avg(kernel_size, stride=1)

    def forward(self, x):
        """Return ``(moving_mean, x - moving_mean)`` for the input ``x``."""
        smoothed = self.moving_avg(x)
        return smoothed, x - smoothed

class LTSF_DLinear(torch.nn.Module):
    """Decomposition-linear forecaster.

    The input is split by ``series_decomp`` into a moving-average (trend)
    part and a residual (seasonal) part. Each part is mapped from
    ``seq_len`` to ``pred_len`` steps by a linear layer along the time axis
    and the two results are summed. When the result has more than one
    channel, a final ``Linear(feature_size, 1)`` reduces it to one.

    Args:
        seq_len: Length of the input sequence.
        pred_len: Number of steps to predict.
        kernel_size: Moving-average window used by the decomposition.
        individual: If true, every channel gets its own pair of linear
            layers; otherwise one pair is shared across channels.
        feature_size: Number of channels in the input.
    """

    def __init__(self, seq_len, pred_len, kernel_size, individual, feature_size):
        super().__init__()

        self.seq_len = seq_len
        self.pred_len = pred_len

        self.decompsition = series_decomp(kernel_size)
        self.individual = individual
        self.channels = feature_size

        if self.individual:
            self.Linear_Seasonal = torch.nn.ModuleList()
            self.Linear_Trend = torch.nn.ModuleList()
            for _ in range(self.channels):
                self.Linear_Trend.append(self._averaging_linear())
                self.Linear_Seasonal.append(self._averaging_linear())
        else:
            self.Linear_Trend = self._averaging_linear()
            self.Linear_Seasonal = self._averaging_linear()

        self.fc = nn.Linear(feature_size, 1)

    def _averaging_linear(self):
        """Build a seq_len -> pred_len linear layer whose weights all equal 1 / seq_len."""
        layer = torch.nn.Linear(self.seq_len, self.pred_len)
        layer.weight = torch.nn.Parameter(
            (1 / self.seq_len) * torch.ones([self.pred_len, self.seq_len]))
        return layer

    def forward(self, x):
        """Run the model.

        Args:
            x: Tensor of shape ``(batch, seq_len, feature_size)``.

        Returns:
            Tensor of shape ``(batch, pred_len, 1)``.
        """
        trend_init, seasonal_init = self.decompsition(x)
        # Move time to the last axis so the linear layers act along it.
        trend_init = trend_init.permute(0, 2, 1)
        seasonal_init = seasonal_init.permute(0, 2, 1)

        if self.individual:
            trend_output = torch.zeros(
                [trend_init.size(0), trend_init.size(1), self.pred_len],
                dtype=trend_init.dtype).to(trend_init.device)
            seasonal_output = torch.zeros(
                [seasonal_init.size(0), seasonal_init.size(1), self.pred_len],
                dtype=seasonal_init.dtype).to(seasonal_init.device)
            for idx in range(self.channels):
                trend_output[:, idx, :] = self.Linear_Trend[idx](trend_init[:, idx, :])
                seasonal_output[:, idx, :] = self.Linear_Seasonal[idx](seasonal_init[:, idx, :])
        else:
            trend_output = self.Linear_Trend(trend_init)
            seasonal_output = self.Linear_Seasonal(seasonal_init)

        x = seasonal_output + trend_output
        x = x.permute(0, 2, 1)  # back to (batch, pred_len, channels)

        # Collapse multiple channels into one; a single channel is returned as is.
        if x.size(2) > 1:
            x = self.fc(x)
        return x
욱근욱