inngestgo

package module
v0.6.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 31, 2024 License: Apache-2.0 Imports: 26 Imported by: 3

README



A durable event-driven workflow engine SDK for Golang.
Read the documentation and get started in minutes.

GoDoc discord twitter


Inngest Go SDK

Inngest's Go SDK allows you to create event-driven, durable workflows in your existing API — without new infrastructure.

It's useful if you want to build reliable software without worrying about queues, events, subscribers, workers, or other complex primitives such as concurrency, parallelism, event batching, or distributed debounce. These are all built in.

Features

  • Type safe functions, durable workflows, and steps using generics
  • Event stream sampling built in
  • Declarative flow control (concurrency, prioritization, batching, debounce, rate limiting)
  • Zero-infrastructure. Inngest handles orchestration and calls your functions.

Examples

The following is the bare minimum setup for a fully distributed durable workflow server:

package main

import (
	"context"
	"fmt"
	"net/http"
	"time"

	"github.com/inngest/inngestgo"
	"github.com/inngest/inngestgo/step"
)

func main() {
	h := inngestgo.NewHandler("core", inngestgo.HandlerOpts{})
	f := inngestgo.CreateFunction(
		inngestgo.FunctionOpts{
			ID:   "account-created",
			Name: "Account creation flow",
		},
		// Run on every api/account.created event.
		inngestgo.EventTrigger("api/account.created", nil),
		AccountCreated,
	)
	h.Register(f)
	http.ListenAndServe(":8080", h)
}

// AccountCreated is a durable function which runs any time the "api/account.created"
// event is received by Inngest.
//
// It is invoked by Inngest, with each step being backed by Inngest's orchestrator.
// Function state is automatically managed, and persists across server restarts,
// cloud migrations, and language changes.
func AccountCreated(ctx context.Context, input inngestgo.Input[AccountCreatedEvent]) (any, error) {
	// Sleep for a second, minute, hour, week across server restarts.
	step.Sleep(ctx, "initial-delay", time.Second)

	// Run a step which emails the user.  This automatically retries on error.
	// This returns the fully typed result of the lambda.
	result := step.Run(ctx, "on-user-created", func(ctx context.Context) (bool, error) {
		// Run any code inside a step.
		result, err := emails.Send(emails.Opts{})
		return result, err
	})
	// `result` is  fully typed from the lambda
	_ = result

	// Sample from the event stream for new events.  The function will stop
	// running and automatially resume when a matching event is found, or if
	// the timeout is reached.
	fn, err := step.WaitForEvent[FunctionCreatedEvent](
		ctx,
		"wait-for-activity",
		step.WaitForEventOpts{
			Name:    "Wait for a function to be created",
			Event:   "api/function.created",
			Timeout: time.Hour * 72,
			// Match events where the user_id is the same in the async sampled event.
			If: inngestgo.StrPtr("event.data.user_id == async.data.user_id"),
		},
	)
	if err == step.ErrEventNotReceived {
		// A function wasn't created within 3 days.  Send a follow-up email.
		step.Run(ctx, "follow-up-email", func(ctx context.Context) (any, error) {
			// ...
			return true, nil
		})
		return nil, nil
	}

	// The event returned from `step.WaitForEvent` is fully typed.
	fmt.Println(fn.Data.FunctionID)

	return nil, nil
}

// AccountCreatedEvent represents the fully defined event received when an account is created.
//
// This is shorthand for defining a new Inngest-conforming struct:
//
//	type AccountCreatedEvent struct {
//		Name      string                  `json:"name"`
//		Data      AccountCreatedEventData `json:"data"`
//		User      any                     `json:"user"`
//		Timestamp int64                   `json:"ts,omitempty"`
//		Version   string                  `json:"v,omitempty"`
//	}
type AccountCreatedEvent inngestgo.GenericEvent[AccountCreatedEventData, any]
type AccountCreatedEventData struct {
	AccountID string
}

type FunctionCreatedEvent inngestgo.GenericEvent[FunctionCreatedEventData, any]
type FunctionCreatedEventData struct {
	FunctionID string
}

Documentation

Index

Constants

