import heapq
import os
import pandas as pd
from .os_funcs import mkdir_p, get_things_in_loc
[docs]class SimplePriorityQueue():
"""
a simple wrapper around heapq.
>>> q = SimplePriorityQueue()
>>> q.put((2, "Harry"))
>>> q.put((3, "Charles"))
>>> q.put((1, "Riya"))
>>> q.put((4, "Stacy"))
>>> q.put((0, "John"))
>>> print(q.nlargest(3))
[(4, 'Stacy'), (3, 'Charles'), (2, 'Harry')]
>>> print(q.nsmallest(8))
[(1, 'Riya'), (2, 'Harry'), (3, 'Charles'), (4, 'Stacy')]
>>> print(q.get())
(1, 'Riya')
>>> print(q.get())
(2, 'Harry')
>>> print(q.get())
(3, 'Charles')
>>> print(q.get())
(4, 'Stacy')
>>> print(q.get())
None
"""
def __init__(self, maxsize=0):
self.heap = []
self.maxsize = maxsize
heapq.heapify(self.heap)
def __len__(self):
return len(self.heap)
def __str__(self):
return self.heap.__str__()
[docs] def put(self, element):
# like indicated here, https://stackoverflow.com/questions/42236820/adding-numpy-array-to-a-heap-queue, inserting numpy array to heapq can be risky.
try:
numpy_checker = element in self.heap
except:
raise ValueError(
"Exact same value put again into the priority queue! You may trigger numpy comparision error, please check and fix.")
if self.maxsize > 0 and len(self.heap) >= self.maxsize:
heapq.heappushpop(self.heap, element)
else:
heapq.heappush(self.heap, element)
[docs] def get(self):
try:
return heapq.heappop(self.heap)
except IndexError:
return None
[docs] def nlargest(self, n, key=None):
return heapq.nlargest(n, self.heap)
[docs] def nsmallest(self, n, key=None):
return heapq.nsmallest(n, self.heap)
[docs]class SeedData:
"""
A dictionary that aims to average the evaluated mean_episode_return accross different random seed.
Also controls where to resume the experiments from.
"""
def __init__(self, save_path, seeds, resume_from={}):
self.seeds = seeds
self.seed_data = pd.DataFrame({
'algo_name': pd.Series([], dtype='str'),
'test_reward': pd.Series([], dtype='float'),
'seed': pd.Series([], dtype='int'),
})
self.save_path = save_path
mkdir_p(save_path)
self.load()
# set experiment range
self.resume_from = resume_from
self.resume_check_passed = False
[docs] def load(self):
re_list = get_things_in_loc(self.save_path)
if not re_list:
print("Cannot find the a seed_data.csv at", self.save_path, "initializing a new one.")
self.seed_data.to_csv(os.path.join(self.save_path, 'seed_data.csv'), index=False)
else:
self.seed_data = pd.read_csv(os.path.join(self.save_path, 'seed_data.csv'))
print("Loaded the seed_data.csv at", self.save_path)
[docs] def save(self):
self.seed_data.to_csv(os.path.join(self.save_path, 'seed_data.csv'), index=False)
[docs] def append(self, algo_name, test_reward, seed):
self.seed_data.loc[len(self.seed_data)] = [algo_name, test_reward, seed]
[docs] def setter(self, algo_name, test_reward, seed):
# average over seed makes seed==-1
# online makes dataset_percent==0.0
self.append(algo_name, test_reward, seed)
averaged_reward = self.seed_data.loc[(self.seed_data['algo_name'] == algo_name)]['test_reward'].mean()
if seed == self.seeds[-1]: # append the average, seed now set to -1
self.seed_data.loc[len(self.seed_data)] = [algo_name, averaged_reward, -1]
self.save()
return averaged_reward
[docs] def resume_checker(self, current_positions):
"""
current_positions has the same shape as self.resume_from
return True if the current loop still need to be skipped.
"""
if self.resume_check_passed is True: # checker has already passed.
return True
if not self.resume_from:
self.resume_check_passed = True
elif all([self.resume_from[condition] is None for condition in self.resume_from]):
self.resume_check_passed = True
else:
self.resume_check_passed = all([self.resume_from[condition] == current_positions[condition] for condition in self.resume_from])
return self.resume_check_passed