storage

package
v0.1.3
Warning

This package is not in the latest version of its module.

Published: May 9, 2026 | License: MIT | Imports: 19 | Imported by: 0

Documentation

Overview

Package storage implements the storage routing layer for AppView. It routes manifests to the ATProto PDS (as io.atcr.manifest records) and blobs to hold services via XRPC, with database-based hold DID lookups. All storage operations are proxied: AppView stores nothing locally.

Constants

const HTTPRequestMethod contextKey = "http.request.method"

const ProfileRKey = "self"

ProfileRKey is always "self" per lexicon.

const RetryAfterContextKey contextKey = "atcr.retry-after"

Variables

This section is empty.

Functions

func EnsureCrewMembership

func EnsureCrewMembership(
	ctx context.Context,
	userDID string,
	defaultHoldDID string,
	authorizer auth.HoldAuthorizer,
	fetchServiceToken ServiceTokenFetcher,
)

EnsureCrewMembership attempts to register the user as a crew member on their default hold. The hold's requestCrew endpoint handles all authorization logic (checking allowAllCrew, existing membership, etc). On success, warms the approval cache and clears any cached denial. Best-effort: logs and returns on any error.

fetchServiceToken is invoked only on cache miss. Pass nil to skip when no auth path is available (callers that just want the cache short-circuit behavior).
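The cache short-circuit described above can be illustrated with a small, self-contained sketch. Everything here except the ServiceTokenFetcher signature is hypothetical: `approvalCache`, `ensureCrew`, `countingFetcher`, and the returned labels are stand-ins chosen to make the control flow visible, and the real function takes an auth.HoldAuthorizer and returns nothing.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// ServiceTokenFetcher mirrors the function type documented in this package.
type ServiceTokenFetcher func(ctx context.Context, holdDID string) (string, error)

// approvalCache is a hypothetical stand-in for the authorizer's approval
// cache, keyed by "userDID|holdDID".
type approvalCache map[string]bool

// bg is a shared background context for the example calls.
var bg = context.Background()

// ensureCrew is a simplified, hypothetical stand-in for EnsureCrewMembership.
// It returns a label for the path taken so the control flow can be observed.
func ensureCrew(ctx context.Context, cache approvalCache, userDID, holdDID string, fetch ServiceTokenFetcher) string {
	key := userDID + "|" + holdDID
	if cache[key] {
		return "cache-hit" // short-circuit: the fetcher is never invoked
	}
	if fetch == nil {
		return "skipped" // no auth path available, nothing more to do
	}
	if _, err := fetch(ctx, holdDID); err != nil {
		return "token-error" // best-effort: the real function logs and returns
	}
	// The real function would call the hold's requestCrew endpoint here.
	cache[key] = true // warm the approval cache on success
	return "registered"
}

// countingFetcher returns a fetcher that records how often it is called.
func countingFetcher(calls *int, fail bool) ServiceTokenFetcher {
	return func(ctx context.Context, holdDID string) (string, error) {
		*calls++
		if fail {
			return "", errors.New("token unavailable")
		}
		return "token-for-" + holdDID, nil
	}
}

func main() {
	cache := approvalCache{}
	calls := 0
	fetch := countingFetcher(&calls, false)
	fmt.Println(ensureCrew(bg, cache, "did:plc:alice", "did:web:hold01.atcr.io", fetch))
	fmt.Println(ensureCrew(bg, cache, "did:plc:alice", "did:web:hold01.atcr.io", fetch))
	fmt.Println(ensureCrew(bg, approvalCache{}, "did:plc:bob", "did:web:hold01.atcr.io", nil))
	fmt.Println("fetcher calls:", calls)
}
```

In this sketch the second call for the same user and hold is answered from the cache, so the fetcher runs only once.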

func EnsureProfile

func EnsureProfile(ctx context.Context, client *atproto.Client, defaultHoldDID string) error

EnsureProfile checks whether a user's profile exists and creates it if needed. If the profile already exists, missing fields are reconciled against the AppView's defaults so that downstream logic (e.g. successor migration) has a concrete defaultHold to anchor on instead of relying on the empty fallback. Currently only defaultHold is reconciled; other zero-valued fields have their own runtime defaults and shouldn't be written unprompted.

This should be called during authentication (OAuth exchange or token service). Expected format for defaultHoldDID: "did:web:hold01.atcr.io".

func GetProfile

func GetProfile(ctx context.Context, client *atproto.Client) (*atproto.SailorProfileRecord, error)

GetProfile retrieves the user's profile from their PDS. It returns nil if the profile doesn't exist, and automatically migrates old URL-based defaultHold values to DIDs.

func MigrateManifestsForSuccessor

func MigrateManifestsForSuccessor(
	ctx context.Context,
	client *atproto.Client,
	authorizer auth.HoldAuthorizer,
	db HoldDIDLookup,
	did string,
)

MigrateManifestsForSuccessor rewrites manifest records and profile when any of the user's holds have a successor. Best-effort, runs in background.

Steps:

1. Get the user's sailor profile.
2. Collect candidate holds: profile.DefaultHold plus the distinct holds from the manifests DB.
3. For each hold with a successor: rewrite the PDS manifest records and the local DB.
4. If profile.DefaultHold itself had a successor, update the profile too.
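The candidate-collection and successor-matching steps above can be sketched in isolation. This is a minimal illustration, not the package's code: `candidateHolds`, `planMigration`, the `successors` map, and the example DIDs are all hypothetical, and the real function resolves successors through the authorizer and writes to the PDS and database.

```go
package main

import "fmt"

// candidateHolds returns the default hold followed by the distinct manifest
// holds, deduplicated and with empty values dropped (step 2).
func candidateHolds(defaultHold string, manifestHolds []string) []string {
	seen := make(map[string]bool)
	var out []string
	add := func(h string) {
		if h == "" || seen[h] {
			return
		}
		seen[h] = true
		out = append(out, h)
	}
	add(defaultHold)
	for _, h := range manifestHolds {
		add(h)
	}
	return out
}

// planMigration maps each candidate hold that has a successor to that
// successor (the input to step 3). The successors map is an assumption made
// for this sketch; it stands in for whatever lookup the real code performs.
func planMigration(candidates []string, successors map[string]string) map[string]string {
	plan := make(map[string]string)
	for _, hold := range candidates {
		if next, ok := successors[hold]; ok && next != "" && next != hold {
			plan[hold] = next
		}
	}
	return plan
}

func main() {
	defaultHold := "did:web:old.example"
	candidates := candidateHolds(defaultHold, []string{"did:web:other.example", "did:web:old.example"})
	plan := planMigration(candidates, map[string]string{"did:web:old.example": "did:web:new.example"})

	fmt.Println("candidates:", candidates)
	fmt.Println("plan:", plan)

	// Step 4: the profile needs an update only if its default hold is in the plan.
	_, updateProfile := plan[defaultHold]
	fmt.Println("update profile:", updateProfile)
}
```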

func SetRetryAfter added in v0.1.3

func SetRetryAfter(ctx context.Context, d time.Duration)

SetRetryAfter is a convenience helper for handlers that have a context but not a direct carrier reference.

func UpdateProfile

func UpdateProfile(ctx context.Context, client *atproto.Client, profile *atproto.SailorProfileRecord) error

UpdateProfile updates the user's profile. It normalizes defaultHold to DID format before saving.
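How a URL-based defaultHold might be normalized to a DID is not spelled out here. The following is a minimal sketch under the assumption that the mapping follows the did:web convention (host name after the "did:web:" prefix, with a port separator encoded as %3A). The helper name `normalizeHold` is hypothetical and is not part of this package.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// normalizeHold is a hypothetical helper: it leaves DIDs untouched and turns
// an absolute hold URL into a did:web identifier built from the URL's host.
func normalizeHold(v string) (string, error) {
	if strings.HasPrefix(v, "did:") {
		return v, nil // already a DID
	}
	u, err := url.Parse(v)
	if err != nil {
		return "", fmt.Errorf("parse hold URL %q: %w", v, err)
	}
	if u.Host == "" {
		return "", fmt.Errorf("hold value %q is neither a DID nor an absolute URL", v)
	}
	// did:web encodes the colon before a port number as %3A.
	return "did:web:" + strings.ReplaceAll(u.Host, ":", "%3A"), nil
}

func main() {
	for _, v := range []string{
		"https://hold01.atcr.io",
		"did:web:hold01.atcr.io",
		"http://localhost:8080",
	} {
		did, err := normalizeHold(v)
		fmt.Println(v, "->", did, err)
	}
}
```

The sketch ignores URL paths and IPv6 literals; a production version would need to decide how to treat both.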

Types

type CompletedPart

type CompletedPart struct {
	PartNumber int    `json:"part_number"`
	ETag       string `json:"etag"`
}

CompletedPart represents an uploaded part with its ETag

type HoldDIDLookup

type HoldDIDLookup interface {
	GetLatestHoldDIDForRepo(did, repository string) (string, error)
	UpdateManifestHoldDID(did, oldHoldDID, newHoldDID string) (int64, error)
	GetDistinctManifestHoldDIDs(did string) ([]string, error)
}

HoldDIDLookup interface for querying and updating hold DIDs in manifests
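For tests, an in-memory implementation of this interface can be handy. The sketch below is one possible fake, not the package's database layer: `memLookup`, `manifestRow`, and `add` are hypothetical names, and the choices to treat the most recently added row as "latest" and to return an empty string when nothing matches are assumptions.

```go
package main

import "fmt"

// HoldDIDLookup is copied from the documented interface.
type HoldDIDLookup interface {
	GetLatestHoldDIDForRepo(did, repository string) (string, error)
	UpdateManifestHoldDID(did, oldHoldDID, newHoldDID string) (int64, error)
	GetDistinctManifestHoldDIDs(did string) ([]string, error)
}

// manifestRow is a hypothetical row of the manifests table.
type manifestRow struct{ did, repository, holdDID string }

// memLookup keeps rows in insertion order; the last matching row is "latest".
type memLookup struct{ rows []manifestRow }

// Compile-time check that the fake satisfies the interface.
var _ HoldDIDLookup = (*memLookup)(nil)

func (m *memLookup) add(did, repository, holdDID string) {
	m.rows = append(m.rows, manifestRow{did, repository, holdDID})
}

func (m *memLookup) GetLatestHoldDIDForRepo(did, repository string) (string, error) {
	for i := len(m.rows) - 1; i >= 0; i-- {
		if r := m.rows[i]; r.did == did && r.repository == repository {
			return r.holdDID, nil
		}
	}
	return "", nil // assumption: no match yields an empty DID, not an error
}

func (m *memLookup) UpdateManifestHoldDID(did, oldHoldDID, newHoldDID string) (int64, error) {
	var n int64
	for i := range m.rows {
		if m.rows[i].did == did && m.rows[i].holdDID == oldHoldDID {
			m.rows[i].holdDID = newHoldDID
			n++
		}
	}
	return n, nil // number of rewritten rows
}

func (m *memLookup) GetDistinctManifestHoldDIDs(did string) ([]string, error) {
	seen := make(map[string]bool)
	var out []string
	for _, r := range m.rows {
		if r.did == did && !seen[r.holdDID] {
			seen[r.holdDID] = true
			out = append(out, r.holdDID)
		}
	}
	return out, nil
}

func main() {
	db := &memLookup{}
	db.add("did:plc:alice", "debian", "did:web:hold01.example")
	db.add("did:plc:alice", "debian", "did:web:hold02.example")
	latest, _ := db.GetLatestHoldDIDForRepo("did:plc:alice", "debian")
	fmt.Println("latest:", latest)
}
```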

type ManifestReferenceChecker

type ManifestReferenceChecker interface {
	IsManifestReferenced(did, digest string) (bool, error)
}

ManifestReferenceChecker checks if a manifest digest is referenced as a child of a manifest list (multi-arch image). Used to protect manifest list children from auto-removal when their parent list is still tagged.
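A sketch of how such a checker could guard auto-removal follows. The helper `shouldAutoRemove` and the two fake checkers are hypothetical; treating a nil checker as "not referenced" and keeping the manifest when the lookup fails are assumptions for illustration, not documented behavior.

```go
package main

import (
	"errors"
	"fmt"
)

// ManifestReferenceChecker is copied from the documented interface.
type ManifestReferenceChecker interface {
	IsManifestReferenced(did, digest string) (bool, error)
}

// staticChecker is a hypothetical in-memory checker: the set of digests that
// are children of some manifest list. It ignores the did argument.
type staticChecker map[string]bool

func (s staticChecker) IsManifestReferenced(did, digest string) (bool, error) {
	return s[digest], nil
}

// failingChecker is a hypothetical checker whose lookup always fails.
type failingChecker struct{}

func (failingChecker) IsManifestReferenced(did, digest string) (bool, error) {
	return false, errors.New("lookup failed")
}

// shouldAutoRemove reports whether an untagged manifest may be deleted.
func shouldAutoRemove(checker ManifestReferenceChecker, did, digest string) (bool, error) {
	if checker == nil {
		return true, nil // assumption: no checker means nothing protects the digest
	}
	referenced, err := checker.IsManifestReferenced(did, digest)
	if err != nil {
		return false, err // assumption: on error, err on the side of keeping it
	}
	return !referenced, nil
}

func main() {
	checker := staticChecker{"sha256:child": true}
	for _, dgst := range []string{"sha256:child", "sha256:orphan"} {
		ok, err := shouldAutoRemove(checker, "did:plc:alice", dgst)
		fmt.Println(dgst, "remove:", ok, "err:", err)
	}
}
```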

type ManifestStore

type ManifestStore struct {
	// contains filtered or unexported fields
}

ManifestStore implements distribution.ManifestService. It stores manifests in ATProto as records.

func NewManifestStore

func NewManifestStore(ctx *RegistryContext, blobStore distribution.BlobStore) *ManifestStore

NewManifestStore creates a new ATProto-backed manifest store

func (*ManifestStore) Delete

func (s *ManifestStore) Delete(ctx context.Context, dgst digest.Digest) error

Delete removes a manifest

func (*ManifestStore) Exists

func (s *ManifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error)

Exists checks if a manifest exists by digest

func (*ManifestStore) Get

Get retrieves a manifest by digest

func (*ManifestStore) Put

Put stores a manifest

type PartUploadInfo

type PartUploadInfo struct {
	URL string `json:"url"` // Presigned URL to PUT the part to
}

PartUploadInfo contains the presigned URL for uploading a part

type ProxyBlobStore

type ProxyBlobStore struct {
	// contains filtered or unexported fields
}

ProxyBlobStore proxies blob requests to an external storage service

func NewProxyBlobStore

func NewProxyBlobStore(ctx *RegistryContext) *ProxyBlobStore

NewProxyBlobStore creates a new proxy blob store

func (*ProxyBlobStore) Create

Create returns a blob writer for uploading using multipart upload

func (*ProxyBlobStore) Delete

func (p *ProxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error

Delete removes a blob

func (*ProxyBlobStore) Get

func (p *ProxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error)

Get retrieves a blob

func (*ProxyBlobStore) Open

Open returns a reader for a blob

func (*ProxyBlobStore) Put

func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error)

Put stores a blob using the multipart upload flow. This ensures all uploads go through the same XRPC path.

func (*ProxyBlobStore) Resume

Resume returns a blob writer for resuming an upload

func (*ProxyBlobStore) ServeBlob

ServeBlob serves a blob via HTTP redirect or proxied response

func (*ProxyBlobStore) Stat

Stat returns the descriptor for a blob

type ProxyBlobWriter

type ProxyBlobWriter struct {
	// contains filtered or unexported fields
}

ProxyBlobWriter implements distribution.BlobWriter for proxy uploads using multipart upload

func (*ProxyBlobWriter) Cancel

func (w *ProxyBlobWriter) Cancel(ctx context.Context) error

Cancel cancels the upload by aborting the multipart upload

func (*ProxyBlobWriter) Close

func (w *ProxyBlobWriter) Close() error

Close closes the writer. Parts are flushed on demand, so this is a no-op.

func (*ProxyBlobWriter) Commit

Commit finalizes the upload by completing multipart upload and moving to final location

func (*ProxyBlobWriter) ID

func (w *ProxyBlobWriter) ID() string

ID returns the upload ID

func (*ProxyBlobWriter) ReadFrom

func (w *ProxyBlobWriter) ReadFrom(r io.Reader) (int64, error)

ReadFrom reads from a reader

func (*ProxyBlobWriter) Size

func (w *ProxyBlobWriter) Size() int64

Size returns the current size

func (*ProxyBlobWriter) StartedAt

func (w *ProxyBlobWriter) StartedAt() time.Time

StartedAt returns when the upload started

func (*ProxyBlobWriter) Write

func (w *ProxyBlobWriter) Write(p []byte) (int, error)

Write writes data to the upload. It buffers data and flushes when the buffer reaches 5MB.
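The buffer-then-flush behavior can be sketched with a small writer that hands full parts to a callback. This is an illustration only: `partWriter`, `newPartWriter`, and the flush callback are hypothetical, reading "5MB" as 5 MiB is an assumption, and whether the real writer flushes fixed-size parts or the whole buffer is not stated here.

```go
package main

import "fmt"

const (
	mib      = 1024 * 1024
	partSize = 5 * mib // assumption: "5MB" is read as 5 MiB
)

// partWriter is a hypothetical buffered writer that flushes fixed-size parts.
type partWriter struct {
	buf      []byte
	nextPart int   // part numbers start at 1 in this sketch
	size     int64 // total bytes accepted so far
	flush    func(partNumber int, data []byte) error
}

func newPartWriter(flush func(partNumber int, data []byte) error) *partWriter {
	return &partWriter{nextPart: 1, flush: flush}
}

// Write buffers p and flushes one part for every full partSize in the buffer.
// The flush callback must not retain data; the buffer is reused afterwards.
func (w *partWriter) Write(p []byte) (int, error) {
	w.buf = append(w.buf, p...)
	w.size += int64(len(p))
	for len(w.buf) >= partSize {
		if err := w.flush(w.nextPart, w.buf[:partSize]); err != nil {
			return len(p), err
		}
		w.nextPart++
		// Shift the unflushed remainder to the front of the buffer.
		w.buf = append(w.buf[:0], w.buf[partSize:]...)
	}
	return len(p), nil
}

// Size returns the total number of bytes written.
func (w *partWriter) Size() int64 { return w.size }

// Buffered returns the number of bytes waiting for the next flush.
func (w *partWriter) Buffered() int { return len(w.buf) }

func main() {
	w := newPartWriter(func(n int, data []byte) error {
		fmt.Printf("flushed part %d (%d bytes)\n", n, len(data))
		return nil
	})
	w.Write(make([]byte, 3*mib))
	w.Write(make([]byte, 3*mib))
	fmt.Println("buffered:", w.Buffered(), "total:", w.Size())
}
```

In a design like this, any bytes still buffered at the end would be sent as a final, smaller part when the upload is committed.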

type PushWebhookDispatcher

type PushWebhookDispatcher interface {
	DispatchForPush(ctx context.Context, event PushWebhookEvent)
}

PushWebhookDispatcher dispatches push event webhooks. Defined here (in storage) to avoid import cycles with the webhooks package.

type PushWebhookEvent

type PushWebhookEvent struct {
	OwnerDID     string
	OwnerHandle  string
	PusherDID    string
	PusherHandle string
	Repository   string
	Tag          string
	Digest       string
	MediaType    string
	HoldDID      string
	HoldEndpoint string
}

PushWebhookEvent contains the data needed to dispatch a push webhook.

type RegistryContext

type RegistryContext struct {
	// Per-request identity and routing information
	// Owner = the user whose repository is being accessed
	// Puller = the authenticated user making the request (from JWT Subject)
	DID               string          // Owner's DID - whose repo is being accessed (e.g., "did:plc:abc123")
	Handle            string          // Owner's handle (e.g., "alice.bsky.social")
	HoldDID           string          // Hold service DID (e.g., "did:web:hold01.atcr.io" or "did:plc:abc123")
	HoldURL           string          // Resolved HTTP URL for the hold service
	PDSEndpoint       string          // Owner's PDS endpoint URL
	Repository        string          // Image repository name (e.g., "debian")
	ServiceToken      string          // Service token for hold authentication (from puller's PDS)
	ATProtoClient     *atproto.Client // Authenticated ATProto client for the owner
	AuthMethod        string          // Auth method used ("oauth" or "app_password")
	PullerDID         string          // Puller's DID - who is making the request (from JWT Subject)
	PullerPDSEndpoint string          // Puller's PDS endpoint URL
	HasPushScope      bool            // Whether the JWT token has push scope (used to filter pull stats)

	// Per-request user preferences
	AutoRemoveUntagged bool // Whether to auto-delete untagged manifests on tag overwrite

	// Shared services (same for all requests)
	Database           HoldDIDLookup            // Database for hold DID lookups
	Authorizer         auth.HoldAuthorizer      // Hold access authorization
	Refresher          *oauth.Refresher         // OAuth session manager
	ReadmeFetcher      *readme.Fetcher          // README fetcher for repo pages
	WebhookDispatcher  PushWebhookDispatcher    // Push webhook dispatcher (nil if not configured)
	ManifestRefChecker ManifestReferenceChecker // Checks if digest is a manifest list child (nil-safe)
}

RegistryContext bundles all the context needed for registry operations. This includes both per-request data (DID, hold) and shared services.

type RetryAfterCarrier added in v0.1.3

type RetryAfterCarrier struct {
	// contains filtered or unexported fields
}

RetryAfterCarrier is a request-scoped, mutable container for a Retry-After hint emitted by storage handlers (e.g., when an upstream PDS returns 429). HTTP middleware injects an empty carrier into the request context; deep handlers populate it via SetRetryAfter when they convert a rate-limit error into a 429 response. The middleware then reads it back to set the Retry-After response header.

func NewRetryAfterCarrier added in v0.1.3

func NewRetryAfterCarrier() *RetryAfterCarrier

NewRetryAfterCarrier returns an empty carrier ready to be stored in context.

func (*RetryAfterCarrier) Duration added in v0.1.3

func (c *RetryAfterCarrier) Duration() time.Duration

Duration returns the recorded retry-after value, or 0 if none was set.

func (*RetryAfterCarrier) Set added in v0.1.3

func (c *RetryAfterCarrier) Set(d time.Duration)

Set records a retry-after hint. Largest value wins (a later, longer throttle window in a multi-write request shouldn't be clobbered by a shorter one).
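The carrier pattern and its largest-value-wins rule can be sketched as follows. This is not the package's implementation: the lower-case names (`carrier`, `withCarrier`, `setRetryAfter`, `headerValue`) are hypothetical, the mutex is an assumption, and rounding up to whole seconds is one reasonable way to fill the Retry-After header, which takes an integer number of seconds.

```go
package main

import (
	"context"
	"fmt"
	"strconv"
	"sync"
	"time"
)

type contextKey string

// retryAfterKey uses the key value documented for RetryAfterContextKey.
const retryAfterKey contextKey = "atcr.retry-after"

// bg is a shared background context for the example calls.
var bg = context.Background()

// carrier is a hypothetical stand-in for RetryAfterCarrier.
type carrier struct {
	mu sync.Mutex
	d  time.Duration
}

func newCarrier() *carrier { return &carrier{} }

// Set keeps the largest hint seen so far.
func (c *carrier) Set(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if d > c.d {
		c.d = d
	}
}

// Duration returns the recorded hint, or 0 if none was set.
func (c *carrier) Duration() time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.d
}

// withCarrier is what middleware would do before calling the next handler.
func withCarrier(ctx context.Context, c *carrier) context.Context {
	return context.WithValue(ctx, retryAfterKey, c)
}

// setRetryAfter records a hint if the context has a carrier; otherwise no-op.
func setRetryAfter(ctx context.Context, d time.Duration) {
	if c, ok := ctx.Value(retryAfterKey).(*carrier); ok {
		c.Set(d)
	}
}

// headerValue renders a duration as whole seconds, rounded up.
func headerValue(d time.Duration) string {
	return strconv.FormatInt(int64((d+time.Second-1)/time.Second), 10)
}

func main() {
	c := newCarrier()
	ctx := withCarrier(bg, c)
	setRetryAfter(ctx, 30*time.Second)
	setRetryAfter(ctx, 5*time.Second) // shorter hint does not overwrite
	fmt.Println("Retry-After:", headerValue(c.Duration()))
}
```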

type RoutingRepository

type RoutingRepository struct {
	distribution.Repository
	Ctx *RegistryContext // All context and services (exported for token updates)
	// contains filtered or unexported fields
}

RoutingRepository routes manifests to ATProto and blobs to an external hold service. The registry (AppView) is stateless and NEVER stores blobs locally. NOTE: A fresh instance is created per-request (see middleware/registry.go).

func NewRoutingRepository

func NewRoutingRepository(baseRepo distribution.Repository, ctx *RegistryContext) *RoutingRepository

NewRoutingRepository creates a new routing repository

func (*RoutingRepository) Blobs

Blobs returns a proxy blob store that routes to an external hold service. The registry (AppView) NEVER stores blobs locally; all blobs go through the hold service.

func (*RoutingRepository) Manifests

Manifests returns the ATProto-backed manifest service

func (*RoutingRepository) Tags

Tags returns the tag service. Tags are stored in ATProto as io.atcr.tag records.

type ServiceTokenFetcher added in v0.1.3

type ServiceTokenFetcher func(ctx context.Context, holdDID string) (string, error)

ServiceTokenFetcher returns a hold service token for the resolved holdDID. Implementations wire to the appropriate auth path (OAuth refresher or app-password access token).

type TagStore

type TagStore struct {
	// contains filtered or unexported fields
}

TagStore implements distribution.TagService. It stores tags in ATProto as records.

func NewTagStore

func NewTagStore(client *atproto.Client, repository string) *TagStore

NewTagStore creates a new ATProto-backed tag store

func (*TagStore) All

func (s *TagStore) All(ctx context.Context) ([]string, error)

All returns all tags for this repository

func (*TagStore) Get

Get retrieves the descriptor for a tag

func (*TagStore) List added in v0.1.3

func (s *TagStore) List(ctx context.Context, limit int, last string) ([]string, error)

List returns tags after `last` up to `limit` (pagination over All).
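Pagination over a full tag list can be sketched as a pure function. The helper `paginate` is hypothetical, and two choices are assumptions made for the sketch: tags are ordered lexically, and a non-positive limit means "no limit". Any end-of-list signalling the real method performs is not modelled.

```go
package main

import (
	"fmt"
	"sort"
)

// paginate returns up to limit tags that sort strictly after last.
// An empty last starts from the beginning.
func paginate(all []string, limit int, last string) []string {
	// Work on a sorted copy so the caller's slice is left untouched.
	sorted := append([]string(nil), all...)
	sort.Strings(sorted)

	start := 0
	if last != "" {
		// First index whose tag is strictly greater than last.
		start = sort.Search(len(sorted), func(i int) bool { return sorted[i] > last })
	}
	end := len(sorted)
	if limit > 0 && start+limit < end {
		end = start + limit
	}
	return sorted[start:end]
}

func main() {
	tags := []string{"v1.2", "latest", "v1.0", "v1.1"}
	page := paginate(tags, 2, "")
	fmt.Println(page)
	// Pass the final tag of the previous page as the next cursor.
	fmt.Println(paginate(tags, 2, page[len(page)-1]))
}
```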

func (*TagStore) Lookup

func (s *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error)

Lookup returns the set of tags for a given digest

func (*TagStore) Tag

func (s *TagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error

Tag associates a tag with a descriptor (manifest digest)

func (*TagStore) Untag

func (s *TagStore) Untag(ctx context.Context, tag string) error

Untag removes a tag
