Source code for nilmtk.results

import abc
import pandas as pd
import copy
from .timeframe import TimeFrame
from nilmtk.utils import get_tz, tz_localize_naive

class Results(object):
    """Stats results from each node need to be assigned to a specific
    class so we know how to combine results from multiple chunks.

    For example, Energy can simply be summed, while dropout rate should
    be averaged and gaps need to be merged across chunk boundaries.

    Results objects contain a DataFrame, the index of which is the
    start timestamp for which the results are valid; the first column
    ('end') is the end timestamp for which the results are valid.
    Other columns are accumulators for the results.

    Attributes
    ----------
    _data : DataFrame
        Index is period start.
        Columns are: `end` and any columns for internal storage of stats.

    Static Attributes
    -----------------
    name : str
        The string used to cache this results object.
    """

    __metaclass__ = abc.ABCMeta

    def __init__(self):
        self._data = pd.DataFrame(columns=['end'])
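    # Illustrative sketch (not part of nilmtk): for a hypothetical
    # `EnergyResults` subclass, `_data` might look like this (timestamps
    # and values are made up):
    #
    #                              end                    active
    #     2013-01-01 00:00:00      2013-01-01 06:00:00      40.0
    #     2013-01-01 06:00:00      2013-01-01 12:00:00      38.5
    #
    # The index holds each chunk's start time, `end` holds its end time,
    # and the remaining columns accumulate the statistic itself.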
    def combined(self):
        """Return all results from each chunk combined.  Either return a
        single float for all periods or a dict where necessary, e.g. if
        calculating Energy for a meter which records both apparent power
        and active power then get active power with
        `energyresults.combined()['active']`.
        """
        return self._data[self._columns_with_end_removed()].sum()
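    # Usage sketch (assuming a hypothetical `energy_results` object whose
    # stat columns are AC types such as 'active' and 'apparent'; the
    # numbers are made up):
    #
    #     >>> energy_results.combined()
    #     active      78.5
    #     apparent    80.1
    #     dtype: float64
    #     >>> energy_results.combined()['active']
    #     78.5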
    def per_period(self):
        """Return a DataFrame.  Index is period start.
        Columns are: `end` and `<stat name>`.
        """
        return copy.deepcopy(self._data)
    def simple(self):
        """Returns the simplest representation of the results."""
        return self.combined()
    def append(self, timeframe, new_results):
        """Append a single result.

        Parameters
        ----------
        timeframe : nilmtk.TimeFrame
        new_results : dict
        """
        if not isinstance(timeframe, TimeFrame):
            raise TypeError("`timeframe` must be of type 'nilmtk.TimeFrame',"
                            " not '{}' type.".format(type(timeframe)))
        if not isinstance(new_results, dict):
            raise TypeError("`new_results` must be a dict, not '{}' type."
                            .format(type(new_results)))

        # Check that the new timeframe does not overlap any existing row.
        for index, series in self._data.iterrows():
            tf = TimeFrame(index, series['end'])
            tf.check_for_overlap(timeframe)

        row = pd.DataFrame(index=[timeframe.start],
                           columns=['end'] + new_results.keys())
        row['end'] = timeframe.end
        for key, val in new_results.iteritems():
            row[key] = val
        self._data = self._data.append(row, verify_integrity=True)
        self._data.sort_index(inplace=True)
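    # Usage sketch (timestamps and the stat name are illustrative;
    # `results` stands for any concrete Results subclass):
    #
    #     >>> tf = TimeFrame(pd.Timestamp('2013-01-01 00:00'),
    #     ...                pd.Timestamp('2013-01-01 06:00'))
    #     >>> results.append(tf, {'active': 40.0})
    #
    # Appending a timeframe which overlaps an existing row raises an
    # exception from TimeFrame.check_for_overlap().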
    def check_for_overlap(self):
        # TODO: this could be made much faster.
        n = len(self._data)
        index = self._data.index
        for i in range(n):
            row1 = self._data.iloc[i]
            tf1 = TimeFrame(index[i], row1['end'])
            for j in range(i + 1, n):
                row2 = self._data.iloc[j]
                tf2 = TimeFrame(index[j], row2['end'])
                tf1.check_for_overlap(tf2)
    def update(self, new_result):
        """Add results from a new chunk.

        Parameters
        ----------
        new_result : Results subclass (same class as self) from new
            chunk of data.
        """
        if not isinstance(new_result, self.__class__):
            raise TypeError("`new_result` must be of type '{}'"
                            .format(self.__class__))
        if new_result._data.empty:
            return

        self._data = self._data.append(new_result._data)
        self._data.sort_index(inplace=True)
        self.check_for_overlap()
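    # Typical pipeline sketch: each chunk of meter data yields its own
    # Results object (here `per_chunk_results`, a hypothetical iterable),
    # and successive chunks are folded into a running total:
    #
    #     >>> total = EnergyResults()  # hypothetical subclass
    #     >>> for chunk_result in per_chunk_results:
    #     ...     total.update(chunk_result)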
    def unify(self, other):
        """Take results from another table of data (another physical
        meter) and merge those results into self.  For example, if we
        have a dual-split mains supply then we want to merge the results
        from each physical meter.  The two sets of results must be for
        exactly the same timeframes.

        Parameters
        ----------
        other : Results subclass (same class as self).
            Results calculated from another table of data.
        """
        assert isinstance(other, self.__class__)
        for i, row in self._data.iterrows():
            # Check index membership first so a missing row raises the
            # intended RuntimeError rather than a KeyError from `.loc`.
            if (i not in other._data.index or
                    other._data['end'].loc[i] != row['end']):
                raise RuntimeError("The sections we are trying to merge"
                                   " do not have the same end times so we"
                                   " cannot merge them.")
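    # Sketch of the dual-split mains case described above.  This
    # base-class method only checks that the timeframes line up; a
    # subclass is assumed to extend unify() to actually combine the stat
    # columns (e.g. by summing energy):
    #
    #     >>> results_meter1.unify(results_meter2)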
    def import_from_cache(self, cached_stat, sections):
        """
        Parameters
        ----------
        cached_stat : DataFrame of cached data
        sections : list of nilmtk.TimeFrame objects
            describing the sections we want to load stats for.
        """
        if cached_stat.empty:
            return

        tz = get_tz(cached_stat)
        usable_sections_from_cache = []

        def append_row(row, section):
            row = row.astype(object)
            # We stripped off the timezone when exporting to cache
            # so now we must put the timezone back.
            row['end'] = tz_localize_naive(row['end'], tz)
            if row['end'] == section.end:
                usable_sections_from_cache.append(row)

        for section in sections:
            if not section:
                continue

            try:
                rows_matching_start = cached_stat.loc[section.start]
            except KeyError:
                pass
            else:
                if isinstance(rows_matching_start, pd.Series):
                    append_row(rows_matching_start, section)
                else:
                    for row_i in range(rows_matching_start.shape[0]):
                        row = rows_matching_start.iloc[row_i]
                        append_row(row, section)

        self._data = pd.DataFrame(usable_sections_from_cache)
        self._data.sort_index(inplace=True)
    def export_to_cache(self):
        """
        Returns
        -------
        pd.DataFrame

        Notes
        -----
        Objects are converted using `DataFrame.convert_objects()`.
        The reason for doing this is to strip out the timezone
        information from data columns.  We have to do this, otherwise
        Pandas complains if we try to put a column with multiple
        timezones (e.g. Europe/London across a daylight saving
        boundary).
        """
        return self._data.convert_objects()
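    # Round-trip sketch: export_to_cache() strips timezone information so
    # the stats can be written to a cache, and import_from_cache() puts
    # the timezone back when reloading:
    #
    #     >>> cached = results.export_to_cache()
    #     >>> restored = results.__class__()
    #     >>> restored.import_from_cache(cached,
    #     ...                            sections=results.timeframes())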
    def timeframes(self):
        """Returns a list of timeframes covered by this Result."""
        # For some reason, using `iterrows()` messes with the
        # timezone of the index, hence we need to 'manually' iterate
        # over the rows.
        return [TimeFrame(self._data.index[i], self._data.iloc[i]['end'])
                for i in range(len(self._data))]
    def _columns_with_end_removed(self):
        cols = set(self._data.columns)
        if len(cols) > 0:
            cols.remove('end')
        cols = list(cols)
        return cols

    def __repr__(self):
        return str(self._data)
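
# A minimal concrete subclass, purely as an illustrative sketch (it is
# not part of nilmtk): it counts samples per chunk, so counts from
# separate meters can be combined by simple addition once the base class
# has checked that the timeframes align.
class SampleCountResults(Results):
    name = "sample_count"

    def unify(self, other):
        super(SampleCountResults, self).unify(other)
        self._data['n_samples'] += other._data['n_samples']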