Documentation
Overview ¶
Package storage implements the storage routing layer for AppView. It routes manifests to the ATProto PDS (as io.atcr.manifest records) and blobs to hold services via XRPC, using database-backed hold DID lookups. All storage operations are proxied: AppView stores nothing locally.
Index ¶
- Constants
- func EnsureCrewMembership(ctx context.Context, userDID string, defaultHoldDID string, ...)
- func EnsureProfile(ctx context.Context, client *atproto.Client, defaultHoldDID string) error
- func GetProfile(ctx context.Context, client *atproto.Client) (*atproto.SailorProfileRecord, error)
- func MigrateManifestsForSuccessor(ctx context.Context, client *atproto.Client, authorizer auth.HoldAuthorizer, ...)
- func SetRetryAfter(ctx context.Context, d time.Duration)
- func UpdateProfile(ctx context.Context, client *atproto.Client, ...) error
- type CompletedPart
- type HoldDIDLookup
- type ManifestReferenceChecker
- type ManifestStore
- func (s *ManifestStore) Delete(ctx context.Context, dgst digest.Digest) error
- func (s *ManifestStore) Exists(ctx context.Context, dgst digest.Digest) (bool, error)
- func (s *ManifestStore) Get(ctx context.Context, dgst digest.Digest, ...) (distribution.Manifest, error)
- func (s *ManifestStore) Put(ctx context.Context, manifest distribution.Manifest, ...) (digest.Digest, error)
- type PartUploadInfo
- type ProxyBlobStore
- func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error)
- func (p *ProxyBlobStore) Delete(ctx context.Context, dgst digest.Digest) error
- func (p *ProxyBlobStore) Get(ctx context.Context, dgst digest.Digest) ([]byte, error)
- func (p *ProxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error)
- func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error)
- func (p *ProxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error)
- func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, ...) error
- func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error)
- type ProxyBlobWriter
- func (w *ProxyBlobWriter) Cancel(ctx context.Context) error
- func (w *ProxyBlobWriter) Close() error
- func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error)
- func (w *ProxyBlobWriter) ID() string
- func (w *ProxyBlobWriter) ReadFrom(r io.Reader) (int64, error)
- func (w *ProxyBlobWriter) Size() int64
- func (w *ProxyBlobWriter) StartedAt() time.Time
- func (w *ProxyBlobWriter) Write(p []byte) (int, error)
- type PushWebhookDispatcher
- type PushWebhookEvent
- type RegistryContext
- type RetryAfterCarrier
- type RoutingRepository
- type ServiceTokenFetcher
- type TagStore
- func (s *TagStore) All(ctx context.Context) ([]string, error)
- func (s *TagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error)
- func (s *TagStore) List(ctx context.Context, limit int, last string) ([]string, error)
- func (s *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error)
- func (s *TagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error
- func (s *TagStore) Untag(ctx context.Context, tag string) error
Constants ¶
const HTTPRequestMethod contextKey = "http.request.method"
const ProfileRKey = "self"
ProfileRKey is always "self" per lexicon
const RetryAfterContextKey contextKey = "atcr.retry-after"
Functions ¶
func EnsureCrewMembership ¶
func EnsureCrewMembership( ctx context.Context, userDID string, defaultHoldDID string, authorizer auth.HoldAuthorizer, fetchServiceToken ServiceTokenFetcher, )
EnsureCrewMembership attempts to register the user as a crew member on their default hold. The hold's requestCrew endpoint handles all authorization logic (checking allowAllCrew, existing membership, etc.). On success, it warms the approval cache and clears any cached denial. It is best-effort: on any error it logs and returns.
fetchServiceToken is invoked only on cache miss. Pass nil to skip when no auth path is available (callers that just want the cache short-circuit behavior).
func EnsureProfile ¶
EnsureProfile checks if a user's profile exists and creates it if needed. If the profile already exists, missing fields are reconciled against the AppView's defaults so downstream logic (e.g. successor migration) has a concrete defaultHold to anchor on instead of relying on the empty fallback. Currently only defaultHold is reconciled; other zero-valued fields have their own runtime defaults and shouldn't be written unprompted. This should be called during authentication (OAuth exchange or token service). Expected format for defaultHoldDID: "did:web:hold01.atcr.io"
func GetProfile ¶
GetProfile retrieves the user's profile from their PDS. It returns nil if the profile doesn't exist, and automatically migrates old URL-based defaultHold values to DIDs.
func MigrateManifestsForSuccessor ¶
func MigrateManifestsForSuccessor( ctx context.Context, client *atproto.Client, authorizer auth.HoldAuthorizer, db HoldDIDLookup, did string, )
MigrateManifestsForSuccessor rewrites manifest records and profile when any of the user's holds have a successor. Best-effort, runs in background.
Steps:
1. Get user's sailor profile
2. Collect candidate holds: profile.DefaultHold + distinct holds from manifests DB
3. For each hold with a successor: rewrite PDS manifest records and local DB
4. If profile.DefaultHold itself had a successor, update the profile too
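The candidate-collection and rewrite-planning steps above can be sketched in isolation. This is only an illustration under stated assumptions: `profile`, `planMigration`, and the `successors` map are hypothetical stand-ins (the real function resolves successors through the authorizer and writes to the PDS and the database), and the example DIDs are made up.

```go
package main

import "fmt"

// profile is a hypothetical stand-in for the sailor profile record;
// only the default hold matters for this sketch.
type profile struct {
	DefaultHold string
}

// planMigration returns the old->new hold rewrites and the default hold the
// profile should end up with. successors is an assumed lookup table mapping
// a hold DID to its successor DID.
func planMigration(p profile, manifestHolds []string, successors map[string]string) (map[string]string, string) {
	// Step 2: collect distinct candidate holds (profile default + manifest holds).
	seen := map[string]bool{}
	var candidates []string
	for _, h := range append([]string{p.DefaultHold}, manifestHolds...) {
		if h == "" || seen[h] {
			continue
		}
		seen[h] = true
		candidates = append(candidates, h)
	}

	// Step 3: every candidate with a successor becomes a rewrite.
	rewrites := map[string]string{}
	for _, h := range candidates {
		if next, ok := successors[h]; ok && next != "" && next != h {
			rewrites[h] = next
		}
	}

	// Step 4: the profile is updated only if its own default hold moved.
	newDefault := p.DefaultHold
	if next, ok := rewrites[p.DefaultHold]; ok {
		newDefault = next
	}
	return rewrites, newDefault
}

func main() {
	p := profile{DefaultHold: "did:web:old.example"}
	holds := []string{"did:web:old.example", "did:web:other.example"}
	successors := map[string]string{"did:web:old.example": "did:web:new.example"}

	rewrites, newDefault := planMigration(p, holds, successors)
	fmt.Println(rewrites, newDefault)
}
```

Separating the plan from the writes keeps the decision logic easy to test; the actual record rewrites remain best-effort side effects.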
func SetRetryAfter ¶ added in v0.1.3
SetRetryAfter is a convenience helper for handlers that have a context but not a direct carrier reference.
func UpdateProfile ¶
func UpdateProfile(ctx context.Context, client *atproto.Client, profile *atproto.SailorProfileRecord) error
UpdateProfile updates the user's profile. It normalizes defaultHold to DID format before saving.
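To make the defaultHold normalization concrete, here is a minimal sketch of turning a URL-style hold value into a did:web identifier. The helper name `holdToDID` is hypothetical and the package's actual rules may differ; the sketch assumes only the host (and port) matter and ignores URL paths and IPv6 hosts.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// holdToDID converts a URL-style hold reference into a did:web identifier.
// Values that already look like DIDs are returned unchanged.
// Hypothetical helper for illustration; not the package's API.
func holdToDID(hold string) (string, error) {
	if strings.HasPrefix(hold, "did:") {
		return hold, nil
	}
	u, err := url.Parse(hold)
	if err != nil {
		return "", err
	}
	if u.Host == "" {
		return "", fmt.Errorf("hold %q has no host", hold)
	}
	// did:web percent-encodes the colon that separates host and port.
	return "did:web:" + strings.ReplaceAll(u.Host, ":", "%3A"), nil
}

func main() {
	for _, in := range []string{
		"https://hold01.atcr.io",
		"did:web:hold01.atcr.io",
		"http://localhost:8080",
	} {
		did, err := holdToDID(in)
		fmt.Println(in, "->", did, err)
	}
}
```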
Types ¶
type CompletedPart ¶
CompletedPart represents an uploaded part with its ETag
type HoldDIDLookup ¶
type HoldDIDLookup interface {
GetLatestHoldDIDForRepo(did, repository string) (string, error)
UpdateManifestHoldDID(did, oldHoldDID, newHoldDID string) (int64, error)
GetDistinctManifestHoldDIDs(did string) ([]string, error)
}
HoldDIDLookup is the interface for querying and updating hold DIDs in manifests.
type ManifestReferenceChecker ¶
ManifestReferenceChecker checks if a manifest digest is referenced as a child of a manifest list (multi-arch image). Used to protect manifest list children from auto-removal when their parent list is still tagged.
type ManifestStore ¶
type ManifestStore struct {
// contains filtered or unexported fields
}
ManifestStore implements distribution.ManifestService. It stores manifests in ATProto as records.
func NewManifestStore ¶
func NewManifestStore(ctx *RegistryContext, blobStore distribution.BlobStore) *ManifestStore
NewManifestStore creates a new ATProto-backed manifest store
func (*ManifestStore) Get ¶
func (s *ManifestStore) Get(ctx context.Context, dgst digest.Digest, options ...distribution.ManifestServiceOption) (distribution.Manifest, error)
Get retrieves a manifest by digest
func (*ManifestStore) Put ¶
func (s *ManifestStore) Put(ctx context.Context, manifest distribution.Manifest, options ...distribution.ManifestServiceOption) (digest.Digest, error)
Put stores a manifest
type PartUploadInfo ¶
type PartUploadInfo struct {
URL string `json:"url"` // Presigned URL to PUT the part to
}
PartUploadInfo contains the presigned URL for uploading a part
type ProxyBlobStore ¶
type ProxyBlobStore struct {
// contains filtered or unexported fields
}
ProxyBlobStore proxies blob requests to an external storage service
func NewProxyBlobStore ¶
func NewProxyBlobStore(ctx *RegistryContext) *ProxyBlobStore
NewProxyBlobStore creates a new proxy blob store
func (*ProxyBlobStore) Create ¶
func (p *ProxyBlobStore) Create(ctx context.Context, options ...distribution.BlobCreateOption) (distribution.BlobWriter, error)
Create returns a blob writer for uploading using multipart upload
func (*ProxyBlobStore) Open ¶
func (p *ProxyBlobStore) Open(ctx context.Context, dgst digest.Digest) (io.ReadSeekCloser, error)
Open returns a reader for a blob
func (*ProxyBlobStore) Put ¶
func (p *ProxyBlobStore) Put(ctx context.Context, mediaType string, content []byte) (distribution.Descriptor, error)
Put stores a blob using the multipart upload flow. This ensures all uploads go through the same XRPC path.
func (*ProxyBlobStore) Resume ¶
func (p *ProxyBlobStore) Resume(ctx context.Context, id string) (distribution.BlobWriter, error)
Resume returns a blob writer for resuming an upload
func (*ProxyBlobStore) ServeBlob ¶
func (p *ProxyBlobStore) ServeBlob(ctx context.Context, w http.ResponseWriter, r *http.Request, dgst digest.Digest) error
ServeBlob serves a blob via HTTP redirect or proxied response
func (*ProxyBlobStore) Stat ¶
func (p *ProxyBlobStore) Stat(ctx context.Context, dgst digest.Digest) (distribution.Descriptor, error)
Stat returns the descriptor for a blob
type ProxyBlobWriter ¶
type ProxyBlobWriter struct {
// contains filtered or unexported fields
}
ProxyBlobWriter implements distribution.BlobWriter for proxy uploads using multipart upload
func (*ProxyBlobWriter) Cancel ¶
func (w *ProxyBlobWriter) Cancel(ctx context.Context) error
Cancel cancels the upload by aborting the multipart upload
func (*ProxyBlobWriter) Close ¶
func (w *ProxyBlobWriter) Close() error
Close closes the writer. Parts are flushed on demand, so this is a no-op.
func (*ProxyBlobWriter) Commit ¶
func (w *ProxyBlobWriter) Commit(ctx context.Context, desc distribution.Descriptor) (distribution.Descriptor, error)
Commit finalizes the upload by completing the multipart upload and moving the blob to its final location.
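The write, flush-on-demand, and commit flow can be illustrated with a small in-memory sketch. Everything here is an assumption made for illustration: `partWriter`, `completedPart`, the `upload` callback, and the tiny part size are hypothetical, and the real writer uploads parts to presigned URLs obtained from the hold service rather than hashing them locally.

```go
package main

import (
	"bytes"
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// completedPart mirrors the idea of CompletedPart: a part number and its ETag.
// The real struct's fields are not shown in this documentation.
type completedPart struct {
	PartNumber int
	ETag       string
}

// partWriter buffers writes and flushes a part whenever partSize bytes accumulate.
type partWriter struct {
	partSize int
	buf      bytes.Buffer
	parts    []completedPart
	size     int64
	upload   func(partNumber int, data []byte) (etag string, err error)
}

// Write buffers p and flushes as many full parts as are available.
func (w *partWriter) Write(p []byte) (int, error) {
	n, _ := w.buf.Write(p) // bytes.Buffer.Write always returns a nil error
	w.size += int64(n)
	for w.buf.Len() >= w.partSize {
		if err := w.flush(w.partSize); err != nil {
			return n, err
		}
	}
	return n, nil
}

// flush uploads the next n buffered bytes as one numbered part.
func (w *partWriter) flush(n int) error {
	num := len(w.parts) + 1
	etag, err := w.upload(num, w.buf.Next(n))
	if err != nil {
		return err
	}
	w.parts = append(w.parts, completedPart{PartNumber: num, ETag: etag})
	return nil
}

// Commit flushes any remainder and returns the parts needed to complete the upload.
func (w *partWriter) Commit() ([]completedPart, error) {
	if w.buf.Len() > 0 {
		if err := w.flush(w.buf.Len()); err != nil {
			return nil, err
		}
	}
	return w.parts, nil
}

// fakeUpload stands in for a PUT to a presigned URL; it returns a hash as the ETag.
func fakeUpload(partNumber int, data []byte) (string, error) {
	sum := sha256.Sum256(data)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	w := &partWriter{partSize: 4, upload: fakeUpload}
	w.Write([]byte("hello world"))
	parts, err := w.Commit()
	fmt.Println(len(parts), w.size, err)
}
```

Because parts are uploaded as soon as the buffer fills, closing such a writer has nothing left to do, which is consistent with Close being documented as a no-op.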
func (*ProxyBlobWriter) ReadFrom ¶
func (w *ProxyBlobWriter) ReadFrom(r io.Reader) (int64, error)
ReadFrom reads from a reader
func (*ProxyBlobWriter) StartedAt ¶
func (w *ProxyBlobWriter) StartedAt() time.Time
StartedAt returns when the upload started
type PushWebhookDispatcher ¶
type PushWebhookDispatcher interface {
DispatchForPush(ctx context.Context, event PushWebhookEvent)
}
PushWebhookDispatcher dispatches push event webhooks. Defined here (in storage) to avoid import cycles with the webhooks package.
type PushWebhookEvent ¶
type PushWebhookEvent struct {
OwnerDID string
OwnerHandle string
PusherDID string
PusherHandle string
Repository string
Tag string
Digest string
MediaType string
HoldDID string
HoldEndpoint string
}
PushWebhookEvent contains the data needed to dispatch a push webhook.
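As an illustration of how a caller might satisfy and invoke this interface, the sketch below defines a recording dispatcher and a call site that tolerates a nil dispatcher. `recordingDispatcher` and `notifyPush` are hypothetical names, and the event struct is a trimmed local copy so the example compiles on its own.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// PushWebhookEvent is a trimmed local copy of the documented struct.
type PushWebhookEvent struct {
	OwnerDID   string
	Repository string
	Tag        string
	Digest     string
}

// PushWebhookDispatcher matches the documented interface.
type PushWebhookDispatcher interface {
	DispatchForPush(ctx context.Context, event PushWebhookEvent)
}

// recordingDispatcher is a hypothetical test double that stores events
// instead of delivering webhooks.
type recordingDispatcher struct {
	mu     sync.Mutex
	events []PushWebhookEvent
}

func (d *recordingDispatcher) DispatchForPush(ctx context.Context, event PushWebhookEvent) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.events = append(d.events, event)
}

// notifyPush is a hypothetical call site: it skips dispatch when no
// dispatcher is configured and reports whether an event was sent.
func notifyPush(d PushWebhookDispatcher, event PushWebhookEvent) bool {
	if d == nil {
		return false
	}
	d.DispatchForPush(context.Background(), event)
	return true
}

func main() {
	rec := &recordingDispatcher{}
	ev := PushWebhookEvent{OwnerDID: "did:plc:abc123", Repository: "debian", Tag: "latest"}

	fmt.Println(notifyPush(rec, ev), len(rec.events)) // prints: true 1
	fmt.Println(notifyPush(nil, ev))                  // prints: false
}
```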
type RegistryContext ¶
type RegistryContext struct {
// Per-request identity and routing information
// Owner = the user whose repository is being accessed
// Puller = the authenticated user making the request (from JWT Subject)
DID string // Owner's DID - whose repo is being accessed (e.g., "did:plc:abc123")
Handle string // Owner's handle (e.g., "alice.bsky.social")
HoldDID string // Hold service DID (e.g., "did:web:hold01.atcr.io" or "did:plc:abc123")
HoldURL string // Resolved HTTP URL for the hold service
PDSEndpoint string // Owner's PDS endpoint URL
Repository string // Image repository name (e.g., "debian")
ServiceToken string // Service token for hold authentication (from puller's PDS)
ATProtoClient *atproto.Client // Authenticated ATProto client for the owner
AuthMethod string // Auth method used ("oauth" or "app_password")
PullerDID string // Puller's DID - who is making the request (from JWT Subject)
PullerPDSEndpoint string // Puller's PDS endpoint URL
HasPushScope bool // Whether the JWT token has push scope (used to filter pull stats)
// Per-request user preferences
AutoRemoveUntagged bool // Whether to auto-delete untagged manifests on tag overwrite
// Shared services (same for all requests)
Database HoldDIDLookup // Database for hold DID lookups
Authorizer auth.HoldAuthorizer // Hold access authorization
Refresher *oauth.Refresher // OAuth session manager
ReadmeFetcher *readme.Fetcher // README fetcher for repo pages
WebhookDispatcher PushWebhookDispatcher // Push webhook dispatcher (nil if not configured)
ManifestRefChecker ManifestReferenceChecker // Checks if digest is a manifest list child (nil-safe)
}
RegistryContext bundles all the context needed for registry operations. This includes both per-request data (DID, hold) and shared services.
type RetryAfterCarrier ¶ added in v0.1.3
type RetryAfterCarrier struct {
// contains filtered or unexported fields
}
RetryAfterCarrier is a request-scoped, mutable container for a Retry-After hint emitted by storage handlers (e.g., when an upstream PDS returns 429). HTTP middleware injects an empty carrier into the request context; deep handlers populate it via SetRetryAfter when they convert a rate-limit error into a 429 response. The middleware then reads it back to set the Retry-After response header.
func NewRetryAfterCarrier ¶ added in v0.1.3
func NewRetryAfterCarrier() *RetryAfterCarrier
NewRetryAfterCarrier returns an empty carrier ready to be stored in context.
func (*RetryAfterCarrier) Duration ¶ added in v0.1.3
func (c *RetryAfterCarrier) Duration() time.Duration
Duration returns the recorded retry-after value, or 0 if none was set.
func (*RetryAfterCarrier) Set ¶ added in v0.1.3
func (c *RetryAfterCarrier) Set(d time.Duration)
Set records a retry-after hint. The largest value wins: a later, longer throttle window in a multi-write request shouldn't be clobbered by a shorter one.
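The carrier pattern described above (a mutable value placed in the context, populated deep in the call stack, read back by middleware) can be sketched as follows. This is not the package's implementation: `carrier`, `retryAfterKey`, and `setRetryAfter` are hypothetical stand-ins for RetryAfterCarrier, RetryAfterContextKey, and SetRetryAfter, whose internals are unexported.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// ctxKey is a private key type so the context entry cannot collide with others.
type ctxKey string

// retryAfterKey is a stand-in for the package's RetryAfterContextKey.
const retryAfterKey ctxKey = "atcr.retry-after"

// carrier is a stand-in for RetryAfterCarrier.
type carrier struct {
	mu sync.Mutex
	d  time.Duration
}

// Set keeps the largest hint seen so far.
func (c *carrier) Set(d time.Duration) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if d > c.d {
		c.d = d
	}
}

// Duration returns the recorded hint, or 0 if none was set.
func (c *carrier) Duration() time.Duration {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.d
}

// setRetryAfter records d on the carrier found in ctx; it is a no-op
// when the context carries none.
func setRetryAfter(ctx context.Context, d time.Duration) {
	if c, ok := ctx.Value(retryAfterKey).(*carrier); ok && c != nil {
		c.Set(d)
	}
}

func main() {
	// Middleware side: inject an empty carrier into the request context.
	c := &carrier{}
	ctx := context.WithValue(context.Background(), retryAfterKey, c)

	// Handler side: record hints without a direct carrier reference.
	setRetryAfter(ctx, 30*time.Second)
	setRetryAfter(ctx, 5*time.Second) // shorter hint does not override

	// Middleware side again: read the value back for the response header.
	fmt.Println(c.Duration()) // prints: 30s
}
```

Storing a pointer in the context is what lets a deep handler communicate upward; context values themselves are immutable once attached.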
type RoutingRepository ¶
type RoutingRepository struct {
distribution.Repository
Ctx *RegistryContext // All context and services (exported for token updates)
// contains filtered or unexported fields
}
RoutingRepository routes manifests to ATProto and blobs to the external hold service. The registry (AppView) is stateless and NEVER stores blobs locally. NOTE: A fresh instance is created per-request (see middleware/registry.go).
func NewRoutingRepository ¶
func NewRoutingRepository(baseRepo distribution.Repository, ctx *RegistryContext) *RoutingRepository
NewRoutingRepository creates a new routing repository
func (*RoutingRepository) Blobs ¶
func (r *RoutingRepository) Blobs(ctx context.Context) distribution.BlobStore
Blobs returns a proxy blob store that routes to the external hold service. The registry (AppView) NEVER stores blobs locally; all blobs go through the hold service.
func (*RoutingRepository) Manifests ¶
func (r *RoutingRepository) Manifests(ctx context.Context, options ...distribution.ManifestServiceOption) (distribution.ManifestService, error)
Manifests returns the ATProto-backed manifest service
func (*RoutingRepository) Tags ¶
func (r *RoutingRepository) Tags(ctx context.Context) distribution.TagService
Tags returns the tag service. Tags are stored in ATProto as io.atcr.tag records.
type ServiceTokenFetcher ¶ added in v0.1.3
ServiceTokenFetcher returns a hold service token for the resolved holdDID. Implementations wire to the appropriate auth path (OAuth refresher or app-password access token).
type TagStore ¶
type TagStore struct {
// contains filtered or unexported fields
}
TagStore implements distribution.TagService. It stores tags in ATProto as records.
func NewTagStore ¶
NewTagStore creates a new ATProto-backed tag store
func (*TagStore) Get ¶
func (s *TagStore) Get(ctx context.Context, tag string) (distribution.Descriptor, error)
Get retrieves the descriptor for a tag
func (*TagStore) List ¶ added in v0.1.3
List returns tags after `last` up to `limit` (pagination over All).
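A minimal sketch of this kind of pagination over a full tag list is shown below. The helper `paginate` is hypothetical, and it assumes tags are ordered lexically and that a non-positive limit means "no limit"; the actual method may order or bound results differently.

```go
package main

import (
	"fmt"
	"sort"
)

// paginate returns up to limit entries that sort strictly after last.
// Assumptions: lexical ordering, and limit <= 0 means no limit.
func paginate(all []string, limit int, last string) []string {
	// Work on a sorted copy so the caller's slice is left untouched.
	sorted := append([]string(nil), all...)
	sort.Strings(sorted)

	// Find the first entry strictly greater than last.
	start := 0
	if last != "" {
		start = sort.Search(len(sorted), func(i int) bool { return sorted[i] > last })
	}

	page := sorted[start:]
	if limit > 0 && len(page) > limit {
		page = page[:limit]
	}
	return page
}

func main() {
	tags := []string{"v1.0", "latest", "v1.1", "v2.0"}
	fmt.Println(paginate(tags, 2, ""))     // prints: [latest v1.0]
	fmt.Println(paginate(tags, 2, "v1.0")) // prints: [v1.1 v2.0]
}
```

A client pages through the list by passing the final tag of each page as `last` for the next call, stopping when a page comes back shorter than `limit`.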
func (*TagStore) Lookup ¶
func (s *TagStore) Lookup(ctx context.Context, desc distribution.Descriptor) ([]string, error)
Lookup returns the set of tags for a given digest
func (*TagStore) Tag ¶
func (s *TagStore) Tag(ctx context.Context, tag string, desc distribution.Descriptor) error
Tag associates a tag with a descriptor (manifest digest)