Source code for nilmtk.datastore.datastore

from __future__ import print_function, division
import pandas as pd
from itertools import repeat, tee
from time import time
from copy import deepcopy
from collections import OrderedDict
import numpy as np
import yaml
from os.path import isdir, isfile, join, exists, dirname
from os import listdir, makedirs, remove
from shutil import rmtree
import re
from nilm_metadata.convert_yaml_to_hdf5 import _load_file
from nilmtk.timeframe import TimeFrame
from nilmtk.node import Node

# do not edit! added by PythonBreakpoints
from pdb import set_trace as _breakpoint


# Default `chunksize` for DataStore.load: 256 MiB (== 2**28 bytes).
MAX_MEM_ALLOWANCE_IN_BYTES = 256 * 1024 * 1024

class DataStore(object):
    """
    Provides a common interface to all physical data stores.
    Supports hierarchical stores.

    The DataStore class lives in the bottom layer of NILMTK.  It loads
    a single chunk at a time from physical location and returns a
    DataFrame.

    * Deals with: retrieving data from disk / network / direct from a meter
    * Optimised for: handling large amounts of data
    * Services it provides: delivering a generator of pd.DataFrames of data
      given a specific time span and columns
    * Totally agnostic about what the data 'means'. It could be voltage,
      current, temperature, PIR readings etc.
    * could have subclasses for NILMTK HDF5, NILMTK CSV, Xively,
      Current Cost meters etc.
    * One DataStore per HDF5 file or folder or CSV files or Xively
      feed etc.

    This base class only defines the interface: every data-access method
    below raises NotImplementedError and must be overridden by a subclass.

    Attributes
    ----------
    window : nilmtk.TimeFrame
        Defines the timeframe we are interested in.
    """

    def __init__(self):
        """Set `self.window` to a default-constructed `nilmtk.TimeFrame`.

        Takes no parameters.  The assignment goes through the `window`
        property setter, so `TimeFrame.check_tz()` is called on it.
        """
        self.window = TimeFrame()

    def __getitem__(self, key):
        """Loads all of a DataFrame from disk.

        Parameters
        ----------
        key : str

        Returns
        -------
        DataFrame

        Raises
        ------
        KeyError if `key` is not found.
        """
        raise NotImplementedError("NotImplementedError")

    @property
    def window(self):
        """nilmtk.TimeFrame : the timeframe of interest for loading data."""
        return self._window

    @window.setter
    def window(self, window):
        # check_tz() runs before the window is stored.
        # NOTE(review): presumably check_tz raises on a bad timezone - confirm.
        window.check_tz()
        self._window = window

    def load(self, key, cols=None, sections=None, n_look_ahead_rows=0,
             chunksize=MAX_MEM_ALLOWANCE_IN_BYTES):
        """
        Parameters
        ----------
        key : string, the location of a table within the DataStore.
        cols : list of Measurements, optional
            e.g. [('power', 'active'), ('power', 'reactive'), ('voltage')]
            if not provided then will return all columns from the table.
        sections : TimeFrameGroup; or list of nilmtk.TimeFrame objects;
            or a pd.PeriodIndex, optional.
            Defines the time sections to load.  If `self.window` is enabled
            then each `section` will be intersected with `self.window`.
        n_look_ahead_rows : int, optional, defaults to 0
            If >0 then each returned DataFrame will have a `look_ahead`
            property which will be a DataFrame of length `n_look_ahead_rows`
            of the data immediately in front of the data in the main DataFrame.
        chunksize : int, optional

        Returns
        -------
        generator of DataFrame objects
            Each DataFrame has extra attributes:
            - timeframe : TimeFrame of section intersected with self.window
            - look_ahead : pd.DataFrame:
                with `n_look_ahead_rows` rows.  The first row will be for
                `section.end`.  `look_ahead` stores data which appears on
                disk immediately after `section.end`; i.e. it ignores
                the next `section.start`.

            Returns an empty DataFrame if no data is available for the
            specified section (or if the section.intersection(self.window)
            is empty).

        Raises
        ------
        KeyError if `key` is not in store.
        """
        raise NotImplementedError("NotImplementedError")

    def append(self, key, value):
        """
        Parameters
        ----------
        key : str
        value : pd.DataFrame

        Notes
        -----
        To quote the Pandas documentation for
        pandas.io.pytables.HDFStore.append:
        Append does *not* check if data being appended overlaps with
        existing data in the table, so be careful.
        """
        raise NotImplementedError("NotImplementedError")

    def put(self, key, value):
        """
        Parameters
        ----------
        key : str
        value : pd.DataFrame
        """
        raise NotImplementedError("NotImplementedError")

    def remove(self, key, value):
        """
        Parameters
        ----------
        key : str
        value : pd.DataFrame
        """
        raise NotImplementedError("NotImplementedError")

    def load_metadata(self, key='/'):
        """
        Parameters
        ----------
        key : string, optional
            if '/' then load metadata for the whole dataset.

        Returns
        -------
        metadata : dict
        """
        raise NotImplementedError("NotImplementedError")

    def save_metadata(self, key, metadata):
        """
        Parameters
        ----------
        key : string
        metadata : dict
        """
        raise NotImplementedError("NotImplementedError")

    def elements_below_key(self, key='/'):
        """List the names of the elements stored below `key`.

        Parameters
        ----------
        key : string, optional
            Defaults to '/', the root of the store.

        Returns
        -------
        list of strings
        """
        # Previously this had no body and silently returned None; raise like
        # every other interface method so missing overrides fail loudly.
        raise NotImplementedError("NotImplementedError")

    def close(self):
        """Close the underlying store.  Must be implemented by subclasses."""
        raise NotImplementedError("NotImplementedError")

    def open(self):
        """Open the underlying store.  Must be implemented by subclasses."""
        raise NotImplementedError("NotImplementedError")

    def get_timeframe(self, key):
        """
        Returns
        -------
        nilmtk.TimeFrame of entire table after intersecting with self.window.
        """
        raise NotImplementedError("NotImplementedError")
def write_yaml_to_file(metadata_filename, metadata):
    """Serialise `metadata` as YAML and write it to `metadata_filename`.

    Parameters
    ----------
    metadata_filename : str
        Path of the file to write.  The file is opened in 'w' mode, so any
        existing content is overwritten.
    metadata : object
        Python object to serialise with `yaml.dump`.
    """
    # `open` (not the Python-2-only `file` builtin) works on Python 2 and 3,
    # and the `with` block closes the file even if `yaml.dump` raises.
    with open(metadata_filename, 'w') as metadata_file:
        yaml.dump(metadata, metadata_file)
def join_key(*args):
    """Build an absolute, '/'-separated key from path components.

    Each component is converted with `str()` and stripped of leading and
    trailing slashes.  Components that end up empty are dropped.  The result
    always starts with '/' and has no trailing slash unless it is exactly '/'.

    Examples
    --------
    >>> join_key('building1', 'elec', 'meter1')
    '/building1/elec/meter1'
    >>> join_key('/')
    '/'
    >>> join_key('')
    '/'
    """
    components = (str(arg).strip('/') for arg in args)
    return '/' + '/'.join(part for part in components if part)
def convert_datastore(input_store, output_store):
    """Copy metadata and meter data from one DataStore into another.

    The hierarchy walked is dataset -> building -> utility -> meter.
    Dataset ('/') and building metadata are copied with `save_metadata`.
    Every chunk yielded by `input_store.load(meter_key)` is appended to
    `output_store` under the same key.  Meter-level elements named
    'cache' are skipped.

    Parameters
    ----------
    input_store : nilmtk.DataStore
    output_store : nilmtk.DataStore
    """
    # dataset metadata
    metadata = input_store.load_metadata()
    output_store.save_metadata('/', metadata)
    for building in input_store.elements_below_key():
        building_key = '/' + building
        # building metadata
        metadata = input_store.load_metadata(building_key)
        output_store.save_metadata(building_key, metadata)
        # Pass the absolute key (as is done for utility_key below), not the
        # bare building name, so stores expecting a leading '/' resolve it.
        for utility in input_store.elements_below_key(building_key):
            utility_key = building_key + '/' + utility
            for meter in input_store.elements_below_key(utility_key):
                # ignore cache (should this appear as an element below key?)
                if meter == 'cache':
                    continue
                meter_key = utility_key + '/' + meter
                # store meter data chunk by chunk
                for df in input_store.load(meter_key):
                    output_store.append(meter_key, df)