import copy
import datetime
from multiprocessing import Pool, Manager
try:
    # The ABC aliases were removed from ``collections`` in Python 3.10.
    from collections.abc import MutableSequence
except ImportError:
    from collections import MutableSequence

import numpy as np
import requests

from pyIUDX.rs import rs
from pyIUDX.cat import cat
from pyIUDX.auth import auth
""" TODO: Fix numpy datetime issue """
""" TODO: GeoAttributes coming in data """
""" TODO: Read base schemas locally """
""" TODO: Remove cat url in Item() """
""" TODO: Error in case data is not coming """
""" TODO: Item constructor can directly take catalogue entry """
class Property(object):
    """Container class for a Property.

    A Property is an aspect of a resource item that describes it
    or its current value. ``value`` holds either whatever was set without
    a time index, or a 2-column object ndarray of ``[time, value]`` rows.
    """

    def __init__(self, name, properties):
        """Property constructor.

        Args:
            name (string): This property's name
            properties (Dict[string]): Entries that are copied onto this
                object as attributes, one attribute per key
        """
        self.name = name
        self.value = None
        for key, val in properties.items():
            setattr(self, key, val)
        # Snapshot of everything set so far, minus the bookkeeping keys.
        self.attributes = copy.deepcopy(self.__dict__)
        self.attributes.pop("$ref", None)
        self.attributes.pop("attributes", None)

    def setValue(self, value, time=None):
        """Set State for this Property.

        Without ``time`` the value is stored as-is; with ``time`` a
        ``[time, value]`` row is appended to the time series.

        Args:
            value (string): Value
            time (datetime.datetime()): Time index
        """
        if time is None:
            self.value = value
            return
        # Fill the row cell by cell: np.array([[time, value]]) would infer
        # a 3-D shape when time and value are sequences of equal length.
        row = np.empty((1, 2), dtype=object)
        row[0, 0] = time
        row[0, 1] = value
        if self.value is None:
            # No series started yet (reset() was never called).
            self.value = row
        else:
            self.value = np.append(self.value, row, axis=0)
        return

    def reset(self):
        """Clear the time series; a string value is left untouched."""
        if isinstance(self.value, str):
            return
        self.value = np.empty((0, 2), dtype=object)

    def sort(self):
        """Sort time series by the time column.

        Does nothing when ``value`` is not a 2-D array (e.g. a string
        value kept by ``reset`` or a value that was never set).
        """
        series = self.value
        if not isinstance(series, np.ndarray) or series.ndim != 2:
            return
        self.value = series[np.argsort(series[:, 0])]
class QuantitativeProperty(object):
    """Container class for a quantitative property.

    A QuantitativeProperty is a measurable property that carries further
    attributes such as units. Its values are always stored together with
    their time indices.
    """

    def __init__(self, obj, name, properties):
        """QuantitativeProperty constructor.

        Args:
            obj (object): Parent object
            name (string): This property's name
            properties (Dict[string]): Various properties of the
                QuantitativeProperty, e.g.
                {"unitText": "ppm", "describes": "description of the property"}
        """
        self.name = name
        for key in properties.keys():
            setattr(self, key, properties[key])
        # Copy what has been set so far and drop the bookkeeping keys.
        snapshot = copy.deepcopy(self.__dict__)
        for hidden in ("$ref", "value", "attributes"):
            snapshot.pop(hidden, None)
        self.attributes = snapshot
        # Assigned last so the parent is not part of the snapshot above.
        self.parent = obj

    def reset(self, num_time_indices):
        """Reset value of this Property.

        Args:
            num_time_indices (int): Number of time columns; the value
                array gets one extra column for the measurement
        """
        width = num_time_indices + 1
        self.value = np.empty((0, width), dtype=object)
        return

    def sort(self, updatedAtIdx):
        """Sort time series.

        Args:
            updatedAtIdx (int): Column index to order the rows by
        """
        order = np.argsort(self.value[:, updatedAtIdx])
        self.value = self.value[order]

    def setValue(self, time, value):
        """Set Value for this Property.

        Any failure (non-numeric value, row width mismatch, ...) is
        printed and the stored series is left unchanged.

        Args:
            time (iterable of datetime.datetime()): Time indices
            value (float): Value
        """
        try:
            number = float(value)
            row = list(time)
            row.append(number)
            self.value = np.append(self.value,
                                   np.array([row], dtype=object),
                                   axis=0)
        except Exception as err:
            print(err)
        return

    def latest(self):
        """Get latest data for this property.

        Delegates to the parent's ``latest`` and returns this property's
        value afterwards.

        Returns:
            value (ndarray): numpy 2d array with 0th column as time
        """
        self.parent.latest()
        return self.value

    def during(self, startTime, endTime):
        """Get data during a set interval for this property.

        Args:
            startTime (string): Start time
            endTime (string): End time
        Returns:
            value (ndarray): numpy 2d array with 0th column as time
        TODO:
            Use date time instead of strings
        """
        self.parent.during(startTime, endTime)
        return self.value

    def valueBetween(self, minval, maxVal):
        """Get all data during which this attribute was between min and max val.

        Args:
            minval (int): minimum value
            maxVal (int): maximum value
        Returns:
            value (ndarray): numpy 2d array with 0th column as time
        TODO:
            Specify time frame too
        """
        self.parent.valueBetween(self.name, minval, maxVal)
        return self.value
class GeoProperty(object):
    """Container class for a Geo property.

    A GeoProperty is a spatial property having further attributes such
    as coordinates for either a point or a polygon. ``coordinates`` is
    either the static geometry's coordinates or a 2-column object ndarray
    of ``[time, coordinates]`` rows.
    """

    def __init__(self, name):
        """GeoProperty constructor.

        Args:
            name (string): This property's name
        """
        self.name = name
        # Geometry type; stays None unless a static geometry is set.
        self.type = None
        self.coordinates = None

    def setStaticGeo(self, geoProperty):
        """Store a static Point or Polygon geometry.

        Other geometry types are ignored.

        Args:
            geoProperty (dict): Entry whose ``["value"]["geometry"]``
                holds ``type`` and ``coordinates``
        """
        geometry = geoProperty["value"]["geometry"]
        if geometry["type"] in ("Point", "Polygon"):
            self.type = geometry["type"]
            self.coordinates = geometry["coordinates"]

    def setDynamicGeo(self, time, coordinates):
        """Set Value for this geoProperty.

        To be used only when this geoProperty is time varying.

        Args:
            time (datetime.datetime()): Time index
            coordinates (dict | str | object): A dict of type "Point" is
                reduced to its ``coordinates`` entry, a numeric string is
                converted to float, anything else is stored as received
        """
        if isinstance(coordinates, dict):
            try:
                if coordinates["type"] == "Point":
                    coordinates = coordinates["coordinates"]
            except KeyError:
                pass  # incomplete dict: store it as received
        elif isinstance(coordinates, str):
            try:
                coordinates = float(coordinates)
            except ValueError:
                pass  # not numeric: store the string as received
        # Fill the row cell by cell: np.array([[time, coordinates]]) would
        # infer a 3-D shape when both are sequences of equal length
        # (e.g. two time indices and an [x, y] pair).
        row = np.empty((1, 2), dtype=object)
        row[0, 0] = time
        row[0, 1] = coordinates
        if self.coordinates is None:
            # No series started yet (reset() was never called).
            self.coordinates = row
        else:
            self.coordinates = np.append(self.coordinates, row, axis=0)
        return

    def reset(self):
        """Reset coordinates.

        Only has an effect while no static geometry type is set, i.e.
        when this geoProperty is time varying.
        TODO: Workaround for type
        """
        if self.type is None:
            self.coordinates = np.empty((0, 2), dtype=object)

    def sort(self):
        """Sort time series by the time column.

        Does nothing when ``coordinates`` is not a 2-D array (static
        geometry or nothing set yet).
        """
        series = self.coordinates
        if not isinstance(series, np.ndarray) or series.ndim != 2:
            return
        self.coordinates = series[np.argsort(series[:, 0])]
class Item(object):
    """Class for an iudx resource item.

    A resource item has its static attribute representation in a catalogue
    and a dynamic "data" representation in a resource server.
    This class presents an abstraction layer combining both.
    """

    def __init__(self, catUrl, rsUrl, resourceItemId, dataModel=None):
        """PyIUDX item base class.

        Args:
            catUrl (string): Domain name/ip of the catalogue server
            rsUrl (string): Passed to ``rs.ResourceServer``
            resourceItemId (string): Id of the resource item
            dataModel (dict): Data model to use; fetched from the
                catalogue when omitted
        Raises:
            RuntimeError: If the item is not in the catalogue, its data
                model cannot be retrieved, or the data model declares no
                time property
        """
        self.id = resourceItemId
        self.cat = cat.Catalogue(catUrl)
        # TODO: get rs from catalogue item
        self.rs = rs.ResourceServer(rsUrl)
        catItem = self.cat.getOneResourceItem(self.id)
        if catItem is None:
            raise RuntimeError("Item :" + self.id +
                               " not found in catalogue")
        self.properties = []
        self.geoProperties = []
        self.timeProperties = []
        self.quantitativeProperties = []

        # Load datamodel properties
        self.dm = dataModel
        if self.dm is None:
            try:
                self.dm = self.cat.getDataModel(self.id)
            except Exception as e:
                raise RuntimeError("Couldn't retrieve item's data model") from e

        # Read attributes from datamodel and store them by kind
        for attr, spec in self.dm["properties"].items():
            try:
                attrType = spec["$ref"].split("/")[-1]
            except (KeyError, TypeError, AttributeError):
                # Entry without a usable "$ref": not a typed property.
                continue
            if attrType == "Property":
                self.properties.append(attr)
                setattr(self, attr, Property(attr, spec))
            elif attrType in ("TimeProperty", "updatedAt"):
                # Minor fix to include updatedAt
                # TODO: make a general list of common_defs TimeProperties
                self.timeProperties.append(attr)
            elif attrType == "QuantitativeProperty":
                self.quantitativeProperties.append(attr)
                setattr(self, attr, QuantitativeProperty(self, attr, spec))
            elif attrType == "GeoProperty":
                self.geoProperties.append(attr)
                setattr(self, attr, GeoProperty(attr))

        # TODO: What if multiple time attributes
        # Find time attribute from datamodel
        if not self.timeProperties:
            raise RuntimeError("Item :" + self.id +
                               " has no time property in its data model")
        self.updatedAt = self._findUpdatedAt(self.dm, self.timeProperties)
        self.updatedAtIdx = self.timeProperties.index(self.updatedAt)
        self.num_time_indices = len(self.timeProperties)

        # Read item properties
        for key, entry in catItem.items():
            if not isinstance(entry, dict):
                continue
            entryType = entry.get("type")
            if entryType == "GeoProperty" and key in self.geoProperties:
                getattr(self, key).setStaticGeo(entry)
            if entryType == "Property" and key in self.properties:
                # NOTE(review): the whole catalogue entry (a dict) is stored
                # as the value, as before - confirm whether entry["value"]
                # was intended.
                getattr(self, key).setValue(entry)

    @staticmethod
    def _findUpdatedAt(dataModel, timeProperties):
        """Return the first "updatedAt"-typed time property, else the first one."""
        for name in timeProperties:
            ref = dataModel["properties"][name]["$ref"]
            if ref.split("/")[-1] == "updatedAt":
                return name
        return timeProperties[0]

    @staticmethod
    def _parseTimestamp(text):
        """Parse a timestamp string, ignoring everything from the first "+" on."""
        naive = text.split("+")[0]
        try:
            return datetime.datetime.strptime(naive, "%Y-%m-%dT%H:%M:%S.%f")
        except ValueError:
            # No fractional seconds in this timestamp.
            return datetime.datetime.strptime(naive, "%Y-%m-%dT%H:%M:%S")

    def populateValue(self, data):
        """Helper function to populate the property value arrays.

        Each row's time properties are parsed into a list of datetimes,
        which is then handed, together with the row's value, to every
        property named in the row. Afterwards the quantitative and plain
        property series are sorted; geo properties are not sorted here.

        Args:
            data (iterable of dict): Rows to store; ``None`` is a no-op
        """
        if data is None:
            return
        # TODO: Assuming a datetime format is bad
        for row in data:
            timestamp = [self._parseTimestamp(row[tp])
                         for tp in self.timeProperties if tp in row]
            for k, val in row.items():
                if k in self.quantitativeProperties:
                    getattr(self, k).setValue(timestamp, val)
                if k in self.geoProperties:
                    getattr(self, k).setDynamicGeo(timestamp, val)
                if k in self.properties:
                    getattr(self, k).setValue(val, time=timestamp)
        # Sort time series
        for k in self.quantitativeProperties:
            getattr(self, k).sort(self.updatedAtIdx)
        for k in self.properties:
            getattr(self, k).sort()

    def reset(self):
        """Reset data of all properties of this item."""
        for d in self.properties:
            getattr(self, d).reset()
        for d in self.quantitativeProperties:
            getattr(self, d).reset(self.num_time_indices)
        for d in self.geoProperties:
            getattr(self, d).reset()

    def latest(self):
        """Get latest data for all properties belonging to this item.

        Returns:
            self (object): Returns back the updated object
        """
        try:
            data = self.rs.getLatestData(self.id)
        except Exception:
            # Best effort: a failed fetch leaves the item reset and empty.
            data = None
        self.reset()
        self.populateValue(data)
        return self

    def during(self, start, end):
        """Get data during a set interval for all properties belonging to this item.

        Args:
            start (string): Start time
            end (string): End time
        Returns:
            self (object): Returns back the updated object
        """
        data = self.rs.getDataDuring(self.id, start, end)
        self.reset()
        self.populateValue(data)
        return self

    def latestWith(self, attr, val):
        """Get latest data for all properties of this item with a specific attribute.

        This method will give latest data where an attribute is specified.
        For e.g attr = "ROUTE_ID", val = "110" will give latest data for
        that bus.

        Args:
            attr (string): The name of the attribute
            val (string): The value of the attribute
        Returns:
            self (object): Returns back the updated object
        """
        data = self.rs.getLatestDataValuesLike(self.id, attr, val)
        self.reset()
        self.populateValue(data)
        return self

    def valueBetween(self, attrName, minval, maxVal):
        """Get all data during which this attribute was between min and max val.

        Like the other query methods, previously loaded data is cleared
        before the result is stored.

        Args:
            attrName (string): name of the attribute
            minval (int): minimum value
            maxVal (int): maximum value
        Returns:
            self (object): Returns back the updated object
        TODO:
            Specify time frame too
        """
        data = self.rs.getDataValuesBetween(self.id, attrName, minval, maxVal)
        self.reset()
        self.populateValue(data)
        return self
""" TODO: Add Status """
class Items(MutableSequence):
    """Class for a list of iudx resource items.

    This class extends a list to provide Class Item style functionality
    coupled with a multiprocessing pool to allow for faster data access.
    """

    def __init__(self, catUrl, rsUrl, items=None):
        """PyIUDX items base class.

        Args:
            catUrl (string): Domain name/ip of the catalogue server
            rsUrl (string): Passed on to every ``Item``
            items (list of dict): Entries with an "id" key; when omitted
                or empty the collection starts out empty
        Raises:
            RuntimeError: If the data model of the first item cannot be
                retrieved
        """
        super(Items, self).__init__()
        self.list = []
        self.len = 0
        self.dm = None
        self.catUrl = catUrl
        self.rsUrl = rsUrl
        self.cat = cat.Catalogue(catUrl)
        if not items:
            return
        # TODO: Replace this with resourceGroup datamodel
        # Get initial datamodel (the first item's model is used for all)
        try:
            self.dm = self.cat.getDataModel(items[0]["id"])
        except Exception as e:
            raise RuntimeError("Couldn't retrieve item's data model") from e
        # Init items: workers append to a Manager list that only lives
        # for the duration of the pool run.
        with Manager() as manager:
            shared = manager.list()
            with Pool(4) as p:
                p.starmap(self.initItem,
                          [(self.catUrl, self.rsUrl, item["id"], shared)
                           for item in items])
                p.close()
                p.join()
            self.list = list(shared)
        self.len = len(self.list)

    def initItem(self, catUrl, rsUrl, item, objList):
        """Multiprocessed job: build one Item and append it to objList."""
        objList.append(Item(catUrl, rsUrl, item, self.dm))

    def getLatest(self, obj):
        """Multiprocessed job: load the latest data into obj."""
        obj.latest()
        return obj

    def getDuring(self, obj, startTime, endTime):
        """Multiprocessed job: load data of an interval into obj."""
        obj.during(startTime, endTime)
        return obj

    def latest(self):
        """Get latest data.

        For all resource items of this instance and for all
        properties belonging to this item.

        Returns:
            self (object): Returns back the updated object
        """
        with Pool(4) as p:
            self.list = p.map(self.getLatest, self.list)
            p.close()
            p.join()
        return self

    def during(self, startTime, endTime):
        """Get data during a set interval.

        For all resource items of this instance and for all properties
        belonging to this item.

        Args:
            startTime (string): Start time
            endTime (string): End time
        Returns:
            self (object): Returns back the updated object
        """
        # Iterate the list itself; a cached length can be out of date.
        jobs = [(obj, startTime, endTime) for obj in self.list]
        with Pool(4) as p:
            self.list = p.starmap(self.getDuring, jobs)
            p.close()
            p.join()
        return self

    def __repr__(self):
        return "<{0} {1}>".format(self.__class__.__name__, self.list)

    def __len__(self):
        """List length"""
        return len(self.list)

    def __getitem__(self, ii):
        """Get a list item"""
        return self.list[ii]

    def __delitem__(self, ii):
        """Delete an item"""
        del self.list[ii]
        self.len = len(self.list)

    def __setitem__(self, ii, val):
        """Replace a list item"""
        self.list[ii] = val
        self.len = len(self.list)

    def __str__(self):
        return str(self.list)

    def insert(self, ii, val):
        """Insert val before index ii"""
        self.list.insert(ii, val)
        self.len = len(self.list)