시작은 미약하였으나 , 그 끝은 창대하리라

[트랜스포머] 트랜스포머 인코더를 이용한 시계열 예측. 본문

인공지능/딥러닝 사이드 Project

[트랜스포머] 트랜스포머 인코더를 이용한 시계열 예측.

애플파ol 2023. 7. 13. 01:22

목표: 시계열 데이터를 트랜스포머 인코더를 이용하여 예측해보자.

전체코드: https://github.com/YongTaeIn/Transformer-Encoder-model-for-time-series.git

 

출처:https://wikidocs.net/31379
트랜스포머 인코더를 활용한 시계열 예측(마지막은 1칸을 예측하기 위해 그림상 Dense를 1로 함)

0.  Input shape 의 변환 순서

1 번과정, embedding = [batch size, time_seq, feature_dim] 

2번 과정, positional Encoding =[batch size, time_seq, d_model]

3번과정~4번과정Encoder blcok= [batch size, time_seq, d_model]

5번과정, FFW =[batch size, time_seq, 본인설정]

6번과정 ,차원축소=[batch size, time_seq]

7번과정, Dense = [batch size, 예측하고자 하는 시계열 길이(본인설정)]

 

1번과정 )  Feature input 

트랜스포머 논문을 보면 알겠지만 input이 d_model로 변환되어야 한다.

NLP 에서는 embedding차원을 바꿔주는 함수가 존재하지만. 우리가 하는 시계열에서는 그럴수가 없다. 결국 Projection을 통해 d_model 을 만들어준다. (Projection= FC layer를 말한다)

# 23.07.07
# discription: Embedding with d_model as transformer input.
# shape [batch , Time_seq, feature_dim] -> [batch , Time_seq, d_model]

# Tip. projection mean to pass through a Linear Layer(=fc layer).

import torch
import torch.nn as nn


class input_embedding(nn.Module):
    def __init__(self,input_size,d_model):
        super(input_embedding,self).__init__()
        self.encoder_input_layer = nn.Linear(
            in_features=input_size, 
            out_features=d_model
        )

    def forward(self,x):
        x=self.encoder_input_layer(x)
        return x

2번과정 )  Positional encoding

트랜스포머 모델은 입력을 한번에 다 넣다보니..시간순서에 대한 정보가 누락이 된다. 이것을 해결하기 위해 논문에서는 Positional encoding을 활용해준다.

 

-알고리즘 순서.

  1. positional encoding 초기화

  2. x seq 길이에 맞춰 PE return

import torch
import torch.nn as nn
import math
import torch.nn.functional as F
from torch.nn.utils import weight_norm
from torch import nn, Tensor


class PositionalEncoding(nn.Module):
 
    def __init__(
        self, 
        dropout: float=0.1, 
        max_len: int=144, 
        d_model: int=512,
        batch_first: bool=False
        ):

        """
        Parameters:
            dropout: the dropout rate
            max_seq_len: the maximum length of the input sequences
            d_model: The dimension of the output of sub-layers in the model 
                     (Vaswani et al, 2017)
        """

        super().__init__()
        self.d_model = d_model      
        self.dropout = nn.Dropout(p=dropout)
        self.batch_first = batch_first
        self.x_dim = 1 if batch_first else 0

        # copy pasted from PyTorch tutorial
        position = torch.arange(max_len).unsqueeze(1)       
        div_term = torch.exp(torch.arange(0, d_model, 2) * (-math.log(10000.0) / d_model))       
        pe = torch.zeros(max_len, 1, d_model)
        pe[:, 0, 0::2] = torch.sin(position * div_term)
        pe[:, 0, 1::2] = torch.cos(position * div_term)
        self.register_buffer('pe', pe)
        
    def forward(self, x: Tensor) -> Tensor:
        """
        Args:
            x: Tensor, shape [batch_size, enc_seq_len, dim_val] or 
               [enc_seq_len, batch_size, dim_val]
        """
        x = x + self.pe[:x.size(self.x_dim)]
        return self.dropout(x)

 

 

3_1 번과정 )  Multi-Head Attention

입력으로 Query(Q), Key(K), Value(V) 값을 요구한다. 또한 Query(Q), Key(K), Value(V)는 self attetnion 을 통해 나오면

Self attetnion을 여러개 쌓인것을 Multi-Head Attention 이라한다. 

(효과: CNN이 필터를 여러개 사용함으로 특징맵을 다양하게 추출해 내는 것과 유사함.)

(출처: https://stats.stackexchange.com/questions/421935/what-exactly-are-keys-queries-and-values-in-attention-mechanisms

필자는  참고로 ScaleDotProductAttetnion을 다른 폴더에서 import 해와서 사용한다. 

# 23.07.07
# discription: With multi-head attention, 
# you can select and use the desired attention.
# 

# Tip.


import torch
import torch.nn as nn
from layers.Attention_layer_family import ScaleDotProductAttention

class MultiHeadAttention(nn.Module):
    def __init__(self,d_model,head):
        super().__init__()
        self.d_model = d_model
        self.head = head
        self.head_dim = d_model // head
        self.w_q = nn.Linear(d_model,d_model)
        self.w_k = nn.Linear(d_model,d_model)
        self.w_v = nn.Linear(d_model,d_model)
        self.w_o = nn.Linear(d_model,d_model)
        self.attention = ScaleDotProductAttention()

    def forward(self, q, k, v, mask=None):
        #  [batch_size, seq_len, d_model]
        batch_size, _, _ = q.size()

        # 1. Q,K,V를  d_k, d_k, d_v 차원으로 projection
        q, k, v = self.w_q(q), self.w_k(k), self.w_v(v)

        # 2. Q,K,V를 head 수 만큼 분리해주기 
        # (batch_size, seq_len, d_model)  
        # -> (batch_size, head, seq_len, head_dim)
        # 디코더에서는 q와 k,v 의 seq_len 가 다른 경우가 올 수 있음
        q = q.view(batch_size, -1, self.head, self.head_dim).transpose(1,2)
        k = k.view(batch_size, -1, self.head, self.head_dim).transpose(1,2)
        v = v.view(batch_size, -1, self.head, self.head_dim).transpose(1,2)

        # 3. Scaled Dot-Product Attention 을 수행하기
        out, attention_score = self.attention(q,k,v,mask)

        # 4. 분리된 head들을 concat 하기
        # (batch_size, head, seq_len, head_dim)
        # -> (batch_size, seq_len, d_model)
        out = out.transpose(1,2).contiguous().view(batch_size, -1, self.d_model)

        # 5. d_model 차원으로 projection
        out = self.w_o(out)

        return out, attention_score

3_2 번과정 )  Self-Attention 

다양한 계산 과정이 있는데 여기서는 DotProduct 를 사용함.

self attention 에서는 query=key=value 임으로,  임베딩한 입력데이터를  X 라고 가정하면 X = Q = K = V 로 설정하고 가면 된다.

import torch
import torch.nn as nn
import math


# ScaleDotProductAttention
class ScaleDotProductAttention(nn.Module):
    def __init__(self):
        super().__init__()
        self.softmax = nn.Softmax(dim=-1)
        
    def forward(self, q, k, v, mask=None):
        # scaling에 필요한 head_dim 값 얻기
        # (batch_size, head, seq_len, head_dim)
        _, _, _, head_dim = q.size()

        # 1. K를 transpose하기 (seq_len, head_dim의 행렬 전환)
        k_t = k.transpose(-1,-2)

        # 2. Q 와 K^T 의 MatMul
        # (batch_size, head, q_seq_len, k_seq_len)
        attention_score = torch.matmul(q,k_t)

        # 3. Scaling
        attention_score /= math.sqrt(head_dim)

        # 4. Mask가 있다면 마스킹된 부위 -1e10으로 채우기
        # mask는 단어가 있는 곳(True), 마스킹된 곳(False)으로 표시되었기 때문에 False(0)에 해당되는 부분을 -1e10으로 masking out한다.
        # Tensor.masked_fill_(mask_boolean, value) 함수는 True값을 value로 채운다.
        if mask is not None:
            attention_score = attention_score.masked_fill(mask==0,-1e10) 
        
        # 5. Softmax 취하기 
        attention_score = self.softmax(attention_score)

        # 6. Attention 결과와 V의 MatMul 계산하기
        result = torch.matmul(attention_score, v)
        
        return result, attention_score

4번과정 )  Add & Norm

별거없다..

 Add 는 Residual net 을 의미하며 y=f(x)+x 인것이다. ( Residual net 을 찾아보면 알 수 있다.)

 Norm 은 정규화 과정을 언급하는 것이다. 아마 흔히들 BatchNorm 을 알텐데 여기서는 Layer Norm 을 사용하였다.

(Batch Norm = feature 정규화 느낌, Layer Norm = instance 정규화 느낌)

 

코드는 Encoder_layer 에서 설명.

 

5번과정 )  Feed Forward 

우리가 잘 알고 있는 그냥 MLP(Multi Layer Perceptron) 이다.

import torch
import torch.nn as nn

class PositionWise(nn.Module):
    def __init__(self,d_model,d_ff):
        super().__init__()
        self.w_1 = nn.Linear(d_model,d_ff)
        self.relu = nn.ReLU()
        self.w_2 = nn.Linear(d_ff,d_model)
    
    def forward(self, x):
        # Linear Layer1
        x = self.w_1(x)

        # ReLU
        x = self.relu(x)
        
        # Linear Layer2
        x = self.w_2(x)

        return x

6번과정 )  Linear

출력이  3차원에서 2차원으로 축소를 해야한다(6번 과정). 그후에 Linear 를 사용하여 원하는 출력의 개수를 만들어 낸다.(7번과정)

[batch_size, target seq len, d_model(이거나 본인이 설정한 차원)] -> [batch_size, target seq len]

별거없다 뒤에 [ :, : , 0 ] 를 붙이면 2차원으로 된다. (자세한 코드 구현은 뒤에서)

3차원->2차원 축소

x = self.linear_mapping(x)[:,:,0]

예시

import torch

batch_size = 3
sequence_length = 4
d_model = 2

x = torch.randn(batch_size, sequence_length, d_model)

y=(x)[:, :, 0]

print(x.size())
print(y.size())

>>> torch.Size([3, 4, 2])
>>> torch.Size([3, 4])

7. 원하는 길이로  뽑아주면 됨.

# 예시임, 돌아가는 코드는 아님.
# 자세한건 아래 전체 코드에서 설명.

self.linear2 = nn.Sequential(
nn.Linear(144, num_predicted_features)
)

 x = self.linear2(x)

 

 

 

➢ TOTAL CODE.

 1.  Encoder layer (block) 맨위에 구조에서 ' × N' N배가 되는 대상이다 . 즉, Encoder layer (block) 이다.

# 23.07.07
# discription: 
# shape 

# Tip. 
import torch
import torch.nn as nn

from layers.multi_head_attention import MultiHeadAttention
from layers.positioin_wise import PositionWise


class EncoderLayer(nn.Module):
    def __init__(self, d_model, head, d_ff, dropout):
        super().__init__()
        self.attention = MultiHeadAttention(d_model,head)
        self.layerNorm1 = nn.LayerNorm(d_model)

        self.ffn = PositionWise(d_model,d_ff)
        self.layerNorm2 = nn.LayerNorm(d_model)

        self.dropout = nn.Dropout(p=dropout)

    def forward(self, x):  # (self, x, padding_mask)
        # residual connection을 위해 잠시 담아둔다.
        residual = x

        # 1. multi-head attention (self attention)
        x, attention_score = self.attention(q=x, k=x, v=x) # (q=x, k=x, v=x, mask=padding_mask)

        # 2. add & norm
        x = self.dropout(x) + residual
        x = self.layerNorm1(x)

        residual = x

        # 3. feed-forward network
        x = self.ffn(x)

        # 5. add & norm
        x = self.dropout(x) + residual
        x = self.layerNorm2(x)

        return x, attention_score

 

 2.  Encoder

Encoder layer를 N 번 쌓게 해주고, 초기 input embedding, Positional Encoding, 차원축소, 마지막 예측 까지.  해주는 부분이다.

import torch
import torch.nn as nn

from layers.positional_encoding import PositionalEncoding
from layers.encoder_input_projection import input_embedding
from blocks.encoder_layer import EncoderLayer



class Encoder(nn.Module):
    def __init__(self, input_size, num_predicted_features, d_model, head, d_ff, max_len, dropout, n_layers):
        super().__init__()

        # Embedding
        self.input_emb = input_embedding(input_size=input_size,d_model=d_model)
        
        self.pos_encoding = PositionalEncoding(d_model=d_model, max_len=max_len,dropout=dropout)
        #self.dropout = nn.Dropout(p=dropout)

        # n개의 encoder layer를 list에 담기
        self.encoder_layers = nn.ModuleList([EncoderLayer(d_model=d_model, 
                                                         head=head, 
                                                         d_ff=d_ff, 
                                                         dropout=dropout)
                                             for _ in range(n_layers)])
        self.linear_mapping = nn.Linear(
        in_features=d_model, 
        out_features=num_predicted_features
        )

        self.linear2 = nn.Sequential(
        nn.Linear(144, num_predicted_features)
        )




    def forward(self, x): # ,padding_mask
        # 1. 입력에 대한 input embedding, 
        
        input_emb = self.input_emb(x)
        
        # 2. positional encoding 생성
        x = self.pos_encoding(input_emb)

        # 3. n번 EncoderLayer 반복하기
        for encoder_layer in self.encoder_layers:
            x, attention_score = encoder_layer(x) 

        # 4. 차원축소 수행
        x = self.linear_mapping(x)
        x=x[:,:,0]

        # 5. 원하는 길이 출력
        x = self.linear2(x)
        return x

 

이렇게 하면 Encoder 부분에 대한 구현이 끝이난다. 더 자세하게 실행하는 것 까지 알고 싶다면 github 링크(run.py) 를 타고 들어오면 알 수있다.

https://github.com/YongTaeIn/Transformer-Encoder-model-for-time-series.git

Comments