from __future__ import print_function, division
import pandas as pd
import numpy as np
from collections import Counter
from itertools import izip
from warnings import warn
import scipy.spatial as ss
from scipy import fft
from pandas.tools.plotting import lag_plot, autocorrelation_plot
from scipy.special import digamma,gamma
from math import log,pi
import numpy.random as nr
import matplotlib.pyplot as plt
import numpy as np
from datetime import timedelta
import gc
import pytz
from .timeframe import TimeFrame
from .measurement import select_best_ac_type
from .utils import (offset_alias_to_seconds, convert_to_timestamp,
flatten_2d_list, append_or_extend_list,
timedelta64_to_secs)
from .plots import plot_series
from .preprocessing import Apply
from nilmtk.stats.histogram import histogram_from_generator
from nilmtk.appliance import DEFAULT_ON_POWER_THRESHOLD
# Maximum number of power samples fed to the kNN entropy estimator in one
# call; larger chunks are split first (see `Electric.entropy`).
MAX_SIZE_ENTROPY = 10000
class Electric(object):
    """Common implementations of methods shared by ElecMeter and MeterGroup.
    """
[docs] def when_on(self, on_power_threshold=None, **load_kwargs):
"""Are the connected appliances appliance is on (True) or off (False)?
Uses `self.on_power_threshold()` if `on_power_threshold` not provided.
Parameters
----------
on_power_threshold : number, optional
Defaults to self.on_power_threshold()
**load_kwargs : key word arguments
Passed to self.power_series()
Returns
-------
generator of pd.Series
index is the same as for chunk returned by `self.power_series()`
values are booleans
"""
if on_power_threshold is None:
on_power_threshold = self.on_power_threshold()
for chunk in self.power_series(**load_kwargs):
yield chunk >= on_power_threshold
[docs] def on_power_threshold(self):
"""Returns the minimum `on_power_threshold` across all appliances
immediately downstream of this meter. If any appliance
does not have an `on_power_threshold` then default to 10 watts."""
if not self.appliances:
return DEFAULT_ON_POWER_THRESHOLD
on_power_thresholds = [a.on_power_threshold() for a in self.appliances]
return min(on_power_thresholds)
[docs] def min_on_duration(self):
return self._aggregate_metadata_attribute('min_on_duration')
[docs] def min_off_duration(self):
return self._aggregate_metadata_attribute('min_off_duration')
def _aggregate_metadata_attribute(self, attr, agg_func=np.max,
default_value=0,
from_type_metadata=True):
attr_values = []
for a in self.appliances:
if from_type_metadata:
attr_value = a.type.get(attr)
else:
attr_value = a.metadata.get(attr)
if attr_value is not None:
attr_values.append(attr_value)
if len(attr_values) == 0:
return default_value
else:
return agg_func(attr_values)
[docs] def matches_appliances(self, key):
"""
Parameters
----------
key : dict
Returns
-------
True if all key:value pairs in `key` match any appliance
in `self.appliances`.
"""
for appliance in self.appliances:
if appliance.matches(key):
return True
return False
[docs] def power_series_all_data(self, **kwargs):
chunks = []
for series in self.power_series(**kwargs):
if len(series) > 0:
chunks.append(series)
if chunks:
all_data = pd.concat(chunks)
else:
all_data = None
return all_data
    def _prep_kwargs_for_sample_period_and_resample(self, sample_period=None,
                                                    resample=False,
                                                    resample_kwargs=None,
                                                    **kwargs):
        # Normalise loader kwargs so that resampling is expressed as a
        # `preprocessing` step (an `Apply` node) instead of ad-hoc kwargs.
        if 'preprocessing' in kwargs:
            warn("If you are using `preprocessing` to resample then please"
                 " do not! Instead, please use the `sample_period` parameter"
                 " and set `resample=True`.")

        if sample_period is None:
            sample_period = self.sample_period()
        else:
            # An explicit sample_period implies the caller wants resampling.
            resample = True
        sample_period = int(round(sample_period))

        if resample:
            if resample_kwargs is None:
                resample_kwargs = {}

            def resample_func(df):
                # '{:d}S' is a pandas offset alias in whole seconds.
                resample_kwargs['rule'] = '{:d}S'.format(sample_period)
                try:
                    df = df.resample(**resample_kwargs)
                except pytz.AmbiguousTimeError:
                    # Work-around for
                    # https://github.com/pydata/pandas/issues/10117
                    # (resampling across a DST transition): resample in UTC
                    # then convert back to the original timezone.
                    tz = df.index.tz.zone
                    df = df.tz_convert('UTC')
                    df = df.resample(**resample_kwargs)
                    df = df.tz_convert(tz)
                return df

            kwargs.setdefault('preprocessing', []).append(
                Apply(func=resample_func))

        return kwargs
