Source code for __init__

import hashlib
import sys

import h5py
import numpy
from IPython import get_ipython

from workspace.variables import list_of_variables

# Interactive namespace
interactive_namespace = sys.modules['__main__']
# List of open HDF5 files
lof = []
# List containing names of created variables
lon = []

# Hidden attributes
hidden_attributes = ['group', 'parent', 'create']

# Compression settings
compression = {'chunks': True, 'compression': "lzf", 'shuffle': True}


[docs]class HDF5Files: """ Serves as an interface for the list of open HDF5 files. """ def __getitem__(self, index): return lof[index] def __iter__(self): return iter(lof) def __repr__(self): return '\n'.join( '{:2d}) {}'.format(i, repr(f).replace('<', '').replace('>', '')) for i, f in enumerate(lof))
[docs]class Variables: ''' Serves as an interface for the list of created variables. ''' def __getitem__(self, index): return getattr(interactive_namespace, lon[index]) def __iter__(self): return (getattr(interactive_namespace, name) for name in lon) def __repr__(self): return '\n'.join( '{:2d}) {}'.format(i, repr(getattr(interactive_namespace, name))) for i, name in enumerate(lon))
def create_fcn_name(cls): return 'create_' + cls.__name__.lower()
[docs]def fingerprint(obj=None): """fingerprint(obj=None) Generates 40B SHA1 digest from data of the specified object. Allowed object types: - HDF5 group - IVar class - None Parameters ---------- obj : {None|Ivar class|HDF5 group} Object available in the interactive namespace. Returns ------- **string** 40B SHA1 hexadecimal digest. """ if obj is None: return 40*'0' elif isinstance(obj, Group): group = obj.data.group else: group = obj['data'] d = bytes() keys = list(group) keys.sort() for key in keys: d += numpy.array(group[key]).tobytes() if hasattr(group, 'attrs'): keys = list(group.attrs) keys.sort() for key in keys: d += numpy.array(group.attrs[key]).tobytes() return hashlib.sha1(d).hexdigest()
[docs]def get(group, default=True): """get(group, default=True) By default get name of the variable associated with group, otherwise get variable itself, i.e. its instance. Parameters ---------- group : obj Specified HDF5 group. default : bool Indicates whether this method additionally returns instance of the variable associated with group. Returns ------- name : string Name of the variable associated with specified HDF5 group. """ for name in lon: if group == getattr(interactive_namespace, name).group: return name if default else getattr(interactive_namespace, name) else: return
[docs]def index(obj): """index(obj) Returns an index of the specified object(s). Allowed object types: - index or range of indexes (int or slice) - HDF5 file name (string) - h5py file object Parameters ---------- obj : {int|slice|string|h5py file object} Object available in the interactive namespace. Returns ------- index : int Index of specified object. """ if type(obj) is int or type(obj) is slice: index = obj elif type(obj) is h5py._hl.files.File: index = lof.index(obj) # Wyznacz obj na podstawie nazwy pliku elif type(obj) is str: for index, f in enumerate(lof): if obj == f.filename.split('/')[-1]: break try: lof[index] return index except IndexError: raise IndexError( "There is no HDF5 file associated with index {}!".format(index))
def link(group, parent=None): if parent: parent = get(parent, False) name = unify_name(group.name[1:]) setattr(interactive_namespace, name, globals()[group.attrs['type']](group, parent)) lon.append(name) return group def unify_name(name): return name def update(): # Na wypadek, gdy dodano nowy zasób należy przebudować wszystkie grupy try: for name in lon: # Komendę "xdel" można zastąpić komendą "reset_selective", # która generuje prośbę o potwierdzenie. get_ipython().run_line_magic('xdel', name) except AttributeError: for name in lon: delattr(interactive_namespace, name) lon.clear() # "G": lista wszystkich grup dostępnych w zasobach, tworząca # graf zwany lasem # "C": zbiór grup posiadających rodziców # "P": zbiór korzeni lub liści G = set() for f in lof: G.update(f.values()) # h odwzorowanie g -> fingerprint(g) h = {g: fingerprint(g) for g in G} # Wyznacz grupy ze środka drzewa (posiadające rodziców) ... C = {g for p in G for g in G if h[p] == g.attrs['parent']} # ... oraz pozostałe grupy (tj.: korzenie - grupy bez rodziców # i wierzchołki autonomiczne - nie powiązane z żadną grupą) P = G - C # Dowiąż korzenie i liście for g in P: link(g) # Dowiąż grupy (wierzchołki) ze środka drzewa while C: P = {link(g, p) for p in P for g in C if h[p] == g.attrs['parent']} C = C - P
[docs]class Group: """Contains low-level mechanisms for handling groups in HDF5. """ def __init__(self, group, parent=None): super().__setattr__('group', group) # Na poziomie zasobów hdf5 parent to 40 znakowy string. Na # poziomie pythona parent to klasa rodzica lub None, jeżeli # nie ma rodzica super().__setattr__('parent', parent) def __dir__(self): return list(self.group) + list(self.group.attrs) + list( att for att in super().__dir__() if not att.startswith('_') and att not in hidden_attributes) # http://stackoverflow.com/questions/3644545/pythons-getattr-gets-called-twice def __getattr__(self, name): if name in self.group.attrs: return self.group.attrs[name] elif name in self.group: if type(self.group[name]) is h5py._hl.group.Group: return Group(self.group[name]) else: # Zwróć (nieopakowany) dataset return self.group[name] else: raise AttributeError( "'{}' object has no attribute '{}'".format(self, name)) def __setattr__(self, name, value): if name in self.group.attrs: self.group.attrs[name] = value elif name in self.group: self.group[name] = value else: for key in compression: print('klucz={}, wartosc={}'.format(key, compression[key])) self.group.create_dataset(name, data=value, **compression) def __repr__(self): if isinstance(self, Variable): return 'Variable {} {} from {} in {}'.format( get(self.group), '(' + get(self.parent.group) + ')' if self.parent else '(-)', repr(self.group).replace('<', '').replace('>', ''), repr(self.group.file).replace('<', '').replace('>', '')) else: return '{} in {}'.format( repr(self.group).replace('<', '').replace('>', ''), repr(self.group.file).replace('<', '').replace('>', ''))
[docs]class Variable(Group): """Each instance of the Variable class is paired with an associated HDF5 *root group* and exposes convenient interface for it. """
[docs] def match(self): """match() Checks whether parent of group associated with specified variable has been modified. Returns ------- **bool** True if there are no changes to parent of specified group, False otherwise. Examples --------- Add example variable (named ``var1``) to the HDF5 file with index = -1 on the list of files:: >>> <interface_name>[-1].create.basic("var1") In variable ``var1`` create dataset named ``dset1``, which contains some numerical data:: >>> var1.data.dset1 = [1,2,3,4] Create second variable named ``var2``, as a children of ``var1``:: >>> <interface_name>[-1].create.basic("var2",var1) As for now, if we call ``match()`` method on ``var2``, it will return True, as the ``var1`` is up-to-date:: >>> var2.match() **True** If ``var1`` will be modified (i.e. by adding another dataset to it, named ``dset2``):: >>> var1.data.dset2 = [2,4,6,8] Then ``var1`` variable will not be up-to-date, and ``match()`` method called on ``var2`` will return false:: >>> var2.match() **False** """ return self.group.attrs['parent'] == fingerprint(self.parent)
[docs] def remove(self): """remove() Removes variable from the interactive namespace and removes group paired with this variable from the HDF5 file. Examples --------- Add example variable (named ``var1``) to the HDF5 file with index = -1 on the list of files:: >>> <interface_name>[-1].create.basic("var1") Remove ``var1``, by calling ``remove()`` method on it:: >>> var1.remove() """ del(self.group.parent[self.group.name[1:]]) update()
[docs] def rename(self, name): """rename(name) Renames variable in the interactive namespace and HDF5 group associated to it. Parameters ---------- name : str New name of the HDF5 group. Examples --------- Add example variable (named ``var1``) to the HDF5 file with index = -1 on the list of files:: >>> <interface_name>[0].create.basic("var1") Rename ``var1`` to ``var2``, by calling ``rename()`` method on it:: >>> var1.rename("var2") """ self.group.move(self.group.name, '/' + name) update()
embedded_keys = [k for k in vars(type('EmptyClass', tuple(), dict()))] # http://stackoverflow.com/questions/22609272/python-typename-bases-dict #for cls in list_of_variables: # vars()[cls.__name__] = type( # cls.__name__, # (Variable, ), # {k: v for k, v in vars(cls).items() if v not in embedded_keys})