Section 5b: Storage Adapters

Status: Design complete — v0.2 target
Last updated: 2026-03-25


Overview

Canon v0.1 ships a stub output_storage config: type: local is hard-coded, and type: s3 only emits a warning. This section specifies the full pluggable storage adapter system for v0.2.

The core insight: storage is a concern of Canon core (where do outputs persist? where do inputs come from?), but staging (making files locally accessible for an executor) is a concern of executor adapters (does this executor need local files, or can it access storage natively?). These two concerns are cleanly separated.


Why a Plugin Architecture

Different execution environments require different storage backends:
- Single workstation → local filesystem
- Academic HPC cluster → NFS shared filesystem (still "local" semantically)
- AWS cloud → S3
- GCP cloud → GCS
- Academic data repository → OSF, iRODS

Canon core should not hard-code any of these. Community contributors should be able to publish packages such as canon-storage-s3, canon-storage-gcs, and canon-storage-irods independently, following the pattern already established for executor adapters (the canon.executor_adapters entry point group).


StorageAdapter ABC

Location: canon/storage/base.py

from abc import ABC, abstractmethod
from pathlib import Path


class CanonStorageError(Exception):
    """Raised by storage adapters on any storage backend failure."""


class StorageAdapter(ABC):
    """Abstract base class for Canon storage backends.

    Responsible for:
    - put(): relocating CWL output files to permanent storage, returning canonical URI
    - get(): staging a URI to a local path (for executors that need local files)
    - exists(): checking whether a URI is accessible (used by planner REUSE decision)

    NOT responsible for:
    - Input validation or schema enforcement (Hippo's concern)
    - Workflow execution (executor adapter's concern)
    - Whether staging is needed (executor adapter declares this via requires_local_staging)
    """

    #: Entry point name, e.g. "local", "s3", "gcs". Must be unique.
    name: str

    #: URI schemes this adapter handles, e.g. ["file", "s3", "s3a"].
    uri_schemes: list[str]

    @abstractmethod
    def put(self, local_path: Path, dest_uri: str) -> str:
        """Relocate a local file to permanent storage.

        Called by OutputIngestionPipeline after CWL execution completes,
        when executor.requires_output_relocation is True.

        Args:
            local_path: Path to the file in the CWL work directory.
            dest_uri: Target URI (constructed from output_storage config + entity identity).

        Returns:
            Canonical URI of the relocated file (may differ from dest_uri
            if the backend rewrites it, e.g. to add a version hash).

        Raises:
            CanonStorageError: On any storage backend failure.
        """
        ...

    @abstractmethod
    def get(self, uri: str, local_dir: Path) -> Path:
        """Stage a file from permanent storage to a local directory.

        Called by InputStagingLayer before CWL execution,
        when executor.requires_local_staging is True.

        For local adapters where the URI is already a filesystem path,
        this is typically a no-op (return the path as-is) or a copy
        to node-local tmp.

        Args:
            uri: Canonical URI of the file to stage.
            local_dir: Target directory for the staged file.

        Returns:
            Local path to the staged file.

        Raises:
            CanonStorageError: On any storage backend failure.
        """
        ...

    @abstractmethod
    def exists(self, uri: str) -> bool:
        """Check whether a URI is accessible in this storage backend.

        Called by RecursivePlanner before making a REUSE vs BUILD decision.
        A URI that exists in Hippo metadata but is not accessible in storage
        should return False (triggers rebuild, not silent failure).

        Args:
            uri: Canonical URI to check.

        Returns:
            True if the file exists and is accessible.
        """
        ...

    def build_dest_uri(self, entity_type: str, entity_id: str, filename: str) -> str:
        """Construct a canonical destination URI for a new output file.

        There is no default implementation: every concrete adapter must override
        this with its backend's URI layout (e.g. S3 key naming conventions).

        Args:
            entity_type: Hippo entity type of the output (e.g. "AlignmentFile").
            entity_id: Hippo entity UUID.
            filename: Original filename from CWL output.

        Returns:
            Canonical URI string.
        """
        raise NotImplementedError(
            f"{self.__class__.__name__} must implement build_dest_uri()"
        )

Bundled Adapter: LocalStorageAdapter

Location: canon/storage/local.py
Entry point: local (group canon.storage_adapters)
URI schemes: file://, bare paths

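import shutil
from pathlib import Path

from canon.storage.base import CanonStorageError, StorageAdapter


class LocalStorageAdapter(StorageAdapter):
    name = "local"
    uri_schemes = ["file", ""]  # empty string = bare filesystem path

    def __init__(self, base_path: str | Path, copy_to_local: bool = False) -> None:
        # Absolute base so the file:// URIs built below are well-formed.
        self._base = Path(base_path).absolute()
        # copy_to_local (assumed option): enable on clusters without a shared FS
        # so that get() copies each input into the node-local local_dir.
        self._copy_to_local = copy_to_local

    def put(self, local_path: Path, dest_uri: str) -> str:
        dest = self._uri_to_path(dest_uri)
        try:
            dest.parent.mkdir(parents=True, exist_ok=True)
            shutil.copy2(local_path, dest)
        except OSError as exc:
            raise CanonStorageError(f"Failed to store {local_path} at {dest_uri}") from exc
        return f"file://{dest}"

    def get(self, uri: str, local_dir: Path) -> Path:
        src = self._uri_to_path(uri)
        if not src.exists():
            # A missing source is a storage failure; copying cannot fix it.
            raise CanonStorageError(f"File not found in local storage: {uri}")
        if not self._copy_to_local:
            # Single workstation or shared NFS: the executor can read the path as-is.
            return src
        # No shared FS: copy into the node-local staging directory.
        dest = local_dir / src.name
        try:
            local_dir.mkdir(parents=True, exist_ok=True)
            shutil.copy2(src, dest)
        except OSError as exc:
            raise CanonStorageError(f"Failed to stage {uri} into {local_dir}") from exc
        return dest

    def exists(self, uri: str) -> bool:
        return self._uri_to_path(uri).exists()

    def build_dest_uri(self, entity_type: str, entity_id: str, filename: str) -> str:
        dest = self._base / entity_type.lower() / entity_id / filename
        return f"file://{dest}"

    def _uri_to_path(self, uri: str) -> Path:
        # Handles file:///abs/path and bare paths; does not decode %-escapes.
        return Path(uri.removeprefix("file://"))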

Coverage of LocalStorageAdapter:

| Scenario | Behavior |
|---|---|
| Single workstation | get() returns the path as-is (no copy). |
| HPC cluster, shared NFS | Same: the path is accessible on all nodes. |
| HPC cluster, no shared FS | get() copies the file to the node-local local_dir. |
| CWL work dir cleanup | After put(), the caller is responsible for cleaning the work dir. |
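LocalStorageAdapter._uri_to_path() strips a literal file:// prefix. That covers file:///abs/path and bare paths, but not percent-encoded URIs (a space arrives as %20) or the file://localhost/... form. If such URIs can reach Canon, a parse-based conversion is one option. The sketch below uses only the standard library, assumes POSIX paths, and its helper names (path_to_file_uri, file_uri_to_path) are hypothetical.

```python
from pathlib import Path
from urllib.parse import urlparse
from urllib.request import url2pathname


def path_to_file_uri(path: Path) -> str:
    """Build a file:// URI; as_uri() needs an absolute path and percent-encodes it."""
    return path.absolute().as_uri()


def file_uri_to_path(uri: str) -> Path:
    """Convert a file:// URI or a bare POSIX path back to a Path."""
    parsed = urlparse(uri)
    if parsed.scheme == "":
        return Path(uri)  # bare filesystem path, used verbatim
    if parsed.scheme != "file":
        raise ValueError(f"not a file URI: {uri!r}")
    if parsed.netloc not in ("", "localhost"):
        raise ValueError(f"file URI on a remote host is not supported: {uri!r}")
    # url2pathname decodes %-escapes such as %20.
    return Path(url2pathname(parsed.path))


if __name__ == "__main__":
    p = Path("/data/my outputs/alignmentfile/1234/reads.bam")
    uri = path_to_file_uri(p)
    print(uri)  # file:///data/my%20outputs/alignmentfile/1234/reads.bam
    print(file_uri_to_path(uri) == p)  # True
```

If build_dest_uri() and put() switched to as_uri(), the two helpers would round-trip paths containing spaces or other reserved characters.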

Community Plugin: S3StorageAdapter (example, not bundled)

Package: canon-storage-s3 (separate PyPI package)
Entry point: s3 (group canon.storage_adapters)
URI schemes: s3://, s3a://

# pyproject.toml of canon-storage-s3:
[project.entry-points."canon.storage_adapters"]
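s3 = "canon_storage_s3:S3StorageAdapter"

# canon_storage_s3 module (the target of the entry point above):
from pathlib import Path
from urllib.parse import urlparse

import boto3
from botocore.exceptions import ClientError

from canon.storage.base import CanonStorageError, StorageAdapter


def _build_boto3_client(credentials: str):
    # "env" and "instance_role" both resolve through boto3's default credential chain.
    if credentials.startswith("profile:"):
        profile = credentials.removeprefix("profile:")
        return boto3.Session(profile_name=profile).client("s3")
    return boto3.client("s3")


class S3StorageAdapter(StorageAdapter):
    name = "s3"
    uri_schemes = ["s3", "s3a"]

    def __init__(self, bucket: str, prefix: str = "", credentials: str = "env") -> None:
        self._bucket = bucket
        self._prefix = prefix.strip("/")
        # credentials: "env" | "instance_role" | "profile:<name>"
        self._client = _build_boto3_client(credentials)

    def put(self, local_path: Path, dest_uri: str) -> str:
        key = self._uri_to_key(dest_uri)
        self._client.upload_file(str(local_path), self._bucket, key)
        return f"s3://{self._bucket}/{key}"

    def get(self, uri: str, local_dir: Path) -> Path:
        key = self._uri_to_key(uri)
        dest = local_dir / Path(key).name
        self._client.download_file(self._bucket, key, str(dest))
        return dest

    def exists(self, uri: str) -> bool:
        try:
            self._client.head_object(Bucket=self._bucket, Key=self._uri_to_key(uri))
            return True
        except ClientError:  # botocore exception: 404 (missing) or 403 (not accessible)
            return False

    def build_dest_uri(self, entity_type: str, entity_id: str, filename: str) -> str:
        parts = [self._prefix, entity_type.lower(), entity_id, filename]
        return f"s3://{self._bucket}/" + "/".join(p for p in parts if p)

    def _uri_to_key(self, uri: str) -> str:
        # s3://<bucket>/<key> and s3a://<bucket>/<key> both map to <key>.
        parsed = urlparse(uri)
        if parsed.netloc != self._bucket:
            raise CanonStorageError(f"{uri} is not in bucket {self._bucket}")
        return parsed.path.lstrip("/")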

Executor Adapter Integration

CWLExecutorAdapter declares whether it needs Canon to stage inputs and relocate outputs:

from abc import ABC


class CWLExecutorAdapter(ABC):
    #: If True, Canon's InputStagingLayer will call storage_adapter.get() for all
    #: non-local input URIs before execution. Set False for cloud-native executors
    #: (Nextflow, Toil on AWS Batch) that access storage URIs natively.
    requires_local_staging: bool = True

    #: If True, Canon's OutputIngestionPipeline will call storage_adapter.put() to
    #: relocate CWL work dir outputs to permanent storage. Set False for executors
    #: that write directly to permanent storage (e.g. Nextflow with publishDir).
    requires_output_relocation: bool = True
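The staging half of this contract can be sketched as follows, assuming inputs arrive as a name → URI mapping and that "non-local" means any scheme other than file or a bare path (the schemes LocalStorageAdapter claims). stage_inputs, uri_scheme, and the adapter_for_uri callable are hypothetical stand-ins for InputStagingLayer and StorageAdapterRegistry.adapter_for_uri().

```python
from pathlib import Path
from typing import Callable, Protocol

# Schemes treated as already local (matches LocalStorageAdapter.uri_schemes).
LOCAL_SCHEMES = {"file", ""}


class SupportsGet(Protocol):
    def get(self, uri: str, local_dir: Path) -> Path: ...


def uri_scheme(uri: str) -> str:
    return uri.split("://", 1)[0].lower() if "://" in uri else ""


def stage_inputs(
    inputs: dict[str, str],
    requires_local_staging: bool,
    adapter_for_uri: Callable[[str], SupportsGet],
    work_dir: Path,
) -> dict[str, str]:
    """Return the input mapping the executor should receive."""
    if not requires_local_staging:
        # Cloud-native executors read storage URIs themselves.
        return dict(inputs)
    staged: dict[str, str] = {}
    for name, uri in inputs.items():
        if uri_scheme(uri) in LOCAL_SCHEMES:
            staged[name] = uri  # already on a filesystem the executor can read
        else:
            staged[name] = str(adapter_for_uri(uri).get(uri, work_dir))
    return staged
```

The output half is symmetric: when requires_output_relocation is False, the executor's output URIs would go to Hippo unchanged.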

Current adapters:

| Adapter | requires_local_staging | requires_output_relocation |
|---|---|---|
| CwltoolAdapter (bundled) | True | True |
| NextflowAdapter (future plugin) | False | False |
| ToilAdapter (future plugin) | Depends on backend | True for local, False for cloud |

Canon pipeline with storage adapters:

RecursivePlanner.resolve(entity_type, params)
    ├─ hippo_client.find_entity() → entity found and storage_adapter.exists(entity.uri)?
    │       YES → return entity.uri (REUSE)
    │       NO  → continue to BUILD (no record, or the recorded file is no longer accessible)
    ├─ storage_adapter.exists(input_uri) for each required input
    │       (validates inputs are accessible before committing to a build)
    ├─ [if executor.requires_local_staging]
    │       InputStagingLayer.stage(inputs, storage_adapter)
    │           → storage_adapter.get(uri, work_dir) for each non-local URI
    ├─ executor.run(cwl_path, staged_inputs, work_dir)
    │       → CWLRunResult(outputs, exit_code, ...)
    ├─ [if executor.requires_output_relocation]
    │       storage_adapter.put(local_output_path, dest_uri)
    │           → canonical_uri
    └─ hippo_client.ingest_entity(entity_type, {uri: canonical_uri, ...})
           → Entity
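The exists() docstring asks that a Hippo record whose file is no longer accessible trigger a rebuild rather than a silent failure, and the pre-flight check refuses to build when a required input is unreachable. A hypothetical sketch of that decision follows; the Decision type, the decide() name, and raising FileNotFoundError for missing inputs are assumptions, since this section does not say which error the planner raises.

```python
from dataclasses import dataclass
from typing import Callable, Literal, Optional


@dataclass(frozen=True)
class Decision:
    action: Literal["REUSE", "BUILD"]
    uri: Optional[str] = None  # set only for REUSE


def decide(
    existing_uri: Optional[str],
    input_uris: list[str],
    exists: Callable[[str], bool],
) -> Decision:
    # REUSE only if Hippo has a record *and* storage can still serve the file.
    if existing_uri is not None and exists(existing_uri):
        return Decision("REUSE", existing_uri)
    # Pre-flight: every required input must be reachable before committing to a build.
    missing = [uri for uri in input_uris if not exists(uri)]
    if missing:
        raise FileNotFoundError(f"inputs not accessible in storage: {missing}")
    return Decision("BUILD")
```

In Canon, exists would presumably be bound to something like `lambda uri: registry.adapter_for_uri(uri).exists(uri)`, so each URI is checked by the adapter for its own scheme.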

StorageAdapterRegistry

Location: canon/storage/registry.py

Discovers adapters via entry points and routes URIs to the correct adapter by scheme:

import importlib.metadata

from canon.storage.base import StorageAdapter


class StorageAdapterRegistry:
    def __init__(self) -> None:
        self._adapters: dict[str, StorageAdapter] = {}

    def load_from_entry_points(self) -> None:
        for ep in importlib.metadata.entry_points(group="canon.storage_adapters"):
            adapter_cls = ep.load()
            adapter = adapter_cls(...)  # instantiated from CanonConfig
            for scheme in adapter.uri_schemes:
                self._adapters[scheme] = adapter

    def adapter_for_uri(self, uri: str) -> StorageAdapter:
        # URI schemes are case-insensitive (RFC 3986), so normalize before lookup.
        scheme = uri.split("://", 1)[0].lower() if "://" in uri else ""
        try:
            return self._adapters[scheme]
        except KeyError:
            raise CanonConfigError(
                f"No storage adapter registered for URI scheme '{scheme}'. "
                f"Install a canon-storage-* package or check your canon.yaml output_storage.type."
            ) from None

Config Schema (canon.yaml)

The output_storage section is extended to support adapter-specific config:

# Local filesystem (bundled default)
output_storage:
  type: local
  base_path: /data/outputs

# S3 (requires pip install canon-storage-s3)
output_storage:
  type: s3
  bucket: my-lab-canon-outputs
  prefix: outputs/
  credentials: instance_role   # "env" | "instance_role" | "profile:<name>"

# GCS (requires pip install canon-storage-gcs)
output_storage:
  type: gcs
  bucket: my-lab-canon-outputs
  prefix: outputs/
  credentials: application_default

The type field maps directly to the entry point name. Unknown types raise CanonConfigError at startup with a helpful message listing available adapters.


Error Handling and Atomicity

A storage failure mid-pipeline creates a consistency risk: outputs may be relocated but the Hippo entity record not written (or vice versa).

Protocol:
1. put() is called before ingest_entity().
2. If put() fails → raise CanonStorageError; the WorkflowRun is marked failed; no Hippo entity is written; the output file may or may not exist in storage, depending on where the failure occurred.
3. If put() succeeds but ingest_entity() fails → the output file exists in storage without a Hippo record. This is an orphan artifact.
4. Canon does not automatically clean up orphan artifacts (storage may be read-only, and deletion may be unsafe).
5. The WorkflowRun is marked failed with error_detail including the storage URI, so operators can recover manually.
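Steps 1 to 5 condense into one function. The sketch is hypothetical: put and ingest stand in for storage_adapter.put() and hippo_client.ingest_entity(), and OrphanArtifactError is an assumed exception type whose only job is to carry the storage URI into the WorkflowRun's error_detail.

```python
from pathlib import Path
from typing import Any, Callable


class OrphanArtifactError(Exception):
    """put() succeeded but the Hippo ingest failed; the file is left in storage."""

    def __init__(self, uri: str) -> None:
        super().__init__(f"Orphan artifact at {uri}: stored but not recorded in Hippo")
        self.uri = uri


def relocate_then_ingest(
    local_path: Path,
    dest_uri: str,
    put: Callable[[Path, str], str],
    ingest: Callable[[str], Any],
) -> Any:
    # Steps 1-2: put() runs first; if it raises, nothing has been written to Hippo.
    canonical_uri = put(local_path, dest_uri)
    try:
        return ingest(canonical_uri)
    except Exception as exc:
        # Steps 3-5: no automatic cleanup; surface the URI for manual recovery.
        raise OrphanArtifactError(canonical_uri) from exc
```

Chaining with `from exc` keeps the original Hippo error visible alongside the orphaned URI.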

Future mitigation (v0.3): Two-phase commit pattern — write a PendingArtifact record to Hippo before put(), update to confirmed after ingest_entity() succeeds. Orphan detection via hippo query PendingArtifact status=pending created_before=<24h_ago>.
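The v0.3 two-phase commit can be sketched with an in-memory store standing in for Hippo. Everything here is hypothetical: the PendingArtifact fields, the InMemoryArtifactLog methods, and two_phase_relocate() are illustrations of the pattern, not a planned API; orphan_candidates() plays the role of the hippo query in the text. The canonical URI is recorded right after put(), because put() may return a URI that differs from dest_uri.

```python
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any, Callable, Optional


def _utcnow() -> datetime:
    return datetime.now(timezone.utc)


@dataclass
class PendingArtifact:
    dest_uri: str
    status: str = "pending"               # "pending" until the Hippo entity exists
    canonical_uri: Optional[str] = None   # filled in once put() returns
    created_at: datetime = field(default_factory=_utcnow)


class InMemoryArtifactLog:
    """Stand-in for PendingArtifact records kept in Hippo."""

    def __init__(self) -> None:
        self.records: dict[str, PendingArtifact] = {}

    def begin(self, dest_uri: str) -> None:
        self.records[dest_uri] = PendingArtifact(dest_uri)

    def stored(self, dest_uri: str, canonical_uri: str) -> None:
        self.records[dest_uri].canonical_uri = canonical_uri

    def confirm(self, dest_uri: str) -> None:
        self.records[dest_uri].status = "confirmed"

    def orphan_candidates(
        self, older_than: timedelta, now: Optional[datetime] = None
    ) -> list[PendingArtifact]:
        cutoff = (now or _utcnow()) - older_than
        return [r for r in self.records.values() if r.status == "pending" and r.created_at < cutoff]


def two_phase_relocate(
    log: InMemoryArtifactLog,
    local_path: Path,
    dest_uri: str,
    put: Callable[[Path, str], str],
    ingest: Callable[[str], Any],
) -> str:
    log.begin(dest_uri)                     # phase 1: record intent before touching storage
    canonical_uri = put(local_path, dest_uri)
    log.stored(dest_uri, canonical_uri)     # remember where the bytes actually landed
    ingest(canonical_uri)
    log.confirm(dest_uri)                   # phase 2: only after Hippo has the entity
    return canonical_uri
```

A failure anywhere between begin() and confirm() leaves a pending record behind, which is what makes the orphan detectable later.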


Implementation Plan (v0.2)

New files:
- canon/storage/__init__.py
- canon/storage/base.py: StorageAdapter ABC + CanonStorageError
- canon/storage/local.py: LocalStorageAdapter (replaces the current v0.1 stub)
- canon/storage/registry.py: StorageAdapterRegistry

Modified files:
- canon/ingestion/pipeline.py: replace the type: local / type: s3 warn() logic with registry.adapter_for_uri(dest_uri).put()
- canon/executors/base.py: add the requires_local_staging and requires_output_relocation class vars
- canon/executors/staging.py: update InputStagingLayer to accept a StorageAdapter and call .get()
- canon/config.py: extend output_storage to support adapter-specific extra fields
- canon/resolver/planner.py: add the storage_adapter.exists() pre-flight check on inputs
- pyproject.toml: add the canon.storage_adapters entry point group and register local

External packages (separate repos, community-contributed):
- canon-storage-s3: S3StorageAdapter (boto3)
- canon-storage-gcs: GCSStorageAdapter (google-cloud-storage)
- canon-storage-osf: OSFStorageAdapter (reusing dvc-osf client logic)
- canon-storage-irods: iRODSStorageAdapter (python-irodsclient)

Test additions:
- canon/tests/test_storage.py: unit tests for LocalStorageAdapter and StorageAdapterRegistry
- tests/contracts/test_storage_adapter_contract.py: behavioral contract for the StorageAdapter ABC
- tests/platform/test_canon_platform.py: extend with storage adapter tests using LocalStorageAdapter
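A behavioral contract can be written once and run against every adapter, bundled or community. This is a sketch under assumptions: check_storage_adapter_contract and the MemoryStorageAdapter test double are hypothetical. The double duck-types the StorageAdapter interface instead of subclassing the ABC, so the snippet runs without Canon installed. The checks cover only guarantees stated in the ABC docstrings.

```python
import tempfile
from pathlib import Path


def check_storage_adapter_contract(adapter, tmp: Path) -> None:
    """Assert the put/get/exists guarantees from the StorageAdapter docstrings."""
    payload = b"example output bytes"
    src = tmp / "work" / "reads.bam"
    src.parent.mkdir(parents=True, exist_ok=True)
    src.write_bytes(payload)

    dest_uri = adapter.build_dest_uri("AlignmentFile", "00000000-0000-0000-0000-000000000000", "reads.bam")
    assert isinstance(dest_uri, str)
    # exists() must report False, not raise, for a URI that was never written.
    assert adapter.exists(dest_uri) is False

    canonical_uri = adapter.put(src, dest_uri)  # may differ from dest_uri
    assert adapter.exists(canonical_uri) is True

    staged_dir = tmp / "staged"
    staged_dir.mkdir(exist_ok=True)
    staged = adapter.get(canonical_uri, staged_dir)
    assert staged.read_bytes() == payload


class MemoryStorageAdapter:
    """Test double keeping file contents in a dict (duck-typed StorageAdapter)."""

    name = "memory"
    uri_schemes = ["mem"]

    def __init__(self) -> None:
        self._blobs: dict[str, bytes] = {}

    def put(self, local_path: Path, dest_uri: str) -> str:
        self._blobs[dest_uri] = local_path.read_bytes()
        return dest_uri

    def get(self, uri: str, local_dir: Path) -> Path:
        dest = local_dir / uri.rsplit("/", 1)[-1]
        dest.write_bytes(self._blobs[uri])
        return dest

    def exists(self, uri: str) -> bool:
        return uri in self._blobs

    def build_dest_uri(self, entity_type: str, entity_id: str, filename: str) -> str:
        return f"mem://{entity_type.lower()}/{entity_id}/{filename}"


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as d:
        check_storage_adapter_contract(MemoryStorageAdapter(), Path(d))
        print("contract ok")
```

Under pytest, the same function could be called from a test parametrized over adapter instances, passing the built-in tmp_path fixture as tmp.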