Source code for nilmtk.disaggregate.hart_85

from __future__ import print_function, division
import pandas as pd
from collections import OrderedDict, deque
from datetime import datetime
from ..feature_detectors.cluster import hart85_means_shift_cluster
from ..feature_detectors.steady_states import find_steady_states_transients
from ..timeframe import merge_timeframes, list_of_timeframe_dicts, TimeFrame

# Fix the seed for repeatability of experiments
SEED = 42
import numpy as np
np.random.seed(SEED)


class MyDeque(deque):
    def popmiddle(self, pos):
        self.rotate(-pos)
        ret = self.popleft()
        self.rotate(pos)
        return ret
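# Illustrative sketch (not part of the original module): `popmiddle` removes
# the element at position `pos` by rotating it to the front, popping it, then
# rotating back. Values below are hypothetical.
#
#   >>> d = MyDeque([10, 20, 30, 40], maxlen=4)
#   >>> d.popmiddle(2)
#   30
#   >>> list(d)
#   [10, 20, 40]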
class PairBuffer(object):
    """
    Attributes:
    * transition_list (list of transitions, each with a pairing flag)
    * matched_pairs (dataframe containing matched pairs of transitions)
    """

    def __init__(self, buffer_size, min_tolerance, percent_tolerance,
                 large_transition, num_measurements):
        """
        Parameters
        ----------
        buffer_size : int
            Size of the buffer to use for finding edges.
        min_tolerance : int
            Variance in power draw allowed for pairing a match.
        percent_tolerance : float
            If a transition is greater than `large_transition`, use this
            percentage of the transition as the tolerance instead.
        large_transition : float
            Power draw of a large transition.
        num_measurements : int
            2 if only active power is recorded;
            3 if both active and reactive power are recorded.
        """
        # We use a deque because it gives us quick access to both ends for
        # popping and additionally lets us set a maxlen which drops the
        # oldest items. This nicely suits Hart's recommendation that the
        # size should be tunable.
        self._buffer_size = buffer_size
        self._min_tol = min_tolerance
        self._percent_tol = percent_tolerance
        self._large_transition = large_transition
        self.transition_list = MyDeque([], maxlen=self._buffer_size)
        self._num_measurements = num_measurements
        if self._num_measurements == 3:
            # Both active and reactive power are available
            self.pair_columns = ['T1 Time', 'T1 Active', 'T1 Reactive',
                                 'T2 Time', 'T2 Active', 'T2 Reactive']
        elif self._num_measurements == 2:
            # Only active power is available
            self.pair_columns = ['T1 Time', 'T1 Active',
                                 'T2 Time', 'T2 Active']
        self.matched_pairs = pd.DataFrame(columns=self.pair_columns)
    def clean_buffer(self):
        # Remove any matched transitions
        for idx, entry in enumerate(self.transition_list):
            if entry[self._num_measurements]:
                self.transition_list.popmiddle(idx)
                self.clean_buffer()
                break
        # Remove the oldest transition if buffer cleaning didn't remove
        # anything:
        # if len(self.transition_list) == self._buffer_size:
        #     self.transition_list.popleft()
    def add_transition(self, transition):
        # Check transition is as expected.
        assert isinstance(transition, (tuple, list))
        # Check that it carries the expected number of measurements.
        assert len(transition) == self._num_measurements
        # Convert to a mutable list so we can append the pairing flag.
        mtransition = list(transition)
        # Add transition to the list of transitions (marked as unpaired).
        mtransition.append(False)
        self.transition_list.append(mtransition)
        # Checking for pairs:
        # self.pair_transitions()
        # self.clean_buffer()
    def pair_transitions(self):
        """
        Hart 85, P 33:
        When searching the working buffer for pairs, the order in which
        entries are examined is very important. If an appliance has switched
        on and off several times in succession, there can be many pairings
        between entries in the buffer. The algorithm must not allow an ON
        transition to match an OFF which occurred at the end of a different
        cycle, so that only ON/OFF pairs which truly belong together are
        paired up. Otherwise the energy consumption of the appliance will be
        greatly overestimated. The most straightforward search procedures
        can make errors of this nature when faced with such transition
        sequences.

        Hart 85, P 32:
        For the two-state load monitor, a pair is defined as two entries
        which meet the following four conditions:
        (1) They are on the same leg, or are both 240 V,
        (2) They are both unmarked,
        (3) The earlier has a positive real power component, and
        (4) When added together, they result in a vector in which the
            absolute value of the real power component is less than 35 Watts
            (or 3.5% of the real power, if the transitions are over 1000 W)
            and the absolute value of the reactive power component is less
            than 35 VAR (or 3.5%).

        ... the correct way to search the buffer is to start by checking
        elements which are close together in the buffer, and gradually
        increase the distance. First, adjacent elements are checked for
        pairs which meet all four requirements above; if any are found they
        are processed and marked. Then elements two entries apart are
        checked, then three, and so on, until the first and last element
        are checked...
        """
        tlength = len(self.transition_list)
        pairmatched = False
        if tlength < 2:
            return pairmatched

        # Can we reduce the running time of this algorithm?
        # My gut feeling is no, because we can't re-order the list...
        # I wonder if we sort but then check the time... maybe. TODO
        # (perhaps!)

        # Start the element distance at 1, go up to current length of buffer
        for eDistance in range(1, tlength):
            idx = 0
            while idx < tlength - 1:
                # We don't want to go beyond the length of the array
                compindex = idx + eDistance
                if compindex < tlength:
                    val = self.transition_list[idx]
                    # val[1] is the active power and
                    # val[self._num_measurements] is the match status
                    if (val[1] > 0) and (val[self._num_measurements] is False):
                        compval = self.transition_list[compindex]
                        if compval[self._num_measurements] is False:
                            # Add the two elements for comparison
                            vsum = np.add(
                                val[1:self._num_measurements],
                                compval[1:self._num_measurements])
                            # Set the allowable tolerance for reactive and
                            # active power
                            matchtols = [self._min_tol, self._min_tol]
                            for ix in range(1, self._num_measurements):
                                matchtols[ix - 1] = (
                                    self._min_tol
                                    if max(np.fabs([val[ix], compval[ix]]))
                                    < self._large_transition
                                    else (self._percent_tol
                                          * max(np.fabs([val[ix],
                                                         compval[ix]]))))
                            if self._num_measurements == 3:
                                condition = (
                                    np.fabs(vsum[0]) < matchtols[0]
                                    and np.fabs(vsum[1]) < matchtols[1])
                            elif self._num_measurements == 2:
                                condition = np.fabs(vsum[0]) < matchtols[0]
                            if condition:
                                # Mark both transitions as paired
                                self.transition_list[idx][
                                    self._num_measurements] = True
                                self.transition_list[compindex][
                                    self._num_measurements] = True
                                pairmatched = True
                                # Append the OFF transition to the ON.
                                # Add to dataframe.
                                matchedpair = (
                                    val[0:self._num_measurements]
                                    + compval[0:self._num_measurements])
                                self.matched_pairs.loc[
                                    len(self.matched_pairs)] = matchedpair
                    # Iterate index
                    idx += 1
                else:
                    break
        return pairmatched
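# Illustrative sketch (not part of the original module): pairing one ON/OFF
# cycle by hand. Timestamps and power values are hypothetical. With
# num_measurements=2 each transition is (time, active power); the +120 W ON
# edge and the -118 W OFF edge sum to 2 W, which is within min_tolerance, so
# they are matched.
#
#   >>> buf = PairBuffer(buffer_size=20, min_tolerance=100,
#   ...                  percent_tolerance=0.035, large_transition=1000,
#   ...                  num_measurements=2)
#   >>> buf.add_transition((pd.Timestamp('2014-01-01 08:00'), 120.0))
#   >>> buf.add_transition((pd.Timestamp('2014-01-01 08:30'), -118.0))
#   >>> buf.pair_transitions()
#   True
#   >>> len(buf.matched_pairs)  # one matched ON/OFF pair
#   1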
class Hart85(object):
    """
    1- or 2-dimensional Hart 1985 algorithm.

    Attributes
    ----------
    model : dict
        Each key is either the instance integer for an ElecMeter,
        or a tuple of instances for a MeterGroup.
        Each value is a sorted list of power in different states.
    """

    def __init__(self):
        self.model = {}
    def train(self, metergroup, cols=[('power', 'active')],
              buffer_size=20, noise_level=70, state_threshold=15,
              min_tolerance=100, percent_tolerance=0.035,
              large_transition=1000, **kwargs):
        """
        Train using Hart85. Places the learnt model in the `model` attribute.

        Parameters
        ----------
        metergroup : a nilmtk.MeterGroup object
        cols : list of nilmtk.Measurement tuples, optional
            One of the following:
            [('power', 'active')]
            [('power', 'apparent')]
            [('power', 'reactive')]
            [('power', 'active'), ('power', 'reactive')]
        buffer_size : int, optional
            Size of the buffer to use for finding edges.
        noise_level : number, optional
            Passed to `find_steady_states_transients`; transitions smaller
            than this are treated as noise.
        state_threshold : number, optional
            Passed to `find_steady_states_transients`.
        min_tolerance : int, optional
            Variance in power draw allowed for pairing a match.
        percent_tolerance : float, optional
            If a transition is greater than `large_transition`, use this
            percentage of the transition as the tolerance instead.
        large_transition : float, optional
            Power draw of a large transition.
        """
        self.cols = cols
        self.state_threshold = state_threshold
        self.noise_level = noise_level
        [self.steady_states, self.transients] = find_steady_states_transients(
            metergroup, cols, noise_level, state_threshold, **kwargs)
        self.pair_df = self.pair(
            buffer_size, min_tolerance, percent_tolerance, large_transition)
        self.centroids = hart85_means_shift_cluster(self.pair_df, cols)
    def pair(self, buffer_size, min_tolerance, percent_tolerance,
             large_transition):
        subset = list(self.transients.itertuples())
        buffer = PairBuffer(
            min_tolerance=min_tolerance,
            buffer_size=buffer_size,
            percent_tolerance=percent_tolerance,
            large_transition=large_transition,
            num_measurements=len(self.transients.columns) + 1)
        for s in subset:
            # Clean the buffer first if it is already full.
            if len(buffer.transition_list) == buffer_size:
                buffer.clean_buffer()
            buffer.add_transition(s)
            buffer.pair_transitions()
        return buffer.matched_pairs
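    # Illustrative note (not part of the original module): `self.transients`
    # is indexed by timestamp with one column per power measurement, so
    # itertuples() yields (time, active[, reactive]) tuples and
    # len(self.transients.columns) + 1 matches the tuple width that
    # PairBuffer expects as num_measurements.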
    def hart85_disaggregate_single_chunk(self, chunk, prev, transients):
        """
        Assigns each transient falling within `chunk` to the closest learnt
        centroid, recording 1 (switched on) or 0 (switched off) for that
        appliance; all other samples stay in the unknown state (-1).
        """
        states = pd.DataFrame(
            -1, index=chunk.index, columns=self.centroids.index.values)
        for transient_tuple in transients.itertuples():
            if transient_tuple[0] < chunk.index[0]:
                # Transient occurs before chunk has started; do nothing
                pass
            elif transient_tuple[0] > chunk.index[-1]:
                # Transient occurs after chunk has ended; do nothing
                pass
            else:
                # Absolute value of transient
                abs_value = np.abs(transient_tuple[1:])
                positive = transient_tuple[1] > 0
                absolute_value_transient_minus_centroid = pd.DataFrame(
                    (self.centroids - abs_value).abs())
                if len(transient_tuple) == 2:
                    # 1d data
                    index_least_delta = (
                        absolute_value_transient_minus_centroid
                        .idxmin().values[0])
                else:
                    # 2d data. Use the squared distance across both
                    # dimensions before computing the minimum.
                    columns = absolute_value_transient_minus_centroid.columns
                    absolute_value_transient_minus_centroid["multidim"] = (
                        absolute_value_transient_minus_centroid[columns[0]] ** 2
                        + absolute_value_transient_minus_centroid[columns[1]] ** 2)
                    index_least_delta = (
                        absolute_value_transient_minus_centroid["multidim"]
                        .idxmin())
                if positive:
                    # Turned on
                    states.loc[transient_tuple[0], index_least_delta] = 1
                else:
                    # Turned off
                    states.loc[transient_tuple[0], index_least_delta] = 0
        return states
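    # Illustrative sketch (not part of the original module): nearest-centroid
    # matching for the 2d case. Centroid values are hypothetical. A +105 W /
    # +48 VAR transient is closest (in squared distance) to the 100 W / 50 VAR
    # centroid, so that appliance is the one marked as switching ON.
    #
    #   >>> centroids = pd.DataFrame({'active': [100.0, 2000.0],
    #   ...                           'reactive': [50.0, 300.0]})
    #   >>> deltas = (centroids - np.abs([105.0, 48.0])).abs()
    #   >>> (deltas['active'] ** 2 + deltas['reactive'] ** 2).idxmin()
    #   0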
    def assign_power_from_states(self, states_chunk, prev):
        """
        Converts a state matrix (1 = switched on, 0 = switched off,
        -1 = unknown) into a per-appliance power series using the learnt
        centroids. Unknown states fall back to the appliance's previous
        known state in `prev`, defaulting to off.
        """
        self.schunk = states_chunk
        di = {}
        ndim = len(self.centroids.columns)
        for appliance in states_chunk.columns:
            values = states_chunk[[appliance]].values.flatten()
            if ndim == 1:
                power = np.zeros(len(values), dtype=int)
            else:
                power = np.zeros((len(values), 2), dtype=int)
            on = False
            i = 0
            while i < len(values) - 1:
                if values[i] == 1:
                    on = True
                    i = i + 1
                    power[i] = self.centroids.loc[appliance].values
                    while values[i] != 0 and i < len(values) - 1:
                        power[i] = self.centroids.loc[appliance].values
                        i = i + 1
                elif values[i] == 0:
                    on = False
                    i = i + 1
                    power[i] = 0
                    while values[i] != 1 and i < len(values) - 1:
                        if ndim == 1:
                            power[i] = 0
                        else:
                            power[i] = [0, 0]
                        i = i + 1
                else:
                    # Unknown state. If we previously knew this appliance's
                    # state, use that; otherwise it defaults to 0 (off).
                    if prev[appliance] == -1 or prev[appliance] == 0:
                        on = False
                        power[i] = 0
                        while values[i] != 1 and i < len(values) - 1:
                            if ndim == 1:
                                power[i] = 0
                            else:
                                power[i] = [0, 0]
                            i = i + 1
                    else:
                        on = True
                        power[i] = self.centroids.loc[appliance].values
                        while values[i] != 0 and i < len(values) - 1:
                            power[i] = self.centroids.loc[appliance].values
                            i = i + 1
            di[appliance] = power
        return di
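    # Illustrative note (not part of the original module): `prev` maps each
    # learnt appliance to its most recent known state, e.g. {0: 1, 1: -1}
    # for two learnt appliances. When a column starts at -1 (unknown), the
    # loop above consults prev[appliance]: a previous 1 keeps the appliance
    # on at its centroid power until the next 0; a previous -1 or 0 keeps it
    # off until the next 1.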
    def disaggregate(self, mains, output_datastore, **load_kwargs):
        """
        Disaggregate mains according to the model learnt previously.

        Parameters
        ----------
        mains : nilmtk.ElecMeter or nilmtk.MeterGroup
        output_datastore : instance of nilmtk.DataStore subclass
            For storing power predictions from disaggregation algorithm.
        output_name : string, optional
            The `name` to use in the metadata for the `output_datastore`,
            e.g. some sort of name for this experiment.
            Defaults to "Hart85_<date>".
        resample_seconds : number, optional
            The desired sample period in seconds.
        **load_kwargs : keyword arguments
            Passed to `mains.power_series(**kwargs)`.
        """
        date_now = datetime.now().isoformat().split('.')[0]
        output_name = load_kwargs.pop('output_name', 'Hart85_' + date_now)
        resample_seconds = load_kwargs.pop('resample_seconds', 60)

        building_path = '/building{}'.format(mains.building())
        mains_data_location = '{}/elec/meter1'.format(building_path)

        [temp, transients] = find_steady_states_transients(
            mains, cols=self.cols, state_threshold=self.state_threshold,
            noise_level=self.noise_level, **load_kwargs)

        # For now, ignore the first transient
        transients = transients[1:]

        # Initially all appliances/meters are in unknown state (denoted by -1)
        prev = OrderedDict()
        learnt_meters = self.centroids.index.values
        for meter in learnt_meters:
            prev[meter] = -1

        timeframes = []
        # Now iterate over the mains data, disaggregating chunk by chunk
        for chunk in mains.power_series(**load_kwargs):
            # Record metadata
            timeframes.append(chunk.timeframe)
            measurement = chunk.name
            states_chunk = self.hart85_disaggregate_single_chunk(
                chunk, prev, transients)
            prev = states_chunk.iloc[-1].to_dict()
            power_chunk_dict = self.assign_power_from_states(
                states_chunk, prev)
            cols = pd.MultiIndex.from_tuples([chunk.name])
            for meter in power_chunk_dict.keys():
                output_datastore.append(
                    '{}/elec/meter{:d}'.format(building_path, meter + 2),
                    pd.DataFrame(power_chunk_dict[meter],
                                 index=states_chunk.index,
                                 columns=cols))
            output_datastore.append(key=mains_data_location,
                                    value=pd.DataFrame(chunk, columns=cols))

        # DataSet and MeterDevice metadata:
        meter_devices = {
            'Hart85': {
                'model': 'Hart85',
                'sample_period': resample_seconds,
                'max_sample_period': resample_seconds,
                'measurements': [{
                    'physical_quantity': measurement[0],
                    'type': measurement[1]
                }]
            },
            'mains': {
                'model': 'mains',
                'sample_period': resample_seconds,
                'max_sample_period': resample_seconds,
                'measurements': [{
                    'physical_quantity': measurement[0],
                    'type': measurement[1]
                }]
            }
        }

        merged_timeframes = merge_timeframes(timeframes, gap=resample_seconds)
        total_timeframe = TimeFrame(merged_timeframes[0].start,
                                    merged_timeframes[-1].end)

        dataset_metadata = {'name': output_name, 'date': date_now,
                            'meter_devices': meter_devices,
                            'timeframe': total_timeframe.to_dict()}
        output_datastore.save_metadata('/', dataset_metadata)

        # Building metadata

        # Mains meter:
        elec_meters = {
            mains.instance(): {
                'device_model': 'mains',
                'site_meter': True,
                'data_location': mains_data_location,
                'preprocessing_applied': {},  # TODO
                'statistics': {
                    'timeframe': total_timeframe.to_dict()
                }
            }
        }

        # Submeters (starting at 2 because meter 1 is mains):
        for chan in range(2, len(self.centroids) + 2):
            elec_meters.update({
                chan: {
                    'device_model': 'Hart85',
                    'submeter_of': 1,
                    'data_location': ('{}/elec/meter{:d}'
                                      .format(building_path, chan)),
                    'preprocessing_applied': {},  # TODO
                    'statistics': {
                        'timeframe': total_timeframe.to_dict()
                    }
                }
            })

        appliances = []
        for i in range(len(self.centroids.index)):
            appliance = {
                'meters': [i + 2],
                'type': 'unknown',
                'instance': i
                # TODO this `instance` will only be correct when the
                # model is trained on the same house as it is tested on.
                # https://github.com/nilmtk/nilmtk/issues/194
            }
            appliances.append(appliance)

        building_metadata = {
            'instance': mains.building(),
            'elec_meters': elec_meters,
            'appliances': appliances
        }

        output_datastore.save_metadata(building_path, building_metadata)
""" def export_model(self, filename): model_copy = {} for appliance, appliance_states in self.model.iteritems(): model_copy[ "{}_{}".format(appliance.name, appliance.instance)] = appliance_states j = json.dumps(model_copy) with open(filename, 'w+') as f: f.write(j) def import_model(self, filename): with open(filename, 'r') as f: temp = json.loads(f.read()) for appliance, centroids in temp.iteritems(): appliance_name = appliance.split("_")[0].encode("ascii") appliance_instance = int(appliance.split("_")[1]) appliance_name_instance = ApplianceID( appliance_name, appliance_instance) self.model[appliance_name_instance] = centroids """