restapi

package
v1.0.3 Latest
Published: Apr 4, 2026 License: MPL-2.0 Imports: 15 Imported by: 0

Documentation

Overview

Package restapi provides an HTTP REST integration block for go-code-blocks.

It supports GET, POST, PUT, PATCH, DELETE and custom verbs against any HTTP API, with pluggable authentication strategies: static Bearer token, OAuth2 client_credentials (with automatic token caching and refresh), HTTP Basic, and API Key (header or query param).

Token chaining

A Block configured with WithOAuth2ClientCredentials implements the TokenProvider interface, so it can authorise another block:

auth := restapi.New("auth",
    restapi.WithBaseURL("https://auth.example.com"),
    restapi.WithOAuth2ClientCredentials(
        "https://auth.example.com/oauth/token",
        os.Getenv("CLIENT_ID"),
        os.Getenv("CLIENT_SECRET"),
    ),
)

api := restapi.New("api",
    restapi.WithBaseURL("https://api.example.com"),
    restapi.WithTokenProvider(auth), // 'auth' fetches & refreshes the token
)

app.MustRegister(auth)
app.MustRegister(api)
app.InitAll(ctx)

Constants

This section is empty.

Variables

var ErrSkipped = errors.New("step skipped: dependency failed")

ErrSkipped is recorded on a step that was never executed because one or more of its dependencies failed or were themselves skipped. Test for it with errors.Is, which also matches wrapped errors: errors.Is(sr.Err, ErrSkipped).
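
The errors.Is check can be sketched as follows. The sentinel errSkipped is a local stand-in for restapi.ErrSkipped so the snippet compiles on its own, and isSkipped is a hypothetical helper; wrapping with fmt.Errorf is only an assumption about how a caller might annotate the error.

```go
package main

import (
	"errors"
	"fmt"
)

// errSkipped is a local stand-in for restapi.ErrSkipped (assumption: same message).
var errSkipped = errors.New("step skipped: dependency failed")

// isSkipped reports whether err is, or wraps, the skip sentinel.
func isSkipped(err error) bool {
	return errors.Is(err, errSkipped)
}

func main() {
	// A caller may add context with %w; errors.Is still finds the sentinel.
	wrapped := fmt.Errorf("step %q: %w", "orders", errSkipped)
	fmt.Println(isSkipped(wrapped))
	fmt.Println(isSkipped(errors.New("connection refused")))
}
```

With the real package the same check is errors.Is(sr.Err, restapi.ErrSkipped), or the StepResult.Skipped method documented below.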

Functions

This section is empty.

Types

type Block

type Block struct {
	// contains filtered or unexported fields
}

Block is a REST API integration block. It holds a configured *http.Client, base URL, default headers, and an optional auth strategy. All HTTP operations are performed through Do or the typed helpers (Get, Post, Put, Patch, Delete).

func New

func New(name string, opts ...Option) *Block

New creates a new REST API Block.

block := restapi.New("payments-api",
    restapi.WithBaseURL("https://payments.example.com/v1"),
    restapi.WithTimeout(10*time.Second),
    restapi.WithOAuth2ClientCredentials(tokenURL, clientID, clientSecret),
)

func (*Block) Delete

func (b *Block) Delete(ctx context.Context, path string) (*Response, error)

Delete performs a DELETE request.

resp, err := api.Delete(ctx, "/users/123")

func (*Block) Do

func (b *Block) Do(ctx context.Context, r Request) (*Response, error)

Do executes a fully described HTTP Request and returns the raw Response. It handles URL construction, body encoding, default + per-request headers, authentication, and response body reading.

Body encoding rules:

  • nil → no body
  • string → sent as-is
  • []byte → sent as-is
  • io.Reader → streamed as-is
  • anything else → JSON-marshalled; Content-Type set to application/json
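
The rules above map naturally onto a type switch. The following is a minimal sketch using only the standard library; encodeBody and encodeToString are hypothetical helpers, not functions exported by restapi, and the actual implementation may differ.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// encodeBody follows the documented rules. It returns the reader to send and
// the Content-Type to set; an empty string means "leave the header alone".
func encodeBody(body any) (io.Reader, string, error) {
	switch v := body.(type) {
	case nil:
		return nil, "", nil // no body
	case string:
		return strings.NewReader(v), "", nil // sent as-is
	case []byte:
		return bytes.NewReader(v), "", nil // sent as-is
	case io.Reader:
		return v, "", nil // streamed as-is
	default:
		b, err := json.Marshal(v)
		if err != nil {
			return nil, "", fmt.Errorf("encode body: %w", err)
		}
		return bytes.NewReader(b), "application/json", nil
	}
}

// encodeToString drains the encoded body so the result is easy to inspect.
func encodeToString(body any) (string, string) {
	r, ct, err := encodeBody(body)
	if err != nil || r == nil {
		return "", ct
	}
	b, _ := io.ReadAll(r)
	return string(b), ct
}

func main() {
	fmt.Println(encodeToString(map[string]int{"a": 1}))
	fmt.Println(encodeToString("raw text"))
}
```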

func (*Block) FanOut

func (b *Block) FanOut(ctx context.Context, requests map[string]Request, opts ...PipelineOption) (*Results, error)

FanOut executes all provided requests concurrently and collects every result. It is a single-wave shorthand — all requests are independent and run simultaneously. Use Pipeline when steps have dependencies.

results, err := api.FanOut(ctx, map[string]restapi.Request{
    "user":    {Path: "/users/123"},
    "catalog": {Path: "/products?limit=50"},
    "rates":   {Path: "/shipping/rates"},
}, restapi.WithDefaultRetry(restapi.RetryPolicy{MaxAttempts: 3, Delay: 100*time.Millisecond, Backoff: 2.0}))

func (*Block) Get

func (b *Block) Get(ctx context.Context, path string, query map[string]string) (*Response, error)

Get performs a GET request to the given path.

resp, err := api.Get(ctx, "/users/123", map[string]string{"expand": "roles"})

func (*Block) GetJSON

func (b *Block) GetJSON(ctx context.Context, path string, query map[string]string, out any) error

GetJSON performs a GET and unmarshals the response body into out.

var user User
err := api.GetJSON(ctx, "/users/123", nil, &user)

func (*Block) Head

func (b *Block) Head(ctx context.Context, path string) (*Response, error)

Head performs a HEAD request.

func (*Block) Init

func (b *Block) Init(_ context.Context) error

Init implements core.Block. It builds the http.Client with the configured transport, TLS settings, and connection pool.

func (*Block) Name

func (b *Block) Name() string

Name implements core.Block.

func (*Block) Patch

func (b *Block) Patch(ctx context.Context, path string, body any) (*Response, error)

Patch performs a PATCH request, JSON-encoding body automatically.

func (*Block) PatchJSON

func (b *Block) PatchJSON(ctx context.Context, path string, body, out any) error

PatchJSON performs a PATCH and unmarshals the response body into out.

func (*Block) Pipeline

func (b *Block) Pipeline(ctx context.Context, steps []PipelineStep, opts ...PipelineOption) (*Results, error)

Pipeline executes steps in dependency order using DAG wave levelling. Within each wave, all independent steps run concurrently.

Cascade abort

When a step fails, every step that directly or transitively depends on it is automatically skipped — their Build functions are never called, and their StepResult carries ErrSkipped. This prevents corrupt data from propagating through the graph and surfaces the root cause immediately.

Retry

Each step may declare its own RetryPolicy, or inherit the pipeline default set via WithDefaultRetry. Retries occur only on transient errors (network errors and HTTP 429/500/502/503/504). The delay between attempts grows by the configured Backoff multiplier.

steps := []restapi.PipelineStep{
    {Name: "user",    Build: ...},       // wave 0, no retry
    {Name: "catalog", Build: ...},       // wave 0, no retry
    {
        Name:      "orders",
        DependsOn: []string{"user"},     // wave 1; skipped if user fails
        Retry:     &restapi.RetryPolicy{ // overrides pipeline default
            MaxAttempts: 4,
            Delay:       200 * time.Millisecond,
            Backoff:     2.0,
        },
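        Build: func(ctx context.Context, prev *restapi.Results) (restapi.Request, error) {
            var u User
            // Propagate decode errors instead of building a request from a zero User.
            if err := prev.JSON("user", &u); err != nil {
                return restapi.Request{}, err
            }
            return restapi.Request{Path: "/orders?user_id=" + u.ID}, nil
        },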
    },
}

results, err := api.Pipeline(ctx, steps,
    restapi.WithDefaultRetry(restapi.RetryPolicy{MaxAttempts: 2, Delay: 100*time.Millisecond, Backoff: 1.5}),
    restapi.WithContinueOnError(),
)
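
Wave levelling and cascade abort can be sketched independently of any HTTP work. The code below is an illustration under assumptions, not the package's implementation: step is a trimmed stand-in for PipelineStep, waves and skipped are hypothetical helpers, and the "invoice" step is invented for the example.

```go
package main

import "fmt"

// step keeps only the fields that matter for scheduling.
type step struct {
	Name      string
	DependsOn []string
}

// waves gives each step a level: 0 without dependencies, otherwise one more
// than its deepest dependency. Unknown names and cycles are reported as errors.
func waves(steps []step) ([][]string, error) {
	byName := make(map[string]step, len(steps))
	for _, s := range steps {
		byName[s.Name] = s
	}
	level := make(map[string]int, len(steps))
	visiting := make(map[string]bool)
	var visit func(name string) (int, error)
	visit = func(name string) (int, error) {
		if l, ok := level[name]; ok {
			return l, nil
		}
		s, ok := byName[name]
		if !ok {
			return 0, fmt.Errorf("unknown step %q", name)
		}
		if visiting[name] {
			return 0, fmt.Errorf("dependency cycle at %q", name)
		}
		visiting[name] = true
		l := 0
		for _, dep := range s.DependsOn {
			dl, err := visit(dep)
			if err != nil {
				return 0, err
			}
			if dl+1 > l {
				l = dl + 1
			}
		}
		visiting[name] = false
		level[name] = l
		return l, nil
	}
	var out [][]string
	for _, s := range steps {
		l, err := visit(s.Name)
		if err != nil {
			return nil, err
		}
		for len(out) <= l {
			out = append(out, nil)
		}
		out[l] = append(out[l], s.Name)
	}
	return out, nil
}

// skipped returns every step that directly or transitively depends on a
// failed step, iterating until no new step is marked.
func skipped(steps []step, failed map[string]bool) map[string]bool {
	out := map[string]bool{}
	for changed := true; changed; {
		changed = false
		for _, s := range steps {
			if failed[s.Name] || out[s.Name] {
				continue
			}
			for _, dep := range s.DependsOn {
				if failed[dep] || out[dep] {
					out[s.Name], changed = true, true
					break
				}
			}
		}
	}
	return out
}

func main() {
	steps := []step{
		{Name: "user"}, {Name: "catalog"},
		{Name: "orders", DependsOn: []string{"user"}},
		{Name: "invoice", DependsOn: []string{"orders"}},
	}
	fmt.Println(waves(steps))
	fmt.Println(skipped(steps, map[string]bool{"user": true}))
}
```

In this sketch a failure of "user" skips both "orders" and "invoice" while "catalog" is unaffected, which matches the cascade-abort behaviour described above.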

func (*Block) Post

func (b *Block) Post(ctx context.Context, path string, body any) (*Response, error)

Post performs a POST request, JSON-encoding body automatically.

resp, err := api.Post(ctx, "/users", map[string]any{"name": "Alice"})

func (*Block) PostJSON

func (b *Block) PostJSON(ctx context.Context, path string, body, out any) error

PostJSON performs a POST and unmarshals the response body into out.

var created User
err := api.PostJSON(ctx, "/users", payload, &created)

func (*Block) Put

func (b *Block) Put(ctx context.Context, path string, body any) (*Response, error)

Put performs a PUT request, JSON-encoding body automatically.

func (*Block) PutJSON

func (b *Block) PutJSON(ctx context.Context, path string, body, out any) error

PutJSON performs a PUT and unmarshals the response body into out.

func (*Block) Shutdown

func (b *Block) Shutdown(_ context.Context) error

Shutdown implements core.Block. It closes idle connections in the transport pool.

func (*Block) Token

func (b *Block) Token(ctx context.Context) (string, error)

Token implements TokenProvider. It allows this block to be passed to WithTokenProvider on another block, chaining authentication flows without coupling the blocks.

It returns an error when the block has no auth strategy configured or is not yet initialised.

type Option

type Option func(*blockConfig)

Option configures a REST API Block.

func WithAPIKeyHeader

func WithAPIKeyHeader(headerName, key string) Option

WithAPIKeyHeader sends the API key as a request header.

restapi.WithAPIKeyHeader("X-API-Key", "abc123")

func WithAPIKeyQuery

func WithAPIKeyQuery(paramName, key string) Option

WithAPIKeyQuery appends the API key as a query parameter.

restapi.WithAPIKeyQuery("api_key", "abc123")

func WithBaseURL

func WithBaseURL(u string) Option

WithBaseURL sets the base URL prepended to every request path. Trailing slashes are normalised internally.

restapi.WithBaseURL("https://api.example.com/v2")
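
The documentation does not spell out how the base URL, request path, and query map are combined. One plausible reading, offered here as an assumption, is to trim the slashes at the join and merge the query with net/url. buildURL is a hypothetical helper, not part of the package.

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

// buildURL joins base and path with exactly one slash between them, then
// merges extra query parameters into any query already present in path.
func buildURL(base, path string, query map[string]string) (string, error) {
	joined := strings.TrimRight(base, "/") + "/" + strings.TrimLeft(path, "/")
	u, err := url.Parse(joined)
	if err != nil {
		return "", fmt.Errorf("build url: %w", err)
	}
	q := u.Query()
	for k, v := range query {
		q.Set(k, v)
	}
	u.RawQuery = q.Encode() // Encode escapes values and sorts keys
	return u.String(), nil
}

func main() {
	fmt.Println(buildURL("https://api.example.com/v2/", "/users/123",
		map[string]string{"expand": "roles"}))
}
```

Going through url.Values rather than string concatenation means parameter values are escaped, which matters when they come from user data.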

func WithBasicAuth

func WithBasicAuth(username, password string) Option

WithBasicAuth configures HTTP Basic Authentication.

restapi.WithBasicAuth("admin", "s3cr3t")

func WithBearerToken

func WithBearerToken(token string) Option

WithBearerToken configures a static Bearer token sent on every request. Use WithTokenProvider for dynamic tokens that require fetching or refreshing.

restapi.WithBearerToken("eyJhbGciOiJSUzI1NiJ9...")

func WithHeader

func WithHeader(key, value string) Option

WithHeader adds a default header sent with every request. Call multiple times to add several headers.

restapi.WithHeader("X-API-Version", "2024-01")
restapi.WithHeader("Accept-Language", "pt-BR")

func WithMaxIdleConns

func WithMaxIdleConns(n int) Option

WithMaxIdleConns sets the maximum number of idle (keep-alive) connections kept in the pool for reuse.

func WithOAuth2ClientCredentials

func WithOAuth2ClientCredentials(tokenURL, clientID, clientSecret string, scopes ...string) Option

WithOAuth2ClientCredentials configures the block to obtain a Bearer token via the OAuth2 client_credentials grant, cache it, and attach it to each request.

The token is refreshed automatically 30 s before expiry.

restapi.WithOAuth2ClientCredentials(
    "https://auth.example.com/oauth/token",
    "my-client-id",
    "my-client-secret",
    "read:data", "write:data",   // optional scopes
)
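
The caching behaviour (reuse the token, refetch once it is within 30 s of expiry) can be sketched with an injectable clock. This is an illustration under assumptions, not the package's code: tokenCache, its fields, and simulate are hypothetical names, and the fetch function stands in for the HTTP call to the token endpoint.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// refreshSkew is the documented 30 s safety margin before expiry.
const refreshSkew = 30 * time.Second

// tokenCache holds one token and refetches it inside the refresh window.
type tokenCache struct {
	mu     sync.Mutex
	token  string
	expiry time.Time
	fetch  func() (string, time.Duration, error) // returns token and lifetime
	now    func() time.Time                      // injectable clock
}

// Token returns the cached token, fetching a new one when none is cached or
// when the current one expires in refreshSkew or less.
func (c *tokenCache) Token() (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	now := c.now()
	if c.token != "" && now.Before(c.expiry.Add(-refreshSkew)) {
		return c.token, nil
	}
	tok, ttl, err := c.fetch()
	if err != nil {
		return "", err
	}
	c.token, c.expiry = tok, now.Add(ttl)
	return tok, nil
}

// simulate calls Token at the given second offsets and counts the fetches.
func simulate(ttlSec int, offsetsSec []int) int {
	start := time.Unix(0, 0)
	clock := start
	fetches := 0
	c := &tokenCache{
		now: func() time.Time { return clock },
		fetch: func() (string, time.Duration, error) {
			fetches++
			return fmt.Sprintf("token-%d", fetches), time.Duration(ttlSec) * time.Second, nil
		},
	}
	for _, off := range offsetsSec {
		clock = start.Add(time.Duration(off) * time.Second)
		if _, err := c.Token(); err != nil {
			return -1
		}
	}
	return fetches
}

func main() {
	// A 60 s token requested at t = 0, 10, 29, 30 and 31 s.
	fmt.Println(simulate(60, []int{0, 10, 29, 30, 31}))
}
```

Note that in this sketch a token whose lifetime is 30 s or less is refetched on every call, since it is always inside the refresh window.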

func WithTLS

func WithTLS(cfg *tls.Config) Option

WithTLS configures a custom TLS configuration (e.g. mTLS, custom CA).

func WithTimeout

func WithTimeout(d time.Duration) Option

WithTimeout sets the per-request timeout. Defaults to 30 s when not provided.

func WithTokenProvider

func WithTokenProvider(p TokenProvider) Option

WithTokenProvider configures a dynamic Bearer token source. The provider is called before each request; it is responsible for caching and refreshing the token internally.

Any Block configured with WithOAuth2ClientCredentials satisfies TokenProvider, enabling one block to authorise another:

authBlock := restapi.New("auth", restapi.WithOAuth2ClientCredentials(...))
apiBlock  := restapi.New("api",  restapi.WithTokenProvider(authBlock))

func WithTransport

func WithTransport(rt http.RoundTripper) Option

WithTransport injects a fully custom http.RoundTripper. Useful for testing (e.g. httptest transports) or adding middleware like retry logic, tracing, or circuit breakers.
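
For tests, a canned http.RoundTripper avoids the network entirely. The sketch below uses only net/http; roundTripFunc, cannedTransport, and probe are hypothetical helpers. With the package, the resulting value would be passed as restapi.WithTransport(rt).

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"strings"
)

// roundTripFunc adapts a plain function to http.RoundTripper.
type roundTripFunc func(*http.Request) (*http.Response, error)

func (f roundTripFunc) RoundTrip(r *http.Request) (*http.Response, error) { return f(r) }

// cannedTransport answers every request with the given status and body.
func cannedTransport(status int, body string) http.RoundTripper {
	return roundTripFunc(func(r *http.Request) (*http.Response, error) {
		return &http.Response{
			StatusCode: status,
			Status:     fmt.Sprintf("%d %s", status, http.StatusText(status)),
			Header:     http.Header{"Content-Type": []string{"application/json"}},
			Body:       io.NopCloser(strings.NewReader(body)),
			Request:    r,
		}, nil
	})
}

// probe sends one GET through rt and returns the status code and body.
func probe(rt http.RoundTripper, url string) (int, string, error) {
	client := &http.Client{Transport: rt}
	resp, err := client.Get(url)
	if err != nil {
		return 0, "", err
	}
	defer resp.Body.Close()
	b, err := io.ReadAll(resp.Body)
	return resp.StatusCode, string(b), err
}

func main() {
	rt := cannedTransport(http.StatusOK, `{"id":"123"}`)
	fmt.Println(probe(rt, "https://api.example.com/users/123"))
}
```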

type PipelineOption

type PipelineOption func(*pipelineConfig)

PipelineOption configures pipeline execution behaviour.

func WithContinueOnError

func WithContinueOnError() PipelineOption

WithContinueOnError instructs the pipeline to keep executing later waves even when steps in the current wave fail. Steps that depend on a failed step are still cascade-aborted with ErrSkipped, and a skipped step never prevents the next wave from running. By default the pipeline aborts on the first step failure.

func WithDefaultRetry

func WithDefaultRetry(policy RetryPolicy) PipelineOption

WithDefaultRetry sets the retry policy applied to every step that does not declare its own Retry field.

// Up to 3 attempts in total, doubling the wait each time: 100ms, then 200ms
restapi.WithDefaultRetry(restapi.RetryPolicy{
    MaxAttempts: 3,
    Delay:       100 * time.Millisecond,
    Backoff:     2.0,
})

func WithMaxConcurrency

func WithMaxConcurrency(n int) PipelineOption

WithMaxConcurrency limits the number of goroutines used simultaneously per wave. Useful to respect downstream rate limits. Defaults to unlimited.
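
A common way to cap concurrency is a buffered channel used as a semaphore. The sketch below shows that technique; it is an assumption about how such a limit could be enforced, not a description of the package's internals, and runLimited is a hypothetical helper.

```go
package main

import (
	"fmt"
	"sync"
)

// runLimited runs every task with at most limit running at once (limit <= 0
// means unlimited) and returns the highest number seen running together.
func runLimited(limit int, tasks []func()) int {
	var sem chan struct{}
	if limit > 0 {
		sem = make(chan struct{}, limit)
	}
	var (
		wg      sync.WaitGroup
		mu      sync.Mutex
		running int
		peak    int
	)
	for _, task := range tasks {
		wg.Add(1)
		go func(task func()) {
			defer wg.Done()
			if sem != nil {
				sem <- struct{}{}        // acquire a slot; blocks when full
				defer func() { <-sem }() // release the slot when done
			}
			mu.Lock()
			running++
			if running > peak {
				peak = running
			}
			mu.Unlock()

			task()

			mu.Lock()
			running--
			mu.Unlock()
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() {}
	}
	fmt.Println(runLimited(3, tasks) <= 3)
}
```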

func WithWaveTimeout

func WithWaveTimeout(d time.Duration) PipelineOption

WithWaveTimeout sets a per-wave deadline. If a wave does not complete within the duration, all running steps in that wave are cancelled and the pipeline aborts.
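
A per-wave deadline is typically expressed with context.WithTimeout: every step in the wave shares one derived context and is expected to stop when it is cancelled. The following sketch assumes that approach; runWave and demoWave are hypothetical helpers, and the "slow" step stands in for an HTTP call that honours its context.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// runWave runs all steps concurrently under one shared deadline and returns
// each step's error (nil on success).
func runWave(parent context.Context, timeout time.Duration,
	steps map[string]func(context.Context) error) map[string]error {

	ctx, cancel := context.WithTimeout(parent, timeout)
	defer cancel()

	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs = make(map[string]error, len(steps))
	)
	for name, fn := range steps {
		wg.Add(1)
		go func(name string, fn func(context.Context) error) {
			defer wg.Done()
			err := fn(ctx)
			mu.Lock()
			errs[name] = err
			mu.Unlock()
		}(name, fn)
	}
	wg.Wait()
	return errs
}

// demoWave runs one instant step and one that blocks until the deadline.
func demoWave() map[string]error {
	return runWave(context.Background(), 20*time.Millisecond,
		map[string]func(context.Context) error{
			"fast": func(context.Context) error { return nil },
			"slow": func(ctx context.Context) error {
				<-ctx.Done() // returns once the wave deadline passes
				return ctx.Err()
			},
		})
}

func main() {
	for name, err := range demoWave() {
		fmt.Println(name, err)
	}
}
```

The deadline only takes effect for steps that observe ctx; a step that ignores its context keeps the wave waiting in this sketch.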

type PipelineStep

type PipelineStep struct {
	// Name uniquely identifies this step within the pipeline.
	Name string
	// DependsOn lists names of steps that must complete successfully before
	// this step runs. If any dependency fails or is skipped, this step is
	// cascade-aborted with ErrSkipped.
	DependsOn []string
	// Retry overrides the pipeline default retry policy (see WithDefaultRetry)
	// for this step only. nil means: use the pipeline default (if any), or no retry.
	Retry *RetryPolicy
	// Build constructs the HTTP Request for this step. It receives the Results
	// of all previously completed waves. It is never called when a dependency
	// has failed (cascade abort fires first).
	Build func(ctx context.Context, prev *Results) (Request, error)
}

PipelineStep is a single unit of work in a Pipeline.

Steps without DependsOn are placed in the first execution wave and run concurrently. Steps that declare dependencies are placed in a later wave, after all their dependencies have completed. If any direct dependency fails or is skipped, this step is automatically skipped (cascade abort) — its Build function is never called.

// Wave 0 — independent, run in parallel
{Name: "user",    Build: func(ctx context.Context, _ *restapi.Results) (restapi.Request, error) { ... }},
{Name: "catalog", Build: func(ctx context.Context, _ *restapi.Results) (restapi.Request, error) { ... }},

// Wave 1 — depends on "user"; auto-skipped if "user" fails
{
    Name:      "orders",
    DependsOn: []string{"user"},
    Retry:     &restapi.RetryPolicy{MaxAttempts: 3, Delay: 200*time.Millisecond, Backoff: 2.0},
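    Build: func(ctx context.Context, prev *restapi.Results) (restapi.Request, error) {
        var u User
        // Propagate decode errors instead of building a request from a zero User.
        if err := prev.JSON("user", &u); err != nil {
            return restapi.Request{}, err
        }
        return restapi.Request{Path: "/orders?user_id=" + u.ID}, nil
    },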
},

type Request

type Request struct {
	// Method is the HTTP verb (GET, POST, PUT, PATCH, DELETE, HEAD, OPTIONS).
	// Defaults to GET when empty.
	Method string
	// Path is appended to the block's BaseURL.
	// It may include path parameters: "/users/{id}".
	Path string
	// Query is serialised into the URL query string.
	Query map[string]string
	// Headers are merged over the block's default headers.
	// Per-request headers take precedence.
	Headers map[string]string
	// Body is the request body. Pass nil for GET/DELETE/HEAD.
	// Strings, []byte, and io.Reader are accepted; anything else is
	// JSON-marshalled automatically.
	Body any
	// ContentType overrides "Content-Type" for this request.
	// Defaults to "application/json" when Body is non-nil.
	ContentType string
}

Request describes a single outgoing HTTP call. Fields left as zero values use the block's defaults.

type Response

type Response struct {
	// StatusCode is the HTTP status code (200, 201, 404, …).
	StatusCode int
	// Headers contains the response headers.
	Headers http.Header
	// Body holds the raw response bytes.
	Body []byte
	// contains filtered or unexported fields
}

Response is the parsed result of an HTTP call.

func (*Response) Latency

func (r *Response) Latency() time.Duration

Latency returns the round-trip duration of the request.

func (*Response) OK

func (r *Response) OK() bool

OK reports whether the response status is in the 2xx range.

type Results

type Results struct {
	// contains filtered or unexported fields
}

Results is the accumulated snapshot of all step outcomes in a Pipeline. It is passed read-only to each step's Build function so the step can use data from previously completed steps to construct its own request.

func (*Results) Get

func (r *Results) Get(name string) *StepResult

Get returns the StepResult for the named step, or nil if unknown.

func (*Results) JSON

func (r *Results) JSON(name string, v any) error

JSON unmarshals the response body of the named step into v. Returns an error when the step is unknown, failed, skipped, or the body is not valid JSON.

func (*Results) Names

func (r *Results) Names() []string

Names returns the names of all completed steps (succeeded, failed, skipped).

type RetryPolicy

type RetryPolicy struct {
	// MaxAttempts is the total number of attempts (initial + retries).
	// 1 means no retry; 0 is treated as 1.
	MaxAttempts int
	// Delay is the wait time before the second attempt (the first retry).
	// The wait before retry k (k = 1, 2, …) is Delay × Backoff^(k-1).
	Delay time.Duration
	// Backoff is the exponential multiplier applied to Delay each attempt.
	// 1.0 = constant delay; 2.0 = double each time. Clamped to ≥ 1.0.
	Backoff float64
}

RetryPolicy defines how many times a step is retried on transient failure and how long to wait between attempts.

Only transient errors trigger a retry: network errors and HTTP status codes 429, 500, 502, 503, and 504. Client errors (4xx except 429) are not retried.

// 3 total attempts, 200 ms → 400 ms → done (exponential ×2)
RetryPolicy{MaxAttempts: 3, Delay: 200*time.Millisecond, Backoff: 2.0}

// 5 total attempts, fixed 100 ms between each
RetryPolicy{MaxAttempts: 5, Delay: 100*time.Millisecond, Backoff: 1.0}
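
The delay schedule and the transient-status rule can be worked through in a few lines. This is a sketch under assumptions: policy mirrors the three documented fields, delays and retryableStatus are hypothetical helpers, and treating a negative MaxAttempts like 0 is an assumption beyond what the documentation states.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// policy mirrors the documented RetryPolicy fields.
type policy struct {
	MaxAttempts int
	Delay       time.Duration
	Backoff     float64
}

// delays lists the wait before each retry: Delay, Delay×Backoff,
// Delay×Backoff², and so on. MaxAttempts below 1 counts as 1 and Backoff
// below 1.0 counts as 1.0.
func delays(p policy) []time.Duration {
	attempts := p.MaxAttempts
	if attempts < 1 {
		attempts = 1
	}
	backoff := math.Max(p.Backoff, 1.0)
	out := make([]time.Duration, 0, attempts-1)
	for retry := 0; retry < attempts-1; retry++ {
		d := float64(p.Delay) * math.Pow(backoff, float64(retry))
		out = append(out, time.Duration(d))
	}
	return out
}

// retryableStatus reports whether code is in the documented transient set.
func retryableStatus(code int) bool {
	switch code {
	case 429, 500, 502, 503, 504:
		return true
	}
	return false
}

func main() {
	fmt.Println(delays(policy{MaxAttempts: 3, Delay: 200 * time.Millisecond, Backoff: 2.0}))
	fmt.Println(delays(policy{MaxAttempts: 5, Delay: 100 * time.Millisecond, Backoff: 1.0}))
	fmt.Println(retryableStatus(429), retryableStatus(404))
}
```

A policy with N attempts therefore produces N-1 waits, because no delay follows the final attempt.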

type StepResult

type StepResult struct {
	// Name is the step identifier.
	Name string
	// Response is the HTTP response. Nil when Err is non-nil.
	Response *Response
	// Err is the execution error. Nil on success.
	// errors.Is(sr.Err, ErrSkipped) reports whether the step was cascade-aborted.
	Err error
	// Attempts is the number of HTTP calls made (1 = no retry needed).
	Attempts int
	// Latency is the total wall-clock time including all retry attempts.
	Latency time.Duration
}

StepResult holds the outcome of a single pipeline step.

func (*StepResult) OK

func (sr *StepResult) OK() bool

OK reports whether the step completed without error and returned a 2xx status.

func (*StepResult) Skipped

func (sr *StepResult) Skipped() bool

Skipped reports whether the step was cascade-aborted due to a dependency failure.

type TokenProvider

type TokenProvider interface {
	// Token returns a valid Bearer token string, fetching or refreshing it
	// when necessary.
	Token(ctx context.Context) (string, error)
}

TokenProvider is the interface implemented by any entity that can supply a Bearer token. A Block satisfies this interface when configured with WithOAuth2ClientCredentials, enabling one block to authorise another.

authBlock := restapi.New("auth", restapi.WithOAuth2ClientCredentials(...))
apiBlock  := restapi.New("api",  restapi.WithTokenProvider(authBlock))
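
Because TokenProvider has a single method, any type can supply tokens, not only a Block. The sketch below repeats the interface locally so it compiles alone; envToken, its getenv field, and fetch are hypothetical names introduced for illustration.

```go
package main

import (
	"context"
	"fmt"
	"os"
)

// tokenProvider repeats the documented interface for a standalone sketch.
type tokenProvider interface {
	Token(ctx context.Context) (string, error)
}

// envToken reads the token from an environment variable on every call, so a
// rotated value is picked up without restarting the process.
type envToken struct {
	name   string
	getenv func(string) string // os.Getenv normally; replaceable in tests
}

func (e envToken) Token(ctx context.Context) (string, error) {
	if err := ctx.Err(); err != nil {
		return "", err // respect cancellation before doing any work
	}
	tok := e.getenv(e.name)
	if tok == "" {
		return "", fmt.Errorf("environment variable %s is empty", e.name)
	}
	return tok, nil
}

// Compile-time check that envToken satisfies the interface.
var _ tokenProvider = envToken{}

// fetch asks any provider for a token using a background context.
func fetch(p tokenProvider) (string, error) {
	return p.Token(context.Background())
}

func main() {
	p := envToken{name: "API_TOKEN", getenv: os.Getenv}
	tok, err := fetch(p)
	fmt.Println(tok != "", err)
}
```

A value like this would be handed to restapi.WithTokenProvider in the same way as authBlock above, assuming it implements the exported interface.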
