Source code for nilmtk.dataset_converters.redd.convert_redd

from __future__ import print_function, division
import pandas as pd
import numpy as np
from copy import deepcopy
from os.path import join, isdir, isfile
from os import listdir
import re
from sys import stdout
from nilmtk.utils import get_datastore
from nilmtk.datastore import Key
from nilmtk.timeframe import TimeFrame
from nilmtk.measurement import LEVEL_NAMES
from nilmtk.utils import get_module_directory, check_directory_exists
from nilm_metadata import convert_yaml_to_hdf5, save_yaml_to_datastore

"""
TODO:
* The bottleneck appears to be CPU.  So could be sped up by using 
  multiprocessing module to use multiple CPU cores to load REDD channels in 
  parallel.
"""


def convert_redd(redd_path, output_filename, format='HDF'):
    """
    Parameters
    ----------
    redd_path : str
        The root path of the REDD low_freq dataset.
    output_filename : str
        The destination filename (including path and suffix).
    format : str
        Format of output.  Either 'HDF' or 'CSV'.  Defaults to 'HDF'.
    """
    def _redd_measurement_mapping_func(house_id, chan_id):
        # Channels 1 and 2 are the mains (apparent power); all other
        # channels record active power.
        ac_type = 'apparent' if chan_id <= 2 else 'active'
        return [('power', ac_type)]

    # Open DataStore
    store = get_datastore(output_filename, format, mode='w')

    # Convert raw data to DataStore
    _convert(redd_path, store, _redd_measurement_mapping_func, 'US/Eastern')

    # Add metadata
    save_yaml_to_datastore(join(get_module_directory(),
                                'dataset_converters',
                                'redd',
                                'metadata'),
                           store)
    store.close()

    print("Done converting REDD to {}!".format(format))
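
# Example usage (a minimal sketch; the paths below are hypothetical):
#
#     from nilmtk.dataset_converters import convert_redd
#     convert_redd('/data/REDD/low_freq', '/data/redd.h5')
#
# The resulting file can then be opened with nilmtk's DataSet class,
# e.g. DataSet('/data/redd.h5').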
def _convert(input_path, store, measurement_mapping_func, tz, sort_index=True):
    """
    Parameters
    ----------
    input_path : str
        The root path of the REDD low_freq dataset.
    store : DataStore
        The NILMTK DataStore object.
    measurement_mapping_func : function
        Must take these parameters:
            - house_id
            - chan_id
        Function should return a list of tuples e.g. [('power', 'active')]
    tz : str
        Timezone e.g. 'US/Eastern'
    sort_index : bool
        Defaults to True.  Sort the DataFrame index before storing it
        (raw REDD data isn't always sorted).
    """
    check_directory_exists(input_path)

    # Iterate through all houses and channels
    houses = _find_all_houses(input_path)
    for house_id in houses:
        print("Loading house", house_id, end="... ")
        stdout.flush()
        chans = _find_all_chans(input_path, house_id)
        for chan_id in chans:
            print(chan_id, end=" ")
            stdout.flush()
            key = Key(building=house_id, meter=chan_id)
            measurements = measurement_mapping_func(house_id, chan_id)
            csv_filename = _get_csv_filename(input_path, key)
            df = _load_csv(csv_filename, measurements, tz)
            if sort_index:
                df = df.sort_index()  # raw REDD data isn't always sorted
            store.put(str(key), df)
        print()


def _find_all_houses(input_path):
    """
    Returns
    -------
    list of integers (house instances)
    """
    dir_names = [p for p in listdir(input_path) if isdir(join(input_path, p))]
    return _matching_ints(dir_names, r'^house_(\d)$')


def _find_all_chans(input_path, house_id):
    """
    Returns
    -------
    list of integers (channels)
    """
    house_path = join(input_path, 'house_{:d}'.format(house_id))
    filenames = [p for p in listdir(house_path) if isfile(join(house_path, p))]
    return _matching_ints(filenames, r'^channel_(\d\d?)\.dat$')


def _matching_ints(strings, regex):
    """Uses a regular expression to select strings and then extract an
    integer from each.

    Parameters
    ----------
    strings : list of strings
    regex : string
        Regular expression containing one group.  This group is used to
        extract the integer from each string.

    Returns
    -------
    list of ints
    """
    ints = []
    p = re.compile(regex)
    for string in strings:
        m = p.match(string)
        if m:
            integer = int(m.group(1))
            ints.append(integer)
    ints.sort()
    return ints


def _get_csv_filename(input_path, key_obj):
    """
    Parameters
    ----------
    input_path : (str) the root path of the REDD low_freq dataset
    key_obj : (nilmtk.Key) the house and channel to load

    Returns
    -------
    filename : str
    """
    assert isinstance(input_path, str)
    assert isinstance(key_obj, Key)

    # Get path
    house_path = 'house_{:d}'.format(key_obj.building)
    path = join(input_path, house_path)
    assert isdir(path)

    # Get filename
    filename = 'channel_{:d}.dat'.format(key_obj.meter)
    filename = join(path, filename)
    assert isfile(filename)

    return filename


def _load_csv(filename, columns, tz):
    """
    Parameters
    ----------
    filename : str
    columns : list of tuples (for hierarchical column index)
    tz : str
        e.g. 'US/Eastern'

    Returns
    -------
    dataframe
    """
    # Load data
    df = pd.read_csv(filename, sep=' ', names=columns,
                     dtype={m: np.float32 for m in columns})

    # Modify the column labels to reflect the power measurements recorded.
    df.columns.set_names(LEVEL_NAMES, inplace=True)

    # Convert the integer index column to timezone-aware datetime
    df.index = pd.to_datetime(df.index.values, unit='s', utc=True)
    df = df.tz_convert(tz)

    return df
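
# For reference, a raw REDD channel file is two space-separated columns of
# UNIX timestamp and power reading, e.g. (illustrative values):
#
#     1303132929 344.93
#     1303132930 344.88
#
# which `_load_csv` turns into a DataFrame with a timezone-aware
# DatetimeIndex and a two-level (physical_quantity, type) column index,
# schematically:
#
#     physical_quantity              power
#     type                        apparent
#     2011-04-18 09:22:09-04:00     344.93
#     2011-04-18 09:22:10-04:00     344.88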