angzarr

package module
v0.1.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 12, 2026 License: AGPL-3.0 Imports: 16 Imported by: 0

README

angzarr-client-go

Go client library for Angzarr event-sourcing services.

Installation

go get github.com/benjaminabbitt/angzarr/client/go

Usage

Sending Commands
package main

import (
    "context"
    "log"

    "github.com/google/uuid"
    angzarr "github.com/benjaminabbitt/angzarr/client/go"
    pb "github.com/benjaminabbitt/angzarr/client/go/proto/angzarr"
)

func main() {
    // Connect to aggregate coordinator
    client, err := angzarr.NewAggregateClient("localhost:1310")
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Send a command to create a new aggregate
    resp, err := client.CommandNew("orders").
        WithCorrelationID("order-123").
        WithCommand("type.googleapis.com/examples.CreateOrder", &CreateOrderCommand{
            CustomerId: "customer-456",
        }).
        Execute(context.Background())
    if err != nil {
        log.Fatal(err)
    }

    // Get the new aggregate root ID
    rootID, ok := angzarr.RootUUID(resp.Events)
    if !ok {
        log.Fatal("command response did not include an aggregate root ID")
    }
    log.Printf("Created order: %s", rootID)
}
Querying Events
// Connect to query service
queryClient, err := angzarr.NewQueryClient("localhost:1310")
if err != nil {
    log.Fatal(err)
}
defer queryClient.Close()

// Query events for an aggregate
rootID := uuid.MustParse("...")
events, err := queryClient.Query("orders", rootID).
    GetEventBook(context.Background())
if err != nil {
    log.Fatal(err)
}

// Iterate over events
for _, page := range events.Pages {
    log.Printf("Event %d: %s", angzarr.SequenceNum(page), angzarr.TypeNameFromURL(page.Event.TypeUrl))
}
Using Environment Variables
// Connect using environment variable with fallback
client, err := angzarr.AggregateClientFromEnv("ANGZARR_ENDPOINT", "localhost:1310")
Temporal Queries
// Query state as of a specific sequence
events, err := queryClient.Query("orders", rootID).
    AsOfSequence(10).
    GetEventBook(ctx)

// Query state as of a specific time
events, err := queryClient.Query("orders", rootID).
    AsOfTime("2024-01-15T10:30:00Z").
    GetEventBook(ctx)

// Query a range of sequences
events, err := queryClient.Query("orders", rootID).
    RangeTo(5, 15).
    GetEventBook(ctx)
Error Handling
resp, err := client.Command("orders", rootID).
    WithSequence(5).
    WithCommand(typeURL, cmd).
    Execute(ctx)

if err != nil {
    if clientErr := angzarr.AsClientError(err); clientErr != nil {
        if clientErr.IsNotFound() {
            // Aggregate doesn't exist
        } else if clientErr.IsPreconditionFailed() {
            // Sequence mismatch (optimistic locking failure)
        } else if clientErr.IsConnectionError() {
            // Network/transport error
        }
    }
}

Client Types

| Client | Purpose |
| --- | --- |
| QueryClient | Query events from aggregates |
| AggregateClient | Send commands to aggregates |
| SpeculativeClient | Dry-run commands, test projectors/sagas |
| DomainClient | Combined query + aggregate for a domain |
| Client | Full client with all capabilities |

Helper Functions

// UUID conversion
protoUUID := angzarr.UUIDToProto(id) // id is a uuid.UUID
parsed, err := angzarr.ProtoToUUID(protoUUID)

// Type URL helpers
typeURL := angzarr.TypeURL("examples", "CreateOrder") // "type.googleapis.com/examples.CreateOrder"
typeName := angzarr.TypeNameFromURL(typeURL)          // "CreateOrder"

// Cover accessors
domain := angzarr.Domain(eventBook)
correlationID := angzarr.CorrelationID(eventBook)
rootUUID, ok := angzarr.RootUUID(eventBook) // ok reports whether a root UUID was extracted

// Sequence helpers
nextSeq := angzarr.NextSequence(eventBook)

License

AGPL-3.0-only

Documentation

Overview

Package angzarr provides a client library for Angzarr gRPC services.

Index

Constants

View Source
const (
	UnknownDomain          = "unknown"
	WildcardDomain         = "*"
	DefaultEdition         = "angzarr"
	MetaAngzarrDomain      = "_angzarr"
	ProjectionDomainPrefix = "projection:"
	CorrelationIDHeader    = "x-correlation-id"
	TypeURLPrefix          = "type.googleapis.com/"
)

Constants matching Rust proto_ext::constants

Variables

This section is empty.

Functions

func CacheKey

func CacheKey(v interface{}) string

CacheKey generates a cache key based on domain + root.

func CommandPages

func CommandPages(book *pb.CommandBook) []*pb.CommandPage

CommandPages returns the command pages from a CommandBook, or empty slice if nil.

func CorrelationID

func CorrelationID(v interface{}) string

CorrelationID returns the correlation_id from a Cover-bearing type, or empty if missing.

func CoverOf

func CoverOf(v interface{}) *pb.Cover

CoverOf extracts the Cover from various proto types.

func DecodeEvent

func DecodeEvent(page *pb.EventPage, typeSuffix string, msg interface{ Unmarshal([]byte) error }) bool

DecodeEvent attempts to decode an event payload if the type URL matches.

func DivergenceFor

func DivergenceFor(e *pb.Edition, domain string) int64

DivergenceFor returns the divergence sequence for a domain, or -1 if not found.

func Domain

func Domain(v interface{}) string

Domain returns the domain from a Cover-bearing type, or UnknownDomain if missing.

func Edition

func Edition(v interface{}) string

Edition returns the edition name from a Cover-bearing type, defaulting to DefaultEdition.

func EditionOpt

func EditionOpt(v interface{}) *string

EditionOpt returns the edition name as a pointer, nil if not set.

func EventPages

func EventPages(book *pb.EventBook) []*pb.EventPage

EventPages returns the event pages from an EventBook, or empty slice if nil.

func EventsFromResponse

func EventsFromResponse(resp *pb.CommandResponse) []*pb.EventPage

EventsFromResponse extracts the event pages from a CommandResponse.

func ExplicitEdition

func ExplicitEdition(name string, divergences []*pb.DomainDivergence) *pb.Edition

ExplicitEdition creates an edition with divergence points.

func HasCorrelationID

func HasCorrelationID(v interface{}) bool

HasCorrelationID returns true if the correlation_id is present and non-empty.

func ImplicitEdition

func ImplicitEdition(name string) *pb.Edition

ImplicitEdition creates an edition with the given name but no divergences.

func IsClientError

func IsClientError(err error) bool

IsClientError checks if an error is a ClientError.

func IsMainTimeline

func IsMainTimeline(e *pb.Edition) bool

IsMainTimeline checks if an edition represents the main timeline.

func MainTimeline

func MainTimeline() *pb.Edition

MainTimeline returns an Edition representing the main timeline.

func NewCommandBook

func NewCommandBook(cover *pb.Cover, pages ...*pb.CommandPage) *pb.CommandBook

NewCommandBook creates a CommandBook from the given cover and zero or more command pages.

func NewCommandPage

func NewCommandPage(sequence uint32, command *anypb.Any) *pb.CommandPage

NewCommandPage creates a command page from a sequence and Any message.

func NewCover

func NewCover(domain string, root uuid.UUID, correlationID string) *pb.Cover

NewCover creates a new Cover with the given parameters.

func NewCoverWithEdition

func NewCoverWithEdition(domain string, root uuid.UUID, correlationID string, edition *pb.Edition) *pb.Cover

NewCoverWithEdition creates a Cover with an edition.

func NewQueryWithRange

func NewQueryWithRange(cover *pb.Cover, lower uint32, upper *uint32) *pb.Query

NewQueryWithRange creates a Query with a cover and range selection.

