Source code for nilmtk.timeframe

from __future__ import print_function, division
import pandas as pd
import pytz
from datetime import timedelta
from copy import deepcopy
from warnings import warn

[docs]class TimeFrame(object): """A TimeFrame is a single time span or period, e.g. from "2013" to "2014". Attributes ---------- _start : pd.Timestamp or None if None and empty if False then behave as if start is infinitely far into the past _end : pd.Timestamp or None if None and empty is False then behave as if end is infinitely far into the future enabled : boolean If False then behave as if both _end and _start are None _empty : boolean If True then represents an empty time frame include_end : boolean """ def __init__(self, start=None, end=None, tz=None): self.clear() if isinstance(start, TimeFrame): self.copy_constructor(start) else: self.start = start self.end = end self.include_end = False if tz is not None: if self._start: self._start = self._start.tz_localize(tz) if self._end: self._end = self._end.tz_localize(tz)
[docs] def copy_constructor(self, other): for key, value in other.__dict__.iteritems(): setattr(self, key, value)
[docs] def clear(self): self.enabled = True self._start = None self._end = None self._empty = False
@classmethod
[docs] def from_dict(cls, d): def key_to_timestamp(key): string = d.get(key) return None if string is None else pd.Timestamp(string) start = key_to_timestamp('start') end = key_to_timestamp('end') return cls(start, end)
@property def start(self): if self.enabled: return self._start @property def end(self): if self.enabled: return self._end @property
[docs] def empty(self): return self._empty
@start.setter
[docs] def start(self, new_start): new_start = convert_nat_to_none(new_start) if new_start is None: self._start = None return new_start = pd.Timestamp(new_start) if self.end and new_start >= self.end: raise ValueError("start date must be before end date") else: self._start = new_start
@end.setter
[docs] def end(self, new_end): new_end = convert_nat_to_none(new_end) if new_end is None: self._end = None return new_end = pd.Timestamp(new_end) if self.start and new_end <= self.start: raise ValueError("end date must be after start date") else: self._end = new_end
[docs] def adjacent(self, other, gap=0): """Returns True if self.start == other.end or visa versa. Parameters ---------- gap : float or int Number of seconds gap allowed. Notes ----- Does not yet handle case where self or other is open-ended. """ assert gap >= 0 gap_td = timedelta(seconds=gap) if self.empty or other.empty: return False return (other.start - gap_td <= self.end <= other.start or self.start - gap_td <= other.end <= self.start)
[docs] def union(self, other): """Return a single TimeFrame combining self and other.""" start = min(self.start, other.start) end = max(self.end, other.end) return TimeFrame(start, end)
@property
[docs] def timedelta(self): if self.end and self.start: return self.end - self.start elif self.empty: return timedelta(0)
[docs] def intersection(self, other): """Returns a new TimeFrame of the intersection between this TimeFrame and `other` TimeFrame. If the intersect is empty then the returned TimeFrame will have empty == True.""" if other is None: return deepcopy(self) assert isinstance(other, TimeFrame) include_end = False if self.empty or other.empty: start = None end = None empty = True else: if other.start is None: start = self.start elif self.start is None: start = other.start else: start = max(self.start, other.start) if other.end is None: end = self.end elif self.end is None: end = other.end else: end = min(self.end, other.end) # set include_end if end == other.end: include_end = other.include_end elif end == self.end: include_end = self.include_end empty = False if (start is not None) and (end is not None): if start >= end: start = None end = None empty = True intersect = TimeFrame(start, end) intersect._empty = empty intersect.include_end = include_end return intersect
[docs] def query_terms(self, variable_name='timeframe'): if self.empty: raise Exception("TimeFrame is empty.") terms = [] if self.start is not None: terms.append("index>=" + variable_name + ".start") if self.end is not None: terms.append("index<" + ("=" if self.include_end else "") + variable_name + ".end") return None if terms == [] else terms
[docs] def slice(self, frame): """Slices `frame` using self.start and self.end. Parameters ---------- frame : pd.DataFrame or pd.Series to slice Returns ------- frame : sliced frame """ if not self.empty: if self.include_end: sliced = frame[(frame.index >= self.start) & (frame.index <= self.end)] else: sliced = frame[(frame.index >= self.start) & (frame.index < self.end)] sliced.timeframe = self return sliced
def __nonzero__(self): if self.empty: return False else: return (self.start is not None) or (self.end is not None) def __repr__(self): return ("TimeFrame(start='{}', end='{}', empty={})" .format(self.start, self.end, self.empty)) def __eq__(self, other): return ((other.start == self.start) and (other.end == self.end) and (other.empty == self.empty)) def __ne__(self, other): return not self.__eq__(other) def __hash__(self): return hash((self.start, self.end, self.empty))
[docs] def to_dict(self): dct = {} if self.start: dct['start'] = self.start.isoformat() if self.end: dct['end'] = self.end.isoformat() return dct
[docs] def check_tz(self): if any([isinstance(tf.tz, pytz._FixedOffset) for tf in [self.start, self.end] if tf is not None]): warn("Using a pytz._FixedOffset timezone may cause issues" " (e.g. might cause Pandas to raise 'TypeError: too many" " timezones in this block, create separate data columns'). " " It is better to set the timezone to a geographical location" " e.g. 'Europe/London'.")
[docs] def check_for_overlap(self, other): intersect = self.intersection(other) if not intersect.empty: raise ValueError("Periods overlap: " + str(self) + " " + str(other))
[docs] def split(self, duration_threshold): """Splits this TimeFrame into smaller adjacent TimeFrames no longer in duration than duration_threshold. Parameters ---------- duration_threshold : int, seconds Returns ------- generator of new TimeFrame objects """ if not self: raise ValueError("Cannot split a TimeFrame if `start` or `end`" " is None") if duration_threshold >= self.timedelta.total_seconds(): yield self return duration_threshold_td = timedelta(seconds=duration_threshold) timeframe = self while True: allowed_end = timeframe.start + duration_threshold_td if timeframe.end <= allowed_end: yield timeframe break else: yield TimeFrame(start=timeframe.start, end=allowed_end) timeframe = TimeFrame(start=allowed_end, end=timeframe.end)
[docs]def split_timeframes(timeframes, duration_threshold): # TODO: put this into TimeFrameGroup. #316 for timeframe in timeframes: for split in timeframe.split(duration_threshold): yield split
[docs]def merge_timeframes(timeframes, gap=0): """ Parameters ---------- timeframes : list of TimeFrame objects (must be sorted) Returns ------- merged : list of TimeFrame objects Where adjacent timeframes have been merged. """ # TODO: put this into TimeFrameGroup. #316 assert isinstance(timeframes, list) assert all([isinstance(timeframe, TimeFrame) for timeframe in timeframes]) n_timeframes = len(timeframes) if n_timeframes == 0: return [] elif n_timeframes == 1: return timeframes merged = [timeframes[0]] for timeframe in timeframes[1:]: if timeframe.adjacent(merged[-1], gap): merged[-1] = timeframe.union(merged[-1]) else: merged.append(timeframe) return merged
[docs]def list_of_timeframe_dicts(timeframes): """ Parameters ---------- timeframes : list of TimeFrame objects Returns ------- list of dicts """ # TODO: put this into TimeFrameGroup. #316 return [timeframe.to_dict() for timeframe in timeframes]
[docs]def timeframe_from_dict(d): return TimeFrame.from_dict(d)
[docs]def list_of_timeframes_from_list_of_dicts(dicts): # TODO: put this into TimeFrameGroup. #316 return [timeframe_from_dict(d) for d in dicts]
[docs]def convert_none_to_nat(timestamp): return pd.NaT if timestamp is None else timestamp
[docs]def convert_nat_to_none(timestamp): return None if timestamp is pd.NaT else timestamp