pdp

package
v0.2.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 8, 2026 License: Apache-2.0 Imports: 23 Imported by: 0

Documentation

Overview

Package pdp provides a Curio-compatible PDP provider HTTP client.

PDP endpoints are unauthenticated at the HTTP layer. Authorization for state-changing calls is carried inside the request body as caller-provided EIP-712 signed extraData.

Retry policy

Control-plane GET requests (getJSON and poll endpoints) retry on transient errors — HTTP 5xx (except 501), 429, connection resets, DNS temporaries, unexpected EOF, and request timeouts — with exponential backoff up to MaxRetries.

Streaming piece downloads (DownloadPiece) are executed once with the caller's context as the sole lifetime control; they do not go through the automatic retry loop.

POST and DELETE requests are executed exactly once. These verbs may mutate server state, and a client-side retry after a server-side partial success can cause duplicate work or inconsistent state. Callers that need retry behavior for POST/DELETE must implement it at the business layer with appropriate deduplication (e.g. by polling the resulting resource via a GET endpoint before retrying).

Response size cap

Control-plane JSON responses are capped at MaxControlResponseBytes (16 MiB). Anything larger is treated as a server bug or attack and fails the request. Streaming endpoints (piece download) are not subject to this cap.

Endpoints covered:

  • GET /piece/{pieceCid} (download bytes)
  • GET /pdp/ping
  • POST /pdp/piece/uploads (create upload)
  • PUT /pdp/piece/uploads/{uploadUUID} (upload bytes)
  • POST /pdp/piece/uploads/{uploadUUID} (finalize upload)
  • GET /pdp/piece?pieceCid=... (find)
  • POST /pdp/piece/pull (pull pieces)
  • POST /pdp/data-sets (create)
  • GET /pdp/data-sets/created/{txHash} (poll create)
  • GET /pdp/data-sets/{id} (read)
  • POST /pdp/data-sets/{id}/pieces (add pieces)
  • GET /pdp/data-sets/{id}/pieces/added/{txHash} (poll add)
  • DELETE /pdp/data-sets/{id}/pieces/{pieceId} (schedule remove)

Index

Constants

View Source
const DefaultHTTPTimeout = 30 * time.Second

DefaultHTTPTimeout applies to simple JSON operations. Streaming uploads use a separate, longer timeout (or none at all) negotiated per call.

View Source
const DefaultMaxRetries = 3

DefaultMaxRetries is the number of retries attempted for transient failures (5xx, 429, network errors) when no explicit value is set.

View Source
const DefaultUserAgent = "synapse-go/pdp"

DefaultUserAgent is set on every outgoing request unless overridden.

View Source
const MaxControlResponseBytes = 16 << 20 // 16 MiB

MaxControlResponseBytes caps the size of control-plane JSON response bodies read fully into memory. PDP provider control-plane endpoints return small JSON documents (dataset status, piece IDs, etc.); anything larger indicates a buggy or hostile server and should not be allowed to exhaust client memory. Streaming endpoints (piece download) bypass this limit.

Variables

View Source
var ErrLocationHeader = errors.New("pdp: missing or invalid Location header")

ErrLocationHeader is returned when the server responds successfully but the expected Location header is missing or malformed.

View Source
var ErrPieceNotFound = errors.New("pdp: piece not found")

ErrPieceNotFound is returned when GET /pdp/piece returns 404.

View Source
var ErrPieceProcessing = errors.New("pdp: piece still processing")

ErrPieceProcessing is returned when GET /pdp/piece returns 202, meaning the piece is known but not yet parked and queryable.

View Source
var ErrPullFailed = errors.New("pdp: pull failed")

ErrPullFailed is returned by WaitForPullComplete when the server reports that the overall pull status is "failed".

View Source
var ErrStillPending = errors.New("pdp: still pending")

ErrStillPending is returned by status-polling helpers when the server reports the transaction is still pending. It is the sentinel callers should loop on while waiting.

View Source
var ErrTxRejected = errors.New("pdp: transaction rejected")

ErrTxRejected is returned when an on-chain operation posted by the SP was rejected by the network.

Functions

This section is empty.

Types

type AddPieceInput

type AddPieceInput struct {
	PieceCID cid.Cid
}

AddPieceInput mirrors one entry of the pieces array for POST /pdp/data-sets/{id}/pieces. The wire format uses the piece CID as its own single sub-piece.

type AddPiecesResult

type AddPiecesResult struct {
	TxHash    common.Hash
	StatusURL string
}

AddPiecesResult is what the client gets back from a successful POST.

type AddPiecesStatus

type AddPiecesStatus struct {
	TxHash            common.Hash    `json:"-"`
	TxStatus          string         `json:"txStatus"` // pending | confirmed | rejected
	DataSetID         types.BigInt   `json:"-"`
	PieceCount        int            `json:"pieceCount"`
	AddMessageOK      *bool          `json:"addMessageOk"`
	PiecesAdded       bool           `json:"piecesAdded"`
	ConfirmedPieceIDs []types.BigInt `json:"-"`
}

AddPiecesStatus mirrors GET /pdp/data-sets/{id}/pieces/added/{txHash}. TxStatus reports pending, confirmed, or rejected.

type Client

type Client struct {
	// contains filtered or unexported fields
}

Client is a thin HTTP client for a single PDP provider service URL. Safe for concurrent use.

func New

func New(serviceURL string, opts ...Option) (*Client, error)

New constructs a Client pointing at the given service URL (e.g. https://pdp.example.com).

func (*Client) AddPieces

func (c *Client) AddPieces(ctx context.Context, dataSetID types.BigInt, pieces []AddPieceInput, extraData []byte) (*AddPiecesResult, error)

AddPieces calls POST /pdp/data-sets/{dataSetId}/pieces. extraData must be caller-provided EIP-712 signed data encoded as the PDP provider expects.

func (*Client) BaseURL

func (c *Client) BaseURL() *url.URL

BaseURL returns a copy of the configured base URL.

func (*Client) CreateDataSet

func (c *Client) CreateDataSet(ctx context.Context, recordKeeper common.Address, extraData []byte) (*CreateDataSetResult, error)

CreateDataSet calls POST /pdp/data-sets. extraData must be an EIP-712 signed blob encoded as the PDP provider expects.

func (*Client) CreateDataSetAndAddPieces

func (c *Client) CreateDataSetAndAddPieces(
	ctx context.Context,
	recordKeeper common.Address,
	pieces []AddPieceInput,
	extraData []byte,
) (*CreateDataSetResult, error)

CreateDataSetAndAddPieces calls POST /pdp/data-sets/create-and-add.

The provider creates a dataset and immediately submits the add-pieces transaction using the caller-provided EIP-712 signed extraData for the combined create+add flow.

func (*Client) DownloadPiece

func (c *Client) DownloadPiece(ctx context.Context, pieceCID cid.Cid) (io.ReadCloser, int64, error)

DownloadPiece calls GET /piece/{pieceCid} and returns a streaming io.ReadCloser plus the Content-Length (-1 when unknown). The caller must close the reader when done.

The download bypasses the client's default JSON timeout so large pieces are not cut off; callers can enforce a deadline via the context.

func (*Client) FindPiece

func (c *Client) FindPiece(ctx context.Context, pieceCID cid.Cid) (*FindPieceResult, error)

FindPiece calls GET /pdp/piece?pieceCid=.... It returns ErrPieceNotFound for HTTP 404 and ErrPieceProcessing for HTTP 202 while the SP is still parking the piece.

func (*Client) GetAddPiecesStatus

func (c *Client) GetAddPiecesStatus(ctx context.Context, statusURL string) (*AddPiecesStatus, error)

GetAddPiecesStatus polls the status URL once.

func (*Client) GetDataSet

func (c *Client) GetDataSet(ctx context.Context, dataSetID types.BigInt) (*DataSet, error)

GetDataSet calls GET /pdp/data-sets/{dataSetId}.

func (*Client) GetDataSetCreationStatus

func (c *Client) GetDataSetCreationStatus(ctx context.Context, statusURL string) (*CreateDataSetStatus, error)

GetDataSetCreationStatus polls the status URL once.

func (*Client) Ping

func (c *Client) Ping(ctx context.Context) error

Ping issues GET /pdp/ping to verify the service is reachable.

func (*Client) PullPieces

func (c *Client) PullPieces(ctx context.Context, req PullRequest) (*PullResult, error)

PullPieces calls POST /pdp/piece/pull to request that this SP pull the given pieces from the source URLs.

The endpoint is idempotent: calling again with the same extraData returns the status of the existing pull request rather than creating a duplicate. This makes it safe to poll for status using repeated calls.

func (*Client) SchedulePieceDeletion

func (c *Client) SchedulePieceDeletion(ctx context.Context, dataSetID, pieceID types.BigInt, extraData []byte) (common.Hash, error)

SchedulePieceDeletion issues DELETE /pdp/data-sets/{id}/pieces/{pieceId} with the provided EIP-712 signed extraData.

func (*Client) UploadPieceStreaming

func (c *Client) UploadPieceStreaming(
	ctx context.Context,
	data io.Reader,
	opts UploadPieceStreamingOptions,
) (*UploadStreamingResult, error)

UploadPieceStreaming uploads a piece using the CommP-last 3-step streaming protocol. This is the preferred upload path: data is streamed to the server in a single pass while the PieceCID is computed inline (either by the client via TeeReader or by the caller in advance).

Protocol:

  1. POST /pdp/piece/uploads — create upload session, get UUID
  2. PUT /pdp/piece/uploads/{uuid} — stream body; commP is computed in this pass (unless opts.PieceCID is pre-filled)
  3. POST /pdp/piece/uploads/{uuid} — finalize with the PieceCID; server validates that the uploaded bytes hash to the same value

The PUT clears the default Client timeout so large transfers are not capped at DefaultHTTPTimeout. Callers needing stricter limits should pass a context deadline or use a custom HTTP client.

func (*Client) WaitForCreateDataSetAndAddPieces

func (c *Client) WaitForCreateDataSetAndAddPieces(
	ctx context.Context,
	statusURL string,
	pollInterval time.Duration,
) (*AddPiecesStatus, error)

WaitForCreateDataSetAndAddPieces first waits for dataset creation to confirm, then polls the add-pieces status endpoint for the same transaction hash.

func (*Client) WaitForDataSetCreated

func (c *Client) WaitForDataSetCreated(ctx context.Context, statusURL string, pollInterval time.Duration) (*CreateDataSetStatus, error)

WaitForDataSetCreated polls until the server reports txStatus=confirmed with dataSetCreated=true (success) or txStatus=rejected (ErrTxRejected). Transport errors, including HTTP 404s from the status URL, are returned immediately rather than retried. pollInterval defaults to 4 seconds.

func (*Client) WaitForPieceParked

func (c *Client) WaitForPieceParked(ctx context.Context, pieceCID cid.Cid, pollInterval time.Duration) error

WaitForPieceParked polls FindPiece until the piece is found or the context is cancelled / timeout is reached. A zero pollInterval defaults to one second.

func (*Client) WaitForPiecesAdded

func (c *Client) WaitForPiecesAdded(ctx context.Context, statusURL string, pollInterval time.Duration) (*AddPiecesStatus, error)

WaitForPiecesAdded polls until the add-pieces tx is confirmed with piecesAdded=true or rejected. Transport errors, including HTTP 404s from the status URL, are returned immediately rather than retried. pollInterval defaults to 4s.

func (*Client) WaitForPullComplete

func (c *Client) WaitForPullComplete(
	ctx context.Context,
	req PullRequest,
	pollInterval time.Duration,
	onStatus func(*PullResult),
) (*PullResult, error)

WaitForPullComplete polls PullPieces until the overall pull status is "complete" or "failed". On failure it returns (result, ErrPullFailed) so callers can inspect the per-piece statuses.

onStatus is invoked after each poll (may be nil). A zero pollInterval defaults to 4 seconds.

type CreateDataSetRequest

type CreateDataSetRequest struct {
	RecordKeeper common.Address `json:"recordKeeper"`
	ExtraData    string         `json:"extraData"` // 0x-prefixed hex
}

CreateDataSetRequest is the body of POST /pdp/data-sets.

type CreateDataSetResult

type CreateDataSetResult struct {
	TxHash    common.Hash
	StatusURL string
}

CreateDataSetResult carries what the server returns from POST /pdp/data-sets: the transaction hash (parsed out of the Location header) and the absolute status URL the caller can poll.

type CreateDataSetStatus

type CreateDataSetStatus struct {
	CreateMessageHash common.Hash   `json:"createMessageHash"`
	Service           string        `json:"service"`
	TxStatus          string        `json:"txStatus"` // pending | confirmed | rejected
	DataSetCreated    bool          `json:"dataSetCreated"`
	OK                *bool         `json:"ok"`
	DataSetID         *types.BigInt `json:"-"`
}

CreateDataSetStatus mirrors GET /pdp/data-sets/created/{txHash}. TxStatus reports pending, confirmed, or rejected.

type DataSet

type DataSet struct {
	ID                 types.BigInt   `json:"-"`
	NextChallengeEpoch int64          `json:"nextChallengeEpoch"`
	Pieces             []DataSetPiece `json:"pieces"`
}

DataSet mirrors the JSON returned by GET /pdp/data-sets/{id}.

func (DataSet) MarshalJSON

func (d DataSet) MarshalJSON() ([]byte, error)

func (*DataSet) UnmarshalJSON

func (d *DataSet) UnmarshalJSON(data []byte) error

type DataSetPiece

type DataSetPiece struct {
	PieceCID       string       `json:"pieceCid"`
	PieceID        types.BigInt `json:"-"`
	SubPieceCID    string       `json:"subPieceCid"`
	SubPieceOffset uint64       `json:"subPieceOffset"`
}

DataSetPiece mirrors a piece entry returned by GET /pdp/data-sets/{id}.

func (DataSetPiece) MarshalJSON

func (p DataSetPiece) MarshalJSON() ([]byte, error)

func (*DataSetPiece) UnmarshalJSON

func (p *DataSetPiece) UnmarshalJSON(data []byte) error

type FindPieceResult

type FindPieceResult struct {
	PieceCID string `json:"pieceCid"`
}

FindPieceResult mirrors the JSON body of GET /pdp/piece?pieceCid=...

type HTTPError

type HTTPError struct {
	Method     string
	URL        string
	StatusCode int
	Body       string
	// RetryAfter is the server-requested wait duration from the Retry-After
	// header. Non-zero on HTTP 429 (Too Many Requests) and 503 (Service
	// Unavailable) responses when the server provides the header.
	RetryAfter time.Duration
}

HTTPError wraps a non-success response from the PDP provider API.

The URL field is always pre-redacted: userinfo is stripped and sensitive query parameters (see sensitiveQueryKeys in redact.go) are masked as "***". This removes the footgun where a caller logs `%+v` or JSON marshals the struct and leaks credentials. The pre-redacted form is sufficient for debugging — path, scheme, host and non-sensitive query values are preserved.

func (*HTTPError) Error

func (e *HTTPError) Error() string

Error implements the error interface using the pre-redacted URL.

func (*HTTPError) RedactedURL

func (e *HTTPError) RedactedURL() string

RedactedURL returns the log-safe URL stored in e.URL. Retained for backwards compatibility; new code can simply read e.URL directly as it is always pre-redacted.

type Option

type Option func(*Client)

Option configures a Client.

func WithHTTPClient

func WithHTTPClient(h *http.Client) Option

WithHTTPClient supplies a custom *http.Client. Useful to inject timeouts, custom transports, or test doubles.

func WithLogger

func WithLogger(l *slog.Logger) Option

WithLogger attaches a structured logger. A nil logger disables logging.

func WithMaxRetries

func WithMaxRetries(n int) Option

WithMaxRetries sets the maximum number of retry attempts for transient failures (5xx, 429, network errors). A value of 0 disables retries. Negative values are treated as 0. Defaults to DefaultMaxRetries (3).

func WithUserAgent

func WithUserAgent(ua string) Option

WithUserAgent sets the User-Agent header on every request.

type PullPieceInput

type PullPieceInput struct {
	// PieceCID is the piece to pull.
	PieceCID cid.Cid
	// SourceURL is an HTTPS URL ending in /piece/{pieceCid} on the source SP.
	SourceURL string
}

PullPieceInput is one entry in a pull request.

type PullPieceStatus

type PullPieceStatus struct {
	PieceCID string     `json:"pieceCid"`
	Status   PullStatus `json:"status"`
}

PullPieceStatus is the per-piece status returned by POST /pdp/piece/pull.

type PullRequest

type PullRequest struct {
	// RecordKeeper is the record-keeper contract address (e.g. FWSS). The
	// pull request body always carries it, even when reusing an existing dataset.
	RecordKeeper common.Address
	// ExtraData is caller-provided EIP-712 signed data encoded as the provider expects.
	ExtraData []byte
	// DataSetID is the target dataset. Nil means create a new dataset.
	DataSetID *types.BigInt
	// Pieces are the pieces to pull with their source URLs.
	Pieces []PullPieceInput
}

PullRequest carries all parameters for POST /pdp/piece/pull.

type PullResult

type PullResult struct {
	Status PullStatus        `json:"status"`
	Pieces []PullPieceStatus `json:"pieces"`
}

PullResult is what the server returns from POST /pdp/piece/pull.

type PullStatus

type PullStatus string

PullStatus mirrors the status values used by the PDP pull endpoint.

const (
	PullStatusPending    PullStatus = "pending"
	PullStatusInProgress PullStatus = "inProgress"
	PullStatusRetrying   PullStatus = "retrying"
	PullStatusComplete   PullStatus = "complete"
	PullStatusFailed     PullStatus = "failed"
)

type UploadPieceStreamingOptions

type UploadPieceStreamingOptions struct {
	// Size is the payload byte count. When > 0 it is used to set the
	// Content-Length request header. When 0 the request is sent with
	// Transfer-Encoding: chunked.
	Size int64
	// PieceCID, when defined, is a pre-computed PieceCIDv2 of the payload.
	// If set, the client skips incremental commP calculation during the
	// streaming PUT. The server still verifies the uploaded bytes against
	// this value during finalize; a mismatch yields an HTTP error.
	PieceCID cid.Cid
	// OnProgress is invoked after each non-empty Read from the data reader,
	// with the cumulative byte count sent so far. It may be nil.
	OnProgress func(bytesUploaded int64)
}

UploadPieceStreamingOptions configures a single streaming upload.

type UploadStreamingResult

type UploadStreamingResult struct {
	// PieceCID is the PieceCIDv2 of the uploaded piece — either the caller-
	// provided value (when opts.PieceCID was set) or the value computed
	// client-side during the streaming PUT.
	PieceCID cid.Cid
	// Size is the total byte count consumed from the data reader.
	Size int64
}

UploadStreamingResult is returned by UploadPieceStreaming on success.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL