Documentation ¶
Overview ¶
Package pdp provides a Curio-compatible PDP provider HTTP client.
PDP endpoints are unauthenticated at the HTTP layer. Authorization for state-changing calls is carried inside the request body as caller-provided EIP-712 signed extraData.
Retry policy ¶
Control-plane GET requests (getJSON and poll endpoints) retry on transient errors (HTTP 5xx except 501, HTTP 429, connection resets, temporary DNS failures, unexpected EOF, and request timeouts) with exponential backoff, up to MaxRetries retries.
Streaming piece downloads (DownloadPiece) are executed once with the caller's context as the sole lifetime control; they do not go through the automatic retry loop.
POST and DELETE requests are executed exactly once. These verbs may mutate server state, and a client-side retry after a server-side partial success can cause duplicate work or inconsistent state. Callers that need retry behavior for POST/DELETE must implement it at the business layer with appropriate deduplication (e.g. by polling the resulting resource via a GET endpoint before retrying).
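The GET retry rules above can be sketched as two small helpers. This is only an illustration under stated assumptions: the names isRetryableStatus and backoffDelay are hypothetical, and the base delay, ceiling, and absence of jitter are choices made for the sketch, not documented behavior of this package.

```go
package main

import (
	"net/http"
	"time"
)

// isRetryableStatus reports whether an HTTP status counts as transient under
// the policy described above: 429, or any 5xx other than 501.
func isRetryableStatus(code int) bool {
	if code == http.StatusTooManyRequests {
		return true
	}
	return code >= 500 && code <= 599 && code != http.StatusNotImplemented
}

// backoffDelay doubles base once per prior attempt and caps the result at
// ceiling. attempt is zero-based: attempt 0 waits base, attempt 1 waits
// 2*base, and so on. (Assumed schedule; the real client may differ.)
func backoffDelay(attempt int, base, ceiling time.Duration) time.Duration {
	d := base
	for i := 0; i < attempt; i++ {
		d *= 2
		if d >= ceiling {
			return ceiling
		}
	}
	if d > ceiling {
		return ceiling
	}
	return d
}
```

A caller implementing business-layer retries for POST or DELETE could reuse the same classification, provided it first confirms through a GET endpoint that the earlier attempt did not take effect.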
Response size cap ¶
Control-plane JSON responses are capped at MaxControlResponseBytes (16 MiB). Anything larger is treated as a server bug or attack and fails the request. Streaming endpoints (piece download) are not subject to this cap.
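One common way to enforce such a cap is to read at most one byte past the limit, so that an oversized body is detected without buffering all of it. The sketch below assumes that approach; readCapped, errResponseTooLarge, and the fixedReader stand-in for a response body are hypothetical names, not part of this package.

```go
package main

import (
	"errors"
	"io"
)

// errResponseTooLarge is a hypothetical sentinel for an over-limit body.
var errResponseTooLarge = errors.New("response exceeds size cap")

// readCapped reads r fully but fails once more than limit bytes arrive.
// It reads limit+1 bytes at most, so memory use stays bounded.
func readCapped(r io.Reader, limit int64) ([]byte, error) {
	data, err := io.ReadAll(io.LimitReader(r, limit+1))
	if err != nil {
		return nil, err
	}
	if int64(len(data)) > limit {
		return nil, errResponseTooLarge
	}
	return data, nil
}

// fixedReader yields exactly `remaining` bytes of 'x'. It stands in for an
// HTTP response body of a known size in this sketch.
type fixedReader struct{ remaining int64 }

func (f *fixedReader) Read(p []byte) (int, error) {
	if f.remaining <= 0 {
		return 0, io.EOF
	}
	n := len(p)
	if int64(n) > f.remaining {
		n = int(f.remaining)
	}
	for i := 0; i < n; i++ {
		p[i] = 'x'
	}
	f.remaining -= int64(n)
	return n, nil
}
```

With a 16-byte limit, readCapped(&fixedReader{remaining: 16}, 16) succeeds while a 17-byte body returns errResponseTooLarge.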
Endpoints covered:
- GET /piece/{pieceCid} (download bytes)
- GET /pdp/ping
- POST /pdp/piece/uploads (create upload)
- PUT /pdp/piece/uploads/{uploadUUID} (upload bytes)
- POST /pdp/piece/uploads/{uploadUUID} (finalize upload)
- GET /pdp/piece?pieceCid=... (find)
- POST /pdp/piece/pull (pull pieces)
- POST /pdp/data-sets (create)
- POST /pdp/data-sets/create-and-add (create and add pieces)
- GET /pdp/data-sets/created/{txHash} (poll create)
- GET /pdp/data-sets/{id} (read)
- POST /pdp/data-sets/{id}/pieces (add pieces)
- GET /pdp/data-sets/{id}/pieces/added/{txHash} (poll add)
- DELETE /pdp/data-sets/{id}/pieces/{pieceId} (schedule remove)
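For orientation, two of the endpoint URLs listed above could be assembled from a service URL roughly as follows. This is a sketch using only net/url; findPieceURL and dataSetPiecesURL are hypothetical helpers, and the identifiers are passed as plain strings rather than the cid.Cid and types.BigInt values the Client methods take.

```go
package main

import (
	"net/url"
	"strings"
)

// findPieceURL builds the URL for GET /pdp/piece?pieceCid=... relative to
// the given service URL.
func findPieceURL(serviceURL, pieceCID string) (string, error) {
	u, err := url.Parse(serviceURL)
	if err != nil {
		return "", err
	}
	// Trim a trailing slash so the joined path has no double slash.
	u.Path = strings.TrimRight(u.Path, "/") + "/pdp/piece"
	q := url.Values{}
	q.Set("pieceCid", pieceCID)
	u.RawQuery = q.Encode()
	return u.String(), nil
}

// dataSetPiecesURL builds the URL for /pdp/data-sets/{id}/pieces.
func dataSetPiecesURL(serviceURL, dataSetID string) (string, error) {
	u, err := url.Parse(serviceURL)
	if err != nil {
		return "", err
	}
	u.Path = strings.TrimRight(u.Path, "/") + "/pdp/data-sets/" + dataSetID + "/pieces"
	return u.String(), nil
}
```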
Index ¶
- Constants
- Variables
- type AddPieceInput
- type AddPiecesResult
- type AddPiecesStatus
- type Client
- func (c *Client) AddPieces(ctx context.Context, dataSetID types.BigInt, pieces []AddPieceInput, ...) (*AddPiecesResult, error)
- func (c *Client) BaseURL() *url.URL
- func (c *Client) CreateDataSet(ctx context.Context, recordKeeper common.Address, extraData []byte) (*CreateDataSetResult, error)
- func (c *Client) CreateDataSetAndAddPieces(ctx context.Context, recordKeeper common.Address, pieces []AddPieceInput, ...) (*CreateDataSetResult, error)
- func (c *Client) DownloadPiece(ctx context.Context, pieceCID cid.Cid) (io.ReadCloser, int64, error)
- func (c *Client) FindPiece(ctx context.Context, pieceCID cid.Cid) (*FindPieceResult, error)
- func (c *Client) GetAddPiecesStatus(ctx context.Context, statusURL string) (*AddPiecesStatus, error)
- func (c *Client) GetDataSet(ctx context.Context, dataSetID types.BigInt) (*DataSet, error)
- func (c *Client) GetDataSetCreationStatus(ctx context.Context, statusURL string) (*CreateDataSetStatus, error)
- func (c *Client) Ping(ctx context.Context) error
- func (c *Client) PullPieces(ctx context.Context, req PullRequest) (*PullResult, error)
- func (c *Client) SchedulePieceDeletion(ctx context.Context, dataSetID, pieceID types.BigInt, extraData []byte) (common.Hash, error)
- func (c *Client) UploadPieceStreaming(ctx context.Context, data io.Reader, opts UploadPieceStreamingOptions) (*UploadStreamingResult, error)
- func (c *Client) WaitForCreateDataSetAndAddPieces(ctx context.Context, statusURL string, pollInterval time.Duration) (*AddPiecesStatus, error)
- func (c *Client) WaitForDataSetCreated(ctx context.Context, statusURL string, pollInterval time.Duration) (*CreateDataSetStatus, error)
- func (c *Client) WaitForPieceParked(ctx context.Context, pieceCID cid.Cid, pollInterval time.Duration) error
- func (c *Client) WaitForPiecesAdded(ctx context.Context, statusURL string, pollInterval time.Duration) (*AddPiecesStatus, error)
- func (c *Client) WaitForPullComplete(ctx context.Context, req PullRequest, pollInterval time.Duration, ...) (*PullResult, error)
- type CreateDataSetRequest
- type CreateDataSetResult
- type CreateDataSetStatus
- type DataSet
- type DataSetPiece
- type FindPieceResult
- type HTTPError
- type Option
- type PullPieceInput
- type PullPieceStatus
- type PullRequest
- type PullResult
- type PullStatus
- type UploadPieceStreamingOptions
- type UploadStreamingResult
Constants ¶
const DefaultHTTPTimeout = 30 * time.Second
DefaultHTTPTimeout applies to simple JSON operations. Streaming uploads use a separate, longer timeout (or none at all) negotiated per call.
const DefaultMaxRetries = 3
DefaultMaxRetries is the number of retries attempted for transient failures (5xx, 429, network errors) when no explicit value is set.
const DefaultUserAgent = "synapse-go/pdp"
DefaultUserAgent is set on every outgoing request unless overridden.
const MaxControlResponseBytes = 16 << 20 // 16 MiB
MaxControlResponseBytes caps the size of control-plane JSON response bodies read fully into memory. PDP provider control-plane endpoints return small JSON documents (dataset status, piece IDs, etc.); anything larger indicates a buggy or hostile server and should not be allowed to exhaust client memory. Streaming endpoints (piece download) bypass this limit.
Variables ¶
var ErrLocationHeader = errors.New("pdp: missing or invalid Location header")
ErrLocationHeader is returned when the server responds successfully but the expected Location header is missing or malformed.
var ErrPieceNotFound = errors.New("pdp: piece not found")
ErrPieceNotFound is returned when GET /pdp/piece returns 404.
var ErrPieceProcessing = errors.New("pdp: piece still processing")
ErrPieceProcessing is returned when GET /pdp/piece returns 202, meaning the piece is known but not yet parked and queryable.
var ErrPullFailed = errors.New("pdp: pull failed")
ErrPullFailed is returned by WaitForPullComplete when the server reports that the overall pull status is "failed".
var ErrStillPending = errors.New("pdp: still pending")
ErrStillPending is returned by status-polling helpers when the server reports the transaction is still pending. It is the sentinel callers should loop on while waiting.
var ErrTxRejected = errors.New("pdp: transaction rejected")
ErrTxRejected is returned when an on-chain operation posted by the SP (storage provider) was rejected by the network.
Functions ¶
This section is empty.
Types ¶
type AddPieceInput ¶
AddPieceInput mirrors one entry of the pieces array for POST /pdp/data-sets/{id}/pieces. The wire format uses the piece CID as its own single sub-piece.
type AddPiecesResult ¶
AddPiecesResult is what the client gets back from a successful POST /pdp/data-sets/{id}/pieces.
type AddPiecesStatus ¶
type AddPiecesStatus struct {
TxHash common.Hash `json:"-"`
TxStatus string `json:"txStatus"` // pending | confirmed | rejected
DataSetID types.BigInt `json:"-"`
PieceCount int `json:"pieceCount"`
AddMessageOK *bool `json:"addMessageOk"`
PiecesAdded bool `json:"piecesAdded"`
ConfirmedPieceIDs []types.BigInt `json:"-"`
}
AddPiecesStatus mirrors GET /pdp/data-sets/{id}/pieces/added/{txHash}. TxStatus reports pending, confirmed, or rejected.
type Client ¶
type Client struct {
// contains filtered or unexported fields
}
Client is a thin HTTP client for a single PDP provider service URL. Safe for concurrent use.
func New ¶
New constructs a Client pointing at the given service URL (e.g. https://pdp.example.com).
func (*Client) AddPieces ¶
func (c *Client) AddPieces(ctx context.Context, dataSetID types.BigInt, pieces []AddPieceInput, extraData []byte) (*AddPiecesResult, error)
AddPieces calls POST /pdp/data-sets/{dataSetId}/pieces. extraData must be caller-provided EIP-712 signed data encoded as the PDP provider expects.
func (*Client) CreateDataSet ¶
func (c *Client) CreateDataSet(ctx context.Context, recordKeeper common.Address, extraData []byte) (*CreateDataSetResult, error)
CreateDataSet calls POST /pdp/data-sets. extraData must be an EIP-712 signed blob encoded as the PDP provider expects.
func (*Client) CreateDataSetAndAddPieces ¶
func (c *Client) CreateDataSetAndAddPieces(ctx context.Context, recordKeeper common.Address, pieces []AddPieceInput, extraData []byte) (*CreateDataSetResult, error)
CreateDataSetAndAddPieces calls POST /pdp/data-sets/create-and-add.
The provider creates a dataset and immediately submits the add-pieces transaction using the caller-provided EIP-712 signed extraData for the combined create+add flow.
func (*Client) DownloadPiece ¶
func (c *Client) DownloadPiece(ctx context.Context, pieceCID cid.Cid) (io.ReadCloser, int64, error)
DownloadPiece calls GET /piece/{pieceCid} and returns a streaming io.ReadCloser plus the Content-Length (-1 when unknown). The caller must close the reader when done.
The download bypasses the client's default JSON timeout so large pieces are not cut off; callers can enforce a deadline via the context.
func (*Client) FindPiece ¶
func (c *Client) FindPiece(ctx context.Context, pieceCID cid.Cid) (*FindPieceResult, error)
FindPiece calls GET /pdp/piece?pieceCid=.... It returns ErrPieceNotFound for HTTP 404 and ErrPieceProcessing for HTTP 202 while the SP is still parking the piece.
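The status-to-error mapping described for FindPiece can be pictured as a small switch. The sketch below is an assumption-laden illustration: the sentinel variables are local stand-ins for ErrPieceNotFound and ErrPieceProcessing, findPieceOutcome is a hypothetical name, and treating HTTP 200 as success is inferred rather than stated in the documentation.

```go
package main

import (
	"errors"
	"fmt"
	"net/http"
)

// Local stand-ins for the package sentinels, so the sketch is self-contained.
var (
	errPieceNotFound   = errors.New("piece not found")
	errPieceProcessing = errors.New("piece still processing")
)

// findPieceOutcome maps the HTTP status of GET /pdp/piece to an error value.
// 200 is assumed to mean the piece is parked and queryable.
func findPieceOutcome(status int) error {
	switch status {
	case http.StatusOK:
		return nil
	case http.StatusAccepted:
		return errPieceProcessing
	case http.StatusNotFound:
		return errPieceNotFound
	default:
		return fmt.Errorf("unexpected HTTP status %d", status)
	}
}
```

Callers would normally test the returned error with errors.Is against the sentinel rather than comparing status codes themselves.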
func (*Client) GetAddPiecesStatus ¶
func (c *Client) GetAddPiecesStatus(ctx context.Context, statusURL string) (*AddPiecesStatus, error)
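GetAddPiecesStatus issues a single GET against statusURL and returns the decoded status. It does not loop; use WaitForPiecesAdded to poll until the transaction settles.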
func (*Client) GetDataSet ¶
func (c *Client) GetDataSet(ctx context.Context, dataSetID types.BigInt) (*DataSet, error)
GetDataSet calls GET /pdp/data-sets/{dataSetId}.
func (*Client) GetDataSetCreationStatus ¶
func (c *Client) GetDataSetCreationStatus(ctx context.Context, statusURL string) (*CreateDataSetStatus, error)
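GetDataSetCreationStatus issues a single GET against statusURL and returns the decoded status. It does not loop; use WaitForDataSetCreated to poll until the transaction settles.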
func (*Client) PullPieces ¶
func (c *Client) PullPieces(ctx context.Context, req PullRequest) (*PullResult, error)
PullPieces calls POST /pdp/piece/pull to request that this SP pull the given pieces from the source URLs.
The endpoint is idempotent: calling again with the same extraData returns the status of the existing pull request rather than creating a duplicate. This makes it safe to poll for status using repeated calls.
func (*Client) SchedulePieceDeletion ¶
func (c *Client) SchedulePieceDeletion(ctx context.Context, dataSetID, pieceID types.BigInt, extraData []byte) (common.Hash, error)
SchedulePieceDeletion issues DELETE /pdp/data-sets/{id}/pieces/{pieceId} with the provided EIP-712 signed extraData.
func (*Client) UploadPieceStreaming ¶
func (c *Client) UploadPieceStreaming(ctx context.Context, data io.Reader, opts UploadPieceStreamingOptions) (*UploadStreamingResult, error)
UploadPieceStreaming uploads a piece using the CommP-last 3-step streaming protocol. This is the preferred upload path: data is streamed to the server in a single pass, and the PieceCID is either computed by the client during that pass (via a TeeReader) or supplied by the caller in advance through opts.PieceCID.
Protocol:
- POST /pdp/piece/uploads — create upload session, get UUID
- PUT /pdp/piece/uploads/{uuid} — stream body; commP is computed in this pass (unless opts.PieceCID is pre-filled)
- POST /pdp/piece/uploads/{uuid} — finalize with the PieceCID; server validates that the uploaded bytes hash to the same value
The PUT clears the default Client timeout so large transfers are not capped at DefaultHTTPTimeout. Callers needing stricter limits should pass a context deadline or use a custom HTTP client.
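The single-pass idea behind the streaming PUT can be illustrated with io.TeeReader: every byte read for the upload is also fed to a hash. In the sketch below SHA-256 is only a stand-in, since the real protocol derives a PieceCID (commP) and that computation is not part of the standard library; streamWithDigest and digestOfString are hypothetical names.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"io"
	"strings"
)

// streamWithDigest copies src to dst in one pass while hashing the same
// bytes. SHA-256 is a placeholder for the commP calculation.
func streamWithDigest(dst io.Writer, src io.Reader) (int64, string, error) {
	h := sha256.New()
	// TeeReader writes everything it reads from src into h.
	n, err := io.Copy(dst, io.TeeReader(src, h))
	if err != nil {
		return n, "", err
	}
	return n, hex.EncodeToString(h.Sum(nil)), nil
}

// digestOfString streams an in-memory payload to io.Discard, standing in
// for the request body of the PUT step.
func digestOfString(payload string) (int64, string, error) {
	return streamWithDigest(io.Discard, strings.NewReader(payload))
}
```

Because the digest is only known once the last byte has been sent, the identifier has to travel in a separate finalize request, which is what the third step of the protocol does.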
func (*Client) WaitForCreateDataSetAndAddPieces ¶
func (c *Client) WaitForCreateDataSetAndAddPieces(ctx context.Context, statusURL string, pollInterval time.Duration) (*AddPiecesStatus, error)
WaitForCreateDataSetAndAddPieces first waits for dataset creation to confirm, then polls the add-pieces status endpoint for the same transaction hash.
func (*Client) WaitForDataSetCreated ¶
func (c *Client) WaitForDataSetCreated(ctx context.Context, statusURL string, pollInterval time.Duration) (*CreateDataSetStatus, error)
WaitForDataSetCreated polls until the server reports txStatus=confirmed with dataSetCreated=true (success) or txStatus=rejected (ErrTxRejected). Transport errors, including HTTP 404s from the status URL, are returned immediately rather than retried. A zero pollInterval defaults to 4 seconds.
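The polling behavior described for the WaitFor helpers follows a familiar shape: loop while a sentinel says "still pending", return everything else immediately, and honor the context between polls. A minimal sketch under those assumptions follows; pollUntilDone, errStillPending, and simulatePoll are hypothetical names and do not reflect the package's internal code.

```go
package main

import (
	"context"
	"errors"
	"time"
)

// errStillPending stands in for the package's ErrStillPending sentinel.
var errStillPending = errors.New("still pending")

// defaultPollInterval mirrors the documented 4-second default.
const defaultPollInterval = 4 * time.Second

// pollUntilDone calls check until it returns anything other than
// errStillPending. A nil result means success; any other error is terminal
// and is returned without further polling.
func pollUntilDone(ctx context.Context, interval time.Duration, check func(context.Context) error) error {
	if interval <= 0 {
		interval = defaultPollInterval
	}
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		err := check(ctx)
		if !errors.Is(err, errStillPending) {
			return err
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
		}
	}
}

// simulatePoll drives pollUntilDone with a fake status check that reports
// pending for the first pendingRounds calls, then succeeds.
func simulatePoll(pendingRounds int) (int, error) {
	calls := 0
	err := pollUntilDone(context.Background(), time.Millisecond, func(context.Context) error {
		calls++
		if calls <= pendingRounds {
			return errStillPending
		}
		return nil
	})
	return calls, err
}
```

In real use the check function would wrap a single status request such as GetDataSetCreationStatus, and the context would carry the overall deadline.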
func (*Client) WaitForPieceParked ¶
func (c *Client) WaitForPieceParked(ctx context.Context, pieceCID cid.Cid, pollInterval time.Duration) error
WaitForPieceParked polls FindPiece until the piece is found, the context is cancelled, or the context deadline expires. A zero pollInterval defaults to one second.
func (*Client) WaitForPiecesAdded ¶
func (c *Client) WaitForPiecesAdded(ctx context.Context, statusURL string, pollInterval time.Duration) (*AddPiecesStatus, error)
WaitForPiecesAdded polls until the add-pieces transaction is confirmed with piecesAdded=true or is rejected. Transport errors, including HTTP 404s from the status URL, are returned immediately rather than retried. A zero pollInterval defaults to 4 seconds.
func (*Client) WaitForPullComplete ¶
func (c *Client) WaitForPullComplete(ctx context.Context, req PullRequest, pollInterval time.Duration, onStatus func(*PullResult)) (*PullResult, error)
WaitForPullComplete polls PullPieces until the overall pull status is "complete" or "failed". On failure it returns (result, ErrPullFailed) so callers can inspect the per-piece statuses.
onStatus is invoked after each poll (may be nil). A zero pollInterval defaults to 4 seconds.
type CreateDataSetRequest ¶
type CreateDataSetRequest struct {
RecordKeeper common.Address `json:"recordKeeper"`
ExtraData string `json:"extraData"` // 0x-prefixed hex
}
CreateDataSetRequest is the body of POST /pdp/data-sets.
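As a rough illustration of the wire shape, the sketch below hex-encodes the signed bytes with a 0x prefix and marshals a body with the same JSON field names. The recordKeeper is a plain string here as a stand-in for common.Address, and createDataSetBody, encodeExtraData, and marshalCreateDataSet are hypothetical names.

```go
package main

import (
	"encoding/hex"
	"encoding/json"
)

// createDataSetBody mirrors the documented JSON field names. RecordKeeper is
// a plain string in this sketch instead of common.Address.
type createDataSetBody struct {
	RecordKeeper string `json:"recordKeeper"`
	ExtraData    string `json:"extraData"`
}

// encodeExtraData renders signed bytes as 0x-prefixed lowercase hex.
func encodeExtraData(extraData []byte) string {
	return "0x" + hex.EncodeToString(extraData)
}

// marshalCreateDataSet produces the JSON body for POST /pdp/data-sets.
func marshalCreateDataSet(recordKeeper string, extraData []byte) (string, error) {
	body, err := json.Marshal(createDataSetBody{
		RecordKeeper: recordKeeper,
		ExtraData:    encodeExtraData(extraData),
	})
	if err != nil {
		return "", err
	}
	return string(body), nil
}
```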
type CreateDataSetResult ¶
CreateDataSetResult carries what the server returns from POST /pdp/data-sets: the transaction hash (parsed out of the Location header) and the absolute status URL the caller can poll.
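Turning a Location header into an absolute status URL plus a transaction hash could look roughly like this. The sketch assumes the hash is the final path segment, as the /pdp/data-sets/created/{txHash} route suggests, and that it is 0x-prefixed; resolveStatusLocation and errLocationHeader are hypothetical local names, and the validation is deliberately minimal.

```go
package main

import (
	"errors"
	"net/url"
	"path"
	"strings"
)

// errLocationHeader stands in for the package's ErrLocationHeader sentinel.
var errLocationHeader = errors.New("missing or invalid Location header")

// resolveStatusLocation resolves a possibly relative Location value against
// the service URL and extracts the trailing path segment as the tx hash.
func resolveStatusLocation(serviceURL, location string) (string, string, error) {
	if location == "" {
		return "", "", errLocationHeader
	}
	base, err := url.Parse(serviceURL)
	if err != nil {
		return "", "", err
	}
	ref, err := url.Parse(location)
	if err != nil {
		return "", "", errLocationHeader
	}
	abs := base.ResolveReference(ref)
	txHash := path.Base(abs.Path)
	// Assumed format check: only the 0x prefix is verified in this sketch.
	if !strings.HasPrefix(txHash, "0x") {
		return "", "", errLocationHeader
	}
	return abs.String(), txHash, nil
}
```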
type CreateDataSetStatus ¶
type CreateDataSetStatus struct {
CreateMessageHash common.Hash `json:"createMessageHash"`
Service string `json:"service"`
TxStatus string `json:"txStatus"` // pending | confirmed | rejected
DataSetCreated bool `json:"dataSetCreated"`
OK *bool `json:"ok"`
DataSetID *types.BigInt `json:"-"`
}
CreateDataSetStatus mirrors GET /pdp/data-sets/created/{txHash}. TxStatus reports pending, confirmed, or rejected.
type DataSet ¶
type DataSet struct {
ID types.BigInt `json:"-"`
NextChallengeEpoch int64 `json:"nextChallengeEpoch"`
Pieces []DataSetPiece `json:"pieces"`
}
DataSet mirrors the JSON returned by GET /pdp/data-sets/{id}.
func (DataSet) MarshalJSON ¶
func (*DataSet) UnmarshalJSON ¶
type DataSetPiece ¶
type DataSetPiece struct {
PieceCID string `json:"pieceCid"`
PieceID types.BigInt `json:"-"`
SubPieceCID string `json:"subPieceCid"`
SubPieceOffset uint64 `json:"subPieceOffset"`
}
DataSetPiece mirrors a piece entry returned by GET /pdp/data-sets/{id}.
func (DataSetPiece) MarshalJSON ¶
func (p DataSetPiece) MarshalJSON() ([]byte, error)
func (*DataSetPiece) UnmarshalJSON ¶
func (p *DataSetPiece) UnmarshalJSON(data []byte) error
type FindPieceResult ¶
type FindPieceResult struct {
PieceCID string `json:"pieceCid"`
}
FindPieceResult mirrors the JSON body of GET /pdp/piece?pieceCid=...
type HTTPError ¶
type HTTPError struct {
Method string
URL string
StatusCode int
Body string
// RetryAfter is the server-requested wait duration from the Retry-After
// header. Non-zero on HTTP 429 (Too Many Requests) and 503 (Service
// Unavailable) responses when the server provides the header.
RetryAfter time.Duration
}
HTTPError wraps a non-success response from the PDP provider API.
The URL field is always pre-redacted: userinfo is stripped and sensitive query parameters (see sensitiveQueryKeys in redact.go) are masked as "***". This removes the footgun where a caller logs the error with `%+v` or JSON-marshals the struct and leaks credentials. The pre-redacted form is sufficient for debugging: path, scheme, host, and non-sensitive query values are preserved.
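The RetryAfter field of HTTPError is derived from the Retry-After response header, which HTTP allows to be either a number of seconds or an HTTP date. A sketch of parsing both forms follows; parseRetryAfter and retryAfterFromNow are hypothetical names, and returning zero for unparseable or past values is an assumption of the sketch rather than documented behavior.

```go
package main

import (
	"net/http"
	"strconv"
	"strings"
	"time"
)

// parseRetryAfter converts a Retry-After header value into a wait duration
// relative to now. It returns 0 when the value is empty, malformed,
// non-positive, or refers to a time in the past.
func parseRetryAfter(value string, now time.Time) time.Duration {
	value = strings.TrimSpace(value)
	if value == "" {
		return 0
	}
	// Form 1: delay in whole seconds.
	if secs, err := strconv.Atoi(value); err == nil {
		if secs <= 0 {
			return 0
		}
		return time.Duration(secs) * time.Second
	}
	// Form 2: an HTTP date.
	if when, err := http.ParseTime(value); err == nil {
		if d := when.Sub(now); d > 0 {
			return d
		}
	}
	return 0
}

// retryAfterFromNow is a convenience wrapper that uses the current time.
func retryAfterFromNow(value string) time.Duration {
	return parseRetryAfter(value, time.Now())
}
```

A caller holding an *HTTPError with a non-zero RetryAfter would typically wait at least that long before its next attempt.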
func (*HTTPError) RedactedURL ¶
RedactedURL returns the log-safe URL stored in e.URL. Retained for backwards compatibility; new code can simply read e.URL directly as it is always pre-redacted.
type Option ¶
type Option func(*Client)
Option configures a Client.
func WithHTTPClient ¶
WithHTTPClient supplies a custom *http.Client. Useful to inject timeouts, custom transports, or test doubles.
func WithLogger ¶
WithLogger attaches a structured logger. A nil logger disables logging.
func WithMaxRetries ¶
WithMaxRetries sets the maximum number of retry attempts for transient failures (5xx, 429, network errors). A value of 0 disables retries. Negative values are treated as 0. Defaults to DefaultMaxRetries (3).
func WithUserAgent ¶
WithUserAgent sets the User-Agent header on every request.
type PullPieceInput ¶
type PullPieceInput struct {
// PieceCID is the piece to pull.
PieceCID cid.Cid
// SourceURL is an HTTPS URL ending in /piece/{pieceCid} on the source SP.
SourceURL string
}
PullPieceInput is one entry in a pull request.
type PullPieceStatus ¶
type PullPieceStatus struct {
PieceCID string `json:"pieceCid"`
Status PullStatus `json:"status"`
}
PullPieceStatus is the per-piece status returned by POST /pdp/piece/pull.
type PullRequest ¶
type PullRequest struct {
// RecordKeeper is the record-keeper contract address (e.g. FWSS). The
// pull request body always carries it, even when reusing an existing dataset.
RecordKeeper common.Address
// ExtraData is caller-provided EIP-712 signed data encoded as the provider expects.
ExtraData []byte
// DataSetID is the target dataset. Nil means create a new dataset.
DataSetID *types.BigInt
// Pieces are the pieces to pull with their source URLs.
Pieces []PullPieceInput
}
PullRequest carries all parameters for POST /pdp/piece/pull.
type PullResult ¶
type PullResult struct {
Status PullStatus `json:"status"`
Pieces []PullPieceStatus `json:"pieces"`
}
PullResult is what the server returns from POST /pdp/piece/pull.
type PullStatus ¶
type PullStatus string
PullStatus mirrors the status values used by the PDP pull endpoint.
const (
PullStatusPending PullStatus = "pending"
PullStatusInProgress PullStatus = "inProgress"
PullStatusRetrying PullStatus = "retrying"
PullStatusComplete PullStatus = "complete"
PullStatusFailed PullStatus = "failed"
)
type UploadPieceStreamingOptions ¶
type UploadPieceStreamingOptions struct {
// Size is the payload byte count. When > 0 it is used to set the
// Content-Length request header. When 0 the request is sent with
// Transfer-Encoding: chunked.
Size int64
// PieceCID, when defined, is a pre-computed PieceCIDv2 of the payload.
// If set, the client skips incremental commP calculation during the
// streaming PUT. The server still verifies the uploaded bytes against
// this value during finalize; a mismatch yields an HTTP error.
PieceCID cid.Cid
// OnProgress is invoked after each non-empty Read from the data reader,
// with the cumulative byte count sent so far. It may be nil.
OnProgress func(bytesUploaded int64)
}
UploadPieceStreamingOptions configures a single streaming upload.
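The OnProgress contract described above (a callback after each non-empty Read, carrying the cumulative byte count) can be sketched as a thin io.Reader wrapper. The names progressReader and drainWithProgress are hypothetical, and the sketch says nothing about how the real client wires the callback into its request body.

```go
package main

import (
	"io"
	"strings"
)

// progressReader wraps a reader and reports the cumulative number of bytes
// read after every Read that returned data.
type progressReader struct {
	r          io.Reader
	sent       int64
	onProgress func(bytesUploaded int64) // may be nil
}

func (p *progressReader) Read(b []byte) (int, error) {
	n, err := p.r.Read(b)
	if n > 0 {
		p.sent += int64(n)
		if p.onProgress != nil {
			p.onProgress(p.sent)
		}
	}
	return n, err
}

// drainWithProgress reads payload through a progressReader using a buffer
// of bufSize bytes and returns every value passed to the callback.
func drainWithProgress(payload string, bufSize int) ([]int64, error) {
	if bufSize <= 0 {
		bufSize = 32 * 1024
	}
	var reports []int64
	pr := &progressReader{
		r:          strings.NewReader(payload),
		onProgress: func(total int64) { reports = append(reports, total) },
	}
	buf := make([]byte, bufSize)
	for {
		_, err := pr.Read(buf)
		if err == io.EOF {
			return reports, nil
		}
		if err != nil {
			return reports, err
		}
	}
}
```

Reading a 10-byte payload through a 4-byte buffer reports 4, 8, and then 10; the final Read that returns only io.EOF triggers no callback.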
type UploadStreamingResult ¶
type UploadStreamingResult struct {
// PieceCID is the PieceCIDv2 of the uploaded piece — either the caller-
// provided value (when opts.PieceCID was set) or the value computed
// client-side during the streaming PUT.
PieceCID cid.Cid
// Size is the total byte count consumed from the data reader.
Size int64
}
UploadStreamingResult is returned by UploadPieceStreaming on success.