messaging

package
v0.10.12 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Dec 13, 2024 License: MIT Imports: 18 Imported by: 1

Documentation

Index

Constants

View Source
const (
	DefaultMsgLimit          = math.MaxInt16
	DefaultAttKey            = "payload"
	DefaultSchedule          = "@every 30m"
	DefaultDeadLetterDurtion = 8 * time.Hour // if a message has been pending for longer than this duration, it is put into the dead letter
)
View Source
const SyncPageSize = 1000

Variables

View Source
var AbandonedChan chan any
View Source
var ConsumerName string
View Source
var DefaultGormToipc = "scm.gorm.saved"
View Source
var GormMessagingEnabled = true
View Source
var ResetTopics []string

Functions

func GetPayloadID added in v0.10.10

func GetPayloadID(payload any) (uint, bool)

GetPayloadID tries to get the payload's ID value as a uint. It returns false if the payload doesn't have an ID field.

func GetRegisted

func GetRegisted() []any

GetRegisted function is used to obtain a list of registered entities.

func PubEntitiesSince added in v0.10.7

func PubEntitiesSince(ctx context.Context, key string, since time.Time, to time.Time) error

func QueryEntities added in v0.10.7

func QueryEntities[T any](ctx context.Context, db *gorm.DB, logger *zap.Logger, since time.Time, to time.Time, index int, queryDeleted bool) ([]any, error)

func Reg

func Reg[T any](payload T)

Callers should never pass a value of type any into this function.

Types

type DefaultMessgingService

type DefaultMessgingService struct {
	Logger          *zap.Logger
	Client          *redis.Client
	PendingSchedule string
	Settings        map[string]int64 // streaming limit settings; default 10000
}

func (*DefaultMessgingService) ProcessPendings

func (msg *DefaultMessgingService) ProcessPendings(ctx context.Context, topic, group string, processor Processor)

func (*DefaultMessgingService) Pub

func (msg *DefaultMessgingService) Pub(ctx context.Context, topic string, payload any) error

func (*DefaultMessgingService) Sub

func (msg *DefaultMessgingService) Sub(ctx context.Context, topic, group string, processor Processor) error

type GormAction

type GormAction string
const (
	GormActionSave   GormAction = "save"
	GormActionDelete GormAction = "delete"
)

type GormObjSyncService

type GormObjSyncService struct {
	MessageService MessagingService
	DB             *gorm.DB
	Logger         *zap.Logger
	Sharding       Sharding
}

func NewGormObjSyncService

func NewGormObjSyncService(ms MessagingService, logger *zap.Logger, db *gorm.DB) *GormObjSyncService

func (*GormObjSyncService) ReceiveGormObjectSaved

func (ss *GormObjSyncService) ReceiveGormObjectSaved(ctx context.Context, topic, consumer string, raw []byte) error

type GormPayload

type GormPayload struct {
	Key     string
	Action  GormAction
	Payload []byte
}

type MessagingService

type MessagingService interface {
	Pub(ctx context.Context, topic string, payload any) error
	Sub(ctx context.Context, topic, consumer string, processor Processor) error
}

MessagingService is the messaging interface; the default implementation is Redis streaming.

type MessagnePending added in v0.10.10

type MessagnePending struct {
	Topic    string
	Group    string
	Schedule string
	MaxBatch int
	Limit    int
}

type Processor

type Processor func(ctx context.Context, topic, consumer string, payload []byte) error

type QueryFn added in v0.10.7

type QueryFn func(ctx context.Context, db *gorm.DB, logger *zap.Logger, since time.Time, to time.Time, index int, queryDeleted bool) ([]any, error)

type Sharding

type Sharding func(tx *gorm.DB, key string, payload any) (tablename string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL