"""Manage assets used during computation."""
import base64
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Dict, Generic, Optional, Type, TypeVar
import yaml
from .access import AccessConfigs
from .items import Header, Operator
# Serialization of the header hash performed by ``encode``: number of bytes
# and byte order.
NBYTES = 8
ENDIANNESS = "little"

# File extensions used in the inventory folder. Headers are written with
# ``yaml.dump``; operator files carry one of the array extensions followed by
# the compression suffix.
HEADER_EXT = ".yaml"
ARRAY_EXT = [".npy", ".npz"]
COMPRESSED_EXT = ".lz4"
# i.e. ``[".npy.lz4", ".npz.lz4"]``: ``operator_name`` selects the second
# entry for operators carrying an error, the first one otherwise.
OPERATOR_EXT = [f"{array_ext}{COMPRESSED_EXT}" for array_ext in ARRAY_EXT]
class LookupError(ValueError):
    """Failure in content retrieval from an inventory.

    Raised when the requested item has no file on disk, or when more than
    one file is associated to it.

    Note
    ----
    Inside this module the name shadows the builtin :class:`LookupError`;
    this class derives from :class:`ValueError`, not from the builtin.
    """
def encode(header: Header):
    """Compute the file-name stem associated to a header.

    The absolute value of ``hash(header)`` is serialized to :data:`NBYTES`
    bytes (in :data:`ENDIANNESS` byte order) and encoded with URL-safe
    base64, so that the result contains no path separators.

    Note
    ----
    Since names are persisted on disk, they rely on ``hash(header)`` being
    reproducible across interpreter sessions. Numeric hashes are, while e.g.
    ``str`` hashes are randomized per process (see ``PYTHONHASHSEED``):
    review if header fields of such types are ever introduced. Taking the
    absolute value also maps ``h`` and ``-h`` to the same stem.
    """
    magnitude = abs(hash(header))
    raw = magnitude.to_bytes(NBYTES, byteorder=ENDIANNESS)
    encoded = base64.urlsafe_b64encode(raw)
    return encoded.decode(encoding="utf-8")
def operator_name(header: Header, err: bool):
    """Build the operator file name associated to a header.

    The stem is :func:`encode` applied to ``header``; the extension is the
    second entry of :data:`OPERATOR_EXT` when ``err`` is truthy, the first
    one otherwise.
    """
    if err:
        extension = OPERATOR_EXT[1]
    else:
        extension = OPERATOR_EXT[0]
    return encode(header) + extension
# Type variable for the header type an Inventory is parametrized with
# (``Inventory[H]``); restricted to Header and its subclasses.
H = TypeVar("H", bound=Header)
@dataclass(frozen=True)
class Inventory(Generic[H]):
    """Assets manager.

    In particular, manage autosave, autoload, and memory caching.

    Items live in the :attr:`path` folder: every header is dumped there as a
    YAML file and, for contentful inventories, the associated operator is
    saved in the same folder. Both file names are derived from
    :func:`encode` of the header.
    """

    path: Path
    access: AccessConfigs
    header_type: Type[Header]
    cache: Dict[H, Optional[Operator]] = field(default_factory=dict)
    contentless: bool = False
    name: Optional[str] = None
    """Only for logging purpose."""

    def __str__(self) -> str:
        return f"Inventory '{self.name}'"

    def lookup(self, stem: str, header: bool = False) -> Path:
        """Look up for content path in inventory.

        Parameters
        ----------
        stem : str
            encoded header, as returned by :func:`encode`
        header : bool
            if ``True``, look for the header file (:data:`HEADER_EXT`),
            otherwise for the operator file (any of :data:`OPERATOR_EXT`)

        Returns
        -------
        Path
            path of the unique matching file

        Raises
        ------
        LookupError
            if no file, or more than one file, matches
        """
        extensions = [HEADER_EXT] if header else OPERATOR_EXT
        found = [
            path
            for path in self.path.iterdir()
            if path.name.startswith(stem) and "".join(path.suffixes) in extensions
        ]
        if not found:
            raise LookupError(f"Item '{stem}' not available in {self}.")
        if len(found) > 1:
            raise LookupError(
                f"Too many items associated to '{stem}' in {self}:\n{found}"
            )
        return found[0]

    def __getitem__(self, header: H) -> Optional[Operator]:
        """Retrieve operator for given header.

        If the operator is not already in memory, it will be automatically
        loaded from disk, and stored in the cache.

        For contentless inventories only the presence of the header file on
        disk is checked, and ``None`` is returned.

        Raises
        ------
        LookupError
            if the requested item is not available on disk
        """
        self.access.assert_open()

        try:
            op = self.cache[header]
        except KeyError:
            pass
        else:
            # a cached ``None`` is a complete answer only when there is no
            # content to load
            if op is not None or self.contentless:
                return op

        stem = encode(header)
        # in case of contentless, check header availability instead
        if self.contentless:
            self.lookup(stem, header=True)
            self.cache[header] = None
            return None

        # for contentful inventories, check operator availability and load it
        oppath = self.lookup(stem)
        with open(oppath, "rb") as fd:
            op = Operator.load(fd)
        self.cache[header] = op
        return op

    def __setitem__(self, header: H, operator: Optional[Operator]):
        """Set operator for given header.

        Header and operator are automatically dumped on disk.

        For contentless inventories only the header is dumped, ``operator``
        is ignored, and ``None`` is cached.

        Raises
        ------
        ValueError
            if ``operator`` is ``None`` for a contentful inventory
        """
        self.access.assert_writeable()
        # validate before writing anything, so that a rejected assignment
        # does not leave an orphan header on disk
        if operator is None and not self.contentless:
            raise ValueError(f"Operator missing for contentful {self}.")

        stem = encode(header)
        # always save the header on disk
        headpath = self.path / (stem + HEADER_EXT)
        headpath.write_text(yaml.dump(asdict(header)), encoding="utf-8")

        # in case of contentless, set empty cache and exit
        if self.contentless:
            self.cache[header] = None
            return

        # otherwise save also the operator, and add to the cache
        with_err = operator.error is not None
        # an operator previously saved with the other layout (with/without
        # error) would make ``lookup`` ambiguous ("Too many items"): drop it
        stale = self.path / operator_name(header, err=not with_err)
        if stale.exists():
            stale.unlink()
        oppath = self.path / operator_name(header, err=with_err)
        with open(oppath, "wb") as fd:
            operator.save(fd)
        self.cache[header] = operator

    def __delitem__(self, header: H):
        """Drop operator from memory.

        Irrelevant for contentless inventories.

        Note
        ----
        This method only drops the operator from memory, and it's not expected
        to do anything else.

        Autosave is done on set, and explicit saves are performed by the
        computation functions.

        If a further explicit save is required, repeat explicit assignment::

            inventory[header] = inventory[header]

        This is only useful if the operator has been mutated in place, that in
        general should be avoided, since the operator should only be the result
        of a full computation or a library manipulation.
        """
        self.cache[header] = None

    def __iter__(self):
        """Iterate over loaded content.

        This iteration is only over cache, so it might not be faithful with
        respect to the real content on disk.
        To iterate over the full content on disk, call :meth:`sync` right
        before iterating.
        """
        yield from self.cache

    def __len__(self):
        """Return the number of elements in the cache."""
        return len(self.cache)

    def sync(self):
        """Sync the headers in the cache with the content on disk.

        In particular, headers on disk that are missing in the :attr:`cache`
        are added to it, without loading actual operators in memory.

        Despite the name, the operation is non-destructive, so, even if cache
        has been abused, nothing will be deleted nor unloaded.
        """
        for path in self.path.iterdir():
            if path.suffix != HEADER_EXT:
                continue
            header = self.header_type(
                **yaml.safe_load(path.read_text(encoding="utf-8"))
            )
            # only add missing headers: operators already loaded are kept
            self.cache.setdefault(header, None)

    def __invert__(self):
        """Alias for :meth:`sync`."""
        self.sync()

    def empty(self):
        """Empty the in-memory cache.

        Loaded operators are dropped, while headers are kept as keys, so
        contentful items are loaded again from disk on the next access.
        """
        for header in self.cache:
            del self[header]