wego

package module
v0.0.0-...-c37c174 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 20, 2026 License: MIT Imports: 26 Imported by: 0

README

Project Overview

This is a microservices project based on go-micro, It is suitable for lightweight mobile apps or small game backends.

Tech Stack

Framework

  • API gateway / HTTP handler: Gin
  • Microservices framework: go-micro

Drivers

  • Database: pgx (PostgreSQL driver)
  • Cache: go-redis (Redis client)

Middleware & Components

  • Cache: Redis
  • Database: PostgreSQL
  • Service discovery & configuration: etcd

Reminder MVP

The framework now includes a minimal reminder worker built on PostgreSQL and the Mesa lifecycle.

Included capabilities

  • Store reminder records in PostgreSQL
  • Poll due reminders in the background
  • Deliver reminders through a user-supplied notifier
  • Retry failed deliveries with backoff
  • Recover stale processing reminders after restart
  • Cancel or reschedule reminders by key

Basic usage

mesa := wego.New(
	wego.WithDSN(dsn),
)

svc := reminder.NewService(
	mesa.DB,
	reminder.NotifierFunc(func(ctx context.Context, r *reminder.Reminder) error {
		// send email, push, webhook, etc.
		return nil
	}),
)

if err := mesa.RegisterComponent(svc); err != nil {
	panic(err)
}

reminderSvc, ok := wego.GetComponentAs[*reminder.Service](mesa, svc.Name())
if !ok {
	panic("reminder component not found")
}

_, err := reminderSvc.Create(context.Background(), reminder.CreateParams{
	Key:        "order-123-pay-deadline",
	UserID:     "42",
	Channel:    "in_app",
	Payload:    `{"title":"Payment due soon"}`,
	ScheduleAt: time.Now().Add(10 * time.Minute),
})

Dispatcher notifier

When reminder scenes grow, you can register handlers by channel, by payload type, or by an exact channel + type route.

dispatcher := reminder.NewDispatcherNotifier()

_ = dispatcher.RegisterChannel("in_app", reminder.NotifierFunc(func(ctx context.Context, r *reminder.Reminder) error {
	// generic in-app delivery
	return nil
}))

_ = dispatcher.RegisterType("payment_due", reminder.NotifierFunc(func(ctx context.Context, r *reminder.Reminder) error {
	// shared payment_due template logic
	return nil
}))

_ = dispatcher.Register("sms", "payment_due", reminder.NotifierFunc(func(ctx context.Context, r *reminder.Reminder) error {
	// exact route for sms + payment_due
	return nil
}))

_ = dispatcher.SetFallback(reminder.NotifierFunc(func(ctx context.Context, r *reminder.Reminder) error {
	// last-resort handler
	return nil
}))

mesa := wego.New(
	wego.WithDSN(dsn),
)

svc := reminder.NewService(mesa.DB, dispatcher)

if err := mesa.RegisterComponent(svc); err != nil {
	panic(err)
}

component, ok := mesa.GetComponent(svc.Name())
if !ok {
	panic("component not found")
}

reminderSvc := component.(*reminder.Service)

_, _ = reminderSvc.Create(ctx, reminder.CreateParams{
	Key:        "order-123-pay-deadline",
	UserID:     "42",
	Channel:    "sms",
	Payload:    `{"type":"payment_due","title":"Payment due soon"}`,
	ScheduleAt: time.Now().Add(10 * time.Minute),
})

Dispatch priority is:

  • exact channel + type
  • payload type
  • channel
  • fallback

Component registry

Mesa now acts as a generic component registry.

if err := mesa.RegisterComponent(svc); err != nil {
	panic(err)
}

component, ok := mesa.GetComponent("reminder")
if !ok {
	panic("component not found")
}

reminderSvc, ok := wego.GetComponentAs[*reminder.Service](mesa, "reminder")
if !ok {
	panic("unexpected component type")
}

_ = component
_ = reminderSvc

Default component configuration

WithComponents(...) lets Mesa auto-register built-in components in one place.

mesa := wego.New(
	wego.WithDSN(dsn),
	wego.WithRedisConfig(wego.RedisConfig{
		Addr: "127.0.0.1:6379",
	}),
	wego.WithComponents(wego.ComponentsConfig{
		Reminder: wego.ReminderComponentConfig{
			Enabled:  true,
			Notifier: reminder.NotifierFunc(func(ctx context.Context, r *reminder.Reminder) error {
				return nil
			}),
			PollInterval: 2 * time.Second,
		},
		PubSub: wego.PubSubComponentConfig{
			Enabled:      true,
			StreamPrefix: "wego:events",
		},
	}),
)

reminderSvc := mesa.MustGetReminder()
pubsubSvc := mesa.MustGetPubSub()

_ = reminderSvc
_ = pubsubSvc

Pub/Sub component

pubsub provides a Redis Streams based component for cross-instance publish and subscribe with consumer groups.

Included capabilities

  • Publish messages to a topic backed by Redis Streams
  • Register subscriptions before Mesa runtime starts
  • Competing consumers within one consumer group
  • Fan-out delivery through different consumer groups
  • Retry failed deliveries with configurable backoff
  • Ack on success and dead-letter on max delivery attempts

Basic usage

mesa := wego.New(
	wego.WithDSN(dsn),
	wego.WithRedisConfig(wego.RedisConfig{
		Addr: "127.0.0.1:6379",
	}),
	wego.WithComponents(wego.ComponentsConfig{
		PubSub: wego.PubSubComponentConfig{
			Enabled: true,
		},
	}),
)

pubsubSvc := mesa.MustGetPubSub()

pubsubSvc.MustSubscribe(pubsub.Subscription{
	Topic: "order.created",
	Group: "billing",
	Handler: pubsub.HandlerFunc(func(ctx context.Context, delivery *pubsub.Delivery) error {
		fmt.Println("received:", delivery.Message.Type, string(delivery.Message.Data))
		return nil
	}),
})

_, err := pubsubSvc.Publish(context.Background(), "order.created", pubsub.Message{
	Key:  "order-123",
	Type: "order.created",
	Data: []byte(`{"id":"123"}`),
})
if err != nil {
	panic(err)
}

Runtime rules are:

  • subscriptions must be registered before mesa.Run()
  • handlers must be idempotent because delivery is at least once
  • one topic + group maps to one handler within a process
  • use different consumer groups when the same topic needs fan-out
  • handler failures are re-published through an internal retry queue using the configured backoff policy
  • auto-registration only happens when wego.WithComponents(...) or wego.WithPubSub(...) is provided and Redis is configured

Pub/Sub can also be configured with framework-level fields such as Name, RetryPolicy, StreamPrefix, StreamMaxLen, StreamBlock, StreamReadCount, and dead-letter options through wego.PubSubComponentConfig.

Pub/Sub integration tests

The Redis-backed integration tests for pubsub are opt-in and use the integration build tag.

Required environment variables:

  • WEGO_REDIS_TEST_ADDR: Redis address such as 127.0.0.1:6379

Optional environment variables:

  • WEGO_REDIS_TEST_PASSWORD: Redis password
  • WEGO_REDIS_TEST_DB: Redis database index, defaults to 15

Run the default unit tests:

go test ./pubsub

Run the Redis integration tests:

WEGO_REDIS_TEST_ADDR=127.0.0.1:6379 go test -tags=integration ./pubsub

Run the full repository test suite without integration tests:

go test ./...

If WEGO_REDIS_TEST_ADDR is not set or Redis is unreachable, the integration tests are skipped.

TCP transport

transport/tcp provides a generic TCP server implementation that matches the existing transport.Server lifecycle.

tcpServer := tcp.NewTCPServer(
	tcp.WithHost(net.ParseIP("0.0.0.0")),
	tcp.WithPort(9000),
	tcp.WithHandler(func(ctx context.Context, conn net.Conn) {
		defer conn.Close()

		buf := make([]byte, 1024)
		n, err := conn.Read(buf)
		if err != nil {
			return
		}

		_, _ = conn.Write(buf[:n])
	}),
)

mesa := wego.New(
	wego.WithDSN(dsn),
	wego.WithServers(tcpServer),
)

Installation (main)

go get github.com/Jinchenyuan/wego@main

Usage

Import packages to your .go files.

"github.com/Jinchenyuan/wego"
"github.com/Jinchenyuan/wego/logger"
"github.com/Jinchenyuan/wego/middleware"
"github.com/Jinchenyuan/wego/transport"
"github.com/Jinchenyuan/wego/transport/micro"

Run a mesa instance with your configuration.

// read config from file
cfg, err := config.Read("config.toml")
if err != nil {
	fmt.Printf("failed to read config: %v\n", err)
	return
}
m := wego.New(
	wego.WithEtcdConfig(clientv3.Config{
		Endpoints:   cfg.Etcd.Endpoints,
		DialTimeout: 5 * time.Second,
		Username:    cfg.Etcd.User,
		Password:    cfg.Etcd.Password,
	}),
	wego.WithHttpPort(cfg.Http.Port),
	wego.WithDSN(cfg.PostgreSQL.DSN),
	wego.WithLogLevel(logger.ParseLevel(cfg.Log.Level)),
	wego.WithRedisConfig(wego.RedisConfig{
		Addr:     cfg.Redis.Addr,
		Password: cfg.Redis.Password,
		DB:       cfg.Redis.DB,
	}),
	wego.WithProfile(wego.Profile{
		Name: cfg.Profile.Name,
	}),
)

