Source code for nilmtk.utils

from __future__ import print_function, division
import numpy as np
import pandas as pd
import networkx as nx
from copy import deepcopy
from os.path import isdir, dirname, abspath
from os import getcwd
from inspect import currentframe, getfile, getsourcefile
from sys import getfilesystemencoding, stdout
from IPython.core.display import HTML, display
from collections import OrderedDict
import datetime
from nilmtk.datastore import DataStore, HDFDataStore, CSVDataStore, Key

def show_versions():
    """Print the versions of nilmtk's main dependencies.

    pandas' own version report is printed first (best effort), followed
    by one ``name: value`` line each for the current date, the platform,
    the Python ``major.minor`` version and the version of every package
    named in ``PACKAGES`` ("Not found" when a package cannot be imported).
    """
    import importlib
    import platform
    import sys

    output = OrderedDict()
    # NOTE(review): the right-hand side of this assignment was truncated in
    # the reviewed source; `datetime.datetime.now()` is the presumed
    # original expression -- confirm.
    output["Date"] = str(datetime.datetime.now())
    output["Platform"] = str(platform.platform())
    system_information = sys.version_info
    output["System version"] = "{}.{}".format(system_information.major,
                                               system_information.minor)

    PACKAGES = ["nilmtk", "nilm_metadata", "numpy", "matplotlib", "pandas",
                "sklearn"]
    for package_name in PACKAGES:
        key = package_name + " version"
        try:
            # import_module replaces exec("import ...") / eval(...): same
            # result without executing dynamically built source strings.
            module = importlib.import_module(package_name)
        except ImportError:
            output[key] = "Not found"
        else:
            output[key] = module.__version__

    try:
        # pd.show_versions() prints its report itself and returns None, so
        # its return value is deliberately not printed.
        pd.show_versions()
    except Exception:
        # Best effort only: a failing pandas report must not prevent the
        # summary below from being printed.
        pass
    else:
        print("")

    for k, v in output.items():
        print("{}: {}".format(k, v))
def timedelta64_to_secs(timedelta):
    """Convert `timedelta` to seconds.

    Parameters
    ----------
    timedelta : np.timedelta64 scalar, or array-like of np.timedelta64

    Returns
    -------
    float, or array of floats : seconds.
        An empty input yields an empty numpy array.
    """
    # np.size (unlike len) also accepts a scalar np.timedelta64, which the
    # documented parameter type allows but which has no length.
    if np.size(timedelta) == 0:
        return np.array([])
    return timedelta / np.timedelta64(1, 's')
def tree_root(graph):
    """Return the node that is the root of the tree.

    The root is the only node whose in-degree is zero.

    Parameters
    ----------
    graph : networkx.Graph

    Returns
    -------
    The root node.

    Raises
    ------
    RuntimeError
        If no node, or more than one node, has an in-degree of zero.
    """
    assert isinstance(graph, nx.Graph)

    # NOTE(review): `in_degree_iter` is the networkx 1.x spelling -- confirm
    # the networkx version this module is pinned to.
    candidates = []
    for node, in_degree in graph.in_degree_iter():
        if in_degree == 0:
            candidates.append(node)

    if len(candidates) > 1:
        raise RuntimeError('Tree has more than one root!')
    if not candidates:
        raise RuntimeError('Tree has no root!')
    return candidates[0]
def nodes_adjacent_to_root(graph):
    """Return the successors of the tree's root node.

    The root is located with `tree_root`; the result is whatever
    ``graph.successors`` returns for that node.
    """
    return graph.successors(tree_root(graph))
def index_of_column_name(df, name):
    """Return the position of the first column of `df` equal to `name`.

    Parameters
    ----------
    df : object with a `columns` attribute (e.g. pandas.DataFrame)
    name : column label to look for

    Returns
    -------
    int : zero-based position of the first matching column.

    Raises
    ------
    KeyError
        If no column equals `name`.
    """
    position = next(
        (i for i, label in enumerate(df.columns) if label == name), None)
    if position is None:
        raise KeyError(name)
    return position
def find_nearest(known_array, test_array):
    """Find the closest value in `known_array` for each element of
    `test_array`.

    Parameters
    ----------
    known_array : 1-D numpy array of scalar values (length m)
    test_array : numpy array of scalar values (length n)

    Returns
    -------
    indices : numpy array (length n)
        For each value in `test_array`, the index into the original
        (unsorted) `known_array` of the closest value.
    residuals : numpy array (length n)
        For each value in `test_array`, its difference from the closest
        value in `known_array` (test value minus known value).
    """
    # Work on a sorted copy so that searchsorted can locate the two
    # neighbours (one below, one above) of every test value.
    order = np.argsort(known_array)
    sorted_known = known_array[order]
    last = len(sorted_known) - 1

    insert_at = np.searchsorted(sorted_known, test_array)
    below = np.clip(insert_at - 1, 0, last)
    above = np.clip(insert_at, 0, last)

    gap_above = sorted_known[above] - test_array
    gap_below = test_array - sorted_known[below]

    # On a tie the upper neighbour wins (<=).
    nearest_sorted = np.where(gap_above <= gap_below, above, below)

    # Map positions in the sorted array back to positions in the input.
    indices = order[nearest_sorted]
    residuals = test_array - known_array[indices]
    return indices, residuals
def container_to_string(container, sep='_'):
    """Build a single string from `container`.

    A `str` is returned unchanged.  Any other iterable has the `str()` of
    each element joined with `sep`.  Objects that cannot be iterated fall
    back to `str(container)`.
    """
    if isinstance(container, str):
        return container
    try:
        return sep.join(map(str, container))
    except TypeError:
        # Not iterable: represent the object itself.
        return str(container)
def simplest_type_for(values):
    """Collapse a sized collection to the simplest equivalent object.

    Returns
    -------
    None if `values` is empty, the sole element if it holds exactly one
    item, otherwise a tuple of all its elements.
    """
    count = len(values)
    if count == 0:
        return None
    if count == 1:
        return next(iter(values))
    return tuple(values)
def flatten_2d_list(list2d):
    """Flatten one level of nesting in `list2d`.

    Strings and items that have no length are kept as single elements;
    every other sized item has its elements added individually.

    Parameters
    ----------
    list2d : iterable

    Returns
    -------
    list1d : list
    """
    try:
        atomic_types = (basestring,)  # Python 2: covers str and unicode
    except NameError:
        # Python 3 has no `basestring`; bytes are kept whole as well, as
        # they were under Python 2 where `str` was a byte string.
        atomic_types = (str, bytes)

    list1d = []
    for item in list2d:
        if isinstance(item, atomic_types):
            list1d.append(item)
            continue
        try:
            len(item)
        except TypeError:
            # Scalars (no length) are kept as they are.
            list1d.append(item)
        else:
            list1d.extend(item)
    return list1d
def get_index(data):
    """Return the index associated with `data`.

    Parameters
    ----------
    data : pandas.DataFrame or Series or DatetimeIndex

    Returns
    -------
    index : `data.index` for a DataFrame or Series; `data` itself when it
        already is a DatetimeIndex.

    Raises
    ------
    TypeError
        If `data` is none of the supported types.
    """
    if isinstance(data, pd.DatetimeIndex):
        return data
    if isinstance(data, (pd.DataFrame, pd.Series)):
        return data.index
    raise TypeError('wrong type for `data`.')
def convert_to_timestamp(t):
    """Convert `t` to a pandas Timestamp, passing None through.

    Parameters
    ----------
    t : str or pd.Timestamp or datetime or None

    Returns
    -------
    pd.Timestamp or None
    """
    if t is None:
        return None
    return pd.Timestamp(t)
def get_module_directory():
    """Return the directory that contains this module.

    Several strategies are tried in turn until one yields an existing
    directory: the file of the executing frame, the module's `__file__`,
    the source file of a function defined here, and finally the current
    working directory.

    Returns
    -------
    path_to_this_file : str
    """
    # Strategy 1: the file of the frame currently being executed.
    path_to_this_file = dirname(getfile(currentframe()))

    if not isdir(path_to_this_file):
        # Strategy 2: the module's __file__ (may be absent, e.g. when the
        # code is run interactively).
        module_file = globals().get('__file__')
        if module_file is not None:
            if isinstance(module_file, bytes):
                # Python 2 stores __file__ as a byte string.
                module_file = module_file.decode(getfilesystemencoding())
            path_to_this_file = dirname(module_file)

    if not isdir(path_to_this_file):
        # Strategy 3: the source file of a function defined in this module.
        # The result must be assigned, otherwise this fallback has no
        # effect.
        source_file = getsourcefile(lambda _: None)
        if source_file is not None:
            path_to_this_file = dirname(abspath(source_file))

    if not isdir(path_to_this_file):
        # Last resort.
        path_to_this_file = getcwd()

    assert isdir(path_to_this_file), path_to_this_file + ' is not a directory'
    return path_to_this_file
def dict_to_html(dictionary):
    """Render `dictionary` as a nested HTML unordered list.

    Each key becomes a ``<li>`` with the key in ``<strong>``.  List values
    are rendered as a nested ``<ul>`` with one item per element, dict
    values are rendered recursively, and anything else is formatted as
    text.  String values containing 'http' are turned into links.  Values
    are inserted verbatim (no HTML escaping is applied).

    Parameters
    ----------
    dictionary : dict

    Returns
    -------
    html : str
    """
    try:
        string_types = basestring  # Python 2: covers str and unicode
    except NameError:
        string_types = str  # Python 3

    def format_string(value):
        try:
            if isinstance(value, string_types) and 'http' in value:
                html = '<a href="{url}">{url}</a>'.format(url=value)
            else:
                html = '{}'.format(value)
        except UnicodeEncodeError:
            # Values that cannot be encoded are rendered as empty text.
            html = ''
        return html

    # Collect fragments and join once instead of repeated string +=.
    parts = ['<ul>']
    for key, value in dictionary.items():
        parts.append('<li><strong>{}</strong>: '.format(key))
        if isinstance(value, list):
            parts.append('<ul>')
            for item in value:
                parts.append('<li>{}</li>'.format(format_string(item)))
            parts.append('</ul>')
        elif isinstance(value, dict):
            parts.append(dict_to_html(value))
        else:
            parts.append(format_string(value))
        parts.append('</li>')
    parts.append('</ul>')
    return ''.join(parts)
def offset_alias_to_seconds(alias):
    """Return the length in seconds of one period of `alias`.

    The length is measured as the gap between two consecutive timestamps
    of a `pd.date_range` generated with ``freq=alias``.

    Parameters
    ----------
    alias : str
        pandas offset alias.

    Returns
    -------
    float : seconds
    """
    first, last = pd.date_range('00:00', periods=2, freq=alias)
    elapsed = last - first
    return elapsed.total_seconds()
def check_directory_exists(d):
    """Raise IOError unless `d` is an existing directory.

    Parameters
    ----------
    d : str
        Path to check.

    Raises
    ------
    IOError
        If `d` is not a directory.
    """
    if isdir(d):
        return
    message = "Directory '{}' does not exist.".format(d)
    raise IOError(message)
def tz_localize_naive(timestamp, tz):
    """Return `timestamp` expressed in timezone `tz`.

    Parameters
    ----------
    timestamp : anything accepted by pd.Timestamp, or None
    tz : timezone, or None

    Returns
    -------
    `timestamp` unchanged when `tz` is None; `pd.NaT` when `timestamp` is
    None or null; otherwise a pd.Timestamp converted to `tz`.  A naive
    timestamp is first localised to UTC before the conversion.
    """
    if tz is None:
        return timestamp
    if timestamp is None or pd.isnull(timestamp):
        return pd.NaT

    stamp = pd.Timestamp(timestamp)
    # tz_convert needs an aware timestamp, so naive ones are taken to be UTC.
    aware = stamp.tz_localize('UTC') if timestamp_is_naive(stamp) else stamp
    return aware.tz_convert(tz)
def get_tz(df):
    """Return the timezone of `df`'s index.

    Parameters
    ----------
    df : object with an `index` attribute (e.g. pandas.DataFrame)

    Returns
    -------
    tz : the `tz` attribute of `df.index`, or None if the index has no
        such attribute.
    """
    # NOTE(review): the attribute read here was truncated in the reviewed
    # source (`tz = ` followed directly by `except`); `index.tz` is the
    # presumed original -- confirm.
    return getattr(df.index, 'tz', None)
def timestamp_is_naive(timestamp):
    """Tell whether `timestamp` lacks timezone information.

    Parameters
    ----------
    timestamp : pd.Timestamp or datetime.datetime

    Returns
    -------
    True if `timestamp` is naive, i.e. its `tzinfo` is None or its
    `tzinfo.utcoffset(timestamp)` is None; False otherwise.
    """
    tzinfo = timestamp.tzinfo
    return tzinfo is None or tzinfo.utcoffset(timestamp) is None
def get_datastore(filename, format, mode='a'):
    """Open the data store at `filename`.

    Parameters
    ----------
    filename : string
    format : 'CSV' or 'HDF'
    mode : 'a' (append) or 'w' (write), optional
        Only passed on to HDFDataStore; it is not used for 'CSV'.

    Returns
    -------
    HDFDataStore or CSVDataStore

    Raises
    ------
    ValueError
        If `filename` is None or `format` is not recognised.
    """
    if filename is None:
        # The exception must actually be raised; merely constructing it
        # would make the function silently return None.
        raise ValueError('filename is None')

    if format == 'HDF':
        return HDFDataStore(filename, mode)
    elif format == 'CSV':
        return CSVDataStore(filename)
    else:
        raise ValueError('format not recognised')
def normalise_timestamp(timestamp, freq):
    """Return the Timestamp that `timestamp` maps to when resampled at
    `freq`.

    A one-element Series indexed by `timestamp` is resampled and the first
    label of the resulting index is returned, so the result belongs to the
    set of timestamps pandas' ``resample(freq)`` would produce.

    Parameters
    ----------
    timestamp : anything accepted by pd.Timestamp
    freq : str
        pandas offset alias.

    Returns
    -------
    pd.Timestamp
    """
    timestamp = pd.Timestamp(timestamp)
    # np.nan rather than the np.NaN alias, which newer NumPy no longer has.
    series = pd.Series(np.nan, index=[timestamp])
    resampled = series.resample(freq)
    if not isinstance(resampled, pd.Series):
        # Newer pandas returns a deferred Resampler that has no index until
        # it is aggregated; older pandas returned the resampled Series
        # directly.
        resampled = resampled.mean()
    return resampled.index[0]
def append_or_extend_list(lst, value):
    """Add `value` to `lst` in place.

    None is ignored, a list has its elements added individually, and any
    other object is appended as a single element.

    Parameters
    ----------
    lst : list
        Modified in place.
    value : object or list or None
    """
    if value is None:
        return
    add = lst.extend if isinstance(value, list) else lst.append
    add(value)
def convert_to_list(list_like):
    """Return `list_like` as a new list; None becomes an empty list."""
    if list_like is None:
        return []
    return list(list_like)
def most_common(lst):
    """Return the most common entry in `lst`.

    Parameters
    ----------
    lst : iterable of hashable items

    Returns
    -------
    The item that occurs most often.  If several items tie for the highest
    count, which of them is returned is not specified.

    Raises
    ------
    IndexError
        If `lst` is empty.
    """
    # Local import: Counter is not among the module-level imports.
    from collections import Counter

    # Counter counts every item in a single pass and does not depend on the
    # pandas sorting API.
    ranked = Counter(lst).most_common(1)
    return ranked[0][0]
def capitalise_first_letter(string):
    """Return `string` with its first character upper-cased.

    The rest of the string is left untouched.  An empty string is returned
    unchanged.
    """
    # Slicing (rather than indexing) keeps the empty string from raising
    # IndexError.
    return string[:1].upper() + string[1:]
def capitalise_index(index):
    """Return a list of the labels in `index`, each passed through
    `capitalise_first_letter`.

    Parameters
    ----------
    index : iterable of str

    Returns
    -------
    labels : list of str
    """
    return [capitalise_first_letter(label) for label in index]
def capitalise_legend(ax):
    """Redraw the legend of `ax` with capitalised labels.

    The handles and labels are read with `ax.get_legend_handles_labels()`,
    the labels are passed through `capitalise_index`, and the legend is
    set again with `ax.legend`.

    Parameters
    ----------
    ax : axes object providing `get_legend_handles_labels` and `legend`

    Returns
    -------
    ax : the same object that was passed in.
    """
    handles, labels = ax.get_legend_handles_labels()
    ax.legend(handles, capitalise_index(labels))
    return ax