View Source
const (
	SDKLanguage = "go"
	SDKVersion  = "0.5.4"
)
View Source
const (
	// ExternalID is the field name used to reference the user's ID within your
	// systems.  This is _your_ UUID or ID for referencing the user, and allows
	// Inngest to match contacts to your users.
	ExternalID = "external_id"

	// Email is the field name used to reference the user's email.
	Email = "email"
)
View Source
const (
	HeaderKeyAuthorization = "Authorization"
	HeaderKeyContentType   = "Content-Type"
	HeaderKeyEnv           = "X-Inngest-Env"
	HeaderKeyNoRetry       = "X-Inngest-No-Retry"
	HeaderKeyRetryAfter    = "Retry-After"
	HeaderKeySDK           = "X-Inngest-SDK"
	HeaderKeySignature     = "X-Inngest-Signature"
	HeaderKeyUserAgent     = "User-Agent"
)

Variables

View Source
var (
	// DefaultHandler provides a default handler for registering and serving functions
	// globally.
	//
	// It's recommended to call SetOptions() to set configuration before serving
	// this in production environments;  this is set up for development and will
	// attempt to connect to the dev server.
	DefaultHandler Handler = NewHandler("Go app", HandlerOpts{})

	ErrTypeMismatch = fmt.Errorf("cannot invoke function with mismatched types")

	// DefaultMaxBodySize is the default maximum size read within a single incoming
	// invoke request (100MB).
	DefaultMaxBodySize = 1024 * 1024 * 100
)
View Source
var (
	ErrExpiredSignature = fmt.Errorf("expired signature")
	ErrInvalidSignature = fmt.Errorf("invalid signature")
	ErrInvalidTimestamp = fmt.Errorf("invalid timestamp")
)
View Source
var (
	HeaderValueSDK = fmt.Sprintf("%s:v%s", SDKLanguage, SDKVersion)
)
View Source
var NoRetryError = errors.NoRetryError

Re-export internal errors for users

View Source
var RetryAtError = errors.RetryAtError

Functions

func CronTrigger added in v0.5.0

func CronTrigger(cron string) inngest.Trigger

func DevServerURL added in v0.5.0

func DevServerURL() string

DevServerURL returns the URL for the Inngest dev server. This uses the INNGEST_DEV environment variable, or defaults to 'http://127.0.0.1:8288' if unset.

func EventTrigger added in v0.5.0

func EventTrigger(name string, expression *string) inngest.Trigger

func IntPtr added in v0.5.0

func IntPtr(i int) *int

func IsDev added in v0.5.0

func IsDev() bool

IsDev returns whether to use the dev server, by checking the presence of the INNGEST_DEV environment variable.

To use the dev server, set INNGEST_DEV to any non-empty value OR the URL of the development server, eg:

INNGEST_DEV=1
INNGEST_DEV=http://192.168.1.254:8288

func NowMillis added in v0.5.1

func NowMillis() int64

NowMillis returns a timestamp with millisecond precision used for the Event.Timestamp field.

func Register added in v0.5.0

func Register(funcs ...ServableFunction)

Register adds the given functions to the default handler for serving. You must register all functions with a handler prior to serving the handler for them to be enabled.

func Send

func Send(ctx context.Context, e any) (string, error)

Send uses the DefaultClient to send the given event.

func SendMany added in v0.5.0

func SendMany(ctx context.Context, e []any) ([]string, error)

SendMany uses the DefaultClient to send the given event batch.

func Serve added in v0.5.0

func Serve(w http.ResponseWriter, r *http.Request)

Serve serves all registered functions within the default handler.

func SetBasicRequestHeaders added in v0.5.2

func SetBasicRequestHeaders(req *http.Request)

func SetBasicResponseHeaders added in v0.5.2

func SetBasicResponseHeaders(w http.ResponseWriter)

func Sign added in v0.5.0

func Sign(ctx context.Context, at time.Time, key, body []byte) string

Sign signs a request body with the given key at the given timestamp.

func StrPtr added in v0.5.0

func StrPtr(i string) *string

func Timestamp

func Timestamp(t time.Time) int64

Timestamp converts a go time.Time into a timestamp with millisecond precision used for the Event.Timestamp field.

func ValidateSignature added in v0.5.0

func ValidateSignature(ctx context.Context, sig string, key, body []byte) (bool, error)

ValidateSignature ensures that the signature for the given body is signed with the given key within a given time period to prevent invalid requests or replay attacks.

Types

type Client

type Client interface {
	// Send sends the specific event to the ingest API.
	Send(ctx context.Context, evt any) (string, error)
	// Send sends a batch of events to the ingest API.
	SendMany(ctx context.Context, evt []any) ([]string, error)
}

Client represents a client used to send events to Inngest.

var (
	// DefaultClient represents the default, mutable, global client used
	// within the `Send` function provided by this package.
	//
	// You should initialize this within an init() function using `NewClient`
	// if you plan to use the `Send` function:
	//
	// 	func init() {
	// 		inngestgo.DefaultClient = inngestgo.NewClient(
	// 			"key",
	// 			inngestgo.WithHTTPClient(&http.Client{Timeout: 10 * time.Second}),
	// 		)
	// 	}
	//
	// If this client is not set, Send will return an error.
	DefaultClient Client
)

func NewClient

func NewClient(opts ClientOpts) Client

NewClient returns a concrete client initialized with the given ingest key, which can immediately send events to the ingest API.

type ClientOpts added in v0.5.0

type ClientOpts struct {
	// HTTPClient is the HTTP client used to send events.
	HTTPClient *http.Client
	// EventKey is your Inngest event key for sending events.  This defaults to the
	// `INNGEST_EVENT_KEY` environment variable if nil.
	EventKey *string
	// EventURL is the URL of the event API to send events to.  This defaults to
	// https://inn.gs if nil.
	EventURL *string
	// Env is the branch environment to deploy to.  If nil, this uses
	// os.Getenv("INNGEST_ENV").  This only deploys to branches if the
	// signing key is a branch signing key.
	Env *string
}

type Debounce added in v0.5.0

type Debounce struct {
	// Key is an optional expression to use for constraining the debounce to a given
	// value.
	Key string `json:"key,omitempty"`
	// Period is how long to listen for new events matching the optional key.  Any event
	// received during this period will reschedule the debounce to run after another period
	// interval.
	Period time.Duration `json:"period"`
	// Timeout specifies the optional max lifetime of a debounce, ensuring that functions
	// run after the given duration when a debounce is rescheduled indefinitely.
	Timeout *time.Duration `json:"timeout,omitempty"`
}

Debounce represents debounce configuration used when creating a new function within FunctionOpts

type Event

type Event struct {
	// ID is an optional event ID used for deduplication.
	ID *string `json:"id,omitempty"`

	// Name represents the name of the event.  We recommend the following
	// simple format: "noun.action".  For example, this may be "signup.new",
	// "payment.succeeded", "email.sent", "post.viewed".
	//
	// Name is required.
	Name string `json:"name"`

	// Data is a key-value map of data belonging to the event.  This should
	// include all relevant data.  For example, a "signup.new" event may include
	// the user's email, their plan information, the signup method, etc.
	Data map[string]any `json:"data"`

	// User is a key-value map of data belonging to the user that authored the
	// event.  This data will be upserted into the contact store.
	//
	// We match the user via one of two fields: "external_id" and "email", defined
	// as consts within this package.
	//
	// If these fields are present in this map the attributes specified here
	// will be updated within Inngest, and the event will be attributed to
	// this contact.
	User any `json:"user,omitempty"`

	// Timestamp is the time the event occured at *millisecond* (not nanosecond)
	// precision.  This defaults to the time the event is received if left blank.
	//
	// Inngest does not guarantee that events are processed within the
	// order specified by this field.  However, we do guarantee that user data
	// is stored correctly according to this timestamp.  For example,  if there
	// two events set the same user attribute, the event with the latest timestamp
	// is guaranteed to set the user attributes correctly.
	Timestamp int64 `json:"ts,omitempty"`

	// Version represents the event's version.  Versions can be used to denote
	// when the structure of an event changes over time.
	//
	// Versions typically change when the keys in `Data` change, allowing you to
	// keep the same event name (eg. "signup.new") as fields change within data
	// over time.
	//
	// We recommend the versioning scheme "YYYY-MM-DD.XX", where .XX increments:
	// "2021-03-19.01".
	Version string `json:"v,omitempty"`
}

func (Event) Map added in v0.5.0

func (e Event) Map() map[string]any

func (Event) Validate

func (e Event) Validate() error

Validate returns an error if the event is not well formed

type FunctionOpts added in v0.5.0

type FunctionOpts struct {
	// ID is an optional function ID.  If not specified, the ID
	// will be auto-generated by lowercasing and slugging the name.
	ID string
	// Name represents a human-readable function name.
	Name string

	Priority    *inngest.Priority
	Concurrency []inngest.Concurrency
	Idempotency *string
	Retries     *int
	Cancel      []inngest.Cancel
	Debounce    *Debounce

	// RateLimit allows the function to be rate limited.
	RateLimit   *RateLimit
	BatchEvents *inngest.EventBatchConfig
}

func (FunctionOpts) GetRateLimit added in v0.5.0