func NewQueryWithTemporal

func NewQueryWithTemporal(cover *pb.Cover, temporal *pb.TemporalQuery) *pb.Query

NewQueryWithTemporal creates a Query with a temporal selection.

func NextSequence

func NextSequence(book *pb.EventBook) uint32

NextSequence returns the next sequence number from an EventBook. The framework computes this value on load.

func Now

func Now() *timestamppb.Timestamp

Now returns the current time as a protobuf Timestamp.

func ParseTimestamp

func ParseTimestamp(rfc3339 string) (*timestamppb.Timestamp, error)

ParseTimestamp parses an RFC3339 timestamp string.

func ProtoToUUID

func ProtoToUUID(u *pb.UUID) (uuid.UUID, error)

ProtoToUUID converts a proto UUID to uuid.UUID.

func RangeSelection

func RangeSelection(lower uint32, upper *uint32) *pb.Query_Range

RangeSelection creates a sequence range selection (returns the oneof wrapper).

func RootIDHex

func RootIDHex(v interface{}) string

RootIDHex returns the root UUID as a hex string, or empty if missing.

func RootUUID

func RootUUID(v interface{}) (uuid.UUID, bool)

RootUUID extracts the root UUID from a Cover-bearing type.

func RoutingKey

func RoutingKey(v interface{}) string

RoutingKey computes the bus routing key for a Cover-bearing type.

func TemporalSelectionBySequence

func TemporalSelectionBySequence(seq uint32) *pb.Query_Temporal

TemporalSelectionBySequence creates a temporal selection as-of a sequence.

func TemporalSelectionByTime

func TemporalSelectionByTime(ts *timestamppb.Timestamp) *pb.Query_Temporal

TemporalSelectionByTime creates a temporal selection as-of a timestamp.

func TypeNameFromURL

func TypeNameFromURL(typeURL string) string

TypeNameFromURL extracts the type name from a type URL.

func TypeURL

func TypeURL(packageName, typeName string) string

TypeURL constructs a full type URL from a package and type name.

func TypeURLMatches

func TypeURLMatches(typeURL, suffix string) bool

TypeURLMatches checks if a type URL ends with the given suffix.

func UUIDToProto

func UUIDToProto(u uuid.UUID) *pb.UUID

UUIDToProto converts a uuid.UUID to a proto UUID.

Types

type AggregateClient

type AggregateClient struct {
	// contains filtered or unexported fields
}

AggregateClient wraps the AggregateCoordinatorService for command execution.

func AggregateClientFromConn

func AggregateClientFromConn(conn *grpc.ClientConn) *AggregateClient

AggregateClientFromConn creates a client from an existing connection.

func AggregateClientFromEnv

func AggregateClientFromEnv(envVar, defaultEndpoint string) (*AggregateClient, error)

AggregateClientFromEnv connects using an environment variable with fallback.

func NewAggregateClient

func NewAggregateClient(endpoint string) (*AggregateClient, error)

NewAggregateClient connects to an aggregate coordinator at the given endpoint.

func (*AggregateClient) Close

func (c *AggregateClient) Close() error

Close closes the underlying connection.

func (*AggregateClient) Command

func (c *AggregateClient) Command(domain string, root uuid.UUID) *CommandBuilder

Command starts building a command for the given domain and root.

func (*AggregateClient) CommandNew

func (c *AggregateClient) CommandNew(domain string) *CommandBuilder

CommandNew starts building a command for a new aggregate.

func (*AggregateClient) DryRunHandle

func (c *AggregateClient) DryRunHandle(ctx context.Context, req *pb.DryRunRequest) (*pb.CommandResponse, error)

DryRunHandle executes a command in dry-run mode (no persistence).

func (*AggregateClient) Handle

Handle executes a command asynchronously.

func (*AggregateClient) HandleSync

HandleSync executes a command synchronously with the specified sync mode.

type Client

type Client struct {
	Aggregate   *AggregateClient
	Query       *QueryClient
	Speculative *SpeculativeClient
	// contains filtered or unexported fields
}

Client combines aggregate, query, and speculative clients.

func ClientFromConn

func ClientFromConn(conn *grpc.ClientConn) *Client

ClientFromConn creates a client from an existing connection.

func ClientFromEnv

func ClientFromEnv(envVar, defaultEndpoint string) (*Client, error)

ClientFromEnv connects using an environment variable with fallback.

func NewClient

func NewClient(endpoint string) (*Client, error)

NewClient connects to a server providing all services.

func (*Client) Close

func (c *Client) Close() error

Close closes the underlying connection.

type ClientError

type ClientError struct {
	Kind    ErrorKind
	Message string
	Cause   error
}

ClientError represents errors from client operations.

func AsClientError

func AsClientError(err error) *ClientError

AsClientError extracts a ClientError from an error chain.

func ConnectionError

func ConnectionError(msg string) *ClientError

ConnectionError creates a connection error.

func GRPCError

func GRPCError(err error) *ClientError

GRPCError wraps a gRPC error.

func InvalidArgumentError

func InvalidArgumentError(msg string) *ClientError

InvalidArgumentError creates an invalid argument error.

func InvalidTimestampError

func InvalidTimestampError(msg string) *ClientError

InvalidTimestampError creates a timestamp parsing error.

func TransportError

func TransportError(err error) *ClientError

TransportError wraps a transport error.

func (*ClientError) Code

func (e *ClientError) Code() codes.Code

Code returns the gRPC status code if this is a gRPC error.

func (*ClientError) Error

func (e *ClientError) Error() string

func (*ClientError) IsConnectionError

func (e *ClientError) IsConnectionError() bool

IsConnectionError returns true if this is a connection or transport error.

func (*ClientError) IsInvalidArgument

func (e *ClientError) IsInvalidArgument() bool

IsInvalidArgument returns true if this is an "invalid argument" error.

func (*ClientError) IsNotFound

func (e *ClientError) IsNotFound() bool

IsNotFound returns true if this is a "not found" error.

func (*ClientError) IsPreconditionFailed

func (e *ClientError) IsPreconditionFailed() bool

IsPreconditionFailed returns true if this is a "precondition failed" error.

func (*ClientError) Status

func (e *ClientError) Status() *status.Status

Status returns the gRPC Status if this is a gRPC error.

func (*ClientError) Unwrap

func (e *ClientError) Unwrap() error

type CommandBookW added in v0.1.1

type CommandBookW struct {
	*pb.CommandBook
}

CommandBookW wraps a CommandBook proto with extension methods.

func NewCommandBookW added in v0.1.1

func NewCommandBookW(proto *pb.CommandBook) *CommandBookW

NewCommandBookW creates a new CommandBookW wrapper.

func (*CommandBookW) CacheKey added in v0.1.1

func (w *CommandBookW) CacheKey() string

CacheKey generates a cache key based on domain + root.

func (*CommandBookW) CorrelationID added in v0.1.1

func (w *CommandBookW) CorrelationID() string

CorrelationID returns the correlation_id from the cover, or empty string if missing.

func (*CommandBookW) CoverWrapper added in v0.1.1

func (w *CommandBookW) CoverWrapper() *CoverW

CoverWrapper returns a CoverW wrapping the cover.

func (*CommandBookW) Domain added in v0.1.1

func (w *CommandBookW) Domain() string

Domain returns the domain from the cover, or UnknownDomain if missing.

func (*CommandBookW) HasCorrelationID added in v0.1.1

func (w *CommandBookW) HasCorrelationID() bool

HasCorrelationID returns true if the correlation_id is present and non-empty.

func (*CommandBookW) Pages added in v0.1.1

func (w *CommandBookW) Pages() []*CommandPageW

Pages returns the command pages as wrapped CommandPageW instances.

func (*CommandBookW) RootUUID added in v0.1.1

func (w *CommandBookW) RootUUID() (uuid.UUID, bool)

RootUUID extracts the root UUID from the cover.

func (*CommandBookW) RoutingKey added in v0.1.1

func (w *CommandBookW) RoutingKey() string

RoutingKey computes the bus routing key.

type CommandBuilder

type CommandBuilder struct {
	// contains filtered or unexported fields
}

CommandBuilder provides fluent construction and execution of commands.

func NewCommandBuilder

func NewCommandBuilder(client *AggregateClient, domain string, root uuid.UUID) *CommandBuilder

NewCommandBuilder creates a command builder for an existing aggregate.

func NewCommandBuilderNew

func NewCommandBuilderNew(client *AggregateClient, domain string) *CommandBuilder

NewCommandBuilderNew creates a command builder for a new aggregate (no root yet).

func (*CommandBuilder) Build

func (b *CommandBuilder) Build() (*pb.CommandBook, error)

Build constructs the CommandBook without executing.

func (*CommandBuilder) Execute

func (b *CommandBuilder) Execute(ctx context.Context) (*pb.CommandResponse, error)

Execute builds and executes the command.

func (*CommandBuilder) WithCommand

func (b *CommandBuilder) WithCommand(typeURL string, msg proto.Message) *CommandBuilder

WithCommand sets the command type URL and message.

func (*CommandBuilder) WithCorrelationID

func (b *CommandBuilder) WithCorrelationID(id string) *CommandBuilder

WithCorrelationID sets the correlation ID for request tracing.

func (*CommandBuilder) WithSequence

func (b *CommandBuilder) WithSequence(seq uint32) *CommandBuilder

WithSequence sets the expected sequence number for optimistic locking.

type CommandPageW added in v0.1.1

type CommandPageW struct {
	*pb.CommandPage
}

CommandPageW wraps a CommandPage proto with extension methods.

func NewCommandPageW added in v0.1.1

func NewCommandPageW(proto *pb.CommandPage) *CommandPageW

NewCommandPageW creates a new CommandPageW wrapper.

func (*CommandPageW) Sequence added in v0.1.1

func (w *CommandPageW) Sequence() uint32

Sequence returns the sequence number.

type CommandResponseW added in v0.1.1

type CommandResponseW struct {
	*pb.CommandResponse
}

CommandResponseW wraps a CommandResponse proto with extension methods.

func NewCommandResponseW added in v0.1.1

func NewCommandResponseW(proto *pb.CommandResponse) *CommandResponseW

NewCommandResponseW creates a new CommandResponseW wrapper.

func (*CommandResponseW) Events added in v0.1.1

func (w *CommandResponseW) Events() []*EventPageW

Events extracts the event pages as wrapped EventPageW instances.

func (*CommandResponseW) EventsBook added in v0.1.1

func (w *CommandResponseW) EventsBook() *EventBookW

EventsBook returns the events as a wrapped EventBookW, or nil if not set.

type CoverW added in v0.1.1

type CoverW struct {
	*pb.Cover
}

CoverW wraps a Cover proto with extension methods.

func NewCoverW added in v0.1.1

func NewCoverW(proto *pb.Cover) *CoverW

NewCoverW creates a new CoverW wrapper.

func (*CoverW) CacheKey added in v0.1.1

func (w *CoverW) CacheKey() string

CacheKey generates a cache key based on domain + root.

func (*CoverW) CorrelationID added in v0.1.1

func (w *CoverW) CorrelationID() string

CorrelationID returns the correlation_id, or empty string if missing.

func (*CoverW) Domain added in v0.1.1

func (w *CoverW) Domain() string

Domain returns the domain, or UnknownDomain if missing.

func (*CoverW) Edition added in v0.1.1

func (w *CoverW) Edition() string

Edition returns the edition name, defaulting to DefaultEdition.

func (*CoverW) EditionOpt added in v0.1.1

func (w *CoverW) EditionOpt() *string

EditionOpt returns the edition name as a pointer, nil if not set.

func (*CoverW) HasCorrelationID added in v0.1.1

func (w *CoverW) HasCorrelationID() bool

HasCorrelationID returns true if the correlation_id is present and non-empty.

func (*CoverW) RootIDHex added in v0.1.1

func (w *CoverW) RootIDHex() string

RootIDHex returns the root UUID as a hex string, or empty string if missing.

func (*CoverW) RootUUID added in v0.1.1

func (w *CoverW) RootUUID() (uuid.UUID, bool)

RootUUID extracts the root UUID.

func (*CoverW) RoutingKey added in v0.1.1

func (w *CoverW) RoutingKey() string

RoutingKey computes the bus routing key.

type DomainClient

type DomainClient struct {
	Aggregate *AggregateClient
	Query     *QueryClient
	// contains filtered or unexported fields
}

DomainClient combines aggregate and query clients for a single domain.

func DomainClientFromConn

func DomainClientFromConn(conn *grpc.ClientConn) *DomainClient

DomainClientFromConn creates a client from an existing connection.

func DomainClientFromEnv

func DomainClientFromEnv(envVar, defaultEndpoint string) (*DomainClient, error)

DomainClientFromEnv connects using an environment variable with fallback.

func NewDomainClient

func NewDomainClient(endpoint string) (*DomainClient, error)

NewDomainClient connects to a domain's coordinator at the given endpoint.

func (*DomainClient) Close

func (c *DomainClient) Close() error

Close closes the underlying connection.

func (*DomainClient) Command

func (c *DomainClient) Command(domain string, root uuid.UUID) *CommandBuilder

Command starts building a command via the domain client's aggregate.

func (*DomainClient) CommandNew

func (c *DomainClient) CommandNew(domain string) *CommandBuilder

CommandNew starts building a command for a new aggregate.

func (*DomainClient) Execute

func (c *DomainClient) Execute(ctx context.Context, cmd *pb.CommandBook) (*pb.CommandResponse, error)

Execute is a convenience method that delegates to Aggregate.Handle.

func (*DomainClient) NewQuery

func (c *DomainClient) NewQuery(domain string, root uuid.UUID) *QueryBuilder

NewQuery starts building a query via the domain client's query client.

func (*DomainClient) NewQueryDomain

func (c *DomainClient) NewQueryDomain(domain string) *QueryBuilder

NewQueryDomain starts building a query by domain only.

type ErrorKind

type ErrorKind int

ErrorKind categorizes client errors.

const (
	// ErrConnection indicates a connection failure.
	ErrConnection ErrorKind = iota
	// ErrTransport indicates a transport-level error.
	ErrTransport
	// ErrGRPC indicates a gRPC error from the server.
	ErrGRPC
	// ErrInvalidArgument indicates an invalid argument from the caller.
	ErrInvalidArgument
	// ErrInvalidTimestamp indicates a timestamp parsing failure.
	ErrInvalidTimestamp
)

type EventBookW added in v0.1.1

type EventBookW struct {
	*pb.EventBook
}

EventBookW wraps an EventBook proto with extension methods.

func NewEventBookW added in v0.1.1

func NewEventBookW(proto *pb.EventBook) *EventBookW

NewEventBookW creates a new EventBookW wrapper.

func (*EventBookW) CacheKey added in v0.1.1

func (w *EventBookW) CacheKey() string

CacheKey generates a cache key based on domain + root.

func (*EventBookW) CorrelationID added in v0.1.1

func (w *EventBookW) CorrelationID() string

CorrelationID returns the correlation_id from the cover, or empty string if missing.

func (*EventBookW) CoverWrapper added in v0.1.1

func (w *EventBookW) CoverWrapper() *CoverW

CoverWrapper returns a CoverW wrapping the cover.

func (*EventBookW) Domain added in v0.1.1

func (w *EventBookW) Domain() string

Domain returns the domain from the cover, or UnknownDomain if missing.

func (*EventBookW) Edition added in v0.1.1

func (w *EventBookW) Edition() string

Edition returns the edition name, defaulting to DefaultEdition.

