Source code for mzutils.os_funcs

import codecs
import errno
import os
import shutil
import tarfile
import time
import zipfile
from inspect import getfullargspec

import nltk

[docs]def parent_dir_and_name(file_path): """ >>> file_path="a/b.c" >>> parent_dir_and_name(file_path) ('/root/.../a', 'b.c') :param file_path: :return: """ return os.path.split(os.path.abspath(file_path))
[docs]def basename_and_extension(file_path): """ >>> file_path="a/b.c" >>> basename_and_extension(file_path) ('b', '.c') :param file_path: :return: """ return os.path.splitext(os.path.basename(file_path))
[docs]def get_things_in_loc(in_path, just_files=True, endswith=None): """ in_path can be file path or dir path. This function return a list of file paths in in_path if in_path is a dir, or within the parent path of in_path if it is not a dir. just_files=False will let the function go recursively into the subdirs. :endswith: None or a list of file extensions (to end with). """ # TODO: check for file if not os.path.exists(in_path): print(str(in_path) + " does not exists!") return re_list = [] if not os.path.isdir(in_path): in_path = parent_dir_and_name(in_path)[0] for name in os.listdir(in_path): name_path = os.path.abspath(os.path.join(in_path, name)) if os.path.isfile(name_path) and (endswith is None or (True in [name_path.endswith(ext) for ext in endswith])): re_list.append(name_path) elif not just_files: if os.path.isdir(name_path): re_list += get_things_in_loc(name_path, just_files=just_files, endswith=endswith) return re_list
[docs]def get_checkpoints_in_loc(in_path, keywords=['checkpoint-'], files_or_folders='folders'): """ This function will loop through in_path to find all files/folders that includes all keywords if files_or_folders='files'/'folders'. again, in_path can be file path or dir path. The function is meant to grab all checkpoint-XXXX in a folder. """ if not os.path.exists(in_path): print(str(in_path) + " does not exists!") return re_list = [] if not os.path.isdir(in_path): in_path = parent_dir_and_name(in_path)[0] for name in os.listdir(in_path): name_path = os.path.abspath(os.path.join(in_path, name)) pattern_truth = all([keyword in name_path for keyword in keywords]) if pattern_truth: if os.path.isfile(name_path) and files_or_folders == 'files': re_list.append(name_path) elif os.path.isdir(name_path) and files_or_folders == 'folders': re_list.append(name_path) return re_list
[docs]def clean_dir(dir_path, just_files=True): """ Clean up a directory. :param dir_path: :param just_files: If just_files=False, also remove all directory trees in that directory. :return: """ if not os.path.isdir(dir_path): if not os.path.exists(dir_path): print(str(dir_path) + " does not exists!") return if not os.path.isdir(dir_path): print(str(dir_path) + " has to be a directory!") return for name in os.listdir(dir_path): name_path = os.path.join(dir_path, name) if os.path.isfile(name_path): os.remove(name_path) elif not just_files: if os.path.isdir(name_path): shutil.rmtree(name_path)
[docs]def mkdir_p(dir_path): """ mkdir -p functionality in python :param dir_path: :return: """ try: os.makedirs(dir_path, exist_ok=True) except OSError as e: if e.errno != errno.EEXIST: raise e
[docs]def unzip_all(dir_path, target_path, endswith=".zip"): """ :param dir_path: :param target_path: :param endswith: ".zip", ".tar.gz" or ".tar" :return: """ for item in os.listdir(dir_path): if item.endswith(endswith): if endswith == ".zip": zip_ref = zipfile.ZipFile(os.path.join(dir_path, item), 'r') elif endswith == ".tar.gz": zip_ref =, item), 'r:gz') elif endswith == ".tar": zip_ref =, item), 'r:') else: continue zip_ref.extractall(target_path) zip_ref.close()
[docs]def documents_segementor_on_word_length(documents_dir, store_dir, max_length, language='english', clean_store_dir=False): """ segment a long document to several small documents based on the nltk tokenized word length. sentence structure will be kept. :param documents_dir: where all documents located. :param store_dir: where to store segmented documents. :param max_length: document segments' max length. :param language: for the use of nltk, default english. :param clean_store_dir: :return: number of documents after segmented. """ final_num_of_docs = 0 if not os.path.isdir(documents_dir): raise Exception("documents_dir: where all documents located.") if not os.path.isdir(store_dir): os.mkdir(store_dir) if clean_store_dir: clean_dir(store_dir, just_files=False) names = [name for name in os.listdir(documents_dir) if os.path.isfile(os.path.join(documents_dir, name))] for name in names: final_num_of_docs += helper_document_segmentor(documents_dir, store_dir, name, max_length, language) return final_num_of_docs
# ------------------helper funcs-----------------------------
[docs]def helper_document_segmentor(documents_dir, store_dir, name, max_length, language): documents = [] with, name), "r", "utf-8") as fp: filecontent = sentences = nltk.sent_tokenize(filecontent, language) i = 0 word_count = 0 document = "" while i < len(sentences): sentence = sentences[i] current_count = len(nltk.word_tokenize(sentence, language)) if current_count >= max_length: document = document + sentence + " " documents.append(document) word_count = 0 document = "" i = i + 1 print("Warning: " + "there is a sentence with word length " + str( current_count) + " , but the maximum document length is " + str(max_length)) continue # raise Exception("there is a sentence with word length " + str(current_count) + " , but the maximum document length is " + str(max_length)) if word_count + current_count >= max_length: documents.append(document) word_count = 0 document = "" else: document = document + sentence + " " word_count = word_count + current_count i = i + 1 documents.append(document) helper_save_documents(store_dir, name, documents) return len(documents)
[docs]def helper_save_documents(store_dir, name, documents): with, name), "w+", "utf-8") as fp: fp.write(documents[0]) for document in documents[1:]: filepath = helper_check_existance_and_add_timestamp(store_dir, name) with, "w+", "utf-8") as fp: fp.write(document)
[docs]def helper_check_existance_and_add_timestamp(store_dir, name): timestamp = str(int(round(time.time() * 1000))) filepath = os.path.join(store_dir, name) filename, extension = os.path.splitext(filepath) while os.path.exists(filename + extension): filename += timestamp return filename + extension
[docs]def loop_through_copy_files_to_one_dir(looped_dir, target_dir, include_link=False): """ function to loop through nested directories and copy all the files to a target directory. :param looped_dir: :param target_dir: a directory string. :return: """ if not os.path.isdir(looped_dir): raise Exception("looped_dir: a directory.") if not os.path.isdir(target_dir): raise Exception("target_dir: a directory.") for thing in os.listdir(looped_dir): thing = os.path.join(looped_dir, thing) if os.path.isdir(thing): loop_through_copy_files_to_one_dir(thing, target_dir) elif os.path.isfile(thing): shutil.move(thing, os.path.join(target_dir, parent_dir_and_name(thing)[1])) elif include_link: shutil.move(thing, os.path.join(target_dir, parent_dir_and_name(thing)[1])) return
[docs]def loop_through_return_abs_file_path(looped_dir): """ function to loop through nested directories and return file absolute path in a list. :param looped_dir: :return: list """ re_list = [] if not os.path.isdir(looped_dir): raise Exception("looped_dir: a directory.") for thing in os.listdir(looped_dir): thing = os.path.join(looped_dir, thing) thing = os.path.abspath(thing) if os.path.isdir(thing): re_list = re_list + loop_through_return_abs_file_path(thing) elif os.path.isfile(thing): re_list.append(thing) return re_list
[docs]def loop_through_store_files_to_list(looped_dir, encoding="utf-8"): """ function to loop through nested directories and store the content of all files into a list separately. This function does not care about symbolic link inside the nested directories. :param looped_dir: :param encoding: :return: list """ re_list = [] if not os.path.isdir(looped_dir): raise Exception("looped_dir: a directory.") for thing in os.listdir(looped_dir): thing = os.path.join(looped_dir, thing) if os.path.isdir(thing): re_list = re_list + loop_through_store_files_to_list(thing, encoding) elif os.path.isfile(thing): with, 'r', encoding) as fp: filecontent = re_list.append(filecontent) return re_list
[docs]def loop_through_store_lines_to_list(looped_dir, encoding="utf-8"): """ function to loop through nested directories and store the lines of all files into a list. This function does not care about symbolic link inside the nested directories. :param looped_dir: :param encoding: :return: list """ re_list = [] if not os.path.isdir(looped_dir): raise Exception("looped_dir: a directory.") for thing in os.listdir(looped_dir): thing = os.path.join(looped_dir, thing) if os.path.isdir(thing): re_list = re_list + loop_through_store_files_to_list(thing, encoding) elif os.path.isfile(thing): with, 'r', encoding) as fp: filecontent = fp.readlines() re_list += filecontent return re_list
[docs]def save__init__args(values, underscore=False, overwrite=False, subclass_only=False): """ Use in `__init__()` only; assign all args/kwargs to instance attributes. To maintain precedence of args provided to subclasses, call this in the subclass before `super().__init__()` if `save__init__args()` also appears in base class, or use `overwrite=True`. With `subclass_only==True`, only args/kwargs listed in current subclass apply. usage: >>> class AgentModel: ... def __init__( ... self, ... meta_info_attr_size=7, ... obs_shape=(3, 64, 64), ... reward_shape=(1,), ... n_agents=1, ... obs_last_action=False, ... obs_agent_id=True, ... rnn_hidden_dim=64, ... based_on='observation', ... n_actions=11, ... use_cuda=True,): ... save__init__args(locals()) >>> a=AgentModel() >>> a.rnn_hidden_dim >>> 64 """ prefix = "_" if underscore else "" self = values['self'] args = list() Classes = type(self).mro() if subclass_only: Classes = Classes[:1] for Cls in Classes: # class inheritances if '__init__' in vars(Cls): args += getfullargspec(Cls.__init__).args[1:] for arg in args: attr = prefix + arg if arg in values and (not hasattr(self, attr) or overwrite): setattr(self, attr, values[arg])
[docs]def set_local_vars_from_yaml(yaml_loc, name_space_dict): """ set local variables from yaml file. :param yaml_loc: your yaml file location. :param name_space_dict: a dictionary that contains the local variables. e.g. locals() :return: None for example, if your yaml file contains a variable called num_workers and the value of which is an integer 4, then >>> set_local_vars_from_yaml('path_to_file.yaml', locals()) >>> num_workers 4 """ import yaml with open(yaml_loc, 'r') as fp: config_dict = yaml.safe_load(fp) name_space_dict.update(config_dict)
[docs]class TimeRecorder: def __init__(self): """ need to import time. """ self.init_time = time.time() self.base_time = self.init_time self.times = []
[docs] def record(self, name=""): current = time.time() self.times.append((name, current - self.base_time)) self.base_time = current
[docs] def get_times(self): return self.times
[docs] def get_times_str(self): return str(self.get_times())