Source code for d6tstack.sniffer

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""

Finds CSV settings and Excel sheets in multiple files. Often needed as input for stacking

"""
import collections
import csv

import d6tcollect
# d6tcollect.init(__name__)

#******************************************************************
# csv
#******************************************************************

def csv_count_rows(fname):
    """Count the number of rows in a text file by streaming it in chunks.

    Args:
        fname (str): file path

    Returns:
        int: number of rows in the file. A final line without a trailing
            newline is counted as a row (the original implementation only
            counted ``"\\n"`` characters and so undercounted such files by one).
    """
    def _blocks(fhandle, size=65536):
        # yield fixed-size chunks so large files are never loaded into memory at once
        while True:
            chunk = fhandle.read(size)
            if not chunk:
                break
            yield chunk

    nrows = 0
    last_char = ''
    with open(fname) as fhandle:
        for chunk in _blocks(fhandle):
            nrows += chunk.count('\n')
            last_char = chunk[-1]
    # a non-empty file not ending in a newline still has a final row
    if last_char and last_char != '\n':
        nrows += 1
    return nrows
class CSVSniffer(object, metaclass=d6tcollect.Collect):
    """
    Automatically detects settings needed to read csv files. SINGLE file only, for MULTI file use CSVSnifferList

    Args:
        fname (string): file path
        nlines (int): number of lines to sample from each file
        delims (string): possible delimiters, default ",;\t|"
    """

    def __init__(self, fname, nlines=10, delims=',;\t|'):
        self.cfg_fname = fname
        self.nrows = csv_count_rows(fname)  # todo: check for file size, if large don't run this
        self.cfg_nlines = min(nlines, self.nrows)  # read_nlines() doesn't check EOF
        # todo: check 1% of file up to a max
        self.cfg_delims_pool = delims
        self.delim = None  # delim used for the file
        self.csv_lines = None  # top n lines read from file
        self.csv_lines_delim = None  # detected delim for each line in file
        self.csv_rows = None  # top n lines split using delim

    def read_nlines(self):
        """Read the top ``cfg_nlines`` lines of the file into ``csv_lines``."""
        # context manager ensures the handle is closed even if a read fails
        with open(self.cfg_fname) as fhandle:
            self.csv_lines = [fhandle.readline().rstrip() for _ in range(self.cfg_nlines)]

    def scan_delim(self):
        """Detect a delimiter for each sampled line, storing results in ``csv_lines_delim``."""
        if not self.csv_lines:
            self.read_nlines()
        # get delimiter for each line in file
        delims = []
        for line in self.csv_lines:
            try:
                csv_sniff = csv.Sniffer().sniff(line, self.cfg_delims_pool)
                delims.append(csv_sniff.delimiter)
            except csv.Error:
                # Sniffer raises csv.Error when it can't determine a delimiter;
                # record None so callers can see which lines were undetectable
                delims.append(None)
        self.csv_lines_delim = delims

    def get_delim(self):
        """Determine the file delimiter from the per-line scan.

        Returns:
            str: the detected delimiter

        Raises:
            IOError: if no valid delimiter could be determined
        """
        if not self.csv_lines_delim:
            self.scan_delim()
        # all delimiters the same?
        if len(set(self.csv_lines_delim)) > 1:
            self.delim_is_consistent = False
            csv_delim_count = collections.Counter(self.csv_lines_delim)
            csv_delim = csv_delim_count.most_common(1)[0][0]  # use the most commonly used delimiter
            # todo: rerun on cfg_csv_scan_topline**2 files in case there is a large # of header rows
        else:
            self.delim_is_consistent = True
            csv_delim = self.csv_lines_delim[0]
        if csv_delim is None:
            raise IOError('Could not determine a valid delimiter, please check your files are .csv or .txt using one delimiter of %s' % (self.cfg_delims_pool))
        self.delim = csv_delim
        # drop any leading non-data rows before splitting into columns
        self.csv_rows = [s.split(self.delim) for s in self.csv_lines][self.count_skiprows():]
        if self.check_column_length_consistent():
            self.certainty = 'high'
        else:
            self.certainty = 'probable'
        return self.delim

    def check_column_length_consistent(self):
        # check if all rows have the same length. NB: this is just on the sample!
        if not self.csv_rows:
            self.get_delim()
        return len(set([len(row) for row in self.csv_rows])) == 1

    def count_skiprows(self):
        # finds the number of rows to skip by finding the last line which doesn't use the selected delimiter
        if not self.delim:
            self.get_delim()
        if self.delim_is_consistent:
            # all delims the same so nothing to skip
            return 0
        mismatched = [d != self.delim for d in self.csv_lines_delim]
        mismatched = list(reversed(mismatched))
        # position just after the last mismatching line = number of rows to skip
        return len(mismatched) - mismatched.index(True)

    def has_header_inverse(self):
        # checks if header absent: true if every sampled row has at least one numeric column
        if not self.csv_rows:
            self.get_delim()

        def is_number(s):
            try:
                float(s)
                return True
            except ValueError:
                return False

        self.is_all_rows_number_col = all([any([is_number(s) for s in row]) for row in self.csv_rows])
        '''
        self.row_distance = [distance.jaccard(self.csv_rows[0], self.csv_rows[i]) for i in range(1,len(self.csv_rows))]
        iqr_low, iqr_high = np.percentile(self.row_distance[1:], [5, 95])
        is_first_row_different = not(iqr_low <= self.row_distance[0] <= iqr_high)
        '''

    def has_header(self):
        # more likely than not to contain headers so have to prove no header present
        self.has_header_inverse()
        return not self.is_all_rows_number_col
class CSVSnifferList(object, metaclass=d6tcollect.Collect):
    """
    Automatically detects settings needed to read csv files. MULTI file use

    Args:
        fname_list (list): file names, eg ['a.csv','b.csv']
        nlines (int): number of lines to sample from each file
        delims (string): possible delimiters, default ',;\t|'
    """

    def __init__(self, fname_list, nlines=10, delims=',;\t|'):
        # fail fast: an empty list would otherwise surface later as an opaque
        # IndexError inside get_all()
        if not fname_list:
            raise ValueError('fname_list cannot be empty')
        self.cfg_fname_list = fname_list
        self.sniffers = [CSVSniffer(fname, nlines, delims) for fname in fname_list]

    def get_all(self, fun_name, msg_error):
        """Run a CSVSniffer method on every file and require a consistent result.

        Args:
            fun_name (str): name of the CSVSniffer method to invoke
            msg_error (str): error message prefix used when files disagree

        Returns:
            the common value detected across all files

        Raises:
            NotImplementedError: if the files produce inconsistent results
        """
        val = [getattr(sniffer, fun_name)() for sniffer in self.sniffers]
        if len(set(val)) > 1:
            # todo: want to raise an exception here...? or just use whatever got detected for each file?
            raise NotImplementedError(msg_error + ' Make sure all files have the same format')
        return val[0]

    def get_delim(self):
        return self.get_all('get_delim', 'Inconsistent delimiters detected!')

    def count_skiprows(self):
        return self.get_all('count_skiprows', 'Inconsistent skiprows detected!')

    def has_header(self):
        return self.get_all('has_header', 'Inconsistent header setting detected!')
# todo: propagate status of individual sniffers. instead of raising exception pass back status to get user input
def sniff_settings_csv(fname_list):
    """Detect csv read settings (delimiter, skiprows, header) shared by multiple files.

    Args:
        fname_list (list): file paths, eg ['a.csv','b.csv']

    Returns:
        dict: keys 'delim' (str), 'skiprows' (int), 'has_header' (bool),
            'header' (0 if a header row is present, else None — suitable for
            pandas ``read_csv(header=...)``)
    """
    sniff = CSVSnifferList(fname_list)
    csv_sniff = {}
    csv_sniff['delim'] = sniff.get_delim()
    csv_sniff['skiprows'] = sniff.count_skiprows()
    # compute once: each has_header() call re-runs detection across all files
    has_header = sniff.has_header()
    csv_sniff['has_header'] = has_header
    csv_sniff['header'] = 0 if has_header else None
    return csv_sniff