projection

package
v0.1.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 11, 2022 License: MIT Imports: 12 Imported by: 5

README

Projections

The projection package provides the boilerplate to create and apply projections from events.

Projections

Event Applier

The basic interface that a projection type needs to implement is the projection.EventApplier interface, which defines a single ApplyEvent(event.Event) method. A projection only needs to be able to apply events onto itself. Additional behavior can be added using extensions.

package projection

// An EventApplier applies events onto itself to build the projection state.
type EventApplier interface {
	ApplyEvent(event.Event)
}
Apply Events

Given a projection type that has an ApplyEvent(event.Event) method, you can use the projection.Apply() method to apply a given set of events onto the projection. Given a basic projection p that does not implement any extensions, this does the same as iterating over the given events and manually calling p.ApplyEvent(evt).

package example

import (
  "log"
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/projection"
)

type Foo struct {}

func (f *Foo) ApplyEvent(evt event.Event) {
  log.Printf("Event applied: %v", evt.Name())
}

func example() {
  var events []event.Event // e.g. fetched from event store
  var foo Foo // Instantiate your projection type

  err := projection.Apply(&foo, events)
  // handle err
}
Extensions

A projection type may implement additional interfaces that extend or modify the projection behavior of a projection. Read the documentation of these interfaces for more information on how to use them:

  • projection.Progressing
  • projection.Resetter
  • projection.Guard
  • projection.HistoryDependent

Scheduling

Projection schedules trigger projection jobs based on the schedule type. goes provides two types of projection schedules:

  • Continuous
  • Periodic
Continuous Schedule

The continuous schedule subscribes to a given set of events and triggers a projection job every time one of those events is published over the underlying event bus.

package example

import (
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/projection/schedule"
)

type Foo struct {}

func (f *Foo) ApplyEvent(evt event.Event) { ... }

func example(bus event.Bus, store event.Store) {
  eventNames := []string{"example.foo", "example.bar", "example.foobar"}
  s := schedule.Continuously(bus, store, eventNames)

  var foo Foo

  errs, err := s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    // Every time the schedule triggers a job, this function is called.
    // The provided projection.Job provides an `Apply` method, which applies
    // the published events onto the passed projection `&foo`.
    return ctx.Apply(ctx, &foo)
  })
  if err != nil {
    log.Fatalf("subscribe to projection schedule: %w", err)
  }

  for err := range errs {
    log.Printf("projection: %v", err)
  }
}

The example above subscribes to events with the names "example.foo", "example.bar" and "example.foobar". Each time such an event is published, the schedule created a projection.Job and calls the provided callback function. In this example, the callback function just calls the ctx.Apply() function, which itself calls the projection.Apply() function with the events from the projection job.

Debounce Jobs

When creating a continuous schedule, the schedule.Debounce() option can be used to debounce the creation of projection jobs when multiple events are published successively within a short time period. Without the debounce option, when multiple event are published "at once", one projection job per published event would be created, which, depending on the complexity within your callback function, can be a performance hit for your application. The debounce option ensures that at most one projection job is created within a specified interval.

package example

import (
  "log"
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/projection/schedule"
)

func example(bus event.Bus, store event.Store) {
  eventNames := []string{"example.foo", "example.bar"}
  s := schedule.Continuously(bus, store, eventNames, schedule.Debounce(time.Second))
  s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    log.Println("This function should be called only once.")
    return nil
  })

  // Two events published "at once" but only 1 projection job that provides
  // both events will be created & triggered.
  bus.Publish(context.TODO(), event.New("example.foo", ...))
  bus.Publish(context.TODO(), event.New("example.bar", ...))
}
Periodic Schedule

The periodic schedule does not subscribe to events over an event bus. Instead, it triggers a projection job in a fixed interval and fetches the entire event stream of the configured events from the event store within the triggered projection jobs.

package example

import (
  "github.com/modernice/goes/event"
  "github.com/modernice/goes/projection/schedule"
)

type Foo struct {}

