Source code for qualia2.rl.memory

# -*- coding: utf-8 -*- 
from ..autograd import Tensor
from ..core import *  # provides the array backend np (NumPy/CuPy) and to_cpu used below
from ..functions import listconcat, absolute
from collections import deque
from collections import namedtuple
import random

Experience = namedtuple('Experience', ['state','next','reward','action','done'])
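
Fields are unpacked positionally in the sample methods below: indices 0-2 (state, next, reward) are expected to be Tensors, index 3 the action, and index 4 the done flag. A minimal sketch of storing one transition (the 4-dimensional observation and the way the Tensors are built are illustrative assumptions, not part of this module):

    state = Tensor(np.random.randn(1, 4))       # hypothetical observation
    next_state = Tensor(np.random.randn(1, 4))  # observation after the step
    reward = Tensor(np.array([[1.0]]))
    exp = Experience(state, next_state, reward, 0, False)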

class ReplayMemory(deque):
    def __init__(self, maxlen):
        super().__init__(maxlen=maxlen)

    def __getitem__(self, idx):
        if isinstance(idx, slice):
            # slicing returns a new ReplayMemory rather than a plain list
            sliced = ReplayMemory(self.maxlen)
            sliced.extend(list(self)[idx])
            return sliced
        else:
            return super().__getitem__(idx)

    def sample(self, batch_size, steps=1):
        assert steps > 0
        if steps > 1:
            # draw start indices so that i+step never runs past the buffer,
            # then build one Experience batch per step, aligned by start index
            idx = random.sample(range(len(self)-steps), batch_size)
            result = []
            for step in range(steps):
                # transpose the sampled Experience tuples into per-field lists
                tmp = [*zip(*[self[i+step] for i in idx])]
                result.append(Experience(
                    *[listconcat(tmp[i]).detach() for i in range(3)],
                    np.concatenate(tuple(np.asarray(i).reshape(1,-1) for i in tmp[3]), axis=0),
                    np.array(tmp[4])))
            return result, None, None
        else:
            # uniform random batch; transpose the tuples into per-field lists,
            # then stack each field into a single batched Tensor/array
            tmp = [*zip(*random.sample(self, batch_size))]
            return Experience(
                *[listconcat(tmp[i]).detach() for i in range(3)],
                np.concatenate(tuple(np.asarray(i).reshape(1,-1) for i in tmp[3]), axis=0),
                np.array(tmp[4])), None, None
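
sample returns a three-tuple so that uniform and prioritized memories are interchangeable in training code; for ReplayMemory the index and weight slots are always None. A usage sketch, assuming the memory already holds enough Experience tuples:

    memory = ReplayMemory(maxlen=10000)
    # ... append Experience tuples during rollouts ...
    batch, idx, weights = memory.sample(batch_size=32)  # idx and weights are None here
    # with steps > 1, a list of `steps` Experience batches aligned by start index:
    batches, _, _ = memory.sample(batch_size=32, steps=3)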

class PrioritizedMemory(deque):
    def __init__(self, maxlen, alpha=0.6, beta_start=0.4, beta_frames=100000):
        super().__init__(maxlen=maxlen)
        self.priorities = np.zeros(maxlen)
        self.alpha = alpha              # how strongly priorities skew sampling (0 = uniform)
        self.beta_start = beta_start    # initial importance-sampling exponent
        self.beta_frames = beta_frames  # frames over which beta anneals to 1.0
        self.pos = 0                    # next write slot in the circular priority array
        self.frame_idx = 1

    def beta(self):
        # anneal beta linearly from beta_start to 1.0 over beta_frames frames
        return min(1.0, self.beta_start + self.frame_idx * (1.0 - self.beta_start) / self.beta_frames)

    def append(self, experience):
        # new experiences enter with the current maximum priority so each
        # one is sampled at least once before its priority is refined
        max_priority = np.max(self.priorities) if len(self) > 0 else 1.0
        self.priorities[self.pos] = max_priority
        self.pos = (self.pos + 1) % self.maxlen
        super().append(experience)

    def update_priorities(self, idx, priorities):
        self.priorities[idx] = priorities

    def sample(self, batch_size, steps=1):
        assert steps > 0
        if steps > 1:
            # sampling probabilities P(i) = p_i**alpha / sum_k p_k**alpha
            prob = self.priorities[:len(self)-steps]**self.alpha
            prob /= np.sum(prob)
            idx = np.random.choice(len(self)-steps, batch_size, p=prob)
            # importance-sampling weights w_i = (N*P(i))**(-beta), normalized by their max
            weights = (len(self)*prob[idx])**(-self.beta())
            weights /= np.max(weights)
            self.frame_idx += 1
            result = []
            for step in range(steps):
                tmp = [*zip(*[self[i+step] for i in to_cpu(idx)])]
                result.append(Experience(
                    *[listconcat(tmp[i]).detach() for i in range(3)],
                    np.concatenate(tuple(np.asarray(i).reshape(1,-1) for i in tmp[3]), axis=0),
                    np.array(tmp[4])))
            return result, idx, weights
        else:
            prob = self.priorities[:len(self)]**self.alpha
            prob /= np.sum(prob)
            idx = np.random.choice(len(self), batch_size, p=prob)
            tmp = [*zip(*[self[i] for i in to_cpu(idx)])]
            weights = (len(self)*prob[idx])**(-self.beta())
            weights /= np.max(weights)
            self.frame_idx += 1
            return Experience(
                *[listconcat(tmp[i]).detach() for i in range(3)],
                np.concatenate(tuple(np.asarray(i).reshape(1,-1) for i in tmp[3]), axis=0),
                np.array(tmp[4])), idx, weights
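
Taken together, this follows proportional prioritized experience replay (Schaul et al., 2015): transitions are drawn with probability P(i) proportional to p_i**alpha, and the resulting bias is corrected by the importance-sampling weights (N*P(i))**(-beta), with beta annealed towards 1.0 as training progresses. A sketch of the intended loop; the TD-error computation is model-specific and only hinted at here, and the small constant added to the new priorities is a common convention rather than something this module enforces:

    per = PrioritizedMemory(maxlen=10000, alpha=0.6, beta_start=0.4)
    # ... append Experience tuples during rollouts ...
    batch, idx, weights = per.sample(batch_size=32)
    # compute per-sample TD errors from `batch` with your model, scale the
    # loss by `weights`, then feed the absolute errors back as priorities:
    # per.update_priorities(idx, np.abs(td_errors) + 1e-5)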