corrosion

package
v0.19.0
Published: Apr 24, 2026 | License: Apache-2.0 | Imports: 19 | Imported by: 0

Documentation

Constants

This section is empty.

Variables

var (
	// MembershipStateAlive indicates that the member is active.
	MembershipStateAlive string = "Alive"
	// MembershipStateSuspect indicates that the member is active, but at least one cluster member suspects it's down.
	// For all purposes, a Suspect member is treated as if it were Alive until either it refutes the suspicion
	// (becoming Alive) or fails to do so (being declared Down).
	MembershipStateSuspect string = "Suspect"
	// MembershipStateDown indicates that the member is confirmed Down. A member that reaches this state can't join
	// the cluster with the same identity until the cluster forgets this knowledge.
	MembershipStateDown string = "Down"
)
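
The comments above say that a Suspect member is treated as if it were Alive until it is declared Down. A minimal sketch of a helper that applies that rule follows. The constants are local copies of the documented values and isUsable is a hypothetical name, not part of this package.

```go
package main

import "fmt"

// Local copies of the documented state values so the sketch compiles on its own.
const (
	stateAlive   = "Alive"
	stateSuspect = "Suspect"
	stateDown    = "Down"
)

// isUsable reports whether a member in the given state should still be
// treated as part of the cluster. Suspect counts as usable because the
// documentation treats it like Alive until the member is declared Down.
// Unknown states are conservatively reported as not usable.
func isUsable(state string) bool {
	switch state {
	case stateAlive, stateSuspect:
		return true
	default:
		return false
	}
}

func main() {
	for _, s := range []string{stateAlive, stateSuspect, stateDown} {
		fmt.Printf("%s usable: %v\n", s, isUsable(s))
	}
}
```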

Functions

This section is empty.

Types

type APIClient

type APIClient struct {
	// contains filtered or unexported fields
}

APIClient is a client for the Corrosion API.

func NewAPIClient

func NewAPIClient(addr netip.AddrPort, opts ...APIClientOption) (*APIClient, error)

NewAPIClient creates a new Corrosion API client. The client retries requests on network errors using an exponential backoff policy with a maximum interval of 1 second and a maximum elapsed time of 10 seconds. If an active subscription fails with an error, the client automatically resubscribes to it using an exponential backoff policy with a maximum interval of 1 second and a maximum elapsed time of 60 seconds. Use the WithHTTP2Client option to provide a custom HTTP client and the WithResubscribeBackoff option to change the backoff policy for resubscribing to a query.
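
To make the retry limits concrete, the following standard-library sketch computes the waits of a jitter-free exponential backoff capped at a maximum interval and a maximum elapsed time. The 100 ms initial interval and the doubling factor are assumptions chosen for illustration; the documentation only states the 1 second and 10 second limits, and the real policy may add jitter.

```go
package main

import (
	"fmt"
	"time"
)

// backoffSchedule returns the successive waits of an exponential backoff.
// Each wait is capped at maxInterval and the sum of all waits stays within
// maxElapsed. The doubling factor is an assumption, not a documented value.
func backoffSchedule(initial, maxInterval, maxElapsed time.Duration) []time.Duration {
	if initial <= 0 || maxInterval <= 0 {
		return nil
	}
	wait := initial
	if wait > maxInterval {
		wait = maxInterval
	}
	var waits []time.Duration
	var elapsed time.Duration
	for elapsed+wait <= maxElapsed {
		waits = append(waits, wait)
		elapsed += wait
		wait *= 2
		if wait > maxInterval {
			wait = maxInterval
		}
	}
	return waits
}

// exampleSchedule uses the documented request limits (1s interval, 10s total)
// with an assumed initial interval of 100ms.
func exampleSchedule() []time.Duration {
	return backoffSchedule(100*time.Millisecond, time.Second, 10*time.Second)
}

func main() {
	waits := exampleSchedule()
	fmt.Println(len(waits), "retries:", waits)
}
```

Under these assumptions, the waits grow until they reach the 1 second cap and then repeat at that value until the 10 second budget is used up.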

func (*APIClient) ExecContext

func (c *APIClient) ExecContext(ctx context.Context, query string, args ...any) (*ExecResult, error)

ExecContext writes changes to the Corrosion database for propagation through the cluster. The args are for any placeholder parameters in the query. Corrosion does not sync schema changes made using this method. Use Corrosion's schema_files to create and update the cluster's database schema.

func (*APIClient) ExecMultiContext

func (c *APIClient) ExecMultiContext(ctx context.Context, statements ...Statement) (*ExecResponse, error)

ExecMultiContext writes changes to the Corrosion database for propagation through the cluster. Unlike ExecContext, this method allows multiple statements to be executed in a single transaction.

func (*APIClient) QueryContext

func (c *APIClient) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)

QueryContext executes a query that returns rows, typically a SELECT. The args are for any placeholder parameters in the query.

func (*APIClient) ResubscribeContext

func (c *APIClient) ResubscribeContext(ctx context.Context, id string, fromChange uint64) (*Subscription, error)

func (*APIClient) SubscribeContext

func (c *APIClient) SubscribeContext(
	ctx context.Context, query string, args []any, skipRows bool,
) (*Subscription, error)

SubscribeContext creates a subscription to receive updates for a desired SQL query. If skipRows is false, Subscription.Rows must be consumed before Subscription.Changes can be called. If skipRows is true, Subscription.Rows will return nil.

type APIClientOption

type APIClientOption func(*APIClient)

func WithHTTP2Client

func WithHTTP2Client(client *http.Client) APIClientOption

func WithResubscribeBackoff

func WithResubscribeBackoff(newBackoff func() backoff.BackOff) APIClientOption

WithResubscribeBackoff sets the backoff policy for resubscribing to a query if an error occurs. Pass nil to disable resubscribing.

type AdminClient

type AdminClient struct {
	// contains filtered or unexported fields
}

AdminClient is a client for the Corrosion admin API.

func NewAdminClient

func NewAdminClient(sockPath string) (*AdminClient, error)

func (*AdminClient) ClusterMemberRTTs added in v0.19.0

func (c *AdminClient) ClusterMemberRTTs() ([]MemberRTTStats, error)

ClusterMemberRTTs returns the median and standard deviation of round-trip times to each cluster member.

func (*AdminClient) ClusterMembershipStates

func (c *AdminClient) ClusterMembershipStates(latest bool) ([]ClusterMembershipState, error)

ClusterMembershipStates returns the current SWIM membership states of all cluster members. If latest is true, only the latest state of each member is returned.
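
As an illustration of what "only the latest state of each member" could mean, the sketch below reduces a list of states to one entry per member ID, keeping the entry with the newest Timestamp. The struct is a local mirror of ClusterMembershipState. Selecting by Timestamp is an assumption about the semantics, and latestPerMember and demoStates are hypothetical names.

```go
package main

import (
	"fmt"
	"net/netip"
	"sort"
	"time"
)

// clusterMembershipState mirrors the documented ClusterMembershipState fields.
type clusterMembershipState struct {
	ID        string
	Addr      netip.AddrPort
	State     string
	Timestamp time.Time
}

// latestPerMember keeps, for every member ID, the state with the newest
// Timestamp and returns the result sorted by ID for stable output.
func latestPerMember(states []clusterMembershipState) []clusterMembershipState {
	latest := make(map[string]clusterMembershipState)
	for _, s := range states {
		if cur, ok := latest[s.ID]; !ok || s.Timestamp.After(cur.Timestamp) {
			latest[s.ID] = s
		}
	}
	out := make([]clusterMembershipState, 0, len(latest))
	for _, s := range latest {
		out = append(out, s)
	}
	sort.Slice(out, func(i, j int) bool { return out[i].ID < out[j].ID })
	return out
}

// demoStates returns made-up sample data: member "a" has two states.
func demoStates() []clusterMembershipState {
	t0 := time.Date(2026, 1, 1, 0, 0, 0, 0, time.UTC)
	addrA := netip.MustParseAddrPort("10.0.0.1:8787")
	addrB := netip.MustParseAddrPort("10.0.0.2:8787")
	return []clusterMembershipState{
		{ID: "a", Addr: addrA, State: "Alive", Timestamp: t0},
		{ID: "a", Addr: addrA, State: "Suspect", Timestamp: t0.Add(time.Minute)},
		{ID: "b", Addr: addrB, State: "Alive", Timestamp: t0},
	}
}

func main() {
	for _, s := range latestPerMember(demoStates()) {
		fmt.Println(s.ID, s.Addr, s.State)
	}
}
```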

func (*AdminClient) SendCommand

func (c *AdminClient) SendCommand(cmd []byte) (<-chan Response, error)

SendCommand sends a command to the Corrosion admin API and returns a channel that will receive responses. The channel is closed after the final response or an error response has been sent. The caller must read from the channel until it is closed.
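
The requirement to read until the channel is closed can be met with a range loop. The sketch below uses a local mirror of the Response type and a hypothetical feed helper that stands in for SendCommand; drain keeps reading after an error, so the channel is always read until it is closed.

```go
package main

import (
	"errors"
	"fmt"
)

// response mirrors the documented Response type.
type response struct {
	JSON map[string]any
	Err  error
}

// errDemo is a made-up error used by the example data.
var errDemo = errors.New("demo failure")

// drain reads every response until the channel is closed. It collects the
// JSON payloads and remembers the first error instead of returning early,
// so the channel is always fully consumed.
func drain(ch <-chan response) ([]map[string]any, error) {
	var payloads []map[string]any
	var firstErr error
	for resp := range ch {
		if resp.Err != nil {
			if firstErr == nil {
				firstErr = resp.Err
			}
			continue
		}
		payloads = append(payloads, resp.JSON)
	}
	return payloads, firstErr
}

// feed is a hypothetical stand-in for SendCommand: it returns a channel
// that yields the given responses and is then closed.
func feed(resps ...response) <-chan response {
	ch := make(chan response, len(resps))
	for _, r := range resps {
		ch <- r
	}
	close(ch)
	return ch
}

func main() {
	payloads, err := drain(feed(
		response{JSON: map[string]any{"ok": true}},
		response{Err: errDemo},
	))
	fmt.Println(len(payloads), err)
}
```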

type ChangeEvent

type ChangeEvent struct {
	Type     ChangeType
	RowID    uint64
	Values   []json.RawMessage
	ChangeID uint64
}

func (*ChangeEvent) MarshalJSON

func (ce *ChangeEvent) MarshalJSON() ([]byte, error)

func (*ChangeEvent) Scan

func (ce *ChangeEvent) Scan(dest ...any) error

Scan copies the column values in the change event into the values pointed at by dest. The number of values in dest must be the same as the number of columns in the change. Scan converts JSON-encoded column values to the provided Go types using json.Unmarshal.
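
The documented Scan behavior (matching argument count, then json.Unmarshal per column) can be sketched with the standard library alone. This is an illustration of the description above, not the package's actual implementation; scanValues and scanStrings are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// scanValues decodes each JSON-encoded column into the matching destination
// pointer. It fails if the number of destinations differs from the number
// of columns, as the documentation requires them to be equal.
func scanValues(values []json.RawMessage, dest ...any) error {
	if len(dest) != len(values) {
		return fmt.Errorf("expected %d destination arguments, got %d", len(values), len(dest))
	}
	for i, v := range values {
		if err := json.Unmarshal(v, dest[i]); err != nil {
			return fmt.Errorf("column %d: %w", i, err)
		}
	}
	return nil
}

// scanStrings is a convenience wrapper that accepts columns as JSON strings.
func scanStrings(cols []string, dest ...any) error {
	values := make([]json.RawMessage, len(cols))
	for i, c := range cols {
		values[i] = json.RawMessage(c)
	}
	return scanValues(values, dest...)
}

func main() {
	var id int
	var name string
	if err := scanStrings([]string{`7`, `"alice"`}, &id, &name); err != nil {
		fmt.Println("scan failed:", err)
		return
	}
	fmt.Println(id, name)
}
```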

func (*ChangeEvent) UnmarshalJSON

func (ce *ChangeEvent) UnmarshalJSON(data []byte) error

type ChangeType

type ChangeType string
var (
	ChangeTypeInsert ChangeType = "insert"
	ChangeTypeUpdate ChangeType = "update"
	ChangeTypeDelete ChangeType = "delete"
)
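
One way to consume change events is to keep an in-memory view of the query result and update it per event type. The sketch below uses local mirrors of ChangeEvent and ChangeType. It assumes that RowID identifies the same row across events, which the documentation above does not state; rowView, apply, and raw are hypothetical names.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// changeType and changeEvent mirror the documented ChangeType and ChangeEvent.
type changeType string

const (
	changeTypeInsert changeType = "insert"
	changeTypeUpdate changeType = "update"
	changeTypeDelete changeType = "delete"
)

type changeEvent struct {
	Type     changeType
	RowID    uint64
	Values   []json.RawMessage
	ChangeID uint64
}

// rowView maps a row ID to that row's current column values.
type rowView map[uint64][]json.RawMessage

// apply updates the view for a single event. Inserts and updates store the
// new values, deletes remove the row, and unknown types are rejected.
func (v rowView) apply(ev changeEvent) error {
	switch ev.Type {
	case changeTypeInsert, changeTypeUpdate:
		v[ev.RowID] = ev.Values
	case changeTypeDelete:
		delete(v, ev.RowID)
	default:
		return fmt.Errorf("unknown change type %q", ev.Type)
	}
	return nil
}

// raw builds column values from JSON literals.
func raw(cols ...string) []json.RawMessage {
	out := make([]json.RawMessage, len(cols))
	for i, c := range cols {
		out[i] = json.RawMessage(c)
	}
	return out
}

func main() {
	view := rowView{}
	events := []changeEvent{
		{Type: changeTypeInsert, RowID: 1, Values: raw(`1`, `"a"`), ChangeID: 1},
		{Type: changeTypeUpdate, RowID: 1, Values: raw(`1`, `"b"`), ChangeID: 2},
	}
	for _, ev := range events {
		if err := view.apply(ev); err != nil {
			fmt.Println("apply failed:", err)
			return
		}
	}
	fmt.Println(len(view), string(view[1][1]))
}
```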

type ClusterMembershipState

type ClusterMembershipState struct {
	ID        string
	Addr      netip.AddrPort
	State     string
	Timestamp time.Time
}

type EndOfQuery

type EndOfQuery struct {
	Time     float64 `json:"time"`
	ChangeID *uint64 `json:"change_id"`
}

type ExecResponse

type ExecResponse struct {
	Results []ExecResult `json:"results"`
	Time    float64      `json:"time"`
	Version *uint        `json:"version"`
}

type ExecResult

type ExecResult struct {
	RowsAffected uint    `json:"rows_affected"`
	Time         float64 `json:"time"`
	Error        *string `json:"error"`
}
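
Because ExecResult.Error is a pointer, a nil check distinguishes failed statements from successful ones. The sketch below decodes a response with local mirrors of ExecResponse and ExecResult and sums the affected rows. The JSON payloads are made-up samples shaped after the struct tags, and summarize is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// execResult and execResponse mirror the documented structs and their JSON tags.
type execResult struct {
	RowsAffected uint    `json:"rows_affected"`
	Time         float64 `json:"time"`
	Error        *string `json:"error"`
}

type execResponse struct {
	Results []execResult `json:"results"`
	Time    float64      `json:"time"`
	Version *uint        `json:"version"`
}

// summarize decodes a response payload and returns the total number of
// affected rows, or an error naming the first statement that failed.
func summarize(payload string) (uint, error) {
	var resp execResponse
	if err := json.Unmarshal([]byte(payload), &resp); err != nil {
		return 0, fmt.Errorf("decode response: %w", err)
	}
	var total uint
	for i, r := range resp.Results {
		if r.Error != nil {
			return 0, fmt.Errorf("statement %d failed: %s", i, *r.Error)
		}
		total += r.RowsAffected
	}
	return total, nil
}

func main() {
	// Sample payload for illustration only.
	payload := `{"results":[{"rows_affected":1,"time":0.0001},{"rows_affected":2,"time":0.0001}],"time":0.0003}`
	total, err := summarize(payload)
	fmt.Println(total, err)
}
```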

type MemberRTTStats added in v0.19.0

type MemberRTTStats struct {
	Addr   netip.AddrPort
	Median time.Duration
	StdDev time.Duration
}
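
A typical use of round-trip statistics is to pick the member with the lowest median RTT. The sketch below works on a local mirror of MemberRTTStats; fastest and demoStats are hypothetical names and the addresses and durations are made-up sample data.

```go
package main

import (
	"fmt"
	"net/netip"
	"time"
)

// memberRTTStats mirrors the documented MemberRTTStats fields.
type memberRTTStats struct {
	Addr   netip.AddrPort
	Median time.Duration
	StdDev time.Duration
}

// fastest returns the entry with the lowest median round-trip time.
// The boolean is false when the input is empty.
func fastest(stats []memberRTTStats) (memberRTTStats, bool) {
	if len(stats) == 0 {
		return memberRTTStats{}, false
	}
	best := stats[0]
	for _, s := range stats[1:] {
		if s.Median < best.Median {
			best = s
		}
	}
	return best, true
}

// demoStats returns made-up sample data.
func demoStats() []memberRTTStats {
	return []memberRTTStats{
		{Addr: netip.MustParseAddrPort("10.0.0.1:8787"), Median: 12 * time.Millisecond, StdDev: 3 * time.Millisecond},
		{Addr: netip.MustParseAddrPort("10.0.0.2:8787"), Median: 4 * time.Millisecond, StdDev: time.Millisecond},
		{Addr: netip.MustParseAddrPort("10.0.0.3:8787"), Median: 30 * time.Millisecond, StdDev: 9 * time.Millisecond},
	}
}

func main() {
	if m, ok := fastest(demoStats()); ok {
		fmt.Println(m.Addr, m.Median)
	}
}
```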

type QueryEvent

type QueryEvent struct {
	Columns []string     `json:"columns"`
	Row     *RowEvent    `json:"row"`
	EOQ     *EndOfQuery  `json:"eoq"`
	Change  *ChangeEvent `json:"change"`
	// Error is a server-side error that occurred during query execution. It's considered fatal for the client
	// as it cannot be recovered from server-side.
	Error *string `json:"error"`
}
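
Each QueryEvent carries one of several payloads, so a consumer usually branches on which field is set. The sketch below uses a simplified local mirror in which Row and Change are kept as raw JSON, because the encoding handled by the custom RowEvent and ChangeEvent unmarshalers is not described here. The sample payloads and the classify helper are assumptions for illustration.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// endOfQuery mirrors the documented EndOfQuery struct.
type endOfQuery struct {
	Time     float64 `json:"time"`
	ChangeID *uint64 `json:"change_id"`
}

// queryEvent is a simplified mirror of QueryEvent. Row and Change are left
// as raw JSON because their wire encoding is not documented above.
type queryEvent struct {
	Columns []string        `json:"columns"`
	Row     json.RawMessage `json:"row"`
	EOQ     *endOfQuery     `json:"eoq"`
	Change  json.RawMessage `json:"change"`
	Error   *string         `json:"error"`
}

// classify decodes one event and reports which field is populated.
// A server-side error is returned as a Go error because the documentation
// describes it as fatal for the client.
func classify(line string) (string, error) {
	var ev queryEvent
	if err := json.Unmarshal([]byte(line), &ev); err != nil {
		return "", fmt.Errorf("decode event: %w", err)
	}
	switch {
	case ev.Error != nil:
		return "error", errors.New(*ev.Error)
	case ev.Columns != nil:
		return "columns", nil
	case len(ev.Row) > 0:
		return "row", nil
	case ev.EOQ != nil:
		return "eoq", nil
	case len(ev.Change) > 0:
		return "change", nil
	default:
		return "", errors.New("event has no populated field")
	}
}

func main() {
	// Sample payloads for illustration only.
	for _, line := range []string{
		`{"columns":["id","title"]}`,
		`{"eoq":{"time":0.001,"change_id":3}}`,
		`{"error":"boom"}`,
	} {
		kind, err := classify(line)
		fmt.Println(kind, err)
	}
}
```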

type Response

type Response struct {
	JSON map[string]any
	// Err is set if the response is an error or if an error occurred while processing the response.
	Err error
}

type RetryRoundTripper

type RetryRoundTripper struct {
	Base http.RoundTripper
	// NewBackoff creates a new backoff policy for each request.
	NewBackoff func() backoff.BackOff
}

func (*RetryRoundTripper) RoundTrip

func (rt *RetryRoundTripper) RoundTrip(req *http.Request) (*http.Response, error)

type RetrySubscription

type RetrySubscription struct {
	// contains filtered or unexported fields
}

func NewRetrySubscription

func NewRetrySubscription(ctx context.Context, client *APIClient, sub *Subscription, boff backoff.BackOff) *RetrySubscription

func (*RetrySubscription) Changes

func (rs *RetrySubscription) Changes() (<-chan *ChangeEvent, error)

func (*RetrySubscription) Close

func (rs *RetrySubscription) Close()

func (*RetrySubscription) Err

func (rs *RetrySubscription) Err() error

func (*RetrySubscription) ID

func (rs *RetrySubscription) ID() string

type RowEvent

type RowEvent struct {
	RowID  uint64
	Values []json.RawMessage
}

func (*RowEvent) MarshalJSON

func (re *RowEvent) MarshalJSON() ([]byte, error)

func (*RowEvent) UnmarshalJSON

func (re *RowEvent) UnmarshalJSON(data []byte) error

type Rows

type Rows struct {
	// contains filtered or unexported fields
}

Rows is the result of a query. Its cursor starts before the first row of the result set. Use Rows.Next to advance from row to row.

func (*Rows) Close

func (rs *Rows) Close() error

Close closes the Rows, preventing further enumeration. If Rows.Next is called and returns false, the Rows are closed automatically and it will suffice to check the result of Rows.Err. Close is idempotent and does not affect the result of Rows.Err.

func (*Rows) Columns

func (rs *Rows) Columns() []string

Columns returns the column names.

func (*Rows) Err

func (rs *Rows) Err() error

Err returns the error, if any, that was encountered during iteration. Err may be called after an explicit or implicit Rows.Close.

func (*Rows) Next

func (rs *Rows) Next() bool

Next prepares the next result row for reading with the Rows.Scan method. It returns true on success, or false if there is no next result row or an error happened while preparing it. Rows.Err should be consulted to distinguish between the two cases.

Every call to Rows.Scan, even the first one, must be preceded by a call to Rows.Next.
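
The Next, Scan, Err, and Close contract described above leads to the usual iteration loop. The sketch below expresses it against a small interface that lists only those four documented methods and exercises it with fakeRows, a hypothetical in-memory stand-in that holds one JSON-encoded column per row.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
)

// rowIterator lists the four documented Rows methods used by the loop.
type rowIterator interface {
	Next() bool
	Scan(dest ...any) error
	Err() error
	Close() error
}

// collectStrings reads a single string column from every row. It checks
// Err after the loop to tell "no more rows" apart from an iteration error.
func collectStrings(rows rowIterator) ([]string, error) {
	defer rows.Close()
	var out []string
	for rows.Next() {
		var s string
		if err := rows.Scan(&s); err != nil {
			return nil, err
		}
		out = append(out, s)
	}
	return out, rows.Err()
}

// fakeRows is an in-memory stand-in used only to run the sketch.
type fakeRows struct {
	values []string // one JSON-encoded column per row
	cur    int      // rows advanced so far; 0 means before the first row
	err    error
}

func (f *fakeRows) Next() bool {
	if f.err != nil || f.cur >= len(f.values) {
		return false
	}
	f.cur++
	return true
}

func (f *fakeRows) Scan(dest ...any) error {
	if f.cur == 0 {
		return errors.New("Scan called before Next")
	}
	if len(dest) != 1 {
		return fmt.Errorf("expected 1 destination argument, got %d", len(dest))
	}
	return json.Unmarshal([]byte(f.values[f.cur-1]), dest[0])
}

func (f *fakeRows) Err() error   { return f.err }
func (f *fakeRows) Close() error { return nil }

func main() {
	names, err := collectStrings(&fakeRows{values: []string{`"a"`, `"b"`}})
	fmt.Println(names, err)
}
```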

func (*Rows) Scan

func (rs *Rows) Scan(dest ...any) error

Scan copies the columns in the current row into the values pointed at by dest. The number of values in dest must be the same as the number of columns in Rows. Scan converts JSON-encoded column values to the provided Go types using json.Unmarshal.

func (*Rows) Time

func (rs *Rows) Time() (float64, error)

Time returns the time taken to execute the query in seconds. It's only available after all rows have been consumed. It doesn't include the time to send the query, receive the response, or iterate over the rows.
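
Since the value is a float64 number of seconds, it is often convenient to convert it to a time.Duration before logging or comparing. A minimal sketch, where secondsToDuration is a hypothetical helper that rounds to the nearest nanosecond:

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// secondsToDuration converts fractional seconds to a time.Duration,
// rounding to the nearest nanosecond.
func secondsToDuration(secs float64) time.Duration {
	return time.Duration(math.Round(secs * float64(time.Second)))
}

func main() {
	fmt.Println(secondsToDuration(0.25))
}
```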

type Statement

type Statement struct {
	Query  string `json:"query"`
	Params []any  `json:"params"`
}
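
The JSON tags determine how a Statement is encoded. The sketch below marshals a slice of a local mirror of the struct to show the resulting shape. Whether the client sends exactly this payload is an assumption, and the todos table and encodeStatements helper are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statement mirrors the documented Statement struct and its JSON tags.
type statement struct {
	Query  string `json:"query"`
	Params []any  `json:"params"`
}

// encodeStatements marshals the statements as a JSON array.
func encodeStatements(stmts ...statement) (string, error) {
	b, err := json.Marshal(stmts)
	if err != nil {
		return "", fmt.Errorf("encode statements: %w", err)
	}
	return string(b), nil
}

func main() {
	out, err := encodeStatements(statement{
		Query:  "INSERT INTO todos (id, title) VALUES (?, ?)",
		Params: []any{1, "write docs"},
	})
	if err != nil {
		fmt.Println("encode failed:", err)
		return
	}
	fmt.Println(out)
	// prints [{"query":"INSERT INTO todos (id, title) VALUES (?, ?)","params":[1,"write docs"]}]
}
```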

type Subscription

type Subscription struct {
	// contains filtered or unexported fields
}

Subscription receives updates from the Corrosion database for a desired SQL query.

func (*Subscription) Changes

func (s *Subscription) Changes() (<-chan *ChangeEvent, error)

Changes returns a channel that receives change events for the query. Changes are not available until all rows are consumed. The channel is closed when the context is done, when an error occurs while reading the changes, or when the subscription is closed explicitly. If it is closed due to an error, Subscription.Err returns that error.

func (*Subscription) Close

func (s *Subscription) Close() error

func (*Subscription) Err

func (s *Subscription) Err() error

Err returns the error, if any, that was encountered while fetching changes. Err may be called after an explicit or implicit Subscription.Close.

func (*Subscription) ID

func (s *Subscription) ID() string

ID returns the subscription ID.

func (*Subscription) Rows

func (s *Subscription) Rows() *Rows

Rows returns the rows of the query or nil if skipRows was true when creating the subscription or if the subscription was created with APIClient.ResubscribeContext.