func (*EventBookW) HasCorrelationID added in v0.1.1

func (w *EventBookW) HasCorrelationID() bool

HasCorrelationID returns true if the correlation_id is present and non-empty.

func (*EventBookW) NextSequence added in v0.1.1

func (w *EventBookW) NextSequence() uint32

NextSequence returns the next sequence number.

func (*EventBookW) Pages added in v0.1.1

func (w *EventBookW) Pages() []*EventPageW

Pages returns the event pages as wrapped EventPageW instances.

func (*EventBookW) RootIDHex added in v0.1.1

func (w *EventBookW) RootIDHex() string

RootIDHex returns the root UUID as a hex string, or empty string if missing.

func (*EventBookW) RootUUID added in v0.1.1

func (w *EventBookW) RootUUID() (uuid.UUID, bool)

RootUUID extracts the root UUID from the cover.

func (*EventBookW) RoutingKey added in v0.1.1

func (w *EventBookW) RoutingKey() string

RoutingKey computes the bus routing key.

type EventPageW added in v0.1.1

type EventPageW struct {
	*pb.EventPage
}

EventPageW wraps an EventPage proto with extension methods.

func NewEventPageW added in v0.1.1

func NewEventPageW(proto *pb.EventPage) *EventPageW

NewEventPageW creates a new EventPageW wrapper.

func (*EventPageW) DecodeEvent added in v0.1.1

func (w *EventPageW) DecodeEvent(typeSuffix string, msg interface{ Unmarshal([]byte) error }) bool

DecodeEvent attempts to decode an event payload if the type URL matches.

type QueryBuilder

type QueryBuilder struct {
	// contains filtered or unexported fields
}

QueryBuilder provides fluent construction and execution of queries.

func NewQueryBuilder

func NewQueryBuilder(client *QueryClient, domain string, root uuid.UUID) *QueryBuilder

NewQueryBuilder creates a query builder for a specific aggregate.

func NewQueryBuilderDomain

func NewQueryBuilderDomain(client *QueryClient, domain string) *QueryBuilder

NewQueryBuilderDomain creates a query builder by domain only (use with ByCorrelationID).

func (*QueryBuilder) AsOfSequence

func (b *QueryBuilder) AsOfSequence(seq uint32) *QueryBuilder

AsOfSequence queries state as of a specific sequence number.

func (*QueryBuilder) AsOfTime

func (b *QueryBuilder) AsOfTime(rfc3339 string) *QueryBuilder

AsOfTime queries state as of a specific timestamp (RFC3339 format).

func (*QueryBuilder) Build

func (b *QueryBuilder) Build() (*pb.Query, error)

Build constructs the Query without executing.

func (*QueryBuilder) ByCorrelationID

func (b *QueryBuilder) ByCorrelationID(id string) *QueryBuilder

ByCorrelationID queries by correlation ID instead of root.

func (*QueryBuilder) GetEventBook

func (b *QueryBuilder) GetEventBook(ctx context.Context) (*pb.EventBook, error)

GetEventBook executes the query and returns a single EventBook.

func (*QueryBuilder) GetEvents

func (b *QueryBuilder) GetEvents(ctx context.Context) ([]*pb.EventBook, error)

GetEvents executes the query and returns all matching EventBooks.

func (*QueryBuilder) GetPages

func (b *QueryBuilder) GetPages(ctx context.Context) ([]*pb.EventPage, error)

GetPages executes the query and returns just the event pages.

func (*QueryBuilder) Range

func (b *QueryBuilder) Range(lower uint32) *QueryBuilder

Range queries a range of sequences from lower (inclusive).

func (*QueryBuilder) RangeTo

func (b *QueryBuilder) RangeTo(lower, upper uint32) *QueryBuilder

RangeTo queries a range of sequences with upper bound (inclusive).

func (*QueryBuilder) WithEdition

func (b *QueryBuilder) WithEdition(edition string) *QueryBuilder

WithEdition queries events from a specific edition.

type QueryClient