func (f FunctionOpts) GetRateLimit() *inngest.RateLimit

GetRateLimit returns the inngest.RateLimit for function configuration. The SDK's RateLimit type is incompatible with the inngest.RateLimit type signature for ease of definition.

type GenericEvent added in v0.5.0

type GenericEvent[DATA any, USER any] struct {
	// ID is an optional event ID used for deduplication.
	ID *string `json:"id,omitempty"`
	// Name represents the name of the event.  We recommend the following
	// simple format: "noun.action".  For example, this may be "signup.new",
	// "payment.succeeded", "email.sent", "post.viewed".
	//
	// Name is required.
	Name string `json:"name"`

	// Data is a struct or key-value map of data belonging to the event.  This should
	// include all relevant data.  For example, a "signup.new" event may include
	// the user's email, their plan information, the signup method, etc.
	Data DATA `json:"data"`

	// User is a struct or key-value map of data belonging to the user that authored the
	// event.  This data will be upserted into the contact store.
	//
	// We match the user via one of two fields: "external_id" and "email", defined
	// as consts within this package.
	//
	// If these fields are present in this map the attributes specified here
	// will be updated within Inngest, and the event will be attributed to
	// this contact.
	User USER `json:"user,omitempty"`

	// Timestamp is the time the event occured at *millisecond* (not nanosecond)
	// precision.  This defaults to the time the event is received if left blank.
	//
	// Inngest does not guarantee that events are processed within the
	// order specified by this field.  However, we do guarantee that user data
	// is stored correctly according to this timestamp.  For example,  if there
	// two events set the same user attribute, the event with the latest timestamp
	// is guaranteed to set the user attributes correctly.
	Timestamp int64 `json:"ts,omitempty"`

	// Version represents the event's version.  Versions can be used to denote
	// when the structure of an event changes over time.
	//
	// Versions typically change when the keys in `Data` change, allowing you to
	// keep the same event name (eg. "signup.new") as fields change within data
	// over time.
	//
	// We recommend the versioning scheme "YYYY-MM-DD.XX", where .XX increments:
	// "2021-03-19.01".
	Version string `json:"v,omitempty"`
}

GenericEvent represents a single event generated from your system to be sent to Inngest.

func (GenericEvent[D, U]) Event added in v0.5.0

func (ge GenericEvent[D, U]) Event() Event

Event() turns the GenericEvent into a normal Event.

NOTE: This is a naive inefficient implementation and should not be used in performance constrained systems.

func (GenericEvent[D, U]) Validate added in v0.5.0

func (ge GenericEvent[D, U]) Validate() error

type Handler added in v0.5.0

type Handler interface {
	http.Handler

	// SetAppName updates the handler's app name.  This is used to group functions
	// and track deploys within the UI.
	SetAppName(name string) Handler

	// SetOptions sets the handler's options used to register functions.
	SetOptions(h HandlerOpts) Handler

	// Register registers the given functions with the handler, allowing them to
	// be invoked by Inngest.
	Register(...ServableFunction)
}

Handler represents a handler which serves the Inngest API via HTTP. This provides function registration to Inngest, plus the invocation of registered functions via an HTTP POST.

func NewHandler added in v0.5.0

func NewHandler(appName string, opts HandlerOpts) Handler

NewHandler returns a new Handler for serving Inngest functions.

type HandlerOpts added in v0.5.0

type HandlerOpts struct {
	// Logger is the structured logger to use from Go's builtin structured
	// logging package.
	Logger *slog.Logger

	// SigningKey is the signing key for your app.  If nil, this defaults
	// to os.Getenv("INNGEST_SIGNING_KEY").
	SigningKey *string

	// Env is the branch environment to deploy to.  If nil, this uses
	// os.Getenv("INNGEST_ENV").  This only deploys to branches if the
	// signing key is a branch signing key.
	Env *string

	// RegisterURL is the URL to use when registering functions.  If nil
	// this defaults to Inngest's API.
	//
	// This only needs to be set when self hosting.
	RegisterURL *string

	// MaxBodySize is the max body size to read for incoming invoke requests
	MaxBodySize int

	// URL that the function is served at.  If not supplied this is taken from
	// the incoming request's data.
	URL *url.URL

	// UseStreaming enables streaming - continued writes to the HTTP writer.  This
	// differs from true streaming in that we don't support server-sent events.
	UseStreaming bool
}

func (HandlerOpts) GetEnv added in v0.5.0

func (h HandlerOpts) GetEnv() string

GetEnv returns the env defined within HandlerOpts, or the default defined within INNGEST_ENV.

This is the environment name used for preview/branch environments within Inngest.

func (HandlerOpts) GetRegisterURL added in v0.5.0

func (h HandlerOpts) GetRegisterURL() string

GetRegisterURL returns the registration URL defined wtihin HandlerOpts, defaulting to the production Inngest URL if nil.

func (HandlerOpts) GetSigningKey added in v0.5.0

func (h HandlerOpts) GetSigningKey() string

GetSigningKey returns the signing key defined within HandlerOpts, or the default defined within INNGEST_SIGNING_KEY.

This is the private key used to register functions and communicate with the private API.

type Input added in v0.5.0

type Input[T any] struct {
	Event    T        `json:"event"`
	Events   []T      `json:"events"`
	InputCtx InputCtx `json:"ctx"`
}

Input is the input data passed to your function. It is comprised of the triggering event and call context.

type InputCtx added in v0.5.0

type InputCtx struct {
	Env        string `json:"env"`
	FunctionID string `json:"fn_id"`
	RunID      string `json:"run_id"`
	StepID     string `json:"step_id"`
	Attempt    int    `json:"attempt"`
}

type RateLimit added in v0.5.0

type RateLimit struct {
	// Limit is how often the function can be called within the specified period
	Limit uint `json:"limit"`
	// Period represents the time period for throttling the function
	Period time.Duration `json:"period"`
	// Key is an optional string to constrain throttling using event data.  For
	// example, if you want to throttle incoming notifications based off of a user's
	// ID in an event you can use the following key: "{{ event.user.id }}".  This ensures
	// that we throttle functions for each user independently.
	Key *string `json:"key,omitempty"`
}

func (RateLimit) Convert added in v0.5.0

func (r RateLimit) Convert() *inngest.RateLimit

Convert converts a RateLimit to an inngest.RateLimit

type SDKFunction added in v0.5.0

type SDKFunction[T any] func(ctx context.Context, input Input[T]) (any, error)

SDKFunction represents a user-defined function to be called based off of events or on a schedule.

The function is registered with the SDK by calling `CreateFunction` with the function name, the trigger, the event type for marshalling, and any options.

This uses generics to strongly type input events:

func(ctx context.Context, input gosdk.Input[SignupEvent]) (any, error) {
	// .. Your logic here.  input.Event will be strongly typed as a SignupEvent.
}

type ServableFunction added in v0.5.0

type ServableFunction interface {
	// Slug returns the function's human-readable ID, such as "sign-up-flow".
	Slug() string

	// Name returns the function name.
	Name() string

	Config() FunctionOpts

	// Trigger returns the event name or schedule that triggers the function.
	Trigger() inngest.Trigger

	// ZeroEvent returns the zero event type to marshal the event into, given an
	// event name.
	ZeroEvent() any

	// Func returns the SDK function to call.  This must alawys be of type SDKFunction,
	// but has an any type as we register many functions of different types into a
	// type-agnostic handler; this is a generic implementation detail, unfortunately.
	Func() any
}

ServableFunction defines a function which can be called by a handler's Serve method.

This is created via CreateFunction in this package.

func CreateFunction added in v0.5.0

func CreateFunction[T any](
	fc FunctionOpts,
	trigger inngest.Trigger,
	f SDKFunction[T],
) ServableFunction

CreateFunction creates a new function which can be registered within a handler.

This function uses generics, allowing you to supply the event that triggers the function. For example, if you have a signup event defined as a struct you can use this to strongly type your input:

type SignupEvent struct {
	Name string
	Data struct {
		Email     string
		AccountID string
	}
}

f := CreateFunction(
	inngestgo.FunctionOptions{Name: "Post-signup flow"},
	inngestgo.EventTrigger("user/signed.up"),
	func(ctx context.Context, input gosdk.Input[SignupEvent]) (any, error) {
		// .. Your logic here.  input.Event will be strongly typed as a SignupEvent.
		// step.Run(ctx, "Do some logic", func(ctx context.Context) (string, error) { return "hi", nil })
	},
)

type StepError added in v0.6.0

type StepError = errors.StepError

type StreamResponse added in v0.5.0

type StreamResponse struct {
	StatusCode int               `json:"status"`
	Body       any               `json:"body"`
	RetryAt    *time.Time        `json:"retryAt"`
	NoRetry    bool              `json:"noRetry"`
	Headers    map[string]string `json:"headers"`
}

Directories

Path Synopsis
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL