#!/usr/bin/python
from __future__ import print_function, division
import unittest
from os.path import join
import pandas as pd
from datetime import timedelta
from testingtools import data_dir
from nilmtk.datastore import HDFDataStore, CSVDataStore
from nilmtk import TimeFrame
# class name can't begin with test
[docs]class SuperTestDataStore(object):
START_DATE = pd.Timestamp('2012-01-01 00:00:00', tz=None)
NROWS = 1E4
END_DATE = START_DATE + timedelta(seconds=NROWS-1)
TIMEFRAME = TimeFrame(START_DATE, END_DATE)
[docs] def test_timeframe(self):
self.datastore.window.clear()
for key in self.keys:
self.assertEqual(self.datastore.get_timeframe(key), self.TIMEFRAME)
self._apply_mask()
for key in self.keys:
self.datastore.window.enabled = False
self.assertEqual(self.datastore.get_timeframe(key), self.TIMEFRAME)
self.datastore.window.enabled = True
self.assertEqual(self.datastore.get_timeframe(key), self.datastore.window)
[docs] def test_load(self):
timeframe = TimeFrame('2012-01-01 00:00:00', '2012-01-01 00:00:05')
self.datastore.window.clear()
gen = self.datastore.load(key=self.keys[0],
cols=[('power', 'active')],
sections=[timeframe],
n_look_ahead_rows=10)
df = next(gen)
self.assertEqual(df.index[0], timeframe.start)
self.assertEqual(df.index[-1], timeframe.end - timedelta(seconds=1))
self.assertEqual(df.look_ahead.index[0], timeframe.end)
self.assertEqual(len(df.look_ahead), 10)
[docs] def test_load_chunks(self):
self.datastore.window.clear()
chunks = self.datastore.load(key=self.keys[0])
i = 0
for chunk in chunks:
i += 1
self.assertEqual(chunk.index[0], self.TIMEFRAME.start)
self.assertEqual(chunk.index[-1], self.TIMEFRAME.end)
self.assertEqual(len(chunk), self.NROWS)
self.assertEqual(i, 1)
timeframes = [TimeFrame('2012-01-01 00:00:00', '2012-01-01 00:00:05'),
TimeFrame('2012-01-01 00:10:00', '2012-01-01 00:10:05')]
chunks = self.datastore.load(key=self.keys[0], sections=timeframes)
i = 0
for chunk in chunks:
self.assertEqual(chunk.index[0], timeframes[i].start)
self.assertEqual(chunk.index[-1], timeframes[i].end-timedelta(seconds=1))
self.assertEqual(len(chunk), 5)
i += 1
self.assertEqual(i, 2)
# Check when we have a narrow mask
self.datastore.window = TimeFrame('2012-01-01 00:10:02', '2012-01-01 00:10:10')
chunks = self.datastore.load(key=self.keys[0], sections=timeframes)
i = 0
for chunk in chunks:
if chunk.empty:
continue
self.assertEqual(chunk.index[0], pd.Timestamp('2012-01-01 00:10:02'))
self.assertEqual(chunk.index[-1], pd.Timestamp('2012-01-01 00:10:04'))
self.assertEqual(len(chunk), 3)
i += 1
self.assertEqual(i, 1)
[docs] def test_load_chunks_small_chunksize(self):
self.datastore.window.clear()
timeframes = [TimeFrame('2012-01-01 00:00:00', '2012-01-01 00:01:00'),
TimeFrame('2012-01-01 00:10:00', '2012-01-01 00:11:00')]
chunks = self.datastore.load(key=self.keys[0], sections=timeframes,
chunksize=20)
one_sec = timedelta(seconds=1)
for i, chunk in enumerate(chunks):
self.assertEqual(chunk.index[0], chunk.timeframe.start)
self.assertTrue((chunk.timeframe.end - one_sec) <=
chunk.index[-1] <=
chunk.timeframe.end)
#--------- helper functions ---------------------#
def _apply_mask(self):
self.datastore.window = TimeFrame('2012-01-01 00:10:00',
'2012-01-01 00:20:00')
[docs]class TestHDFDataStore(unittest.TestCase, SuperTestDataStore):
@classmethod
[docs] def setUpClass(cls):
filename = join(data_dir(), 'random.h5')
cls.datastore = HDFDataStore(filename)
cls.keys = ['/building1/elec/meter{:d}'.format(i) for i in range(1,6)]
@classmethod
[docs] def tearDownClass(cls):
cls.datastore.close()
[docs] def test_column_names(self):
for key in self.keys:
self.assertEqual(self.datastore._column_names(key),
[('power', 'active'), ('energy', 'reactive'),
('voltage', '')])
[docs] def test_n_rows(self):
self._apply_mask()
for key in self.keys:
self.datastore.window.enabled = True
self.assertEqual(self.datastore._nrows(key), 10*60)
self.datastore.window.enabled = False
self.assertEqual(self.datastore._nrows(key), self.NROWS)
[docs] def test_estimate_memory_requirement(self):
self._apply_mask()
for key in self.keys:
self.datastore.window.enabled = True
mem = self.datastore._estimate_memory_requirement(key, self.datastore._nrows(key))
self.assertEqual(mem, 12000)
self.datastore.window.enabled = False
mem = self.datastore._estimate_memory_requirement(key, self.datastore._nrows(key))
self.assertEqual(mem, 200000)
[docs]class TestCSVDataStore(unittest.TestCase, SuperTestDataStore):
@classmethod
[docs] def setUpClass(cls):
filename = join(data_dir(), 'random_csv')
cls.datastore = CSVDataStore(filename)
cls.keys = ['/building1/elec/meter{:d}'.format(i) for i in range(1,6)]
@classmethod
[docs] def tearDownClass(cls):
cls.datastore.close()
if __name__ == '__main__':
unittest.main()