# -*- coding: utf-8 -*-
import os
from . import to_cpu
from .core import *
from .autograd import Tensor
from functools import reduce
import sys
import random
import time
from datetime import timedelta
import matplotlib.pyplot as plt
from logging import getLogger
logger = getLogger('QualiaLogger').getChild('util')

def numerical_grad(fn, tensor, *args, **kwargs):
    # central difference approximation: (fn(x + delta) - fn(x - delta)) / (2 * delta)
    delta = 1e-4
    h1 = fn(tensor + delta, *args, **kwargs)
    h2 = fn(tensor - delta, *args, **kwargs)
    return np.divide(np.subtract(h1.data, h2.data), 2*delta)
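
# Illustrative check (hypothetical; assumes elementwise Tensor arithmetic):
# for fn(t) = t * t the numerical gradient approximates 2 * t.
#
#   g = numerical_grad(lambda t: t * t, Tensor(np.array([3.0])))  # ~ [6.]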

def check_function(fn, *args, x=None, domain=(-1e3,1e3), **kwargs):
    if x is None:
        # sample a random 100x100 input uniformly from the given domain
        arr = np.random.random_sample((100,100))
        x = Tensor((domain[1]-domain[0])*arr+domain[0])
    out = fn(x, *args, **kwargs)
    out.backward()
    a_grad = x.grad
    n_grad = numerical_grad(fn, x, *args, **kwargs)
    # sum of squared errors between analytical and numerical gradients
    sse = np.sum(np.power(np.subtract(a_grad, n_grad), 2))
    logger.info('[*] measured error: {}'.format(sse))
    return (a_grad, n_grad), sse
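
# Usage sketch (hypothetical callable): gradient-check a function by comparing
# autograd against the central-difference estimate over a custom domain.
#
#   (a_grad, n_grad), sse = check_function(lambda t: t * t, domain=(-1, 1))
#   assert sse < 1e-4, 'autograd disagrees with the numerical gradient'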

def _single(x):
    assert type(x) is int
    return x

def _pair(x):
    if type(x) is int:
        return (x, x)
    elif type(x) is tuple:
        assert len(x) == 2
        return x
    else:
        raise ValueError('expected an int or a tuple of length 2')

def _triple(x):
    if type(x) is int:
        return (x, x, x)
    elif type(x) is tuple:
        assert len(x) == 3
        return x
    else:
        raise ValueError('expected an int or a tuple of length 3')

def _mul(*args):
    return reduce(lambda a, b: a*b, args)
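
# These helpers normalize kernel/stride-style hyperparameters:
# _pair(3) -> (3, 3) for a square kernel, _pair((3, 5)) passes through
# unchanged, and _mul computes a product, e.g. _mul(2, 3, 4) -> 24.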

def progressbar(progress, process, text_before='', text_after=''):
    bar_length = 40
    block = int(round(bar_length*progress/process))
    text = '\r[*]{}progress: [{:.0f}%] |{}| {}/{} {}'.format(' '+text_before, progress/process*100, '#'*block + '-'*(bar_length-block), progress, process, text_after)
    sys.stdout.write(text)
    # flush after writing so the updated bar is rendered immediately
    sys.stdout.flush()
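
# Usage sketch: call once per iteration; the leading '\r' redraws the bar
# in place on the same console line.
#
#   for i in range(100):
#       progressbar(i + 1, 100, text_before='demo ')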

def download_progress(count, block_size, total_size):
    sys.stdout.write('\r[*] downloading {:.2f}%'.format(float(count * block_size) / float(total_size) * 100.0))
    # flush after writing so progress updates appear in real time
    sys.stdout.flush()
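
# The (count, block_size, total_size) signature matches urllib's reporthook,
# so this can be passed directly to urlretrieve (placeholder URL below):
#
#   from urllib.request import urlretrieve
#   urlretrieve('https://example.com/data.zip', 'data.zip', download_progress)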

class Trainer(object):
    ''' Trainer base class
    '''
    def __init__(self, batch, path=None):
        self.batch = batch
        self.path = path
        self.losses = []
        self.data_transformer = lambda x: x
        self.label_transformer = lambda x: x

    def __repr__(self):
        # repr(trainer) yields the class name
        return '{}'.format(self.__class__.__name__)

    def train(self, model, dataloader, optim, criterion, epochs=200, filename=None):
        ''' Runs the supervised training loop for the given model.
        Args:
            model (Module): model to train
            dataloader (DataLoader): dataloader to use
            optim (Optimizer): optimizer to use
            criterion (Function): loss function to use
            epochs (int): number of epochs
            filename (string): path and filename of the pretrained weights to load, without the file extension, e.g. path/to/filename
        '''
self.before_train(dataloader, model, filename)
self.train_routine(model, dataloader, optim, criterion, epochs)
self.after_train()

    def before_train(self, dataloader, model, filename):
self.data_name = str(dataloader)
self.model_name = str(model)
dataloader.batch = self.batch
dataloader.training = True
model.train()
if filename is not None:
if os.path.exists(filename+'.hdf5'):
model.load(filename)
logger.info('[*] weights loaded.')
else:
logger.error('[*] File not found: weights cannot be loaded.')
                raise FileNotFoundError(filename+'.hdf5')
logger.info('[*] training started.')

    def before_episode(self, dataloader, model):
pass

    def train_routine(self, model, dataloader, optim, criterion, epochs=200):
start = time.time()
for epoch in range(epochs):
self.before_episode(dataloader, model)
tmp_loss = []
for i, (data, label) in enumerate(dataloader):
output = model(self.data_transformer(data))
loss = criterion(output, self.label_transformer(label))
optim.zero_grad()
loss.backward()
optim.step()
progressbar(i, len(dataloader), 'epoch: {}/{} train loss:{:.4f} '.format(epoch+1, epochs, loss.asnumpy()), '(time: {})'.format(str(timedelta(seconds=time.time()-start))))
tmp_loss.append(loss.asnumpy())
self.after_episode(epoch+1, model, tmp_loss)

    def after_episode(self, epoch, model, loss):
if len(loss) > 0:
self.losses.append(sum(loss)/len(loss))
if self.path is not None:
if not os.path.exists(self.path + '/train/{}/{}/'.format(self.data_name,self.model_name)):
os.makedirs(self.path + '/train/{}/{}/'.format(self.data_name,self.model_name))
model.save(self.path+'/train/{}/{}/{}'.format(self.data_name,self.model_name,epoch))

    def after_train(self):
logger.info('[*] training completed.')
if self.path is None:
self.plot()
else:
self.plot(self.path + '/train/{}/{}/train_curve.png'.format(self.data_name,self.model_name))

    def test(self, model, dataloader, batch, filename=None):
dataloader.training = False
dataloader.batch = batch
model.eval()
if filename is not None:
if os.path.exists(filename+'.hdf5'):
model.load(filename)
logger.info('[*] weights loaded for testing.')
else:
logger.error('[*] File not found: weights cannot be loaded.')
                raise FileNotFoundError(filename+'.hdf5')
logger.info('[*] testing started.')
acc = 0
for i, (data, label) in enumerate(dataloader):
output = model(self.data_transformer(data))
out = np.argmax(output.data, axis=1)
ans = np.argmax(label.data, axis=1)
acc += sum(out == ans)/label.shape[0]
progressbar(i, len(dataloader))
logger.info('\n[*] test acc: {:.2f}%'.format(float(acc/len(dataloader)*100)))

    def plot(self, filename=None):
        plt.plot(range(len(self.losses)), self.losses)
plt.title('training losses of {} in {}'.format(self.model_name, self.data_name))
plt.ylabel('training loss')
plt.xlabel('epochs')
if filename is not None:
plt.savefig(filename)
else:
plt.show()
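
# End-to-end usage sketch (hypothetical setup; assumes a qualia2 model,
# DataLoader, Optimizer, and loss Function have been constructed elsewhere):
#
#   trainer = Trainer(batch=100, path='./results')
#   trainer.train(model, dataloader, optim, criterion, epochs=20)
#   trainer.test(model, dataloader, batch=100)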