Source code for d6tstack.convert_xls

import warnings
import os.path

import numpy as np
import pandas as pd

import ntpath

import openpyxl
import xlrd

try:
    from openpyxl.utils.cell import coordinate_from_string
except:
    from openpyxl.utils import coordinate_from_string
from d6tstack.helpers import compare_pandas_versions, check_valid_xls

import d6tcollect
# d6tcollect.init(__name__)

#******************************************************************
# read_excel_advanced
#******************************************************************
[docs]def read_excel_advanced(fname, remove_blank_cols=True, remove_blank_rows=True, collapse_header=True, header_xls_range=None, header_xls_start=None, header_xls_end=None, is_preview=False, nrows_preview=3, **kwds): """ Read Excel files to pandas dataframe with advanced options like set header ranges and remove blank columns and rows Args: fname (str): Excel file path remove_blank_cols (bool): remove blank columns remove_blank_rows (bool): remove blank rows collapse_header (bool): to convert multiline header to a single line string header_xls_range (string): range of headers in excel, eg: A4:B16 header_xls_start (string): Starting cell of excel for header range, eg: A4 header_xls_end (string): End cell of excel for header range, eg: B16 is_preview (bool): Read only first `nrows_preview` lines nrows_preview (integer): Initial number of rows to be used for preview columns (default: 3) kwds (mixed): parameters for `pandas.read_excel()` to pass through Returns: df (dataframe): pandas dataframe Note: You can pass in any `pandas.read_excel()` parameters in particular `sheet_name` """ header = [] if header_xls_range: if not (header_xls_start and header_xls_end): header_xls_range = header_xls_range.split(':') header_xls_start, header_xls_end = header_xls_range else: raise ValueError('Parameter conflict. Can only pass header_xls_range or header_xls_start with header_xls_end') if header_xls_start and header_xls_end: if 'skiprows' in kwds or 'usecols' in kwds: raise ValueError('Parameter conflict. Cannot pass skiprows or usecols with header_xls') scol, srow = coordinate_from_string(header_xls_start) ecol, erow = coordinate_from_string(header_xls_end) # header, skiprows, usecols header = list(range(erow - srow + 1)) usecols = scol + ":" + ecol skiprows = srow - 1 if compare_pandas_versions(pd.__version__, "0.20.3") > 0: df = pd.read_excel(fname, header=header, skiprows=skiprows, usecols=usecols, **kwds) else: df = pd.read_excel(fname, header=header, skiprows=skiprows, parse_cols=usecols, **kwds) else: df = pd.read_excel(fname, **kwds) # remove blank cols and rows if remove_blank_cols: df = df.dropna(axis='columns', how='all') if remove_blank_rows: df = df.dropna(axis='rows', how='all') # todo: add df.reset_index() once no actual data in index # clean up header if collapse_header: if len(header) > 1: df.columns = [' '.join([s for s in col if not 'Unnamed' in s]).strip().replace("\n", ' ') for col in df.columns.values] df = df.reset_index() else: df.rename(columns=lambda x: x.strip().replace("\n", ' '), inplace=True) # preview if is_preview: df = df.head(nrows_preview) return df
#****************************************************************** # XLSSniffer #******************************************************************
[docs]class XLSSniffer(object, metaclass=d6tcollect.Collect): """ Extracts available sheets from MULTIPLE Excel files and runs diagnostics Args: fname_list (list): file paths, eg ['dir/a.csv','dir/b.csv'] logger (object): logger object with send_log(), optional """ def __init__(self, fname_list, logger=None): if not fname_list: raise ValueError("Filename list should not be empty") self.fname_list = fname_list self.logger = logger check_valid_xls(self.fname_list) self.sniff() def sniff(self): """ Executes sniffer Returns: boolean: True if everything ok. Results are accessible in ``.df_xls_sheets`` """ xls_sheets = {} for fname in self.fname_list: if self.logger: self.logger.send_log('sniffing sheets in '+ntpath.basename(fname),'ok') xls_fname = {} xls_fname['file_name'] = ntpath.basename(fname) if fname[-5:]=='.xlsx': fh = openpyxl.load_workbook(fname,read_only=True) xls_fname['sheets_names'] = fh.sheetnames fh.close() # todo: need to close file? elif fname[-4:]=='.xls': fh = xlrd.open_workbook(fname, on_demand=True) xls_fname['sheets_names'] = fh.sheet_names() fh.release_resources() else: raise IOError('Only .xls or .xlsx files can be combined') xls_fname['sheets_count'] = len(xls_fname['sheets_names']) xls_fname['sheets_idx'] = np.arange(xls_fname['sheets_count']).tolist() xls_sheets[fname] = xls_fname self.xls_sheets = xls_sheets df_xls_sheets = pd.DataFrame(xls_sheets).T df_xls_sheets.index.names = ['file_path'] self.dict_xls_sheets = xls_sheets self.df_xls_sheets = df_xls_sheets return True def all_contain_sheetname(self,sheet_name): """ Check if all files contain a certain sheet Args: sheet_name (string): sheetname to check Returns: boolean: If true """ return np.all([sheet_name in self.dict_xls_sheets[fname]['sheets_names'] for fname in self.fname_list]) def all_have_idx(self,sheet_idx): """ Check if all files contain a certain index Args: sheet_idx (string): index to check Returns: boolean: If true """ return np.all([sheet_idx<=(d['sheets_count']-1) for k,d in self.dict_xls_sheets.items()]) def all_same_count(self): """ Check if all files contain the same number of sheets Args: sheet_idx (string): index to check Returns: boolean: If true """ first_elem = next(iter(self.dict_xls_sheets.values())) return np.all([first_elem['sheets_count']==d['sheets_count'] for k,d in self.dict_xls_sheets.items()]) def all_same_names(self): first_elem = next(iter(self.dict_xls_sheets.values())) return np.all([first_elem['sheets_names']==d['sheets_names'] for k,d in self.dict_xls_sheets.items()])
#****************************************************************** # convertor #******************************************************************
[docs]class XLStoBase(object, metaclass=d6tcollect.Collect): def __init__(self, if_exists='skip', output_dir=None, logger=None): """ Base class for converting Excel files Args: if_exists (str): Possible values: skip and replace, default: skip, optional output_dir (str): If present, file is saved in given directory, optional logger (object): logger object with send_log('msg','status'), optional """ if if_exists not in ['skip', 'replace']: raise ValueError("Possible value of 'if_exists' are 'skip' and 'replace'") self.logger = logger self.if_exists = if_exists self.output_dir = output_dir if self.output_dir: if not os.path.exists(self.output_dir): os.makedirs(self.output_dir) def _get_output_filename(self, fname): if self.output_dir: basename = os.path.basename(fname) fname_out = os.path.join(self.output_dir, basename) else: fname_out = fname is_skip = (self.if_exists == 'skip' and os.path.isfile(fname_out)) return fname_out, is_skip def convert_single(self, fname, sheet_name, **kwds): """ Converts single file Args: fname: path to file sheet_name (str): optional sheet_name to override global `cfg_xls_sheets_sel` Same as `d6tstack.utils.read_excel_advanced()` Returns: list: output file names """ if self.logger: msg = 'converting file: '+ntpath.basename(fname)+' | sheet: ' if hasattr(self, 'cfg_xls_sheets_sel'): msg += str(self.cfg_xls_sheets_sel[fname]) self.logger.send_log(msg,'ok') fname_out = fname + '-' + str(sheet_name) + '.csv' fname_out, is_skip = self._get_output_filename(fname_out) if not is_skip: df = read_excel_advanced(fname, sheet_name=sheet_name, **kwds) df.to_csv(fname_out, index=False) else: warnings.warn('File %s exists, skipping' %fname) return fname_out
[docs]class XLStoCSVMultiFile(XLStoBase, metaclass=d6tcollect.Collect): """ Converts xls|xlsx files to csv files. Selects a SINGLE SHEET from each file. To extract MULTIPLE SHEETS from a file use XLStoCSVMultiSheet Args: fname_list (list): file paths, eg ['dir/a.csv','dir/b.csv'] cfg_xls_sheets_sel_mode (string): mode to select tabs * ``name``: select by name, provide name for each file, can customize by file * ``name_global``: select by name, one name for all files * ``idx``: select by index, provide index for each file, can customize by file * ``idx_global``: select by index, one index for all files cfg_xls_sheets_sel (dict): values to select tabs `{'filename':'value'}` output_dir (str): If present, file is saved in given directory, optional if_exists (str): Possible values: skip and replace, default: skip, optional logger (object): logger object with send_log('msg','status'), optional """ def __init__(self, fname_list, cfg_xls_sheets_sel_mode='idx_global', cfg_xls_sheets_sel=0, output_dir=None, if_exists='skip', logger=None): super().__init__(if_exists, output_dir, logger) if not fname_list: raise ValueError("Filename list should not be empty") self.set_files(fname_list) self.set_select_mode(cfg_xls_sheets_sel_mode, cfg_xls_sheets_sel) def set_files(self, fname_list): """ Update input files. You will also need to update sheet selection with ``.set_select_mode()``. Args: fname_list (list): see class description for details """ self.fname_list = fname_list self.xlsSniffer = XLSSniffer(fname_list) def set_select_mode(self, cfg_xls_sheets_sel_mode, cfg_xls_sheets_sel): """ Update sheet selection values Args: cfg_xls_sheets_sel_mode (string): see class description for details cfg_xls_sheets_sel (list): see class description for details """ assert cfg_xls_sheets_sel_mode in ['name','idx','name_global','idx_global'] sheets = self.xlsSniffer.dict_xls_sheets if cfg_xls_sheets_sel_mode=='name_global': cfg_xls_sheets_sel_mode = 'name' cfg_xls_sheets_sel = dict(zip(self.fname_list,[cfg_xls_sheets_sel]*len(self.fname_list))) elif cfg_xls_sheets_sel_mode=='idx_global': cfg_xls_sheets_sel_mode = 'idx' cfg_xls_sheets_sel = dict(zip(self.fname_list,[cfg_xls_sheets_sel]*len(self.fname_list))) if not set(cfg_xls_sheets_sel.keys())==set(sheets.keys()): raise ValueError('Need to select a sheet from every file') # check given selection actually present in files if cfg_xls_sheets_sel_mode=='name': if not np.all([cfg_xls_sheets_sel[fname] in sheets[fname]['sheets_names'] for fname in self.fname_list]): raise ValueError('Invalid sheet name selected in one of the files') # todo show which file is mismatched elif cfg_xls_sheets_sel_mode=='idx': if not np.all([cfg_xls_sheets_sel[fname] <= sheets[fname]['sheets_count'] for fname in self.fname_list]): raise ValueError('Invalid index selected in one of the files') # todo show which file is mismatched else: raise ValueError('Invalid xls_sheets_mode') self.cfg_xls_sheets_sel_mode = cfg_xls_sheets_sel_mode self.cfg_xls_sheets_sel = cfg_xls_sheets_sel def convert_all(self, **kwds): """ Converts all files Args: Any parameters for `d6tstack.utils.read_excel_advanced()` Returns: list: output file names """ fnames_converted = [] for fname in self.fname_list: fname_out = self.convert_single(fname, self.cfg_xls_sheets_sel[fname], **kwds) fnames_converted.append(fname_out) return fnames_converted
[docs]class XLStoCSVMultiSheet(XLStoBase, metaclass=d6tcollect.Collect): """ Converts ALL SHEETS from a SINGLE xls|xlsx files to separate csv files Args: fname (string): file path sheet_names (list): list of int or str. If not given, will convert all sheets in the file output_dir (str): If present, file is saved in given directory, optional if_exists (str): Possible values: skip and replace, default: skip, optional logger (object): logger object with send_log('msg','status'), optional """ def __init__(self, fname, sheet_names=None, output_dir=None, if_exists='skip', logger=None): super().__init__(if_exists, output_dir, logger) self.fname = fname if sheet_names: if not isinstance(sheet_names, (list,str)): raise ValueError('sheet_names needs to be a list') self.sheet_names = sheet_names else: self.xlsSniffer = XLSSniffer([fname, ]) self.sheet_names = self.xlsSniffer.xls_sheets[self.fname]['sheets_names'] def convert_single(self, sheet_name, **kwds): """ Converts all files Args: sheet_name (str): Excel sheet Any parameters for `d6tstack.utils.read_excel_advanced()` Returns: str: output file name """ return super().convert_single(self.fname, sheet_name, **kwds) def convert_all(self, **kwds): """ Converts all files Args: Any parameters for `d6tstack.utils.read_excel_advanced()` Returns: list: output file names """ fnames_converted = [] for iSheet in self.sheet_names: fname_out = self.convert_single(iSheet, **kwds) fnames_converted.append(fname_out) return fnames_converted