func (f *Foo) ApplyEvent(evt event.Event) { ... }

func example(store event.Store) {
  eventNames := []string{"example.foo", "example.bar", "example.foobar"}
  s := schedule.Periodically(store, eventNames)

  var foo Foo

  errs, err := s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    // ALL "example.foo", "example.bar" and "example.foobar" events are
    // fetched from the event store and applied onto &foo.
    return ctx.Apply(ctx, &foo)
  })
  if err != nil {
    log.Fatalf("subscribe to projection schedule: %w", err)
  }

  for err := range errs {
    log.Printf("projection: %v", err)
  }
}
Projection Jobs

A projection job provides additional query helpers to extract event and aggregate information from the events in the job. All query functions of the projection.Job use caching to avoid querying the underlying event stream unnecessarily. Jobs are thread-safe, which means that they can be applied concurrently onto multiple projections if needed.

package example

func example(s projection.Schedule) {
  var foo Foo

  errs, err := s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    // Query all events of the job.
    events, errs, err := ctx.Events(ctx)

    // Query all events of the job that belong to one of the given aggregate names.
    events, errs, err := ctx.EventsOf(ctx, "example.foobar")

    // Query all events of the job that would be applied onto the given projection type.
    events, errs, err := ctx.EventsFor(ctx, &foo)

    // Extract all aggregates from the job's events as aggregate.Refs.
    refs, errs, err := ctx.Aggregates(ctx)

    // Extract all aggregates with one of the given names from the job's events
    // as aggregate.Refs.
    refs, errs, err := ctx.Aggregates(ctx, "example.foobar")

    // Extract the first UUID of the aggregate with the given name from the events
    // of the job.
    id, err := ctx.Aggregate(ctx, "example.foobar")

    return nil
  })
  if err != nil {
    log.Fatalf("subscribe to projection schedule: %w", err)
  }

  for err := range errs {
    log.Printf("projection: %w", err)
  }
}
Trigger Jobs

Both continuous and periodic schedules can be manually triggered at any time using the projection.Schedule.Trigger() method. Manually triggered schedules always create projection jobs that fetch/query the entire event history of the configured events when applied onto a projection. This is also true for continuous schedules, which normally only apply the published events that triggered the schedule.

package example

func example(s projection.Schedule) {
  errs, err := s.Subscribe(context.TODO(), func(ctx projection.Job) error {
    log.Println("Schedule triggered.")

    // This fetches the entire event history of the events configured
    // in the schedule, even for continuous schedules (but only when
    // triggered manually).
    events, errs, err := ctx.Events(ctx)

    return nil
  })
  // handle err & errs

  err := s.Trigger(context.TODO())
  // handle err
}

Projection Service

The projection.Service implements an event-driven projection service, which allows projection schedules to be triggered from another service/process than it was defined in.

package service1

func example(reg *codec.Registry, bus event.Bus) {
  // Register the events of the projection service into a registry.
  projection.RegisterService(reg)

  svc := projection.NewService(bus)

  // Given some schedules with names for each of them
  var schedules map[string]projection.Schedule

  // When registering them in the projection service
  for name, s := range schedules {
    svc.Register(name, s)
  }
}

package service2

func example(bus event.Bus) {
  svc := projection.NewService(bus)

  // Another service that uses the same underlying event bus can
  // trigger the registered projection schedules
  err := svc.Trigger(context.TODO(), "foo")
}

Documentation

Index

Constants

View Source
const (
	// Triggered is the event name for triggering a Schedule.
	Triggered = "goes.projection.schedule.triggered"

	// TriggerAccepted is the event name for accepting a trigger.
	TriggerAccepted = "goes.projection.schedule.trigger_accepted"
)

Variables

View Source
var (
	// DefaultTriggerTimeout is the default timeout for triggering a Schedule.
	DefaultTriggerTimeout = 5 * time.Second

	// ErrUnhandledTrigger is returned when trying to trigger a Schedule that
	// isn't registered in a running Service.
	ErrUnhandledTrigger = errors.New("unhandled trigger")
)
View Source
var (
	// ErrAggregateNotFound is returned when trying to extract an AggregateID
	// from a Job's Events and none of those Events belong to an aggregate with
	// that name.
	ErrAggregateNotFound = errors.New("aggregate not found in events")
)
View Source
var (
	// ErrProgressed is returned when trying to apply an Event onto a Projection
	// that has a progress Time that is after the Time of the Event.
	ErrProgressed = errors.New("projection already progressed")
)

Functions

func Apply

func Apply(proj EventApplier, events []event.Event, opts ...ApplyOption) error

Apply applies events onto the given projection.

If proj implements guard (or embeds Guard), proj.GuardProjection(evt) is called for every Event evt to determine if the Event should be applied onto the Projection.

If proj implements progressor (or embeds *Progressor), proj.SetProgress(evt) is called for every applied Event evt.

func RegisterService

func RegisterService(r *codec.Registry)

RegisterService register the projection service events into an event registry.

Types

type ApplyOption

type ApplyOption func(*applyConfig)

ApplyOption is an option for Apply.

func IgnoreProgress

func IgnoreProgress() ApplyOption

IgnoreProgress returns an ApplyOption that makes Apply ignore the current progress of a projection so that it applies Events onto a projection even if an Event's time is before the progress time of the projection.

type EventApplier

type EventApplier interface {
	ApplyEvent(event.Event)
}

An EventApplier applies events onto itself to build the projection state.

type Guard

type Guard interface {
	// GuardProjection determines whether an Event is allowed to be applied onto a projection.
	GuardProjection(event.Event) bool
}

Guard can be implemented by projection to prevent the application of an events. When a projection p implements Guard, p.GuardProjection(e) is called for every Event e and prevents the p.ApplyEvent(e) call if GuardProjection returns false.

type GuardFunc

type GuardFunc func(event.Event) bool

GuardFunc allows functions to be used as Guards.

func (GuardFunc) GuardProjection

func (guard GuardFunc) GuardProjection(evt event.Event) bool

GuardProjection returns guard(evt).

type HistoryDependent

type HistoryDependent interface {
	RequiresFullHistory() bool
}

HistoryDependent can be implemented by continuous projections that need the full event history (of the events that are configured in the Schedule) instead of just the events that triggered the continuous projection.

Example:

// A Product is a product of a specific shop.
type Product struct {
	*aggregate.Base

	ShopID uuid.UUID
	Name string
}

// SearchIndex projections the product catalog of a specific shop.
type SearchIndex struct {
	shopID      uuid.UUID
	products    []Product
	initialized bool
}

func (idx SearchIndex) ApplyEvent(event.Event) { ... }

// RequiresFullHistory implements projection.HistoryDependent. If the
// projection hasn't been run yet, the full history of the events is
// required for the projection.
func (idx SearchIndex) RequiresFullHistory() bool {
	return !idx.initialized
}

var repo agggregate.Repository
s := schedule.Continuously(bus, store, []string{"product.created"})
s.Subscribe(context.TODO(), func(ctx projection.Job) error {
	str, errs, err := ctx.Aggregates(ctx)

	done := make(map[uuid.UUID]bool) // SearchIndexes that have been projected

	return aggregate.WalkRefs(ctx, func(r aggregate.Ref) error {
		p := &Product{Base: aggregate.New("product", r.ID)}
		err := repo.Fetch(ctx, p)
		shopID := p.ShopID

		if done[shopID] {
			return nil
		}
		done[shopID] = true

		// Fetch the current SearchIndex for the given shop.
		// If it does not exist yet, create the SearchIndex.
		var idx *SearchIndex

		// if idx.initialized == false, ctx.Apply fetches all past events.
		if err := ctx.Apply(ctx, idx); err != nil {
			return err
		}

		// Set initialized to true (after the first run of the projection).
		idx.initialized = true

		return nil
	}, str, errs)
})

type Job