type QueryClient struct {
	// contains filtered or unexported fields
}

QueryClient wraps the EventQueryService for event retrieval.

func NewQueryClient

func NewQueryClient(endpoint string) (*QueryClient, error)

NewQueryClient connects to an event query service at the given endpoint.

func QueryClientFromConn

func QueryClientFromConn(conn *grpc.ClientConn) *QueryClient

QueryClientFromConn creates a client from an existing connection.

func QueryClientFromEnv

func QueryClientFromEnv(envVar, defaultEndpoint string) (*QueryClient, error)

QueryClientFromEnv connects using an environment variable with fallback.

func (*QueryClient) Close

func (c *QueryClient) Close() error

Close closes the underlying connection.

func (*QueryClient) GetEventBook

func (c *QueryClient) GetEventBook(ctx context.Context, query *pb.Query) (*pb.EventBook, error)

GetEventBook retrieves a single EventBook for the query.

func (*QueryClient) GetEvents

func (c *QueryClient) GetEvents(ctx context.Context, query *pb.Query) ([]*pb.EventBook, error)

GetEvents retrieves all EventBooks matching the query.

func (*QueryClient) Query

func (c *QueryClient) Query(domain string, root uuid.UUID) *QueryBuilder

Query starts building a query for the given domain and root.

func (*QueryClient) QueryDomain

func (c *QueryClient) QueryDomain(domain string) *QueryBuilder

QueryDomain starts building a query by domain only.

type QueryW added in v0.1.1

type QueryW struct {
	*pb.Query
}

QueryW wraps a Query proto with extension methods.

func NewQueryW added in v0.1.1

func NewQueryW(proto *pb.Query) *QueryW

NewQueryW creates a new QueryW wrapper.

func (*QueryW) CorrelationID added in v0.1.1

func (w *QueryW) CorrelationID() string

CorrelationID returns the correlation_id from the cover, or empty string if missing.

func (*QueryW) CoverWrapper added in v0.1.1

func (w *QueryW) CoverWrapper() *CoverW

CoverWrapper returns a CoverW wrapping the cover.

func (*QueryW) Domain added in v0.1.1

func (w *QueryW) Domain() string

Domain returns the domain from the cover, or UnknownDomain if missing.

func (*QueryW) HasCorrelationID added in v0.1.1

func (w *QueryW) HasCorrelationID() bool

HasCorrelationID returns true if the correlation_id is present and non-empty.

func (*QueryW) RootUUID added in v0.1.1

func (w *QueryW) RootUUID() (uuid.UUID, bool)

RootUUID extracts the root UUID from the cover.

func (*QueryW) RoutingKey added in v0.1.1

func (w *QueryW) RoutingKey() string

RoutingKey computes the bus routing key.

type SpeculativeClient

type SpeculativeClient struct {
	// contains filtered or unexported fields
}

SpeculativeClient wraps the SpeculativeService for what-if scenarios.

func NewSpeculativeClient

func NewSpeculativeClient(endpoint string) (*SpeculativeClient, error)

NewSpeculativeClient connects to a speculative service at the given endpoint.

func SpeculativeClientFromConn

func SpeculativeClientFromConn(conn *grpc.ClientConn) *SpeculativeClient

SpeculativeClientFromConn creates a client from an existing connection.

func SpeculativeClientFromEnv

func SpeculativeClientFromEnv(envVar, defaultEndpoint string) (*SpeculativeClient, error)

SpeculativeClientFromEnv connects using an environment variable with fallback.

func (*SpeculativeClient) Close

func (c *SpeculativeClient) Close() error

Close closes the underlying connection.

func (*SpeculativeClient) DryRun

DryRun executes a command without persistence.

func (*SpeculativeClient) ProcessManager

ProcessManager speculatively executes a process manager.

func (*SpeculativeClient) Projector

Projector speculatively executes a projector against events.

func (*SpeculativeClient) Saga

Saga speculatively executes a saga against events.

Directories

Path Synopsis
proto

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL