Event Driver

Event Driver is a lightweight and flexible event-driven programming framework for managing and handling events in your applications. It provides a simple and intuitive API to facilitate communication between different components or modules in your software.
Table of Contents
- Features
- Tutorial
- Steps Break Down
- Example Code
- Extensions
- Cloud Events
- Google Cloud
Features
- Event-driven Architecture: Easily implement an event-driven architecture in your application.
- Custom Handlers: Define and dispatch custom handlers tailored to your application's needs.
- Pipeline Structure: Put your handlers in order and they run as a pipeline.
- Asynchronous Support: Handle events asynchronously for improved performance and responsiveness.
- Lightweight and Easy to Use: Minimal dependencies and quick integration & usage.
Tutorial
This tutorial is in the format of a case study of a real-world service.
There's an event-driven service that processes an order once event1, event2, and event3 have all arrived, but the processing logic only uses data from event2 and event3.
This service needs to:
- be fault-tolerant (should not crash on errors, and should recover quickly when a crash does happen)
- be idempotent (must not process duplicate orders)
Steps Break Down
Let's start with joining the events. Create a joiner that makes a join when all required events are present.
myJoiner := joiner.New(joiner.MatchAll("event1", "event2", "event3"), myEventStore)
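The join step described above can be pictured with a small standalone sketch. It is not the library's implementation: `Message`, `Joiner`, `NewJoiner`, and `Add` are hypothetical names, and a plain map stands in for the event store.

```go
package main

import (
	"fmt"
	"sort"
)

// Message is a hypothetical stand-in for an event: a join key, the source
// that produced it, and an opaque payload.
type Message struct {
	Key     string
	Source  string
	Content string
}

// Joiner buffers messages per key until every required source has arrived.
// The in-memory map plays the role of the event store in this sketch.
type Joiner struct {
	required []string
	seen     map[string]map[string]Message // key -> source -> message
}

func NewJoiner(required ...string) *Joiner {
	return &Joiner{required: required, seen: make(map[string]map[string]Message)}
}

// Add records msg and returns the buffered messages for msg.Key (ordered by
// source name) once all required sources are present; otherwise ok is false.
func (j *Joiner) Add(msg Message) (joined []Message, ok bool) {
	bySource, found := j.seen[msg.Key]
	if !found {
		bySource = make(map[string]Message)
		j.seen[msg.Key] = bySource
	}
	bySource[msg.Source] = msg
	for _, src := range j.required {
		if _, present := bySource[src]; !present {
			return nil, false
		}
	}
	for _, m := range bySource {
		joined = append(joined, m)
	}
	sort.Slice(joined, func(a, b int) bool { return joined[a].Source < joined[b].Source })
	return joined, true
}

func main() {
	j := NewJoiner("event1", "event2", "event3")
	_, ok := j.Add(Message{Key: "order-1", Source: "event1"})
	fmt.Println(ok) // false: event2 and event3 are still missing
	_, ok = j.Add(Message{Key: "order-1", Source: "event2"})
	fmt.Println(ok) // false: event3 is still missing
	joined, ok := j.Add(Message{Key: "order-1", Source: "event3"})
	fmt.Println(ok, len(joined)) // true 3
}
```

Because the buffered messages live in the store, a persistent store lets a restarted service pick up partially joined keys where it left off.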
The event joiner that we just created requires an event store for lookups.
Here we pick GCS rather than the in-memory store so that data survives a restart.
gcsTimeout := time.Second * 5 // tune the timeout to fit your content size
gcsConfig := gcs_event_store.Config("my-bucket").
	WithTimeout(gcs_event_store.Timeout{Default: &gcsTimeout}) // finer-grained timeouts can also be set per operation
// Create a GCS event store without authentication, for demonstration only.
myEventStore, err := gcs_event_store.New(ctx, gcsConfig, option.WithoutAuthentication())
Create a cache for idempotency.
The cache stores its events in the same GCS bucket as the joiner, so beware of source-name conflicts between the two.
The cache achieves idempotency by skipping processing (i.e. not passing the result to the next handler) on a key conflict.
idempotencyHandler := cache.New(myEventStore, cache.SkipOnConflict())
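The skip-on-conflict idea can be sketched independently of the library. Everything here is hypothetical (`Store`, `PutIfAbsent`, `ProcessOnce`, `ErrConflict`), with a map standing in for the GCS-backed store; it is not safe for concurrent use and only illustrates the control flow.

```go
package main

import (
	"errors"
	"fmt"
)

// ErrConflict signals that a key has already been written.
var ErrConflict = errors.New("key already exists")

// Store is a hypothetical in-memory stand-in for an event store.
type Store struct{ data map[string][]byte }

func NewStore() *Store { return &Store{data: make(map[string][]byte)} }

// PutIfAbsent writes content under key, or returns ErrConflict if the key is taken.
func (s *Store) PutIfAbsent(key string, content []byte) error {
	if _, exists := s.data[key]; exists {
		return ErrConflict
	}
	s.data[key] = content
	return nil
}

// ProcessOnce calls next only the first time a key is seen. A conflict is
// treated as a duplicate and skipped without an error.
func ProcessOnce(s *Store, key string, content []byte, next func([]byte) error) error {
	err := s.PutIfAbsent(key, content)
	if errors.Is(err, ErrConflict) {
		return nil // duplicate: do not pass anything to the next handler
	}
	if err != nil {
		return err
	}
	return next(content)
}

func main() {
	store := NewStore()
	processed := 0
	next := func([]byte) error { processed++; return nil }
	for i := 0; i < 3; i++ {
		if err := ProcessOnce(store, "order-1", []byte("payload"), next); err != nil {
			fmt.Println("error:", err)
		}
	}
	fmt.Println(processed) // 1: the two redeliveries were skipped
}
```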
It would also be great to drop the content of event1, saving GCS storage as well as the service pod's network bandwidth and memory.
eraseContentOfEvent1 := transformer.EraseContentFromSources("event1")
eraseContentOfEvent1Handler := transformer.New(eraseContentOfEvent1)
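As a rough illustration of what such a transform does, the sketch below blanks the payload of messages from selected sources while keeping their key and source, so they can still take part in a join. `Message` and `EraseContentFrom` are hypothetical names, not the library's types.

```go
package main

import "fmt"

// Message is a hypothetical stand-in for an event.
type Message struct {
	Key     string
	Source  string
	Content []byte
}

// EraseContentFrom returns a transform that drops the content of messages
// from the listed sources and leaves all other messages untouched.
func EraseContentFrom(sources ...string) func(Message) Message {
	drop := make(map[string]struct{}, len(sources))
	for _, s := range sources {
		drop[s] = struct{}{}
	}
	return func(m Message) Message {
		if _, hit := drop[m.Source]; hit {
			m.Content = nil
		}
		return m
	}
}

func main() {
	erase := EraseContentFrom("event1")
	a := erase(Message{Key: "order-1", Source: "event1", Content: []byte("large payload")})
	b := erase(Message{Key: "order-1", Source: "event2", Content: []byte("needed")})
	fmt.Println(a.Key, len(a.Content)) // order-1 0
	fmt.Println(b.Key, len(b.Content)) // order-1 6
}
```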
Now we need to build a customized handler to handle the business logic.
package business_handler

import (
	"context"
	"encoding/json"
	// plus the event-driver packages that provide `event` and `handlers`
)

// MyInput is the joined content. Fields are exported so encoding/json can populate them.
type MyInput struct {
	Event2 Event2Type `json:"event2"`
	Event3 Event3Type `json:"event3"`
}

type myHandler struct {
	config Config
}

func New(config Config) handlers.Handler {
	return &myHandler{config: config}
}

func (c *myHandler) Process(ctx context.Context, in *event.Message, next handlers.CallNext) error {
	var myInput MyInput
	if err := json.Unmarshal(in.GetContent(), &myInput); err != nil {
		return err
	}
	if err := handlesBusinessLogic(myInput, c.config); err != nil {
		return err
	}
	return next.Call(ctx, in) // can omit if this is already the last handler
}
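The decoding step in the handler depends on a detail of `encoding/json`: only exported struct fields are populated. The sketch below shows this under the assumption that the joined content is a JSON object keyed by source name (inferred from the struct tags); the field shapes of `Event2Type` and `Event3Type` and the helper `DecodeInput` are hypothetical.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Hypothetical payload shapes, for illustration only.
type Event2Type struct {
	OrderID string `json:"order_id"`
}

type Event3Type struct {
	Amount int `json:"amount"`
}

// MyInput uses exported fields so that json.Unmarshal can fill them.
type MyInput struct {
	Event2 Event2Type `json:"event2"`
	Event3 Event3Type `json:"event3"`
}

// DecodeInput parses joined content into MyInput.
func DecodeInput(content []byte) (MyInput, error) {
	var in MyInput
	err := json.Unmarshal(content, &in)
	return in, err
}

func main() {
	content := []byte(`{"event2":{"order_id":"A-1"},"event3":{"amount":42}}`)

	in, err := DecodeInput(content)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(in.Event2.OrderID, in.Event3.Amount) // A-1 42

	// The same payload decoded into an unexported field stays empty,
	// and json.Unmarshal reports no error for it.
	var hidden struct{ event2 Event2Type }
	err = json.Unmarshal(content, &hidden)
	fmt.Println(err == nil, hidden.event2.OrderID == "") // true true
}
```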
Build a pipeline with the handlers.
myPipeline := pipeline.New().
	WithNextHandler(eraseContentOfEvent1Handler). // remove the content of event1 before joining
	WithNextHandler(myJoiner). // join all events of the same key into a single message
	WithNextHandler(idempotencyHandler). // check for idempotency after a joined message is formed
	WithNextHandler(businessHandler) // handle the business logic
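The pipeline behaviour relied on above, where each handler decides whether to pass its result on, can be sketched as follows. This is a simplified model and not the library's code: messages are plain strings, `context.Context` is omitted for brevity, and `CallNext` is a plain function rather than an object with a `Call` method.

```go
package main

import (
	"fmt"
	"strings"
)

// CallNext passes a message to the rest of the pipeline.
type CallNext func(in string) error

// Handler processes a message and may call next to continue the pipeline.
type Handler interface {
	Process(in string, next CallNext) error
}

// HandlerFunc adapts a function to the Handler interface.
type HandlerFunc func(in string, next CallNext) error

func (f HandlerFunc) Process(in string, next CallNext) error { return f(in, next) }

// Pipeline runs handlers in the order they were added.
type Pipeline struct{ handlers []Handler }

func NewPipeline() *Pipeline { return &Pipeline{} }

func (p *Pipeline) WithNextHandler(h Handler) *Pipeline {
	p.handlers = append(p.handlers, h)
	return p
}

// Process starts at the first handler. The chain stops as soon as a handler
// returns without calling next, or returns an error.
func (p *Pipeline) Process(in string) error { return p.callFrom(0)(in) }

func (p *Pipeline) callFrom(i int) CallNext {
	return func(in string) error {
		if i >= len(p.handlers) {
			return nil // past the last handler: nothing left to do
		}
		return p.handlers[i].Process(in, p.callFrom(i+1))
	}
}

func main() {
	var out []string
	dropEmpty := HandlerFunc(func(in string, next CallNext) error {
		if in == "" {
			return nil // skip: later handlers never see this message
		}
		return next(in)
	})
	upper := HandlerFunc(func(in string, next CallNext) error {
		return next(strings.ToUpper(in))
	})
	collect := HandlerFunc(func(in string, next CallNext) error {
		out = append(out, in)
		return next(in)
	})

	p := NewPipeline().WithNextHandler(dropEmpty).WithNextHandler(upper).WithNextHandler(collect)
	_ = p.Process("order")
	_ = p.Process("")
	fmt.Println(out) // [ORDER]
}
```

This is why handler order matters: a joiner or idempotency check placed early can stop a message before the business logic ever runs.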
Start serving traffic!
// Showcase: convert the pipeline into a Knative CloudEvents handler.
// Sarama or Confluent Kafka clients, Google Cloud Functions, etc. can be used instead.
handleKNativeEvent := convert.ToKNativeEventHandler(
cloudEventClient.StartReceiver(ctx, handleKNativeEvent) // assume cloudEventClient is already created
Example Code
Now assemble the steps above into a complete example.
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/api/option"
	// plus the event-driver packages used below
)

func main() {
	ctx := context.Background()

	eraseContentOfEvent1 := transformer.EraseContentFromSources("event1")
	eraseContentOfEvent1Handler := transformer.New(eraseContentOfEvent1)

	myEventStore, err := createEventStore(ctx)
	if err != nil {
		log.Panicf("failed to create GCS event store: %v", err)
	}
	myJoiner := joiner.New(joiner.MatchAll("event1", "event2", "event3"), myEventStore)
	idempotencyHandler := cache.New(myEventStore, cache.SkipOnConflict())
	businessHandler := business_handler.New(business_handler.Config{}) // New takes a Config; fill in your own settings

	myPipeline := pipeline.New().
		WithNextHandler(eraseContentOfEvent1Handler). // remove the content of event1 before joining
		WithNextHandler(myJoiner). // join all events of the same key into a single message
		WithNextHandler(idempotencyHandler). // check for idempotency after a joined message is formed
		WithNextHandler(businessHandler) // handle the business logic

	// Showcase: convert the pipeline into a Knative CloudEvents handler.
	// Sarama or Confluent Kafka clients, Google Cloud Functions, etc. can be used instead.
	handleKNativeEvent := convert.ToKNativeEventHandler(
	cloudEventClient.StartReceiver(ctx, handleKNativeEvent) // assume cloudEventClient is already created
}

func createEventStore(ctx context.Context) (storage.EventStore, error) {
	gcsTimeout := time.Second * 5 // tune the timeout to fit your content size
	gcsConfig := gcs_event_store.Config("my-bucket").
		WithTimeout(gcs_event_store.Timeout{Default: &gcsTimeout}) // finer-grained timeouts can also be set per operation
	// Create a GCS event store without authentication, for demonstration only.
	return gcs_event_store.New(ctx, gcsConfig, option.WithoutAuthentication())
}
Extensions
Event Driver also provides the following libraries as extensions for integrating with other services and frameworks.
Cloud Events
Link: github.com/honestbank/event-driver/extensions/cloudevents
Integrates Event Driver with CloudEvents by providing a converter that turns an Event Driver pipeline into a CloudEvents handler.
Check the extension's documentation to see what is currently supported and the latest updates.
Google Cloud
Link: github.com/honestbank/event-driver/extensions/google-cloud
Integrates Event Driver with Google Cloud, including using GCS or BigQuery as the event store and running an Event Driver pipeline in Cloud Functions.
Check the extension's documentation to see what is currently supported and the latest updates.