type Job interface {
	context.Context

	// Events fetches all Events that match the Job's Query and returns an Event
	// channel and a channel of asynchronous query errors.
	//
	//	var job Job
	//	str, errs, err := job.Events(job)
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	//
	// Optional Queries may be provided as filters for the fetched Events. If
	// filters are provided, the returned Event channel will only receive Events
	// that match all provided Queries:
	//
	//	var job Job
	//	str, errs, err := job.Events(job, query.New(query.Name("foo")), query.New(...))
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	//
	// If you need the Events for a specific Projection, use EventsFor instead.
	Events(_ context.Context, filters ...event.Query) (<-chan event.Event, <-chan error, error)

	// EventsOf fetches all Events that belong to aggregates that have one of
	// aggregateNames and returns an Event channel and a channel of asynchronous
	// query errors.
	//
	//	var job Job
	//	str, errs, err := job.EventsOf(job, "foo", "bar", "baz")
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	EventsOf(_ context.Context, aggregateNames ...string) (<-chan event.Event, <-chan error, error)

	// EventsFor fetches all Events that are appropriate for the given
	// Projection and returns an Event channel and a channel of asynchronous
	// query errors. Which Events are queried depends on the Projection: If the
	// Projection implements guard (or embeds Guard), the Guard's Query is added
	// as a filter when querying Events. If the Projection implements progressor
	// (or embeds *Progressor), the progress time of the Projection is used to
	// only query Events that happened after that time.
	//
	//	var job Job
	//	var proj projection.Projection
	//	str, errs, err := job.EventsFor(job, proj)
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	EventsFor(context.Context, EventApplier) (<-chan event.Event, <-chan error, error)

	// Aggregates returns a channel of aggregate Tuples and a channel of
	// asynchronous query errors. It fetches Events, extracts the Tuples from
	// those Events and pushes them into the returned Tuple channel. Every
	// unique Tuple is guarenteed to be received exactly once, even if there are
	// muliple Events that belong to the same aggregate.
	//
	// If aggregateNames are provided, they are used to query only Events that
	// belong to one of the given aggregates.
	//
	//	var job Job
	//	str, errs, err := job.Aggregates(job, "foo", "bar", "baz")
	//	// handle err
	//	events, err := event.Drain(job, str, errs)
	Aggregates(_ context.Context, aggregateNames ...string) (<-chan aggregate.Ref, <-chan error, error)

	// Aggregate returns the UUID of the first aggregate with the given
	// aggregateName that can be found in the Events of the Job, or
	// ErrAggregateNotFound if no Event belongs to an aggregate with that name.
	Aggregate(_ context.Context, aggregateName string) (uuid.UUID, error)

	// Apply applies the Job onto the Projection. A Job may be applied onto as
	// many Projections as needed.
	Apply(context.Context, EventApplier, ...ApplyOption) error
}

Job is a projection job. Jobs are typically created within Schedules and passed to subscribers of those Schedules.

func NewJob

func NewJob(ctx context.Context, store event.Store, q event.Query, opts ...JobOption) Job

NewJob returns a new projection Job. The Job uses the provided Query to fetch the Events from the Store.

type JobOption

type JobOption func(*job)

JobOption is a Job option.

func WithFilter

func WithFilter(queries ...event.Query) JobOption

WithFilter returns a JobOption that adds queries as filters to the Job. Fetched Events are matched against every Query and only returned in the result if they match all Queries.

func WithHistoryStore

func WithHistoryStore(store event.Store) JobOption

WithHistoryStore returns a JobOption that provides the projection job with an event store that is used to query events for projections that require the full event history to build the projection state.

func WithReset

func WithReset() JobOption

WithReset returns a JobOption that resets Projections before applying Events onto them. Resetting a Projection is done by first resetting the progress of the Projection (if it implements progressor). Then, if the Projection has a Reset method, that method is called to allow for custom reset logic.

type Progressing

type Progressing interface {
	// Progress returns the projection's progress as the Time of the last
	// applied event.
	Progress() time.Time

	// SetProgress sets the progress of the projection to the provided Time.
	SetProgress(time.Time)
}

