필자가 지난번 seq2seq기반 덧셈 모형 빌드(with Gluon)을 Gluon으로 구축했으며, 잘 동작하는 모습을 보여줬다. 해당 코드를 정리하면서 딥러닝이 어떠한 방식으로 덧셈을 하는지 조금더 엿볼 수 있으면 어떨까 하는 생각이 들었다. 이 글을 보기 전에 이전 포스트를 먼저 읽어보길 권한다.
이 포스트에서 보여줄 두가지 부분은 어텐션 매커니즘(attention mechanism) 구현과 시각화 그리고 Gluon 모델의 학습/예측 퍼포먼스 향상을 할 수 있는 Hybridize 기능 테스트이다.
어텐션 매커니즘¶
먼저 어텐션 매커니즘이다.
사실상 seq2seq 기반의 딥러닝 interpretation 방법은 어텐션 매커니즘(attention mechanism)이 대표적이다. 그러나 이러한 효과뿐만아니라 원래 어텐션 방법이 사용된 동기는 seq2seq 모형의 sequence가 길어질 수록 과거 상태를 기억하지 못하는 단점이 존재해서 별도의 네트웍을 이용해 우리가 주목해야될 벡터가 무엇인지 강조해 주는데 목적이 있다. 하지만 최대 9문자를 기반으로 덧셈을 수행하는 모형에 어텐션까지 필요할 이유는 없으나, NMT(Neural Machine Translation)을 만들어 보기 전단계의 기술 튜토리얼이라 생각하겠다.
attention mechanism은 어떠한 특정의 알고리즘이 아니라 특정 정보를 뽑기위한 컨셉이다. 좀더 자세히 이야기 하면 디코더의 정보를 인코더의 정보로 어떻게 가공하느냐의 방식은 오픈된 영역이고 구현하기 나름이라는 것이다. 이러한 이유로 지금까지 공개적인 프레임웍에서 구현체가 등장하지 않은게 아닐까 하는 개인적인 생각을 해본다.
해당 매커니즘에 대한 설명은 이곳 블로그에서 잘 설명하고 있다. 사실 이 레벨 이상 개념을 설명하는건 어렵다고 생각한다. 하지만 별도로 필자는 구현체를 기준으로 설명을 좀더 자세히 해보도록 하겠다.
필자가 구현한 구현체의 얼개는 대체적으로 아래와 같다.
결국 위 그림에서 attend를 어떻게 구해주냐이며, 이 과정은 soft하게 진행된다. 이 말인즉, deocder의 매 시퀀스가 진행될때마다 다시 encoder sequence의 가중치를 달리한다는 것이다.
매 디코더 시퀀스가 진행되면서, 인코더 시퀀스에서 강조해야 될 부분을 디코더 입력값(Input
)과 디코더의 히든 스테이트의 값(hidden
)을 기반으로 계산(Concat
, Dense
)하고, 이를 인코드 시퀀스에 가중(X
연산)한뒤, 결과로 나온 값을 기반으로 다시 디코더 입력값에 적용(Concat
, Dense
)해 새로운 인코더의 입력(Output
)으로 넣어주는 과정을 거친다.
결국 코드에서는 개별 Input
에 대해서 인코더의 가중 정보를 추가해 새로운 Input
을 만들어주는게 아래 apply_attention()
함수에서 수행하는 과정이다.
Gluon Hybridize¶
두 번째 확인할 기술적인 부분은 Gluon Hybridize
이다. Gluon에서 아주 간단한 코드 수정만으로 학습속도 및 예측속도를 상승시킬 수 있는 방식이다.
Gluon에서 Hybridize는 일종의 모형 컴파일 기능이다. PyTorch의 경우 Imperative 방식을 지원하고 이 덕분에 빠른 디버깅 및 구현이 가능한 장점이 있는데, Gluon에서도 일반적으로 Imperative방식으로 구현하게 하고 모형 구현이 다 된 이후 프로덕션에 올리기 위해 이 모형을 Symbolic 방식으로 변환시켜 올릴 수 있게 하는데, 이를 Hybridize라고 한다(Symbolic 방식의 대표적인 프레임웍은 TensorFlow이며, 메모리 최적화 및 속도가 빠른 장점이 있다). 필자의 경험상 평균 2배 이상의 학습/예측속도 성능향상이 보장되었다. 이를 위해 Gluon에서 gluon.HybridBlock
클래스를 상속받아 hybrid_forward()
함수를 재구현하게 한다.
hybrid_forward()
함수에서 특이한 인자인 F
가 있는데, 이 인자가 입력되는 데이터에 따라서 mx.sym
기반 연산을 수행할지 mx.nd
기반 연산을 수행할지 자동으로 선택되게 한다. Hybridize
함수 호출 이후에는 내부적으로 모든 연산은 symbolic하게 구성이 되며 mx.nd
array가 입력되더라도 symbol로 자동으로 변환되게 된다.
학습 속도 비교 테스트 결과는 아래에서 확인해 보도록 하겠다.
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from mxnet import nd as F
import mxnet as mx
from mxnet import gluon
from mxnet.gluon import nn, rnn
import mxnet.autograd as autograd
from mxnet import nd as F
import seaborn as sns
sns.set_style("whitegrid")
%matplotlib inline
아래는 학습 데이터 생성 코드이다. 이전 포스트에 비해서 달라진 부분은 input_digits
, output_digits
길이를 같게 해준 부분이다. 이 부분은 모형 해석을 위해 맞춰줬다.
def n(digits=3):
number = ''
for i in range(np.random.randint(1, digits + 1)):
number += np.random.choice(list('0123456789'))
return int(number)
def padding(chars, maxlen):
return chars + ' ' * (maxlen - len(chars))
N = 50000
N_train = int(N * 0.9)
N_validation = N - N_train
digits = 3 # 최대 자릿수
input_digits = digits * 2 + 3 # 예: 123+456
output_digits = digits * 2 + 3 # 500+500 = 1000 이상이면 4자리가 된다
added = set()
questions = []
answers = []
answers_y = []
while len(questions) < N:
a, b = n(), n() # 두 개의 수를 적당히 생성한다
pair = tuple(sorted((a, b)))
if pair in added:
continue
question = 'S{}+{}E'.format(a, b)
question = padding(question, input_digits)
answer = 'S' + str(a + b) + 'E'
answer = padding(answer, output_digits)
answer_y = str(a + b) + 'E'
answer_y = padding(answer_y, output_digits)
added.add(pair)
questions.append(question)
answers.append(answer)
answers_y.append(answer_y)
chars = '0123456789+SE '
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))
X = np.zeros((len(questions), input_digits, len(chars)), dtype=np.integer)
Y = np.zeros((len(questions), digits * 2 + 3, len(chars)), dtype=np.integer)
Z = np.zeros((len(questions), digits * 2 + 3, len(chars)), dtype=np.integer)
for i in range(N):
for t, char in enumerate(questions[i]):
X[i, t, char_indices[char]] = 1
for t, char in enumerate(answers[i]):
Y[i, t, char_indices[char]] = 1
for t, char in enumerate(answers_y[i]):
Z[i, t, char_indices[char]] = 1
X_train, X_validation, Y_train, Y_validation, Z_train, Z_validation = \
train_test_split(X, Y, Z, train_size=N_train)
예시 테스트 데이터를 생성하는 함수 (이전과 같다.)
def gen_n_test(N):
q = []
y = []
for i in range(N):
a, b = n(), n()
question = '{}+{}'.format(a, b)
answer_y = str(a + b)
q.append(question)
y.append(answer_y)
return(q,y)
적중 유무를 색으로 표시하기 위해…
class colors:
ok = '\033[92m'
fail = '\033[91m'
close = '\033[0m'
gluon.HybridBlock
을 상속받아 HybridBlock으로 구성했다. 참고로 array[:,1,:]
방식의 슬라이싱 형태의 연산을 mx.nd.*
계열 함수 연산으로 모두 바꿔줘야 된다.
class calculator(gluon.HybridBlock):
def __init__(self, n_hidden, vacab_size, max_seq_length=9, attention=False ,**kwargs):
super(calculator,self).__init__(**kwargs)
#입력 시퀀스 길이
self.in_seq_len = max_seq_length
#출력 시퀀스 길이
self.out_seq_len = max_seq_length
# LSTM의 hidden 개수
self.n_hidden = n_hidden
#고유문자개수
self.vacab_size = vacab_size
#max_seq_length
self.max_seq_length = max_seq_length
self.attention = attention
with self.name_scope():
self.encoder = rnn.LSTMCell(hidden_size=n_hidden)
self.decoder = rnn.LSTMCell(hidden_size=n_hidden)
self.batchnorm = nn.BatchNorm(axis=2)
#flatten을 false로 할 경우 마지막 차원에 fully connected가 적용된다.
self.dense = nn.Dense(self.vacab_size,flatten=False)
if self.attention:
self.attdense = nn.Dense(self.max_seq_length, flatten=False)
self.attn_combine = nn.Dense( self.vacab_size, flatten=False)
def hybrid_forward(self,F, inputs, outputs):
"""
학습 코드
"""
#encoder LSTM
enout, (next_h, next_c) = self.encoder.unroll(inputs=inputs, length=self.in_seq_len, merge_outputs=True)
#decoder LSTM with attention
for i in range(self.out_seq_len):
#out_seq_len 길이만큼 LSTMcell을 unroll하면서 출력값을 적재한다.
p_outputs = F.slice_axis(outputs, axis=1, begin=i, end=i+1)
p_outputs = F.reshape(p_outputs, (-1, self.vacab_size))
# p_outputs = outputs[:,i,:]
# 위와 같이 진행한 이유는 hybridize를 위함
if self.attention:
p_outputs, _ = self.apply_attention(F=F, inputs=p_outputs, hidden=next_h, encoder_outputs=enout)
deout, (next_h, next_c) = self.decoder(p_outputs, [next_h, next_c], )
if i == 0:
deouts = deout
else:
deouts = F.concat(deouts, deout, dim=1)
#2dim -> 3dim 으로 reshape
deouts_orig = F.reshape(deouts, (-1, self.out_seq_len, self.n_hidden))
deouts = self.batchnorm(deouts_orig)
deouts_fc = self.dense(deouts)
return(deouts_fc)
def apply_attention(self, F, inputs, hidden, encoder_outputs):
#inputs : decoder input의미
concated = F.concat(inputs, hidden, dim=1)
#(,max_seq_length) : max_seq_length 개별 시퀀스의 중요도
attn_weights = F.softmax(self.attdense(concated), axis=1)
#(N,max_seq_length) x (N,max_seq_length,n_hidden) = (N, max_seq_length, n_hidden)
#attn_weigths 가중치를 인코더 출력값에 곱해줌
w_encoder_outputs = F.broadcast_mul(encoder_outputs, attn_weights.expand_dims(2))
#(N, vocab_size * max_seq_length), (N, max_seq_length * n_hidden) = (N, ...)
output = F.concat(inputs.flatten(), w_encoder_outputs.flatten(), dim=1)
#(N, vocab_size)
output = self.attn_combine(output)
#attention weight은 시각화를 위해 뽑아둔다.
return(output, attn_weights)
def calulation(self, input_str, char_indices, indices_char, input_digits=9, lchars=14, ctx=mx.gpu(0)):
"""
inference 코드
"""
#앞뒤에 S,E 코드 추가
input_str = 'S' + input_str + 'E'
#string to one-hot coding
X = F.zeros((1, input_digits, lchars), ctx=ctx)
for t, char in enumerate(input_str):
X[0, t, char_indices[char]] = 1
#디코더의 초기 입력값으로 넣을 'S'를 one-hot coding한다.
Y_init = F.zeros((1, lchars), ctx=ctx)
Y_init[0,char_indices['S']] = 1
#인코더 출력값을 도출한다.
enout, (next_h, next_c) = self.encoder.unroll(inputs=X, length=self.in_seq_len, merge_outputs=True)
deout = Y_init
#출력 시퀀스 길이만큼 순회
for i in range(self.out_seq_len):
if self.attention:
deout, att_weight = self.apply_attention(F=F, inputs=deout, hidden=next_h, encoder_outputs=enout)
if i == 0:
att_weights = att_weight
else:
att_weights = F.concat(att_weights,att_weight,dim=0)
deout, (next_h, next_c) = self.decoder(deout, [next_h, next_c])
#batchnorm을 적용하기 위해 차원 증가/원복
deout = F.expand_dims(deout,axis=1)
deout = self.batchnorm(deout)
#reduce dim
deout = deout[:,0,:]
#'S'의 다음 시퀀스 출력값도출
deout_sm = self.dense(deout)
deout = F.one_hot(F.argmax(F.softmax(deout_sm, axis=1), axis=1), depth=self.vacab_size)
gen_char = indices_char[F.argmax(deout_sm, axis=1).asnumpy()[0].astype('int')]
if gen_char == ' ' or gen_char == 'E':
break
else:
if i == 0:
ret_seq = gen_char
else:
ret_seq += gen_char
return(ret_seq, att_weights)
data iterator 생성
tr_set = gluon.data.ArrayDataset(X_train, Y_train, Z_train)
tr_data_iterator = gluon.data.DataLoader(tr_set, batch_size=256, shuffle=True)
te_set =gluon.data.ArrayDataset(X_validation, Y_validation, Z_validation)
te_data_iterator = gluon.data.DataLoader(te_set, batch_size=256, shuffle=True)
ctx = mx.gpu()
def model_init():
#모형 인스턴스 생성 및 트래이너, loss 정의
model = calculator(300, 14, attention=True)
model.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
trainer = gluon.Trainer(model.collect_params(), 'rmsprop')
loss = gluon.loss.SoftmaxCrossEntropyLoss(axis = 2, sparse_label=False)
return(model, loss, trainer)
model.hybridize()
print(model)
def calculate_loss(model, data_iter, loss_obj, ctx=ctx):
test_loss = []
for i, (x_data, y_data, z_data) in enumerate(data_iter):
x_data = x_data.as_in_context(ctx).astype('float32')
y_data = y_data.as_in_context(ctx).astype('float32')
z_data = z_data.as_in_context(ctx).astype('float32')
with autograd.predict_mode():
z_output = model(x_data, y_data)
loss_te = loss_obj(z_output, z_data)
curr_loss = mx.nd.mean(loss_te).asscalar()
test_loss.append(curr_loss)
return(np.mean(test_loss))
def train(epochs, tr_data_iterator, model, loss, trainer):
### 학습 코드
tot_test_loss = []
tot_train_loss = []
for e in range(epochs):
train_loss = []
for i, (x_data, y_data, z_data) in enumerate(tr_data_iterator):
x_data = x_data.as_in_context(ctx).astype('float32')
y_data = y_data.as_in_context(ctx).astype('float32')
z_data = z_data.as_in_context(ctx).astype('float32')
with autograd.record():
z_output = model(x_data, y_data)
loss_ = loss(z_output, z_data)
loss_.backward()
trainer.step(x_data.shape[0])
curr_loss = mx.nd.mean(loss_).asscalar()
train_loss.append(curr_loss)
if e % 10 == 0:
# 매 10 에폭마다 예제 테스트 결과 출력
q, y = gen_n_test(10)
for i in range(10):
with autograd.predict_mode():
p, att_weight = model.calulation(q[i], char_indices, indices_char)
iscorr = 1 if p == y[i] else 0
if iscorr == 1:
print(colors.ok + '☑' + colors.close, end=' ')
else:
print(colors.fail + '☒' + colors.close, end=' ')
print("{} = {}({}) 1/0 {}".format(q[i], p, y[i], str(iscorr) ))
#caculate test loss
test_loss = calculate_loss(model, te_data_iterator, loss_obj = loss, ctx=ctx)
print("Epoch %s. Train Loss: %s, Test Loss : %s" % (e, np.mean(train_loss), test_loss))
tot_test_loss.append(test_loss)
tot_train_loss.append(np.mean(train_loss))
Hybridize 기능을 테스트해보겠다. hybridize()
함수 호출 전까지 모형은 PyTorch와 같은 Imperative 방식이다.
%%time
epochs = 10
model, loss, trainer = model_init()
#model.hybridize()
train(epochs, tr_data_iterator, model, loss, trainer)
10회 에폭을 도는데 약 76초 소요되었다.
아래는 hybridize()
호출 이후의 성능이다.
%%time
epochs = 10
model, loss, trainer = model_init()
model.hybridize()
train(epochs, tr_data_iterator, model, loss, trainer)
47초 소요되어 1.6배 학습속도 향상이 있었다는 것을 알 수 있다. GPU 메모리 사용 효율측면에서는 별도 확인이 필요하다.
그럼 120에폭정도 더 수행해 모형을 충분히 학습 시킨 후 입력 수식에 대한 어텐션 가중치를 시각화 해보도록 하자!
%%time
epochs=120
model, loss, trainer = model_init()
model.hybridize()
train(epochs, tr_data_iterator, model, loss, trainer)
in_sent = "900+900"
p, att_weight = model.calulation(in_sent, char_indices, indices_char)
p
ax = sns.heatmap(att_weight.asnumpy()[1:,1:-1][:len(p), :len(in_sent)],
xticklabels=list(in_sent),
yticklabels= list(p), cmap='bone')
ax.xaxis.tick_top()
ax.tick_params(axis='y', rotation='auto')
위에서 보면 1800의 첫번째 자리(1)를 계산하는데, 개별 숫자의 첫번째 자리의 숫자들이(9,9) 영향을 준것을 확인할 수 있다. 이 이외의 패턴은 어텐션으로는 명확하게 원리를 파악하기 어렵다는 것을 알 수 있다(굳이 해석하자면 십의자리 0의 계산 원리정도?).
어텐션은 seq2seq가 아니더라도 대부분의 네트웍에 활용 가능한 개념이다. 게다가 더불어 성능 향상 측면에서도 긍정적 효과가 있다고 알려져 있어서 그냥 묻지마 활용을 하는 추세이다. 무엇보다 필자가 개인적으로 상당히 좋은 점으로 꼽는 부분은 예측시각화 측면이다.
정말 이젠 영한 기계번역 모델을 만들어볼 때가 된것 같다. 아마도 위 모듈에서 임베딩 정도만 추가하면 기초적인 기계번역 모형을 구축할 수 있을 것으로 보인다.
reference : http://pytorch.org/tutorials/intermediate/seq2seq_translation_tutorial.html
딥러닝이 덧셈을 하는 방법, Attention Mechanism으로 살펴보기 by from __future__ import dream is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.