ginhandler.SetAuthMiddleware(middleware.AuthMiddleware("account", func(id string) string {
	cacheToken, err := m.Redis.Get(context.Background(), fmt.Sprintf("token:%s", id)).Result()
	if err != nil {
		return ""
	}
	return cacheToken
}, cfg.Http.ExcludeAuthPaths...))

ginhandler.Registry()

ms := m.GetServerByType(transport.MICRO_SERVER).(*micro.Service)
ms.NewServiceClients(serviceclient.Registry)

if err := m.Run(); err != nil {
	fmt.Printf("failed to run mesa: %v\n", err)
}

example project see: https://github.com/Jinchenyuan/weserver

Documentation

Index

Constants

This section is empty.

Variables

View Source
var ErrComponentExists = errors.New("component already registered")
View Source
var ErrComponentNotFound = errors.New("component not found")
View Source
var ErrReminderDBNotConfigured = errors.New("reminder db is not configured")
View Source
var ErrReminderNotifierNotConfigured = errors.New("reminder notifier is not configured")
View Source
var ErrRuntimeStarted = errors.New("mesa runtime already started")

Functions

func GetComponentAs

func GetComponentAs[T any](m *Mesa, name string) (T, bool)

func GetGlobalLogger

func GetGlobalLogger() *logger.Logger

func MustGetComponentAs

func MustGetComponentAs[T any](m *Mesa, name string) T

func SetGlobalLogger

func SetGlobalLogger(l *logger.Logger)

func SetGlobalMesa

func SetGlobalMesa(m *Mesa)

Types

type Component

type Component interface {
	Start(context.Context) error
	Name() string
}

type ComponentsConfig

type ComponentsConfig struct {
	PubSub   PubSubComponentConfig
	Reminder ReminderComponentConfig
}

type Mesa

type Mesa struct {
	DB    *bun.DB
	Redis *redis.Client
	// contains filtered or unexported fields
}

func GetGlobalMesa

func GetGlobalMesa() *Mesa

func New

func New(opts ...Options) *Mesa

func (*Mesa) Components

func (m *Mesa) Components() []Component

func (*Mesa) GetComponent

func (m *Mesa) GetComponent(name string) (Component, bool)

func (*Mesa) GetPubSub

func (m *Mesa) GetPubSub() (*pubsub.Service, bool)

func (*Mesa) GetReminder

func (m *Mesa) GetReminder() (*reminder.Service, bool)

func (*Mesa) GetServerByType

func (m *Mesa) GetServerByType(typ transport.NetType) transport.Server

func (*Mesa) MustGetComponent

func (m *Mesa) MustGetComponent(name string) Component

func (*Mesa) MustGetPubSub

func (m *Mesa) MustGetPubSub() *pubsub.Service

func (*Mesa) MustGetReminder

func (m *Mesa) MustGetReminder() *reminder.Service

func (*Mesa) RegisterComponent

func (m *Mesa) RegisterComponent(components ...Component) error

func (*Mesa) Run

func (m *Mesa) Run() error

type Options

type Options func(o *options)

func WithComponents

func WithComponents(cfg ComponentsConfig) Options

func WithDSN

func WithDSN(dsn string) Options

func WithEtcdConfig

func WithEtcdConfig(config clientv3.Config) Options

func WithHttpPort

func WithHttpPort(port int) Options

func WithLogLevel

func WithLogLevel(level logger.Level) Options

func WithProfile

func WithProfile(p Profile) Options

func WithPubSub

func WithPubSub(opts ...pubsub.Option) Options

func WithRedisConfig

func WithRedisConfig(cfg RedisConfig) Options

func WithReminder

func WithReminder(notifier reminder.Notifier, opts ...reminder.Option) Options

func WithServers

func WithServers(servers ...transport.Server) Options

func WithServiceScheme

func WithServiceScheme(scheme micro.ServiceScheme) Options

type Profile

type Profile struct {
	Name string
}

type PubSubComponentConfig

type PubSubComponentConfig struct {
	Enabled            bool
	Name               string
	RetryPolicy        *pubsub.RetryPolicy
	StreamPrefix       string
	StreamMaxLen       int64
	StreamApproxMaxLen *bool
	StreamBlock        time.Duration
	StreamReadCount    int64
	DeadLetterEnabled  *bool
	DeadLetterSuffix   string
	Logger             *logger.Logger
	Options            []pubsub.Option
}

type RedisConfig

type RedisConfig struct {
	Addr     string
	Password string
	DB       int
}

type ReminderComponentConfig

type ReminderComponentConfig struct {
	Enabled         bool
	Notifier        reminder.Notifier
	Name            string
	PollInterval    time.Duration
	BatchSize       int
	ProcessingTTL   time.Duration
	AutoCreateTable *bool
	RetryDelays     []time.Duration
	Logger          *logger.Logger
	Options         []reminder.Option
}

Directories

Path Synopsis
third_party
postgres command
tcp

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL