Source code for caar.cleanthermostat

from __future__ import absolute_import, division, print_function

from io import open
import csv
from collections import namedtuple, OrderedDict
import datetime as dt
import os.path
import pickle
import re
import sys

import numpy as np
import pandas as pd

from caar.pandas_tseries_tools import _guess_datetime_format

from caar.configparser_read import SENSOR_FIELDS,                             \
    GEOSPATIAL_FIELDS, SENSOR_ZIP_CODE, SENSOR_DEVICE_ID,                     \
    POSTAL_FILE_ZIP, POSTAL_TWO_LETTER_STATE, SENSOR_LOCATION_ID,             \
    SENSOR_ID_FIELD, UNIQUE_CYCLE_FIELD_INDEX, UNIQUE_GEOSPATIAL_FIELD,       \
    CYCLE_TYPE_INDEX, CYCLE_START_INDEX, CYCLE_END_TIME_INDEX,                \
    SENSORS_LOG_DATE_INDEX, SENSORS_DATA_INDEX, SENSOR_ID_INDEX,              \
    GEOSPATIAL_LOG_DATE_INDEX, GEOSPATIAL_OBSERVATION_INDEX, CYCLE_FIELDS,    \
    CYCLE_ID_INDEX, GEOSPATIAL_ID_INDEX


from future import standard_library
standard_library.install_aliases()


Cycle = namedtuple('Cycle', ['device_id', 'cycle_mode', 'start_time'])
Sensor = namedtuple('Sensor', ['sensor_id', 'timestamp'])
Geospatial = namedtuple('Geospatial', ['location_id', 'timestamp'])
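
# The named tuples above are the composite keys of the record dicts built
# below: an ID plus a time stamp (plus the cycle mode for cycle data). They
# are declared at module level so that dicts keyed by them can be pickled.
# For illustration only (hypothetical values), a cycles record looks like:
#
#   >>> key = Cycle(device_id='T100', cycle_mode='Cool',
#   ...             start_time=dt.datetime(2014, 7, 1, 12, 0))
#   >>> records[key]     # remaining columns, here just the end time
#   datetime.datetime(2014, 7, 1, 12, 30)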


[docs]def dict_from_file(raw_file, cycle=None, states=None, sensors_file=None, postal_file=None, auto=None, id_col_heading=None, cycle_col_heading=None, encoding='UTF-8', delimiter=None, quote=None, cols_to_ignore=None, meta=False): """Read delimited text file and create dict of dicts. One dict within the dict has the key 'cols_meta' and contains metadata. The other has the key 'records'. The records keys are named 2-tuples containing numeric IDs and time stamps (and cycle mode if a cycle mode is chosen with the argument 'cycle=', for cycling data). The values are either single values (floats, ints or strings) or tuples of these types. See the example .csv data files at https://github.com/nickpowersys/caar. Example sensor cycle file column headings: DeviceId, CycleType, StartTime, EndTime. Example sensor file column headings: SensorId, TimeStamp, Degrees. Example outside temperature file column headings LocationId, TimeStamp, Degrees. Common delimited text file formats including commas, tabs, pipes and spaces are detected in that order within the data rows (the header has its own delimiter detection and is handled separately, automatically) and the first delimiter detected is used. In all cases, rows are only used if the number of values match the number of column headings in the first row. Each input file is expected to have (at least) columns representing ID's, time stamps (or starting and ending time stamps for cycles), and (if not cycles) corresponding observations. To use the automatic column detection functionality, use the keyword argument 'auto' and assign it one of the values: 'cycles', 'sensors', or 'geospatial'. The ID's should contain both letters and digits in some combination (leading zeroes are also allowed in place of letters). Having the string 'id', 'Id' or 'ID' will then cause a column to be the ID index within the combined ID-time stamp index for a given input file. If there is no such heading, the leftmost column with alphanumeric strings (for example, 'T12' or '0123') will be taken as the ID. The output can be filtered on records from a state or set of states by specifying a comma-delimited string containing state abbreviations. Otherwise, all available records will be in the output. If a state or states are specified, a sensors metadata file and postal code file must be specified in the arguments and have the same location ID columns and ZipCode/PostalCode column headings in the same left-to-right order as in the examples. For the other columns, dummy values may be used if there is no actual data. Args: raw_file (str): The input file. cycle (Optional[str]): The type of cycling operation that will be included in the output. For example, possible values that may be in the data file are 'Cool' or 'Heat'. If no specific value is specified as an argument, all operating modes will be included. states (Optional[str]): One or more comma-separated, two-letter state abbreviations. sensors_file (Optional[str]): Path of metadata file for sensors. Required if there is a states argument. postal_file (Optional[str]): Metadata file for zip codes, with zip codes, their state, and other geographic information. Required if there is a states argument. auto (Optional[Boolean]): {'cycles', 'sensors', 'geospatial', None} If one of the data types is specified, the function will detect which columns contain IDs, time stamps and values of interest automatically. If None (default), the order of columns in the delimited file and the config.ini file should match. 
        id_col_heading (Optional[str]): Indicates the heading in the header
            for the ID column.
        cycle_col_heading (Optional[str]): Indicates the heading in the header
            for the cycle mode column.
        cols_to_ignore (Optional[iterable of str or int]): Column headings or
            0-based column indexes that should be left out of the output.
        encoding (Optional[str]): Encoding of the raw data file. Default:
            'UTF-8'.
        delimiter (Optional[str]): Character used as the row delimiter.
            Default is None, but commas, tabs, pipes and spaces are detected
            automatically (in that priority order) if no delimiter is
            specified.
        quote (Optional[str]): Character surrounding data fields. Default is
            None, but double and single quotes surrounding data fields are
            detected and removed automatically if they are present in the
            data rows. If any other character is specified in the keyword
            argument and it surrounds data in any column, it will be removed
            instead.
        meta (Optional[bool]): An alternative way to return metadata about
            columns, besides the detect_columns() function. If meta is True,
            a dict of metadata is returned instead of a dict of records.

    Returns:
        clean_dict (dict): Dict with the keys 'cols_meta' (column metadata)
            and 'records' (the cleaned records).
    """
    kwargs = dict([('states', states), ('sensors_file', sensors_file),
                   ('cycle', cycle), ('postal_file', postal_file),
                   ('auto', auto), ('delimiter', delimiter),
                   ('quote', quote), ('meta', meta),
                   ('id_col_heading', id_col_heading),
                   ('encoding', encoding)])

    if not isinstance(meta, bool):
        raise ValueError('meta argument must be either False or True.')

    if states:
        try:
            assert kwargs.get('sensors_file') and kwargs.get('postal_file')
        except AssertionError:
            _missing_sensors_or_postal_error_message()

    header_kwargs = dict([('encoding', encoding), ('delimiter', delimiter),
                          ('id_col_heading', id_col_heading),
                          ('quote', quote), ('auto', auto),
                          ('cycle', cycle)])
    header, id_index = _header_and_id_col_if_heading_or_preconfig(raw_file,
                                                                  **header_kwargs)

    skwargs = dict([('encoding', encoding), ('delimiter', delimiter),
                    ('quote', quote), ('cycle', cycle),
                    ('id_col', id_index), ('auto', auto),
                    ('cols_to_ignore', cols_to_ignore),
                    ('cycle_col_heading', cycle_col_heading)])
    # If delimiter and/or quote were not specified as kwargs,
    # they will be set by the call to _analyze_all_columns()
    cols_meta, delim, quote = _analyze_all_columns(raw_file, header, **skwargs)

    if meta:
        return cols_meta
    else:
        for k, v in [('cols_meta', cols_meta), ('delimiter', delim),
                     ('quote', quote), ('header', header)]:
            kwargs[k] = v
        records = _dict_from_lines_of_text(raw_file, **kwargs)
        for col, col_meta in cols_meta.items():
            if col_meta['type'] == 'numeric_commas':
                col_meta['type'] = 'ints'
        container = {'cols_meta': cols_meta, 'records': records}
        return container
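
# A minimal usage sketch for dict_from_file(), assuming a hypothetical
# comma-delimited cycles file 'cycles.csv' with the example headings
# DeviceId, CycleType, StartTime, EndTime described in the docstring:
#
#   >>> out = dict_from_file('cycles.csv', cycle='Cool', auto='cycles')
#   >>> cols_meta, records = out['cols_meta'], out['records']
#   >>> meta_only = dict_from_file('cycles.csv', auto='cycles', meta=True)
#
# With meta=True only the column metadata dict is returned, which is what
# detect_columns() below relies on.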
[docs]def detect_columns(raw_file, cycle=None, states=None, sensors_file=None, postal_file=None, auto=None, encoding='UTF-8', delimiter=None, quote=None, id_col_heading=None, cycle_col_heading=None, cols_to_ignore=None): """Returns dict with columns that will be in dict based on dict_from_file() or pickle_from_file() and corresponding keyword arguments ('auto' is required, and must be a value other than None). Args: raw_file (str): The input file. cycle (Optional[str]): The type of cycle that will be in the output. For example, example values that may be in the data file are 'Cool' and/or 'Heat'. If no specific value is specified as an argument, all modes will be in the output. states (Optional[str]): One or more comma-separated, two-letter state abbreviations. sensors_file (Optional[str]): Path of metadata file for sensors. Required if there is a states argument. postal_file (Optional[str]): Metadata file for postal codes. Required if there is a states argument. auto (Optional[Boolean]): {'cycles', 'sensors', 'geospatial', None} If one of the data types is specified, the function will detect which columns contain IDs, time stamps and values of interest automatically. If None (default), the order of columns in the delimited file and the config.ini file should match. id_col_heading (Optional[str]): Indicates the heading in the header for the ID column. cycle_col_heading (Optional[str]): Indicates the heading in the header for the cycle column. cols_to_ignore (Optional[iterable of [str] or [int]]): Column headings or 0-based column indexes that should be left out of the output. encoding (Optional[str]): Encoding of the raw data file. Default: 'UTF-8'. delimiter (Optional[str]): Character to be used as row delimiter. Default is None, but commas, tabs, pipes and spaces are automatically detected in that priority order) if no delimiter is specified. quote (Optional[str]): Characters surrounding data fields. Default is none, but double and single quotes surrounding data fields are automatically detected and removed if they are present in the data rows. If any other character is specified in the keyword argument, and it surrounds data in any column, it will be removed instead. Returns: column_dict (dict): Dict in which keys are one of: 'id_col', 'start_time_col', 'end_time_col', 'cycle_col', (the latter three are for cycles data only), 'time_col', or the headings of other columns found in the file. The values are dicts. """ kwargs = dict([('meta', True), ('cycle', cycle), ('states', states), ('sensors_file', sensors_file), ('postal_file', postal_file), ('auto', auto), ('encoding', encoding), ('delimiter', delimiter), ('quote', quote), ('id_col_heading', id_col_heading), ('cycle_col_heading', cycle_col_heading), ('cols_to_ignore', cols_to_ignore)]) col_meta = dict_from_file(raw_file, **kwargs) sorted_meta = _sort_meta_in_col_order(col_meta) return sorted_meta
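
# Illustrative sketch of detect_columns() output for a hypothetical sensors
# file with headings SensorId, TimeStamp, Degrees. The keys and values follow
# _create_col_meta() below; the types shown are examples, not guaranteed:
#
#   >>> detect_columns('sensors.csv', auto='sensors')
#   OrderedDict([('id', {'heading': 'SensorId', 'type': 'alphanumeric', 'position': 0}),
#                ('time', {'heading': 'TimeStamp', 'type': 'time', 'position': 1}),
#                ('Degrees', {'heading': 'Degrees', 'type': 'floats', 'position': 2})])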
def _sort_meta_in_col_order(meta):
    sorted_meta = OrderedDict()
    for i in range(len(meta)):
        for k, v in meta.items():
            if v['position'] == i:
                sorted_meta[k] = v
    return sorted_meta
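
# For example (hypothetical metadata), an unsorted dict such as
#
#   {'Degrees': {'position': 2, ...}, 'id': {'position': 0, ...},
#    'time': {'position': 1, ...}}
#
# is returned as an OrderedDict with the keys in the column order
# 'id', 'time', 'Degrees'.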
[docs]def pickle_from_file(raw_file, picklepath=None, cycle=None, states=None, sensors_file=None, postal_file=None, auto=None, id_col_heading=None, cycle_col_heading=None, cols_to_ignore=None, encoding='UTF-8', delimiter=None, quote=None, meta=False): """Read delimited text file and create binary pickle file containing a dict of records. The keys are named tuples containing numeric IDs (strings) and time stamps. See the example .csv data files at https://github.com/nickpowersys/caar. Example sensor cycle file column headings: DeviceId, CycleType, StartTime, EndTime. Example sensors file column headings: SensorId, TimeStamp, Degrees. Example geospatial data file column headings LocationId, TimeStamp, Degrees. Common delimited text file formats including commas, tabs, pipes and spaces are detected in that order within the data rows (the header has its own delimiter detection and is handled separately, automatically) and the first delimiter detected is used. In all cases, rows are only used if the number of values match the number of column headings in the first row. Each input file is expected to have (at least) columns representing ID's, time stamps (or starting and ending time stamps for cycles), and (if not cycles) corresponding observations. To use the automatic column detection functionality, use the keyword argument 'auto' and assign it one of the values: 'cycles', 'sensors', or 'geospatial'. The ID's should contain both letters and digits in some combination (leading zeroes are also allowed in place of letters). Having the string 'id', 'Id' or 'ID' will then cause a column to be the ID index within the combined ID-time stamp index for a given input file. If there is no such heading, the leftmost column with alphanumeric strings (for example, 'T12' or '0123') will be taken as the ID. The output can be filtered on records from a state or set of states by specifying a comma-delimited string containing state abbreviations. Otherwise, all available records will be in the output. If a state or states are specified, a sensors metadata file and postal code file must be specified in the arguments and have the same location ID columns and ZipCode/PostalCode column headings in the same left-to-right order as in the examples. For the other columns, dummy values may be used if there is no actual data. Args: raw_file (str): The input file. picklepath (str): The path of the desired pickle file. If it is not specified, a filename is generated automatically. cycle (Optional[str]): The type of cycle that will be in the output. For example, example values that may be in the data file are either 'Cool' or 'Heat'. If left as None, all cycles will be in the output. states (Optional[str]): One or more comma-separated, two-letter state abbreviations. sensors_file (Optional[str]): Path of metadata file for sensors. Required if there is a states argument. postal_file (Optional[str]): Metadata file for postal codes. Required if there is a states argument. auto (Optional[Boolean]): {'cycles', 'sensors', 'geospatial', None} If one of the data types is specified, the function will detect which columns contain IDs, time stamps and values of interest automatically. If None (default), the order and headings of columns in the delimited text file and the config.ini file should match. id_col_heading (Optional[str]): Indicates the heading in the header for the ID column. cycle_col_heading (Optional[str]): Indicates the heading in the header for the cycle column. 
        cols_to_ignore (Optional[iterable of str or int]): Column headings or
            0-based column indexes that should be left out of the output.
        encoding (Optional[str]): Encoding of the raw data file. Default:
            'UTF-8'.
        delimiter (Optional[str]): Character used as the row delimiter.
            Default is None, but commas, tabs, pipes and spaces are detected
            automatically (in that priority order) if no delimiter is
            specified.
        quote (Optional[str]): Character surrounding data fields. Default is
            None, but double and single quotes surrounding data fields are
            detected and removed automatically if they are present in the
            data rows. If any other character is specified in the keyword
            argument and it surrounds data in any column, it will be removed
            instead.
        meta (Optional[bool]): An alternative way to store metadata about
            columns, besides the detect_columns() function. If meta is True,
            a dict of metadata is pickled instead of a dict of records.

    Returns:
        picklepath (str): Path of the output file.
    """
    if states:
        try:
            assert sensors_file is not None and postal_file is not None
        except AssertionError:
            _missing_sensors_or_postal_error_message()
            return 0

    kwargs = dict([('states', states), ('sensors_file', sensors_file),
                   ('cycle', cycle), ('postal_file', postal_file),
                   ('auto', auto), ('id_col_heading', id_col_heading),
                   ('cycle_col_heading', cycle_col_heading),
                   ('cols_to_ignore', cols_to_ignore),
                   ('encoding', encoding), ('delimiter', delimiter),
                   ('quote', quote), ('meta', meta)])
    records_or_meta = dict_from_file(raw_file, **kwargs)

    # Due to testing and the need for temporary directories,
    # the LocalPath must be converted to a string.
    if picklepath is None:
        picklepath = _pickle_filename(raw_file, states=states, auto=auto,
                                      encoding=encoding)
    if '2.7' in sys.version:
        str_picklepath = unicode(picklepath)
    else:
        str_picklepath = str(picklepath)

    with open(str_picklepath, 'wb') as fout:
        pickle.dump(records_or_meta, fout, pickle.HIGHEST_PROTOCOL)

    return str_picklepath
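
# A minimal sketch of pickle_from_file(), again assuming a hypothetical
# 'cycles.csv'. The pickle holds the same dict that dict_from_file() returns,
# so it can be reloaded with the standard library:
#
#   >>> path = pickle_from_file('cycles.csv', picklepath='cool.pickle',
#   ...                         cycle='Cool', auto='cycles')
#   >>> with open(path, 'rb') as f:
#   ...     out = pickle.load(f)
#   >>> records = out['records']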
def _pickle_filename(text_file, states=None, auto=None, encoding='UTF-8', delimiter=None, quote=None): """Automatically generate file name based on state(s) and content. Takes a string with two-letter abbreviations for states separated by commas. If all states are desired, states_to_clean should be None. """ header, _ = _header_and_id_col_if_heading_or_preconfig(text_file, encoding=encoding, delimiter=delimiter, quote=quote) data_type = auto if auto else _data_type_matching_header(header) if states: states = states.split(',') else: states = ['all_states'] if '2.7' in sys.version: py_version = 'py27' filename = '_'.join(states + [data_type, py_version]) + '.pickle' else: filename = '_'.join(states + [data_type]) + '.pickle' return filename def _dict_from_lines_of_text(raw_file, **kwargs): """Returns a tuple containing a dict of column meta-data and a dict of records whose keys and values correspond to 1) operating status switching events, 2) sensor data or 3) geospatial data. The keys of headers_functions are tuples containing strings with the column headings from the raw text files. """ if kwargs.get('auto'): # Detect columns containing ID, cool/heat mode and time automatically data_func_map = {'sensors': _clean_sensors_auto_detect, 'cycles': _clean_cycles_auto_detect, 'geospatial': _clean_geospatial_auto_detect} data = kwargs.get('auto') try: cleaning_function = data_func_map[data] except ValueError: print('The data type ' + data + ' is not recognized') else: # Use file definition from config.ini file to specify column headings config_cols_func_map = {SENSOR_FIELDS: _clean_sensors, CYCLE_FIELDS: _clean_cycles, GEOSPATIAL_FIELDS: _clean_geospatial} header = kwargs.get('header') try: cleaning_function = config_cols_func_map[header] except KeyError: print('Header not matched with headers in config.ini file.') records = cleaning_function(raw_file, **kwargs) return records def _clean_cycles_auto_detect(raw_file, **kwargs): args = ['header', 'delimiter', 'cols_meta', 'cycle', 'quote', 'encoding'] header, delimiter, cols_meta, cycle_mode, quote, encoding = (kwargs.get(k) for k in args) clean_args = [raw_file, header, delimiter, cols_meta] thermos_ids = _sensors_ids_in_states(**kwargs) clean_kwargs = {'cycle_mode': cycle_mode, 'thermos_ids': thermos_ids, 'quote': quote, 'encoding': encoding} clean_records = _validate_cycle_records_add_to_dict_auto(*clean_args, **clean_kwargs) return clean_records def _validate_cycle_records_add_to_dict_auto(raw_file, header, delimiter, cols_meta, cycle_mode=None, thermos_ids=None, quote=None, encoding=None): clean_records = {} id_col, start_time_col = (cols_meta[k]['position'] for k in ['id', 'start_time']) id_is_int = _id_is_int(cols_meta) cycle_col = (cols_meta['cycle']['position'] if cols_meta.get('cycle') else None) dt_args = [raw_file, start_time_col, encoding, delimiter, quote, header] datetime_format = _guess_datetime_format_from_first_record(*dt_args) data_cols = _non_index_col_types(cols_meta, dt_format=datetime_format) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and _validate_cycles_auto_record(record, id_col, ids=thermos_ids, cycle_mode=cycle_mode, cycle_col=cycle_col): id_val = _id_val(record, id_col, id_is_int) start_dt = _to_datetime(record[start_time_col], dt_format=datetime_format) # Cycle named tuple declaration is global, in order to ensure # that named tuples using it can be pickled. 
# Cycle = namedtuple('Cycle', ['device_id', 'cycle_mode', # 'start_time']) multiidcols = Cycle(device_id=id_val, cycle_mode=cycle_mode, start_time=start_dt) end_time_and_other_col_vals = _record_vals(record, data_cols) clean_records[multiidcols] = end_time_and_other_col_vals return clean_records def _guess_datetime_format_from_first_record(raw_file, time_col, encoding, delimiter, quote, header): with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record: time = record[time_col] datetime_format = _guess_datetime_format(time) break return datetime_format def _validate_cycles_auto_record(record, id_col, ids=None, cycle_mode=None, cycle_col=None): """Validate that record ID is in the set of IDs (if any specified), and that the cycle type matches the specified value (if any has been specified). """ return all([_validate_cycle_mode(record[cycle_col], cycle_mode), _validate_id(record[id_col], ids)]) def _clean_sensors_auto_detect(raw_file, **kwargs): args = ['header', 'delimiter', 'cols_meta', 'quote', 'encoding'] header, delimiter, cols_meta, quote, encoding = (kwargs.get(k) for k in args) clean_args = [raw_file, header, delimiter, cols_meta] thermos_ids = _sensors_ids_in_states(**kwargs) clean_kwargs = {'thermos_ids': thermos_ids, 'quote': quote, 'encoding': encoding} clean_records = _validate_sensors_add_to_dict_auto(*clean_args, **clean_kwargs) return clean_records def _validate_sensors_add_to_dict_auto(raw_file, header, delimiter, cols_meta, thermos_ids=None, quote=None, encoding=None): clean_records = {} id_col, time_col = (cols_meta[k]['position'] for k in ['id', 'time']) id_is_int = _id_is_int(cols_meta) dt_args = [raw_file, time_col, encoding, delimiter, quote, header] datetime_format = _guess_datetime_format_from_first_record(*dt_args) data_cols = _non_index_col_types(cols_meta, dt_format=datetime_format) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and _validate_sensors_auto_record(record, id_col, ids=thermos_ids): # Sensor named tuple declaration is global, in order to ensure # that named tuples using it can be pickled. # Sensor = namedtuple('Sensor', ['sensor_id', 'timestamp']) id_val = _id_val(record, id_col, id_is_int) time = _to_datetime(record[time_col], dt_format=datetime_format) multiidcols = Sensor(sensor_id=id_val, timestamp=time) temp_and_other_vals = _record_vals(record, data_cols) clean_records[multiidcols] = temp_and_other_vals return clean_records def _non_index_col_types(cols_meta, dt_format=None): """Return a list of 2-tuples for non-index data columns. The first element of each tuple is the column index, and the second is a primitive type (int or float), function or None. The primitive type or function will be used to change the types of each data element (or if None, leave them as strings). 
The list is sorted ascending in the order of the positions of columns.""" if format: if '2.7' in sys.version: _to_datetime.func_defaults = (dt_format,) else: _to_datetime.__defaults__ = (dt_format,) data_cols = set([meta['position'] for k, meta in cols_meta.items() if k not in ['id', 'time', 'cycle', 'start_time']]) type_map = dict([('ints', int), ('floats', float), ('time', _to_datetime), ('numeric_commas', _remove_commas_from_int)]) data_cols_types = dict([(meta['position'], type_map[meta['type']]) for meta in cols_meta.values() if meta['position'] in data_cols and meta['type'] in type_map]) cols_types = [] for col in data_cols: if col in data_cols_types: cols_types.append((col, data_cols_types[col])) elif col in data_cols: cols_types.append((col, None)) return cols_types def _to_datetime(date_str, dt_format=None): return pd.to_datetime(date_str, format=dt_format).to_pydatetime() def _remove_commas_from_int(numeric_string): return int(numeric_string.replace(',', '')) def _validate_sensors_auto_record(record, id_col, ids=None): """Validate that standardized record has expected data content. """ return _validate_id(record[id_col], ids) def _clean_geospatial_auto_detect(raw_file, **kwargs): args = ['header', 'delimiter', 'cols_meta', 'quote', 'encoding'] header, delimiter, cols_meta, quote, encoding = (kwargs.get(k) for k in args) location_ids = _locations_in_states(**kwargs) clean_args = [raw_file, header, delimiter, cols_meta] clean_kwargs = {'location_ids': location_ids, 'quote': quote, 'encoding': encoding} clean_records = _validate_geospatial_add_to_dict_auto(*clean_args, **clean_kwargs) return clean_records def _validate_geospatial_add_to_dict_auto(raw_file, header, delimiter, cols_meta, location_ids=None, quote=None, encoding=None): clean_records = {} id_col, time_col = (cols_meta[k]['position'] for k in ['id', 'time']) id_is_int = _id_is_int(cols_meta) dt_args = [raw_file, time_col, encoding, delimiter, quote, header] datetime_format = _guess_datetime_format_from_first_record(*dt_args) data_cols = _non_index_col_types(cols_meta, dt_format=datetime_format) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and _validate_geospatial_auto_record(record, id_col, ids=location_ids): # Geospatial named tuple declared globally to enable pickling. # The following is here for reference. # Geospatial = namedtuple('Geospatial', ['location_id', 'timestamp']) id_val = _id_val(record, id_col, id_is_int) time = _to_datetime(record[time_col], dt_format=datetime_format) multiidcols = Geospatial(location_id=id_val, timestamp=time) temp_and_other_vals = _record_vals(record, data_cols) clean_records[multiidcols] = temp_and_other_vals return clean_records def _validate_geospatial_auto_record(record, id_col, ids=None): """Validate that standardized record has expected data content. 
""" return _validate_id(record[id_col], ids) def _record_vals(record, col_conversions): record_vals = [] for col, convert_func in col_conversions: if convert_func: record_vals.append(convert_func(record[col])) else: record_vals.append(record[col]) if len(record_vals) > 1: return tuple(record_vals) else: return record_vals[0] def _analyze_all_columns(raw_file, header, encoding='UTF-8', delimiter=None, quote=None, id_col=None, cycle=None, auto=None, cols_to_ignore=None, cycle_col_heading=None): """Creates NumPy array with first 1,000 lines containing numeric data.""" with open(raw_file, encoding=encoding) as lines: _ = lines.readline() delimiter, quote = _determine_delimiter_and_quote(lines, delimiter, quote, auto=auto) cycle_col = _detect_cycle_col(raw_file, header, cycle, delimiter, auto=auto, encoding=encoding, quote=quote, cycle_col_heading=cycle_col_heading) sample_kwargs = {'quote': quote, 'encoding': encoding} timestamp_cols = _detect_time_stamps(raw_file, header, delimiter, **sample_kwargs) data_cols = _detect_column_data_types(raw_file, header, timestamp_cols, delimiter, cols_to_ignore, **sample_kwargs) id_other_cols = _detect_id_other_cols(raw_file, header, timestamp_cols, data_cols, id_col=id_col, cycle_col=cycle_col, delimiter=delimiter, encoding=encoding, quote=quote) cols_meta = _create_col_meta(header, id_other_cols, timestamp_cols, cols_to_ignore, cycle_col=cycle_col) return cols_meta, delimiter, quote def _select_sample_records(raw_file, header, encoding=None, delimiter=None, quote=None): with open(raw_file, encoding=encoding) as lines: _ = lines.readline() delimiter, quote = _determine_delimiter_and_quote(lines, delimiter, quote) sample_records = [] with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record: sample_records.append(record) else: continue if len(sample_records) == 1000: break sample_record_array = np.array(sample_records) return sample_record_array, delimiter, quote def _determine_delimiter_and_quote(lines, delimiter, quote, auto=None): for i, line in enumerate(lines): if not _contains_digits(line): continue if delimiter is None: delimiter = _determine_delimiter(line, auto=auto) if quote is None: quote = _determine_quote(line) if delimiter and quote: break if i == 100: break return delimiter, quote def _record_has_all_expected_columns(record, header): if len(record) == len(header) and all(record): return True else: return False def _detect_time_stamps(raw_file, header, delimiter, cycle_col=None, quote=None, encoding=None): """Return column index of first and (for cycle data) second time stamp.""" first_time_stamp_col = None second_time_stamp_col = None with open(raw_file, encoding=encoding) as f: _ = f.readline() for line in f: if _contains_digits(line): record = _parse_line(line, delimiter, quote) if _record_has_all_expected_columns(record, header): break for col, val in enumerate(record): if (any([':' in val, '/' in val, '-' in val]) and _validate_time_stamp(val)): if first_time_stamp_col is None and col != cycle_col: first_time_stamp_col = col elif col != cycle_col: second_time_stamp_col = col break return [first_time_stamp_col, second_time_stamp_col] def _detect_column_data_types(raw_file, header, timestamp_cols, delimiter, cols_to_ignore, quote=None, encoding='UTF-8'): """Returns dict containing lists of column indexes that are not assigned as the ID column, cycle column, or time stamp. 
""" with open(raw_file, encoding=encoding) as lines: _ = lines.readline() columns_to_detect = _non_time_cols(header, timestamp_cols, cols_to_ignore) grouped = _determine_types_of_non_time_cols(columns_to_detect, lines, delimiter, quote, header) return grouped def _record_from_line(line, delimiter, quote, header): if _contains_digits(line): record = _parse_line(line, delimiter, quote) if _record_has_all_expected_columns(record, header): return record else: return None else: return None def _non_time_cols(header, timestamp_cols, cols_to_ignore): time_columns = set(_timestamp_columns(timestamp_cols)) if cols_to_ignore is None: ignoring = set() elif isinstance(cols_to_ignore[0], int): ignoring = set(cols_to_ignore) elif isinstance(cols_to_ignore[0], str): col_indexes_ignoring = [] for heading in cols_to_ignore: col_indexes_ignoring.append(_column_index_of_string(header, heading)) ignoring = set(col_indexes_ignoring) return set(range(len(header))) - time_columns - ignoring def _timestamp_columns(timestamp_cols): reserved_columns = [col for col in timestamp_cols if col is not None] return reserved_columns def _determine_types_of_non_time_cols(columns, lines, delimiter, quote, header): """Returns dict with lists as values. The lists contain column indexes that have not been assigned to the ID column, cycle column, or time stamps. """ int_records_found = {} possible_zips = {} float_cols = [] numeric_containing_commas = [] alphanumeric_cols = [] alpha_only_cols = [] zip_plus_4_cols = [] columns_assigned = [] for i, line in enumerate(lines): record = _record_from_line(line, delimiter, quote, header) if record: for col in columns: if col in columns_assigned: continue val = record[col] if val[0] == 0 and val[1] != '.': alphanumeric_cols.append(col) elif ',' in val: if _numeric_containing_commas(val): numeric_containing_commas.append(col) else: alphanumeric_cols.append(col) elif _has_form_of_5_digit_zip(val): possible_zips[col] = 1 elif _has_form_of_zip_plus_4_code(val): zip_plus_4_cols.append(val) elif _is_numeric(val): if _is_float(val): float_cols.append(col) else: int_records_found[col] = 1 elif _contains_digits(val): alphanumeric_cols.append(col) else: alpha_only_cols.append(col) columns_assigned = (float_cols + numeric_containing_commas + zip_plus_4_cols + alphanumeric_cols + alpha_only_cols) for col in int_records_found.keys(): if col in possible_zips.keys(): possible_zips.pop(col) for col in float_cols: if col in int_records_found.keys(): int_records_found.pop(col) cols_grouped = {group: cols for group, cols in [('floats', float_cols), ('ints', list(int_records_found.keys())), ('numeric_commas', numeric_containing_commas), ('alphanumeric', alphanumeric_cols), ('alpha_only', alpha_only_cols), ('possible_zips', list(possible_zips.keys()))] if cols} return cols_grouped def _is_float(val): try: assert isinstance(int(val), int) except ValueError: try: assert isinstance(float(val), float) except ValueError: return False else: return True else: return False def _has_form_of_5_digit_zip(val): if len(val) == 5 and val.isdigit(): return True def _has_form_of_zip_plus_4_code(val): if len(val) == 10 and val[5] == '-' and val.replace('-', '').isdigit(): return True else: return False def _detect_id_other_cols(raw_file, header, timestamp_cols, data_cols, cycle_col=None, id_col=None, delimiter=None, quote=None, encoding=None): if cycle_col: data_cols['cycle_col'] = cycle_col if id_col: data_cols['id_col'] = id_col return data_cols elif data_cols.get('alphanumeric') and len(data_cols['alphanumeric']) == 1: 
        data_cols['id_col'] = data_cols['alphanumeric'][0]
        return data_cols
    else:
        sample_records = []
        with open(raw_file, encoding=encoding) as lines:
            _ = lines.readline()
            for i, line in enumerate(lines):
                record = _record_from_line(line, delimiter, quote, header)
                if record:
                    possible_id_col_data = _data_in_possible_id_cols(record,
                                                                     data_cols)
                    # possible_id_col_data is a 2-tuple in which each part
                    # contains either a list of ints (data) or None
                    sample_records.append(possible_id_col_data)
                if i == 1000:
                    break
        sample_arr = np.array(sample_records)
        possible_id_col_indexes = [data_cols[cols] for cols in
                                   ['alphanumeric', 'ints']
                                   if data_cols.get(cols)]
        # Detect ID column based on headings in columns
        if _contains_id_heading(header, 0) and _contains_id_heading(header, 1):
            id_col = _primary_id_col_from_two_fields(sample_arr)
            data_cols['id_col'] = id_col
        elif _contains_id_heading(header, 0):
            data_cols['id_col'] = 0
        if id_col in [col for col in timestamp_cols + [cycle_col]
                      if col is not None]:
            data_cols['id_col'] = None
        cols_with_max_val_over_150 = []
        col_of_max_val, max_val = None, 0
        col_of_max_std, max_std = None, 0
        for col in possible_id_col_indexes:
            if np.amax(sample_arr[:, col]) > 150:
                cols_with_max_val_over_150.append(col)
            args = [col, sample_arr, np.amax, col_of_max_val, max_val]
            col_of_max_val, max_val = _compare_with_max(*args)
            args = [col, sample_arr, np.std, col_of_max_std, max_std]
            col_of_max_std, max_std = _compare_with_max(*args)
        if len(cols_with_max_val_over_150) == 1:
            id_col = cols_with_max_val_over_150[0]
        elif len(cols_with_max_val_over_150) > 1:
            id_col = col_of_max_val
        else:
            id_col = col_of_max_std
        data_cols['id_col'] = id_col
        return data_cols


def _remove_alphas(val):
    for char in val:
        if not char.isdigit():
            val = val.replace(char, '')
    if len(val):
        return val
    else:
        return None


def _is_numeric(val):
    try:
        assert isinstance(float(val), float)
    except ValueError:
        return False
    else:
        return True


def _numeric_containing_commas(val):
    ones_tens_and_greater = val.split('.')[0]
    split_by_commas = ones_tens_and_greater.split(',')
    first_group = split_by_commas[0]
    if len(first_group) > 3 and first_group.isdigit() and first_group[0] != '0':
        return False
    for group in split_by_commas[1:]:
        if len(group) == 3 and group.isdigit():
            continue
        else:
            return False
    decimal_part = val.split('.')[-1]
    if decimal_part.isdigit():
        return True
    else:
        return False


def _data_in_possible_id_cols(record, data_cols):
    if data_cols.get('alphanumeric'):
        alphanumeric_ints = [int(_remove_alphas(record[col_index]))
                             for col_index in data_cols['alphanumeric']]
    else:
        alphanumeric_ints = None
    if data_cols.get('ints'):
        ints = [int(record[col_index]) for col_index in data_cols['ints']]
    else:
        ints = None
    return alphanumeric_ints, ints


def _compare_with_max(col, sample_records, func, current_max_col, current_max):
    column_vals = sample_records[:, col]
    column_func_result = func(column_vals)
    if column_func_result > current_max:
        current_max = column_func_result
        current_max_col = col
    return current_max_col, current_max


def _detect_mixed_alpha_numeric_id_col(alphanumeric_cols, header,
                                       sample_records):
    id_col = None
    for col in alphanumeric_cols:
        if _contains_id_heading(header, col):
            id_col = col
            break
    if id_col is None:
        id_col = _col_with_alpha_or_alphas_in_string(sample_records, header,
                                                     alphanumeric_cols)
    return id_col


def _detect_cycle_col(raw_file, header, cycle_mode, delimiter, auto=None,
                      cycle_col_heading=None, quote=None, encoding=None):
    if auto and auto != 'cycles':
        return None
    if cycle_col_heading:
        with open(raw_file, encoding=encoding) as f:
first_line = f.readline() delimiter = _determine_delimiter(first_line) quote = _determine_quote(first_line) header = _parse_line(first_line, delimiter, quote) cycle_col = _column_index_of_string(header, cycle_col_heading) else: cycle_col = None if cycle_mode: with open(raw_file, encoding=encoding) as lines: _ = lines.readline() cycle_col = _cycle_col_in_records(lines, header, delimiter, cycle_mode, quote=quote) if cycle_col is None: raise ValueError('No column found containing value ' + cycle_mode) return cycle_col def _cycle_col_in_records(lines, header, delimiter, cycle, quote=None): cycle_col = None for line in lines: record = _record_from_line(line, delimiter, quote, header) if cycle in record: cycle_col = record.index(cycle) break if cycle_col is None: msg = ('No column found containing value \'' + cycle + '\'\n') raise ValueError(msg) return cycle_col def _contains_id_heading(header, col_index): if 'ID' in header[col_index].upper(): return True else: return False def _primary_id_col_from_two_fields(sample_records): ids_in_col_0 = _ids_in_col(sample_records, 0) ids_in_col_1 = _ids_in_col(sample_records, 1) if len(ids_in_col_0) >= len(ids_in_col_1): id_col = 0 else: id_col = 1 return id_col def _ids_in_col(sample_records, col): ids_with_repeats = sample_records[:, col] id_arr = np.unique(ids_with_repeats) return id_arr def _col_with_alpha_or_alphas_in_string(records, header, alphanumeric_cols): """Returns index of first column with both non-digits and digits if one exists. Otherwise, returns None.""" header_len = len(header) record = records[0, :] col = _col_containing_digit_or_digits(record, header_len, alphanumeric_cols) return col def _col_containing_digit_or_digits(record, header_len, alphanumeric_cols): """Returns index of column in record with both non-digits and digits if one exists. 
Otherwise, returns None.""" if record.shape[1] == header_len: for col in alphanumeric_cols: for char in record[col]: if char.isdigit(): return col return None def _validate_cycle_mode(cycle, cycle_mode): if cycle is None: return True elif cycle == cycle_mode: return True # Cycle mode does not match the mode specified in the kwargs else: return False def _validate_id(id, ids): if any([ids is not None and id in ids, ids is None]): return True return False def _validate_time_stamp(time_string): try: kwarg = {'infer_datetime_format': True} assert isinstance(pd.to_datetime(time_string, **kwarg), dt.datetime) except ValueError: return False else: return True def _header_and_id_col_if_heading_or_preconfig(raw_file, encoding='UTF-8', cycle=None, delimiter=None, id_col_heading=None, auto=None, quote=None, is_postal_file=None, is_sensors_file=None): id_col_index = None with open(raw_file, encoding=encoding) as f: header = f.readline() if id_col_heading: kwargs = {'delimiter': delimiter, 'quote': quote} id_col_and_more = _id_col_delim_quote_from_id_heading(header, id_col_heading, **kwargs) id_col_index, delimiter, quote = id_col_and_more else: quote = _determine_quote(header, quote=quote) delimiter = _determine_delimiter(header, auto=auto, cycle=cycle, id_col_heading=id_col_heading, quote=quote) header = _parse_line(header, delimiter, quote) if is_postal_file or is_sensors_file: return header, id_col_index if (id_col_heading is None) and (auto is None): id_col_index = _id_col_index_for_preconfig_non_auto_file_format(header) return header, id_col_index def _remove_newline_and_any_trailing_delimiter(line, delimiter=None): return line.rstrip(delimiter + '\n') def _id_col_delim_quote_from_id_heading(line, id_col_heading, quote=None, delimiter=None): quote = _determine_quote(line, quote=quote) if delimiter is None: delimiter = _determine_delimiter(line, id_col_heading=id_col_heading, quote=quote) header = _parse_line(line, delimiter, quote) if len(header) < 3: raise ValueError('Only ', len(header), ' columns detected based ' 'on ', delimiter, ' as delimiter.') else: col_index = _column_index_of_string(header, id_col_heading) return col_index, delimiter, quote def _column_index_of_string(line, string): if string in line: return line.index(string) else: raise NameError('Please check the string argument, ', string, ', ' 'it was not found.') def _parse_line(line, delimiter, quote): line = _remove_newline_and_any_trailing_delimiter(line, delimiter=delimiter) if _character_found_in_line(line, delimiter): parsed_line = csv.reader([line], delimiter=delimiter, quotechar=quote, skipinitialspace=True) return tuple(list(parsed_line)[0]) else: msg = 'Delimiter specified, ' + delimiter + ', not found in header.' 
raise ValueError(msg) def _id_col_index_for_preconfig_non_auto_file_format(header): config_cols_func_map = {SENSOR_FIELDS: SENSOR_ID_INDEX, CYCLE_FIELDS: CYCLE_ID_INDEX, GEOSPATIAL_FIELDS: GEOSPATIAL_ID_INDEX} try: id_col_index = config_cols_func_map[header] except KeyError: print('Header not matched in id_col_index with headers in config.ini file.') return id_col_index def _remove_commas_from_numeric_strings(items, delimiter, quote=None): for i, val in enumerate(items): if ',' in val: if _numeric_containing_commas(val): items[i] = val.replace(',', '') elif delimiter == ',' and quote: items[i] = quote + val + quote items = tuple(items) return items def _create_col_meta(header, id_other_cols, time_stamps, cols_to_ignore, cycle_col=None): meta = _col_indexes_and_types_for_meta(id_other_cols, time_stamps, header, cols_to_ignore, cycle_col=cycle_col) cols_meta = {} for m in meta: cols_meta[m[0]] = {'heading': m[3], 'type': m[2], 'position': m[1]} return cols_meta def _col_indexes_and_types_for_meta(id_other_cols, time_stamps, header, cols_to_ignore, cycle_col=None): """Takes list of tuples containing lists of column indexes and the corresponding type labels.""" # For the non-required columns, using heading labels as keys # Records contain: 1) key 2) position 3) type(general), 4) label meta = [] id_col = id_other_cols['id_col'] id_type = _data_type_in_col(id_col, id_other_cols) meta.append(_create_meta_for_col('id', id_col, id_type, header)) if cycle_col: cycle_type = _data_type_in_col(cycle_col, id_other_cols) meta.append(_create_meta_for_col('cycle', cycle_col, cycle_type, header)) if time_stamps[1] is not None: meta.append(_create_meta_for_col('start_time', time_stamps[0], 'time', header)) meta.append(_create_meta_for_col('end_time', time_stamps[1], 'time', header)) else: meta.append(_create_meta_for_col('time', time_stamps[0], 'time', header)) for col in _non_time_cols(header, time_stamps, cols_to_ignore): if col in [c for c in [id_col, cycle_col] if c is not None]: continue data_cat = _get_data_category_of_column(col, id_other_cols) meta.append(_create_meta_for_col(header[col], col, data_cat, header)) return meta def _data_type_in_col(col_index, id_other_cols): for data_type, meta in id_other_cols.items(): if isinstance(meta, list) and col_index in meta: return data_type def _create_meta_for_col(key, position_in_header, data_category, header): return key, position_in_header, data_category, header[position_in_header] def _get_data_category_of_column(col, id_other_cols): for k, v in id_other_cols.items(): if isinstance(v, list) and col in v: return k def _determine_delimiter(line, id_col_heading=None, cycle=None, auto=None, quote=None, header=None): quote = _determine_quote(line, quote=quote) comma_possible_delimiter = False non_comma_delimiters = [] delim_kwargs = {'header': header, 'auto': auto, 'cycle': cycle, 'id_col_heading': id_col_heading} if _delimiter_gives_minimum_number_of_columns(line, ',', quote, **delim_kwargs): comma_possible_delimiter = True for d in ['\t', '|']: if _delimiter_gives_minimum_number_of_columns(line, d, quote, **delim_kwargs): non_comma_delimiters.append(d) if not any([comma_possible_delimiter, non_comma_delimiters]): if _delimiter_gives_minimum_number_of_columns(line, ' ', quote, **delim_kwargs): non_comma_delimiters.append(' ') return _only_possible_delimiter_or_raise_error(comma_possible_delimiter, non_comma_delimiters) def _only_possible_delimiter_or_raise_error(comma_possible, non_comma_possible): delims = [(',', 'Commas'), ('\t', 'Tabs'), ('|', 'Pipes'), 
(' ', 'Spaces')] if comma_possible: if non_comma_possible: delimiter_chars = [','] + non_comma_possible delimiters = {d: desc for d, desc in delims if d in delimiter_chars} _multiple_possible_delimiters(delimiters) else: return ',' else: if len(non_comma_possible) > 1: delimiters = {d: desc for d, desc in delims if d in non_comma_possible} _multiple_possible_delimiters(delimiters) elif len(non_comma_possible) == 1: return non_comma_possible[0] else: raise ValueError('Header does not appear to have commas (\',\'), ' 'tabs (\'\t\'), pipes (\'|\') or spaces (\' \') ' 'as delimiters. Please specify.') def _multiple_possible_delimiters(delimiters): print('The following is a (or are) possible delimiter(s): ') for d in delimiters: print(delimiters[d], ': ', d) raise ValueError('Specify \'delimiter=\' in keyword arguments') def _delimiter_gives_minimum_number_of_columns(line, delimiter, quote, header=None, auto=None, cycle=None, id_col_heading=None): if delimiter is not None and delimiter == quote: raise ValueError('Delimiter ', delimiter, ' and quote character ', quote, ' are the same.') # If the first line is being parsed, the header argument is None by default. if not _character_found_in_line(line, delimiter): return False else: line = line.rstrip(delimiter + '\n') testdelim = csv.reader([line], delimiter=delimiter, quotechar=quote, skipinitialspace=True) parsed_line = list(testdelim)[0] if header is None: if id_col_heading: assert _column_index_of_string(parsed_line, id_col_heading) return _minimum_number_of_columns_exist(parsed_line) else: if auto or cycle: ets = _expected_time_stamps(auto, cycle) else: ets = None et = {'expected_time_stamps': ets} return _minimum_number_of_columns_exist(parsed_line, **et) def _character_found_in_line(line, char): delimre = re.compile(char) return bool(delimre.search(line)) def _expected_time_stamps(auto, cycle=None): if auto in ('sensors', 'geospatial'): return 1 elif auto == 'cycles' or cycle: return 2 else: raise ValueError('Value of auto argument (\', auto, \') not found.\n \ Should be \'sensors\', \'geospatial\', or \'cycles\'') def _minimum_number_of_columns_exist(csv_reader_out, expected_time_stamps=None): if len(list(csv_reader_out)) >= 3: if expected_time_stamps: # Data row is being parsed if _number_of_time_stamps_matches(csv_reader_out, expected_time_stamps): return True else: return False # The header is being parsed and there are no time stamp values else: return True else: return False def _number_of_time_stamps_matches(parsed_line, num_expected_time_stamps): time_stamps = 0 for val in parsed_line: if _validate_time_stamp(val): time_stamps += 1 if time_stamps == num_expected_time_stamps: return True else: return False def _determine_quote(line, quote=None): quotech = None if quote: quotere = re.compile(quote) if bool(quotere.search(line)): quotech = quote return quotech else: msg = quote + ' not found as quote. Check if quote character needs to be ' \ 'escaped with a \\. \n \" and \' are detected automatically.' 
raise ValueError(msg) for q in ['\"', '\'']: quotere = re.compile(q) if bool(quotere.search(line)): quotech = q break return quotech def _sensors_ids_in_states(**kwargs): if kwargs.get('states'): sensors_ids = (_sensors_states_df(**kwargs) .index .unique() .ravel() .astype(np.unicode)) else: sensors_ids = None return sensors_ids def _contains_digits(line): digits = re.compile('\d') return bool(digits.search(line)) def _missing_sensors_or_postal_error_message(): print('State(s) specified but sensors and/or postal codes not ' 'specified.') # cleanthermo / fixedautohelpers def _sensors_states_df(**kwargs): """Returns pandas dataframe with sensor metadata and location information for sensors in specified states. """ postal_file, sensors_file = (kwargs.get(k) for k in ['postal_file', 'sensors_file']) states = (kwargs.get('states')).split(',') auto = kwargs.get('auto') if kwargs.get('auto') else None zip_codes_df = _zip_codes_in_states(postal_file, states, auto) thermos_df = _sensors_df(sensors_file, auto) header_kwargs = {'is_sensors_file': True} header, _ = _header_and_id_col_if_heading_or_preconfig(sensors_file, **header_kwargs) zip_heading = _label_of_col_containing_string_lower_upper_title(header, 'zip') sensors_states_df = pd.merge(thermos_df, zip_codes_df, how='inner', left_on=zip_heading, right_index=True) return sensors_states_df def _zip_codes_in_states(postal_file, states, auto): """Returns pandas dataframe based on postal code metadata file, for states specified as list. """ header, _ = _header_and_id_col_if_heading_or_preconfig(postal_file, is_postal_file=True) if auto: zip_col = _index_of_col_with_string_in_lower_upper_or_title(header, 'zip') if zip_col is None: zip_col = _index_of_col_with_string_in_lower_upper_or_title(header, 'post') else: zip_col = _index_of_col_with_string_in_lower_upper_or_title(header, POSTAL_FILE_ZIP) zip_col_label = header[zip_col] dtype_zip_code = {zip_col_label: 'str'} if os.path.splitext(postal_file)[1] == '.csv': zips_default_index_df = pd.read_csv(postal_file, dtype=dtype_zip_code) else: zips_default_index_df = pd.read_table(postal_file, dtype=dtype_zip_code) zips_default_index_df[zip_col_label] = zips_default_index_df[zip_col_label]\ .str.pad(5, side='left', fillchar='0') zips_unfiltered_df = zips_default_index_df.set_index([zip_col_label]) state_filter = zips_unfiltered_df[POSTAL_TWO_LETTER_STATE].isin(states) zip_codes_df = zips_unfiltered_df.loc[state_filter] return zip_codes_df def _sensors_df(sensors_file, auto, encoding='UTF-8', delimiter=None): """Returns pandas dataframe of sensor metadata from raw file.""" kwargs = {'encoding': encoding, 'is_sensors_file': True} header, _ = _header_and_id_col_if_heading_or_preconfig(sensors_file, **kwargs) sample_records, _, _ = _select_sample_records(sensors_file, header, encoding=encoding) if auto: zip_col = _index_of_col_with_string_in_lower_upper_or_title(header, 'zip') zip_col_label = header[zip_col] if _contains_id_heading(header, 0) and _contains_id_heading(header, 1): id_col = _primary_id_col_from_two_fields(sample_records) else: id_col = _index_of_col_with_string_in_lower_upper_or_title(header, 'id') if id_col is None: raise ValueError('No column found in sensors file with label ' 'containing \'id\', \'Id\', or \'ID\'.') id_col_heading = header[id_col] else: zip_col_label = SENSOR_ZIP_CODE id_col_heading = SENSOR_DEVICE_ID dtype_sensor = {zip_col_label: 'str', id_col_heading: 'str'} if os.path.splitext(sensors_file)[1] == '.csv': thermos_df = pd.read_csv(sensors_file, dtype=dtype_sensor) else: 
thermos_df = pd.read_table(sensors_file, dtype=dtype_sensor) thermos_df.set_index(keys=id_col_heading, inplace=True) thermos_df[zip_col_label] = thermos_df[zip_col_label].str.pad(5, side='left', fillchar='0') return thermos_df def _label_of_col_containing_string_lower_upper_title(header, string): index = _index_of_col_with_string_in_lower_upper_or_title(header, string) if index: return header[index] else: return None def _index_of_col_with_string_in_lower_upper_or_title(header, string): for col, val in enumerate(header): label = header[col] if any([(s in label) for s in [string, string.title(), string.upper()]]): return col return None def _locations_in_states(**kwargs): """Returns location IDs for locations in specified states.""" if kwargs.get('states'): thermos_states_df = _sensors_states_df(**kwargs) location_ids_in_states = (thermos_states_df[SENSOR_LOCATION_ID] .unique() .ravel() .astype(np.unicode)) else: location_ids_in_states = None return location_ids_in_states # Fixed (static) file format handling def _data_type_matching_header(header): """Returns a string indicating the type of data corresponding to a header in a text file, where the header is a comma-separated string in which each element is itself a string. """ if SENSOR_ID_FIELD in header: data_type = 'sensors' field_data_mapping = {UNIQUE_CYCLE_FIELD_INDEX: 'cycles', UNIQUE_GEOSPATIAL_FIELD: 'geospatial'} fields_as_keys = set(field_data_mapping.keys()) field_in_header = set.intersection(fields_as_keys, set(header)) if len(field_in_header): field = field_in_header.pop() data_type = field_data_mapping[field] return data_type def _clean_cycles(raw_file, **kwargs): """Returns dict for cycling start and end times for sensors, which may be filtered using 'states' parameter, a string that is a comma-separated series of state abbreviations. """ clean_records = {} args = ['states', 'cycle', 'delimiter', 'quote', 'header', 'cols_meta'] states, cycle, delimiter, quote, header, cols_meta = (kwargs.get(k) for k in args) id_col = _id_col_position(cols_meta) id_is_int = _id_is_int(cols_meta) data_cols = _non_index_col_types(cols_meta) if states: thermos_ids = _sensors_ids_in_states(**kwargs) with open(raw_file, encoding=kwargs.get('encoding')) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and all(_validate_cycles_record(record, ids=thermos_ids, cycle=cycle)): # Cycle named tuple declaration is global, in order to ensure # that named tuples using it can be pickled. # Cycle = namedtuple('Cycle', ['device_id', 'cycle_mode', # 'start_time']) id_val = _id_val(record, id_col, id_is_int) multicols = Cycle(device_id=id_val, cycle_mode=_cycle_type(record), start_time=_start_cycle(record)) clean_records[multicols] = _record_vals(record, data_cols) else: clean_records = _clean_cycles_all_states(raw_file, **kwargs) return clean_records def _clean_sensors(raw_file, **kwargs): """Returns dict for sensor data, which may be filtered using 'states' parameter, a string that is a comma-separated series of state abbreviations. 
""" clean_records = {} args = ['states', 'header', 'delimiter', 'quote', 'cols_meta', 'encoding'] states, header, delimiter, quote, cols_meta, encoding = (kwargs.get(k) for k in args) id_is_int = _id_is_int(cols_meta) id_col = _id_col_position(cols_meta) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() if states: thermos_ids = _sensors_states_df(**kwargs).index.ravel() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and all(_validate_sensors_record(record, ids=thermos_ids)): # Sensor named tuple declared globally, to enable pickling to # work. # Sensor = namedtuple('Sensor', ['sensor_id', 'timestamp']) id_val = _id_val(record, id_col, id_is_int) multicols = Sensor(sensor_id=id_val, timestamp=_sensor_timestamp(record)) clean_records[multicols] = _sensor_observation(record) else: clean_records = _clean_sensors_all_states(raw_file, **kwargs) return clean_records def _clean_sensors_all_states(raw_file, **kwargs): """Returns dict for observations recorded by sensors, regardless of state.""" clean_records = {} args = ['header', 'delimiter', 'quote', 'encoding', 'cols_meta'] header, delimiter, quote, encoding, cols_meta = (kwargs.get(k) for k in args) id_col = _id_col_position(cols_meta) id_is_int = _id_is_int(cols_meta) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record: # Sensor named tuple declaration is global, in order to ensure that # named tuples using it can be pickled. # # Sensor = namedtuple('Sensor', ['sensor_id', 'timestamp']) id_val = _id_val(record, id_col, id_is_int) multicols = Sensor(sensor_id=id_val, timestamp=_sensor_timestamp(record)) clean_records[multicols] = _sensor_observation(record) return clean_records def _validate_sensors_record(record, ids=None): """Validate that line of text file containing indoor temperatures data has expected data content. """ if ids is not None: yield _leading_id(record) in ids def _clean_cycles_all_states(raw_file, **kwargs): """Returns dict for cycle start and end times of sensors, regardless of state. """ clean_records = {} args = ['cycle', 'header', 'delimiter', 'quote', 'encoding', 'cols_meta'] cycle, header, delimiter, quote, encoding, cols_meta = (kwargs.get(k) for k in args) id_col = _id_col_position(cols_meta) id_is_int = _id_is_int(cols_meta) data_cols = _non_index_col_types(cols_meta) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and all(_validate_cycles_record(record, cycle=cycle)): # Cycle named tuple declaration is global, in order to ensure that # named tuples using it can be pickled. # Cycle = namedtuple('Cycle', ['device_id', 'cycle_mode', # 'start_time']) id_val = _id_val(record, id_col, id_is_int) multicols = Cycle(device_id=id_val, cycle_mode=_cycle_type(record), start_time=_start_cycle(record)) clean_records[multicols] = _record_vals(record, data_cols) return clean_records def _validate_cycles_record(record, ids=None, cycle=None): """Validate that line of text file containing cycing data has expected data content. """ if ids is not None: yield _leading_id(record) in ids if cycle: yield _cycle_type(record) == cycle def _clean_geospatial(raw_file, **kwargs): """Returns dict for outdoor temperatures by location, which may be filtered using 'states' parameter, a string that is a comma-separated series of state abbreviations. 
""" clean_records = {} args = ['states', 'delimiter', 'quote', 'header', 'cols_meta', 'encoding'] states, delimiter, quote, header, cols_meta, encoding = (kwargs.get(k) for k in args) id_is_int = _id_is_int(cols_meta) id_col = _id_col_position(cols_meta) if states: location_ids = _locations_in_states(**kwargs) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record and all(_validate_geospatial_record(record, ids=location_ids)): id_val = _id_val(record, id_col, id_is_int) multicols = Geospatial(location_id=id_val, timestamp=_geospatial_timestamp(record)) clean_records[multicols] = _geospatial_obs(record) else: clean_records = _clean_geospatial_all_states(raw_file, **kwargs) return clean_records def _clean_geospatial_all_states(raw_file, **kwargs): """Returns dict for outdoor temperatures by location, regardless of state. """ clean_records = {} args = ['delimiter', 'quote', 'header', 'encoding', 'cols_meta'] delimiter, quote, header, encoding, cols_meta = (kwargs.get(k) for k in args) id_is_int = _id_is_int(cols_meta) id_col = _id_col_position(cols_meta) with open(raw_file, encoding=encoding) as lines: _ = lines.readline() for line in lines: record = _record_from_line(line, delimiter, quote, header) if record: # Geospatial named tuple declared globally to enable pickling. # The following is here for reference. # Geospatial = namedtuple('Geospatial', ['location_id', 'timestamp']) id_val = _id_val(record, id_col, id_is_int) multicols = Geospatial(location_id=id_val, timestamp=_geospatial_timestamp(record)) clean_records[multicols] = _geospatial_obs(record) return clean_records def _id_col_position(cols_meta): return cols_meta['id']['position'] def _id_is_int(cols_meta): id_is_int = True if cols_meta['id']['type'] == 'ints' else False return id_is_int def _id_val(record, id_col, id_is_int): id_val = int(record[id_col]) if id_is_int else record[id_col] return id_val def _validate_geospatial_record(record, ids=None): """Validate that line of text file containing outdoor temperatures data has expected content. """ if ids is not None: yield _leading_id(record) in ids def _leading_id(record): return record[0] def _cycle_type(record): return record[CYCLE_TYPE_INDEX] def _start_cycle(record): return record[CYCLE_START_INDEX] def _cycle_record_vals(record): start = CYCLE_END_TIME_INDEX end = len(CYCLE_FIELDS) return record[start:end] def _sensor_timestamp(record): timestamp_position = SENSORS_LOG_DATE_INDEX return record[timestamp_position] def _numeric_leads(record): """Returns True if all characters in the leading string element in a sequence are digits. """ return True if record[0].isdigit() else False def _sensor_observation(record): degrees_position = SENSORS_DATA_INDEX return record[degrees_position] def _inside_rec_len(record): return True if len(record) == len(SENSOR_FIELDS) else False def _geospatial_timestamp(record): timestamp_position = GEOSPATIAL_LOG_DATE_INDEX return record[timestamp_position] def _geospatial_obs(record): degrees_position = GEOSPATIAL_OBSERVATION_INDEX return record[degrees_position]