Source code for nilmtk.disaggregate.fhmm_exact

from __future__ import print_function, division
import pandas as pd
import itertools
import numpy as np
from hmmlearn import hmm
from datetime import datetime
from ..timeframe import merge_timeframes, list_of_timeframe_dicts, TimeFrame

from copy import deepcopy
from collections import OrderedDict

SEED = 42

# Fix the seed for repeatability of experiments
np.random.seed(SEED)


def sort_startprob(mapping, startprob):
    """Sort the start probabilities according to increasing order of
    power means, as returned by the mapping."""
    num_elements = len(startprob)
    new_startprob = np.zeros(num_elements)
    for i in range(num_elements):
        new_startprob[i] = startprob[mapping[i]]
    return new_startprob

def sort_covars(mapping, covars):
    """Sort the covariance matrices according to the mapping."""
    new_covars = np.zeros_like(covars)
    for i in range(len(covars)):
        new_covars[i] = covars[mapping[i]]
    return new_covars

def sort_transition_matrix(mapping, A):
    """Sort the transition matrix according to increasing order of
    power means, as returned by the mapping.

    Parameters
    ----------
    mapping : dict
        Maps each sorted state index to the original state index.
    A : numpy.array of shape (k, k)
        Transition matrix.
    """
    num_elements = len(A)
    A_new = np.zeros((num_elements, num_elements))
    for i in range(num_elements):
        for j in range(num_elements):
            A_new[i, j] = A[mapping[i], mapping[j]]
    return A_new

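# Hypothetical sanity check (an illustration, not part of nilmtk): sorting
# with the mapping {0: 1, 1: 0} should swap both the rows and the columns
# of the transition matrix.
def _demo_sort_transition_matrix():
    A = np.array([[0.9, 0.1],
                  [0.3, 0.7]])
    A_sorted = sort_transition_matrix({0: 1, 1: 0}, A)
    assert np.allclose(A_sorted, [[0.7, 0.3],
                                  [0.1, 0.9]])
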
def sort_learnt_parameters(startprob, means, covars, transmat):
    mapping = return_sorting_mapping(means)
    means_new = np.sort(means, axis=0)
    startprob_new = sort_startprob(mapping, startprob)
    covars_new = sort_covars(mapping, covars)
    transmat_new = sort_transition_matrix(mapping, transmat)
    assert np.shape(means_new) == np.shape(means)
    assert np.shape(startprob_new) == np.shape(startprob)
    assert np.shape(transmat_new) == np.shape(transmat)
    return [startprob_new, means_new, covars_new, transmat_new]

def compute_A_fhmm(list_A):
    """
    Parameters
    ----------
    list_A : list
        Transition matrices of the individual learnt HMMs.

    Returns
    -------
    result : numpy.array
        Combined transition matrix for the FHMM.
    """
    result = list_A[0]
    for i in range(len(list_A) - 1):
        result = np.kron(result, list_A[i + 1])
    return result

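# Illustrative example (not part of nilmtk): the Kronecker product of two
# 2-state transition matrices gives the 4-state transition matrix of the
# joint chain, assuming the appliance chains evolve independently.
def _demo_compute_A_fhmm():
    A1 = np.array([[0.9, 0.1], [0.2, 0.8]])
    A2 = np.array([[0.7, 0.3], [0.4, 0.6]])
    A = compute_A_fhmm([A1, A2])
    assert A.shape == (4, 4)
    # P(both chains stay in state 0) = 0.9 * 0.7
    assert np.isclose(A[0, 0], 0.9 * 0.7)
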
def compute_means_fhmm(list_means):
    """
    Parameters
    ----------
    list_means : list
        State means of the individual learnt HMMs.

    Returns
    -------
    [mu, cov] : list
        Combined means and covariances for the FHMM.
    """
    states_combination = list(itertools.product(*list_means))
    num_combinations = len(states_combination)
    means_stacked = np.array([sum(x) for x in states_combination])
    means = np.reshape(means_stacked, (num_combinations, 1))
    cov = np.tile(5 * np.identity(1), (num_combinations, 1, 1))
    return [means, cov]

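# Illustrative example (not part of nilmtk): with state means [0, 100] W and
# [0, 50] W, the combined means are the sums over all state combinations.
def _demo_compute_means_fhmm():
    means, cov = compute_means_fhmm([[0, 100], [0, 50]])
    assert means.flatten().tolist() == [0, 50, 100, 150]
    assert cov.shape == (4, 1, 1)
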
def compute_pi_fhmm(list_pi):
    """
    Parameters
    ----------
    list_pi : list
        Start probabilities (pi) of the individual learnt HMMs.

    Returns
    -------
    result : numpy.array
        Combined pi for the FHMM.
    """
    result = list_pi[0]
    for i in range(len(list_pi) - 1):
        result = np.kron(result, list_pi[i + 1])
    return result

def create_combined_hmm(model):
    list_pi = [model[appliance].startprob_ for appliance in model]
    list_A = [model[appliance].transmat_ for appliance in model]
    list_means = [model[appliance].means_.flatten().tolist()
                  for appliance in model]

    pi_combined = compute_pi_fhmm(list_pi)
    A_combined = compute_A_fhmm(list_A)
    [mean_combined, cov_combined] = compute_means_fhmm(list_means)

    # Recent versions of hmmlearn set model parameters via attributes
    # rather than constructor keyword arguments.
    combined_model = hmm.GaussianHMM(n_components=len(pi_combined),
                                     covariance_type='full')
    combined_model.startprob_ = pi_combined
    combined_model.transmat_ = A_combined
    combined_model.covars_ = cov_combined
    combined_model.means_ = mean_combined
    return combined_model

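# Note (an illustration, not part of nilmtk): the combined model has
# prod_i k_i states, so n appliances with 2 states each yield a 2**n-state
# chain; exact FHMM inference therefore becomes expensive quickly.
def _demo_combined_state_count():
    two_state_pis = [np.array([0.5, 0.5])] * 10
    assert len(compute_pi_fhmm(two_state_pis)) == 2 ** 10
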
def return_sorting_mapping(means):
    means_copy = deepcopy(means)
    means_copy = np.sort(means_copy, axis=0)

    # Finding mapping
    mapping = {}
    for i, val in enumerate(means_copy):
        mapping[i] = np.where(val == means)[0][0]
    return mapping

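# Illustrative example (not part of nilmtk): for means [[200], [10]] the
# sorted order is [[10], [200]], so sorted index 0 maps to original index 1
# and vice versa.
def _demo_return_sorting_mapping():
    mapping = return_sorting_mapping(np.array([[200], [10]]))
    assert mapping == {0: 1, 1: 0}
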
def decode_hmm(length_sequence, centroids, appliance_list, states):
    """Decode the combined HMM state sequence into per-appliance states
    and power values."""
    hmm_states = {}
    hmm_power = {}
    total_num_combinations = 1

    for appliance in appliance_list:
        total_num_combinations *= len(centroids[appliance])

    for appliance in appliance_list:
        hmm_states[appliance] = np.zeros(length_sequence, dtype=int)
        hmm_power[appliance] = np.zeros(length_sequence)

    for i in range(length_sequence):
        factor = total_num_combinations
        for appliance in appliance_list:
            # Use floor division so the factorial-state arithmetic stays
            # integral on both Python 2 and 3.
            factor = factor // len(centroids[appliance])
            temp = int(states[i]) // factor
            hmm_states[appliance][i] = temp % len(centroids[appliance])
            hmm_power[appliance][i] = centroids[
                appliance][hmm_states[appliance][i]]

    return [hmm_states, hmm_power]

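# Illustrative example (not part of nilmtk): with two 2-state appliances the
# combined chain has 4 states; combined state 3 (binary 11) decodes to state
# 1 (i.e. "on") for both appliances.
def _demo_decode_hmm():
    centroids = {'fridge': [0, 100], 'kettle': [0, 2000]}
    states, power = decode_hmm(1, centroids, ['fridge', 'kettle'],
                               np.array([3]))
    assert states['fridge'][0] == 1 and states['kettle'][0] == 1
    assert power['kettle'][0] == 2000
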
class FHMM(object):
    """Exact 1D FHMM disaggregator."""

    def __init__(self):
        self.model = {}
        self.predictions = pd.DataFrame()

    def train(self, metergroup, **load_kwargs):
        """Train using 1D FHMM.

        Places the learnt model in the `model` attribute.
        The current version performs training ONLY on the first chunk.
        Online HMMs are welcome if someone can contribute :)
        Assumes all pre-processing has been done.
        """
        learnt_model = OrderedDict()
        for i, meter in enumerate(metergroup.submeters().meters):
            print("Training model for submeter '{}'".format(meter))
            learnt_model[meter] = hmm.GaussianHMM(n_components=2,
                                                  covariance_type="full")

            # Data to fit
            meter_data = next(meter.power_series(**load_kwargs)).dropna()
            length = len(meter_data.index)
            X = meter_data.values.reshape(length, 1)
            self.X = X
            learnt_model[meter].fit(X)

        # Combine the individual HMMs to make the exact FHMM
        self.meters = []
        new_learnt_models = OrderedDict()
        for meter in learnt_model:
            startprob, means, covars, transmat = sort_learnt_parameters(
                learnt_model[meter].startprob_, learnt_model[meter].means_,
                learnt_model[meter].covars_, learnt_model[meter].transmat_)
            new_learnt_models[meter] = hmm.GaussianHMM(
                n_components=startprob.size, covariance_type="full")
            new_learnt_models[meter].startprob_ = startprob
            new_learnt_models[meter].transmat_ = transmat
            new_learnt_models[meter].means_ = means
            new_learnt_models[meter].covars_ = covars
            # UGLY! But works.
            self.meters.append(meter)

        learnt_model_combined = create_combined_hmm(new_learnt_models)
        self.individual = new_learnt_models
        self.model = learnt_model_combined

    def disaggregate_chunk(self, test_mains):
        """Disaggregate the test data according to the model learnt
        previously.

        Performs 1D FHMM disaggregation.
        For now, assumes there is no missing data at this stage.
        """
        # See v0.1 code for ideas of how to handle missing data
        # in this code if need be.

        # Array of learnt states
        learnt_states_array = []
        test_mains = test_mains.dropna()
        length = len(test_mains.index)
        temp = test_mains.values.reshape(length, 1)
        learnt_states_array.append(self.model.predict(temp))

        # Model
        means = OrderedDict()
        for elec_meter, model in self.individual.items():
            means[elec_meter] = (
                model.means_.round().astype(int).flatten().tolist())
            means[elec_meter].sort()

        decoded_power_array = []
        decoded_states_array = []
        for learnt_states in learnt_states_array:
            [decoded_states, decoded_power] = decode_hmm(
                len(learnt_states), means, list(means.keys()), learnt_states)
            decoded_states_array.append(decoded_states)
            decoded_power_array.append(decoded_power)

        prediction = pd.DataFrame(
            decoded_power_array[0], index=test_mains.index)
        return prediction

    def disaggregate(self, mains, output_datastore, **load_kwargs):
        '''Disaggregate mains according to the model learnt previously.

        Parameters
        ----------
        mains : nilmtk.ElecMeter or nilmtk.MeterGroup
        output_datastore : instance of nilmtk.DataStore subclass
            For storing power predictions from disaggregation algorithm.
        output_name : string, optional
            The `name` to use in the metadata for the `output_datastore`.
            e.g. some sort of name for this experiment.
            Defaults to "NILMTK_FHMM_<date>".
        resample_seconds : number, optional
            The desired sample period in seconds.
        **load_kwargs : key word arguments
            Passed to `mains.power_series(**kwargs)`.
        '''
        import warnings
        warnings.filterwarnings("ignore", category=Warning)
        MIN_CHUNK_LENGTH = 100

        if not self.model:
            raise RuntimeError(
                "The model needs to be instantiated before"
                " calling `disaggregate`.  For example, the"
                " model can be instantiated by running `train`.")

        # Extract optional parameters from load_kwargs
        date_now = datetime.now().isoformat().split('.')[0]
        output_name = load_kwargs.pop('output_name',
                                      'NILMTK_FHMM_' + date_now)
        resample_seconds = load_kwargs.pop('resample_seconds', 60)

        resample_rule = '{:d}S'.format(resample_seconds)
        timeframes = []
        building_path = '/building{}'.format(mains.building())
        mains_data_location = '{}/elec/meter1'.format(building_path)
        data_is_available = False

        for chunk in mains.power_series(**load_kwargs):
            # Check that chunk is sensible size before resampling
            if len(chunk) < MIN_CHUNK_LENGTH:
                continue

            # Record metadata
            timeframes.append(chunk.timeframe)
            measurement = chunk.name

            chunk = chunk.resample(resample_rule).mean()
            # Check chunk size *again* after resampling
            if len(chunk) < MIN_CHUNK_LENGTH:
                continue

            # Start disaggregation
            predictions = self.disaggregate_chunk(chunk)

            for meter in predictions.columns:
                data_is_available = True
                meter_instance = meter.instance()
                cols = pd.MultiIndex.from_tuples([chunk.name])

                predicted_power = predictions[[meter]]
                output_df = pd.DataFrame(predicted_power)
                output_df.columns = pd.MultiIndex.from_tuples([chunk.name])
                output_datastore.append('{}/elec/meter{}'
                                        .format(building_path,
                                                meter_instance),
                                        output_df)

            # Copy mains data to disag output
            output_datastore.append(key=mains_data_location,
                                    value=pd.DataFrame(chunk, columns=cols))

        if not data_is_available:
            return

        ##################################
        # Add metadata to output_datastore

        # TODO: `preprocessing_applied` for all meters
        # TODO: split this metadata code into a separate function
        # TODO: submeter measurement should probably be the mains
        #       measurement we used to train on, not the mains measurement.

        # DataSet and MeterDevice metadata:
        meter_devices = {
            'FHMM': {
                'model': 'FHMM',
                'sample_period': resample_seconds,
                'max_sample_period': resample_seconds,
                'measurements': [{
                    'physical_quantity': measurement[0],
                    'type': measurement[1]
                }]
            },
            'mains': {
                'model': 'mains',
                'sample_period': resample_seconds,
                'max_sample_period': resample_seconds,
                'measurements': [{
                    'physical_quantity': measurement[0],
                    'type': measurement[1]
                }]
            }
        }

        merged_timeframes = merge_timeframes(timeframes,
                                             gap=resample_seconds)
        total_timeframe = TimeFrame(merged_timeframes[0].start,
                                    merged_timeframes[-1].end)

        dataset_metadata = {'name': output_name,
                            'date': date_now,
                            'meter_devices': meter_devices,
                            'timeframe': total_timeframe.to_dict()}
        output_datastore.save_metadata('/', dataset_metadata)

        # Building metadata

        # Mains meter:
        elec_meters = {
            1: {
                'device_model': 'mains',
                'site_meter': True,
                'data_location': mains_data_location,
                'preprocessing_applied': {},  # TODO
                'statistics': {
                    'timeframe': total_timeframe.to_dict()
                }
            }
        }

        # TODO: FIX THIS! Ugly hack for now
        # Appliances and submeters:
        appliances = []
        for i, meter in enumerate(self.meters):
            meter_instance = meter.instance()

            for app in meter.appliances:
                appliance = {
                    'meters': [meter_instance],
                    'type': app.identifier.type,
                    'instance': app.identifier.instance
                    # TODO this `instance` will only be correct when the
                    # model is trained on the same house as it is tested on.
                    # https://github.com/nilmtk/nilmtk/issues/194
                }
                appliances.append(appliance)

            elec_meters.update({
                meter_instance: {
                    'device_model': 'FHMM',
                    'submeter_of': 1,
                    'data_location': ('{}/elec/meter{}'
                                      .format(building_path,
                                              meter_instance)),
                    'preprocessing_applied': {},  # TODO
                    'statistics': {
                        'timeframe': total_timeframe.to_dict()
                    }
                }
            })

            # Set the name if it exists
            if meter.name:
                if len(meter.name) > 0:
                    elec_meters[meter_instance]['name'] = meter.name

        building_metadata = {
            'instance': mains.building(),
            'elec_meters': elec_meters,
            'appliances': appliances
        }

        output_datastore.save_metadata(building_path, building_metadata)
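

# Minimal usage sketch (an illustration, not part of this module).  The file
# names 'redd.h5' and 'disag-fhmm.h5' are hypothetical placeholders for a
# dataset converted to NILMTK's HDF5 format and an output store.
def _example_usage():
    from nilmtk import DataSet, HDFDataStore

    train = DataSet('redd.h5')
    train.set_window(end='2011-04-30')  # train on the first few weeks
    elec = train.buildings[1].elec

    fhmm = FHMM()
    fhmm.train(elec)  # fits one 2-state HMM per submeter, then combines them

    test = DataSet('redd.h5')
    test.set_window(start='2011-04-30')
    output = HDFDataStore('disag-fhmm.h5', 'w')
    fhmm.disaggregate(test.buildings[1].elec.mains(), output)
    output.close()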