Progressing makes projections track their projection progress.

Embed *Progressor into a projection type to implement this interface.

The current progress of a projection is the Time of the last applied event. A projection that provides its projection progress only receives events with a Time that is after the current progress Time.

type Progressor

type Progressor struct {
	LatestEventTime int64
}

Progressor can be embedded into a projection to implement the Progressing interface.

func (*Progressor) Progress

func (p *Progressor) Progress() time.Time

Progress returns the projection progress in terms of the time of the latest applied event. If p.LatestEventTime is 0, the zero Time is returned.

func (*Progressor) SetProgress

func (p *Progressor) SetProgress(t time.Time)

SetProgress sets the projection progress as the time of the latest applied event.

type QueryGuard

type QueryGuard query.Query

QueryGuard is a Guard that used an event query to determine the events that are allowed to be applied onto a projection.

func (QueryGuard) GuardProjection

func (g QueryGuard) GuardProjection(evt event.Event) bool

GuardProjection tests the Guard's Query against a given Event and returns whether the Event is allowed to be applied onto the projection.

type Resetter

type Resetter interface {
	// Reset should implement any custom logic to reset the state of a
	// projection besides resetting the progress (if the projection implements
	// Progressing).
	Reset()
}

A Resetter is a projection that can reset its state.

type Schedule

type Schedule interface {
	// Subscribe subscribes the provided function to the Schedule and returns a
	// channel of asynchronous projection errors. When the Schedule is
	// triggered, a Job is created and passed to subscribers of the Schedule.
	// Errors returned from subscribers are pushed into the returned error
	// channel.
	//
	//	var proj projection.Projection // created by yourself
	//	s := schedule.Continuously(bus, store, []string{"foo", "bar", "baz"})
	//	errs, err := s.Subscribe(context.TODO(), func(job projection.Job) error {
	//		return job.Apply(job, proj) // job.Apply applies the appropriate events onto the projection
	//	})
	//	// handle err
	//	for err := range errs {
	//		log.Printf("projection failed: %v\n", err)
	//	}
	Subscribe(context.Context, func(Job) error) (<-chan error, error)

	// Trigger manually triggers the Schedule immediately. A Job is created and
	// passed to every subscriber of the Schedule. Trigger does not wait for the
	// Job to be handled by the subscribers.
	//
	// Reset projections
	//
	// The created Job can be configured to reset Projections before applying
	// events onto them, effectively rebuilding the entire projection from the
	// beginning (first event):
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Reset())
	//
	// When a Projection implements progressor (or embeds *Progressor), the
	// progress time of the Projection is set to 0.
	//
	// When a Projection has a `Reset()` method, that method is called to allow
	// for custom reset logic. Implementers of Projection should appropriately
	// reset the state of the Projection.
	//
	// Custom event query
	//
	// When a Job is created, it is passed an event query to fetch the events
	// for the Projections. By default, this query fetches the events configured
	// in the Schedule sorted by time. A custom query may be provided using the
	// Query option. Don't forget to configure correct sorting when providing a
	// custom query:
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Query(query.New(
	//		query.AggregateName("foo", "bar"),
	//		query.SortBy(event.SortTime, event.SortAsc),
	//	)))
	//
	// Event filters
	//
	// Queried events can be further filtered using the Filter option. Filters
	// are applied in-memory, after the events have already been fetched from
	// the event store. When multiple filters are passed, events must match
	// against every filter to be applied to the Projections. Sorting options of
	// filters are ignored.
	//
	//	var s projection.Schedule
	//	err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))
	Trigger(context.Context, ...TriggerOption) error
}

Schedule is a projection schedule.

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service is an event-driven projection service. A Service allows to trigger Schedules that are registered in Services that communicate over the same event bus.

func NewService

func NewService(bus event.Bus, opts ...ServiceOption) *Service

NewService returns a new Service.

var bus event.Bus
var fooSchedule projection.Schedule
var barSchedule projection.Schedule
svc := NewService(
	bus,
	projection.RegisterSchedule("foo", fooSchedule),
	projection.RegisterSchedule("bar", barSchedule),
)

errs, err := svc.Run(context.TODO())

func (*Service) Register

func (svc *Service) Register(name string, s Schedule)

Register registers a Schedule with the given name into the Service.

func (*Service) Run

func (svc *Service) Run(ctx context.Context) (<-chan error, error)

Run starts the Service is a new goroutine and returns a channel of asynchronous errors, or a single error if the event bus fails to subscribe. When another Service triggers a Schedule with a name that is registered in svc, svc accepts that trigger by publishing a TriggerAccepted event and then actually triggers the Schedule.

func (*Service) Trigger

func (svc *Service) Trigger(ctx context.Context, name string, opts ...TriggerOption) error

Trigger triggers the Schedule with the given name.

Trigger publishes a Triggered event over the event bus and waits for a TriggerAccepted event to be published by another Service. Should the TriggerAccepted event not be published within the trigger timeout, ErrUnhandledTrigger is returned. When ctx is canceled, ctx.Err() is returned.

type ServiceOption

type ServiceOption func(*Service)

ServiceOption is an option for creating a Service.

func RegisterSchedule

func RegisterSchedule(name string, s Schedule) ServiceOption

RegisterSchedule returns a ServiceOption that registers the Schedule s with the given name into a Service

func TriggerTimeout

func TriggerTimeout(d time.Duration) ServiceOption

TriggerTimeout returns a ServiceOption that overrides the default timeout for triggering a Schedule. Default is 5s. Zero Duration means no timeout.

type Trigger

type Trigger struct {
	// Reset projections before applying events.
	Reset bool

	// Override the Query that is used to query events from the event store.
	Query event.Query

	// Additional filters that are applied in-memory to the query result from
	// the event store.
	Filter []event.Query
}

A Trigger is used by Schedules to trigger a Job.

func NewTrigger

func NewTrigger(opts ...TriggerOption) Trigger

NewTrigger returns a Trigger.

func (Trigger) Options

func (t Trigger) Options() []TriggerOption

Options returns the TriggerOptions to build t.

type TriggerAcceptedData

type TriggerAcceptedData struct {
	TriggerID uuid.UUID
}

TriggerAcceptedData is the event data for accepting a trigger.

type TriggerOption

type TriggerOption func(*Trigger)

TriggerOption is a Trigger option.

func Filter

func Filter(queries ...event.Query) TriggerOption

Filter returns a TriggerOption that adds filters to a Trigger.

Queried events can be further filtered using the `Filter` option. Filters are applied in-memory, after the events have been fetched from the event store. When multiple filters are configured, events must match against every filter to be applied to projections. Sorting options of filters are ignored.

var s projection.Schedule
err := s.Trigger(context.TODO(), projection.Filter(query.New(...), query.New(...)))

func Query

func Query(q event.Query) TriggerOption

Query returns a TriggerOption that sets the Query of a Trigger.

When a Job is created by a Schedule, it is passed an event query to fetch the events for the projections. By default, this query fetches all events that are configured in the triggered Schedule, sorted by time. A custom query may be provided using the `Query` option. Don't forget to configure correct sorting when providing a custom query:

var s projection.Schedule
err := s.Trigger(context.TODO(), projection.Query(query.New(
	query.AggregateName("foo", "bar"),
	query.SortBy(event.SortTime, event.SortAsc), // to ensure correct sorting
)))

func Reset

func Reset(reset bool) TriggerOption

Reset returns a TriggerOption that resets projections before applying events onto them. Resetting a projection is done by first resetting the progress of the projection (if it implements progressor). Then, if the projection has a `Reset()` method, that method is called to allow for custom reset logic.

type TriggeredData

type TriggeredData struct {
	TriggerID uuid.UUID
	Trigger   Trigger
	Schedule  string
}

TriggeredData is the event data for triggering a Schedule.

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL