laila

Laila top-level package.

Provides the public API for policy management, subsystem access (laila.memory, laila.command, laila.communication), future lifecycle helpers, and argument loading.

AUTO_INITIALIZE_POLICY = True module-attribute


LAILA_DEFAULT_DIRECTORIES = {'root': '/home/ubuntu/.laila', 'pools': '/home/ubuntu/.laila/pools', 'logs': '/home/ubuntu/.laila/logs', 'secrets': '/home/ubuntu/.laila/secrets'} module-attribute


LAILA_UNIVERSAL_NAMESPACE = UUID('6c8dcd4c-d490-58bc-81a7-6afbe8d17594') module-attribute
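A namespace UUID like this is typically combined with `uuid.uuid5` to derive deterministic identifiers: the same name always yields the same ID within the namespace. A minimal sketch, assuming the namespace is used this way (the `derive_id` helper and the `"example-policy"` name are illustrative, not part of the laila API):

```python
import uuid

# The package-wide namespace exported as laila.LAILA_UNIVERSAL_NAMESPACE.
LAILA_UNIVERSAL_NAMESPACE = uuid.UUID("6c8dcd4c-d490-58bc-81a7-6afbe8d17594")

def derive_id(name: str) -> uuid.UUID:
    # uuid5 hashes (namespace, name) with SHA-1, so the result is stable
    # across processes and machines.
    return uuid.uuid5(LAILA_UNIVERSAL_NAMESPACE, name)

print(derive_id("example-policy"))
```

Because the derivation is purely a hash of the namespace and the name, two nodes can agree on an identifier without coordinating.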



arg_reader = <laila.utils.args.args.ArgReader object at 0x7e0d22501910> module-attribute

Load runtime args from simple key/value sources into the target mapping (defaults to laila.args). All load / from_* methods mutate the target in place and return None.

guarantee = <laila.utils.guarantee._Guarantee object at 0x7e0d272f35c0> module-attribute

Synchronous context manager that waits for futures created inside its scope.

On exit, every future registered while the context was active is waited on. If any future raised, the first exception is re-raised.

guarantee_async = <laila.utils.guarantee._AsyncGuarantee object at 0x7e0d272f35f0> module-attribute

Asynchronous context manager that awaits futures created inside its scope.

A background watcher task monitors child futures and propagates the first exception by cancelling the parent task.
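The pattern behind `guarantee` (collect every future registered inside a `with` block, wait on exit, and re-raise the first failure) can be sketched standalone with `concurrent.futures`. The `Guarantee` class below is illustrative, not the laila implementation:

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait

class Guarantee:
    """Wait for every future registered inside the ``with`` block."""

    def __enter__(self) -> "Guarantee":
        self._futures: list[Future] = []
        return self

    def register(self, future: Future) -> Future:
        self._futures.append(future)
        return future

    def __exit__(self, exc_type, exc, tb) -> None:
        wait(self._futures)                       # block until all settle
        if exc_type is None:
            for f in self._futures:
                err = f.exception()
                if err is not None:               # re-raise the first failure
                    raise err

pool = ThreadPoolExecutor(max_workers=2)
results: list[int] = []
with Guarantee() as g:
    for n in (1, 2, 3):
        g.register(pool.submit(results.append, n * n))
# leaving the block guarantees all three appends have finished
pool.shutdown()
print(sorted(results))  # [1, 4, 9]
```

The async variant described above adds a watcher task so a failure can interrupt the parent before the block exits, rather than only being surfaced at exit time.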

ArgReader

Load runtime args from simple key/value sources into the target mapping (defaults to laila.args). All load / from_* methods mutate the target in place and return None.

Source code in utils/args/args.py
class ArgReader:
    """
    Load runtime args from simple key/value sources into the target mapping
    (defaults to ``laila.args``). All ``load`` / ``from_*`` methods mutate
    the target in place and return ``None``.
    """

    # Supported sources: .env, .json, .toml, .xml, or ``terminal`` (``key=value`` tokens).

    def __init__(self, target: Optional[Any] = None):
        """Initialise the reader.

        Parameters
        ----------
        target : Any, optional
            Object to set attributes on.  Defaults to ``laila.args``.
        """
        self._target = target

    def _target_map(self) -> Any:
        """Return the target mapping, falling back to ``laila.args``."""
        if self._target is not None:
            return self._target
        import laila  # lazy import to avoid circular import at module load

        return laila.args

    @staticmethod
    def _coerce_scalar(value: Any) -> Any:
        """Coerce a string value to its most specific Python scalar type."""
        if not isinstance(value, str):
            return value

        stripped = value.strip()
        lowered = stripped.lower()
        if lowered in {"true", "false"}:
            return lowered == "true"
        if lowered in {"none", "null"}:
            return None

        try:
            return int(stripped)
        except ValueError:
            pass

        try:
            return float(stripped)
        except ValueError:
            pass

        if (stripped.startswith("{") and stripped.endswith("}")) or (
            stripped.startswith("[") and stripped.endswith("]")
        ):
            try:
                return json.loads(stripped)
            except Exception:
                return value

        if (
            (stripped.startswith('"') and stripped.endswith('"'))
            or (stripped.startswith("'") and stripped.endswith("'"))
        ) and len(stripped) >= 2:
            return stripped[1:-1]

        return value

    @classmethod
    def _flatten_one_level(cls, payload: Dict[str, Any]) -> Dict[str, Any]:
        """Flatten nested dicts one level, joining keys with ``_``."""
        out: Dict[str, Any] = {}
        for key, value in payload.items():
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    out[f"{key}_{sub_key}"] = cls._coerce_scalar(sub_value)
            else:
                out[key] = cls._coerce_scalar(value)
        return out

    def _apply(self, payload: Dict[str, Any]) -> None:
        """Flatten and set each key/value pair on the target."""
        flat = self._flatten_one_level(payload)
        target = self._target_map()
        for key, value in flat.items():
            setattr(target, key, value)

    def clear(self) -> None:
        """Remove all keys from the target mapping."""
        target = self._target_map()
        if hasattr(target, "keys"):
            for key in list(target.keys()):
                try:
                    delattr(target, key)
                except Exception:
                    pass

    def from_json(self, path: str | Path) -> None:
        """Load arguments from a JSON file.

        Parameters
        ----------
        path : str or Path
            Path to the ``.json`` file.
        """
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
        if not isinstance(data, dict):
            raise ValueError("JSON args file must be a key/value object.")
        self._apply(data)

    def from_toml(self, path: str | Path) -> None:
        """Load arguments from a TOML file.

        Parameters
        ----------
        path : str or Path
            Path to the ``.toml`` file.
        """
        try:
            import tomllib  # Python 3.11+
        except ImportError as e:
            raise ImportError("tomllib is required for TOML parsing.") from e

        with open(path, "rb") as f:
            data = tomllib.load(f)
        if not isinstance(data, dict):
            raise ValueError("TOML args file must be a key/value table.")
        self._apply(data)

    def from_env(self, path: str | Path) -> None:
        """Load arguments from a ``.env``-style file.

        Parameters
        ----------
        path : str or Path
            Path to the ``.env`` file.
        """
        parsed: Dict[str, Any] = {}
        with open(path, "r", encoding="utf-8") as f:
            for raw in f:
                line = raw.strip()
                if not line or line.startswith("#"):
                    continue
                if "=" not in line:
                    continue
                k, v = line.split("=", 1)
                parsed[k.strip()] = self._coerce_scalar(v.strip())
        self._apply(parsed)

    def from_xml(self, path: str | Path) -> None:
        """Load arguments from an XML file.

        Parameters
        ----------
        path : str or Path
            Path to the ``.xml`` file.
        """
        tree = ET.parse(path)
        root = tree.getroot()
        parsed: Dict[str, Any] = {}
        for child in root:
            children = list(child)
            if children:
                parsed[child.tag] = {gc.tag: (gc.text or "") for gc in children}
            else:
                parsed[child.tag] = child.text or ""
        self._apply(parsed)

    def from_terminal(self, args: Optional[Iterable[str]] = None) -> None:
        """Load arguments from ``key=value`` command-line tokens.

        Parameters
        ----------
        args : Iterable[str], optional
            Tokens to parse.  Defaults to ``sys.argv[1:]``.
        """
        tokens = list(sys.argv[1:] if args is None else args)
        parsed: Dict[str, Any] = {}
        for token in tokens:
            if "=" not in token:
                continue
            k, v = token.split("=", 1)
            key = k.strip()
            if not key:
                continue
            parsed[key] = self._coerce_scalar(v.strip())
        self._apply(parsed)

    def load(self, source: str | Path, *, terminal_args: Optional[Iterable[str]] = None) -> None:
        """Auto-detect format and load arguments.

        Parameters
        ----------
        source : str or Path
            File path (suffix selects format) or the literal ``"terminal"``.
        terminal_args : Iterable[str], optional
            Passed to :meth:`from_terminal` when *source* is ``"terminal"``.

        Raises
        ------
        ValueError
            If the file suffix is not supported.
        """
        p = Path(source)
        suffix = p.suffix.lower()
        if suffix == ".json":
            self.from_json(p)
            return
        if suffix == ".toml":
            self.from_toml(p)
            return
        if suffix == ".env":
            self.from_env(p)
            return
        if suffix == ".xml":
            self.from_xml(p)
            return
        if str(source).lower() == "terminal":
            self.from_terminal(terminal_args)
            return
        raise ValueError(f"Unsupported args source: {source}")
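The coercion order in `_coerce_scalar` matters: booleans and null-likes win over everything, `int` wins over `float`, and JSON parsing is only attempted for brace/bracket-delimited strings. A condensed standalone restatement of those rules (it mirrors the method above but is not the laila code itself):

```python
import json

def coerce(value: str):
    s = value.strip()
    low = s.lower()
    if low in {"true", "false"}:
        return low == "true"
    if low in {"none", "null"}:
        return None
    for cast in (int, float):          # int is tried first, so "3" -> 3
        try:
            return cast(s)
        except ValueError:
            pass
    if (s.startswith("{") and s.endswith("}")) or (
        s.startswith("[") and s.endswith("]")
    ):
        try:
            return json.loads(s)
        except json.JSONDecodeError:
            return value               # malformed JSON falls through unchanged
    if len(s) >= 2 and s[0] == s[-1] and s[0] in "'\"":
        return s[1:-1]                 # strip matching surrounding quotes
    return value

print(coerce("8080"), coerce("true"), coerce('["a", "b"]'))
```

So an `.env` line like `PORT=8080` lands on the target as the integer `8080`, while `NAME="laila"` arrives with its quotes stripped.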

__init__(target=None)

Initialise the reader.

Parameters:

- target (Any, optional, default None): Object to set attributes on. Defaults to laila.args.
Source code in utils/args/args.py
def __init__(self, target: Optional[Any] = None):
    """Initialise the reader.

    Parameters
    ----------
    target : Any, optional
        Object to set attributes on.  Defaults to ``laila.args``.
    """
    self._target = target

clear()

Remove all keys from the target mapping.

Source code in utils/args/args.py
def clear(self) -> None:
    """Remove all keys from the target mapping."""
    target = self._target_map()
    if hasattr(target, "keys"):
        for key in list(target.keys()):
            try:
                delattr(target, key)
            except Exception:
                pass

from_json(path)

Load arguments from a JSON file.

Parameters:

- path (str or Path, required): Path to the .json file.
Source code in utils/args/args.py
def from_json(self, path: str | Path) -> None:
    """Load arguments from a JSON file.

    Parameters
    ----------
    path : str or Path
        Path to the ``.json`` file.
    """
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    if not isinstance(data, dict):
        raise ValueError("JSON args file must be a key/value object.")
    self._apply(data)

from_toml(path)

Load arguments from a TOML file.

Parameters:

- path (str or Path, required): Path to the .toml file.
Source code in utils/args/args.py
def from_toml(self, path: str | Path) -> None:
    """Load arguments from a TOML file.

    Parameters
    ----------
    path : str or Path
        Path to the ``.toml`` file.
    """
    try:
        import tomllib  # Python 3.11+
    except ImportError as e:
        raise ImportError("tomllib is required for TOML parsing.") from e

    with open(path, "rb") as f:
        data = tomllib.load(f)
    if not isinstance(data, dict):
        raise ValueError("TOML args file must be a key/value table.")
    self._apply(data)

from_env(path)

Load arguments from a .env-style file.

Parameters:

- path (str or Path, required): Path to the .env file.
Source code in utils/args/args.py
def from_env(self, path: str | Path) -> None:
    """Load arguments from a ``.env``-style file.

    Parameters
    ----------
    path : str or Path
        Path to the ``.env`` file.
    """
    parsed: Dict[str, Any] = {}
    with open(path, "r", encoding="utf-8") as f:
        for raw in f:
            line = raw.strip()
            if not line or line.startswith("#"):
                continue
            if "=" not in line:
                continue
            k, v = line.split("=", 1)
            parsed[k.strip()] = self._coerce_scalar(v.strip())
    self._apply(parsed)

from_xml(path)

Load arguments from an XML file.

Parameters:

- path (str or Path, required): Path to the .xml file.
Source code in utils/args/args.py
def from_xml(self, path: str | Path) -> None:
    """Load arguments from an XML file.

    Parameters
    ----------
    path : str or Path
        Path to the ``.xml`` file.
    """
    tree = ET.parse(path)
    root = tree.getroot()
    parsed: Dict[str, Any] = {}
    for child in root:
        children = list(child)
        if children:
            parsed[child.tag] = {gc.tag: (gc.text or "") for gc in children}
        else:
            parsed[child.tag] = child.text or ""
    self._apply(parsed)

from_terminal(args=None)

Load arguments from key=value command-line tokens.

Parameters:

- args (Iterable[str], optional, default None): Tokens to parse. Defaults to sys.argv[1:].
Source code in utils/args/args.py
def from_terminal(self, args: Optional[Iterable[str]] = None) -> None:
    """Load arguments from ``key=value`` command-line tokens.

    Parameters
    ----------
    args : Iterable[str], optional
        Tokens to parse.  Defaults to ``sys.argv[1:]``.
    """
    tokens = list(sys.argv[1:] if args is None else args)
    parsed: Dict[str, Any] = {}
    for token in tokens:
        if "=" not in token:
            continue
        k, v = token.split("=", 1)
        key = k.strip()
        if not key:
            continue
        parsed[key] = self._coerce_scalar(v.strip())
    self._apply(parsed)

load(source, *, terminal_args=None)

Auto-detect format and load arguments.

Parameters:

- source (str or Path, required): File path (suffix selects format) or the literal "terminal".
- terminal_args (Iterable[str], optional, default None): Passed to from_terminal when source is "terminal".

Raises:

- ValueError: If the file suffix is not supported.

Source code in utils/args/args.py
def load(self, source: str | Path, *, terminal_args: Optional[Iterable[str]] = None) -> None:
    """Auto-detect format and load arguments.

    Parameters
    ----------
    source : str or Path
        File path (suffix selects format) or the literal ``"terminal"``.
    terminal_args : Iterable[str], optional
        Passed to :meth:`from_terminal` when *source* is ``"terminal"``.

    Raises
    ------
    ValueError
        If the file suffix is not supported.
    """
    p = Path(source)
    suffix = p.suffix.lower()
    if suffix == ".json":
        self.from_json(p)
        return
    if suffix == ".toml":
        self.from_toml(p)
        return
    if suffix == ".env":
        self.from_env(p)
        return
    if suffix == ".xml":
        self.from_xml(p)
        return
    if str(source).lower() == "terminal":
        self.from_terminal(terminal_args)
        return
    raise ValueError(f"Unsupported args source: {source}")
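The routing in `load` is a plain case-insensitive suffix switch with `"terminal"` as a literal fallback. An equivalent standalone sketch of just the routing decision (the `detect_format` helper is a stand-in, not laila code):

```python
from pathlib import Path

def detect_format(source: str) -> str:
    """Mirror load()'s routing: file suffix first, then the literal 'terminal'."""
    suffix = Path(source).suffix.lower()
    if suffix in {".json", ".toml", ".env", ".xml"}:
        return suffix.lstrip(".")
    if source.lower() == "terminal":
        return "terminal"
    raise ValueError(f"Unsupported args source: {source}")

print(detect_format("config.JSON"))   # suffix matching is case-insensitive
```

Note that because the suffix check runs first, a file literally named `terminal.json` is still routed to the JSON loader.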

DefaultCentralCommand

Bases: _LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT

Central command that manages task-forces, future submission, and guarantees.

Source code in policy/central/command/schema/base.py
class _LAILA_IDENTIFIABLE_CENTRAL_COMMAND(_LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT):
    """Central command that manages task-forces, future submission, and guarantees."""

    _scopes: list[str] = PrivateAttr(default_factory=lambda: [_CENTRAL_COMMAND_SCOPE])


    taskforces: Dict[str, Any] = CLIExempt(default_factory=dict)
    alpha_taskforce: Optional[str] = None
    policy_id: Optional[_LAILA_IDENTIFIABLE_OBJECT | str] = CLIExempt(default=None)
    _guarantee_local: threading.local = PrivateAttr(default_factory=threading.local)


    def model_post_init(self, __context: Any) -> None:
        """Create a default task-force and set it as alpha if none provided."""
        if len(self.taskforces) == 0:
            from .....macros.defaults import DefaultTaskForce
            taskforce = DefaultTaskForce(policy_id=self.policy_id)
            self.taskforces[taskforce.global_id] = taskforce

        if self.alpha_taskforce is None:
            self.alpha_taskforce = next(iter(self.taskforces))



    def add_taskforce(
        self, 
        taskforce: Any,
    ):
        """Register a task-force with this central command.

        Parameters
        ----------
        taskforce : _LAILA_IDENTIFIABLE_TASK_FORCE
            The task-force instance to register.
        """
        self.taskforces[taskforce.global_id] = taskforce

    def _guarantee_stack(self) -> list[Dict[str, Future]]:
        """Return the thread-local guarantee scope stack, creating it if needed."""
        stack = getattr(self._guarantee_local, "stack", None)
        if stack is None:
            stack = []
            self._guarantee_local.stack = stack
        return stack

    def _guarantee_enter(self) -> None:
        """Push a new empty scope onto the guarantee stack."""
        self._guarantee_stack().append({})

    def _guarantee_exit(self) -> list[Future]:
        """Pop the current guarantee scope and return its futures."""
        stack = self._guarantee_stack()
        if not stack:
            return []
        return list(stack.pop().values())

    def _register_future_with_active_guarantees(self, future: Any) -> None:
        """Record *future* in every open guarantee scope on this thread."""
        stack = getattr(self._guarantee_local, "stack", None)
        if not stack:
            return

        for scope_futures in stack:
            scope_futures[future.global_id] = future


    def submit(
        self,
        tasks: Iterable[Callable[[], Any]],
        wait: bool = False,
        *,
        taskforce_id: Optional[str] = None
    ) -> Union[Future, List[Any], Any]:
        """Submit tasks to a task-force for execution.

        Parameters
        ----------
        tasks : Iterable[Callable[[], Any]]
            Zero-arg callables to execute.
        wait : bool, optional
            If ``True``, block until all tasks complete and return results.
        taskforce_id : str, optional
            Target task-force; defaults to the alpha task-force.

        Returns
        -------
        Future or list[Any] or Any
            A future (or group future) when *wait* is ``False``, otherwise
            the return value(s).
        """
        if taskforce_id is None:
            taskforce_id = self.alpha_taskforce

        return self.taskforces[taskforce_id].submit(
            tasks = tasks,
            wait = wait,
        )


    def shutdown(
        self, 
        wait: bool = True, 
        cancel_pending: bool = False
    ) -> None:
        """Shut down all registered task-forces.

        Parameters
        ----------
        wait : bool, optional
            Block until workers finish.
        cancel_pending : bool, optional
            Cancel queued but un-started tasks.
        """
        for tf in self.taskforces.values():
            tf.shutdown(
                wait = wait,
                cancel_pending = cancel_pending
            )

    def __await__(self):
        """Async awaiting is not yet supported."""
        raise NotImplementedError

model_post_init(__context)

Create a default task-force and set it as alpha if none provided.

Source code in policy/central/command/schema/base.py
def model_post_init(self, __context: Any) -> None:
    """Create a default task-force and set it as alpha if none provided."""
    if len(self.taskforces) == 0:
        from .....macros.defaults import DefaultTaskForce
        taskforce = DefaultTaskForce(policy_id=self.policy_id)
        self.taskforces[taskforce.global_id] = taskforce

    if self.alpha_taskforce is None:
        self.alpha_taskforce = next(iter(self.taskforces))


add_taskforce(taskforce)

Register a task-force with this central command.

Parameters:

- taskforce (_LAILA_IDENTIFIABLE_TASK_FORCE, required): The task-force instance to register.
Source code in policy/central/command/schema/base.py
def add_taskforce(
    self, 
    taskforce: Any,
):
    """Register a task-force with this central command.

    Parameters
    ----------
    taskforce : _LAILA_IDENTIFIABLE_TASK_FORCE
        The task-force instance to register.
    """
    self.taskforces[taskforce.global_id] = taskforce

submit(tasks, wait=False, *, taskforce_id=None)

Submit tasks to a task-force for execution.

Parameters:

- tasks (Iterable[Callable[[], Any]], required): Zero-arg callables to execute.
- wait (bool, optional, default False): If True, block until all tasks complete and return results.
- taskforce_id (str, optional, default None): Target task-force; defaults to the alpha task-force.

Returns:

- Future or list[Any] or Any: A future (or group future) when wait is False, otherwise the return value(s).

Source code in policy/central/command/schema/base.py
def submit(
    self,
    tasks: Iterable[Callable[[], Any]],
    wait: bool = False,
    *,
    taskforce_id: Optional[str] = None
) -> Union[Future, List[Any], Any]:
    """Submit tasks to a task-force for execution.

    Parameters
    ----------
    tasks : Iterable[Callable[[], Any]]
        Zero-arg callables to execute.
    wait : bool, optional
        If ``True``, block until all tasks complete and return results.
    taskforce_id : str, optional
        Target task-force; defaults to the alpha task-force.

    Returns
    -------
    Future or list[Any] or Any
        A future (or group future) when *wait* is ``False``, otherwise
        the return value(s).
    """
    if taskforce_id is None:
        taskforce_id = self.alpha_taskforce

    return self.taskforces[taskforce_id].submit(
        tasks = tasks,
        wait = wait,
    )

shutdown(wait=True, cancel_pending=False)

Shut down all registered task-forces.

Parameters:

- wait (bool, optional, default True): Block until workers finish.
- cancel_pending (bool, optional, default False): Cancel queued but un-started tasks.
Source code in policy/central/command/schema/base.py
def shutdown(
    self, 
    wait: bool = True, 
    cancel_pending: bool = False
) -> None:
    """Shut down all registered task-forces.

    Parameters
    ----------
    wait : bool, optional
        Block until workers finish.
    cancel_pending : bool, optional
        Cancel queued but un-started tasks.
    """
    for tf in self.taskforces.values():
        tf.shutdown(
            wait = wait,
            cancel_pending = cancel_pending
        )

__await__()

Async awaiting is not yet supported.

Source code in policy/central/command/schema/base.py
def __await__(self):
    """Async awaiting is not yet supported."""
    raise NotImplementedError
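The `submit` contract above (an iterable of zero-arg callables, with `wait=True` blocking and unwrapping results) maps closely onto `concurrent.futures`. A standalone sketch of the same shape, where `SimpleTaskForce` is illustrative rather than the laila task-force class:

```python
from concurrent.futures import Future, ThreadPoolExecutor
from functools import partial
from typing import Any, Callable, Iterable, List, Union

class SimpleTaskForce:
    def __init__(self, workers: int = 4) -> None:
        self._pool = ThreadPoolExecutor(max_workers=workers)

    def submit(
        self,
        tasks: Iterable[Callable[[], Any]],
        wait: bool = False,
    ) -> Union[List[Future], List[Any]]:
        # Each task is a zero-arg callable, mirroring the interface above.
        futures = [self._pool.submit(task) for task in tasks]
        if wait:
            return [f.result() for f in futures]   # block and unwrap
        return futures

    def shutdown(self, wait: bool = True) -> None:
        self._pool.shutdown(wait=wait)

tf = SimpleTaskForce()
results = tf.submit([partial(pow, 2, n) for n in range(3)], wait=True)
tf.shutdown()
print(results)  # [1, 2, 4]
```

Arguments are bound ahead of time (here with `functools.partial`) so each task is genuinely zero-arg, which keeps the submission interface uniform across task-forces.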

DefaultCentralCommunication

Bases: _LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT

Central communication that owns transport protocols and a peer registry.

Parameters:

- policy_id (str, optional): global_id of the owning policy (set automatically during wiring).
Source code in policy/central/communication/schema/base.py
class _LAILA_IDENTIFIABLE_COMMUNICATION(_LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT):
    """Central communication that owns transport protocols and a peer registry.

    Parameters
    ----------
    policy_id : str, optional
        ``global_id`` of the owning policy (set automatically during wiring).
    """

    _scopes: list[str] = PrivateAttr(
        default_factory=lambda: [_CENTRAL_COMMUNICATION_SCOPE],
    )

    model_config = ConfigDict(arbitrary_types_allowed=True)

    policy_id: Optional[str] = CLIExempt(default=None)
    peers: Dict[str, RemotePolicyProxy] = CLIExempt(default_factory=dict)
    connections: Dict[str, _LAILA_IDENTIFIABLE_COMM_PROTOCOL] = CLIExempt(
        default_factory=dict,
    )

    _local_policy: Any = PrivateAttr(default=None)

    # ------------------------------------------------------------------
    # Protocol management
    # ------------------------------------------------------------------

    def add_connection(
        self, protocol: _LAILA_IDENTIFIABLE_COMM_PROTOCOL
    ) -> None:
        """Register and start a transport protocol.

        Sets the protocol's back-reference, adds it to the registry,
        and calls ``protocol.start()`` so the connection is live when
        this method returns.  Symmetric with :meth:`remove_connection`
        which calls ``protocol.stop()``.

        Parameters
        ----------
        protocol : _LAILA_IDENTIFIABLE_COMM_PROTOCOL
            The protocol instance to register and start.
        """
        protocol._communication = self
        self.connections[protocol.global_id] = protocol
        protocol.start()

    def remove_connection(
        self, protocol: _LAILA_IDENTIFIABLE_COMM_PROTOCOL
    ) -> None:
        """Stop a transport protocol and remove it from this communication instance.

        The protocol is responsible for all its own cleanup — closing
        sockets, unregistering peers, etc.  Communication only calls
        ``stop()`` and removes the protocol from its registry.

        Parameters
        ----------
        protocol : _LAILA_IDENTIFIABLE_COMM_PROTOCOL
            The protocol instance to remove.
        """
        proto_id = protocol.global_id
        if proto_id not in self.connections:
            return
        protocol.stop()
        self.connections.pop(proto_id, None)
        protocol._communication = None

    def _resolve_protocol_for_uri(self, uri: str) -> _LAILA_IDENTIFIABLE_COMM_PROTOCOL:
        """Find a protocol that can handle *uri*, or fall back to the first one."""
        if not self.connections:
            raise ConnectionError(
                "No communication protocols configured. "
                "Call add_connection() first."
            )
        for proto in self.connections.values():
            if type(proto).can_handle_uri(uri):
                return proto
        return next(iter(self.connections.values()))

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Start all registered protocols."""
        for proto in self.connections.values():
            proto.start()
        log.info("Communication started for policy %s", self.policy_id)

    def stop(self) -> None:
        """Stop all registered protocols and clear peers."""
        for proto in self.connections.values():
            proto.stop()
        self.peers.clear()
        log.info("Communication stopped for policy %s", self.policy_id)

    # ------------------------------------------------------------------
    # Peer management
    # ------------------------------------------------------------------

    def add_peer(self, uri: str, secret: str) -> str:
        """Initiate a peering connection to a remote policy.

        Resolves the appropriate protocol for *uri* and delegates the
        transport-level handshake.

        Parameters
        ----------
        uri : str
            URI of the remote policy (e.g. ``"ws://host:port"``).
        secret : str
            The remote policy's ``peer_secret_key``.

        Returns
        -------
        str
            The ``global_id`` of the newly peered remote policy.
        """
        proto = self._resolve_protocol_for_uri(uri)
        return proto.add_peer(uri, secret)

    def add_tcpip_peer(self, host: str, port: int, secret: str) -> str:
        """Peer with a remote policy over TCP/IP (WebSocket).

        Convenience wrapper that builds the ``ws://`` URI internally.

        Parameters
        ----------
        host : str
            Hostname or IP address of the remote node.
        port : int
            WebSocket port on the remote node.
        secret : str
            The remote node's ``peer_secret_key``.

        Returns
        -------
        str
            The ``global_id`` of the newly peered remote policy.
        """
        return self.add_peer(f"ws://{host}:{port}", secret)

    def _register_peer(self, peer_id: str) -> None:
        """Create a proxy for a newly connected peer.

        Called by protocol instances after a successful handshake.
        Also registers the proxy in ``laila.remote_policies``.

        Parameters
        ----------
        peer_id : str
            Remote policy ``global_id``.
        """
        if peer_id not in self.peers:
            proxy = RemotePolicyProxy(peer_id, self)
            self.peers[peer_id] = proxy
            from ..... import _remote_policies
            _remote_policies[peer_id] = proxy

    def _unregister_peer(self, peer_id: str) -> None:
        """Remove a peer proxy after the transport connection closes.

        Also removes from ``laila.remote_policies``.

        Parameters
        ----------
        peer_id : str
            Remote policy ``global_id``.
        """
        self.peers.pop(peer_id, None)
        from ..... import _remote_policies
        _remote_policies.pop(peer_id, None)

    # ------------------------------------------------------------------
    # RPC dispatch (inbound)
    # ------------------------------------------------------------------

    def _execute_rpc(self, path: list[str], args: list, kwargs: dict) -> Any:
        """Execute a dotted-path method call on the local policy.

        Parameters
        ----------
        path : list[str]
            Attribute chain relative to the local policy object,
            e.g. ``["central", "memory", "memorize"]``.
        args : list
            Positional arguments.
        kwargs : dict
            Keyword arguments.

        Returns
        -------
        Any
            Return value of the invoked method.

        Raises
        ------
        AttributeError
            If any segment of *path* does not exist on the target.
        """
        obj = self._local_policy
        if obj is None:
            raise RuntimeError("Communication has no reference to the local policy.")
        for segment in path:
            obj = getattr(obj, segment)
        return obj(*args, **kwargs)

    # ------------------------------------------------------------------
    # RPC dispatch (outbound)
    # ------------------------------------------------------------------

    def _send_rpc(self, peer_id: str, path: list[str], args: tuple, kwargs: dict) -> Any:
        """Send an RPC call to a peer via the protocol that holds the connection.

        If the deserialized response contains a ``__laila_future__`` marker
        it is automatically wrapped in a :class:`RemoteFuture` and
        registered in the local policy's ``future_bank``.

        Parameters
        ----------
        peer_id : str
            Target peer ``global_id``.
        path : list[str]
            Dotted attribute chain on the remote policy.
        args : tuple
            Positional arguments.
        kwargs : dict
            Keyword arguments.

        Returns
        -------
        Any
            The deserialized return value from the remote call, or a
            ``RemoteFuture`` when the remote returned a future.

        Raises
        ------
        ConnectionError
            If no protocol holds a connection to *peer_id*.
        RuntimeError
            If the remote side returned an error.
        """
        for proto in self.connections.values():
            if proto.has_peer(peer_id):
                result = proto.send_rpc(peer_id, path, args, kwargs)
                return self._maybe_wrap_remote_future(result, peer_id)
        raise ConnectionError(f"No connection to peer {peer_id}")

    def _maybe_wrap_remote_future(self, result: Any, peer_id: str) -> Any:
        """Wrap a future-shaped dict in a ``RemoteFuture`` and register it."""
        if not isinstance(result, dict) or not result.get("__laila_future__"):
            return result

        from ...command.schema.future.future.remote_future import RemoteFuture

        rf = RemoteFuture(
            remote_future_id=result["global_id"],
            remote_policy_id=result.get("policy_id", peer_id),
            communication=self,
            is_group=result.get("__is_group__", False),
        )

        if self._local_policy is not None:
            self._local_policy.future_bank[rf.global_id] = rf
            if hasattr(self._local_policy, "central"):
                self._local_policy.central.command._register_future_with_active_guarantees(rf)

        return rf
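
The inbound dispatch in ``_execute_rpc`` walks a dotted attribute path with ``getattr``, and ``_maybe_wrap_remote_future`` recognizes future-shaped responses by their marker key. A minimal, self-contained sketch of both mechanics (the ``SimpleNamespace`` object tree and the tuple "wrapper" are stand-ins, not laila API):

```python
from types import SimpleNamespace

# Stand-in object tree mimicking policy.central.memory.memorize (hypothetical).
policy = SimpleNamespace(
    central=SimpleNamespace(
        memory=SimpleNamespace(memorize=lambda entries: f"stored:{entries}")
    )
)

def execute_rpc(obj, path, args, kwargs):
    """Resolve a dotted attribute chain, then call the final attribute."""
    for segment in path:
        obj = getattr(obj, segment)  # raises AttributeError on a bad segment
    return obj(*args, **kwargs)

def maybe_wrap(result):
    """Detect a future-shaped dict by its marker key; wrap it, else pass through."""
    if isinstance(result, dict) and result.get("__laila_future__"):
        return ("RemoteFuture", result["global_id"])  # wrapping stand-in
    return result

print(execute_rpc(policy, ["central", "memory", "memorize"], ["x"], {}))
# -> stored:x
print(maybe_wrap({"__laila_future__": True, "global_id": "f-1"}))
# -> ('RemoteFuture', 'f-1')
print(maybe_wrap(42))
# -> 42
```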

add_connection(protocol)

Register and start a transport protocol.

Sets the protocol's back-reference, adds it to the registry, and calls protocol.start() so the connection is live when this method returns. Symmetric with remove_connection, which calls protocol.stop().

Parameters:

protocol (_LAILA_IDENTIFIABLE_COMM_PROTOCOL, required)
    The protocol instance to register and start.
Source code in policy/central/communication/schema/base.py
def add_connection(
    self, protocol: _LAILA_IDENTIFIABLE_COMM_PROTOCOL
) -> None:
    """Register and start a transport protocol.

    Sets the protocol's back-reference, adds it to the registry,
    and calls ``protocol.start()`` so the connection is live when
    this method returns.  Symmetric with :meth:`remove_connection`
    which calls ``protocol.stop()``.

    Parameters
    ----------
    protocol : _LAILA_IDENTIFIABLE_COMM_PROTOCOL
        The protocol instance to register and start.
    """
    protocol._communication = self
    self.connections[protocol.global_id] = protocol
    protocol.start()
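
``add_connection`` wires the back-reference, registers the protocol under its ``global_id``, and starts it; ``remove_connection`` reverses each step. A runnable sketch of that symmetry with a hypothetical ``DummyProtocol`` (not part of laila):

```python
class DummyProtocol:
    """Hypothetical minimal protocol satisfying the add/remove contract."""
    def __init__(self, global_id):
        self.global_id = global_id
        self._communication = None
        self.running = False
    def start(self):
        self.running = True
    def stop(self):
        self.running = False

class Comm:
    def __init__(self):
        self.connections = {}
    def add_connection(self, protocol):
        protocol._communication = self       # back-reference
        self.connections[protocol.global_id] = protocol
        protocol.start()                     # live when this returns
    def remove_connection(self, protocol):
        if protocol.global_id not in self.connections:
            return                           # idempotent removal
        protocol.stop()
        self.connections.pop(protocol.global_id, None)
        protocol._communication = None

comm = Comm()
proto = DummyProtocol("ws-1")
comm.add_connection(proto)
assert proto.running and proto._communication is comm
comm.remove_connection(proto)
assert not proto.running and not comm.connections
```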

remove_connection(protocol)

Stop a transport protocol and remove it from this communication instance.

The protocol is responsible for all its own cleanup — closing sockets, unregistering peers, etc. Communication only calls stop() and removes the protocol from its registry.

Parameters:

protocol (_LAILA_IDENTIFIABLE_COMM_PROTOCOL, required)
    The protocol instance to remove.
Source code in policy/central/communication/schema/base.py
def remove_connection(
    self, protocol: _LAILA_IDENTIFIABLE_COMM_PROTOCOL
) -> None:
    """Stop a transport protocol and remove it from this communication instance.

    The protocol is responsible for all its own cleanup — closing
    sockets, unregistering peers, etc.  Communication only calls
    ``stop()`` and removes the protocol from its registry.

    Parameters
    ----------
    protocol : _LAILA_IDENTIFIABLE_COMM_PROTOCOL
        The protocol instance to remove.
    """
    proto_id = protocol.global_id
    if proto_id not in self.connections:
        return
    protocol.stop()
    self.connections.pop(proto_id, None)
    protocol._communication = None

start()

Start all registered protocols.

Source code in policy/central/communication/schema/base.py
def start(self) -> None:
    """Start all registered protocols."""
    for proto in self.connections.values():
        proto.start()
    log.info("Communication started for policy %s", self.policy_id)

stop()

Stop all registered protocols and clear peers.

Source code in policy/central/communication/schema/base.py
def stop(self) -> None:
    """Stop all registered protocols and clear peers."""
    for proto in self.connections.values():
        proto.stop()
    self.peers.clear()
    log.info("Communication stopped for policy %s", self.policy_id)

add_peer(uri, secret)

Initiate a peering connection to a remote policy.

Resolves the appropriate protocol for uri and delegates the transport-level handshake.

Parameters:

uri (str, required)
    URI of the remote policy (e.g. "ws://host:port").
secret (str, required)
    The remote policy's peer_secret_key.

Returns:

str
    The global_id of the newly peered remote policy.

Source code in policy/central/communication/schema/base.py
def add_peer(self, uri: str, secret: str) -> str:
    """Initiate a peering connection to a remote policy.

    Resolves the appropriate protocol for *uri* and delegates the
    transport-level handshake.

    Parameters
    ----------
    uri : str
        URI of the remote policy (e.g. ``"ws://host:port"``).
    secret : str
        The remote policy's ``peer_secret_key``.

    Returns
    -------
    str
        The ``global_id`` of the newly peered remote policy.
    """
    proto = self._resolve_protocol_for_uri(uri)
    return proto.add_peer(uri, secret)
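
``add_peer`` first resolves a transport for the URI via each protocol class's ``can_handle_uri`` check, falling back to the first registered protocol. The resolution logic, reduced to a runnable sketch (the protocol classes here are hypothetical):

```python
class WsProtocol:
    @classmethod
    def can_handle_uri(cls, uri):
        return uri.startswith("ws://")

class IpcProtocol:
    @classmethod
    def can_handle_uri(cls, uri):
        return uri.startswith("ipc://")

# Registration order matters: dicts preserve insertion order in Python 3.7+.
connections = {"p1": WsProtocol(), "p2": IpcProtocol()}

def resolve(uri):
    if not connections:
        raise ConnectionError("No communication protocols configured.")
    for proto in connections.values():
        if type(proto).can_handle_uri(uri):
            return proto
    return next(iter(connections.values()))  # fall back to first registered

assert isinstance(resolve("ws://host:9000"), WsProtocol)
assert isinstance(resolve("ipc:///tmp/sock"), IpcProtocol)
assert isinstance(resolve("tcp://host:1"), WsProtocol)  # unmatched -> fallback
```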

add_tcpip_peer(host, port, secret)

Peer with a remote policy over TCP/IP (WebSocket).

Convenience wrapper that builds the ws:// URI internally.

Parameters:

host (str, required)
    Hostname or IP address of the remote node.
port (int, required)
    WebSocket port on the remote node.
secret (str, required)
    The remote node's peer_secret_key.

Returns:

str
    The global_id of the newly peered remote policy.

Source code in policy/central/communication/schema/base.py
def add_tcpip_peer(self, host: str, port: int, secret: str) -> str:
    """Peer with a remote policy over TCP/IP (WebSocket).

    Convenience wrapper that builds the ``ws://`` URI internally.

    Parameters
    ----------
    host : str
        Hostname or IP address of the remote node.
    port : int
        WebSocket port on the remote node.
    secret : str
        The remote node's ``peer_secret_key``.

    Returns
    -------
    str
        The ``global_id`` of the newly peered remote policy.
    """
    return self.add_peer(f"ws://{host}:{port}", secret)

DefaultCentralMemory

Bases: _LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT

Central memory controller for storing, retrieving, and deleting entries across pools.

Source code in policy/central/memory/schema/base.py
class _LAILA_IDENTIFIABLE_CENTRAL_MEMORY(_LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT):
    """Central memory controller for storing, retrieving, and deleting entries across pools."""
    _scopes: list[str] = PrivateAttr(default_factory=lambda: list([_CENTRAL_MEMORY_SCOPE]))
    pool_router: Optional[_LAILA_IDENTIFIABLE_POOL_ROUTER] = Field(default=None)
    alpha_pool: Optional[str] = Field(default=None)

    class Config:
        arbitrary_types_allowed = True

    def model_post_init(self, __context: Any) -> None:
        """Create a default pool router and ensure an alpha pool exists."""
        if self.pool_router is None:
            from .....macros.defaults import DefaultPoolRouter
            self.pool_router = DefaultPoolRouter()

        if self.alpha_pool is None:
            if _DEFAULT_POOL_NICKNAME in self.pool_router.pools_nicknames:
                self.alpha_pool = self.pool_router.pools_nicknames[_DEFAULT_POOL_NICKNAME]
            else:
                from .....macros.defaults import DefaultPool
                alpha = DefaultPool()
                self.pool_router.extend(
                    alpha, affinity=1, pool_nickname=_DEFAULT_POOL_NICKNAME
                )
                self.alpha_pool = alpha.global_id


    def extend(self, pool: _LAILA_IDENTIFIABLE_POOL, *, affinity: Optional[float] = None, pool_nickname: Optional[str] = None):
        """Delegate pool registration to the pool router."""
        self.pool_router.extend(pool, affinity=affinity, pool_nickname=pool_nickname)

    def _resolve_pool_ref(self, pool_ref: _LAILA_IDENTIFIABLE_POOL | str) -> _LAILA_IDENTIFIABLE_POOL:
        """Resolve a pool object, ID string, or nickname to a pool instance."""
        if isinstance(pool_ref, _LAILA_IDENTIFIABLE_POOL):
            return pool_ref

        if isinstance(pool_ref, str):
            if pool_ref in self.pool_router.pools:
                return self.pool_router.pools[pool_ref]
            if pool_ref in self.pool_router.pools_nicknames:
                return self.pool_router.pools[self.pool_router.pools_nicknames[pool_ref]]
            raise KeyError(f"Pool '{pool_ref}' was not found.")

        raise TypeError("pool_ref must be a pool object, pool id, or pool nickname.")

    #TODO: need to make sure cross-borrowing does not lead to stall
    @contextmanager
    def borrow(
        self,
        keys: Optional[list] = None,
        global_borrow: bool = False,
    ):
        """Context manager for borrowing entries (not yet implemented)."""
        raise NotImplementedError

    def _duplicate_pool(
        self,
        pool_src: _LAILA_IDENTIFIABLE_POOL | str,
        pool_dest: _LAILA_IDENTIFIABLE_POOL | str,
        *,
        inflight_max_entries: int = 4,
    ) -> GroupFuture:
        """Copy all entries from *pool_src* to *pool_dest* asynchronously."""
        from ..... import active_policy

        if inflight_max_entries < 1:
            raise ValueError("duplicate_pool requires inflight_max_entries >= 1.")

        src_pool = self._resolve_pool_ref(pool_src)
        dest_pool = self._resolve_pool_ref(pool_dest)
        entry_ids = list(src_pool.keys())

        duplicate_futures = {
            entry_id: ConcurrentPackageFuture(
                taskforce_id=active_policy.central.command.alpha_taskforce,
                policy_id=active_policy.global_id,
                purpose=f"duplicate_pool:{entry_id}",
            )
            for entry_id in entry_ids
        }

        group_future = GroupFuture(
            taskforce_id=active_policy.central.command.alpha_taskforce,
            policy_id=active_policy.global_id,
            future_ids=[f.global_id for f in duplicate_futures.values()],
        )

        for child_future in duplicate_futures.values():
            child_future.future_group_id = group_future.global_id

        semaphore = asyncio.Semaphore(inflight_max_entries)

        async def _duplicate_one(entry_id: str, child_future: ConcurrentPackageFuture) -> None:
            try:
                async with semaphore:
                    child_future.status = FutureStatus.RUNNING

                    remember_ref = self.remember(
                        entry_ids=entry_id,
                        pool_id=src_pool.global_id,
                    )
                    if remember_ref is None:
                        raise RuntimeError("duplicate_pool requires a non-default source pool.")

                    remember_fut = active_policy.future_bank[remember_ref.global_id]
                    entry = await remember_fut

                    memorize_ref = self.memorize(
                        entries=entry,
                        pool_id=dest_pool.global_id,
                    )
                    if memorize_ref is not None:
                        memorize_fut = active_policy.future_bank[memorize_ref.global_id]
                        await memorize_fut

                    child_future.exception = None
                    child_future.result = entry_id
                    child_future.status = FutureStatus.FINISHED
                    del entry
            except Exception as exc:
                child_future.exception = exc
                child_future.result = None
                child_future.status = FutureStatus.ERROR

        async def _duplicate_all() -> None:
            await asyncio.gather(
                *(
                    _duplicate_one(entry_id, child_future)
                    for entry_id, child_future in duplicate_futures.items()
                ),
                return_exceptions=True,
            )

        def _run_duplication_event_loop() -> None:
            try:
                asyncio.run(_duplicate_all())
            except Exception as exc:
                for child_future in duplicate_futures.values():
                    if child_future.status in [FutureStatus.FINISHED, FutureStatus.ERROR, FutureStatus.CANCELLED]:
                        continue
                    child_future.exception = exc
                    child_future.result = None
                    child_future.status = FutureStatus.ERROR

        threading.Thread(
            target=_run_duplication_event_loop,
            name=f"DuplicatePool-{group_future.global_id}",
            daemon=True,
        ).start()

        return group_future



    @ensure_list("entries")
    def memorize(
        self, 
        entries: Any,
        *,
        pool_id: Optional[str] = None,
        pool_nickname: Optional[str] = None,
        affinity: Optional[float] = None,
    ):
        """Persist entries to the routed pool and return a ``GroupFuture``."""
        from ..... import active_policy

        pool = self.pool_router.route(
            entries = entries,
            pool_id = pool_id,
            pool_nickname = pool_nickname,
            affinity = affinity,
        )

        return self._record(entries, pool)


    def _record(
        self,
        entries: Entry,
        pool: _LAILA_IDENTIFIABLE_POOL,
    ):
        """Dispatch recording to batch or non-batch path based on pool capability."""
        if pool.batch_accelerated:
            return self._batch_accelerated_record(entries, pool)
        else:
            return self._parallel_individual_record(entries, pool)

    def _parallel_individual_record(
        self,
        entries: Entry,
        pool: _LAILA_IDENTIFIABLE_POOL,
    ):
        """Serialize and write each entry individually via the command taskforce."""
        from ..... import active_policy

        def _individual_record_subprocedure(entry: Entry, pool: _LAILA_IDENTIFIABLE_POOL):
            record = Record(
                entry=entry,
                creator=active_policy.global_id,
                borrower=active_policy.global_id,
            )
            transformations = pool.transformations
            #TODO: Add comm encryption here
            #comm_encryption_protocol = active_policy.central_communication.encryption_protocol
            final_transformations = transformations
            pool[entry.global_id] = record.serialize(transformations=final_transformations)

        futures = active_policy.central.command.submit(
            tasks=[lambda entry=entry, pool=pool: _individual_record_subprocedure(entry, pool) for entry in entries]
        )
        return futures


    def _batch_accelerated_record(
        self,
        entries: List[Entry],
        pool: _LAILA_IDENTIFIABLE_POOL,
    ):
        """Batch-record entries (not yet implemented)."""
        raise NotImplementedError


    @ensure_list("entry_ids")
    def remember(
        self, 
        entry_ids: List[Entry]|List[str],
        *,
        pool_id: Optional[str] = None,
        pool_nickname: Optional[str] = None,
        affinity: Optional[float] = None,
    ):
        """Fetch entries from the routed pool and return a ``GroupFuture``."""
        from ..... import active_policy
        from .....entry import Entry

        pool = self.pool_router.route(
            entries = entry_ids,
            pool_id = pool_id,
            pool_nickname = pool_nickname,
            affinity = affinity,
        )

        return self._fetch(entry_ids, pool=pool)


    def _fetch(
        self, 
        entry_ids: List[str],
        *,
        pool: Optional[_LAILA_IDENTIFIABLE_POOL] = None,
        borrow: bool = False,
    ):
        """Dispatch fetching to batch or non-batch path based on pool capability."""
        if borrow:
            raise NotImplementedError
        if pool.batch_accelerated:
            return self._batch_accelerated_fetch(entry_ids, pool=pool)
        else:
            return self._parallel_individual_fetch(entry_ids, pool=pool)


    def _parallel_individual_fetch(
        self,
        entry_ids: List[str],
        *,
        pool: Optional[_LAILA_IDENTIFIABLE_POOL] = None,
    ):
        """Fetch each entry individually via the command taskforce."""
        def _individual_fetch_subprocedure(entry_id: str, pool: _LAILA_IDENTIFIABLE_POOL):
            from_pool = pool[entry_id]
            if from_pool is None:
                raise KeyError(f"Entry {entry_id} not found in pool {pool.global_id}")
            return Record.recover(from_pool)["entry"] 

        from ..... import active_policy
        futures = active_policy.central.command.submit(
            tasks=[lambda entry_id=entry_id, pool=pool: _individual_fetch_subprocedure(entry_id, pool) for entry_id in entry_ids]
        )
        return futures

    def _batch_accelerated_fetch(
        self,
        keys: List[str],
        *,
        pool: Optional[_LAILA_IDENTIFIABLE_POOL] = None,
    ):
        """Batch-accelerated fetch path (not yet implemented)."""
        raise NotImplementedError


    @ensure_list("entry_ids")
    def forget(
        self, 
        entry_ids: List[Entry]|List[str],
        *,
        pool_id: Optional[str] = None,
        pool_nickname: Optional[str] = None,
        affinity: Optional[float] = None,
    ):
        """Delete entries from the routed pool and return a ``GroupFuture``."""
        pool = self.pool_router.route(
            entries = entry_ids,
            pool_id = pool_id,
            pool_nickname = pool_nickname,
            affinity = affinity,
        )

        return self._delete(entry_ids, pool=pool)

    def _delete(
        self,
        entry_ids: List[str],
        pool: _LAILA_IDENTIFIABLE_POOL,
    ):
        """Dispatch deletion to batch or non-batch path based on pool capability."""
        if pool.batch_accelerated:
            return self._batch_accelerated_delete(entry_ids, pool=pool)
        else:
            return self._parallel_individual_delete(entry_ids, pool=pool)

    def _batch_accelerated_delete(
        self,
        entry_ids: List[str],
        pool: _LAILA_IDENTIFIABLE_POOL,
    ):
        """Batch-delete entries (not yet implemented)."""
        raise NotImplementedError


    def _parallel_individual_delete(
        self,
        entry_ids: List[str],
        pool: _LAILA_IDENTIFIABLE_POOL,
    ):
        """Delete each entry individually via the command taskforce."""
        def _individual_delete_subprocedure(entry_id: str, pool: _LAILA_IDENTIFIABLE_POOL):
            del pool[entry_id]

        from ..... import active_policy
        futures = active_policy.central.command.submit(
            tasks=[lambda entry_id=entry_id, pool=pool: _individual_delete_subprocedure(entry_id, pool) for entry_id in entry_ids]
        )
        return futures
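
``_duplicate_pool`` above caps concurrency with an ``asyncio.Semaphore`` and runs all per-entry copies under ``asyncio.gather`` on a dedicated thread. The core pattern, reduced to a runnable sketch (the ``asyncio.sleep(0)`` stands in for the ``remember``/``memorize`` awaits):

```python
import asyncio

async def copy_entry(entry_id, sem, copied):
    # At most `inflight_max_entries` copies run concurrently.
    async with sem:
        await asyncio.sleep(0)       # stand-in for remember/memorize awaits
        copied.append(entry_id)

async def duplicate_all(entry_ids, inflight_max_entries=4):
    sem = asyncio.Semaphore(inflight_max_entries)
    copied = []
    await asyncio.gather(
        *(copy_entry(e, sem, copied) for e in entry_ids),
        return_exceptions=True,      # one failure does not cancel the rest
    )
    return copied

copied = asyncio.run(duplicate_all([f"e{i}" for i in range(10)], inflight_max_entries=3))
assert sorted(copied) == [f"e{i}" for i in range(10)]
```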

model_post_init(__context)

Create a default pool router and ensure an alpha pool exists.

Source code in policy/central/memory/schema/base.py
def model_post_init(self, __context: Any) -> None:
    """Create a default pool router and ensure an alpha pool exists."""
    if self.pool_router is None:
        from .....macros.defaults import DefaultPoolRouter
        self.pool_router = DefaultPoolRouter()

    if self.alpha_pool is None:
        if _DEFAULT_POOL_NICKNAME in self.pool_router.pools_nicknames:
            self.alpha_pool = self.pool_router.pools_nicknames[_DEFAULT_POOL_NICKNAME]
        else:
            from .....macros.defaults import DefaultPool
            alpha = DefaultPool()
            self.pool_router.extend(
                alpha, affinity=1, pool_nickname=_DEFAULT_POOL_NICKNAME
            )
            self.alpha_pool = alpha.global_id
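
``model_post_init`` resolves the alpha pool through the router's nickname index, the same nickname → id → pool indirection used by ``_resolve_pool_ref``. A minimal sketch of that two-level lookup with stand-in dicts (the keys and values here are illustrative, not real laila identifiers):

```python
pools = {"pool-abc": "<pool object>"}        # id -> pool instance
pools_nicknames = {"default": "pool-abc"}    # nickname -> id

def resolve_pool_ref(ref):
    """Resolve a pool id or nickname to the pool instance (object case omitted)."""
    if ref in pools:
        return pools[ref]
    if ref in pools_nicknames:
        return pools[pools_nicknames[ref]]
    raise KeyError(f"Pool '{ref}' was not found.")

assert resolve_pool_ref("pool-abc") == "<pool object>"
assert resolve_pool_ref("default") == "<pool object>"
```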

extend(pool, *, affinity=None, pool_nickname=None)

Delegate pool registration to the pool router.

Source code in policy/central/memory/schema/base.py
def extend(self, pool: _LAILA_IDENTIFIABLE_POOL, *, affinity: Optional[float] = None, pool_nickname: Optional[str] = None):
    """Delegate pool registration to the pool router."""
    self.pool_router.extend(pool, affinity=affinity, pool_nickname=pool_nickname)

borrow(keys=None, global_borrow=False)

Context manager for borrowing entries (not yet implemented).

Source code in policy/central/memory/schema/base.py
@contextmanager
def borrow(
    self,
    keys: Optional[list] = None,
    global_borrow: bool = False,
):
    """Context manager for borrowing entries (not yet implemented)."""
    raise NotImplementedError

memorize(entries, *, pool_id=None, pool_nickname=None, affinity=None)

Persist entries to the routed pool and return a GroupFuture.

Source code in policy/central/memory/schema/base.py
@ensure_list("entries")
def memorize(
    self, 
    entries: Any,
    *,
    pool_id: Optional[str] = None,
    pool_nickname: Optional[str] = None,
    affinity: Optional[float] = None,
):
    """Persist entries to the routed pool and return a ``GroupFuture``."""
    from ..... import active_policy

    pool = self.pool_router.route(
        entries = entries,
        pool_id = pool_id,
        pool_nickname = pool_nickname,
        affinity = affinity,
    )

    return self._record(entries, pool)
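
The ``@ensure_list`` decorator is defined elsewhere in laila; presumably it normalizes a scalar argument into a one-element list so callers can pass a single entry or many. A hypothetical sketch of that behavior (the implementation below is illustrative, not the real decorator):

```python
import functools
import inspect

def ensure_list(param_name):
    """Hypothetical sketch: wrap a scalar argument into a one-element list."""
    def decorator(func):
        sig = inspect.signature(func)
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            bound = sig.bind(*args, **kwargs)
            value = bound.arguments.get(param_name)
            if value is not None and not isinstance(value, list):
                bound.arguments[param_name] = [value]  # scalar -> [scalar]
            return func(*bound.args, **bound.kwargs)
        return wrapper
    return decorator

@ensure_list("entries")
def memorize(self, entries):
    return entries

assert memorize(None, "e1") == ["e1"]            # scalar is wrapped
assert memorize(None, ["e1", "e2"]) == ["e1", "e2"]  # lists pass through
```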

remember(entry_ids, *, pool_id=None, pool_nickname=None, affinity=None)

Fetch entries from the routed pool and return a GroupFuture.

Source code in policy/central/memory/schema/base.py
@ensure_list("entry_ids")
def remember(
    self, 
    entry_ids: List[Entry]|List[str],
    *,
    pool_id: Optional[str] = None,
    pool_nickname: Optional[str] = None,
    affinity: Optional[float] = None,
):
    """Fetch entries from the routed pool and return a ``GroupFuture``."""
    from ..... import active_policy
    from .....entry import Entry

    pool = self.pool_router.route(
        entries = entry_ids,
        pool_id = pool_id,
        pool_nickname = pool_nickname,
        affinity = affinity,
    )

    return self._fetch(entry_ids, pool=pool)

forget(entry_ids, *, pool_id=None, pool_nickname=None, affinity=None)

Delete entries from the routed pool and return a GroupFuture.

Source code in policy/central/memory/schema/base.py
@ensure_list("entry_ids")
def forget(
    self, 
    entry_ids: List[Entry]|List[str],
    *,
    pool_id: Optional[str] = None,
    pool_nickname: Optional[str] = None,
    affinity: Optional[float] = None,
):
    """Delete entries from the routed pool and return a ``GroupFuture``."""
    pool = self.pool_router.route(
        entries = entry_ids,
        pool_id = pool_id,
        pool_nickname = pool_nickname,
        affinity = affinity,
    )

    return self._delete(entry_ids, pool=pool)
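
The ``_parallel_individual_*`` helpers bind loop variables as lambda defaults (``lambda entry=entry: ...``) when building the task list. That guards against Python's late-binding closures, as this runnable sketch shows:

```python
def run_all(tasks):
    return [t() for t in tasks]

items = ["a", "b", "c"]

# Without default-arg binding every lambda closes over the *variable* `item`,
# which holds "c" once the loop finishes (late binding).
late = [lambda: item for item in items]
assert run_all(late) == ["c", "c", "c"]

# Binding via a default argument snapshots the value at definition time,
# which is why the helpers use `lambda entry=entry: ...`.
bound = [lambda item=item: item for item in items]
assert run_all(bound) == ["a", "b", "c"]
```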

DefaultPolicy

Bases: _LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT

Top-level policy object that owns central command, memory, and logic.

Source code in policy/schema/base.py
class _LAILA_IDENTIFIABLE_POLICY(_LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT):
    """Top-level policy object that owns central command, memory, and logic."""

    _scopes: list[str] = PrivateAttr(default_factory=lambda: list([_POLICY_SCOPE]))
    class Central(BaseModel):
        """Container for the four central sub-systems of a policy."""

        logic: Optional[Any] = CLIExempt(default=None)
        command: Optional[_LAILA_IDENTIFIABLE_CENTRAL_COMMAND] = CLIExempt(default=None)
        communication: Optional[Any] = CLIExempt(default=None)
        memory: Optional[Any] = CLIExempt(default=None)

        model_config = ConfigDict(arbitrary_types_allowed=True)

    model_config = ConfigDict(arbitrary_types_allowed=True)

    # Core components
    central: Central = CLIExempt(default_factory=Central)
    future_bank: Dict[str, Any] = CLIExempt(default_factory=dict)


    def model_post_init(self, __context: Any) -> None:
        """Lazily wire default central command, memory, and communication if not provided."""
        from ...macros.defaults import (
            DefaultCentralCommand,
            DefaultCentralMemory,
            DefaultCentralCommunication,
        )

        if self.central.memory is None:
            self.central.memory = DefaultCentralMemory()

        if self.central.command is None:
            self.central.command = DefaultCentralCommand(policy_id=self.global_id)

        if self.central.communication is None:
            self.central.communication = DefaultCentralCommunication(
                policy_id=self.global_id,
            )

        self.central.communication._local_policy = self


    def extend(self, new_pool: _LAILA_IDENTIFIABLE_POOL) -> None:
        """Add a MemoryPool instance to the central memory registry."""
        self.central.memory[new_pool.pool_id] = new_pool


    def remember(
        self,
        global_id: str,
        *,
        global_fetch: bool = False,
        pool_subset: Optional[Dict[str, _LAILA_IDENTIFIABLE_POOL]] = None,
        hint: Optional[str] = None,
        _remote_called: bool = False,
    ) -> Optional[Entry]:
        """Fetch an entry from central memory by its *global_id*.

        Parameters
        ----------
        global_id : str
            The unique identifier of the entry to recall.
        global_fetch : bool, optional
            If ``True``, search across all known policies (not implemented).
        pool_subset : dict, optional
            Restrict the search to a subset of pools.
        hint : str, optional
            Routing hint forwarded to central memory.

        Returns
        -------
        Entry or None
            The recovered entry, or ``None`` if not found.
        """
        if global_fetch:
            raise NotImplementedError

        entry = self.central.memory.fetch(
            key = global_id,
            pool_subset = pool_subset,
            hint = hint
        )

        return entry



    def memorize(
        self,
        entries: Any,
        *,
        require_local_update = False, #update only affects the main pool at first.
        require_global_update = False
    ) -> None:
        """Update central memory with a new or modified entry."""
        return self.central.memory.memorize(entries)

    # ------------------------------------------------------------------
    # RPC helpers for remote future introspection
    # ------------------------------------------------------------------

    def _get_future_status(self, future_id: str) -> Any:
        """Return the status of a future from this policy's bank."""
        future = self.future_bank.get(future_id)
        if future is None:
            raise KeyError(f"Future {future_id} not in bank")
        status = future.status
        if hasattr(status, "value"):
            return status.value
        return status

    def _get_future_exception(self, future_id: str) -> Optional[Dict]:
        """Return a serializable representation of a future's exception."""
        future = self.future_bank.get(future_id)
        if future is None:
            raise KeyError(f"Future {future_id} not in bank")
        exc = future.exception
        if exc is None:
            return None
        return {"type": type(exc).__name__, "message": str(exc)}

    def _get_future_result_id(self, future_id: str) -> Any:
        """Return the ``_result_global_id`` of the future's result Entry.

        For GroupFutures, returns a list of child result IDs.
        """
        future = self.future_bank.get(future_id)
        if future is None:
            raise KeyError(f"Future {future_id} not in bank")
        if hasattr(future, "_result_global_id"):
            return future._result_global_id
        if hasattr(future, "future_ids"):
            ids = []
            for fid in future.future_ids:
                child = self.future_bank.get(fid)
                if child and hasattr(child, "_result_global_id"):
                    ids.append(child._result_global_id)
                else:
                    ids.append(None)
            return ids
        return None

    def _wait_future(self, future_id: str, timeout: Optional[float] = None) -> Any:
        """Block until a future in this policy's bank completes."""
        future = self.future_bank.get(future_id)
        if future is None:
            raise KeyError(f"Future {future_id} not in bank")
        future.wait(timeout)
        if hasattr(future, "_result_global_id"):
            return future._result_global_id
        return None
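The ``{"type", "message"}`` shape returned by ``_get_future_exception`` can be reproduced with plain builtins. A minimal sketch of that serialization (the helper name ``serialize_exception`` is illustrative, not part of the package):

```python
from typing import Dict, Optional


def serialize_exception(exc: Optional[BaseException]) -> Optional[Dict[str, str]]:
    """Mirror the wire format used by _get_future_exception."""
    if exc is None:
        return None
    return {"type": type(exc).__name__, "message": str(exc)}


# A failed operation yields a transportable description of its error.
try:
    {}["missing"]
except KeyError as err:
    payload = serialize_exception(err)
```

Because only the type name and message survive, the receiving side can log or re-raise a generic error, but not the original exception object.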

Central

Bases: BaseModel

Container for the four central sub-systems of a policy.

Source code in policy/schema/base.py
class Central(BaseModel):
    """Container for the four central sub-systems of a policy."""

    logic: Optional[Any] = CLIExempt(default=None)
    command: Optional[_LAILA_IDENTIFIABLE_CENTRAL_COMMAND] = CLIExempt(default=None)
    communication: Optional[Any] = CLIExempt(default=None)
    memory: Optional[Any] = CLIExempt(default=None)

    model_config = ConfigDict(arbitrary_types_allowed=True)

model_post_init(__context)

Lazily wire default central command, memory, and communication if not provided.

Source code in policy/schema/base.py
def model_post_init(self, __context: Any) -> None:
    """Lazily wire default central command, memory, and communication if not provided."""
    from ...macros.defaults import (
        DefaultCentralCommand,
        DefaultCentralMemory,
        DefaultCentralCommunication,
    )

    if self.central.memory is None:
        self.central.memory = DefaultCentralMemory()

    if self.central.command is None:
        self.central.command = DefaultCentralCommand(policy_id=self.global_id)

    if self.central.communication is None:
        self.central.communication = DefaultCentralCommunication(
            policy_id=self.global_id,
        )

    self.central.communication._local_policy = self

extend(new_pool)

Add a MemoryPool instance to the central memory registry.

Source code in policy/schema/base.py
def extend(self, new_pool: _LAILA_IDENTIFIABLE_POOL) -> None:
    """Add a MemoryPool instance to the central memory registry."""
    self.central.memory[new_pool.pool_id] = new_pool

remember(global_id, *, global_fetch=False, pool_subset=None, hint=None, _remote_called=False)

Fetch an entry from central memory by its global_id.

Parameters:

Name Type Description Default
global_id str

The unique identifier of the entry to recall.

required
global_fetch bool

If True, search across all known policies (not implemented).

False
pool_subset dict

Restrict the search to a subset of pools.

None
hint str

Routing hint forwarded to central memory.

None

Returns:

Type Description
Entry or None

The recovered entry, or None if not found.

Source code in policy/schema/base.py
def remember(
    self,
    global_id: str,
    *,
    global_fetch: bool = False,
    pool_subset: Optional[Dict[str, _LAILA_IDENTIFIABLE_POOL]] = None,
    hint: Optional[str] = None,
    _remote_called: bool = False,
) -> Optional[Entry]:
    """Fetch an entry from central memory by its *global_id*.

    Parameters
    ----------
    global_id : str
        The unique identifier of the entry to recall.
    global_fetch : bool, optional
        If ``True``, search across all known policies (not implemented).
    pool_subset : dict, optional
        Restrict the search to a subset of pools.
    hint : str, optional
        Routing hint forwarded to central memory.

    Returns
    -------
    Entry or None
        The recovered entry, or ``None`` if not found.
    """
    if global_fetch:
        raise NotImplementedError

    entry = self.central.memory.fetch(
        key = global_id,
        pool_subset = pool_subset,
        hint = hint
    )

    return entry

memorize(entries, *, require_local_update=False, require_global_update=False)

Update central memory with new or modified entries.

Source code in policy/schema/base.py
def memorize(
    self,
    entries: Any,
    *,
    require_local_update: bool = False,  # the update initially affects only the main pool
    require_global_update: bool = False
) -> None:
    """Update central memory with new or modified entries."""
    return self.central.memory.record(entries)
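The record/fetch contract behind ``memorize`` and ``remember`` can be modelled with ordinary dictionaries. The ``MiniMemory`` class below is an illustrative stand-in, not the real central memory: ``record`` lands entries in the main pool, and ``fetch`` honors an optional ``pool_subset`` restriction the way ``remember`` forwards it:

```python
from typing import Any, Dict, Optional


class MiniMemory:
    """Toy stand-in for central memory: record() stores, fetch() searches pools."""

    def __init__(self) -> None:
        self.pools: Dict[str, Dict[str, Any]] = {"main": {}}

    def record(self, entries: Dict[str, Any]) -> None:
        # Like memorize(): updates affect the main pool first.
        self.pools["main"].update(entries)

    def fetch(
        self,
        key: str,
        pool_subset: Optional[Dict[str, Dict[str, Any]]] = None,
        hint: Optional[str] = None,
    ) -> Optional[Any]:
        # Like remember(): restrict the search when pool_subset is given.
        pools = pool_subset if pool_subset is not None else self.pools
        for pool in pools.values():
            if key in pool:
                return pool[key]
        return None


mem = MiniMemory()
mem.record({"entry-1": "payload"})
```

Passing an empty ``pool_subset`` makes the lookup miss even for recorded keys, which is the point of the restriction.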

DefaultPool

Bases: _LAILA_CLI_CAPABLE_CLASS, _LAILA_LOCALLY_ATOMIC_IDENTIFIABLE_OBJECT

Abstract base class for all LAILA storage pools.

A pool is a key-value store that persists serialized Entry blobs. Subclasses implement _read, _write, _delete, _keys, _exists, and _empty against a concrete backend (S3, Redis, SQLite, etc.).

Pools support a proxy relationship via _proxy_to. When a pool is a proxy for another (its origin), reads that miss local storage automatically fall back to the origin and cache the result locally.

Source code in pool/schema/base.py
class _LAILA_IDENTIFIABLE_POOL(_LAILA_CLI_CAPABLE_CLASS, _LAILA_LOCALLY_ATOMIC_IDENTIFIABLE_OBJECT):
    """Abstract base class for all LAILA storage pools.

    A pool is a key-value store that persists serialized ``Entry`` blobs.
    Subclasses implement ``_read``, ``_write``, ``_delete``,
    ``_keys``, ``_exists``, and ``_empty`` against a concrete backend (S3,
    Redis, SQLite, etc.).

    Pools support a proxy relationship via ``_proxy_to``.  When a pool is
    a proxy for another (its *origin*), reads that miss local storage
    automatically fall back to the origin and cache the result locally.
    """

    _scopes: list[str] = PrivateAttr(default_factory=lambda: list([_POOL_SCOPE]))
    _proxy_to: Optional[Any] = PrivateAttr(default=None)
    resource: Dict[str, Any] = CLIExempt(default_factory=dict)
    batch_accelerated: bool = Field(default=False)
    transformations: Optional[TransformationSequence] = CLIExempt(default=None)


    class Config:
        arbitrary_types_allowed = True

    @property
    def pool_id(self) -> str:
        """Unique identifier for this pool, aliased from ``global_id``."""
        return self.global_id

    # -------- Proxy properties --------
    @property
    def proxy(self):
        """Write-only property.  ``origin.proxy = cache`` sets ``cache._proxy_to = origin``."""
        return None

    @proxy.setter
    def proxy(self, pool):
        if pool is not None:
            pool._proxy_to = self

    @property
    def proxy_to(self):
        """The origin pool this pool is a cache/proxy for, or ``None``."""
        return self._proxy_to

    @proxy_to.setter
    def proxy_to(self, pool):
        self._proxy_to = pool

    def __lshift__(self, other):
        """``cache << origin``: cache becomes proxy for origin.

        Returns *other* so that chains like ``mem << hdf5 << s3`` work.
        """
        other.proxy = self
        return other

    def __rshift__(self, other):
        """``origin >> cache``: cache becomes proxy for origin.

        Returns *other* so that chains like ``s3 >> hdf5 >> mem`` work.
        """
        self.proxy = other
        return other

    # -------- Internal storage hooks (override in subclasses) --------
    def _read(self, key: str) -> Optional[Any]:
        """Read from own storage.  Subclasses override this."""
        with self.atomic():
            if key not in self.resource:
                return None
            blob = self.resource[key]
        if blob is None:
            return None
        return blob

    def _write(self, key: str, value: Any) -> None:
        """Write to own storage.  Subclasses override this."""
        with self.atomic():
            self.resource[key] = value

    def _delete(self, key: str) -> None:
        """Delete from own storage.  Subclasses override this."""
        with self.atomic():
            if key in self.resource:
                del self.resource[key]

    def _exists(self, key: str) -> bool:
        """Check own storage.  Subclasses override this."""
        with self.atomic():
            return key in self.resource

    def _keys(self, as_generator: bool = False) -> Iterable[str]:
        """List own keys.  Subclasses override this."""
        if not as_generator:
            with self.atomic():
                return list(self.resource.keys())
        else:
            def _gen() -> Iterator[str]:
                with self.atomic():
                    for k in self.resource.keys():
                        yield k
            return _gen()

    def _empty(self) -> None:
        """Clear own storage.  Subclasses override this."""
        with self.atomic():
            self.resource.clear()

    # -------- Proxy-aware public API --------
    def __getitem__(self, key) -> Optional[Any]:
        """Retrieve the stored blob for *key*, or a ``PoolWrapper`` for a ``Manifest``.

        If the key is not found locally and ``_proxy_to`` is set, the
        request falls back to the origin pool.  A successful fallback
        caches the value in this pool before returning.
        """
        from ...policy.central.memory.schema.manifest import Manifest
        if isinstance(key, Manifest):
            from .pool_wrapper import PoolWrapper
            return PoolWrapper(pool=self, manifest=key)

        value = self._read(key)
        if value is not None:
            return value

        if self._proxy_to is not None:
            value = self._proxy_to[key]
            if value is not None:
                self._write(key, value)
                return value

        return None

    def __setitem__(self, key: str, entry: Any) -> None:
        """Store *entry* under *key*.  Local only, no propagation."""
        self._write(key, entry)

    def __delitem__(self, key: str) -> None:
        """Delete the entry for *key*.  Local only, no propagation."""
        self._delete(key)

    def empty(self) -> None:
        """Remove all entries from the pool.  Local only, no propagation."""
        self._empty()

    def exists(self, key: str) -> bool:
        """Return ``True`` if *key* is present.  Local only, no propagation."""
        return self._exists(key)

    def __contains__(self, key: str) -> bool:
        """Check membership, delegates to :meth:`exists`."""
        return self.exists(key)

    def keys(self, as_generator: bool = False) -> Iterable[str]:
        """Return the keys stored in this pool.

        Parameters
        ----------
        as_generator : bool, optional
            If ``False`` (default), return a snapshot list.  If ``True``,
            return an iterator that holds the lock for its lifetime.

        Returns
        -------
        Iterable[str]
            Pool keys.
        """
        return self._keys(as_generator=as_generator)

    def sync(self) -> None:
        """Flush any in-memory cache to the backing store.

        Raises
        ------
        NotImplementedError
            If the pool is cacheless and operates directly on storage.
        """
        raise NotImplementedError("Sync is not implemented for this pool, the pool is cacheless, i.e. operations are immediately executed on the underlying storage.")

    def __le__(self, other: Any):
        """Duplicate *other* pool's contents into this pool via the active policy."""
        from ... import active_policy

        if not isinstance(other, (_LAILA_IDENTIFIABLE_POOL, str)):
            return NotImplemented

        return active_policy.central.memory._duplicate_pool(
            pool_src=other,
            pool_dest=self,
        )

pool_id property

Unique identifier for this pool, aliased from global_id.

proxy property writable

Write-only property. origin.proxy = cache sets cache._proxy_to = origin.

proxy_to property writable

The origin pool this pool is a cache/proxy for, or None.

__lshift__(other)

cache << origin: cache becomes proxy for origin.

Returns other so that chains like mem << hdf5 << s3 work.

Source code in pool/schema/base.py
def __lshift__(self, other):
    """``cache << origin``: cache becomes proxy for origin.

    Returns *other* so that chains like ``mem << hdf5 << s3`` work.
    """
    other.proxy = self
    return other

__rshift__(other)

origin >> cache: cache becomes proxy for origin.

Returns other so that chains like s3 >> hdf5 >> mem work.

Source code in pool/schema/base.py
def __rshift__(self, other):
    """``origin >> cache``: cache becomes proxy for origin.

    Returns *other* so that chains like ``s3 >> hdf5 >> mem`` work.
    """
    self.proxy = other
    return other
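The chaining behaviour of ``<<`` and ``>>`` follows from two facts: the ``proxy`` setter wires ``_proxy_to`` on the *other* pool, and both operators return ``other``. A minimal sketch (a bare ``Pool`` class, not the real pydantic model) showing that ``mem << hdf5 << s3`` builds a two-hop proxy chain:

```python
class Pool:
    """Minimal sketch of the proxy-wiring operators (not the real pool class)."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._proxy_to = None  # the origin this pool caches for

    @property
    def proxy(self):
        return None  # write-only, as in the real class

    @proxy.setter
    def proxy(self, pool):
        if pool is not None:
            pool._proxy_to = self

    def __lshift__(self, other):
        other.proxy = self   # cache << origin
        return other         # returning *other* is what enables chaining

    def __rshift__(self, other):
        self.proxy = other   # origin >> cache
        return other


mem, hdf5, s3 = Pool("mem"), Pool("hdf5"), Pool("s3")
mem << hdf5 << s3  # mem proxies hdf5, hdf5 proxies s3
```

Because each operator returns its right-hand operand, the chain evaluates left to right and each link points its left pool's ``_proxy_to`` at the pool immediately to its right.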

__getitem__(key)

Retrieve the stored blob for key, or a PoolWrapper for a Manifest.

If the key is not found locally and _proxy_to is set, the request falls back to the origin pool. A successful fallback caches the value in this pool before returning.

Source code in pool/schema/base.py
def __getitem__(self, key) -> Optional[Any]:
    """Retrieve the stored blob for *key*, or a ``PoolWrapper`` for a ``Manifest``.

    If the key is not found locally and ``_proxy_to`` is set, the
    request falls back to the origin pool.  A successful fallback
    caches the value in this pool before returning.
    """
    from ...policy.central.memory.schema.manifest import Manifest
    if isinstance(key, Manifest):
        from .pool_wrapper import PoolWrapper
        return PoolWrapper(pool=self, manifest=key)

    value = self._read(key)
    if value is not None:
        return value

    if self._proxy_to is not None:
        value = self._proxy_to[key]
        if value is not None:
            self._write(key, value)
            return value

    return None
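The read-through caching described above (local miss, fall back to the origin, cache the hit) can be sketched with two dict-backed pools. ``DictPool`` is an illustrative model of the ``__getitem__`` read path, assuming ``_proxy_to`` has already been wired:

```python
from typing import Any, Dict, Optional


class DictPool:
    """Dict-backed sketch of the proxy-aware read path in __getitem__."""

    def __init__(self) -> None:
        self.resource: Dict[str, Any] = {}
        self._proxy_to: Optional["DictPool"] = None

    def __getitem__(self, key: str) -> Optional[Any]:
        value = self.resource.get(key)
        if value is not None:
            return value
        if self._proxy_to is not None:
            value = self._proxy_to[key]       # fall back to the origin
            if value is not None:
                self.resource[key] = value    # cache the hit locally
                return value
        return None


origin, cache = DictPool(), DictPool()
cache._proxy_to = origin
origin.resource["k"] = "blob"

first = cache["k"]  # local miss, served by origin, now cached locally
```

After the first read the cache holds its own copy, so subsequent reads never touch the origin.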

__setitem__(key, entry)

Store entry under key. Local only, no propagation.

Source code in pool/schema/base.py
def __setitem__(self, key: str, entry: Any) -> None:
    """Store *entry* under *key*.  Local only, no propagation."""
    self._write(key, entry)

__delitem__(key)

Delete the entry for key. Local only, no propagation.

Source code in pool/schema/base.py
def __delitem__(self, key: str) -> None:
    """Delete the entry for *key*.  Local only, no propagation."""
    self._delete(key)

empty()

Remove all entries from the pool. Local only, no propagation.

Source code in pool/schema/base.py
def empty(self) -> None:
    """Remove all entries from the pool.  Local only, no propagation."""
    self._empty()

exists(key)

Return True if key is present. Local only, no propagation.

Source code in pool/schema/base.py
def exists(self, key: str) -> bool:
    """Return ``True`` if *key* is present.  Local only, no propagation."""
    return self._exists(key)

__contains__(key)

Check membership; delegates to :meth:exists.

Source code in pool/schema/base.py
def __contains__(self, key: str) -> bool:
    """Check membership, delegates to :meth:`exists`."""
    return self.exists(key)

keys(as_generator=False)

Return the keys stored in this pool.

Parameters:

Name Type Description Default
as_generator bool

If False (default), return a snapshot list. If True, return an iterator that holds the lock for its lifetime.

False

Returns:

Type Description
Iterable[str]

Pool keys.

Source code in pool/schema/base.py
def keys(self, as_generator: bool = False) -> Iterable[str]:
    """Return the keys stored in this pool.

    Parameters
    ----------
    as_generator : bool, optional
        If ``False`` (default), return a snapshot list.  If ``True``,
        return an iterator that holds the lock for its lifetime.

    Returns
    -------
    Iterable[str]
        Pool keys.
    """
    return self._keys(as_generator=as_generator)
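The snapshot-versus-generator trade-off in ``keys()`` is worth making concrete: a snapshot copies the keys while holding the lock briefly, whereas a generator holds the lock for as long as the caller iterates. A small sketch of the same pattern (``LockedStore`` is illustrative, standing in for a pool's ``atomic()`` context):

```python
import threading
from typing import Iterable, Iterator


class LockedStore:
    """Sketch of the snapshot-vs-generator trade-off in keys()."""

    def __init__(self) -> None:
        self._lock = threading.RLock()
        self._data = {"a": 1, "b": 2}

    def keys(self, as_generator: bool = False) -> Iterable[str]:
        if not as_generator:
            with self._lock:
                return list(self._data)  # snapshot: lock held only for the copy

        def _gen() -> Iterator[str]:
            with self._lock:             # lock held for the iterator's lifetime
                yield from self._data

        return _gen()


store = LockedStore()
snapshot = store.keys()
streamed = list(store.keys(as_generator=True))
```

Prefer the snapshot unless the key set is too large to copy; a long-lived generator blocks writers for its entire lifetime.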

sync()

Flush any in-memory cache to the backing store.

Raises:

Type Description
NotImplementedError

If the pool is cacheless and operates directly on storage.

Source code in pool/schema/base.py
def sync(self) -> None:
    """Flush any in-memory cache to the backing store.

    Raises
    ------
    NotImplementedError
        If the pool is cacheless and operates directly on storage.
    """
    raise NotImplementedError("Sync is not implemented for this pool, the pool is cacheless, i.e. operations are immediately executed on the underlying storage.")

__le__(other)

Duplicate other pool's contents into this pool via the active policy.

Source code in pool/schema/base.py
def __le__(self, other: Any):
    """Duplicate *other* pool's contents into this pool via the active policy."""
    from ... import active_policy

    if not isinstance(other, (_LAILA_IDENTIFIABLE_POOL, str)):
        return NotImplemented

    return active_policy.central.memory._duplicate_pool(
        pool_src=other,
        pool_dest=self,
    )

DefaultPoolRouter

Bases: _LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT

Routes memory operations to the appropriate pool based on affinity or nickname.

Source code in policy/central/memory/router/pool_router.py
class _LAILA_IDENTIFIABLE_POOL_ROUTER(_LAILA_CLI_CAPABLE_CLASS, _LAILA_IDENTIFIABLE_OBJECT):
    """Routes memory operations to the appropriate pool based on affinity or nickname."""

    _scopes: list[str] = PrivateAttr(default_factory=lambda: list([_POOL_ROUTER_SCOPE]))
    pools: Optional[Dict[str,_LAILA_IDENTIFIABLE_POOL]] = CLIExempt(default_factory = dict)
    pools_pq: Optional[PriorityQueue] = CLIExempt(default_factory = PriorityQueue)
    pools_nicknames: Optional[Dict[str,str]] = Field(default_factory = dict)


    class Config:
        arbitrary_types_allowed = True


    def model_post_init(self, __context: Any) -> None:
        """Register a default pool when none are provided."""
        if len(self.pools) == 0:
            from .....macros.defaults import DefaultPool
            self.extend(
                pool = DefaultPool(), 
                affinity=1,
                pool_nickname=_DEFAULT_POOL_NICKNAME
            )


    def extend(
        self,
        pool: _LAILA_IDENTIFIABLE_POOL,
        *,
        affinity: Optional[float] = None,
        pool_nickname: Optional[str] = None,
    ):
        """Register a pool with optional affinity priority and nickname.

        Parameters
        ----------
        pool : _LAILA_IDENTIFIABLE_POOL
            The pool instance to register.
        affinity : float, optional
            Routing priority (higher = preferred). Defaults to ``0``.
        pool_nickname : str, optional
            Human-readable alias for this pool.
        """
        if affinity is None:
            affinity = 0  # lowest priority, i.e. farthest away

        self.pools_pq.put((-affinity, pool.global_id))
        self.pools[pool.global_id] = pool
        if pool_nickname is not None:
            self.pools_nicknames[pool_nickname] = pool.global_id


    def route(
        self,
        entries: List[Entry]|List[str],
        *,
        pool_id: Optional[str] = None,
        pool_nickname: Optional[str] = None,
        affinity: Optional[float] = None,
    ):
        """Resolve the target pool for the given entries.

        Parameters
        ----------
        entries : list[Entry] or list[str]
            Entries (or entry IDs) being routed.
        pool_id : str, optional
            Explicit pool ``global_id`` to use.
        pool_nickname : str, optional
            Nickname to resolve via ``_route_by_nickname``.
        affinity : float, optional
            Reserved for future affinity-based routing.

        Returns
        -------
        _LAILA_IDENTIFIABLE_POOL
            The selected pool.
        """
        if pool_id is not None:
            return self.pools[pool_id]
        return self._route_by_nickname(pool_nickname=pool_nickname)


    def _route_by_nickname(
        self, 
        *,
        pool_nickname: Optional[str] = None,
    ) -> _LAILA_IDENTIFIABLE_POOL:
        """Resolve a pool by nickname, falling back to the default pool."""
        if pool_nickname is not None:
            return self.pools[self.pools_nicknames[pool_nickname]]
        else:
            return self.pools[self.pools_nicknames[_DEFAULT_POOL_NICKNAME]]

model_post_init(__context)

Register a default pool when none are provided.

Source code in policy/central/memory/router/pool_router.py
def model_post_init(self, __context: Any) -> None:
    """Register a default pool when none are provided."""
    if len(self.pools) == 0:
        from .....macros.defaults import DefaultPool
        self.extend(
            pool = DefaultPool(), 
            affinity=1,
            pool_nickname=_DEFAULT_POOL_NICKNAME
        )

extend(pool, *, affinity=None, pool_nickname=None)

Register a pool with optional affinity priority and nickname.

Parameters:

Name Type Description Default
pool _LAILA_IDENTIFIABLE_POOL

The pool instance to register.

required
affinity float

Routing priority (higher = preferred). Defaults to 0.

None
pool_nickname str

Human-readable alias for this pool.

None
Source code in policy/central/memory/router/pool_router.py
def extend(
    self,
    pool: _LAILA_IDENTIFIABLE_POOL,
    *,
    affinity: Optional[float] = None,
    pool_nickname: Optional[str] = None,
):
    """Register a pool with optional affinity priority and nickname.

    Parameters
    ----------
    pool : _LAILA_IDENTIFIABLE_POOL
        The pool instance to register.
    affinity : float, optional
        Routing priority (higher = preferred). Defaults to ``0``.
    pool_nickname : str, optional
        Human-readable alias for this pool.
    """
    if affinity is None:
        affinity = 0  # lowest priority, i.e. farthest away

    self.pools_pq.put((-affinity, pool.global_id))
    self.pools[pool.global_id] = pool
    if pool_nickname is not None:
        self.pools_nicknames[pool_nickname] = pool.global_id
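The ``pools_pq.put((-affinity, pool.global_id))`` line relies on ``queue.PriorityQueue`` being a min-queue: negating the affinity makes the pool with the *highest* affinity pop first. A standalone sketch (the pool names are hypothetical):

```python
from queue import PriorityQueue

# PriorityQueue pops the smallest tuple first, so storing (-affinity, pool_id)
# makes the pool with the highest affinity come out first.
pq = PriorityQueue()
pq.put((-1.0, "local-cache"))
pq.put((0.0, "cold-archive"))     # -affinity with the default affinity of 0
pq.put((-0.5, "regional-store"))

order = [pq.get()[1] for _ in range(3)]
```

Ties on affinity fall back to comparing the second tuple element, the pool ID, so equal-affinity pools pop in lexicographic ID order.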

route(entries, *, pool_id=None, pool_nickname=None, affinity=None)

Resolve the target pool for the given entries.

Parameters:

Name Type Description Default
entries list[Entry] or list[str]

Entries (or entry IDs) being routed.

required
pool_id str

Explicit pool global_id to use.

None
pool_nickname str

Nickname to resolve via _route_by_nickname.

None
affinity float

Reserved for future affinity-based routing.

None

Returns:

Type Description
_LAILA_IDENTIFIABLE_POOL

The selected pool.

Source code in policy/central/memory/router/pool_router.py
def route(
    self,
    entries: List[Entry]|List[str],
    *,
    pool_id: Optional[str] = None,
    pool_nickname: Optional[str] = None,
    affinity: Optional[float] = None,
):
    """Resolve the target pool for the given entries.

    Parameters
    ----------
    entries : list[Entry] or list[str]
        Entries (or entry IDs) being routed.
    pool_id : str, optional
        Explicit pool ``global_id`` to use.
    pool_nickname : str, optional
        Nickname to resolve via ``_route_by_nickname``.
    affinity : float, optional
        Reserved for future affinity-based routing.

    Returns
    -------
    _LAILA_IDENTIFIABLE_POOL
        The selected pool.
    """
    if pool_id is not None:
        return self.pools[pool_id]
    return self._route_by_nickname(pool_nickname=pool_nickname)

DefaultTCPIPProtocol

Bases: _LAILA_IDENTIFIABLE_COMM_PROTOCOL

WebSocket-based peer-to-peer communication protocol.

Parameters:

Name Type Description Default
host str

Bind address for the WebSocket server. "0.0.0.0" listens on all interfaces.

required
port int

TCP port for the WebSocket server. 0 lets the OS pick a free port.

required
peer_secret_key str

Shared secret that remote peers must supply to connect.

required
Source code in policy/central/communication/protocols/tcpip.py
class _LAILA_IDENTIFIABLE_TCPIP_COMM_PROTOCOL(_LAILA_IDENTIFIABLE_COMM_PROTOCOL):
    """WebSocket-based peer-to-peer communication protocol.

    Parameters
    ----------
    host : str
        Bind address for the WebSocket server.  ``"0.0.0.0"`` listens on
        all interfaces.
    port : int
        TCP port for the WebSocket server.  ``0`` lets the OS pick a free
        port.
    peer_secret_key : str
        Shared secret that remote peers must supply to connect.
    """

    host: str = Field(default="0.0.0.0")
    port: int = Field(default=0)
    peer_secret_key: str = Field(default_factory=lambda: _uuid.uuid4().hex)

    _server: Any = PrivateAttr(default=None)
    _connections: Dict[str, Any] = PrivateAttr(default_factory=dict)
    _pending_rpcs: Dict[str, Dict[str, Any]] = PrivateAttr(default_factory=dict)
    _event_loop: Optional[asyncio.AbstractEventLoop] = PrivateAttr(default=None)
    _loop_thread: Optional[threading.Thread] = PrivateAttr(default=None)
    _started: bool = PrivateAttr(default=False)
    _bound_port: Optional[int] = PrivateAttr(default=None)

    @property
    def bound_port(self) -> int:
        """The actual port the server is listening on.

        After ``start()`` this returns the OS-assigned port (useful when
        ``port`` is ``0``).  Before ``start()`` it falls back to ``port``.
        """
        return self._bound_port if self._bound_port is not None else self.port

    # ------------------------------------------------------------------
    # URI routing
    # ------------------------------------------------------------------

    @classmethod
    def can_handle_uri(cls, uri: str) -> bool:
        return uri.startswith("ws://") or uri.startswith("wss://")

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self) -> None:
        """Boot the background event loop and WebSocket server."""
        if self._started:
            return

        self._event_loop = asyncio.new_event_loop()
        ready = threading.Event()

        def _run_loop() -> None:
            asyncio.set_event_loop(self._event_loop)
            self._event_loop.run_until_complete(self._async_start(ready))
            self._event_loop.run_forever()

        self._loop_thread = threading.Thread(target=_run_loop, daemon=True)
        self._loop_thread.start()
        ready.wait(timeout=10.0)
        self._started = True
        policy_id = self._communication.policy_id if self._communication else None
        log.info(
            "TCP/IP protocol started for policy %s on port %s",
            policy_id, self.bound_port,
        )

    async def _async_start(self, ready: threading.Event) -> None:
        from .. import connection
        await connection.start_server(self)
        ready.set()

    def stop(self) -> None:
        """Shut down the WebSocket server and all connections."""
        if not self._started:
            return

        async def _shutdown() -> None:
            if self._server is not None:
                self._server.close()
                await self._server.wait_closed()
                self._server = None

            close_tasks = []
            for ws in list(self._connections.values()):
                close_tasks.append(asyncio.ensure_future(ws.close()))
            if close_tasks:
                await asyncio.gather(*close_tasks, return_exceptions=True)
            self._connections.clear()

            for task in asyncio.all_tasks(self._event_loop):
                if task is not asyncio.current_task():
                    task.cancel()

        if self._event_loop is not None and self._event_loop.is_running():
            future = asyncio.run_coroutine_threadsafe(
                _shutdown(), self._event_loop,
            )
            try:
                future.result(timeout=5.0)
            except Exception:
                pass
            self._event_loop.call_soon_threadsafe(self._event_loop.stop)

        if self._loop_thread is not None:
            self._loop_thread.join(timeout=5.0)
            self._loop_thread = None

        self._pending_rpcs.clear()
        self._bound_port = None
        self._started = False
        policy_id = self._communication.policy_id if self._communication else None
        log.info("TCP/IP protocol stopped for policy %s", policy_id)

    # ------------------------------------------------------------------
    # Peer management
    # ------------------------------------------------------------------

    def add_peer(self, uri: str, secret: str) -> str:
        """Connect to a remote policy over WebSocket."""
        self.start()

        from .. import connection
        future = asyncio.run_coroutine_threadsafe(
            connection.connect_outbound(self, uri, secret),
            self._event_loop,
        )
        return future.result(timeout=30.0)

    def _register_peer(self, peer_id: str, ws: Any) -> None:
        """Store the WebSocket locally and notify Communication."""
        self._connections[peer_id] = ws
        if self._communication is not None:
            self._communication._register_peer(peer_id)

    def _unregister_peer(self, peer_id: str) -> None:
        """Remove the WebSocket locally and notify Communication."""
        self._connections.pop(peer_id, None)
        if self._communication is not None:
            self._communication._unregister_peer(peer_id)

    def has_peer(self, peer_id: str) -> bool:
        return peer_id in self._connections

    # ------------------------------------------------------------------
    # RPC (outbound)
    # ------------------------------------------------------------------

    def send_rpc(
        self, peer_id: str, path: list[str], args: tuple, kwargs: dict
    ) -> Any:
        """Send a JSON-RPC call over the WebSocket to *peer_id*."""
        ws = self._connections.get(peer_id)
        if ws is None:
            raise ConnectionError(f"No connection to peer {peer_id}")

        request_id = str(_uuid.uuid4())
        event = threading.Event()
        slot: Dict[str, Any] = {"event": event}
        self._pending_rpcs[request_id] = slot

        req = rpc_protocol.make_request("rpc.call", {
            "path": path,
            "args": list(args),
            "kwargs": dict(kwargs),
        }, request_id=request_id)

        asyncio.run_coroutine_threadsafe(
            ws.send(rpc_protocol.encode(req)),
            self._event_loop,
        )

        event.wait(timeout=60.0)
        self._pending_rpcs.pop(request_id, None)

        if "error" in slot:
            err = slot["error"]
            raise RuntimeError(f"Remote RPC error: {err.get('message', err)}")

        return slot.get("result")
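``send_rpc`` correlates a request with its eventual response through ``_pending_rpcs``: the caller registers a slot keyed by request ID, parks on a ``threading.Event``, and whichever thread reads the response fills the slot and sets the event. A self-contained sketch of that bookkeeping, without any networking (helper names are illustrative):

```python
import threading
import uuid
from typing import Any, Dict

pending: Dict[str, Dict[str, Any]] = {}


def register(request_id: str) -> None:
    """Park a slot for the response before the request goes out."""
    pending[request_id] = {"event": threading.Event()}


def wait_for(request_id: str, timeout: float = 5.0) -> Any:
    """Block until the slot is filled, then surface the result or error."""
    slot = pending[request_id]
    slot["event"].wait(timeout)
    pending.pop(request_id, None)
    if "error" in slot:
        raise RuntimeError(slot["error"]["message"])
    return slot.get("result")


def on_response(request_id: str, result: Any) -> None:
    """Called by the reader thread when a matching response arrives."""
    slot = pending.get(request_id)
    if slot is not None:
        slot["result"] = result
        slot["event"].set()


request_id = str(uuid.uuid4())
register(request_id)  # register before the "response" can arrive
responder = threading.Thread(target=on_response, args=(request_id, 42))
responder.start()
result = wait_for(request_id)
responder.join()
```

Registering the slot *before* sending matters: a response that arrives before registration would find no slot and be dropped, leaving the caller to hit the timeout.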

bound_port property

The actual port the server is listening on.

After start() this returns the OS-assigned port (useful when port is 0). Before start() it falls back to port.
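The port-0 behaviour comes straight from the OS: binding a socket to port 0 requests any free port, and the assigned number is read back afterwards, which is what ``bound_port`` exposes. A minimal standard-library demonstration:

```python
import socket

# Binding to port 0 asks the OS for any free port; getsockname() reveals
# the port actually assigned, analogous to bound_port after start().
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
assigned_port = server.getsockname()[1]
server.close()
```

Before binding, only the requested value (here ``0``) is known, which is why ``bound_port`` falls back to ``port`` until the server has started.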

start()

Boot the background event loop and WebSocket server.

Source code in policy/central/communication/protocols/tcpip.py
def start(self) -> None:
    """Boot the background event loop and WebSocket server."""
    if self._started:
        return

    self._event_loop = asyncio.new_event_loop()
    ready = threading.Event()

    def _run_loop() -> None:
        asyncio.set_event_loop(self._event_loop)
        self._event_loop.run_until_complete(self._async_start(ready))
        self._event_loop.run_forever()

    self._loop_thread = threading.Thread(target=_run_loop, daemon=True)
    self._loop_thread.start()
    ready.wait(timeout=10.0)
    self._started = True
    policy_id = self._communication.policy_id if self._communication else None
    log.info(
        "TCP/IP protocol started for policy %s on port %s",
        policy_id, self.bound_port,
    )
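The lifecycle above follows a common asyncio idiom: run an event loop in a daemon thread and submit coroutines to it from synchronous code with ``run_coroutine_threadsafe``. A minimal sketch of that pattern in isolation:

```python
import asyncio
import threading

# Same pattern as start(): spin up an event loop in a daemon thread,
# then submit coroutines to it from the main thread.
loop = asyncio.new_event_loop()


def _run_loop() -> None:
    asyncio.set_event_loop(loop)
    loop.run_forever()


thread = threading.Thread(target=_run_loop, daemon=True)
thread.start()


async def ping() -> str:
    return "pong"


# run_coroutine_threadsafe returns a concurrent.futures.Future,
# so the main thread can block on .result() with a timeout.
future = asyncio.run_coroutine_threadsafe(ping(), loop)
reply = future.result(timeout=5.0)

# Orderly teardown, mirroring stop(): stop the loop, join the thread.
loop.call_soon_threadsafe(loop.stop)
thread.join(timeout=5.0)
```

Note that ``loop.stop`` must be scheduled via ``call_soon_threadsafe`` because the loop runs on another thread; calling it directly from the main thread is not thread-safe.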

stop()

Shut down the WebSocket server and all connections.

Source code in policy/central/communication/protocols/tcpip.py
def stop(self) -> None:
    """Shut down the WebSocket server and all connections."""
    if not self._started:
        return

    async def _shutdown() -> None:
        if self._server is not None:
            self._server.close()
            await self._server.wait_closed()
            self._server = None

        close_tasks = []
        for ws in list(self._connections.values()):
            close_tasks.append(asyncio.ensure_future(ws.close()))
        if close_tasks:
            await asyncio.gather(*close_tasks, return_exceptions=True)
        self._connections.clear()

        for task in asyncio.all_tasks(self._event_loop):
            if task is not asyncio.current_task():
                task.cancel()

    if self._event_loop is not None and self._event_loop.is_running():
        future = asyncio.run_coroutine_threadsafe(
            _shutdown(), self._event_loop,
        )
        try:
            future.result(timeout=5.0)
        except Exception:
            pass
        self._event_loop.call_soon_threadsafe(self._event_loop.stop)

    if self._loop_thread is not None:
        self._loop_thread.join(timeout=5.0)
        self._loop_thread = None

    self._pending_rpcs.clear()
    self._bound_port = None
    self._started = False
    policy_id = self._communication.policy_id if self._communication else None
    log.info("TCP/IP protocol stopped for policy %s", policy_id)

add_peer(uri, secret)

Connect to a remote policy over WebSocket.

Source code in policy/central/communication/protocols/tcpip.py
def add_peer(self, uri: str, secret: str) -> str:
    """Connect to a remote policy over WebSocket."""
    self.start()

    from .. import connection
    future = asyncio.run_coroutine_threadsafe(
        connection.connect_outbound(self, uri, secret),
        self._event_loop,
    )
    return future.result(timeout=30.0)

send_rpc(peer_id, path, args, kwargs)

Send a JSON-RPC call over the WebSocket to peer_id.

Source code in policy/central/communication/protocols/tcpip.py
def send_rpc(
    self, peer_id: str, path: list[str], args: tuple, kwargs: dict
) -> Any:
    """Send a JSON-RPC call over the WebSocket to *peer_id*."""
    ws = self._connections.get(peer_id)
    if ws is None:
        raise ConnectionError(f"No connection to peer {peer_id}")

    request_id = str(_uuid.uuid4())
    event = threading.Event()
    slot: Dict[str, Any] = {"event": event}
    self._pending_rpcs[request_id] = slot

    req = rpc_protocol.make_request("rpc.call", {
        "path": path,
        "args": list(args),
        "kwargs": dict(kwargs),
    }, request_id=request_id)

    asyncio.run_coroutine_threadsafe(
        ws.send(rpc_protocol.encode(req)),
        self._event_loop,
    )

    if not event.wait(timeout=60.0):
        self._pending_rpcs.pop(request_id, None)
        raise TimeoutError(f"RPC to peer {peer_id} timed out after 60s")
    self._pending_rpcs.pop(request_id, None)

    if "error" in slot:
        err = slot["error"]
        raise RuntimeError(f"Remote RPC error: {err.get('message', err)}")

    return slot.get("result")
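The request/response correlation in `send_rpc` can be isolated into a minimal standalone sketch (hypothetical names, simplified): the caller registers a slot keyed by request id, blocks on a `threading.Event`, and a responder fills the slot and sets the event.

```python
import threading
import uuid

# Minimal sketch of the pending-RPC pattern: a shared dict maps request
# ids to slots; the caller waits on the slot's Event while another thread
# (here a simulated responder) fills in the result and wakes it.
pending: dict = {}

def responder(request_id: str) -> None:
    slot = pending[request_id]
    slot["result"] = 42          # simulate the remote reply arriving
    slot["event"].set()          # wake the waiting caller

def call() -> int:
    request_id = str(uuid.uuid4())
    event = threading.Event()
    slot = {"event": event}
    pending[request_id] = slot
    threading.Thread(target=responder, args=(request_id,)).start()
    if not event.wait(timeout=5.0):
        raise TimeoutError("no reply")
    pending.pop(request_id, None)
    return slot["result"]

print(call())  # 42
```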

DefaultTaskForce

Bases: _LAILA_IDENTIFIABLE_TASK_FORCE

Thread-pool TaskForce implementation.

Inherits shared queue management, len()/queue_len, and lifecycle surface (start, pause, shutdown) from the base TaskForce.

This subclass implements the backend-specific hooks using a ThreadPoolExecutor and dispatcher thread to consume items from _q.

Source code in policy/central/command/taskforce/thread_pool_executor/taskforce.py
class PythonThreadPoolTaskForce(_LAILA_IDENTIFIABLE_TASK_FORCE):
    """
    Thread-pool TaskForce implementation.

    Inherits shared queue management, len()/queue_len, and lifecycle surface
    (start, pause, shutdown) from the base `TaskForce`.

    This subclass implements the backend-specific hooks using a
    ThreadPoolExecutor and dispatcher thread to consume items from `_q`.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    backend: str = Field(default="threads", description="Execution backend (threads only).")
    num_workers: int = Field(default_factory=lambda: max(1, (os.cpu_count() or 1) // 2), ge=1, description="Number of worker threads.")


    # Runtime (Private)
    _cv: Optional[threading.Condition] = PrivateAttr(default=None)
    _worker_pool: Optional[ThreadPoolExecutor] = PrivateAttr(default=None)
    _stop: Optional[threading.Event] = PrivateAttr(default=None)
    _dispatcher: Optional[threading.Thread] = PrivateAttr(default=None)
    _submit_slots: Optional[threading.Semaphore] = PrivateAttr(default=None)

    # =========================================================
    # Lifecycle hooks required by TaskForce
    # =========================================================


    def _on_start(self) -> None:
        """Initialize thread pool and dispatcher."""
        if self.backend.lower() != "threads":
            raise ValueError("PythonThreadPoolTaskForce supports threads only.")

        # (Re)create runtime primitives
        self._cv = threading.Condition()
        self._stop = threading.Event()
        self._worker_pool = ThreadPoolExecutor(
            max_workers=self.num_workers, thread_name_prefix="TaskForce"
        )
        # Backpressure: cap in-flight submissions so dispatcher cannot flood executor queue.
        self._submit_slots = threading.Semaphore(max(1, self.num_workers * 2))
        self._dispatcher = threading.Thread(
            target=self._loop, name="TaskForce-Dispatcher", daemon=True
        )
        self._dispatcher.start()

    def _on_pause(self) -> None:
        """Pause the dispatcher loop without destroying the pool (not yet implemented)."""
        raise NotImplementedError

    def _on_shutdown(self, *, wait: bool = True, cancel_pending: bool = True) -> None:
        """Tear down dispatcher and thread pool."""
        # Signal dispatcher to stop and wake it up before draining queue.
        if self._stop is not None:
            self._stop.set()
        if self._cv is not None:
            with self._cv:
                self._cv.notify_all()

        # Join dispatcher if requested.
        if wait and self._dispatcher is not None:
            self._dispatcher.join()

        # Optionally cancel pending queue items after dispatcher is stopped/signaled.
        if cancel_pending:
            with self._q.atomic("cancel"):
                for _, (_, _, kwargs) in self._q.items():
                    fut = kwargs.get("fut")
                    if fut is None:
                        continue
                    fut.exception = RuntimeError("Task canceled before dispatch.")
                    fut.status = FutureStatus.CANCELLED
                    fut.result = None
                self._q.clear()

        # Shutdown the pool.
        if self._worker_pool is not None:
            self._worker_pool.shutdown(wait=wait)

    # =========================================================
    # Task submission and mapping
    # =========================================================

    def _queue_submit(self, task: Callable[..., Any], *args, **kwargs) -> ConcurrentPackageFuture:
        """Internal: enqueue callable into the task queue."""
        if self.status != TaskForceStatus.RUNNING:
            raise RuntimeError("TaskForce must be running before submitting tasks.")

        fut = ConcurrentPackageFuture(
            taskforce_id=self.global_id,
            policy_id=self.policy_id
        )

        with self._cv:
            with self._q.atomic():
                kwargs["task"] = task
                kwargs["fut"] = fut
                self._q[fut.global_id] = (PythonThreadPoolTaskForce._runner, args, kwargs)
            self._cv.notify()

        return fut

    def imap(self, tasks: Iterable[Callable[[], Any]]) -> Iterable[Any]:
        """Submit an iterable of zero-arg callables, yielding future identities in submission order."""
        for f in tasks:
            fut = self._queue_submit(f)
            yield fut.future_identity

    def submit(
        self,
        tasks: Iterable[Callable[[], Any]],
        wait: bool = False,
    ) -> Union[GroupFuture, Any]:
        """Batch submit zero-arg callables.

        Returns future identities (single) or a hollow GroupFuture (multiple)
        when *wait* is False.  When *wait* is True, blocks and returns values.
        """

        tasks = list(tasks)

        futures: List[ConcurrentPackageFuture] = []

        for task in tasks:
            fut = self._queue_submit(task)
            fut.taskforce_id = self.global_id
            futures.append(fut)

        if len(futures) == 1:
            single = futures[0]
            if wait:
                return single.wait(None)
            else:
                return single.future_identity

        gf = GroupFuture(
            taskforce_id=self.global_id,
            policy_id=self.policy_id,
            future_ids=[f.global_id for f in futures],
        )

        for f in futures:
            f.future_group_id = gf.global_id

        if not wait:
            return gf
        else:
            return gf.wait(None)


    # =========================================================
    # Dispatcher loop
    # =========================================================

    def _loop(self):
        """Continuously dispatch tasks from queue to the worker pool."""
        cv = self._cv
        stop = self._stop
        slots = self._submit_slots

        while not stop.is_set():
            with cv:
                while not stop.is_set() and len(self._q) == 0:
                    cv.wait(timeout=0.1)
                if stop.is_set():
                    break
                _, item = self._q.pop_next()  # FIFO
                runner, args, kwargs = item

            while not stop.is_set() and not slots.acquire(timeout=0.1):
                pass
            if stop.is_set():
                with self._q.atomic():
                    self._q[kwargs["fut"].global_id] = (runner, args, kwargs)
                break

            fut = kwargs["fut"]
            try:
                fut.native_future = self._worker_pool.submit(runner, args, kwargs)
                fut.native_future.add_done_callback(lambda _f: slots.release())
            except Exception:
                slots.release()
                raise

    @staticmethod
    def _runner(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Any:
        """Execute a queued task inside a worker thread."""
        task = kwargs.pop("task")
        fut = kwargs.pop("fut")
        fut.status = FutureStatus.RUNNING
        return task(*args, **kwargs)

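The dispatcher/backpressure mechanics above can be reduced to a standalone sketch (hypothetical, heavily simplified): a condition variable guards a FIFO queue, and a semaphore caps in-flight submissions so the dispatcher cannot flood the executor's internal queue.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

# Simplified sketch of the dispatcher pattern: cv guards the queue,
# slots provides the backpressure described in _on_start.
queue: list = []
cv = threading.Condition()
stop = threading.Event()
slots = threading.Semaphore(2)        # the real class uses num_workers * 2
pool = ThreadPoolExecutor(max_workers=1)
results: list = []

def dispatcher() -> None:
    while not stop.is_set():
        with cv:
            while not stop.is_set() and not queue:
                cv.wait(timeout=0.1)
            if stop.is_set():
                return
            task = queue.pop(0)       # FIFO
        slots.acquire()               # wait for a free submission slot
        fut = pool.submit(task)
        fut.add_done_callback(lambda _f: slots.release())

def submit(task) -> None:
    with cv:
        queue.append(task)
        cv.notify()

threading.Thread(target=dispatcher, daemon=True).start()
for i in range(3):
    submit(lambda i=i: results.append(i))
time.sleep(0.5)
stop.set()
pool.shutdown(wait=True)
print(sorted(results))
```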
imap(tasks)

Submit an iterable of zero-arg callables, yielding future identities in submission order.

Source code in policy/central/command/taskforce/thread_pool_executor/taskforce.py
def imap(self, tasks: Iterable[Callable[[], Any]]) -> Iterable[Any]:
    """Submit an iterable of zero-arg callables, yielding future identities in submission order."""
    for f in tasks:
        fut = self._queue_submit(f)
        yield fut.future_identity

submit(tasks, wait=False)

Batch submit zero-arg callables.

Returns future identities (single) or a hollow GroupFuture (multiple) when wait is False. When wait is True, blocks and returns values.

Source code in policy/central/command/taskforce/thread_pool_executor/taskforce.py
def submit(
    self,
    tasks: Iterable[Callable[[], Any]],
    wait: bool = False,
) -> Union[GroupFuture, Any]:
    """Batch submit zero-arg callables.

    Returns future identities (single) or a hollow GroupFuture (multiple)
    when *wait* is False.  When *wait* is True, blocks and returns values.
    """

    tasks = list(tasks)

    futures: List[ConcurrentPackageFuture] = []

    for task in tasks:
        fut = self._queue_submit(task)
        fut.taskforce_id = self.global_id
        futures.append(fut)

    if len(futures) == 1:
        single = futures[0]
        if wait:
            return single.wait(None)
        else:
            return single.future_identity

    gf = GroupFuture(
        taskforce_id=self.global_id,
        policy_id=self.policy_id,
        future_ids=[f.global_id for f in futures],
    )

    for f in futures:
        f.future_group_id = gf.global_id

    if not wait:
        return gf
    else:
        return gf.wait(None)

Entry

Bases: _LAILA_LOCALLY_ATOMIC_IDENTIFIABLE_OBJECT

The fundamental unit of data in LAILA.

An Entry wraps arbitrary data (tensors, images, dicts, etc.) with identity (UUID + evolution counter), state tracking, and serialization capabilities. Entries can be constants (immutable) or variables (evolvable).

Create entries via the class methods Entry.variable(...) or Entry.constant(...) rather than calling the constructor directly.

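The identity model (UUID plus evolution counter, with `None` marking a constant) can be illustrated with a toy stand-in; this is a hypothetical sketch, not the real `Entry` class.

```python
import uuid
from dataclasses import dataclass
from typing import Optional

# Illustrative only: an entry's identity is a UUID plus an evolution
# counter; constants carry no evolution, variables advance theirs each
# time the payload is replaced.
@dataclass
class MiniEntry:
    data: object
    uuid: str
    evolution: Optional[int]   # None marks a constant

    @property
    def global_id(self) -> str:
        if self.evolution is None:
            return self.uuid
        return f"{self.uuid}:{self.evolution}"

    def evolve(self, data: object) -> None:
        if self.evolution is None:
            raise RuntimeError("Can't evolve a constant.")
        self.data = data
        self.evolution += 1

e = MiniEntry(data=1, uuid=str(uuid.uuid4()), evolution=0)
e.evolve(2)
print(e.evolution)  # 1
```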
Source code in entry/entry.py
class Entry( 
    _LAILA_LOCALLY_ATOMIC_IDENTIFIABLE_OBJECT
):
    """The fundamental unit of data in LAILA.

    An ``Entry`` wraps arbitrary data (tensors, images, dicts, etc.) with
    identity (UUID + evolution counter), state tracking, and serialization
    capabilities.  Entries can be *constants* (immutable) or *variables*
    (evolvable).

    Create entries via the class methods ``Entry.variable(...)`` or
    ``Entry.constant(...)`` rather than calling the constructor directly.
    """

    model_config = ConfigDict(
        private_attributes=True,
        use_enum_values=True
    )

    _scopes: list[str] = PrivateAttr(default_factory=lambda: [_ENTRY_SCOPE])
    _state: EntryState = PrivateAttr(default=EntryState.STAGED)
    _constitution: Optional[EntryConstitution] = PrivateAttr(default=None)
    _payload: Optional[ComputationalData] = PrivateAttr(default=None)

    #TODO: don't allow constitution and state to be set at the same time. 
    def __init__(self, **data: dict):
        """Initialise an Entry from keyword arguments.

        Parameters
        ----------
        **data : dict
            Accepted keys include ``data`` / ``payload``, ``uuid``,
            ``evolution``, ``global_id``, ``nickname``, ``constitution``,
            and ``state``.
        """
        self._initialize_identity(data)
        self._initialize_payload(data)
        self._initialize_constitution(data)
        self._initialize_state(data)


    def _initialize_identity(self, data: dict) -> None:
        """Parse identity fields from *data* and initialise the parent identity."""
        identity_data = {}

        global_id = data.get("global_id", None)
        uuid = data.get("uuid", None)
        evolution = data.get("evolution", None)
        nickname = data.get("nickname", None)
        scopes = data.get("scopes", None)

        if global_id is not None:
            identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
            _LAILA_IDENTIFIABLE_OBJECT.__init__(self, **identity_data)
            return

        if uuid is not None:
            identity_data["uuid"] = uuid

        if evolution is not None:
            identity_data["evolution"] = evolution

        if nickname is not None:
            identity_data["nickname"] = nickname

        if scopes is not None:
            identity_data["scopes"] = scopes

        _LAILA_IDENTIFIABLE_OBJECT.__init__(self, **identity_data)

    def _initialize_payload(self, data: dict) -> None:
        """Wrap raw payload data in a ``ComputationalData`` instance."""
        payload = data.get("payload", data.get("data", None))
        if payload is not None and not isinstance(payload, ComputationalData):
            payload = ComputationalData(payload)

        self._payload = payload

    def _initialize_constitution(self, data: dict) -> None:
        """Set up the ``EntryConstitution`` if one was provided."""
        constitution = data.get("constitution", None)
        if constitution is not None:
            self._constitution = EntryConstitution(constitution=constitution)


    def _initialize_state(self, data: dict) -> None:
        """Determine the initial ``EntryState`` from *data*."""
        constitution = data.get("constitution", None)
        if constitution is not None:
            self._state = EntryState.STAGED
        else:
            self._state = data.get("state", EntryState.STAGED)



    ###################################################
    # Policy
    ###################################################

    def notify_policy(self):
        """Notify the active policy, if any, that this Entry has changed."""
        from .. import active_policy
        if active_policy is not None:
            active_policy.update(self)


    ###################################################
    # Properties
    ###################################################


    @property
    @synchronized
    def data(self) -> Optional[Any]:
        """Unwrapped payload data, or ``None`` if no payload is set."""
        if self._payload is None:
            return None
        return self._payload.data



    @property
    @synchronized
    def state(self) -> EntryState:
        """Current lifecycle state of this Entry."""
        return self._state


    @state.setter
    @synchronized
    def state(self, new_state: EntryState):
        """Set a new lifecycle state.

        Raises
        ------
        ValueError
            If *new_state* is not an ``EntryState``.
        """
        if not isinstance(new_state, EntryState):
            raise ValueError("state must be an EntryState")
        self._state = new_state
        self.notify_policy()


    @property
    @synchronized
    def metadata(self):
        """Entry metadata (not yet implemented).

        Raises
        ------
        NotImplementedError
            Always.
        """
        raise NotImplementedError("Metadata is not implemented for Entry")

    ###################################################
    # Constitution Operations
    ###################################################






    ###################################################
    # Variable
    ###################################################

    @classmethod
    def variable(
        cls, 
        data,
        *, 
        uuid = None,
        evolution=None,
        state = None,
        constitution=None, 
        global_id = None,
        nickname = None
    ):
        """Create a mutable (evolvable) Entry.

        Parameters
        ----------
        data : Any
            The raw payload.
        uuid : str, optional
            Explicit UUID.  Mutually exclusive with *global_id*.
        evolution : int, optional
            Starting evolution counter (defaults to ``0``).
        state : EntryState, optional
            Initial state; auto-set to ``READY`` when no constitution is given.
        constitution : callable, optional
            Not yet implemented.
        global_id : str, optional
            Composite ``uuid:evolution`` identifier.
        nickname : str, optional
            Human-readable name used to derive a deterministic UUID.

        Returns
        -------
        Entry
            A new variable Entry.

        Raises
        ------
        RuntimeError
            If conflicting identity arguments are provided or both
            *constitution* and *data* are set.
        NotImplementedError
            If a constitution is supplied.
        """
        if global_id is not None and (uuid is not None or evolution is not None):
            raise RuntimeError("Cannot set both global_id and <uuid, evolution> at the same time.")

        if constitution and data:
            raise RuntimeError("Cannot set both constitution and data.")

        if constitution is None:
            state = EntryState.READY

        if constitution is not None:
            raise NotImplementedError("Constitution is not implemented in this release.")

        if global_id is not None:
            identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
            uuid = identity_data["uuid"]
            evolution = identity_data["evolution"]

        evolution = evolution if evolution is not None else 0

        if nickname is not None:
            uuid = cls.generate_uuid_from_nickname(nickname)

        return Entry(
            data = data,
            evolution = evolution,
            state = state,
            constitution = constitution,
            uuid = uuid
        )


    @synchronized
    def evolve(
        self,
        precedence:  Optional[List[str]] = None,
        constitution = None,
        data = None
    ):
        """Advance the evolution counter and replace the payload.

        Parameters
        ----------
        precedence : list of str, optional
            Reserved for constitution-based evolution.
        constitution : callable, optional
            Not yet implemented.
        data : Any, optional
            New payload data.

        Raises
        ------
        RuntimeError
            If the Entry is a constant or both *constitution* and *data* are
            provided.
        NotImplementedError
            If a constitution is supplied.
        """
        if self._evolution is None:
            raise RuntimeError("Can't evolve a constant.")

        if constitution and data:
            raise RuntimeError("Cannot set both constitution and data.")

        if constitution is not None:
            raise NotImplementedError("Constitution is not implemented in this release.")


        if constitution is None:
            with self.atomic():
                self._initialize_payload({"data": data})
                self._evolution = self._evolution + 1
        else:
            raise NotImplementedError("Constitution is not implemented in this release.")



    ###################################################
    # Constant
    ###################################################

    @classmethod
    def constant(
        cls, 
        data, 
        *,
        global_id = None,
        uuid = None,
        nickname = None
    ):
        """Create an immutable Entry (evolution is ``None``).

        Parameters
        ----------
        data : Any
            The raw payload.
        global_id : str, optional
            Composite identifier (must not contain an evolution component).
        uuid : str, optional
            Explicit UUID.  Mutually exclusive with *global_id*.
        nickname : str, optional
            Human-readable name used to derive a deterministic UUID.

        Returns
        -------
        Entry
            A new constant Entry.

        Raises
        ------
        RuntimeError
            If conflicting identity arguments are provided or the
            *global_id* includes an evolution component.
        """
        if global_id is not None and (uuid is not None):
            raise RuntimeError("Cannot set both global_id and uuid at the same time.")

        if uuid is None and global_id is not None:
            identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
            uuid = identity_data["uuid"]
            if identity_data["evolution"] is not None:
                raise RuntimeError("Cannot have a constant with an evolution.")

        if nickname is not None:
            uuid = cls.generate_uuid_from_nickname(nickname)

        new_entry = Entry(
            uuid = uuid,
            data = data,
            state=EntryState.READY,
            evolution=None 
        )

        return new_entry



    ###################################################
    # Contingent
    ###################################################
    @classmethod
    def contingent(cls, **kwargs):
        """Create an Entry from raw keyword arguments without validation guards.

        Parameters
        ----------
        **kwargs
            Forwarded directly to the ``Entry`` constructor.

        Returns
        -------
        Entry
        """
        return Entry(**kwargs)


    ###################################################
    # Serialize and Recovery
    ###################################################

    def serialize(
        self, 
        transformations: TransformationSequence = None,
        *,
        exclude_private: set = {"_local_lock","_payload","_state"}
    ) -> dict:
        """Serialise the Entry into a plain dict suitable for storage.

        Parameters
        ----------
        transformations : TransformationSequence, optional
            Pipeline applied to the serialised payload bytes.  If ``None``
            the raw Entry is returned.
        exclude_private : set, optional
            Private attribute names to omit from the output dict.

        Returns
        -------
        dict or Entry
            Serialised representation including ``transformed_payload`` and
            ``recovery_sequence`` keys, or the Entry itself when
            *transformations* is ``None``.
        """
        if transformations is None:
            return self

        entry_dict = self.model_dump()

        private_attrs = {
            name: getattr(self, name)
            for name in self.__private_attributes__
            if name not in exclude_private
        }

        entry_dict.update(private_attrs)


        if self._payload is not None:
            serialized_payload, recovery_code = self._payload.serialize()
        else:
            serialized_payload = None
            recovery_code = "null_fn = lambda x:x"


        transformed_payload, transformation_inverse_code = transformations.forward(serialized_payload)

        transformation_inverse_code.append(recovery_code)

        entry_dict.update({
            "transformed_payload": transformed_payload,
            "recovery_sequence": transformation_inverse_code,
            "_state": self.state.name
        })


        return entry_dict


    @classmethod
    def recover(cls, in_dict: dict, notify_on_creation=False):
        """Reconstruct an Entry from a serialised dict or JSON string.

        Parameters
        ----------
        in_dict : dict or str or Entry
            Serialised representation produced by ``serialize()``, a JSON
            string thereof, or an existing Entry (returned as-is).
        notify_on_creation : bool, optional
            If ``True``, notify the active policy after construction.

        Returns
        -------
        Entry
            The recovered Entry instance.

        Raises
        ------
        ValueError
            If *in_dict* is an invalid JSON string.
        RuntimeError
            If the input type is unsupported.
        """
        if isinstance(in_dict, Entry):
            return in_dict

        if isinstance(in_dict, str):
            try:
                in_dict = json.loads(in_dict)
            except Exception as e:
                raise ValueError("Invalid JSON string") from e

        if isinstance(in_dict, dict):
            recovered = Entry(
                uuid = in_dict["_uuid"],
                evolution = in_dict["_evolution"],
                constitution = in_dict["_constitution"],
                scopes = in_dict.get("_scopes", None),
                payload = ComputationalData.recover(
                    payload_blob = in_dict["transformed_payload"],
                    recovery_sequence = in_dict["recovery_sequence"]
                ),
                state = EntryState[in_dict["_state"]],
                notify_on_creation = notify_on_creation
            )

            return recovered

        raise RuntimeError("Invalid input for entry recovery.")

    ###################################################
    # Policy Communication
    ###################################################


    ###################################################
    # String Representation
    ###################################################

    def __str__(self):
        """Return the global identifier string."""
        return self.global_id

    def __repr__(self):
        """Return the global identifier string."""
        return self.global_id

data property

Unwrapped payload data, or None if no payload is set.

state property writable

Current lifecycle state of this Entry.

metadata property

Entry metadata (not yet implemented).

Raises:

Type Description
NotImplementedError

Always.

__init__(**data)

Initialise an Entry from keyword arguments.

Parameters:

Name Type Description Default
**data dict

Accepted keys include data / payload, uuid, evolution, global_id, nickname, constitution, and state.

{}
Source code in entry/entry.py
def __init__(self, **data: dict):
    """Initialise an Entry from keyword arguments.

    Parameters
    ----------
    **data : dict
        Accepted keys include ``data`` / ``payload``, ``uuid``,
        ``evolution``, ``global_id``, ``nickname``, ``constitution``,
        and ``state``.
    """
    self._initialize_identity(data)
    self._initialize_payload(data)
    self._initialize_constitution(data)
    self._initialize_state(data)

notify_policy()

Notify the active policy, if any, that this Entry has changed.

Source code in entry/entry.py
def notify_policy(self):
    """Notify the active policy, if any, that this Entry has changed."""
    from .. import active_policy
    if active_policy is not None:
        active_policy.update(self)

variable(data, *, uuid=None, evolution=None, state=None, constitution=None, global_id=None, nickname=None) classmethod

Create a mutable (evolvable) Entry.

Parameters:

Name Type Description Default
data Any

The raw payload.

required
uuid str

Explicit UUID. Mutually exclusive with global_id.

None
evolution int

Starting evolution counter (defaults to 0).

None
state EntryState

Initial state; auto-set to READY when no constitution is given.

None
constitution callable

Not yet implemented.

None
global_id str

Composite uuid:evolution identifier.

None
nickname str

Human-readable name used to derive a deterministic UUID.

None

Returns:

Type Description
Entry

A new variable Entry.

Raises:

Type Description
RuntimeError

If conflicting identity arguments are provided or both constitution and data are set.

NotImplementedError

If a constitution is supplied.

Source code in entry/entry.py
@classmethod
def variable(
    cls, 
    data,
    *, 
    uuid = None,
    evolution=None,
    state = None,
    constitution=None, 
    global_id = None,
    nickname = None
):
    """Create a mutable (evolvable) Entry.

    Parameters
    ----------
    data : Any
        The raw payload.
    uuid : str, optional
        Explicit UUID.  Mutually exclusive with *global_id*.
    evolution : int, optional
        Starting evolution counter (defaults to ``0``).
    state : EntryState, optional
        Initial state; auto-set to ``READY`` when no constitution is given.
    constitution : callable, optional
        Not yet implemented.
    global_id : str, optional
        Composite ``uuid:evolution`` identifier.
    nickname : str, optional
        Human-readable name used to derive a deterministic UUID.

    Returns
    -------
    Entry
        A new variable Entry.

    Raises
    ------
    RuntimeError
        If conflicting identity arguments are provided or both
        *constitution* and *data* are set.
    NotImplementedError
        If a constitution is supplied.
    """
    if global_id is not None and (uuid is not None or evolution is not None):
        raise RuntimeError("Cannot set both global_id and <uuid, evolution> at the same time.")

    if constitution and data:
        raise RuntimeError("Cannot set both constitution and data.")

    if constitution is not None:
        raise NotImplementedError("Constitution is not implemented in this release.")

    state = EntryState.READY

    if global_id is not None:
        identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
        uuid = identity_data["uuid"]
        evolution = identity_data["evolution"]

    evolution = evolution if evolution is not None else 0

    if nickname is not None:
        uuid = cls.generate_uuid_from_nickname(nickname)

    return Entry(
        data = data,
        evolution = evolution,
        state = state,
        constitution = constitution,
        uuid = uuid
    )
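The factory above accepts identity either as a composite global_id or as separate uuid and evolution parts, and the docs describe the composite form as a colon-separated uuid:evolution pair. A minimal sketch of that round-trip under that assumption (the helper names here are hypothetical; the real splitting lives in _LAILA_IDENTIFIABLE_OBJECT.process_global_id):

```python
def make_global_id(uuid_str: str, evolution=None) -> str:
    """Join a UUID and an optional evolution counter into one identifier."""
    return uuid_str if evolution is None else f"{uuid_str}:{evolution}"

def parse_global_id(global_id: str) -> dict:
    """Split a composite identifier back into its uuid and evolution parts."""
    if ":" in global_id:
        uuid_part, evo_part = global_id.rsplit(":", 1)
        return {"uuid": uuid_part, "evolution": int(evo_part)}
    return {"uuid": global_id, "evolution": None}

gid = make_global_id("6c8dcd4c-d490-58bc-81a7-6afbe8d17594", 3)
parts = parse_global_id(gid)
```

Constants (evolution None) simply omit the suffix, which is why the two identity styles are mutually exclusive arguments.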

evolve(precedence=None, constitution=None, data=None)

Advance the evolution counter and replace the payload.

Parameters:

    precedence : list of str, default None
        Reserved for constitution-based evolution.
    constitution : callable, default None
        Not yet implemented.
    data : Any, default None
        New payload data.

Raises:

    RuntimeError
        If the Entry is a constant or both constitution and data are provided.
    NotImplementedError
        If a constitution is supplied.

Source code in entry/entry.py
@synchronized
def evolve(
    self,
    precedence:  Optional[List[str]] = None,
    constitution = None,
    data = None
):
    """Advance the evolution counter and replace the payload.

    Parameters
    ----------
    precedence : list of str, optional
        Reserved for constitution-based evolution.
    constitution : callable, optional
        Not yet implemented.
    data : Any, optional
        New payload data.

    Raises
    ------
    RuntimeError
        If the Entry is a constant or both *constitution* and *data* are
        provided.
    NotImplementedError
        If a constitution is supplied.
    """
    if self._evolution is None:
        raise RuntimeError("Can't evolve a constant.")

    if constitution and data:
        raise RuntimeError("Cannot set both constitution and data.")

    if constitution is not None:
        raise NotImplementedError("Constitution is not implemented in this release.")

    with self.atomic():
        self._initialize_payload({"data": data})
        self._evolution = self._evolution + 1
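evolve() replaces the payload and bumps the counter inside a single atomic() block, so concurrent readers never observe a half-updated Entry. The same pattern stripped down to a plain threading.Lock (illustrative only; the real Entry uses its own atomic() context manager and @synchronized decorator):

```python
import threading

class Evolvable:
    """Toy stand-in for an evolvable Entry: payload plus evolution counter."""

    def __init__(self, data):
        self._lock = threading.Lock()
        self._data = data
        self._evolution = 0

    def evolve(self, data):
        # Both writes happen under one lock acquisition, mirroring atomic():
        # no thread can see the new payload with the old counter.
        with self._lock:
            self._data = data
            self._evolution += 1

e = Evolvable("v0")
e.evolve("v1")
```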

constant(data, *, global_id=None, uuid=None, nickname=None) classmethod

Create an immutable Entry (evolution is None).

Parameters:

    data : Any, required
        The raw payload.
    global_id : str, default None
        Composite identifier (must not contain an evolution component).
    uuid : str, default None
        Explicit UUID. Mutually exclusive with global_id.
    nickname : str, default None
        Human-readable name used to derive a deterministic UUID.

Returns:

    Entry
        A new constant Entry.

Raises:

    RuntimeError
        If conflicting identity arguments are provided or the global_id includes an evolution component.

Source code in entry/entry.py
@classmethod
def constant(
    cls, 
    data, 
    *,
    global_id = None,
    uuid = None,
    nickname = None
):
    """Create an immutable Entry (evolution is ``None``).

    Parameters
    ----------
    data : Any
        The raw payload.
    global_id : str, optional
        Composite identifier (must not contain an evolution component).
    uuid : str, optional
        Explicit UUID.  Mutually exclusive with *global_id*.
    nickname : str, optional
        Human-readable name used to derive a deterministic UUID.

    Returns
    -------
    Entry
        A new constant Entry.

    Raises
    ------
    RuntimeError
        If conflicting identity arguments are provided or the
        *global_id* includes an evolution component.
    """
    if global_id is not None and (uuid is not None):
        raise RuntimeError("Cannot set both global_id and uuid at the same time.")

    if uuid is None and global_id is not None:
        identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
        uuid = identity_data["uuid"]
        if identity_data["evolution"] is not None:
            raise RuntimeError("Cannot have a constant with an evolution.")

    if nickname is not None:
        uuid = cls.generate_uuid_from_nickname(nickname)

    new_entry = Entry(
        uuid = uuid,
        data = data,
        state=EntryState.READY,
        evolution=None 
    )

    return new_entry
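Both factories accept a nickname and derive a deterministic UUID from it, so the same name always resolves to the same identity. One plausible sketch of such a derivation uses RFC 4122 name-based UUIDs under the package's universal namespace shown at the top of this page (an assumption; the actual generate_uuid_from_nickname may use a different scheme):

```python
import uuid

# The namespace constant exposed at the top of the laila package.
LAILA_UNIVERSAL_NAMESPACE = uuid.UUID("6c8dcd4c-d490-58bc-81a7-6afbe8d17594")

def uuid_from_nickname(nickname: str) -> str:
    """Derive a stable, reproducible UUID string from a human-readable name."""
    return str(uuid.uuid5(LAILA_UNIVERSAL_NAMESPACE, nickname))

a = uuid_from_nickname("alpha-pool")
b = uuid_from_nickname("alpha-pool")
```

Determinism is the point: two processes that agree on the nickname agree on the Entry's identity without any coordination.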

contingent(**kwargs) classmethod

Create an Entry from raw keyword arguments without validation guards.

Parameters:

Name Type Description Default
**kwargs

Forwarded directly to the Entry constructor.

{}

Returns:

Type Description
Entry
Source code in entry/entry.py
@classmethod
def contingent(cls, **kwargs):
    """Create an Entry from raw keyword arguments without validation guards.

    Parameters
    ----------
    **kwargs
        Forwarded directly to the ``Entry`` constructor.

    Returns
    -------
    Entry
    """
    return Entry(**kwargs)

serialize(transformations=None, *, exclude_private={'_local_lock', '_payload', '_state'})

Serialise the Entry into a plain dict suitable for storage.

Parameters:

Name Type Description Default
transformations TransformationSequence

Pipeline applied to the serialised payload bytes. If None the raw Entry is returned.

None
exclude_private set

Private attribute names to omit from the output dict.

{'_local_lock', '_payload', '_state'}

Returns:

Type Description
dict

Serialised representation including transformed_payload and recovery_sequence keys.

Source code in entry/entry.py
def serialize(
    self, 
    transformations: TransformationSequence = None,
    *,
    exclude_private: set = {"_local_lock","_payload","_state"}
) -> dict:
    """Serialise the Entry into a plain dict suitable for storage.

    Parameters
    ----------
    transformations : TransformationSequence, optional
        Pipeline applied to the serialised payload bytes.  If ``None``
        the raw Entry is returned.
    exclude_private : set, optional
        Private attribute names to omit from the output dict.

    Returns
    -------
    dict
        Serialised representation including ``transformed_payload`` and
        ``recovery_sequence`` keys.
    """
    if transformations is None:
        return self

    entry_dict = self.model_dump()

    private_attrs = {
        name: getattr(self, name)
        for name in self.__private_attributes__
        if name not in exclude_private
    }

    entry_dict.update(private_attrs)


    if self._payload is not None:
        serialized_payload, recovery_code = self._payload.serialize()
    else:
        serialized_payload = None
        recovery_code = "null_fn = lambda x:x"


    transformed_payload, transformation_inverse_code = transformations.forward(serialized_payload)

    transformation_inverse_code.append(recovery_code)

    entry_dict.update({
        "transformed_payload": transformed_payload,
        "recovery_sequence": transformation_inverse_code,
        "_state": self.state.name
    })


    return entry_dict
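serialize() pairs the transformed payload with a recovery_sequence of code strings that undo each transformation, which is what recover() later replays. A toy version of that contract, assuming each forward step returns the transformed bytes plus the source of its inverse bound to a name like "recover" (the real TransformationSequence API may differ in names and shape):

```python
import zlib

def forward(payload: bytes):
    """Compress the payload and record, as source code, how to invert it."""
    transformed = zlib.compress(payload)
    # Each recovery step is shipped as executable source, not a live object.
    inverse_code = ["import zlib\nrecover = zlib.decompress"]
    return transformed, inverse_code

def apply_recovery(blob: bytes, recovery_sequence):
    """Replay each recorded inverse step over the blob."""
    for code in recovery_sequence:
        scope = {}
        exec(code, scope)          # rebuild the inverse function
        blob = scope["recover"](blob)
    return blob

blob, seq = forward(b"payload bytes")
restored = apply_recovery(blob, seq)
```

Storing inverses as code strings keeps the serialised dict self-describing: a reader needs no registry of transformations to undo them.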

recover(in_dict, notify_on_creation=False) classmethod

Reconstruct an Entry from a serialised dict or JSON string.

Parameters:

Name Type Description Default
in_dict dict or str or Entry

Serialised representation produced by serialize(), a JSON string thereof, or an existing Entry (returned as-is).

required
notify_on_creation bool

If True, notify the active policy after construction.

False

Returns:

Type Description
Entry

The recovered Entry instance.

Raises:

Type Description
ValueError

If in_dict is an invalid JSON string.

RuntimeError

If the input type is unsupported.

Source code in entry/entry.py
@classmethod
def recover(cls, in_dict: dict, notify_on_creation=False):
    """Reconstruct an Entry from a serialised dict or JSON string.

    Parameters
    ----------
    in_dict : dict or str or Entry
        Serialised representation produced by ``serialize()``, a JSON
        string thereof, or an existing Entry (returned as-is).
    notify_on_creation : bool, optional
        If ``True``, notify the active policy after construction.

    Returns
    -------
    Entry
        The recovered Entry instance.

    Raises
    ------
    ValueError
        If *in_dict* is an invalid JSON string.
    RuntimeError
        If the input type is unsupported.
    """
    if isinstance(in_dict, Entry):
        return in_dict

    if isinstance(in_dict, str):
        try:
            in_dict = json.loads(in_dict)
        except Exception as e:
            raise ValueError("Invalid JSON string") from e

    if isinstance(in_dict, dict):
        recovered = Entry(
            uuid = in_dict["_uuid"],
            evolution = in_dict["_evolution"],
            constitution = in_dict["_constitution"],
            scopes = in_dict.get("_scopes", None),
            payload = ComputationalData.recover(
                payload_blob = in_dict["transformed_payload"],
                recovery_sequence = in_dict["recovery_sequence"]
            ),
            state = EntryState[in_dict["_state"]],
            notify_on_creation = notify_on_creation
        )

        return recovered

    raise RuntimeError("Invalid input for entry recovery.")
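recover() is deliberately tolerant about its input: an existing Entry passes straight through, a JSON string is parsed first, and anything that then fails the dict check is rejected. The same dispatch shape in isolation (a sketch with a stand-in Entry class; the real method goes on to rebuild the full Entry from the dict's private keys):

```python
import json

class Entry:
    """Minimal stand-in for the real Entry class."""
    pass

def recover(in_value):
    """Accept an Entry, a JSON string, or a dict; reject anything else."""
    if isinstance(in_value, Entry):
        return in_value                     # already recovered: pass through
    if isinstance(in_value, str):
        try:
            in_value = json.loads(in_value)
        except ValueError as e:
            raise ValueError("Invalid JSON string") from e
    if isinstance(in_value, dict):
        return in_value                     # the real method builds an Entry here
    raise RuntimeError("Invalid input for entry recovery.")
```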

__str__()

Return the global identifier string.

Source code in entry/entry.py
def __str__(self):
    """Return the global identifier string."""
    return self.global_id

__repr__()

Return the global identifier string.

Source code in entry/entry.py
def __repr__(self):
    """Return the global identifier string."""
    return self.global_id

Future

Bases: _LAILA_IDENTIFIABLE_FUTURE

Abstract Future base class with identity, outcome, and callbacks.

Parameters:

    future_identity : required
        Identity metadata for this Future instance.
Source code in policy/central/command/schema/future/future/future.py
class Future(_LAILA_IDENTIFIABLE_FUTURE):
    """
    Abstract Future base class with identity, outcome, and callbacks.

    Parameters
    ----------
    future_identity
        Identity metadata for this Future instance.
    """

    _status: FutureStatus = PrivateAttr(default=FutureStatus.NOT_STARTED)
    _return_value: Any = PrivateAttr(default=None)
    _exception: Optional[Exception] = PrivateAttr(default=None)
    _result_global_id: Optional[str] = PrivateAttr(default=None)
    _timeout_ms: int = PrivateAttr(default=100)

    model_config = ConfigDict(arbitrary_types_allowed=True)


    _default_callbacks: Dict[FutureStatus, Callable[..., Any]] = PrivateAttr(default_factory=dict)
    callbacks: Dict[FutureStatus, Callable[..., Any]] = Field(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        """Wire default callbacks and register this future with the active policy."""
        self._setup_default_callbacks()
        from ....... import get_active_policy
        policy = get_active_policy()
        policy.central.command._register_future_with_active_guarantees(self)
        policy.future_bank[self.global_id] = self


    def _setup_default_callbacks(self) -> None:
        """Populate default status-transition callbacks."""
        self._default_callbacks[FutureStatus.ERROR] = lambda f: setattr(f, "status", FutureStatus.ERROR)
        self._default_callbacks[FutureStatus.CANCELLED] = lambda f: setattr(f, "status", FutureStatus.CANCELLED)
        self._default_callbacks[FutureStatus.NOT_STARTED] = lambda f: setattr(f, "status", FutureStatus.NOT_STARTED)
        self._default_callbacks[FutureStatus.RUNNING] = lambda f: setattr(f, "status", FutureStatus.RUNNING)
        self._default_callbacks[FutureStatus.POLL_TIMEOUT] = lambda f: setattr(f, "status", FutureStatus.POLL_TIMEOUT)
        self._default_callbacks[FutureStatus.UNKNOWN] = lambda f: setattr(f, "status", FutureStatus.UNKNOWN)
        self._default_callbacks[FutureStatus.FINISHED] = lambda f: setattr(f, "status", FutureStatus.FINISHED)


    @property
    @synchronized
    def status(self) -> FutureStatus:
        """
        Return the current status code for this Future.
        """
        return self._status


    @status.setter
    @synchronized
    def status(self, status: FutureStatus) -> None:
        """
        Set the current status code for this Future.
        """
        self._status = status


    @property
    def result(self) -> Any:
        """
        Return the current result value. Releases the lock before waiting to
        avoid deadlock with the done callback that sets the result.
        """
        with self.atomic():
            if self._status in [FutureStatus.ERROR, FutureStatus.CANCELLED]:
                raise self._exception
            if self._status == FutureStatus.FINISHED:
                return self._return_value
        self.wait(timeout=None)
        with self.atomic():
            if self._status in [FutureStatus.ERROR, FutureStatus.CANCELLED]:
                raise self._exception
            return self._return_value


    @result.setter
    @synchronized
    def result(self, result: Any) -> None:
        """Set the result value, auto-wrapping non-Entry values into an Entry."""
        from .......entry import Entry

        if result is None:
            self._return_value = None
            self._result_global_id = None
        elif isinstance(result, Entry):
            self._return_value = result
            self._result_global_id = result.global_id
        else:
            wrapped = Entry.constant(data=result)
            self._return_value = wrapped
            self._result_global_id = wrapped.global_id

    @property
    def data(self) -> Any:
        """Return the unwrapped payload of the result Entry.

        Raises
        ------
        RuntimeError
            If the result is not an Entry instance.
        """
        from .......entry import Entry
        result = self.result
        if not isinstance(result, Entry):
            raise RuntimeError(
                f"Future result is not an Entry (got {type(result).__name__}); "
                "cannot access .data"
            )
        return result.data

    @property
    def exception(self) -> Optional[Exception]:
        """
        Return the current exception value.
        """
        return self._exception

    @exception.setter
    @synchronized
    def exception(self, exception: Optional[Exception]) -> None:
        """
        Set the exception value.
        """
        self._exception = exception

    def add_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
        """
        Register a callback for a specific status transition.
        """
        self.callbacks[status] = fn

    def remove_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
        """
        Remove a callback for a specific status.
        """
        self.callbacks.pop(status, None)

    def clear_callbacks(self, status: FutureStatus) -> None:
        """
        Clear the callback for a specific status.
        """
        self.callbacks.pop(status, None)

    def clear_all_callbacks(self) -> None:
        """
        Clear all registered callbacks.
        """
        self.callbacks.clear()

    #TODO: This needs to go through the central command.
    def trigger_callback(self, status: FutureStatus) -> None:
        """
        Trigger the callback for the given status, if present.
        """
        fn = self.callbacks.get(status)
        if fn is not None:
            fn(self)



    @property
    def future_identity(self) -> "_LAILA_IDENTIFIABLE_FUTURE":
        """Return a lightweight identity handle for this future."""
        return _LAILA_IDENTIFIABLE_FUTURE(
            taskforce_id=self.taskforce_id,
            policy_id=self.policy_id,
            future_group_id=self.future_group_id,
            precedence=self.precedence,
            purpose=self.purpose,
            uuid=self._uuid,
        )

    def wait(self, timeout: Optional[float] = None) -> Any:
        """Block until the future completes.  Subclasses must override."""
        raise NotImplementedError(
            f"{type(self).__name__} does not implement wait(); "
            "use a concrete subclass such as ConcurrentPackageFuture."
        )

    def finished(self) -> bool:
        """
        Return True if the Future has finished successfully.
        """
        return self.status == FutureStatus.FINISHED

    def cancelled(self) -> bool:
        """
        Return True if the Future was cancelled.
        """
        return self.status == FutureStatus.CANCELLED

    def error(self) -> bool:
        """
        Return True if the Future finished with an error.
        """
        return self.status == FutureStatus.ERROR

    def not_started(self) -> bool:
        """
        Return True if the Future has not started.
        """
        return self.status == FutureStatus.NOT_STARTED

    def running(self) -> bool:
        """
        Return True if the Future is currently running.
        """
        return self.status == FutureStatus.RUNNING
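The default-callback table in _setup_default_callbacks keys one handler per FutureStatus, and each handler simply writes the matching status onto the future. The same idea in miniature with a plain Enum (a sketch only; the real class routes callbacks through the central command and distinguishes default from user callbacks). Note the `s=s` default argument, which pins each loop value inside its lambda, the closure pitfall the original avoids by spelling each lambda out:

```python
from enum import Enum, auto

class FutureStatus(Enum):
    NOT_STARTED = auto()
    RUNNING = auto()
    FINISHED = auto()
    ERROR = auto()

class MiniFuture:
    def __init__(self):
        self.status = FutureStatus.NOT_STARTED
        # One default handler per status: set that status on the future.
        self.callbacks = {s: (lambda f, s=s: setattr(f, "status", s))
                          for s in FutureStatus}

    def trigger(self, status):
        fn = self.callbacks.get(status)
        if fn is not None:
            fn(self)

f = MiniFuture()
f.trigger(FutureStatus.RUNNING)
```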

status property writable

Return the current status code for this Future.

result property writable

Return the current result value. Releases the lock before waiting to avoid deadlock with the done callback that sets the result.

data property

Return the unwrapped payload of the result Entry.

Raises:

    RuntimeError
        If the result is not an Entry instance.

exception property writable

Return the current exception value.

future_identity property

Return a lightweight identity handle for this future.

model_post_init(__context)

Wire default callbacks and register this future with the active policy.

Source code in policy/central/command/schema/future/future/future.py
def model_post_init(self, __context: Any) -> None:
    """Wire default callbacks and register this future with the active policy."""
    self._setup_default_callbacks()
    from ....... import get_active_policy
    policy = get_active_policy()
    policy.central.command._register_future_with_active_guarantees(self)
    policy.future_bank[self.global_id] = self

add_callback(status, fn)

Register a callback for a specific status transition.

Source code in policy/central/command/schema/future/future/future.py
def add_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
    """
    Register a callback for a specific status transition.
    """
    self.callbacks[status] = fn

remove_callback(status, fn)

Remove a callback for a specific status.

Source code in policy/central/command/schema/future/future/future.py
def remove_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
    """
    Remove a callback for a specific status.
    """
    self.callbacks.pop(status, None)

clear_callbacks(status)

Clear the callback for a specific status.

Source code in policy/central/command/schema/future/future/future.py
def clear_callbacks(self, status: FutureStatus) -> None:
    """
    Clear the callback for a specific status.
    """
    self.callbacks.pop(status, None)

clear_all_callbacks()

Clear all registered callbacks.

Source code in policy/central/command/schema/future/future/future.py
def clear_all_callbacks(self) -> None:
    """
    Clear all registered callbacks.
    """
    self.callbacks.clear()

trigger_callback(status)

Trigger the callback for the given status, if present.

Source code in policy/central/command/schema/future/future/future.py
def trigger_callback(self, status: FutureStatus) -> None:
    """
    Trigger the callback for the given status, if present.
    """
    fn = self.callbacks.get(status)
    if fn is not None:
        fn(self)

wait(timeout=None)

Block until the future completes. Subclasses must override.

Source code in policy/central/command/schema/future/future/future.py
def wait(self, timeout: Optional[float] = None) -> Any:
    """Block until the future completes.  Subclasses must override."""
    raise NotImplementedError(
        f"{type(self).__name__} does not implement wait(); "
        "use a concrete subclass such as ConcurrentPackageFuture."
    )

finished()

Return True if the Future has finished successfully.

Source code in policy/central/command/schema/future/future/future.py
def finished(self) -> bool:
    """
    Return True if the Future has finished successfully.
    """
    return self.status == FutureStatus.FINISHED

cancelled()

Return True if the Future was cancelled.

Source code in policy/central/command/schema/future/future/future.py
def cancelled(self) -> bool:
    """
    Return True if the Future was cancelled.
    """
    return self.status == FutureStatus.CANCELLED

error()

Return True if the Future finished with an error.

Source code in policy/central/command/schema/future/future/future.py
def error(self) -> bool:
    """
    Return True if the Future finished with an error.
    """
    return self.status == FutureStatus.ERROR

not_started()

Return True if the Future has not started.

Source code in policy/central/command/schema/future/future/future.py
def not_started(self) -> bool:
    """
    Return True if the Future has not started.
    """
    return self.status == FutureStatus.NOT_STARTED

running()

Return True if the Future is currently running.

Source code in policy/central/command/schema/future/future/future.py
def running(self) -> bool:
    """
    Return True if the Future is currently running.
    """
    return self.status == FutureStatus.RUNNING

Manifest

Bases: Entry

Entry subclass wrapping a nested dict of global_id references.

A manifest maps user-defined string keys to global_id strings, lists of global_id strings, or recursively nested dicts following the same rules. It can be constructed from raw ID strings or from Entry objects.

The manifest's .data IS the blueprint dict. It carries scope MANIFEST and evolution None (constant).
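Because blueprint leaves are either a global_id string, a list of them, or another nested dict, collecting every referenced ID is a short recursive walk. A sketch of that traversal (a hypothetical helper; the real Manifest iterates its IDs via list(self)):

```python
def iter_leaf_ids(blueprint: dict):
    """Yield every global_id string from a nested blueprint dict, in order."""
    for value in blueprint.values():
        if isinstance(value, str):
            yield value                      # single global_id leaf
        elif isinstance(value, list):
            yield from value                 # list of global_id strings
        elif isinstance(value, dict):
            yield from iter_leaf_ids(value)  # recurse into nested dict

bp = {"weights": "uuid-a:0",
      "shards": ["uuid-b:0", "uuid-c:0"],
      "meta": {"config": "uuid-d"}}
ids = list(iter_leaf_ids(bp))
```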

Parameters:

    data : dict, default {}
        A nested dict whose leaves are global_id strings, lists of global_id strings, or Entry instances. Entry instances are converted to a blueprint of global_id strings and stashed for a subsequent memorize() call.
    uuid : str, optional
        Explicit UUID for the manifest's own identity.
    nickname : str, optional
        Human-readable name converted to a deterministic UUID.
    global_id : str, optional
        Composite identifier used to set identity.
Source code in policy/central/memory/schema/manifest.py
class Manifest(Entry):
    """Entry subclass wrapping a nested dict of ``global_id`` references.

    A manifest maps user-defined string keys to ``global_id`` strings, lists
    of ``global_id`` strings, or recursively nested dicts following the same
    rules.  It can be constructed from raw ID strings or from ``Entry``
    objects.

    The manifest's ``.data`` IS the blueprint dict.  It carries scope
    ``MANIFEST`` and evolution ``None`` (constant).

    Parameters
    ----------
    data : dict, optional
        A nested dict whose leaves are ``global_id`` strings, lists of
        ``global_id`` strings, or ``Entry`` instances.  Entry instances are
        converted to a blueprint of ``global_id`` strings and stashed for
        a subsequent ``memorize()`` call.
    uuid : str, optional
        Explicit UUID for the manifest's own identity.
    nickname : str, optional
        Human-readable name converted to a deterministic UUID.
    global_id : str, optional
        Composite identifier used to set identity.
    """

    _scopes: list[str] = PrivateAttr(default_factory=lambda: [_MANIFEST_SCOPE])
    _pending_entries: Optional[list] = PrivateAttr(default=None)

    def __init__(self, **data: Any):
        raw_data = data.pop("data", None)

        blueprint = None
        pending = None

        if raw_data is not None:
            has_entries, has_strings = Manifest._classify_data(raw_data)

            if has_entries and has_strings:
                raise ValueError(
                    "Manifest data must contain either all Entry objects or all "
                    "global_id strings, not a mix of both."
                )

            if has_entries:
                blueprint = Manifest._extract_blueprint(raw_data)
                pending = list(Manifest._iter_entries(raw_data))
            else:
                Manifest._validate_blueprint(raw_data)
                blueprint = copy.deepcopy(raw_data)

        entry_kwargs = dict(data)
        entry_kwargs["evolution"] = None
        if blueprint is not None:
            entry_kwargs["data"] = blueprint
            entry_kwargs["state"] = EntryState.READY

        Entry.__init__(self, **entry_kwargs)
        self._pending_entries = pending

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def blueprint(self) -> Optional[dict]:
        """The nested dict with ``global_id`` strings as leaf values."""
        return self.data

    @property
    def resolved(self) -> dict:
        """Synchronously fetch all referenced entries through central memory.

        Walks the blueprint, recalls each entry via the memory layer's
        ``remember`` path (routing, deserialization, etc.), and returns a
        nested dict of ``Entry`` objects mirroring the blueprint structure.
        No caching — each access re-fetches.

        Raises
        ------
        RuntimeError
            If the manifest has no blueprint.
        KeyError
            If any referenced entry is missing from the routed pool.
        """
        import laila

        if self.data is None:
            raise RuntimeError("No blueprint to resolve — manifest is empty.")

        all_gids = list(self)
        if not all_gids:
            return Manifest._rebuild_with_entries(self.data, {})

        memory = laila.get_active_policy().central.memory
        ref = memory.remember(entry_ids=all_gids)
        results = ref.wait(None)
        if not isinstance(results, list):
            results = [results]

        resolved_map = dict(zip(all_gids, results))
        return Manifest._rebuild_with_entries(self.data, resolved_map)

    # ------------------------------------------------------------------
    # Core operations (all return GroupFuture)
    # ------------------------------------------------------------------

    def memorize(
        self,
        *,
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
        batch_size: int = 128,
    ):
        """Upload all referenced entries and store the manifest's blueprint.

        Collects any pending ``Entry`` objects provided at construction time,
        uploads them in batches, then stores the manifest itself (whose
        payload is the blueprint dict).

        Parameters
        ----------
        pool_nickname : str, optional
            Pool alias to route entries to.  Defaults to the alpha pool.
        pool_id : str, optional
            Explicit pool ``global_id``.  Defaults to the alpha pool.
        batch_size : int
            Maximum entries per upload batch.

        Returns
        -------
        GroupFuture
            Tracks all writes (leaf entries + manifest itself).
        """
        import laila
        from ...command.schema.future.future.group_future import GroupFuture

        if self.data is None:
            raise RuntimeError("Nothing to memorize — manifest has no blueprint.")

        pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
        all_future_ids: list[str] = []
        policy = laila.get_active_policy()

        if self._pending_entries:
            for i in range(0, len(self._pending_entries), batch_size):
                batch = self._pending_entries[i : i + batch_size]
                ref = laila.memorize(batch, **pool_kwargs)
                all_future_ids.extend(Manifest._collect_future_ids(ref))
            self._pending_entries = None

        self_ref = laila.memorize(self, **pool_kwargs)
        all_future_ids.extend(Manifest._collect_future_ids(self_ref))

        return GroupFuture(
            taskforce_id=policy.central.command.alpha_taskforce,
            policy_id=policy.global_id,
            future_ids=all_future_ids,
        )

    def remember(
        self,
        *,
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
        batch_size: int = 128,
    ):
        """Recall all referenced entries from the pool.

        Collects every leaf ``global_id`` from the blueprint and fetches
        them in batches via ``laila.remember()``.

        Parameters
        ----------
        pool_nickname : str, optional
            Pool alias to read from.  Defaults to the alpha pool.
        pool_id : str, optional
            Explicit pool ``global_id``.  Defaults to the alpha pool.
        batch_size : int
            Maximum entries per recall batch.

        Returns
        -------
        GroupFuture
            Child futures resolve to the recalled ``Entry`` objects.
        """
        import laila
        from ...command.schema.future.future.group_future import GroupFuture

        if self.data is None:
            raise RuntimeError("No blueprint to resolve — manifest is empty.")

        pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
        all_gids = list(self)
        all_future_ids: list[str] = []
        policy = laila.get_active_policy()

        for i in range(0, len(all_gids), batch_size):
            batch = all_gids[i : i + batch_size]
            ref = laila.remember(batch, **pool_kwargs)
            all_future_ids.extend(Manifest._collect_future_ids(ref))

        return GroupFuture(
            taskforce_id=policy.central.command.alpha_taskforce,
            policy_id=policy.global_id,
            future_ids=all_future_ids,
        )

    def forget(
        self,
        *,
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
        batch_size: int = 128,
    ):
        """Delete all referenced entries and the manifest itself from the pool.

        Collects every leaf ``global_id`` plus the manifest's own
        ``global_id`` and deletes them in batches via ``laila.forget()``.

        Parameters
        ----------
        pool_nickname : str, optional
            Pool alias to delete from.  Defaults to the alpha pool.
        pool_id : str, optional
            Explicit pool ``global_id``.  Defaults to the alpha pool.
        batch_size : int
            Maximum entries per deletion batch.

        Returns
        -------
        GroupFuture
            Tracks all deletions (leaf entries + manifest itself).
        """
        import laila
        from ...command.schema.future.future.group_future import GroupFuture

        if self.data is None:
            raise RuntimeError("No blueprint — nothing to forget.")

        pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
        all_gids = list(self)
        all_future_ids: list[str] = []
        policy = laila.get_active_policy()

        for i in range(0, len(all_gids), batch_size):
            batch = all_gids[i : i + batch_size]
            ref = laila.forget(batch, **pool_kwargs)
            all_future_ids.extend(Manifest._collect_future_ids(ref))

        self_ref = laila.forget(self.global_id, **pool_kwargs)
        all_future_ids.extend(Manifest._collect_future_ids(self_ref))

        return GroupFuture(
            taskforce_id=policy.central.command.alpha_taskforce,
            policy_id=policy.global_id,
            future_ids=all_future_ids,
        )

    # ------------------------------------------------------------------
    # Serialization override
    # ------------------------------------------------------------------

    def serialize(
        self,
        transformations=None,
        *,
        exclude_private=None,
    ):
        """Serialize the manifest, excluding transient ``_pending_entries``."""
        if exclude_private is None:
            exclude_private = {"_local_lock", "_payload", "_state", "_pending_entries"}
        return Entry.serialize(
            self, transformations, exclude_private=exclude_private
        )

    @classmethod
    def recover(cls, in_dict: dict, notify_on_creation=False):
        """Reconstruct a Manifest from a serialised dict or JSON string.

        Parameters
        ----------
        in_dict : dict or str or Manifest
            Serialized representation produced by ``serialize()``, a JSON
            string thereof, or an existing Manifest (returned as-is).
        notify_on_creation : bool, optional
            If ``True``, notify the active policy after construction.

        Returns
        -------
        Manifest
            The recovered Manifest instance.
        """
        import json

        if isinstance(in_dict, Manifest):
            return in_dict

        if isinstance(in_dict, str):
            try:
                in_dict = json.loads(in_dict)
            except Exception as e:
                raise ValueError("Invalid JSON string") from e

        if isinstance(in_dict, dict):
            payload = ComputationalData.recover(
                payload_blob=in_dict["transformed_payload"],
                recovery_sequence=in_dict["recovery_sequence"],
            )
            blueprint = payload.data if payload is not None else None
            return Manifest(
                data=blueprint,
                uuid=in_dict["_uuid"],
            )

        raise RuntimeError("Invalid input for manifest recovery.")

    # ------------------------------------------------------------------
    # Mapping-like API  (top-level blueprint keys for dict(manifest))
    # ------------------------------------------------------------------

    def keys(self):
        """Top-level blueprint keys."""
        if self.data is None:
            return {}.keys()
        return self.data.keys()

    def values(self):
        """Top-level blueprint values."""
        if self.data is None:
            return {}.values()
        return self.data.values()

    def items(self):
        """Top-level blueprint items."""
        if self.data is None:
            return {}.items()
        return self.data.items()

    def sub_manifest(self, keys: list[str]) -> "Manifest":
        """Return a new ``Manifest`` containing only the specified top-level keys.

        Parameters
        ----------
        keys : list[str]
            Top-level blueprint keys to include in the sub-manifest.

        Returns
        -------
        Manifest
            A new manifest whose blueprint is the subset of this manifest's
            blueprint restricted to *keys*.

        Raises
        ------
        RuntimeError
            If the manifest has no blueprint.
        KeyError
            If any key in *keys* is not present in the blueprint.
        """
        if self.data is None:
            raise RuntimeError("Cannot create sub-manifest — manifest is empty.")
        missing = [k for k in keys if k not in self.data]
        if missing:
            raise KeyError(f"Keys not found in manifest: {missing}")
        subset = {k: copy.deepcopy(self.data[k]) for k in keys}
        return Manifest(data=subset)

    def extend(self, other: "Manifest", *, overwrite: bool = False) -> None:
        """Merge another manifest's blueprint into this one in-place.

        Parameters
        ----------
        other : Manifest
            The manifest whose top-level keys will be added.
        overwrite : bool
            If ``False`` (default), duplicate top-level keys raise
            ``KeyError``.  If ``True``, *other*'s values silently replace
            existing ones (like ``dict.update``).

        Raises
        ------
        TypeError
            If *other* is not a ``Manifest``.
        KeyError
            If *overwrite* is ``False`` and the blueprints share keys.
        """
        if not isinstance(other, Manifest):
            raise TypeError(
                f"extend() requires a Manifest, got {type(other).__name__}"
            )
        if other.data is None:
            return

        if self.data is None:
            from .....entry.compdata.taxonomy.compdata import ComputationalData

            self._payload = ComputationalData(copy.deepcopy(other.data))
            self._state = EntryState.READY
        else:
            if not overwrite:
                overlap = set(self.data) & set(other.data)
                if overlap:
                    raise KeyError(
                        f"Duplicate top-level keys: {sorted(overlap)}"
                    )
            self.data.update(copy.deepcopy(other.data))

        if other._pending_entries:
            if self._pending_entries is None:
                self._pending_entries = list(other._pending_entries)
            else:
                self._pending_entries.extend(other._pending_entries)

    def __iadd__(self, other: Any) -> "Manifest":
        """``manifest += other`` — merge *other* in-place and return self."""
        if not isinstance(other, Manifest):
            return NotImplemented
        self.extend(other)
        return self

    def __add__(self, other: Any) -> "Manifest":
        """``manifest + other`` — return a new manifest with merged blueprints."""
        if not isinstance(other, Manifest):
            return NotImplemented

        if self.data is not None and other.data is not None:
            overlap = set(self.data) & set(other.data)
            if overlap:
                raise KeyError(
                    f"Duplicate top-level keys: {sorted(overlap)}"
                )

        merged: dict = {}
        if self.data is not None:
            merged.update(copy.deepcopy(self.data))
        if other.data is not None:
            merged.update(copy.deepcopy(other.data))

        if not merged:
            return Manifest()

        result = Manifest(data=merged)

        pending: list = []
        if self._pending_entries:
            pending.extend(self._pending_entries)
        if other._pending_entries:
            pending.extend(other._pending_entries)
        if pending:
            result._pending_entries = pending

        return result

    def __getitem__(self, key: str) -> Any:
        """Look up a top-level key in the blueprint."""
        if self.data is None:
            raise KeyError(key)
        return self.data[key]

    def __len__(self) -> int:
        """Number of top-level keys in the blueprint."""
        if self.data is None:
            return 0
        return len(self.data)

    # ------------------------------------------------------------------
    # Iteration / containment  (flattened global_id leaves)
    # ------------------------------------------------------------------

    def __iter__(self) -> Iterator[str]:
        """Yield every ``global_id`` string via a depth-first, insertion-order walk."""
        if self.data is None:
            return
        yield from Manifest._iter_global_ids(self.data)

    def __contains__(self, global_id: object) -> bool:
        """Return ``True`` if *global_id* appears anywhere in the blueprint leaves."""
        if not isinstance(global_id, str):
            return False
        for gid in self:
            if gid == global_id:
                return True
        return False

    # ------------------------------------------------------------------
    # String representation
    # ------------------------------------------------------------------

    def __str__(self) -> str:
        return self.global_id

    def __repr__(self) -> str:
        n = sum(1 for _ in self)
        return f"Manifest({self.global_id}, entries={n})"

    # ------------------------------------------------------------------
    # Static / class helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _collect_future_ids(ref) -> list[str]:
        """Extract future IDs from a GroupFuture or a single future identity."""
        if ref is None:
            return []
        if hasattr(ref, "future_ids"):
            return list(ref.future_ids)
        return [ref.global_id]

    @staticmethod
    def _build_pool_kwargs(
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
    ) -> dict[str, str]:
        """Build keyword arguments for pool routing."""
        kwargs: dict[str, str] = {}
        if pool_nickname is not None:
            kwargs["pool_nickname"] = pool_nickname
        if pool_id is not None:
            kwargs["pool_id"] = pool_id
        return kwargs

    @staticmethod
    def _classify_data(data: dict) -> tuple[bool, bool]:
        """Return ``(has_entries, has_strings)`` for leaf values in *data*."""
        from .....entry import Entry as _Entry

        has_entries = False
        has_strings = False

        def _check(val: Any) -> None:
            nonlocal has_entries, has_strings
            if isinstance(val, _Entry):
                has_entries = True
            elif isinstance(val, str):
                has_strings = True
            elif isinstance(val, list):
                for item in val:
                    _check(item)
            elif isinstance(val, dict):
                for v in val.values():
                    _check(v)

        for v in data.values():
            _check(v)

        return has_entries, has_strings

    @staticmethod
    def _iter_entries(data: dict) -> Iterator:
        """Yield every ``Entry`` object from a nested dict."""
        from .....entry import Entry as _Entry

        for val in data.values():
            if isinstance(val, _Entry):
                yield val
            elif isinstance(val, list):
                for item in val:
                    if isinstance(item, _Entry):
                        yield item
            elif isinstance(val, dict):
                yield from Manifest._iter_entries(val)

    @staticmethod
    def _validate_blueprint(data: Any) -> None:
        """Recursively validate that *data* follows the blueprint schema.

        Raises ``ValueError`` if any key is not a string or any leaf is not
        a ``global_id`` string (or list of them).
        """
        if not isinstance(data, dict):
            raise ValueError("Blueprint must be a dict.")
        for key, val in data.items():
            if not isinstance(key, str):
                raise ValueError(f"Blueprint keys must be strings, got {type(key)}")
            if isinstance(val, str):
                continue
            elif isinstance(val, list):
                for item in val:
                    if not isinstance(item, str):
                        raise ValueError(
                            f"List values in blueprint must be strings, got {type(item)}"
                        )
            elif isinstance(val, dict):
                Manifest._validate_blueprint(val)
            else:
                raise ValueError(
                    f"Blueprint values must be str, list[str], or dict — got {type(val)}"
                )

    @staticmethod
    def _extract_blueprint(data: dict) -> dict:
        """Convert a dict of ``Entry`` objects to a blueprint of ``global_id`` strings."""
        from .....entry import Entry as _Entry

        result: dict[str, Any] = {}
        for key, val in data.items():
            if isinstance(val, _Entry):
                result[key] = val.global_id
            elif isinstance(val, str):
                result[key] = val
            elif isinstance(val, list):
                result[key] = [
                    item.global_id if isinstance(item, _Entry) else item
                    for item in val
                ]
            elif isinstance(val, dict):
                result[key] = Manifest._extract_blueprint(val)
            else:
                raise ValueError(f"Unexpected value type in manifest data: {type(val)}")
        return result

    @staticmethod
    def _iter_global_ids(blueprint: dict) -> Iterator[str]:
        """Depth-first, insertion-order walk yielding every leaf ``global_id``."""
        for val in blueprint.values():
            if isinstance(val, str):
                yield val
            elif isinstance(val, list):
                yield from val
            elif isinstance(val, dict):
                yield from Manifest._iter_global_ids(val)

    @staticmethod
    def _rebuild_with_entries(
        blueprint: dict,
        resolved_map: dict[str, Any],
    ) -> dict:
        """Reconstruct the nested structure replacing ``global_id`` strings with entries."""
        result: dict[str, Any] = {}
        for key, val in blueprint.items():
            if isinstance(val, str):
                result[key] = resolved_map[val]
            elif isinstance(val, list):
                result[key] = [resolved_map[gid] for gid in val]
            elif isinstance(val, dict):
                result[key] = Manifest._rebuild_with_entries(val, resolved_map)
        return result

blueprint property

The nested dict with global_id strings as leaf values.

resolved property

Synchronously fetch all referenced entries through central memory.

Walks the blueprint, recalls each entry via the memory layer's remember path (routing, deserialization, etc.), and returns a nested dict of Entry objects mirroring the blueprint structure. No caching — each access re-fetches.

Raises:

RuntimeError: If the manifest has no blueprint.
KeyError: If any referenced entry is missing from the routed pool.
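
For illustration, the walk-and-rebuild behavior behind `resolved` can be sketched with plain dicts; `store` and `resolve` are hypothetical stand-ins for the routed pool and the recall path, not part of the laila API:

```python
# A stand-in pool mapping global_id -> entry; real recalls go
# through the memory layer's remember path.
store = {"gid-1": "entry-1", "gid-2": "entry-2", "gid-3": "entry-3"}

blueprint = {
    "weights": "gid-1",
    "checkpoints": ["gid-2", "gid-3"],
}

def resolve(blueprint, store):
    """Mirror the blueprint structure, replacing global_id leaves."""
    out = {}
    for key, val in blueprint.items():
        if isinstance(val, str):
            out[key] = store[val]          # KeyError if an entry is missing
        elif isinstance(val, list):
            out[key] = [store[g] for g in val]
        else:                              # nested dict: recurse
            out[key] = resolve(val, store)
    return out

print(resolve(blueprint, store))
# {'weights': 'entry-1', 'checkpoints': ['entry-2', 'entry-3']}
```

Note the result mirrors the blueprint's shape exactly; nothing is cached, so a missing `global_id` surfaces as a `KeyError` on every access.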

memorize(*, pool_nickname=None, pool_id=None, batch_size=128)

Upload all referenced entries and store the manifest's blueprint.

Collects any pending Entry objects provided at construction time, uploads them in batches, then stores the manifest itself (whose payload is the blueprint dict).

Parameters:

pool_nickname (str, default None): Pool alias to route entries to. Defaults to the alpha pool.
pool_id (str, default None): Explicit pool global_id. Defaults to the alpha pool.
batch_size (int, default 128): Maximum entries per upload batch.

Returns:

GroupFuture: Tracks all writes (leaf entries + manifest itself).

Source code in policy/central/memory/schema/manifest.py
def memorize(
    self,
    *,
    pool_nickname: Optional[str] = None,
    pool_id: Optional[str] = None,
    batch_size: int = 128,
):
    """Upload all referenced entries and store the manifest's blueprint.

    Collects any pending ``Entry`` objects provided at construction time,
    uploads them in batches, then stores the manifest itself (whose
    payload is the blueprint dict).

    Parameters
    ----------
    pool_nickname : str, optional
        Pool alias to route entries to.  Defaults to the alpha pool.
    pool_id : str, optional
        Explicit pool ``global_id``.  Defaults to the alpha pool.
    batch_size : int
        Maximum entries per upload batch.

    Returns
    -------
    GroupFuture
        Tracks all writes (leaf entries + manifest itself).
    """
    import laila
    from ...command.schema.future.future.group_future import GroupFuture

    if self.data is None:
        raise RuntimeError("Nothing to memorize — manifest has no blueprint.")

    pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
    all_future_ids: list[str] = []
    policy = laila.get_active_policy()

    if self._pending_entries:
        for i in range(0, len(self._pending_entries), batch_size):
            batch = self._pending_entries[i : i + batch_size]
            ref = laila.memorize(batch, **pool_kwargs)
            all_future_ids.extend(Manifest._collect_future_ids(ref))
        self._pending_entries = None

    self_ref = laila.memorize(self, **pool_kwargs)
    all_future_ids.extend(Manifest._collect_future_ids(self_ref))

    return GroupFuture(
        taskforce_id=policy.central.command.alpha_taskforce,
        policy_id=policy.global_id,
        future_ids=all_future_ids,
    )
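
The upload loop slices `_pending_entries` with the standard range-step pattern; a self-contained sketch of that batching (illustrative names only, no laila calls):

```python
def iter_batches(items, batch_size=128):
    """Yield successive slices of at most batch_size items each."""
    for i in range(0, len(items), batch_size):
        yield items[i:i + batch_size]

entries = list(range(10))
batches = list(iter_batches(entries, batch_size=4))
print(batches)  # [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9]]
```

The final batch may be shorter than `batch_size`; `remember()` and `forget()` below use the same pattern over the flattened `global_id` list.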

remember(*, pool_nickname=None, pool_id=None, batch_size=128)

Recall all referenced entries from the pool.

Collects every leaf global_id from the blueprint and fetches them in batches via laila.remember().

Parameters:

pool_nickname (str, default None): Pool alias to read from. Defaults to the alpha pool.
pool_id (str, default None): Explicit pool global_id. Defaults to the alpha pool.
batch_size (int, default 128): Maximum entries per recall batch.

Returns:

GroupFuture: Child futures resolve to the recalled Entry objects.

Source code in policy/central/memory/schema/manifest.py
def remember(
    self,
    *,
    pool_nickname: Optional[str] = None,
    pool_id: Optional[str] = None,
    batch_size: int = 128,
):
    """Recall all referenced entries from the pool.

    Collects every leaf ``global_id`` from the blueprint and fetches
    them in batches via ``laila.remember()``.

    Parameters
    ----------
    pool_nickname : str, optional
        Pool alias to read from.  Defaults to the alpha pool.
    pool_id : str, optional
        Explicit pool ``global_id``.  Defaults to the alpha pool.
    batch_size : int
        Maximum entries per recall batch.

    Returns
    -------
    GroupFuture
        Child futures resolve to the recalled ``Entry`` objects.
    """
    import laila
    from ...command.schema.future.future.group_future import GroupFuture

    if self.data is None:
        raise RuntimeError("No blueprint to resolve — manifest is empty.")

    pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
    all_gids = list(self)
    all_future_ids: list[str] = []
    policy = laila.get_active_policy()

    for i in range(0, len(all_gids), batch_size):
        batch = all_gids[i : i + batch_size]
        ref = laila.remember(batch, **pool_kwargs)
        all_future_ids.extend(Manifest._collect_future_ids(ref))

    return GroupFuture(
        taskforce_id=policy.central.command.alpha_taskforce,
        policy_id=policy.global_id,
        future_ids=all_future_ids,
    )

forget(*, pool_nickname=None, pool_id=None, batch_size=128)

Delete all referenced entries and the manifest itself from the pool.

Collects every leaf global_id plus the manifest's own global_id and deletes them in batches via laila.forget().

Parameters:

pool_nickname (str, default None): Pool alias to delete from. Defaults to the alpha pool.
pool_id (str, default None): Explicit pool global_id. Defaults to the alpha pool.
batch_size (int, default 128): Maximum entries per deletion batch.

Returns:

GroupFuture: Tracks all deletions (leaf entries + manifest itself).

Source code in policy/central/memory/schema/manifest.py
def forget(
    self,
    *,
    pool_nickname: Optional[str] = None,
    pool_id: Optional[str] = None,
    batch_size: int = 128,
):
    """Delete all referenced entries and the manifest itself from the pool.

    Collects every leaf ``global_id`` plus the manifest's own
    ``global_id`` and deletes them in batches via ``laila.forget()``.

    Parameters
    ----------
    pool_nickname : str, optional
        Pool alias to delete from.  Defaults to the alpha pool.
    pool_id : str, optional
        Explicit pool ``global_id``.  Defaults to the alpha pool.
    batch_size : int
        Maximum entries per deletion batch.

    Returns
    -------
    GroupFuture
        Tracks all deletions (leaf entries + manifest itself).
    """
    import laila
    from ...command.schema.future.future.group_future import GroupFuture

    if self.data is None:
        raise RuntimeError("No blueprint — nothing to forget.")

    pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
    all_gids = list(self)
    all_future_ids: list[str] = []
    policy = laila.get_active_policy()

    for i in range(0, len(all_gids), batch_size):
        batch = all_gids[i : i + batch_size]
        ref = laila.forget(batch, **pool_kwargs)
        all_future_ids.extend(Manifest._collect_future_ids(ref))

    self_ref = laila.forget(self.global_id, **pool_kwargs)
    all_future_ids.extend(Manifest._collect_future_ids(self_ref))

    return GroupFuture(
        taskforce_id=policy.central.command.alpha_taskforce,
        policy_id=policy.global_id,
        future_ids=all_future_ids,
    )

serialize(transformations=None, *, exclude_private=None)

Serialize the manifest, excluding transient _pending_entries.

Source code in policy/central/memory/schema/manifest.py
def serialize(
    self,
    transformations=None,
    *,
    exclude_private=None,
):
    """Serialize the manifest, excluding transient ``_pending_entries``."""
    if exclude_private is None:
        exclude_private = {"_local_lock", "_payload", "_state", "_pending_entries"}
    return Entry.serialize(
        self, transformations, exclude_private=exclude_private
    )

recover(in_dict, notify_on_creation=False) classmethod

Reconstruct a Manifest from a serialized dict or JSON string.

Parameters:

in_dict (dict or str or Manifest, required): Serialized representation produced by serialize(), a JSON string thereof, or an existing Manifest (returned as-is).
notify_on_creation (bool, default False): If True, notify the active policy after construction.

Returns:

Manifest: The recovered Manifest instance.

Source code in policy/central/memory/schema/manifest.py
@classmethod
def recover(cls, in_dict: dict, notify_on_creation=False):
    """Reconstruct a Manifest from a serialised dict or JSON string.

    Parameters
    ----------
    in_dict : dict or str or Manifest
        Serialized representation produced by ``serialize()``, a JSON
        string thereof, or an existing Manifest (returned as-is).
    notify_on_creation : bool, optional
        If ``True``, notify the active policy after construction.

    Returns
    -------
    Manifest
        The recovered Manifest instance.
    """
    import json

    if isinstance(in_dict, Manifest):
        return in_dict

    if isinstance(in_dict, str):
        try:
            in_dict = json.loads(in_dict)
        except Exception as e:
            raise ValueError("Invalid JSON string") from e

    if isinstance(in_dict, dict):
        payload = ComputationalData.recover(
            payload_blob=in_dict["transformed_payload"],
            recovery_sequence=in_dict["recovery_sequence"],
        )
        blueprint = payload.data if payload is not None else None
        return Manifest(
            data=blueprint,
            uuid=in_dict["_uuid"],
        )

    raise RuntimeError("Invalid input for manifest recovery.")

keys()

Top-level blueprint keys.

Source code in policy/central/memory/schema/manifest.py
def keys(self):
    """Top-level blueprint keys."""
    if self.data is None:
        return {}.keys()
    return self.data.keys()

values()

Top-level blueprint values.

Source code in policy/central/memory/schema/manifest.py
def values(self):
    """Top-level blueprint values."""
    if self.data is None:
        return {}.values()
    return self.data.values()

items()

Top-level blueprint items.

Source code in policy/central/memory/schema/manifest.py
def items(self):
    """Top-level blueprint items."""
    if self.data is None:
        return {}.items()
    return self.data.items()

sub_manifest(keys)

Return a new Manifest containing only the specified top-level keys.

Parameters:

keys (list[str], required): Top-level blueprint keys to include in the sub-manifest.

Returns:

Manifest: A new manifest whose blueprint is the subset of this manifest's blueprint restricted to keys.

Raises:

RuntimeError: If the manifest has no blueprint.
KeyError: If any key in keys is not present in the blueprint.

Source code in policy/central/memory/schema/manifest.py
def sub_manifest(self, keys: list[str]) -> "Manifest":
    """Return a new ``Manifest`` containing only the specified top-level keys.

    Parameters
    ----------
    keys : list[str]
        Top-level blueprint keys to include in the sub-manifest.

    Returns
    -------
    Manifest
        A new manifest whose blueprint is the subset of this manifest's
        blueprint restricted to *keys*.

    Raises
    ------
    RuntimeError
        If the manifest has no blueprint.
    KeyError
        If any key in *keys* is not present in the blueprint.
    """
    if self.data is None:
        raise RuntimeError("Cannot create sub-manifest — manifest is empty.")
    missing = [k for k in keys if k not in self.data]
    if missing:
        raise KeyError(f"Keys not found in manifest: {missing}")
    subset = {k: copy.deepcopy(self.data[k]) for k in keys}
    return Manifest(data=subset)
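
The selection logic (validate the requested keys, then deep-copy the subset) works the same on a plain dict; `subset_blueprint` is an illustrative helper, not part of the API:

```python
import copy

def subset_blueprint(data, keys):
    """Validate keys, then return a deep-copied subset of data."""
    missing = [k for k in keys if k not in data]
    if missing:
        raise KeyError(f"Keys not found in manifest: {missing}")
    return {k: copy.deepcopy(data[k]) for k in keys}

data = {"a": "gid-1", "b": ["gid-2"], "c": "gid-3"}
print(subset_blueprint(data, ["a", "b"]))  # {'a': 'gid-1', 'b': ['gid-2']}
```

Because values are deep-copied, mutating the sub-manifest's blueprint never leaks back into the original.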

extend(other, *, overwrite=False)

Merge another manifest's blueprint into this one in-place.

Parameters:

other (Manifest, required): The manifest whose top-level keys will be added.
overwrite (bool, default False): If False (default), duplicate top-level keys raise KeyError. If True, other's values silently replace existing ones (like dict.update).

Raises:

TypeError: If other is not a Manifest.
KeyError: If overwrite is False and the blueprints share keys.

Source code in policy/central/memory/schema/manifest.py
def extend(self, other: "Manifest", *, overwrite: bool = False) -> None:
    """Merge another manifest's blueprint into this one in-place.

    Parameters
    ----------
    other : Manifest
        The manifest whose top-level keys will be added.
    overwrite : bool
        If ``False`` (default), duplicate top-level keys raise
        ``KeyError``.  If ``True``, *other*'s values silently replace
        existing ones (like ``dict.update``).

    Raises
    ------
    TypeError
        If *other* is not a ``Manifest``.
    KeyError
        If *overwrite* is ``False`` and the blueprints share keys.
    """
    if not isinstance(other, Manifest):
        raise TypeError(
            f"extend() requires a Manifest, got {type(other).__name__}"
        )
    if other.data is None:
        return

    if self.data is None:
        from .....entry.compdata.taxonomy.compdata import ComputationalData

        self._payload = ComputationalData(copy.deepcopy(other.data))
        self._state = EntryState.READY
    else:
        if not overwrite:
            overlap = set(self.data) & set(other.data)
            if overlap:
                raise KeyError(
                    f"Duplicate top-level keys: {sorted(overlap)}"
                )
        self.data.update(copy.deepcopy(other.data))

    if other._pending_entries:
        if self._pending_entries is None:
            self._pending_entries = list(other._pending_entries)
        else:
            self._pending_entries.extend(other._pending_entries)
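
The duplicate-key rule can be tried on plain dicts; `merge_blueprints` is an illustrative helper mirroring `extend()`'s semantics, not a laila function:

```python
def merge_blueprints(base, other, overwrite=False):
    """Merge other into base in-place, rejecting overlap unless overwrite."""
    if not overwrite:
        overlap = set(base) & set(other)
        if overlap:
            raise KeyError(f"Duplicate top-level keys: {sorted(overlap)}")
    base.update(other)  # with overwrite=True this behaves like dict.update
    return base

a = {"x": "gid-1"}
merge_blueprints(a, {"y": "gid-2"})
print(a)  # {'x': 'gid-1', 'y': 'gid-2'}
```

Attempting `merge_blueprints(a, {"x": "gid-9"})` raises `KeyError`; passing `overwrite=True` instead silently replaces the existing value.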

__iadd__(other)

manifest += other — merge other in-place and return self.

Source code in policy/central/memory/schema/manifest.py
def __iadd__(self, other: Any) -> "Manifest":
    """``manifest += other`` — merge *other* in-place and return self."""
    if not isinstance(other, Manifest):
        return NotImplemented
    self.extend(other)
    return self

__add__(other)

manifest + other — return a new manifest with merged blueprints.

Source code in policy/central/memory/schema/manifest.py
def __add__(self, other: Any) -> "Manifest":
    """``manifest + other`` — return a new manifest with merged blueprints."""
    if not isinstance(other, Manifest):
        return NotImplemented

    if self.data is not None and other.data is not None:
        overlap = set(self.data) & set(other.data)
        if overlap:
            raise KeyError(
                f"Duplicate top-level keys: {sorted(overlap)}"
            )

    merged: dict = {}
    if self.data is not None:
        merged.update(copy.deepcopy(self.data))
    if other.data is not None:
        merged.update(copy.deepcopy(other.data))

    if not merged:
        return Manifest()

    result = Manifest(data=merged)

    pending: list = []
    if self._pending_entries:
        pending.extend(self._pending_entries)
    if other._pending_entries:
        pending.extend(other._pending_entries)
    if pending:
        result._pending_entries = pending

    return result

__getitem__(key)

Look up a top-level key in the blueprint.

Source code in policy/central/memory/schema/manifest.py
def __getitem__(self, key: str) -> Any:
    """Look up a top-level key in the blueprint."""
    if self.data is None:
        raise KeyError(key)
    return self.data[key]

__len__()

Number of top-level keys in the blueprint.

Source code in policy/central/memory/schema/manifest.py
def __len__(self) -> int:
    """Number of top-level keys in the blueprint."""
    if self.data is None:
        return 0
    return len(self.data)

__iter__()

Yield every global_id string via a depth-first, insertion-order walk.

Source code in policy/central/memory/schema/manifest.py
def __iter__(self) -> Iterator[str]:
    """Yield every ``global_id`` string via a depth-first, insertion-order walk."""
    if self.data is None:
        return
    yield from Manifest._iter_global_ids(self.data)
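The walk described above can be sketched as a small recursive generator. This is illustrative only; the real `_iter_global_ids` may differ in detail.

```python
def iter_global_ids(blueprint: dict):
    """Depth-first, insertion-order walk over a nested blueprint whose
    leaves are global_id strings or lists of global_id strings."""
    for value in blueprint.values():
        if isinstance(value, str):
            yield value
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, str):
                    yield item
        elif isinstance(value, dict):
            yield from iter_global_ids(value)

bp = {"model": "gid-a", "data": {"train": ["gid-b", "gid-c"], "eval": "gid-d"}}
# list(iter_global_ids(bp)) → ["gid-a", "gid-b", "gid-c", "gid-d"]
```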

__contains__(global_id)

Return True if global_id appears anywhere in the blueprint leaves.

Source code in policy/central/memory/schema/manifest.py
def __contains__(self, global_id: object) -> bool:
    """Return ``True`` if *global_id* appears anywhere in the blueprint leaves."""
    if not isinstance(global_id, str):
        return False
    for gid in self:
        if gid == global_id:
            return True
    return False

PythonThreadPoolTaskForce

Bases: _LAILA_IDENTIFIABLE_TASK_FORCE

Thread-pool TaskForce implementation.

Inherits shared queue management, len()/queue_len, and lifecycle surface (start, pause, shutdown) from the base TaskForce.

This subclass implements the backend-specific hooks using a ThreadPoolExecutor and dispatcher thread to consume items from _q.

Source code in policy/central/command/taskforce/thread_pool_executor/taskforce.py
class PythonThreadPoolTaskForce(_LAILA_IDENTIFIABLE_TASK_FORCE):
    """
    Thread-pool TaskForce implementation.

    Inherits shared queue management, len()/queue_len, and lifecycle surface
    (start, pause, shutdown) from the base `TaskForce`.

    This subclass implements the backend-specific hooks using a
    ThreadPoolExecutor and dispatcher thread to consume items from `_q`.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    backend: str = Field(default="threads", description="Execution backend (threads only).")
    num_workers: int = Field(default_factory=lambda: max(1, (os.cpu_count() or 1) // 2), ge=1, description="Number of worker threads.")


    # Runtime (Private)
    _cv: Optional[threading.Condition] = PrivateAttr(default=None)
    _worker_pool: Optional[ThreadPoolExecutor] = PrivateAttr(default=None)
    _stop: Optional[threading.Event] = PrivateAttr(default=None)
    _dispatcher: Optional[threading.Thread] = PrivateAttr(default=None)
    _submit_slots: Optional[threading.Semaphore] = PrivateAttr(default=None)

    # =========================================================
    # Lifecycle hooks required by TaskForce
    # =========================================================


    def _on_start(self) -> None:
        """Initialize thread pool and dispatcher."""
        if self.backend.lower() != "threads":
            raise ValueError("PythonThreadPoolTaskForce supports threads only.")

        # (Re)create runtime primitives
        self._cv = threading.Condition()
        self._stop = threading.Event()
        self._worker_pool = ThreadPoolExecutor(
            max_workers=self.num_workers, thread_name_prefix="TaskForce"
        )
        # Backpressure: cap in-flight submissions so dispatcher cannot flood executor queue.
        self._submit_slots = threading.Semaphore(max(1, self.num_workers * 2))
        self._dispatcher = threading.Thread(
            target=self._loop, name="TaskForce-Dispatcher", daemon=True
        )
        self._dispatcher.start()

    def _on_pause(self) -> None:
        """Pause dispatcher loop without destroying the pool (currently a no-op)."""
        raise NotImplementedError

    def _on_shutdown(self, *, wait: bool = True, cancel_pending: bool = True) -> None:
        """Tear down dispatcher and thread pool."""
        # Signal dispatcher to stop and wake it up before draining queue.
        if self._stop is not None:
            self._stop.set()
        if self._cv is not None:
            with self._cv:
                self._cv.notify_all()

        # Join dispatcher if requested.
        if wait and self._dispatcher is not None:
            self._dispatcher.join()

        # Optionally cancel pending queue items after dispatcher is stopped/signaled.
        if cancel_pending:
            with self._q.atomic("cancel"):
                for _, (_, _, kwargs) in self._q.items():
                    fut = kwargs.get("fut")
                    if fut is None:
                        continue
                    fut.exception = RuntimeError("Task canceled before dispatch.")
                    fut.status = FutureStatus.CANCELLED
                    fut.result = None
                self._q.clear()

        # Shutdown the pool.
        if self._worker_pool is not None:
            self._worker_pool.shutdown(wait=wait)

    # =========================================================
    # Task submission and mapping
    # =========================================================

    def _queue_submit(self, task: Callable[..., Any], *args, **kwargs) -> ConcurrentPackageFuture:
        """Internal: enqueue callable into the task queue."""
        if self.status != TaskForceStatus.RUNNING:
            raise RuntimeError("TaskForce must be running before submitting tasks.")

        fut = ConcurrentPackageFuture(
            taskforce_id=self.global_id,
            policy_id=self.policy_id
        )

        with self._cv:
            with self._q.atomic():
                kwargs["task"] = task
                kwargs["fut"] = fut
                self._q[fut.global_id] = (PythonThreadPoolTaskForce._runner, args, kwargs)
            self._cv.notify()

        return fut

    def imap(self, tasks: Iterable[Callable[[], Any]]) -> Iterable[Any]:
        """Submit an iterable of zero-arg callables, yielding future identities in submission order."""
        for f in tasks:
            fut = self._queue_submit(f)
            yield fut.future_identity

    def submit(
        self,
        tasks: Iterable[Callable[[], Any]],
        wait: bool = False,
    ) -> Union[GroupFuture, Any]:
        """Batch submit zero-arg callables.

        Returns future identities (single) or a hollow GroupFuture (multiple)
        when *wait* is False.  When *wait* is True, blocks and returns values.
        """

        tasks = list(tasks)

        futures: List[ConcurrentPackageFuture] = []

        for task in tasks:
            fut = self._queue_submit(task)
            fut.taskforce_id = self.global_id
            futures.append(fut)

        if len(futures) == 1:
            single = futures[0]
            if wait:
                return single.wait(None)
            return single.future_identity

        gf = GroupFuture(
            taskforce_id=self.global_id,
            policy_id=self.policy_id,
            future_ids=[f.global_id for f in futures],
        )

        for f in futures:
            f.future_group_id = gf.global_id

        if not wait:
            return gf
        else:
            return gf.wait(None)


    # =========================================================
    # Dispatcher loop
    # =========================================================

    def _loop(self):
        """Continuously dispatch tasks from queue to the worker pool."""
        cv = self._cv
        stop = self._stop
        slots = self._submit_slots

        while not stop.is_set():
            with cv:
                while not stop.is_set() and len(self._q) == 0:
                    cv.wait(timeout=0.1)
                if stop.is_set():
                    break
                _, item = self._q.pop_next()  # FIFO
                runner, args, kwargs = item

            while not stop.is_set() and not slots.acquire(timeout=0.1):
                pass
            if stop.is_set():
                with self._q.atomic():
                    self._q[kwargs["fut"].global_id] = (runner, args, kwargs)
                break

            fut = kwargs["fut"]
            try:
                fut.native_future = self._worker_pool.submit(runner, args, kwargs)
                fut.native_future.add_done_callback(lambda _f: slots.release())
            except Exception:
                slots.release()
                raise

    @staticmethod
    def _runner(args: Tuple[Any, ...], kwargs: Dict[str, Any]) -> Any:
        """Execute a queued task inside a worker thread."""
        task = kwargs.pop("task")
        fut = kwargs.pop("fut")
        fut.status = FutureStatus.RUNNING
        return task(*args, **kwargs)
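The semaphore backpressure used by `_on_start` and `_loop` can be sketched in isolation. The helper below is a simplified stand-in, assuming the same cap of twice the worker count as in the source.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def run_with_backpressure(tasks, num_workers=2):
    """Submit callables through a semaphore that caps in-flight work,
    so a fast producer cannot flood the executor's internal queue."""
    slots = threading.Semaphore(num_workers * 2)
    futures = []
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        for task in tasks:
            slots.acquire()  # blocks once 2*num_workers tasks are in flight
            fut = pool.submit(task)
            # release the slot as soon as the task finishes
            fut.add_done_callback(lambda _f: slots.release())
            futures.append(fut)
        return [f.result() for f in futures]

results = run_with_backpressure([lambda i=i: i * i for i in range(8)])
# → [0, 1, 4, 9, 16, 25, 36, 49]
```

Without the semaphore, a producer could enqueue unbounded work into the executor; with it, submission blocks until a worker drains a slot.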

imap(tasks)

Submit an iterable of zero-arg callables, yielding future identities in submission order.

Source code in policy/central/command/taskforce/thread_pool_executor/taskforce.py
def imap(self, tasks: Iterable[Callable[[], Any]]) -> Iterable[Any]:
    """Submit an iterable of zero-arg callables, yielding future identities in submission order."""
    for f in tasks:
        fut = self._queue_submit(f)
        yield fut.future_identity

submit(tasks, wait=False)

Batch submit zero-arg callables.

Returns future identities (single) or a hollow GroupFuture (multiple) when wait is False. When wait is True, blocks and returns values.

Source code in policy/central/command/taskforce/thread_pool_executor/taskforce.py
def submit(
    self,
    tasks: Iterable[Callable[[], Any]],
    wait: bool = False,
) -> Union[GroupFuture, Any]:
    """Batch submit zero-arg callables.

    Returns future identities (single) or a hollow GroupFuture (multiple)
    when *wait* is False.  When *wait* is True, blocks and returns values.
    """

    tasks = list(tasks)

    futures: List[ConcurrentPackageFuture] = []

    for task in tasks:
        fut = self._queue_submit(task)
        fut.taskforce_id = self.global_id
        futures.append(fut)

    if len(futures) == 1:
        single = futures[0]
        if wait:
            return single.wait(None)
        return single.future_identity

    gf = GroupFuture(
        taskforce_id=self.global_id,
        policy_id=self.policy_id,
        future_ids=[f.global_id for f in futures],
    )

    for f in futures:
        f.future_group_id = gf.global_id

    if not wait:
        return gf
    else:
        return gf.wait(None)
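The single-vs-group return convention can be illustrated with the standard library. `submit_batch` here is a hypothetical helper; the real method returns laila future identities and a `GroupFuture` rather than stdlib futures.

```python
from concurrent.futures import ThreadPoolExecutor

def submit_batch(pool, tasks, wait=False):
    """One task: return its future (or its value when wait=True).
    Several tasks: return the list of futures (or values)."""
    futures = [pool.submit(t) for t in tasks]
    if len(futures) == 1:
        return futures[0].result() if wait else futures[0]
    return [f.result() for f in futures] if wait else futures

with ThreadPoolExecutor(max_workers=2) as pool:
    one = submit_batch(pool, [lambda: 1], wait=True)              # single value
    many = submit_batch(pool, [lambda: 1, lambda: 2], wait=True)  # list of values
```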

future

Bases: _LAILA_IDENTIFIABLE_FUTURE

Abstract Future base class with identity, outcome, and callbacks.

Parameters:

future_identity : required
    Identity metadata for this Future instance.
Source code in policy/central/command/schema/future/future/future.py
class Future(_LAILA_IDENTIFIABLE_FUTURE):
    """
    Abstract Future base class with identity, outcome, and callbacks.

    Parameters
    ----------
    future_identity
        Identity metadata for this Future instance.
    """

    _status: FutureStatus = PrivateAttr(default=FutureStatus.NOT_STARTED)
    _return_value: Any = PrivateAttr(default=None)
    _exception: Optional[Exception] = PrivateAttr(default=None)
    _result_global_id: Optional[str] = PrivateAttr(default=None)
    _timeout_ms: int = PrivateAttr(default=100)

    model_config = ConfigDict(arbitrary_types_allowed=True)


    _default_callbacks: Dict[FutureStatus, Callable[..., Any]] = PrivateAttr(default_factory=dict)
    callbacks: Dict[FutureStatus, Callable[..., Any]] = Field(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        """Wire default callbacks and register this future with the active policy."""
        self._setup_default_callbacks()
        from ....... import get_active_policy
        policy = get_active_policy()
        policy.central.command._register_future_with_active_guarantees(self)
        policy.future_bank[self.global_id] = self


    def _setup_default_callbacks(self) -> None:
        """Populate default status-transition callbacks."""
        self._default_callbacks[FutureStatus.ERROR] = lambda f: setattr(f, "status", FutureStatus.ERROR)
        self._default_callbacks[FutureStatus.CANCELLED] = lambda f: setattr(f, "status", FutureStatus.CANCELLED)
        self._default_callbacks[FutureStatus.NOT_STARTED] = lambda f: setattr(f, "status", FutureStatus.NOT_STARTED)
        self._default_callbacks[FutureStatus.RUNNING] = lambda f: setattr(f, "status", FutureStatus.RUNNING)
        self._default_callbacks[FutureStatus.POLL_TIMEOUT] = lambda f: setattr(f, "status", FutureStatus.POLL_TIMEOUT)
        self._default_callbacks[FutureStatus.UNKNOWN] = lambda f: setattr(f, "status", FutureStatus.UNKNOWN)
        self._default_callbacks[FutureStatus.FINISHED] = lambda f: setattr(f, "status", FutureStatus.FINISHED)


    @property
    @synchronized
    def status(self) -> FutureStatus:
        """
        Return the current status code for this Future.
        """
        return self._status


    @status.setter
    @synchronized
    def status(self, status: FutureStatus) -> None:
        """
        Set the current status code for this Future.
        """
        self._status = status


    @property
    def result(self) -> Any:
        """
        Return the current result value. Releases the lock before waiting to
        avoid deadlock with the done callback that sets the result.
        """
        with self.atomic():
            if self._status in [FutureStatus.ERROR, FutureStatus.CANCELLED]:
                raise self._exception
            if self._status == FutureStatus.FINISHED:
                return self._return_value
        self.wait(timeout=None)
        with self.atomic():
            if self._status in [FutureStatus.ERROR, FutureStatus.CANCELLED]:
                raise self._exception
            return self._return_value


    @result.setter
    @synchronized
    def result(self, result: Any) -> None:
        """Set the result value, auto-wrapping non-Entry values into an Entry."""
        from .......entry import Entry

        if result is None:
            self._return_value = None
            self._result_global_id = None
        elif isinstance(result, Entry):
            self._return_value = result
            self._result_global_id = result.global_id
        else:
            wrapped = Entry.constant(data=result)
            self._return_value = wrapped
            self._result_global_id = wrapped.global_id

    @property
    def data(self) -> Any:
        """Return the unwrapped payload of the result Entry.

        Raises
        ------
        RuntimeError
            If the result is not an Entry instance.
        """
        from .......entry import Entry
        result = self.result
        if not isinstance(result, Entry):
            raise RuntimeError(
                f"Future result is not an Entry (got {type(result).__name__}); "
                "cannot access .data"
            )
        return result.data

    @property
    def exception(self) -> Optional[Exception]:
        """
        Return the current exception value.
        """
        return self._exception

    @exception.setter
    @synchronized
    def exception(self, exception: Optional[Exception]) -> None:
        """
        Set the exception value.
        """
        self._exception = exception

    def add_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
        """
        Register a callback for a specific status transition.
        """
        self.callbacks[status] = fn

    def remove_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
        """
        Remove *fn* as the callback for a specific status, if it is registered.
        """
        if self.callbacks.get(status) is fn:
            del self.callbacks[status]

    def clear_callbacks(self, status: FutureStatus) -> None:
        """
        Clear the callback for a specific status.
        """
        self.callbacks.pop(status, None)

    def clear_all_callbacks(self) -> None:
        """
        Clear all registered callbacks.
        """
        self.callbacks.clear()

    #TODO: This needs to go through the central command.
    def trigger_callback(self, status: FutureStatus) -> None:
        """
        Trigger the callback for the given status, falling back to the
        default callback for that status if none is registered.
        """
        fn = self.callbacks.get(status) or self._default_callbacks.get(status)
        if fn is not None:
            fn(self)



    @property
    def future_identity(self) -> "_LAILA_IDENTIFIABLE_FUTURE":
        """Return a lightweight identity handle for this future."""
        return _LAILA_IDENTIFIABLE_FUTURE(
            taskforce_id=self.taskforce_id,
            policy_id=self.policy_id,
            future_group_id=self.future_group_id,
            precedence=self.precedence,
            purpose=self.purpose,
            uuid=self._uuid,
        )

    def wait(self, timeout: Optional[float] = None) -> Any:
        """Block until the future completes.  Subclasses must override."""
        raise NotImplementedError(
            f"{type(self).__name__} does not implement wait(); "
            "use a concrete subclass such as ConcurrentPackageFuture."
        )

    def finished(self) -> bool:
        """
        Return True if the Future has finished successfully.
        """
        return self.status in [FutureStatus.FINISHED]

    def cancelled(self) -> bool:
        """
        Return True if the Future was cancelled.
        """
        return self.status in [FutureStatus.CANCELLED]

    def error(self) -> bool:
        """
        Return True if the Future finished with an error.
        """
        return self.status in [FutureStatus.ERROR]

    def not_started(self) -> bool:
        """
        Return True if the Future has not started.
        """
        return self.status in [FutureStatus.NOT_STARTED]

    def running(self) -> bool:
        """
        Return True if the Future is currently running.
        """
        return self.status in [FutureStatus.RUNNING]

status property writable

Return the current status code for this Future.

result property writable

Return the current result value. Releases the lock before waiting to avoid deadlock with the done callback that sets the result.
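The deadlock-avoidance note above can be sketched with a `threading.Condition`, whose `wait()` releases the underlying lock while blocked. `TinyFuture` is a minimal stand-in, not the real Future class.

```python
import threading

class TinyFuture:
    def __init__(self):
        self._cond = threading.Condition()
        self._done = False
        self._value = None

    def set_result(self, value):
        # runs on the completing thread; needs the same lock the getter held
        with self._cond:
            self._value = value
            self._done = True
            self._cond.notify_all()

    def result(self, timeout=None):
        with self._cond:
            if not self._done:
                # wait() releases the lock so set_result() can acquire it
                self._cond.wait(timeout)
            return self._value

fut = TinyFuture()
threading.Timer(0.05, fut.set_result, args=(42,)).start()
```

If the getter instead held a plain lock across the blocking wait, the completing thread could never acquire it to deliver the result.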

data property

Return the unwrapped payload of the result Entry.

Raises:

RuntimeError
    If the result is not an Entry instance.

exception property writable

Return the current exception value.

future_identity property

Return a lightweight identity handle for this future.

model_post_init(__context)

Wire default callbacks and register this future with the active policy.

Source code in policy/central/command/schema/future/future/future.py
def model_post_init(self, __context: Any) -> None:
    """Wire default callbacks and register this future with the active policy."""
    self._setup_default_callbacks()
    from ....... import get_active_policy
    policy = get_active_policy()
    policy.central.command._register_future_with_active_guarantees(self)
    policy.future_bank[self.global_id] = self

add_callback(status, fn)

Register a callback for a specific status transition.

Source code in policy/central/command/schema/future/future/future.py
def add_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
    """
    Register a callback for a specific status transition.
    """
    self.callbacks[status] = fn

remove_callback(status, fn)

Remove a callback for a specific status.

Source code in policy/central/command/schema/future/future/future.py
def remove_callback(self, status: FutureStatus, fn: Callable[["Future"], Any]) -> None:
    """
    Remove *fn* as the callback for a specific status, if it is registered.
    """
    if self.callbacks.get(status) is fn:
        del self.callbacks[status]

clear_callbacks(status)

Clear the callback for a specific status.

Source code in policy/central/command/schema/future/future/future.py
def clear_callbacks(self, status: FutureStatus) -> None:
    """
    Clear the callback for a specific status.
    """
    self.callbacks.pop(status, None)

clear_all_callbacks()

Clear all registered callbacks.

Source code in policy/central/command/schema/future/future/future.py
def clear_all_callbacks(self) -> None:
    """
    Clear all registered callbacks.
    """
    self.callbacks.clear()

trigger_callback(status)

Trigger the callback for the given status, if present.

Source code in policy/central/command/schema/future/future/future.py
def trigger_callback(self, status: FutureStatus) -> None:
    """
    Trigger the callback for the given status, falling back to the
    default callback for that status if none is registered.
    """
    fn = self.callbacks.get(status) or self._default_callbacks.get(status)
    if fn is not None:
        fn(self)
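The status-keyed callback mapping can be sketched with a plain enum. All names below are illustrative, not the laila API; the sketch assumes defaults keep `status` in sync and user hooks run afterward.

```python
from enum import Enum, auto

class Status(Enum):
    NOT_STARTED = auto()
    RUNNING = auto()
    FINISHED = auto()

class CallbackFuture:
    """Mini-future mirroring the default/user callback split."""
    def __init__(self):
        self.status = Status.NOT_STARTED
        # defaults keep `status` in sync with the triggered transition
        self._defaults = {s: (lambda f, s=s: setattr(f, "status", s)) for s in Status}
        self.callbacks = {}

    def trigger(self, status):
        self._defaults[status](self)   # default transition first
        fn = self.callbacks.get(status)
        if fn is not None:
            fn(self)                   # then any user-registered hook

fut = CallbackFuture()
seen = []
fut.callbacks[Status.FINISHED] = lambda f: seen.append(f.status)
fut.trigger(Status.FINISHED)
# fut.status is now Status.FINISHED and seen == [Status.FINISHED]
```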

wait(timeout=None)

Block until the future completes. Subclasses must override.

Source code in policy/central/command/schema/future/future/future.py
def wait(self, timeout: Optional[float] = None) -> Any:
    """Block until the future completes.  Subclasses must override."""
    raise NotImplementedError(
        f"{type(self).__name__} does not implement wait(); "
        "use a concrete subclass such as ConcurrentPackageFuture."
    )

finished()

Return True if the Future has finished successfully.

Source code in policy/central/command/schema/future/future/future.py
def finished(self) -> bool:
    """
    Return True if the Future has finished successfully.
    """
    return self.status in [FutureStatus.FINISHED]

cancelled()

Return True if the Future was cancelled.

Source code in policy/central/command/schema/future/future/future.py
def cancelled(self) -> bool:
    """
    Return True if the Future was cancelled.
    """
    return self.status in [FutureStatus.CANCELLED]

error()

Return True if the Future finished with an error.

Source code in policy/central/command/schema/future/future/future.py
def error(self) -> bool:
    """
    Return True if the Future finished with an error.
    """
    return self.status in [FutureStatus.ERROR]

not_started()

Return True if the Future has not started.

Source code in policy/central/command/schema/future/future/future.py
def not_started(self) -> bool:
    """
    Return True if the Future has not started.
    """
    return self.status in [FutureStatus.NOT_STARTED]

running()

Return True if the Future is currently running.

Source code in policy/central/command/schema/future/future/future.py
def running(self) -> bool:
    """
    Return True if the Future is currently running.
    """
    return self.status in [FutureStatus.RUNNING]

manifest

Bases: Entry

Entry subclass wrapping a nested dict of global_id references.

A manifest maps user-defined string keys to global_id strings, lists of global_id strings, or recursively nested dicts following the same rules. It can be constructed from raw ID strings or from Entry objects.

The manifest's .data IS the blueprint dict. It carries scope MANIFEST and evolution None (constant).

Parameters:

data : dict, optional (default {})
    A nested dict whose leaves are global_id strings, lists of global_id strings, or Entry instances. Entry instances are converted to a blueprint of global_id strings and stashed for a subsequent memorize() call.
uuid : str, optional
    Explicit UUID for the manifest's own identity.
nickname : str, optional
    Human-readable name converted to a deterministic UUID.
global_id : str, optional
    Composite identifier used to set identity.
Source code in policy/central/memory/schema/manifest.py
class Manifest(Entry):
    """Entry subclass wrapping a nested dict of ``global_id`` references.

    A manifest maps user-defined string keys to ``global_id`` strings, lists
    of ``global_id`` strings, or recursively nested dicts following the same
    rules.  It can be constructed from raw ID strings or from ``Entry``
    objects.

    The manifest's ``.data`` IS the blueprint dict.  It carries scope
    ``MANIFEST`` and evolution ``None`` (constant).

    Parameters
    ----------
    data : dict, optional
        A nested dict whose leaves are ``global_id`` strings, lists of
        ``global_id`` strings, or ``Entry`` instances.  Entry instances are
        converted to a blueprint of ``global_id`` strings and stashed for
        a subsequent ``memorize()`` call.
    uuid : str, optional
        Explicit UUID for the manifest's own identity.
    nickname : str, optional
        Human-readable name converted to a deterministic UUID.
    global_id : str, optional
        Composite identifier used to set identity.
    """

    _scopes: list[str] = PrivateAttr(default_factory=lambda: [_MANIFEST_SCOPE])
    _pending_entries: Optional[list] = PrivateAttr(default=None)

    def __init__(self, **data: Any):
        raw_data = data.pop("data", None)

        blueprint = None
        pending = None

        if raw_data is not None:
            has_entries, has_strings = Manifest._classify_data(raw_data)

            if has_entries and has_strings:
                raise ValueError(
                    "Manifest data must contain either all Entry objects or all "
                    "global_id strings, not a mix of both."
                )

            if has_entries:
                blueprint = Manifest._extract_blueprint(raw_data)
                pending = list(Manifest._iter_entries(raw_data))
            else:
                Manifest._validate_blueprint(raw_data)
                blueprint = copy.deepcopy(raw_data)

        entry_kwargs = dict(data)
        entry_kwargs["evolution"] = None
        if blueprint is not None:
            entry_kwargs["data"] = blueprint
            entry_kwargs["state"] = EntryState.READY

        Entry.__init__(self, **entry_kwargs)
        self._pending_entries = pending

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def blueprint(self) -> Optional[dict]:
        """The nested dict with ``global_id`` strings as leaf values."""
        return self.data

    @property
    def resolved(self) -> dict:
        """Synchronously fetch all referenced entries through central memory.

        Walks the blueprint, recalls each entry via the memory layer's
        ``remember`` path (routing, deserialization, etc.), and returns a
        nested dict of ``Entry`` objects mirroring the blueprint structure.
        No caching — each access re-fetches.

        Raises
        ------
        RuntimeError
            If the manifest has no blueprint.
        KeyError
            If any referenced entry is missing from the routed pool.
        """
        import laila

        if self.data is None:
            raise RuntimeError("No blueprint to resolve — manifest is empty.")

        all_gids = list(self)
        if not all_gids:
            return Manifest._rebuild_with_entries(self.data, {})

        memory = laila.get_active_policy().central.memory
        ref = memory.remember(entry_ids=all_gids)
        results = ref.wait(None)
        if not isinstance(results, list):
            results = [results]

        resolved_map = dict(zip(all_gids, results))
        return Manifest._rebuild_with_entries(self.data, resolved_map)

    # ------------------------------------------------------------------
    # Core operations (all return GroupFuture)
    # ------------------------------------------------------------------

    def memorize(
        self,
        *,
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
        batch_size: int = 128,
    ):
        """Upload all referenced entries and store the manifest's blueprint.

        Collects any pending ``Entry`` objects provided at construction time,
        uploads them in batches, then stores the manifest itself (whose
        payload is the blueprint dict).

        Parameters
        ----------
        pool_nickname : str, optional
            Pool alias to route entries to.  Defaults to the alpha pool.
        pool_id : str, optional
            Explicit pool ``global_id``.  Defaults to the alpha pool.
        batch_size : int
            Maximum entries per upload batch.

        Returns
        -------
        GroupFuture
            Tracks all writes (leaf entries + manifest itself).
        """
        import laila
        from ...command.schema.future.future.group_future import GroupFuture

        if self.data is None:
            raise RuntimeError("Nothing to memorize — manifest has no blueprint.")

        pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
        all_future_ids: list[str] = []
        policy = laila.get_active_policy()

        if self._pending_entries:
            for i in range(0, len(self._pending_entries), batch_size):
                batch = self._pending_entries[i : i + batch_size]
                ref = laila.memorize(batch, **pool_kwargs)
                all_future_ids.extend(Manifest._collect_future_ids(ref))
            self._pending_entries = None

        self_ref = laila.memorize(self, **pool_kwargs)
        all_future_ids.extend(Manifest._collect_future_ids(self_ref))

        return GroupFuture(
            taskforce_id=policy.central.command.alpha_taskforce,
            policy_id=policy.global_id,
            future_ids=all_future_ids,
        )

    def remember(
        self,
        *,
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
        batch_size: int = 128,
    ):
        """Recall all referenced entries from the pool.

        Collects every leaf ``global_id`` from the blueprint and fetches
        them in batches via ``laila.remember()``.

        Parameters
        ----------
        pool_nickname : str, optional
            Pool alias to read from.  Defaults to the alpha pool.
        pool_id : str, optional
            Explicit pool ``global_id``.  Defaults to the alpha pool.
        batch_size : int
            Maximum entries per recall batch.

        Returns
        -------
        GroupFuture
            Child futures resolve to the recalled ``Entry`` objects.
        """
        import laila
        from ...command.schema.future.future.group_future import GroupFuture

        if self.data is None:
            raise RuntimeError("No blueprint to resolve — manifest is empty.")

        pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
        all_gids = list(self)
        all_future_ids: list[str] = []
        policy = laila.get_active_policy()

        for i in range(0, len(all_gids), batch_size):
            batch = all_gids[i : i + batch_size]
            ref = laila.remember(batch, **pool_kwargs)
            all_future_ids.extend(Manifest._collect_future_ids(ref))

        return GroupFuture(
            taskforce_id=policy.central.command.alpha_taskforce,
            policy_id=policy.global_id,
            future_ids=all_future_ids,
        )

    def forget(
        self,
        *,
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
        batch_size: int = 128,
    ):
        """Delete all referenced entries and the manifest itself from the pool.

        Collects every leaf ``global_id`` plus the manifest's own
        ``global_id`` and deletes them in batches via ``laila.forget()``.

        Parameters
        ----------
        pool_nickname : str, optional
            Pool alias to delete from.  Defaults to the alpha pool.
        pool_id : str, optional
            Explicit pool ``global_id``.  Defaults to the alpha pool.
        batch_size : int
            Maximum entries per deletion batch.

        Returns
        -------
        GroupFuture
            Tracks all deletions (leaf entries + manifest itself).
        """
        import laila
        from ...command.schema.future.future.group_future import GroupFuture

        if self.data is None:
            raise RuntimeError("No blueprint — nothing to forget.")

        pool_kwargs = Manifest._build_pool_kwargs(pool_nickname, pool_id)
        all_gids = list(self)
        all_future_ids: list[str] = []
        policy = laila.get_active_policy()

        for i in range(0, len(all_gids), batch_size):
            batch = all_gids[i : i + batch_size]
            ref = laila.forget(batch, **pool_kwargs)
            all_future_ids.extend(Manifest._collect_future_ids(ref))

        self_ref = laila.forget(self.global_id, **pool_kwargs)
        all_future_ids.extend(Manifest._collect_future_ids(self_ref))

        return GroupFuture(
            taskforce_id=policy.central.command.alpha_taskforce,
            policy_id=policy.global_id,
            future_ids=all_future_ids,
        )

    # ------------------------------------------------------------------
    # Serialization override
    # ------------------------------------------------------------------

    def serialize(
        self,
        transformations=None,
        *,
        exclude_private=None,
    ):
        """Serialize the manifest, excluding transient ``_pending_entries``."""
        if exclude_private is None:
            exclude_private = {"_local_lock", "_payload", "_state", "_pending_entries"}
        return Entry.serialize(
            self, transformations, exclude_private=exclude_private
        )

    @classmethod
    def recover(cls, in_dict: dict, notify_on_creation=False):
        """Reconstruct a Manifest from a serialised dict or JSON string.

        Parameters
        ----------
        in_dict : dict or str or Manifest
            Serialized representation produced by ``serialize()``, a JSON
            string thereof, or an existing Manifest (returned as-is).
        notify_on_creation : bool, optional
            If ``True``, notify the active policy after construction.

        Returns
        -------
        Manifest
            The recovered Manifest instance.
        """
        import json

        if isinstance(in_dict, Manifest):
            return in_dict

        if isinstance(in_dict, str):
            try:
                in_dict = json.loads(in_dict)
            except Exception as e:
                raise ValueError("Invalid JSON string") from e

        if isinstance(in_dict, dict):
            payload = ComputationalData.recover(
                payload_blob=in_dict["transformed_payload"],
                recovery_sequence=in_dict["recovery_sequence"],
            )
            blueprint = payload.data if payload is not None else None
            return Manifest(
                data=blueprint,
                uuid=in_dict["_uuid"],
            )

        raise RuntimeError("Invalid input for manifest recovery.")

    # ------------------------------------------------------------------
    # Mapping-like API  (top-level blueprint keys for dict(manifest))
    # ------------------------------------------------------------------

    def keys(self):
        """Top-level blueprint keys."""
        if self.data is None:
            return {}.keys()
        return self.data.keys()

    def values(self):
        """Top-level blueprint values."""
        if self.data is None:
            return {}.values()
        return self.data.values()

    def items(self):
        """Top-level blueprint items."""
        if self.data is None:
            return {}.items()
        return self.data.items()

    def sub_manifest(self, keys: list[str]) -> "Manifest":
        """Return a new ``Manifest`` containing only the specified top-level keys.

        Parameters
        ----------
        keys : list[str]
            Top-level blueprint keys to include in the sub-manifest.

        Returns
        -------
        Manifest
            A new manifest whose blueprint is the subset of this manifest's
            blueprint restricted to *keys*.

        Raises
        ------
        RuntimeError
            If the manifest has no blueprint.
        KeyError
            If any key in *keys* is not present in the blueprint.
        """
        if self.data is None:
            raise RuntimeError("Cannot create sub-manifest — manifest is empty.")
        missing = [k for k in keys if k not in self.data]
        if missing:
            raise KeyError(f"Keys not found in manifest: {missing}")
        subset = {k: copy.deepcopy(self.data[k]) for k in keys}
        return Manifest(data=subset)

    def extend(self, other: "Manifest", *, overwrite: bool = False) -> None:
        """Merge another manifest's blueprint into this one in-place.

        Parameters
        ----------
        other : Manifest
            The manifest whose top-level keys will be added.
        overwrite : bool
            If ``False`` (default), duplicate top-level keys raise
            ``KeyError``.  If ``True``, *other*'s values silently replace
            existing ones (like ``dict.update``).

        Raises
        ------
        TypeError
            If *other* is not a ``Manifest``.
        KeyError
            If *overwrite* is ``False`` and the blueprints share keys.
        """
        if not isinstance(other, Manifest):
            raise TypeError(
                f"extend() requires a Manifest, got {type(other).__name__}"
            )
        if other.data is None:
            return

        if self.data is None:
            from .....entry.compdata.taxonomy.compdata import ComputationalData

            self._payload = ComputationalData(copy.deepcopy(other.data))
            self._state = EntryState.READY
        else:
            if not overwrite:
                overlap = set(self.data) & set(other.data)
                if overlap:
                    raise KeyError(
                        f"Duplicate top-level keys: {sorted(overlap)}"
                    )
            self.data.update(copy.deepcopy(other.data))

        if other._pending_entries:
            if self._pending_entries is None:
                self._pending_entries = list(other._pending_entries)
            else:
                self._pending_entries.extend(other._pending_entries)

    def __iadd__(self, other: Any) -> "Manifest":
        """``manifest += other`` — merge *other* in-place and return self."""
        if not isinstance(other, Manifest):
            return NotImplemented
        self.extend(other)
        return self

    def __add__(self, other: Any) -> "Manifest":
        """``manifest + other`` — return a new manifest with merged blueprints."""
        if not isinstance(other, Manifest):
            return NotImplemented

        if self.data is not None and other.data is not None:
            overlap = set(self.data) & set(other.data)
            if overlap:
                raise KeyError(
                    f"Duplicate top-level keys: {sorted(overlap)}"
                )

        merged: dict = {}
        if self.data is not None:
            merged.update(copy.deepcopy(self.data))
        if other.data is not None:
            merged.update(copy.deepcopy(other.data))

        if not merged:
            return Manifest()

        result = Manifest(data=merged)

        pending: list = []
        if self._pending_entries:
            pending.extend(self._pending_entries)
        if other._pending_entries:
            pending.extend(other._pending_entries)
        if pending:
            result._pending_entries = pending

        return result

    def __getitem__(self, key: str) -> Any:
        """Look up a top-level key in the blueprint."""
        if self.data is None:
            raise KeyError(key)
        return self.data[key]

    def __len__(self) -> int:
        """Number of top-level keys in the blueprint."""
        if self.data is None:
            return 0
        return len(self.data)

    # ------------------------------------------------------------------
    # Iteration / containment  (flattened global_id leaves)
    # ------------------------------------------------------------------

    def __iter__(self) -> Iterator[str]:
        """Yield every ``global_id`` string via a depth-first, insertion-order walk."""
        if self.data is None:
            return
        yield from Manifest._iter_global_ids(self.data)

    def __contains__(self, global_id: object) -> bool:
        """Return ``True`` if *global_id* appears anywhere in the blueprint leaves."""
        if not isinstance(global_id, str):
            return False
        for gid in self:
            if gid == global_id:
                return True
        return False

    # ------------------------------------------------------------------
    # String representation
    # ------------------------------------------------------------------

    def __str__(self) -> str:
        return self.global_id

    def __repr__(self) -> str:
        n = sum(1 for _ in self)
        return f"Manifest({self.global_id}, entries={n})"

    # ------------------------------------------------------------------
    # Static / class helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _collect_future_ids(ref) -> list[str]:
        """Extract future IDs from a GroupFuture or a single future identity."""
        if ref is None:
            return []
        if hasattr(ref, "future_ids"):
            return list(ref.future_ids)
        return [ref.global_id]

    @staticmethod
    def _build_pool_kwargs(
        pool_nickname: Optional[str] = None,
        pool_id: Optional[str] = None,
    ) -> dict[str, str]:
        """Build keyword arguments for pool routing."""
        kwargs: dict[str, str] = {}
        if pool_nickname is not None:
            kwargs["pool_nickname"] = pool_nickname
        if pool_id is not None:
            kwargs["pool_id"] = pool_id
        return kwargs

    @staticmethod
    def _classify_data(data: dict) -> tuple[bool, bool]:
        """Return ``(has_entries, has_strings)`` for leaf values in *data*."""
        from .....entry import Entry as _Entry

        has_entries = False
        has_strings = False

        def _check(val: Any) -> None:
            nonlocal has_entries, has_strings
            if isinstance(val, _Entry):
                has_entries = True
            elif isinstance(val, str):
                has_strings = True
            elif isinstance(val, list):
                for item in val:
                    _check(item)
            elif isinstance(val, dict):
                for v in val.values():
                    _check(v)

        for v in data.values():
            _check(v)

        return has_entries, has_strings

    @staticmethod
    def _iter_entries(data: dict) -> Iterator:
        """Yield every ``Entry`` object from a nested dict."""
        from .....entry import Entry as _Entry

        for val in data.values():
            if isinstance(val, _Entry):
                yield val
            elif isinstance(val, list):
                for item in val:
                    if isinstance(item, _Entry):
                        yield item
            elif isinstance(val, dict):
                yield from Manifest._iter_entries(val)

    @staticmethod
    def _validate_blueprint(data: Any) -> None:
        """Recursively validate that *data* follows the blueprint schema.

        Raises ``ValueError`` if any key is not a string or any leaf is not
        a ``global_id`` string (or list of them).
        """
        if not isinstance(data, dict):
            raise ValueError("Blueprint must be a dict.")
        for key, val in data.items():
            if not isinstance(key, str):
                raise ValueError(f"Blueprint keys must be strings, got {type(key)}")
            if isinstance(val, str):
                continue
            elif isinstance(val, list):
                for item in val:
                    if not isinstance(item, str):
                        raise ValueError(
                            f"List values in blueprint must be strings, got {type(item)}"
                        )
            elif isinstance(val, dict):
                Manifest._validate_blueprint(val)
            else:
                raise ValueError(
                    f"Blueprint values must be str, list[str], or dict — got {type(val)}"
                )

    @staticmethod
    def _extract_blueprint(data: dict) -> dict:
        """Convert a dict of ``Entry`` objects to a blueprint of ``global_id`` strings."""
        from .....entry import Entry as _Entry

        result: dict[str, Any] = {}
        for key, val in data.items():
            if isinstance(val, _Entry):
                result[key] = val.global_id
            elif isinstance(val, str):
                result[key] = val
            elif isinstance(val, list):
                result[key] = [
                    item.global_id if isinstance(item, _Entry) else item
                    for item in val
                ]
            elif isinstance(val, dict):
                result[key] = Manifest._extract_blueprint(val)
            else:
                raise ValueError(f"Unexpected value type in manifest data: {type(val)}")
        return result

    @staticmethod
    def _iter_global_ids(blueprint: dict) -> Iterator[str]:
        """Depth-first, insertion-order walk yielding every leaf ``global_id``."""
        for val in blueprint.values():
            if isinstance(val, str):
                yield val
            elif isinstance(val, list):
                yield from val
            elif isinstance(val, dict):
                yield from Manifest._iter_global_ids(val)

    @staticmethod
    def _rebuild_with_entries(
        blueprint: dict,
        resolved_map: dict[str, Any],
    ) -> dict:
        """Reconstruct the nested structure replacing ``global_id`` strings with entries."""
        result: dict[str, Any] = {}
        for key, val in blueprint.items():
            if isinstance(val, str):
                result[key] = resolved_map[val]
            elif isinstance(val, list):
                result[key] = [resolved_map[gid] for gid in val]
            elif isinstance(val, dict):
                result[key] = Manifest._rebuild_with_entries(val, resolved_map)
        return result

blueprint property

The nested dict with global_id strings as leaf values.
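Concretely, a blueprint is plain nested data whose leaves are global_id strings. The sketch below is illustrative only (the ids are made up); the walk mirrors the depth-first, insertion-order traversal the class documents:

```python
# Illustrative blueprint: nested dict whose leaves are global_id strings.
blueprint = {
    "weights": "gid-001",
    "checkpoints": ["gid-002", "gid-003"],
    "metrics": {"train": "gid-004", "eval": "gid-005"},
}

def iter_global_ids(node):
    """Depth-first, insertion-order walk over leaf global_ids."""
    for val in node.values():
        if isinstance(val, str):
            yield val
        elif isinstance(val, list):
            yield from val
        elif isinstance(val, dict):
            yield from iter_global_ids(val)

print(list(iter_global_ids(blueprint)))
# → ['gid-001', 'gid-002', 'gid-003', 'gid-004', 'gid-005']
```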

resolved property

Synchronously fetch all referenced entries through central memory.

Walks the blueprint, recalls each entry via the memory layer's remember path (routing, deserialization, etc.), and returns a nested dict of Entry objects mirroring the blueprint structure. No caching — each access re-fetches.

Raises:

- RuntimeError: If the manifest has no blueprint.
- KeyError: If any referenced entry is missing from the routed pool.
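Resolution can be pictured as two phases: fetch every leaf id, then rebuild the nested structure with the fetched objects substituted in. A standalone sketch of the rebuild phase, using placeholder strings in place of real Entry objects:

```python
def rebuild_with_entries(blueprint, resolved_map):
    """Replace each leaf global_id with its resolved object."""
    result = {}
    for key, val in blueprint.items():
        if isinstance(val, str):
            result[key] = resolved_map[val]  # KeyError if the entry is missing
        elif isinstance(val, list):
            result[key] = [resolved_map[gid] for gid in val]
        elif isinstance(val, dict):
            result[key] = rebuild_with_entries(val, resolved_map)
    return result

bp = {"a": "gid-1", "b": {"c": ["gid-2"]}}
resolved = {"gid-1": "<entry-1>", "gid-2": "<entry-2>"}
print(rebuild_with_entries(bp, resolved))
# → {'a': '<entry-1>', 'b': {'c': ['<entry-2>']}}
```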

memorize(*, pool_nickname=None, pool_id=None, batch_size=128)

Upload all referenced entries and store the manifest's blueprint.

Collects any pending Entry objects provided at construction time, uploads them in batches, then stores the manifest itself (whose payload is the blueprint dict).

Parameters:

- pool_nickname (str, optional; default None): Pool alias to route entries to. Defaults to the alpha pool.
- pool_id (str, optional; default None): Explicit pool global_id. Defaults to the alpha pool.
- batch_size (int; default 128): Maximum entries per upload batch.

Returns:

- GroupFuture: Tracks all writes (leaf entries + manifest itself).
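The batching behaviour is plain slicing: pending entries are uploaded in consecutive runs of at most batch_size, so a final short batch is normal. A minimal sketch of that slicing, with stand-in strings instead of Entry objects:

```python
def batches(items, batch_size=128):
    """Split items into consecutive runs of at most batch_size."""
    return [items[i : i + batch_size] for i in range(0, len(items), batch_size)]

pending = [f"entry-{n}" for n in range(300)]  # stand-ins for Entry objects
sizes = [len(b) for b in batches(pending)]
print(sizes)
# → [128, 128, 44]
```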

Source code in policy/central/memory/schema/manifest.py

remember(*, pool_nickname=None, pool_id=None, batch_size=128)

Recall all referenced entries from the pool.

Collects every leaf global_id from the blueprint and fetches them in batches via laila.remember().

Parameters:

- pool_nickname (str, optional; default None): Pool alias to read from. Defaults to the alpha pool.
- pool_id (str, optional; default None): Explicit pool global_id. Defaults to the alpha pool.
- batch_size (int; default 128): Maximum entries per recall batch.

Returns:

- GroupFuture: Child futures resolve to the recalled Entry objects.


forget(*, pool_nickname=None, pool_id=None, batch_size=128)

Delete all referenced entries and the manifest itself from the pool.

Collects every leaf global_id plus the manifest's own global_id and deletes them in batches via laila.forget().

Parameters:

- pool_nickname (str, optional; default None): Pool alias to delete from. Defaults to the alpha pool.
- pool_id (str, optional; default None): Explicit pool global_id. Defaults to the alpha pool.
- batch_size (int; default 128): Maximum entries per deletion batch.

Returns:

- GroupFuture: Tracks all deletions (leaf entries + manifest itself).


serialize(transformations=None, *, exclude_private=None)

Serialize the manifest, excluding transient _pending_entries.
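The exclusion is a plain name filter over instance attributes, dropped before the payload is written. A sketch of the idea, with a hypothetical toy class standing in for the manifest:

```python
def public_state(obj, exclude_private):
    """Drop transient attributes before serialization."""
    return {k: v for k, v in vars(obj).items() if k not in exclude_private}

class Box:  # hypothetical stand-in for a Manifest-like object
    def __init__(self):
        self.name = "m1"
        self._pending_entries = ["entry"]  # transient, never serialized
        self._local_lock = object()        # not picklable, excluded too

state = public_state(Box(), {"_local_lock", "_payload", "_state", "_pending_entries"})
print(state)
# → {'name': 'm1'}
```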


recover(in_dict, notify_on_creation=False) classmethod

Reconstruct a Manifest from a serialized dict or JSON string.

Parameters:

- in_dict (dict or str or Manifest; required): Serialized representation produced by serialize(), a JSON string thereof, or an existing Manifest (returned as-is).
- notify_on_creation (bool; default False): If True, notify the active policy after construction.

Returns:

- Manifest: The recovered Manifest instance.
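Input handling is a three-way dispatch: an existing Manifest passes through, a string is decoded as JSON, and a dict is recovered. A standalone sketch of the string and dict branches, including the documented error wrapping:

```python
import json

def normalize(in_dict):
    """Accept a dict or a JSON string of one; wrap bad JSON in ValueError."""
    if isinstance(in_dict, str):
        try:
            in_dict = json.loads(in_dict)
        except Exception as e:
            raise ValueError("Invalid JSON string") from e
    if isinstance(in_dict, dict):
        return in_dict
    raise RuntimeError("Invalid input for manifest recovery.")

print(normalize('{"_uuid": "u-1"}'))
# → {'_uuid': 'u-1'}
try:
    normalize("not json")
except ValueError as e:
    print(e)  # the original JSONDecodeError is chained as the cause
```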


keys()

Top-level blueprint keys.


values()

Top-level blueprint values.


items()

Top-level blueprint items.


sub_manifest(keys)

Return a new Manifest containing only the specified top-level keys.

Parameters:

- keys (list[str]; required): Top-level blueprint keys to include in the sub-manifest.

Returns:

- Manifest: A new manifest whose blueprint is the subset of this manifest's blueprint restricted to keys.

Raises:

- RuntimeError: If the manifest has no blueprint.
- KeyError: If any key in keys is not present in the blueprint.
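The subsetting semantics can be sketched on a plain dict: missing keys fail fast with KeyError, and kept values are deep-copied so the sub-manifest never aliases the parent:

```python
import copy

def sub_blueprint(blueprint, keys):
    """Subset of a blueprint restricted to the given top-level keys."""
    missing = [k for k in keys if k not in blueprint]
    if missing:
        raise KeyError(f"Keys not found in manifest: {missing}")
    return {k: copy.deepcopy(blueprint[k]) for k in keys}

bp = {"a": ["gid-1"], "b": "gid-2", "c": "gid-3"}
sub = sub_blueprint(bp, ["a", "b"])
sub["a"].append("gid-9")  # deep copy: the parent blueprint is untouched
print(sub, bp["a"])
# → {'a': ['gid-1', 'gid-9'], 'b': 'gid-2'} ['gid-1']
```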


extend(other, *, overwrite=False)

Merge another manifest's blueprint into this one in-place.

Parameters:

- other (Manifest; required): The manifest whose top-level keys will be added.
- overwrite (bool; default False): If False (default), duplicate top-level keys raise KeyError. If True, other's values silently replace existing ones (like dict.update).

Raises:

- TypeError: If other is not a Manifest.
- KeyError: If overwrite is False and the blueprints share keys.
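The merge rule mirrors dict.update with an optional duplicate-key guard. A standalone sketch of just the blueprint merge:

```python
import copy

def merge_blueprints(dst, src, overwrite=False):
    """Merge src's top-level keys into dst in place."""
    if not overwrite:
        overlap = set(dst) & set(src)
        if overlap:
            raise KeyError(f"Duplicate top-level keys: {sorted(overlap)}")
    dst.update(copy.deepcopy(src))

a = {"x": "gid-1"}
merge_blueprints(a, {"y": "gid-2"})
print(a)
# → {'x': 'gid-1', 'y': 'gid-2'}
try:
    merge_blueprints(a, {"x": "gid-9"})  # duplicate key, overwrite=False
except KeyError as e:
    print("rejected:", e)
merge_blueprints(a, {"x": "gid-9"}, overwrite=True)
print(a["x"])
# → gid-9
```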


__iadd__(other)

manifest += other — merge other in-place and return self.


__add__(other)

manifest + other — return a new manifest with merged blueprints.


__getitem__(key)

Look up a top-level key in the blueprint.


__len__()

Number of top-level keys in the blueprint.

Source code in policy/central/memory/schema/manifest.py
def __len__(self) -> int:
    """Number of top-level keys in the blueprint."""
    if self.data is None:
        return 0
    return len(self.data)

__iter__()

Yield every global_id string via a depth-first, insertion-order walk.

Source code in policy/central/memory/schema/manifest.py
def __iter__(self) -> Iterator[str]:
    """Yield every ``global_id`` string via a depth-first, insertion-order walk."""
    if self.data is None:
        return
    yield from Manifest._iter_global_ids(self.data)

__contains__(global_id)

Return True if global_id appears anywhere in the blueprint leaves.

Source code in policy/central/memory/schema/manifest.py
def __contains__(self, global_id: object) -> bool:
    """Return ``True`` if *global_id* appears anywhere in the blueprint leaves."""
    if not isinstance(global_id, str):
        return False
    for gid in self:
        if gid == global_id:
            return True
    return False
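A depth-first, insertion-order walk like the one `__iter__` delegates to `_iter_global_ids` can be sketched as follows. The treatment of lists and the exact leaf types are assumptions; the private helper's internals are not shown above:

```python
from typing import Iterator

def iter_global_ids(node) -> Iterator[str]:
    """Depth-first, insertion-order walk yielding string leaves of a
    nested blueprint (sketch of the documented iteration order)."""
    if isinstance(node, dict):
        for value in node.values():
            yield from iter_global_ids(value)
    elif isinstance(node, (list, tuple)):
        for value in node:
            yield from iter_global_ids(value)
    elif isinstance(node, str):
        yield node

blueprint = {"pool": {"a": "uuid-1:0", "b": ["uuid-2:1"]}}
ids = list(iter_global_ids(blueprint))
```

Membership testing via `__contains__` then reduces to scanning these leaves, which is why non-string arguments short-circuit to `False`.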

__resolve_nickname(kwargs)

Convert a nickname in kwargs to a deterministic global_id list.

activate_policy(policy)

Replace the active policy singleton.

Accepts a local _LAILA_IDENTIFIABLE_POLICY or a RemotePolicyProxy obtained from laila.peers.

Equivalent to laila.active_policy = policy.

Parameters:

Name Type Description Default
policy _LAILA_IDENTIFIABLE_POLICY | RemotePolicyProxy

The policy instance (local or remote proxy) to activate globally.

required

add_peer(uri, secret)

Connect to a remote policy and register it as a peer.

Parameters:

Name Type Description Default
uri str

URI of the remote policy (e.g. "ws://host:port").

required
secret str

The remote policy's peer_secret_key (on its protocol).

required

Returns:

Type Description
str

The global_id of the newly peered remote policy.

constant(cls, data, *, global_id=None, uuid=None, nickname=None) classmethod

Create an immutable Entry (evolution is None).

Parameters:

Name Type Description Default
data Any

The raw payload.

required
global_id str

Composite identifier (must not contain an evolution component).

None
uuid str

Explicit UUID. Mutually exclusive with global_id.

None
nickname str

Human-readable name used to derive a deterministic UUID.

None

Returns:

Type Description
Entry

A new constant Entry.

Raises:

Type Description
RuntimeError

If conflicting identity arguments are provided or the global_id includes an evolution component.

Source code in entry/entry.py
@classmethod
def constant(
    cls, 
    data, 
    *,
    global_id = None,
    uuid = None,
    nickname = None
):
    """Create an immutable Entry (evolution is ``None``).

    Parameters
    ----------
    data : Any
        The raw payload.
    global_id : str, optional
        Composite identifier (must not contain an evolution component).
    uuid : str, optional
        Explicit UUID.  Mutually exclusive with *global_id*.
    nickname : str, optional
        Human-readable name used to derive a deterministic UUID.

    Returns
    -------
    Entry
        A new constant Entry.

    Raises
    ------
    RuntimeError
        If conflicting identity arguments are provided or the
        *global_id* includes an evolution component.
    """
    if global_id is not None and (uuid is not None):
        raise RuntimeError("Cannot set both global_id and uuid at the same time.")

    if uuid is None and global_id is not None:
        identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
        uuid = identity_data["uuid"]
        if identity_data["evolution"] is not None:
            raise RuntimeError("Cannot have a constant with an evolution.")

    if nickname is not None:
        uuid = cls.generate_uuid_from_nickname(nickname)

    new_entry = Entry(
        uuid = uuid,
        data = data,
        state=EntryState.READY,
        evolution=None 
    )

    return new_entry
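The nickname path above derives a deterministic UUID. Assuming `generate_uuid_from_nickname` is a `uuid5` derivation under the module-level `LAILA_UNIVERSAL_NAMESPACE` (an assumption; the real helper may differ), the idea looks like:

```python
import uuid

# LAILA_UNIVERSAL_NAMESPACE as documented at module level.
NAMESPACE = uuid.UUID("6c8dcd4c-d490-58bc-81a7-6afbe8d17594")

def uuid_from_nickname(nickname: str) -> str:
    """Derive a deterministic UUID string from a human-readable
    nickname (hypothetical sketch, not the library's helper)."""
    return str(uuid.uuid5(NAMESPACE, nickname))
```

Determinism is the point: the same nickname always maps to the same UUID, so entries can be addressed by name without storing a lookup table.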

contingent(cls, **kwargs) classmethod

Create an Entry from raw keyword arguments without validation guards.

Parameters:

Name Type Description Default
**kwargs

Forwarded directly to the Entry constructor.

{}

Returns:

Type Description
Entry
Source code in entry/entry.py
@classmethod
def contingent(cls, **kwargs):
    """Create an Entry from raw keyword arguments without validation guards.

    Parameters
    ----------
    **kwargs
        Forwarded directly to the ``Entry`` constructor.

    Returns
    -------
    Entry
    """
    return Entry(**kwargs)

forget(*args, **kwargs)

Delete one or more entries from the active policy's memory.

Removes the stored blob from the routed pool. Returns a future that resolves when the deletion completes.

Parameters:

Name Type Description Default
entry_ids str | list[str]

The global_id(s) of the entries to delete.

required
pool_id str

Explicit pool global_id to delete from.

required
pool_nickname str

Pool alias registered via extend.

required
nickname str

Convenience alias – converted to a deterministic global_id.

required

get_active_namespace()

Return the active UUID namespace, initializing it on first access.

get_active_policy()

Return the active policy, lazily creating a DefaultPolicy on first access.

memorize(*args, **kwargs)

Persist one or more entries into the active policy's memory.

Entries are serialized, transformed, and written to the routed pool. Returns a future (or GroupFuture) that resolves when the write completes, or None when the default in-memory pool is used.

Parameters:

Name Type Description Default
entries Entry | list[Entry]

The entry or entries to store.

required
pool_id str

Explicit pool global_id to route to.

required
pool_nickname str

Pool alias registered via extend.

required

read_args(source, *, terminal_args=None)

Load user arguments from a TOML/JSON/.env/.xml file or terminal into laila.args.

Mutates laila.args in place. Always returns None.
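The mutate-in-place, return-`None` contract can be sketched for the JSON case. The function name and args mapping here are illustrative, not the `laila.read_args` implementation:

```python
import json
import pathlib
import tempfile

def read_args_sketch(source: str, args: dict) -> None:
    """Load a JSON argument file and merge it into args in place,
    returning None (sketch of the documented contract)."""
    args.update(json.loads(pathlib.Path(source).read_text()))

with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as handle:
    handle.write('{"batch_size": 8}')
    path = handle.name

args: dict = {}
read_args_sketch(path, args)
```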

remember(*args, **kwargs)

Retrieve one or more entries from the active policy's memory.

Reads serialized blobs from the routed pool, applies the inverse transformation sequence, and returns recovered Entry objects wrapped in a future.

Parameters:

Name Type Description Default
entry_ids str | list[str]

The global_id(s) of the entries to recall.

required
pool_id str

Explicit pool global_id to read from.

required
pool_nickname str

Pool alias registered via extend.

required
nickname str

Convenience alias – converted to a deterministic global_id.

required
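The serialize/transform/inverse round trip that `memorize` and `remember` describe can be sketched with stdlib stand-ins. `pickle` and `zlib` are purely illustrative here; the actual blob format and transformation sequence are not specified above:

```python
import pickle
import zlib

# Illustrative transformation sequence and its inverses.
transforms = [zlib.compress]
inverses = [zlib.decompress]

def store(payload) -> bytes:
    """Serialize a payload and apply each transform in order."""
    blob = pickle.dumps(payload)
    for transform in transforms:
        blob = transform(blob)
    return blob

def recall(blob: bytes):
    """Apply the inverse transformations in reverse order, then
    deserialize the recovered payload."""
    for inverse in reversed(inverses):
        blob = inverse(blob)
    return pickle.loads(blob)
```

Applying the inverses in reverse order is what makes the pipeline a round trip: the last transform written is the first one undone.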

set_active_namespace(namespace_key)

Set the active UUID namespace derived from namespace_key.

Parameters:

Name Type Description Default
namespace_key str

DNS-style key used with uuid.uuid5 to generate the namespace.

required
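The description above says the namespace is generated with `uuid.uuid5` from a DNS-style key; a minimal sketch of that derivation (the parent namespace, `NAMESPACE_DNS`, is an assumption):

```python
import uuid

def derive_namespace(namespace_key: str) -> uuid.UUID:
    """Derive a deterministic UUID namespace from a DNS-style key
    (sketch; laila's parent namespace may differ)."""
    return uuid.uuid5(uuid.NAMESPACE_DNS, namespace_key)

ns = derive_namespace("laila.example.org")
```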

set_default_directory(directory)

Override the default root directory and derived sub-directories.

Parameters:

Name Type Description Default
directory str

Filesystem path (may contain ~) to use as the new root.

required

status(future_ref)

Return the status of a future.

Deprecated: use laila.runtime.status() instead.

variable(cls, data, *, uuid=None, evolution=None, state=None, constitution=None, global_id=None, nickname=None) classmethod

Create a mutable (evolvable) Entry.

Parameters:

Name Type Description Default
data Any

The raw payload.

required
uuid str

Explicit UUID. Mutually exclusive with global_id.

None
evolution int

Starting evolution counter (defaults to 0).

None
state EntryState

Initial state; auto-set to READY when no constitution is given.

None
constitution callable

Not yet implemented.

None
global_id str

Composite uuid:evolution identifier.

None
nickname str

Human-readable name used to derive a deterministic UUID.

None

Returns:

Type Description
Entry

A new variable Entry.

Raises:

Type Description
RuntimeError

If conflicting identity arguments are provided or both constitution and data are set.

NotImplementedError

If a constitution is supplied.

Source code in entry/entry.py
@classmethod
def variable(
    cls, 
    data,
    *, 
    uuid = None,
    evolution=None,
    state = None,
    constitution=None, 
    global_id = None,
    nickname = None
):
    """Create a mutable (evolvable) Entry.

    Parameters
    ----------
    data : Any
        The raw payload.
    uuid : str, optional
        Explicit UUID.  Mutually exclusive with *global_id*.
    evolution : int, optional
        Starting evolution counter (defaults to ``0``).
    state : EntryState, optional
        Initial state; auto-set to ``READY`` when no constitution is given.
    constitution : callable, optional
        Not yet implemented.
    global_id : str, optional
        Composite ``uuid:evolution`` identifier.
    nickname : str, optional
        Human-readable name used to derive a deterministic UUID.

    Returns
    -------
    Entry
        A new variable Entry.

    Raises
    ------
    RuntimeError
        If conflicting identity arguments are provided or both
        *constitution* and *data* are set.
    NotImplementedError
        If a constitution is supplied.
    """
    if global_id is not None and (uuid is not None or evolution is not None):
        raise RuntimeError("Cannot set both global_id and <uuid, evolution> at the same time.")

    if constitution and data:
        raise RuntimeError("Cannot set both constitution and data.")

    if constitution is None:
        state = EntryState.READY

    if constitution is not None:
        raise NotImplementedError("Constitution is not implemented in this release.")

    if global_id is not None:
        identity_data = _LAILA_IDENTIFIABLE_OBJECT.process_global_id(global_id)
        uuid = identity_data["uuid"]
        evolution = identity_data["evolution"]

    evolution = evolution if evolution is not None else 0

    if nickname is not None:
        uuid = cls.generate_uuid_from_nickname(nickname)

    return Entry(
        data = data,
        evolution = evolution,
        state = state,
        constitution = constitution,
        uuid = uuid
    )
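Both `constant` and `variable` route composite identifiers through `process_global_id`. Based solely on the documented `uuid:evolution` format, the split can be sketched as (the real method may validate more):

```python
def process_global_id(global_id: str) -> dict:
    """Split a composite ``uuid:evolution`` identifier into its parts
    (sketch of the documented format only)."""
    uuid_part, sep, evolution_part = global_id.partition(":")
    return {
        "uuid": uuid_part,
        # No separator means no evolution component (a constant).
        "evolution": int(evolution_part) if sep else None,
    }
```

This also shows why `constant` rejects identifiers with an evolution component: a non-`None` evolution marks the entry as evolvable.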

wait(future_ref, timeout=None)

Block until the future completes and return its result.

Deprecated: use laila.runtime.wait() instead.