from __future__ import print_function, division
import pandas as pd
from itertools import repeat, tee
from time import time
from copy import deepcopy
from collections import OrderedDict
import numpy as np
import yaml
from os.path import isdir, isfile, join, exists, dirname
from os import listdir, makedirs, remove
from shutil import rmtree
import re
from nilm_metadata.convert_yaml_to_hdf5 import _load_file
from nilmtk.timeframe import TimeFrame
from nilmtk.timeframegroup import TimeFrameGroup
from nilmtk.node import Node
from nilmtk.datastore import DataStore, MAX_MEM_ALLOWANCE_IN_BYTES
from nilmtk.datastore.key import Key
from nilmtk.datastore.datastore import write_yaml_to_file, join_key
from nilmtk.docinherit import doc_inherit
# do not edit! added by PythonBreakpoints
from pdb import set_trace as _breakpoint
class CSVDataStore(DataStore):
    """A nilmtk DataStore backed by a directory tree of CSV files.

    ``filename`` is the root directory.  Meter data is stored as
    ``<root>/building<B>/elec/meter<M>.csv`` (see ``_key_to_abs_path``)
    and dataset metadata under ``<root>/metadata/``.
    """

    @doc_inherit
    def __init__(self, filename):
        self.filename = filename
        # Make the root directory if it does not exist yet.
        path = self._key_to_abs_path('/')
        if not exists(path):
            makedirs(path)
        # Make the metadata directory if it does not exist yet.
        path = self._get_metadata_path()
        if not exists(path):
            makedirs(path)
        super(CSVDataStore, self).__init__()

    @doc_inherit
    def __getitem__(self, key):
        file_path = self._key_to_abs_path(key)
        if isfile(file_path):
            return pd.read_csv(file_path)
        else:
            raise KeyError('{} not found'.format(key))

    @doc_inherit
    def load(self, key, cols=None, sections=None, n_look_ahead_rows=0,
             chunksize=MAX_MEM_ALLOWANCE_IN_BYTES):
        file_path = self._key_to_abs_path(key)
        # Normalise `sections`: default is a single open-ended TimeFrame,
        # i.e. load everything.
        sections = [TimeFrame()] if sections is None else sections
        sections = TimeFrameGroup(sections)
        self.all_sections_smaller_than_chunksize = True

        # Iterate through the requested sections.  This requires one full
        # pass through the file per section.
        for section in sections:
            window_intersect = self.window.intersection(section)
            # Two-level column header, as written by `put`/`append`.
            header_rows = [0, 1]
            text_file_reader = pd.read_csv(file_path,
                                           index_col=0,
                                           header=header_rows,
                                           parse_dates=True,
                                           chunksize=chunksize)

            # Iterate through all chunks in the file.
            for chunk_idx, chunk in enumerate(text_file_reader):
                # Filter dataframe by the specified columns, if any.
                if cols:
                    chunk = chunk[cols]

                # Build a boolean mask restricting the chunk to the
                # intersection of the store window and this section.
                subchunk_idx = [True] * len(chunk)
                if window_intersect.start:
                    subchunk_idx = np.logical_and(
                        subchunk_idx, (chunk.index >= window_intersect.start))
                if window_intersect.end:
                    subchunk_idx = np.logical_and(
                        subchunk_idx, (chunk.index < window_intersect.end))
                if window_intersect.empty:
                    subchunk_idx = [False] * len(chunk)
                subchunk = chunk[subchunk_idx]

                if len(subchunk) > 0:
                    # Position (within this chunk) of the last selected row;
                    # used below to locate the look-ahead rows in the file.
                    subchunk_end = np.max(np.nonzero(subchunk_idx))
                    subchunk.timeframe = TimeFrame(subchunk.index[0],
                                                   subchunk.index[-1])

                    # Load look-ahead rows if requested.
                    if n_look_ahead_rows > 0:
                        if len(subchunk.index) > 0:
                            # Skip the header lines, all previous chunks and
                            # the rows already covered by this subchunk.
                            # NOTE(review): assumes `chunksize` counts rows
                            # (pandas semantics) even though the constant's
                            # name suggests bytes — verify upstream.
                            rows_to_skip = ((len(header_rows) + 1)
                                            + (chunk_idx * chunksize)
                                            + subchunk_end + 1)
                            try:
                                subchunk.look_ahead = pd.read_csv(
                                    file_path,
                                    index_col=0,
                                    header=None,
                                    parse_dates=True,
                                    skiprows=rows_to_skip,
                                    nrows=n_look_ahead_rows)
                            except ValueError:
                                # No rows left after the subchunk.
                                subchunk.look_ahead = pd.DataFrame()
                        else:
                            subchunk.look_ahead = pd.DataFrame()

                    yield subchunk

    @doc_inherit
    def append(self, key, value):
        file_path = self._key_to_abs_path(key)
        path = dirname(file_path)
        if not exists(path):
            makedirs(path)
        # BUGFIX: only write the column header when the file is being
        # created.  Previously `header=True` injected a duplicate header
        # row into the middle of the CSV on every subsequent append.
        write_header = not isfile(file_path)
        value.to_csv(file_path,
                     mode='a',
                     header=write_header)

    @doc_inherit
    def put(self, key, value):
        file_path = self._key_to_abs_path(key)
        path = dirname(file_path)
        if not exists(path):
            makedirs(path)
        value.to_csv(file_path,
                     mode='w',
                     header=True)

    @doc_inherit
    def remove(self, key):
        file_path = self._key_to_abs_path(key)
        if isfile(file_path):
            remove(file_path)
        else:
            # `key` addresses a group (a directory), not a single meter file.
            rmtree(file_path)

    @doc_inherit
    def elements_below_key(self, key='/'):
        elements = []
        if key == '/':
            # At the root, children are the 'building<N>' directories.
            for directory in listdir(self.filename):
                dir_path = join(self.filename, directory)
                if isdir(dir_path) and re.match('building[0-9]*', directory):
                    elements += [directory]
        else:
            relative_path = key[1:]
            dir_path = join(self.filename, relative_path)
            if isdir(dir_path):
                for element in listdir(dir_path):
                    # BUGFIX: previously appended `directory` (the loop
                    # variable of the other branch), which raised NameError
                    # or returned wrong entries; append the listed element.
                    elements += [element]
        return elements

    @doc_inherit
    def close(self):
        # Not needed for the CSV data store.
        pass

    @doc_inherit
    def open(self):
        # Not needed for the CSV data store.
        pass

    @doc_inherit
    def get_timeframe(self, key):
        file_path = self._key_to_abs_path(key)
        text_file_reader = pd.read_csv(file_path,
                                       index_col=0,
                                       header=[0, 1],
                                       parse_dates=True,
                                       chunksize=MAX_MEM_ALLOWANCE_IN_BYTES)
        # Scan every chunk: the first chunk supplies the start timestamp,
        # the last chunk supplies the end timestamp.
        start = None
        end = None
        for df in text_file_reader:
            if start is None:
                start = df.index[0]
            end = df.index[-1]
        timeframe = TimeFrame(start, end)
        return self.window.intersection(timeframe)

    def _get_metadata_path(self):
        # Metadata lives in <root>/metadata.
        return join(self.filename, 'metadata')

    def _key_to_abs_path(self, key):
        """Map a datastore key (e.g. '/building1/elec/meter1') to an
        absolute filesystem path, appending '.csv' when the key addresses
        a single meter rather than a group."""
        abs_path = self.filename
        if key and len(key) > 1:
            relative_path = key
            if key[0] == '/':
                relative_path = relative_path[1:]
            abs_path = join(self.filename, relative_path)
            key_object = Key(key)
            if key_object.building and key_object.meter:
                abs_path += '.csv'
        return abs_path