Source code for mzutils.ctsv_funcs

import codecs
import csv
import os
import sys

import mzutils.list_funcs


def write_tsv(file_path, rows):
    """
    Write a nested python list to a tsv file.

    :param file_path: path of the tsv file to write.
    :param rows: a list of rows to be written to the tsv file. Each row is a list of items.
    :return: None
    """
    csv.field_size_limit(sys.maxsize)
    with codecs.open(file_path, "w+", encoding="utf-8") as fp:
        tsv_writer = csv.writer(fp, delimiter='\t')
        for row in rows:
            tsv_writer.writerow(row)


def read_tsv(file_path):
    """
    Read a tsv file into a nested python list.

    :param file_path: path of the tsv file to read.
    :return: a list of rows, where each row is a list of field strings.
    """
    csv.field_size_limit(sys.maxsize)
    cached_list = []
    with codecs.open(file_path, "r", encoding="utf-8") as fp:
        tsv_reader = csv.reader(fp, delimiter='\t')
        for row in tsv_reader:
            cached_list.append(row)
    return cached_list


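# Usage sketch (illustrative only; "example.tsv" is a hypothetical path).
# write_tsv and read_tsv round-trip a nested list of rows; note that
# csv.reader yields every field back as a string, regardless of its
# original type.
#
#     rows = [["index", "question1", "question2"],
#             ["0", "What is a tsv?", "Why use tabs?"]]
#     write_tsv("example.tsv", rows)
#     assert read_tsv("example.tsv") == rows

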
def append_tsv(file_path, rows):
    """
    Append rows to a tsv file (the file is created if it does not exist).

    :param file_path: path of the tsv file to append to.
    :param rows: a list of rows to be written to the tsv file. Each row is a list of items.
    :return: None
    """
    csv.field_size_limit(sys.maxsize)
    with codecs.open(file_path, "a+", encoding="utf-8") as fp:
        tsv_writer = csv.writer(fp, delimiter='\t')
        for row in rows:
            tsv_writer.writerow(row)


def segment_large_csv(file_path, destination_path, segmentation_length, duplicate_header=False):
    """
    Segment a large csv file into several smaller files under a destination directory.
    If duplicate_header is True, the first line of the original file is duplicated into
    every segmented file, so each segmented file holds segmentation_length data rows
    plus the header, i.e. segmentation_length + 1 lines in total.

    :param file_path: path of the large csv file to split.
    :param destination_path: directory where the segmented files are written.
    :param segmentation_length: number of data rows per segmented file.
    :param duplicate_header: whether to repeat the header line in every segmented file.
    :return: how many files were created. If the number of data rows divides evenly by
        segmentation_length, the last file created is empty (or header-only).
    """
    csv.field_size_limit(sys.maxsize)
    filename, file_extension = os.path.splitext(os.path.basename(file_path))
    header = None
    with codecs.open(file_path, "r", encoding="utf-8") as fp:
        csv_reader = csv.reader(fp)
        if duplicate_header:
            # Consume the header once; it is re-written at the top of each segment.
            header = next(csv_reader)
        j = 0
        while True:
            i = 0
            j += 1
            current_filepath = os.path.join(destination_path, filename + str(j) + file_extension)
            with codecs.open(current_filepath, "w+", encoding="utf-8") as out_fp:
                csv_writer = csv.writer(out_fp)
                if duplicate_header:
                    csv_writer.writerow(header)
                while i < segmentation_length:
                    try:
                        row = next(csv_reader)
                        csv_writer.writerow(row)
                        i += 1
                    except StopIteration:
                        return j


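# Usage sketch (illustrative only; "big.csv" and "chunks" are hypothetical
# paths). This splits big.csv into files of 1000 data rows each, repeating
# the header line at the top of every chunk, and writes big1.csv, big2.csv,
# ... into the chunks directory:
#
#     os.makedirs("chunks", exist_ok=True)
#     n_files = segment_large_csv("big.csv", "chunks", 1000, duplicate_header=True)

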
def segment_large_tsv(file_path, destination_path, segmentation_length, duplicate_header=False):
    """
    Segment a large tsv file into several smaller files under a destination directory.
    If duplicate_header is True, the first line of the original file is duplicated into
    every segmented file, so each segmented file holds segmentation_length data rows
    plus the header, i.e. segmentation_length + 1 lines in total.

    :param file_path: path of the large tsv file to split.
    :param destination_path: directory where the segmented files are written.
    :param segmentation_length: number of data rows per segmented file.
    :param duplicate_header: whether to repeat the header line in every segmented file.
    :return: how many files were created. If the number of data rows divides evenly by
        segmentation_length, the last file created is empty (or header-only).
    """
    csv.field_size_limit(sys.maxsize)
    filename, file_extension = os.path.splitext(os.path.basename(file_path))
    header = None
    with codecs.open(file_path, "r", encoding="utf-8") as fp:
        tsv_reader = csv.reader(fp, delimiter='\t')
        if duplicate_header:
            # Consume the header once; it is re-written at the top of each segment.
            header = next(tsv_reader)
        j = 0
        while True:
            i = 0
            j += 1
            current_filepath = os.path.join(destination_path, filename + str(j) + file_extension)
            with codecs.open(current_filepath, "w+", encoding="utf-8") as out_fp:
                tsv_writer = csv.writer(out_fp, delimiter='\t')
                if duplicate_header:
                    tsv_writer.writerow(header)
                while i < segmentation_length:
                    try:
                        row = next(tsv_reader)
                        tsv_writer.writerow(row)
                        i += 1
                    except StopIteration:
                        return j


def save_tsv_as_csv(tsv_file, csv_file=None):
    """
    Convert a tsv file to a csv file. If csv_file is None, the output is written next
    to the input with the same basename and a .csv extension.
    """
    csv.field_size_limit(sys.maxsize)
    from mzutils.os_funcs import parent_dir_and_name, basename_and_extension
    with codecs.open(tsv_file, "r", encoding="utf-8") as tfp:
        if csv_file is None:
            csv_file = os.path.join(parent_dir_and_name(tsv_file)[0],
                                    basename_and_extension(tsv_file)[0]) + '.csv'
        with codecs.open(csv_file, "w+", encoding="utf-8") as cfp:
            tsv_reader = csv.reader(tfp, delimiter='\t')
            csv_writer = csv.writer(cfp, delimiter=',')
            for row in tsv_reader:
                csv_writer.writerow(row)


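# Usage sketch (illustrative only; "data.tsv" is a hypothetical path). With
# csv_file left as None, the output lands next to the input, so this writes
# data.csv in the same directory as data.tsv:
#
#     save_tsv_as_csv("data.tsv")

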
def find_max_sub_list_length(lst: list):
    """
    Return the length of the longest sub-list in lst, together with the leftmost
    sub-list of that length. Returns (0, None) when lst is empty.
    """
    if len(lst) == 0:
        return 0, None
    max_len = 0
    max_sub_list = lst[0]
    for sub_list in lst:
        if len(sub_list) > max_len:
            max_len = len(sub_list)
            max_sub_list = sub_list
    return max_len, max_sub_list


def beautify_csv_lines_horizontal(lst: list):
    """
    lst contains sub-lists of different lengths. This function pads every sub-list to
    the length of the longest one and returns a list of sub-lists of equal length.
    """
    lst = lst.copy()
    max_len, _ = find_max_sub_list_length(lst)
    for i in range(len(lst)):
        lst[i] = mzutils.list_funcs.pad_list(lst[i], max_len)
    return lst


def beautify_csv_lines(lst: list):
    """
    lst contains sub-lists of different lengths. This function pads them to equal
    length and transposes the result, so each input sub-list becomes a column and
    the returned rows can be written directly to a csv file.
    """
    if len(lst) == 0:
        return []
    curr_lst = []
    lst = beautify_csv_lines_horizontal(lst)
    for i in range(len(lst[0])):
        curr_lst.append([lst[j][i] for j in range(len(lst))])
    return curr_lst


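# Usage sketch (illustrative only; assumes mzutils.list_funcs.pad_list fills
# short lists with a default padding value). Three columns of unequal length
# are padded and transposed into rows suitable for write_tsv:
#
#     columns = [["a", "b", "c"], ["1"], ["x", "y"]]
#     rows = beautify_csv_lines(columns)
#     # e.g. rows == [["a", "1", "x"], ["b", <pad>, "y"], ["c", <pad>, <pad>]]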