"""
This file contains some useful low-level functions
"""
from __future__ import print_function
def write(*args, **kwargs):
    """
    Wrapper for the print function, to ensure compatibility with Python 2.
    The arguments are passed unchanged to the print function.
    """
return print(*args, **kwargs)
def push_path(inp, *keys):
    """
    Follow in the dictionary ``inp`` the path indicated by the keys.
    If this path does not exist, create it.
    Args:
        inp (dict): dictionary
        keys (str): keys of the path to follow
Returns:
(``branch``,``key``) tuple, where
* ``branch`` (dict): the dictionary of the second-last item of the path
* ``key`` (str): the last item of the path
Example:
        >>> inp = {}
        >>> d, key = push_path(inp, 'dft', 'nspin', 'mpol')
        >>> print(d, key)
        {} mpol
        >>> print(inp)
        {'dft': {'nspin': {}}}
        >>> inp = {'dft': {'nspin': {'mpol': 2}}}
        >>> d, key = push_path(inp, 'dft', 'nspin', 'mpol')
        >>> print(d, key)
        {'mpol': 2} mpol
        >>> print(inp)
        {'dft': {'nspin': {'mpol': 2}}}
"""
tmp = inp
for i, key in enumerate(keys):
k = key
if i == len(keys)-1:
break
tmp.setdefault(key, {})
tmp = tmp[key]
return tmp, k
def dict_set(inp, *subfields):
    """Ensure the provided fields exist and set the value.
    Provides an entry point to the dictionary.
Useful to define a key in a dictionary that may not have the
previous keys already defined.
Arguments:
inp (dict): the top-level dictionary
subfields (str,object): keys, ordered by level, that have to be
retrieved from topmost level of ``inp``.
            The last item corresponds to the value to be set.
Example:
>>> inp={}
>>> dict_set(inp,'dft','nspin','mpol',2)
>>> print (inp)
{'dft': {'nspin': {'mpol': 2}}}
"""
if len(subfields) <= 1:
raise ValueError('invalid subfields, the sequence should be longer than one item as the last one is the value to be given')
keys = subfields[:-1]
tmp, key = push_path(inp, *keys)
tmp[key] = subfields[-1]
def dict_get(inp, *subfields):
    """Find the value of the provided sequence of keys in the dictionary,
    if available.
    Retrieve the value of the dictionary from a sequence of keys, if
    available. Otherwise return the default value given as the last item
    of the sequence ``subfields``.
    Args:
        inp (dict): the top-level dictionary. Unchanged on exit.
        subfields (str,object): keys, ordered by level, that have to be
            retrieved from the topmost level of ``inp``. The last item is
            the default value, returned when the path is not available.
Returns:
The value provided by the sequence of subfields if available,
otherwise the default value given as the last item of the ``subfields``
sequence.
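    The sketch below is an illustrative usage, reusing the hypothetical keys
    employed in the other examples of this module.
    Example:
        >>> inp = {'dft': {'nspin': {'mpol': 2}}}
        >>> dict_get(inp, 'dft', 'nspin', 'mpol', 0)
        2
        >>> dict_get(inp, 'dft', 'occup', 0)
        0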
"""
    if len(subfields) <= 1:
        raise ValueError('invalid subfields, the sequence should be longer than one item as the last one is the default value to be returned')
tmp = inp
keys = subfields[:-1]
val = subfields[-1]
for key in keys:
tmp = tmp.get(key)
if tmp is None:
return val
return tmp
def sort_lists(sort_by, ascending, *lists):
    """
    Sort the lists altogether, following the list indicated by the
    ``sort_by`` index.
Args:
sort_by (int): the index of the list which has to be taken as reference
for sorting
ascending (bool): Sort is performed in ascending order if True
*lists: sequence of lists to be mutually sorted.
They have to be of the same length.
    Returns:
        list: the sorted lists, each one returned as a tuple
Example:
>>> l1=[5,3,4]
>>> l2=['c','t','q']
>>> l3=[6,3,7]
        >>> print(sort_lists(0, True, l1, l2, l3))
        [(3, 4, 5), ('t', 'q', 'c'), (3, 7, 6)]
        >>> print(sort_lists(2, True, l1, l2, l3))
        [(3, 5, 4), ('t', 'c', 'q'), (3, 6, 7)]
"""
    import operator
    # list() guarantees that the documented example also holds under Python 3
    return list(zip(*sorted(zip(*lists), reverse=not ascending,
                            key=operator.itemgetter(sort_by))))
def file_time(filename):
"""
Determine the time of the last modification of a file.
Args:
filename (str): path of the file to inspect.
Returns:
        float: time of the last modification of the file. Returns 0 if the file does not exist.
"""
import os
if os.path.isfile(filename):
return os.path.getmtime(filename)
else:
return 0
def non_null_size(filename):
    """
    Check that the file exists and has a nonzero size
"""
from os.path import getsize, isfile
return isfile(filename) and (getsize(filename) > 0)
def more_recent_than_parent(filename, parent):
    """
    Check that ``filename`` exists, is non-empty and is at least as recent as ``parent``
"""
t = non_null_size(filename) and (file_time(parent) <= file_time(filename))
return t
def fill_dictionary_in_parallel(nthreads, keys, func, **kwargs):
    """
    Fill a dictionary for a given set of keys with the return value of a
    function which accepts each key as its first argument.
    Args:
        nthreads(int): the number of threads of the pool
        keys(list): the keys of the dictionary; each one is passed as
            first argument to ``func``
        func(func): the python function that accepts the key as its
            first argument
        **kwargs: further arguments of the function, if needed
    Returns:
        dict: the key -> obj dictionary, where obj is the return value
        of ``func`` for the corresponding key
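    The following sketch is illustrative (serial case, with a trivial
    ``square`` function defined only for the example):
    Example:
        >>> def square(x):
        >>>     return x**2
        >>> fill_dictionary_in_parallel(1, [1, 2, 3], square)
        {1: 1, 2: 4, 3: 9}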
"""
global __rerefunc # hopefully this name will never be employed
from multiprocessing import Pool
import time
from functools import partial
refunc = partial(func, **kwargs)
def __rerefunc(x):
return refunc(x)
if nthreads == 1:
return {k: __rerefunc(k) for k in keys}
p = Pool(nthreads)
start = time.time()
res = p.map(__rerefunc, keys)
p.close()
p.join()
end = time.time()
write(end - start)
return {a: b for a, b in zip(keys, res)}
class Node:
    """
    An object that is associated to a queue.
    It has a requirement and a validity function, as well as a generation
    function that is triggered in case the node is not valid.
    Args:
        obj (object): a generic object
        valid_if (func): validity callback, passed to :py:meth:`valid_if`
            at instantiation if present
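    The sketch below is purely illustrative: the file names are hypothetical
    and the callbacks reuse other utilities of this module.
    Example:
        >>> from shutil import copyfile
        >>> parent = Node('data/source.dat', valid_if=non_null_size)
        >>> child = Node('data/result.dat', valid_if=more_recent_than_parent)
        >>> child.requires(parent, generator=lambda f, p: copyfile(p, f))
        >>> child.validate()
        True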
"""
def __init__(self, obj, valid_if=None):
self.obj = obj
if valid_if is not None:
self.valid_if(func=valid_if)
    def valid_if(self, func):
"""
Set the function as callback to check the validity of the node.
Args:
            func(func): function that takes the node object as its first
                argument (and the parent object as second argument, when a
                dependency is set) and returns a boolean assessing the
                validity of the node.
"""
self.validity_func = func
@property
def valid(self):
if hasattr(self, 'parent'):
return self.validity_func(self.obj, self.parent.obj)
else:
return self.validity_func(self.obj)
    def requires(self, node, generator):
"""
Set the dependency between two nodes.
Args:
            node (Node): the node on which this one depends.
                If that node is valid and the present one is not, the
                generator function is employed to make the present node
                valid.
            generator (func): function which should be called to make the
                node valid. Should have as arguments the two objects
                associated to the nodes (the current one is the first
                argument).
"""
self.parent = node
self.dependency_func = generator
    def validate(self):
        """
        Make the node valid by validating its dependency and by calling
        the generator function if the node is not yet valid.
Returns:
bool: the value of self.valid. Should be true.
"""
if hasattr(self, 'parent'):
assert self.parent.validate()
if not self.valid:
self.dependency_func(self.obj, self.parent.obj)
assert self.valid
return self.valid
def dict_merge(dest, src):
    """ Recursive dict merge. Inspired by :meth:`dict.update`: instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
    to an arbitrary depth, updating keys. The ``src`` is merged into
    ``dest``. Adapted from
    https://gist.github.com/angstwad/bf22d1822c38a92ec0a9
Arguments:
dest (dict): dict onto which the merge is executed
src (dict): dict merged into dest
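    The following illustrative sketch merges a nested key into an existing
    branch (the keys are hypothetical):
    Example:
        >>> d = {'dft': {'nspin': 1}}
        >>> dict_merge(d, {'dft': {'mpol': 2}})
        >>> print(d)
        {'dft': {'nspin': 1, 'mpol': 2}}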
"""
    from collections.abc import Mapping
    for k, v in src.items():
        if (k in dest and isinstance(dest[k], dict)
                and isinstance(src[k], Mapping)):
dict_merge(dest[k], src[k])
else:
dest[k] = src[k]
def merge_two_dicts(x, y):
    """Return a new dictionary with the keys of ``x`` updated by those of ``y``."""
z = x.copy() # start with x's keys and values
z.update(y) # modifies z with y's keys and values & returns None
return z
def unique_filename(prefix):
"""
Provides a filename with a unique id appended
Args:
prefix (str): the prefix of the file
Returns:
str: filename
"""
from uuid import uuid4
unique_id = str(uuid4().hex)
return prefix + unique_id
def file_list(directory='.', suffix=None, prefix=None, exclude=None,
include_directory_path=False):
"""
Return the list of the files inside a given directory
Args:
directory (str): path of the directory to search into
suffix (str): the suffix that the files should have
prefix (str): the prefix that the files should have
        exclude (str): exclude from the list the files which match this string
include_directory_path (bool): if True includes the path of
the directory in the list.
Returns:
        list: list of the files that match the requirements.
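    The sketch below is illustrative only: the directory content and the
    file names are hypothetical.
    Example:
        >>> # suppose the directory contains 'log-run1.yaml' and 'input.yaml'
        >>> file_list('.', suffix='.yaml', prefix='log')
        ['log-run1.yaml']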
"""
files = []
from os import listdir
from os.path import join
for filename in listdir(directory):
ok = exclude not in filename if exclude is not None else True
if ok and suffix is not None:
ok = filename.endswith(suffix)
if ok and prefix is not None:
ok = filename.startswith(prefix)
if ok:
if include_directory_path:
to_append = join(directory, filename)
else:
to_append = filename
files.append(to_append)
return files
def make_dict(inp):
"""
Transform the instance ``inp`` into a python dictionary.
If inp is already a dictionary, it performs a copy.
Args:
        inp (dict): an instance of a class which inherits from dict
Returns:
dict: the copy of the class, converted as a dictionary
"""
import copy
local_tmp = copy.deepcopy(inp)
local_input = {}
local_input.update(local_tmp)
return local_input
def execute_code_if(condition, code, glob=None, loc=None):
"""
Execute code if condition is true
Args:
        condition (bool): if true the code is executed
        code (str): the code to be evaluated if the condition is true
        glob (dict): the global variables dictionary
        loc (dict): the local variables dictionary
Returns:
the object returned from the code executed, None otherwise
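    A minimal illustrative sketch (the evaluated expression is arbitrary):
    Example:
        >>> execute_code_if(True, "1 + 1")
        2
        >>> execute_code_if(False, "1 + 1") is None
        True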
"""
if not condition:
return None
if glob is None:
glob = globals()
if loc is None:
loc = locals()
return eval(code, glob, loc)
def ensure_object(filename, code_if_obj_not_found=None, glob=None, loc=None):
"""
Identify a pickle file to save a given object on it.
In case this file is present, read the object from it.
Otherwise, assume that the object is ready to be dumped and write
it in the file.
Args:
filename (str): the path of the file in which the object is
saved/loaded.
code_if_obj_not_found (str): the code to be evaluated if the object has
not been found.
        glob (dict): the global variables dictionary
        loc (dict): the local variables dictionary
Returns:
object: The result from the pickle file or from the code to be executed
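    The sketch below is illustrative: ``cache.pkl`` is a hypothetical file
    which caches the result of the evaluated code across calls.
    Example:
        >>> # the first call evaluates the code and dumps the result
        >>> ensure_object('cache.pkl', code_if_obj_not_found='[1, 2, 3]')
        [1, 2, 3]
        >>> # subsequent calls read the object back from 'cache.pkl'
        >>> ensure_object('cache.pkl')
        [1, 2, 3]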
"""
import pickle
try:
with open(filename, "rb") as ifile:
obj = pickle.load(ifile)
except Exception as e:
obj = execute_code_if(True, code_if_obj_not_found, glob=glob, loc=loc)
with open(filename, "wb") as ofile:
pickle.dump(obj, ofile)
return obj
def property_attribute(self, attribute, code_if_not_found):
    """
    Return the ``attribute`` of the instance ``self``; if the attribute is
    not present, evaluate ``code_if_not_found`` to provide the value.
    """
obj = execute_code_if(not hasattr(self, attribute), code_if_not_found)
if obj is None:
obj = getattr(self, attribute)
return obj
def function_signature_regenerator(target_kwargs_function, fun_name='',
fun_docstring='', **kwargs):
'''
Generate the function of the name provided by `fun_name`,
with signature provided by the kwargs dictionary.
Args:
target_kwargs_function (func): keyword arguments function that will be
used for the generated function.
        fun_name (str): name of the regenerated function. If empty it will be
            ``target_kwargs_function.__name__`` prefixed by ``regenerated_``.
fun_docstring (str): docstring of the generated function, if empty it
will take the docstring from ``target_kwargs_function``.
**kwargs: keyword arguments which will represent the signature of the
generated function.
Example:
>>> def write_kwargs(**kwargs):
>>> """
>>> Convert keyword arguments into a string
>>> """
>>> return str(kwargs)
>>> write_opts=function_signature_regenerator(write_kwargs,
>>> fun_name='write_opts',
>>> opt1='default1',
>>> opt2='default2')
>>> help(write_opts)
>>> print (write_opts())
Help on function write_opts:
write_opts(opt1='default1', opt2='default2')
Convert keyword arguments into a string
{'opt1': 'default1', 'opt2': 'default2'}
'''
signature = option_line_generator(',', **kwargs).lstrip(',')
docstring = target_kwargs_function.__doc__ if not fun_docstring else fun_docstring
if docstring is None:
docstring = "Automatically generated function from the target function '" + target_kwargs_function.__name__ + "'"
docstring = ' """\n'+docstring+'\n """'
fname = "regenerated_" + target_kwargs_function.__name__ if not fun_name else fun_name
function = "def %s(%s):\n%s\n return target_function(**locals())" % (fname, signature, docstring)
gen_locals = {}
gen_object = compile(function, 'generated_fun', 'exec')
eval(gen_object, {'target_function': target_kwargs_function}, gen_locals)
return gen_locals[fname]
def option_line_generator(separator='--', **kwargs):
"""
Associate to each of the keyword arguments a command line argument.
Args:
separator (str): The string needed to separate the options.
Might be '--' for command-line arguments, but also ','
for function signatures.
Warning:
The separator comes **before** the first argument therefore pay
attention to lstrip it in case you want to use it as a function
signature string.
Example:
        >>> option_line_generator(arg1='val1', arg2='val2')
        '--arg1="val1" --arg2="val2" '
"""
command = ''
for option, value in kwargs.items():
command += separator + option + '="' + str(value) + '" '
return command
def floatify(scalar):
    """
    Useful to make floats from strings that may be written in Fortran notation
Args:
scalar (str, float): When string representing a float that might be
given in fortran notation, otherwise it might be a floating point
Returns:
float. The value associated to scalar as a floating point number
Example:
        >>> # this would be the same with "1.e-4" or with 0.0001
        >>> floatify('1.d-4')
        0.0001
"""
if isinstance(scalar, str):
return float(scalar.replace('d', 'e').replace('D', 'E'))
else:
return scalar
def kw_pop(*args, **kwargs):
    """
    Treatment of kwargs: pop from ``kwargs`` the ``(key, default)`` pair given
    in ``args``, returning the remaining dictionary together with the popped
    value, or with the default if the key is absent.
Example:
>>> kwargs = {'one': 1, 'two': 2, 'three': 3}
        >>> # Existing value, default ignored
        >>> kw2, two_maybe = kw_pop('two', 100, **kwargs)
        >>> print(kw2, two_maybe)
        {'one': 1, 'three': 3} 2
        >>> # Non-existing value, default considered
        >>> kw2, four_maybe = kw_pop('four', 4, **kwargs)
        >>> print(kw2, four_maybe)
        {'one': 1, 'two': 2, 'three': 3} 4
"""
arg = kwargs.copy()
key, default = args
if key in arg:
return arg, arg.pop(key)
else:
return arg, default
def split_kwargs(kwargs, sub_kwargs):
"""Split a dictionary of kwargs from a subset.
Arguments:
kwargs (dict): the original dictionary.
        sub_kwargs (dict): dictionary providing the {'key': default}
            set of arguments which should be split from kwargs.
    Returns:
        tuple: (new_kw, sub_kw), the split dictionaries
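    An illustrative sketch (the keyword names are hypothetical):
    Example:
        >>> kwargs = {'color': 'red', 'size': 3, 'title': 'test'}
        >>> split_kwargs(kwargs, {'color': 'blue', 'alpha': 1.0})
        ({'size': 3, 'title': 'test'}, {'color': 'red', 'alpha': 1.0})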
"""
new_kw = kwargs
sub_kw = {}
for key, default in sub_kwargs.items():
new_kw, value = kw_pop(key, default, **new_kw)
sub_kw[key] = value
return new_kw, sub_kw
def find_files(regexp, archive=None):
"""
Returns a list of the paths to the files that follow the regular expression
regexp. They are searched from the current working directory or from an
archive given as optional argument.
:param regexp: A regular expression
:type regexp: string
    :param archive: path of a tarfile archive (optional)
    :type archive: string
:returns: a list of all the paths that agree with the regexp
:rtype: list of strings
    :raises: ValueError if the regexp matches no path.
Example::
#Find all python files in the current working directory
find_files('*py')
        #An example outside of the current working directory
find_files('*/log-*.yaml')
        #Example using a tarfile
        find_files('*/*/log-*.yaml', archive='archive.tar.gz')
"""
import os
# Get a list of all paths to files satisfying the regexp
if archive is not None:
paths = _find_files_from_archive(regexp, archive)
else:
paths = os.popen('ls '+regexp).read().splitlines()
# Test that the regexp found files
if paths == []:
raise ValueError('The regexp "{}" leads to no file. '\
'Consider using another one.'.format(regexp))
else:
return paths
def _find_files_from_archive(re, archive):
"""
This function retrieves the list of Logfiles instances
from the file archived satisfying a regular expression.
#function to identify an archive out of its regexp,
#solves the bug in re for '*' (solved in Python 2.7.6)
"""
import tarfile
# Open the archive
with tarfile.open(archive, 'r') as arch:
# Return paths to logfiles satisfying the regexp
return [f for f in arch.getnames()
if all(pattern in f for pattern in re.split('*'))]
def ensure_copy(src, dest):
"""Copy src into dest.
Guarantees that the file ``dest`` is a copy of the file ``src``.
Args:
src (str): path of the source file. Should be valid.
        dest (str): path of the destination file
Returns:
bool: ``True`` if the file needed to be copied, ``False`` if ``src``
and ``dest`` are identical
"""
import shutil
from os import path, stat
copied = False
    if not path.isfile(dest) or stat(dest) != stat(src):
shutil.copy2(src, path.dirname(dest))
copied = True
return copied
def version_is_compatible(desired_version, present_version):
"""Assess the compatibility of a version id with a reference.
Args:
desired_version (str): the version which is used as the reference,
in the format "x.y", "x.y.z", "x.y.z.w", ...
present_version (str): the version to be tested against.
    Returns:
        bool: True if the major versions coincide and none of the remaining
        components of the present version exceeds the corresponding component
        of the desired one.
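    An illustrative sketch with arbitrary version strings:
    Example:
        >>> version_is_compatible('1.9', '1.7')
        True
        >>> version_is_compatible('1.7', '1.9')
        False
        >>> version_is_compatible('2.0', '1.9')
        False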
"""
desired = list(map(int, desired_version.split('.')))
obtained = list(map(int, present_version.split('.')))
not_ok = obtained[0] != desired[0]
if not not_ok:
not_ok = any([ll < m for ll, m in zip(desired[1:], obtained[1:])])
return not not_ok
class ObjectSerialization:
"""Serialization of a class into an archive.
This class can be employed each time that we would like to identify the
minimal set of files needed to instantiate an object.
Files and objects are put in an archive that can in this way be used
to retrieve and reinstantiate the object.
Args:
        obj (object): an instance of the object to be serialized. Necessary
            in case some attributes of it have to be included in the
            serialization.
version (str): the version of the desired serialization.
If absent, the default is considered.
files (dict): dictionary of the files to be included.
The dictionary should be of the form {<filename>: <abspath>}
            or, alternatively, {<filename>: {'archive': <archive_path>, 'file': <member>}}
in which case the file <member> of the archive <archive_path>
will be included. The file will be included in the serialization
as <filename>.
"""
version = '1.0' #: version of the serialization
    #: These attributes are associated to the class attributes which will be
    #  employed to serialize the object. It should be a dictionary where the
    #  keys are the versions employed and the values are the elements of the
    #  class to employ.
cached_attributes = {'1.0': {}}
def __init__(self, obj=None, files=None, version=None):
if files is not None:
self.files = files
self.true_version = version if version is not None else self.version
for version, attributes in self.cached_attributes.items():
if not version_is_compatible(self.true_version, version):
continue
for attr, trueattr in attributes.items():
setattr(self, attr, getattr(obj, trueattr))
    def dump(self, archive, extra_encoder_functions=[]):
"""
        Create an archive with the entire set of information of the
        serialization. The resulting tarfile should be sufficient to perform
        the same analysis of the object again.
        Args:
            archive (str): path of the archive to serialize the object to.
            extra_encoder_functions (list): see :py:func:`serialize_objects`.
"""
dictionaries = {}
for version, attrs in self.cached_attributes.items():
if not version_is_compatible(self.true_version, version):
continue
dictionaries.update({att+'.json': getattr(self, att)
for att in attrs})
objects = serialize_objects(
dictionaries, extra_encoder_functions=extra_encoder_functions)
create_tarball(archive, self.files, objects)
    @classmethod
def load(cls, archive, init_function, tmpdir_prefix='',
load_functions={}, serialization_version=None, **kwargs):
"""
Create a class instance from a serialized archive.
Args:
archive (str): the path of the archive
init_function(func): function that should be called at the
class instantiation
load_functions (dict): dictionary of load functions per
cached attribute. The function signature is the class instance,
the files list, and the attribute.
serialization_version (str): version of the load
tmpdir_prefix (str): prefix to be added to the temporary directory
**kwargs: other arguments that have to be passed to the
init_function
Returns:
Instance of the class
"""
from futile.Utils import unpack_tarball
from shutil import rmtree
from os.path import join
tmpdir, files = unpack_tarball(archive, tmpdir_prefix=tmpdir_prefix)
allfiles = [join(tmpdir, f) for f in files]
loaded = init_function(cls, allfiles, **kwargs)
for version, attrs in cls.cached_attributes.items():
if serialization_version is not None:
desired = serialization_version
else:
desired = cls.version
if not version_is_compatible(desired, version):
continue
for attr in attrs:
value = load_functions.get(attr, lambda x: None)(loaded,
allfiles,
attr)
if value is not None:
setattr(loaded, attr, value)
rmtree(tmpdir)
return loaded
def ensure_dir(file_path):
    """
    Guarantees the existence of the directory given by the (relative) file_path
Args:
file_path (str): path of the directory to be created
Returns:
        bool: True if the directory needed to be created,
            False if it already existed.
"""
from os import mkdir
try:
mkdir(file_path)
created = True
except FileExistsError:
created = False
return created
if __name__ == '__main__':
import os
# Tests of the find_files function
#
print("Test finding all python files in this directory")
print(find_files("*py"))
print()
#
print("Test finding the Utils.py file in this directory")
print(find_files("Utils.py"))
print()
#
print("Test raising a ValueError because the regexp leads to no files")
try:
find_files('*html')
except ValueError as e:
print('This raised the following ValueError:')
print(e)
print()
#
print("Test raising an exception because there is no such archive.")
fname = 'file.tar.gz'
    if os.path.isfile(fname):
        os.remove(fname)
try:
find_files('*py', archive=fname)
except Exception as e:
# print(dir(e))
print('This raised the following Exception:')
print(e)
print()
#
print("Test without error using an archive")
os.system('find * -name "*py" | tar -zcvf '+fname+' -T -')
os.system('ls '+fname)
find_files('*py', archive=fname)
os.system('rm '+fname)
def tarfile_is_coherent(filename):
"""Checks the coherency of the tarfile.
Args:
filename(str): path of the tarfile.
Returns:
bool: True if the tarfile is good to go.
Raises:
Exception if the tarfile is not valid.
"""
from os.path import getsize, isfile
import tarfile
if not isfile(filename):
raise IOError('Tarfile not existing')
if getsize(filename) <= 0:
raise IOError('Zerosize tarfile')
tardude = tarfile.open(filename)
members = tardude.getmembers()
    for member_info in members:
        # the extraction raises an exception if the member is not readable
        tardude.extractfile(member_info.name)
tardude.close()
return True
def create_tarball(filename, files, objects={}):
"""
Assemble files and objects in a tarball
Args:
filename (str): the name of the archive. Determine the tarball
compression method from its extension.
        files(dict,set): file paths that have to be included in the tarball.
            If it is a dict, it should be in the form "{arcname: file}",
            where `file` is the path of the file to be put, and `arcname`
            is the name the file would take in the archive.
            If it is a set, the files will preserve their names in the archive.
objects(dict): dictionary '{arcname: buffer}' of the buffers that
will have to be serialized in the `arcname` position
            The buffers are given as :py:class:`io.BytesIO` instances,
            following the specification of the :py:func:`serialize_objects`
            function.
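    An illustrative sketch (file and archive names are hypothetical; the
    '.gz' extension selects gzip compression):
    Example:
        >>> create_tarball('run.tar.gz', {'run/input.yaml': 'input.yaml'})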
"""
import tarfile
from os.path import splitext
from time import time
extension = splitext(filename)[-1].lstrip('.')
arch = tarfile.open(filename, mode='w:'+extension)
isdict = isinstance(files, dict)
oldtars = {}
for arcname in files:
if isdict and isinstance(files[arcname], dict):
oldtars.setdefault(files[arcname]['archive'], []).append(
files[arcname]['file'])
continue
name = files[arcname] if isdict else arcname
arch.add(name=name, arcname=arcname)
for oldtar in oldtars:
oldarch = tarfile.open(oldtar)
for member in oldarch.getmembers():
if member.name in oldtars[oldtar]:
arch.addfile(member, oldarch.extractfile(member.name))
for arcname, string in objects.items():
tarinfo = tarfile.TarInfo(arcname)
tarinfo.size = len(string.getvalue())
tarinfo.mtime = time()
arch.addfile(tarinfo=tarinfo, fileobj=string)
string.seek(0) # restore the position for subsequent usage
arch.close()
def untar_archive(archive, dest='.'):
"""Untar the archive in the destination directory.
Arguments:
archive (str): path of the file to untar.
        dest (str): destination directory, created if it does not exist.
Returns:
list: list of the files contained in the tarball.
"""
import tarfile
ensure_dir(dest)
# extract the archive
arch = tarfile.open(archive)
arch.extractall(path=dest)
files = arch.getnames()
arch.close()
return files
def unpack_tarball(archive, tmpdir_prefix='tmp_'):
"""
Open an archive in a temporary directory
Args:
        archive (str): the path of the archive to open
tmpdir_prefix (str): prefix of the temporary directory to untar the
archive to.
Returns:
        tuple: (tmpdir, files), the path of the temporary directory and the
        names of the files extracted from the tarball
"""
import tempfile
from os.path import basename
# creates the directory
tmpdir = tempfile.mkdtemp(prefix=tmpdir_prefix + basename(archive) + '_')
files = untar_archive(archive, dest=tmpdir)
return tmpdir, files
def serialize_objects(objects, extra_encoder_functions=[]):
"""
Convert a dictionary of objects into buffers.
    Employs json serialization into :py:class:`io.BytesIO` instances
Args:
objects(dict): dictionary of key/value pair of objects to be serialized
        extra_encoder_functions (list): list of dictionaries of the format
            {'cls': Class, 'func': function} which is employed in the
            serialization
Returns:
dict: dictionary of key/buffer pairs
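    A minimal illustrative sketch (the key and the object are arbitrary):
    Example:
        >>> buffers = serialize_objects({'data.json': {'a': 1}})
        >>> buffers['data.json'].getvalue()
        b'{"a": 1}'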
"""
from io import BytesIO
import json
class CustomEncoder(json.JSONEncoder):
""" Special json encoder for numpy types, customizable"""
def default(self, obj):
import numpy as np
if isinstance(obj, (np.int_, np.intc, np.intp, np.int8,
np.int16, np.int32, np.int64, np.uint8,
np.uint16, np.uint32, np.uint64)):
return int(obj)
elif isinstance(obj, (np.float_, np.float16, np.float32,
np.float64)):
return float(obj)
elif isinstance(obj, (np.ndarray,)):
return obj.tolist()
elif isinstance(obj, (set,)):
return list(obj)
else:
for spec in extra_encoder_functions:
if isinstance(obj, (spec['cls'],)):
return spec['func'](obj)
return json.JSONEncoder.default(self, obj)
return {key: BytesIO((json.dumps(obj, cls=CustomEncoder)).encode("utf-8"))
for key, obj in objects.items()}
def execute(*args):
    """
    Run the command given by ``args`` through :py:func:`subprocess.check_output`
    and print its output, or its error output in case of failure.
    """
    from subprocess import check_output, CalledProcessError, STDOUT
print('Executing: '+' '.join(args))
try:
result = check_output(args, stderr=STDOUT)
print(result.decode('utf-8'))
except CalledProcessError as e:
print('Error Occurred: ', '\n', e.output.decode())
def get_sha_and_size(filename):
    """
    Parse a file written in the git-lfs pointer format and return the
    sha256 checksum and the size it declares.
    """
    with open(filename) as ifile:
        for line in ifile.readlines():
            if 'sha256' in line:
                sha = line.split(':')[-1].rstrip('\n')
            if 'size' in line:
                size = line.split()[-1].rstrip('\n')
    return sha, size
def get_curl_command(sha, size, repo):
    """Build the curl command which queries the git-lfs batch API of ``repo``
    for the object of checksum ``sha`` and size ``size``, saving the answer
    in ``href.json``.
    """
    basecurl = ['curl', '-X', 'POST', '-H', '"Accept: application/vnd.git-lfs+json"',
                '-H', '"Content-type: application/json"',
                '-d']
    datas = '{"operation": "download", "transfer": ["basic"], "objects": [{"oid": "' + sha + '", "size": ' + size + '}]}'
    repos = repo + ".git/info/lfs/objects/batch"
    return basecurl + ["'" + datas + "'", repos, '-o', 'href.json', '--http1.1']
def data_path(archive, dest='.', path='datalake',
              branch='main', github_repo='BigDFT-group/resources'):
    """Download the git-lfs ``archive`` stored in the ``path`` directory of
    the ``branch`` of the ``github_repo`` repository into ``dest``.
    """
    from os import system, path as p, remove
    from json import load
    repo_url = 'https://github.com/' + github_repo
    data_url = '/'.join(['https://raw.githubusercontent.com',
                         github_repo, branch, path])
    lfs = 'lfs.info'
    # retrieve the lfs pointer file associated to the archive
    execute('wget', p.join(data_url, archive), '-O', lfs)
    # query the lfs batch API to obtain the actual download url
    system(" ".join(get_curl_command(*get_sha_and_size(lfs), repo_url)))
    with open('href.json') as jfile:
        href_d = load(jfile)
    url = href_d['objects'][0]['actions']['download']['href']
    ensure_dir(dest)
    execute('wget', url, '-O', p.join(dest, p.basename(archive)))
    remove(lfs)
    remove('href.json')