필자가 Gluon을 시작한 계기는 바로 Keras로 seq2seq기반의 네트워크를 구축하기가 매우 어렵거나 모호해서 였다. 사실 간단한 영한 기계번역기를 Keras로 만들다가 한계에 부딧혀 포기했다. 그 원인은 아래 도표를 보면 알겠지만 학습과 예측의 네트웍 플로우가 다른데 있는데, 이럴 경우 예측 코드는 Keras에서 작성하기가 어려워진다. 사실 어렵다기 보다는 모호한 코드를 짜게 될 가능성이 많다. 그래서 해당 코드를 작성하고 이틀만 지나도 내가 뭘 했는지 알기 어렵다. 아마도 이런 이유때문에 특히 NLP하는 분들은 PyTorch를 선호하는게 아닐까 한다. 하지만 필자는 올해 10월에 나온 Gluon을 기반으로 구축해 봤다. 왜냐면 아직 이 예제는 Gluon으로 작성된게 없기 때문이다.
해당 예제는 이미 keras example에 아주 잘 작성되어 있다. 간단히 설명하면 encoder의 마지막 시퀀스 출력 벡터를 디코더 길이만큼 반복해 context 벡터를 만들어 입력을 넣어주는 방식이다. 다만 구현이 해당 방식이 아니면 복잡해지는 단점이 있다는 것이다. 그래서 기계번역에서 성능이 좋게 나온다는 오리지널 seq2seq 방식을 Keras 개발자기 직접 블로그에 올리기도 하였다. 하지만 Keras에서 inference loop 쪽 코드를 보면 한계가 여실히 드러나는 것을 알 수 있는데, 필자도 해당 코드 기반으로 영한번역기를 구축하다 결국 디버깅 및 inference code가 복잡하고 어려워지면서 포기했다.
사실 Gluon으로 작업을 하면서도 이 간단한 문제도 예측성능이 나오지 않아 정말 많은 시행착오를 했다(퇴근 후 3일 동안 새벽 3시까지…). 아래 코드를 보면 알겠지만 마지막 레이어에서 fully connected가 3차원일 경우 어떻게 코드를 작성해야 마지막 차원에서 원하는 Softmax를 찾을지.. (사실 이 부분은 tensorflow 코드와 Keras 코드를 알고 있었는데, Gluon에서는 dense layer의 옵션만으로도 적용이 가능했다. 그런데 Keras는 TimeDistribution 레이어로 해결된다. 예제가 많기 때문에 그냥 아무 생각없이 가져다 쓰기 바빴던 내가 참.. 원망 스럽다.) 그리고 encoder,decoder state를 어떻게 전달해야 되는지, 초기 코드의 loss가 80까지 나오는데 (결과적으로 원인은 데이터 생성에서 버그에 있었지만), 원인을 어떻게 찾을지 ….그리고 중간 중간 reshape이나 concat을 하게 되는데 다음 레이어에 맞게 dimension만 맞추려 하다가는 잘못 변환된 데이터를 입력하는 경우가 많고, 이 때문에 학습이 안되는 경우가 많았다. 역시나 이런 부분에 대한 고민과 훈련이 안된 티가 난다. 사실 이런 부분은 Keras를 쓸때는 거의 신경쓰지 않는 문제긴 하지만, 이쪽 영역을 한다면 반드시 익숙해져야 될 부분이다. 머리속에 3차원 4차원의 행렬을 생각하면서 코딩해야 되는 어려움은 해본 분들만 알 수 있을 것이다.
일단 하소연은 이정도로 하겠고… seq2seq에 대한 문제정의를 해보도록 하겠다.
일단 학습 코드인데, 학습시에는 3가지 데이터가 쓰인다. 먼저 “S1+5E”와 같은 수식 텍스트 그리고 decoder에 입력으로 들어갈 정답 “S6E”. 그리고 실제 loss를 계산할 Y인 “6E”.여기서 “S”, “E”는 각각 텍스트의 시작과 마지막을 의미한다.
decoder쪽은 입력으로 t-1의 텍스트가 입려되면, t 시점의 텍스트를 예측하게끔 훈련이 되며, encoder에서는 입력 수식의 데이터의 시계열적인 정보를 2개의 state(여기선 LSTM을 사용했다)로 decoder에 전달하게 된다.
그림에서 마지막 예측 결과를 종합하는 dense layer는 생략되었는데, 이 부분은 코드를 참고하길 바란다.
예측시에는 Encoder에 입력되는 텍스트만 들어가게 되는데, Decoder의 t-1시점의 출력이 t시점의 입력으로 다시 들어가게 되는 부분이 네트워크 구조가 학습시와 달라지는 부분이다. 이 부분 때문에 Keras 구현 난이도가 200% 올라가는 것이다.
그럼 코드를 보면서 주석으로 설명을 올려 보겠다.
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from mxnet import nd as F
import mxnet as mx
from mxnet import gluon
from mxnet.gluon import nn, rnn
import mxnet.autograd as autograd
from mxnet import nd as F
아래는 학습 데이터 생성 코드이다. 데이터 생성 예제 코드는 인터넷에 많은데… 이 생성 부분은 주로 이곳코드를 참고했다.
def n(digits=3):
number = ''
for i in range(np.random.randint(1, digits + 1)):
number += np.random.choice(list('0123456789'))
return int(number)
def padding(chars, maxlen):
return chars + ' ' * (maxlen - len(chars))
N = 50000
N_train = int(N * 0.9)
N_validation = N - N_train
digits = 3 # 최대 자릿수
input_digits = digits * 2 + 3 # 예: 123+456
output_digits = digits + 3 # 500+500 = 1000 이상이면 4자리가 된다
added = set()
questions = []
answers = []
answers_y = []
while len(questions) < N:
a, b = n(), n() # 두 개의 수를 적당히 생성한다
pair = tuple(sorted((a, b)))
if pair in added:
continue
question = 'S{}+{}E'.format(a, b)
question = padding(question, input_digits)
answer = 'S' + str(a + b) + 'E'
answer = padding(answer, output_digits)
answer_y = str(a + b) + 'E'
answer_y = padding(answer_y, output_digits)
added.add(pair)
questions.append(question)
answers.append(answer)
answers_y.append(answer_y)
chars = '0123456789+SE '
char_indices = dict((c, i) for i, c in enumerate(chars))
indices_char = dict((i, c) for i, c in enumerate(chars))
X = np.zeros((len(questions), input_digits, len(chars)), dtype=np.integer)
Y = np.zeros((len(questions), digits + 3, len(chars)), dtype=np.integer)
Z = np.zeros((len(questions), digits + 3, len(chars)), dtype=np.integer)
for i in range(N):
for t, char in enumerate(questions[i]):
X[i, t, char_indices[char]] = 1
for t, char in enumerate(answers[i]):
Y[i, t, char_indices[char]] = 1
for t, char in enumerate(answers_y[i]):
Z[i, t, char_indices[char]] = 1
X_train, X_validation, Y_train, Y_validation, Z_train, Z_validation = \
train_test_split(X, Y, Z, train_size=N_train)
예시 테스트 데이터를 생성하는 함수
def gen_n_test(N):
q = []
y = []
for i in range(N):
a, b = n(), n()
question = '{}+{}'.format(a, b)
answer_y = str(a + b)
q.append(question)
y.append(answer_y)
return(q,y)
적중 유무를 색으로 표시하기 위해…
class colors:
ok = '\033[92m'
fail = '\033[91m'
close = '\033[0m'
class calculator(gluon.Block):
def __init__(self, n_hidden,in_seq_len, out_seq_len, vacab_size , **kwargs):
super(calculator,self).__init__(**kwargs)
#입력 시퀀스 길이
self.in_seq_len = in_seq_len
#출력 시퀀스 길이
self.out_seq_len = out_seq_len
# LSTM의 hidden 개수
self.n_hidden = n_hidden
#고유문자개수
self.vacab_size = vacab_size
with self.name_scope():
self.encoder = rnn.LSTMCell(hidden_size=n_hidden)
self.decoder = rnn.LSTMCell(hidden_size=n_hidden)
self.batchnorm = nn.BatchNorm(axis=2)
#flatten을 false로 할 경우 마지막 차원에 fully connected가 적용된다.
self.dense = nn.Dense(self.vacab_size,flatten=False)
def forward(self, inputs, outputs):
"""
학습 코드
"""
#encoder LSTM
enout, (next_h, next_c) = self.encoder.unroll(inputs=inputs, length=self.in_seq_len, merge_outputs=True)
#decoder LSTM
for i in range(self.out_seq_len):
#out_seq_len 길이만큼 LSTMcell을 unroll하면서 출력값을 적재한다.
deout, (next_h, next_c) = self.decoder(outputs[:,i,:], [next_h, next_c], )
if i == 0:
deouts = deout
else:
deouts = F.concat(deouts, deout, dim=1)
#2dim -> 3dim 으로 reshape
deouts = F.reshape(deouts, (-1, self.out_seq_len, self.n_hidden))
deouts = self.batchnorm(deouts)
deouts_fc = self.dense(deouts)
return(deouts_fc)
def calulation(self, input_str, char_indices, indices_char, input_digits=9, lchars=14, ctx=mx.gpu(0)):
"""
inference 코드
"""
#앞뒤에 S,E 코드 추가
input_str = 'S' + input_str + 'E'
#string to one-hot coding
X = F.zeros((1, input_digits, lchars), ctx=ctx)
for t, char in enumerate(input_str):
X[0, t, char_indices[char]] = 1
#디코더의 초기 입력값으로 넣을 'S'를 one-hot coding한다.
Y_init = F.zeros((1, lchars), ctx=ctx)
Y_init[0,char_indices['S']] = 1
#인코더 출력값을 도출한다.
enout, (next_h, next_c) = self.encoder.unroll(inputs=X, length=self.in_seq_len, merge_outputs=True)
deout = Y_init
#출력 시퀀스 길이만큼 순회
for i in range(self.out_seq_len):
deout, (next_h, next_c) = self.decoder(deout, [next_h, next_c])
#batchnorm을 적용하기 위해 차원 증가/원복
deout = F.expand_dims(deout,axis=1)
deout = self.batchnorm(deout)
deout = deout[:,0,:]
#'S'의 다음 시퀀스 출력값도출
deout_sm = self.dense(deout)
deout = F.one_hot(F.argmax(F.softmax(deout_sm, axis=1), axis=1), depth=self.vacab_size)
if i == 0:
ret_seq = indices_char[F.argmax(deout_sm, axis=1).asnumpy()[0].astype('int')]
else:
ret_seq += indices_char[F.argmax(deout_sm, axis=1).asnumpy()[0].astype('int')]
if ret_seq[-1] == ' ' or ret_seq[-1] == 'E':
break
#공백 padding 제거 후 리턴
return(ret_seq.strip('E').strip())
data iterator 생성
tr_set = gluon.data.ArrayDataset(X_train, Y_train, Z_train)
tr_data_iterator = gluon.data.DataLoader(tr_set, batch_size=256, shuffle=True)
te_set =gluon.data.ArrayDataset(X_validation, Y_validation, Z_validation)
te_data_iterator = gluon.data.DataLoader(te_set, batch_size=256, shuffle=True)
ctx = mx.gpu()
#모형 인스턴스 생성 및 트래이너, loss 정의
model = calculator(300, 9, 6, 14)
model.collect_params().initialize(mx.init.Xavier(), ctx=ctx)
trainer = gluon.Trainer(model.collect_params(), 'rmsprop')
loss = gluon.loss.SoftmaxCrossEntropyLoss(axis = 2, sparse_label=False)
print(model)
def calculate_loss(model, data_iter, loss_obj, ctx=ctx):
test_loss = []
for i, (x_data, y_data, z_data) in enumerate(data_iter):
x_data = x_data.as_in_context(ctx).astype('float32')
y_data = y_data.as_in_context(ctx).astype('float32')
z_data = z_data.as_in_context(ctx).astype('float32')
with autograd.predict_mode():
z_output = model(x_data, y_data)
loss_te = loss_obj(z_output, z_data)
curr_loss = mx.nd.mean(loss_te).asscalar()
test_loss.append(curr_loss)
return(np.mean(test_loss))
epochs = 201
### 학습 코드
tot_test_loss = []
tot_train_loss = []
for e in range(epochs):
train_loss = []
for i, (x_data, y_data, z_data) in enumerate(tr_data_iterator):
x_data = x_data.as_in_context(ctx).astype('float32')
y_data = y_data.as_in_context(ctx).astype('float32')
z_data = z_data.as_in_context(ctx).astype('float32')
with autograd.record():
z_output = model(x_data, y_data)
loss_ = loss(z_output, z_data)
loss_.backward()
trainer.step(x_data.shape[0])
curr_loss = mx.nd.mean(loss_).asscalar()
train_loss.append(curr_loss)
if e % 10 == 0:
# 매 10 에폭마다 예제 테스트 결과 출력
q, y = gen_n_test(10)
for i in range(10):
with autograd.predict_mode():
p = model.calulation(q[i], char_indices, indices_char).strip()
iscorr = 1 if p == y[i] else 0
if iscorr == 1:
print(colors.ok + '☑' + colors.close, end=' ')
else:
print(colors.fail + '☒' + colors.close, end=' ')
print("{} = {}({}) 1/0 {}".format(q[i], p, y[i], str(iscorr) ))
#caculate test loss
test_loss = calculate_loss(model, te_data_iterator, loss_obj = loss, ctx=ctx)
print("Epoch %s. Train Loss: %s, Test Loss : %s" % (e, np.mean(train_loss), test_loss))
tot_test_loss.append(test_loss)
tot_train_loss.append(np.mean(train_loss))
매 10에폭마다 랜덤 예제의 값을 예측하게 했는데, 200에폭에 근접하면서 랜덤 테스트셋을 대부분 적중하는 모습을 볼 수 있어 제대로 학습이 진행되고 있음을 확인할 수 있을 것이다.
이 모형으로 덧셈을 할 사람이 누가 있을까? 사실 이 코드는 실제 활용을 위한 코드는 아니고 이 seq2seq 코드로 직접 기계번역 모형을 빌드해보기 위함이다. 성공적으로 토이 모형이 만들어 졌으니 아마도 다음 포스트는 기계번역 초기 모형이 되지 않을까 하는 생각을 해본다. 그 포스트에서는 Attention 등과 같은 기술도 구현해 볼 수 있지 않을까 생각해본다.
seq2seq기반 덧셈 모형 빌드(with Gluon) by from __future__ import dream is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.