def _replace_none_with_meter_timeframe(self, start=None, end=None):
if start is None or end is None:
timeframe_for_meter = self.get_timeframe()
if start is None:
start = timeframe_for_meter.start
if end is None:
end = timeframe_for_meter.end
return start, end
[docs] def plot(self, ax=None, timeframe=None, plot_legend=True, unit='W',
plot_kwargs=None, **kwargs):
"""
Parameters
----------
width : int, optional
Number of points on the x axis required
ax : matplotlib.axes, optional
plot_legend : boolean, optional
Defaults to True. Set to False to not plot legend.
unit : {'W', 'kW'}
**kwargs
"""
# Get start and end times for the plot
timeframe = self.get_timeframe() if timeframe is None else timeframe
if not timeframe:
return ax
kwargs['sections'] = [timeframe]
kwargs = self._set_sample_period(timeframe, **kwargs)
power_series = self.power_series_all_data(**kwargs)
if power_series is None or power_series.empty:
return ax
if unit == 'kW':
power_series /= 1000
if plot_kwargs is None:
plot_kwargs = {}
plot_kwargs.setdefault('label', self.label())
# Pandas 0.16.1 has a bug where 'label' isn't respected. See:
# https://github.com/nilmtk/nilmtk/issues/407
# The following line is a work around for this bug.
power_series.name = self.label()
ax = power_series.plot(ax=ax, **plot_kwargs)
ax.set_ylabel('Power ({})'.format(unit))
if plot_legend:
plt.legend()
return ax
def _set_sample_period(self, timeframe, width=800, **kwargs):
# Calculate the resolution for the x axis
duration = timeframe.timedelta.total_seconds()
secs_per_pixel = int(round(duration / width))
kwargs.update({'sample_period': secs_per_pixel, 'resample': True})
return kwargs
[docs] def proportion_of_upstream(self, **load_kwargs):
"""Returns a value in the range [0,1] specifying the proportion of
the upstream meter's total energy used by this meter.
"""
upstream = self.upstream_meter()
upstream_good_sects = upstream.good_sections(**load_kwargs)
proportion_of_energy = (self.total_energy(sections=upstream_good_sects) /
upstream.total_energy(sections=upstream_good_sects))
if isinstance(proportion_of_energy, pd.Series):
best_ac_type = select_best_ac_type(proportion_of_energy.keys())
return proportion_of_energy[best_ac_type]
else:
return proportion_of_energy
[docs] def vampire_power(self, **load_kwargs):
# TODO: this might be a naive approach to calculating vampire power.
return self.power_series_all_data(**load_kwargs).min()
[docs] def uptime(self, **load_kwargs):
"""
Returns
-------
timedelta: total duration of all good sections.
"""
good_sections = self.good_sections(**load_kwargs)
return good_sections.uptime()
[docs] def average_energy_per_period(self, offset_alias='D', use_uptime=True, **load_kwargs):
"""Calculate the average energy per period. e.g. the average
energy per day.
Parameters
----------
offset_alias : str
A Pandas `offset alias`. See:
pandas.pydata.org/pandas-docs/stable/timeseries.html#offset-aliases
use_uptime : bool
Returns
-------
pd.Series
Keys are AC types.
Values are energy in kWh per period.
"""
if 'sections' in load_kwargs:
raise RuntimeError("Please do not pass in 'sections' into"
" 'average_energy_per_period'. Instead"
" use 'use_uptime' param.")
if use_uptime:
td = self.uptime(**load_kwargs)
else:
td = self.get_timeframe().timedelta
if not td:
return np.NaN
uptime_secs = td.total_seconds()
periods = uptime_secs / offset_alias_to_seconds(offset_alias)
energy = self.total_energy(**load_kwargs)
return energy / periods
[docs] def proportion_of_energy(self, other, **loader_kwargs):
"""Compute the proportion of energy of self compared to `other`.
By default, only uses other.good_sections(). You may want to set
`sections=self.good_sections().intersection(other.good_sections())`
Parameters
----------
other : nilmtk.MeteGroup or ElecMeter
Typically this will be mains.
Returns
-------
float [0,1] or NaN if other.total_energy == 0
"""
good_other_sections = other.good_sections(**loader_kwargs)
loader_kwargs.setdefault('sections', good_other_sections)
# TODO test effect of setting `sections` for other
other_total_energy = other.total_energy(**loader_kwargs)
if other_total_energy.sum() == 0:
return np.NaN
total_energy = self.total_energy(**loader_kwargs)
if total_energy.empty:
return 0.0
other_ac_types = other_total_energy.keys()
self_ac_types = total_energy.keys()
shared_ac_types = set(other_ac_types).intersection(self_ac_types)
n_shared_ac_types = len(shared_ac_types)
if n_shared_ac_types > 1:
return (total_energy[shared_ac_types] /
other_total_energy[shared_ac_types]).mean()
elif n_shared_ac_types == 0:
ac_type = select_best_ac_type(self_ac_types)
other_ac_type = select_best_ac_type(other_ac_types)
warn("No shared AC types. Using '{:s}' for submeter"
" and '{:s}' for other.".format(ac_type, other_ac_type))
elif n_shared_ac_types == 1:
ac_type = list(shared_ac_types)[0]
other_ac_type = ac_type
return total_energy[ac_type] / other_total_energy[other_ac_type]
[docs] def correlation(self, other, **load_kwargs):
"""
Finds the correlation between the two ElecMeters. Both the ElecMeters
should be perfectly aligned
Adapted from:
http://www.johndcook.com/blog/2008/11/05/how-to-calculate-pearson-correlation-accurately/
Parameters
----------
other : an ElecMeter or MeterGroup object
Returns
-------
float : [-1, 1]
"""
sample_period = max(self.sample_period(), other.sample_period())
load_kwargs.setdefault('sample_period', sample_period)
def sum_and_count(electric):
n = 0
cumulator = 0.0
for power in electric.power_series(**load_kwargs):
n += len(power.index)
cumulator += power.sum()
return n, cumulator
x_n, x_sum = sum_and_count(self)
if x_n <= 1:
return np.NaN
y_n, y_sum = sum_and_count(other)
if y_n <= 1:
return np.NaN
# we're using Python 3's division (which returns a float)
x_bar = x_sum / x_n
y_bar = y_sum / y_n
# Second pass is used to find x_s and y_s (std.devs)
def stdev(electric, mean, n):
s_square_sum = 0.0
for power in electric.power_series(**load_kwargs):
s_square_sum += ((power - mean) * (power - mean)).sum()
s_square = s_square_sum / (n - 1)
return np.sqrt(s_square)
x_s = stdev(self, x_bar, x_n)
y_s = stdev(other, y_bar, y_n)
numerator = 0.0
for (x_power, y_power) in izip(self.power_series(**load_kwargs),
other.power_series(**load_kwargs)):
xi_minus_xbar = x_power - x_bar
del x_power
gc.collect()
yi_minus_ybar = y_power - y_bar
del y_power
gc.collect()
numerator += (xi_minus_xbar * yi_minus_ybar).sum()
del xi_minus_xbar
del yi_minus_ybar
gc.collect()
denominator = (x_n - 1) * x_s * y_s
corr = numerator / denominator
return corr
[docs] def plot_lag(self, lag=1, ax=None):
"""
Plots a lag plot of power data
http://www.itl.nist.gov/div898/handbook/eda/section3/lagplot.htm
Returns
-------
matplotlib.axis
"""
if ax is None:
ax = plt.gca()
for power in self.power_series():
lag_plot(power, lag, ax=ax)
return ax
[docs] def plot_spectrum(self, ax=None):
"""
Plots spectral plot of power data
http://www.itl.nist.gov/div898/handbook/eda/section3/spectrum.htm
Code borrowed from:
http://glowingpython.blogspot.com/2011/08/how-to-plot-frequency-spectrum-with.html
Returns
-------
matplotlib.axis
"""
if ax is None:
ax = plt.gca()
Fs = 1.0/self.sample_period()
for power in self.power_series():
n = len(power.values) # length of the signal
k = np.arange(n)
T = n/Fs
frq = k/T # two sides frequency range
frq = frq[range(n//2)] # one side frequency range
Y = fft(power)/n # fft computing and normalization
Y = Y[range(n//2)]
ax.plot(frq,abs(Y)) # plotting the spectrum
ax.set_xlabel('Freq (Hz)')
ax.set_ylabel('|Y(freq)|')
return ax
[docs] def plot_autocorrelation(self, ax=None):
"""
Plots autocorrelation of power data
Reference:
http://www.itl.nist.gov/div898/handbook/eda/section3/autocopl.htm
Returns
-------
matplotlib.axis
"""
if ax is None:
ax = plt.gca()
for power in self.power_series():
autocorrelation_plot(power, ax=ax)
return ax
[docs] def plot_power_histogram(self, ax=None, load_kwargs=None,
plot_kwargs=None, range=None, **hist_kwargs):
"""
Parameters
----------
ax : axes
load_kwargs : dict
plot_kwargs : dict
range : None or tuple
if range=(None, x) then on_power_threshold will be used as minimum.
**hist_kwargs
Returns
-------
ax
"""
if ax is None:
ax = plt.gca()
if load_kwargs is None:
load_kwargs = {}
if plot_kwargs is None:
plot_kwargs = {}
generator = self.power_series(**load_kwargs)
# set range
if range is None or range[0] is None:
maximum = None if range is None else range[1]
range = (self.on_power_threshold(), maximum)
hist, bins = histogram_from_generator(generator, range=range,
**hist_kwargs)
# Plot
plot_kwargs.setdefault('linewidth', 0.1)
ax.fill_between(bins[:-1], 0, hist, **plot_kwargs)
first_bin_width = bins[1] - bins[0]
ax.set_xlim([bins[0]-(first_bin_width/2), bins[-1]])
ax.set_xlabel('Power (watts)')
ax.set_ylabel('Count')
return ax
[docs] def switch_times(self, threshold=40):
"""
Returns an array of pd.DateTime when a switch occurs as defined by threshold
Parameters
----------
threshold: int, threshold in Watts between succcessive readings
to amount for an appliance state change
"""
datetime_switches = []
for power in self.power_series():
delta_power = power.diff()
delta_power_absolute = delta_power.abs()
datetime_switches.append(delta_power_absolute[(delta_power_absolute>threshold)].index.values.tolist())
return flatten_2d_list(datetime_switches)
[docs] def entropy(self, k=3, base=2):
"""
This implementation is provided courtesy NPEET toolbox,
the authors kindly allowed us to directly use their code.
As a courtesy procedure, you may wish to cite their paper,
in case you use this function.
This fails if there is a large number of records. Need to
ask the authors what to do about the same!
The classic K-L k-nearest neighbor continuous entropy estimator
x should be a list of vectors, e.g. x = [[1.3],[3.7],[5.1],[2.4]]
if x is a one-dimensional scalar and we have four samples
"""
def kdtree_entropy(z):
assert k <= len(z)-1, "Set k smaller than num. samples - 1"
d = len(z[0])
N = len(z)
#small noise to break degeneracy, see doc.
intens = 1e-10
z = [list(p + intens*nr.rand(len(z[0]))) for p in z]
tree = ss.cKDTree(z)
nn = [tree.query(point, k+1, p=float('inf'))[0][k] for point in z]
const = digamma(N)-digamma(k) + d*log(2)
return (const + d*np.mean(map(log, nn)))/log(base)
out = []
for power in self.power_series():
x = power.values
num_elements = len(x)
x = x.reshape((num_elements, 1))
if num_elements > MAX_SIZE_ENTROPY:
splits = num_elements/MAX_SIZE_ENTROPY + 1
y = np.array_split(x, splits)
for z in y:
out.append(kdtree_entropy(z))
else:
out.append(kdtree_entropy(x))
return sum(out)/len(out)
[docs] def available_power_ac_types(self):
"""Finds available alternating current types from power measurements.
Returns
-------
list of strings e.g. ['apparent', 'active']
.. note:: Deprecated in NILMTK v0.3
`available_power_ac_types` should not be used. Instead please
use `available_ac_types('power').`
"""
warn("`available_power_ac_types` is deprecated. Please use"
" `available_ac_types('power')` instead.", DeprecationWarning)
return self.available_ac_types('power')
[docs] def load_series(self, **kwargs):
"""
Parameters
----------
ac_type : str
physical_quantity : str
We sum across ac_types of this physical quantity.
**kwargs : passed through to load().
Returns
-------
generator of pd.Series. If a single ac_type is found for the
physical_quantity then the series.name will be a normal tuple.
If more than 1 ac_type is found then the ac_type will be a string
of the ac_types with '+' in between. e.g. 'active+apparent'.
"""
# Pull data through preprocessing pipeline
physical_quantity = kwargs['physical_quantity']
generator = self.load(**kwargs)
for chunk in generator:
if chunk.empty:
yield chunk
continue
chunk_to_yield = chunk[physical_quantity].sum(axis=1)
ac_types = '+'.join(chunk[physical_quantity].columns)
chunk_to_yield.name = (physical_quantity, ac_types)
chunk_to_yield.timeframe = getattr(chunk, 'timeframe', None)
chunk_to_yield.look_ahead = getattr(chunk, 'look_ahead', None)
yield chunk_to_yield
[docs] def power_series(self, **kwargs):
"""Get power Series.
Parameters
----------
ac_type : str, defaults to 'best'
**kwargs :
Any other key word arguments are passed to self.load()
Returns
-------
generator of pd.Series of power measurements.
"""
# Select power column:
kwargs['physical_quantity'] = 'power'
kwargs.setdefault('ac_type', 'best')
return self.load_series(**kwargs)
[docs] def activity_histogram(self, period='D', bin_duration='H', **kwargs):
"""Return a histogram vector showing when activity occurs.
e.g. to see when, over the course of an average day, activity occurs
then use `bin_duration='H'` and `period='D'`.
Parameters
----------
period : str. Pandas period alias.
bin_duration : str. Pandas period alias e.g. 'H' = hourly; 'D' = daily.
Width of each bin of the histogram. `bin_duration` must exactly
divide the chosen `period`.
Returns
-------
hist : np.ndarray
length will be `period / bin_duration`
"""
n_bins = (offset_alias_to_seconds(period) /
offset_alias_to_seconds(bin_duration))
if n_bins != int(n_bins):
raise ValueError('`bin_duration` must exactly divide the'
' chosen `period`')
n_bins = int(n_bins)
# Resample to `bin_duration` and load
kwargs['sample_period'] = offset_alias_to_seconds(bin_duration)
kwargs['resample_kwargs'] = {'how': 'max'}
when_on = self.when_on(**kwargs)
# Calculate histogram...
hist = np.zeros(n_bins, dtype=int)
for on_chunk in when_on:
if len(on_chunk) < 5:
continue
on_chunk = on_chunk.fillna(0).astype(int)
# Trick using resample to 'normalise' start and end dates
# to natural boundaries of 'period'.
resampled_to_period = on_chunk.iloc[[0, -1]].resample(period).index
start = resampled_to_period[0]
end = resampled_to_period[-1] + 1
# Reindex chunk
new_index = pd.date_range(start, end, freq=bin_duration, closed='left')
on_chunk = on_chunk.reindex(new_index, fill_value=0, copy=False)
# reshape
matrix = on_chunk.reshape((-1, n_bins))
# histogram
hist += matrix.sum(axis=0)
return hist
[docs] def plot_activity_histogram(self, ax=None, period='D', bin_duration='H',
plot_kwargs=None, **kwargs):
if ax is None:
ax = plt.gca()
hist = self.activity_histogram(bin_duration=bin_duration,
period=period, **kwargs)
if plot_kwargs is None:
plot_kwargs = {}
n_bins = len(hist)
plot_kwargs.setdefault('align', 'center')
plot_kwargs.setdefault('linewidth', 0)
ax.bar(range(n_bins), hist, np.ones(n_bins), **plot_kwargs)
ax.set_xlim([-0.5, n_bins])
ax.set_title('Activity distribution')
ax.set_xlabel(bin_duration + ' of ' + period)
ax.set_ylabel('Count')
return ax
[docs] def activation_series(self, min_off_duration=None, min_on_duration=None,
border=1, on_power_threshold=None, **kwargs):
"""Returns runs of an appliance.
Most appliances spend a lot of their time off. This function finds
periods when the appliance is on.
Parameters
----------
min_off_duration : int
If min_off_duration > 0 then ignore 'off' periods less than
min_off_duration seconds of sub-threshold power consumption
(e.g. a washing machine might draw no power for a short
period while the clothes soak.) Defaults value from metadata or,
if metadata absent, defaults to 0.
min_on_duration : int
Any activation lasting less seconds than min_on_duration will be
ignored. Defaults value from metadata or, if metadata absent,
defaults to 0.
border : int
Number of rows to include before and after the detected activation
on_power_threshold : int or float
Defaults to self.on_power_threshold()
**kwargs : kwargs for self.power_series()
Returns
-------
list of pd.Series. Each series contains one activation.
"""
kwargs.setdefault('resample', True)
if on_power_threshold is None:
on_power_threshold = self.on_power_threshold()
if min_off_duration is None:
min_off_duration = self.min_off_duration()
if min_on_duration is None:
min_on_duration = self.min_on_duration()
activations = []
for chunk in self.power_series(**kwargs):
activations_for_chunk = activation_series_for_chunk(
chunk, min_off_duration, min_on_duration,
border, on_power_threshold)
activations.extend(activations_for_chunk)
return activations
def align_two_meters(master, slave, func='power_series'):
    """Returns a generator of 2-column pd.DataFrames.  The first column is
    from `master`, the second from `slave`.

    Takes the sample rate and good_periods of `master` and applies to
    `slave`.

    Parameters
    ----------
    master, slave : ElecMeter or MeterGroup instances
    func : str
        Name of the loader method called on each meter.  Defaults to
        'power_series'.
    """
    sample_period = master.sample_period()
    period_alias = '{:d}S'.format(sample_period)
    sections = master.good_sections()
    master_generator = getattr(master, func)(sections=sections)
    for master_chunk in master_generator:
        if len(master_chunk) < 2:
            return
        chunk_timeframe = TimeFrame(master_chunk.index[0],
                                    master_chunk.index[-1])
        # Load the matching slave data for exactly this chunk's timeframe.
        slave_generator = getattr(slave, func)(sections=[chunk_timeframe])
        slave_chunk = next(slave_generator)

        # TODO: do this resampling in the pipeline?
        # NOTE(review): bare `.resample(alias)` returning a Series is a
        # pre-0.18 pandas API — confirm against the pinned pandas version.
        slave_chunk = slave_chunk.resample(period_alias)
        if slave_chunk.empty:
            continue
        master_chunk = master_chunk.resample(period_alias)

        yield pd.DataFrame({'master': master_chunk, 'slave': slave_chunk})
def activation_series_for_chunk(chunk, min_off_duration=0, min_on_duration=0,
                                border=1, on_power_threshold=5):
    """Returns runs ('activations') of an appliance within one power chunk.

    Most appliances spend a lot of their time off.  This function finds
    periods when the appliance is on.

    Parameters
    ----------
    chunk : pd.Series
    min_off_duration : int
        If min_off_duration > 0 then ignore 'off' periods less than
        min_off_duration seconds of sub-threshold power consumption
        (e.g. a washing machine might draw no power for a short
        period while the clothes soak.)  Defaults to 0.
    min_on_duration : int
        Any activation lasting less seconds than min_on_duration will be
        ignored.  Defaults to 0.
    border : int
        Number of rows to include before and after the detected activation.
    on_power_threshold : int or float
        Power (watts) above which the appliance is considered 'on'.

    Returns
    -------
    list of pd.Series.  Each series contains one activation.
    """
    when_on = chunk >= on_power_threshold

    # Find state changes: +1 marks the first on-sample, -1 the first
    # off-sample after a run.
    state_changes = when_on.astype(np.int8).diff()
    del when_on
    switch_on_events = np.where(state_changes == 1)[0]
    switch_off_events = np.where(state_changes == -1)[0]
    del state_changes

    if len(switch_on_events) == 0 or len(switch_off_events) == 0:
        return []

    # Make sure events align: drop a leading off-event and a trailing
    # on-event so every on-event pairs with the following off-event.
    if switch_off_events[0] < switch_on_events[0]:
        switch_off_events = switch_off_events[1:]
        if len(switch_off_events) == 0:
            return []
    if switch_on_events[-1] > switch_off_events[-1]:
        switch_on_events = switch_on_events[:-1]
        if len(switch_on_events) == 0:
            return []
    assert len(switch_on_events) == len(switch_off_events)

    # Smooth over off-durations less than min_off_duration.
    if min_off_duration > 0:
        off_durations = (chunk.index[switch_on_events[1:]].values -
                         chunk.index[switch_off_events[:-1]].values)
        off_durations = timedelta64_to_secs(off_durations)
        above_threshold_off_durations = np.where(
            off_durations >= min_off_duration)[0]

        # Keep only the off/on event pairs separated by a long-enough gap
        # (plus the very first on-event and the very last off-event).
        switch_off_events = switch_off_events[
            np.concatenate([above_threshold_off_durations,
                            [len(switch_off_events) - 1]])]
        switch_on_events = switch_on_events[
            np.concatenate([[0], above_threshold_off_durations + 1])]
        assert len(switch_on_events) == len(switch_off_events)

    activations = []
    for on, off in zip(switch_on_events, switch_off_events):
        duration = (chunk.index[off] - chunk.index[on]).total_seconds()
        if duration < min_on_duration:
            continue
        # `on` points at the first on-sample as flagged by diff(); step
        # back one extra row plus `border` context rows, clamped to 0.
        on -= 1 + border
        if on < 0:
            on = 0
        off += border
        activations.append(chunk.iloc[on:off])

    return activations