messaging

package
v0.10.18 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jun 9, 2025 License: MIT Imports: 18 Imported by: 1

Documentation

Index

Constants

View Source
// Package-level default values.
const (
	// DefaultMsgLimit equals math.MaxInt16 (32767).
	DefaultMsgLimit          = math.MaxInt16
	DefaultAttKey            = "payload"
	// DefaultSchedule looks like a cron-style "@every" interval descriptor — TODO confirm which scheduler parses it.
	DefaultSchedule          = "@every 30m"
	DefaultDeadLetterDurtion = 8 * time.Hour // If a message has been pending for longer than this duration, it is put to the dead letter.
)

Variables

View Source
var (
	// AbandonedChan is a channel of arbitrary values. It has no initializer in
	// this declaration, so it is nil until assigned elsewhere.
	AbandonedChan       chan any
	// GormCallbackEnabled defaults to true.
	// NOTE(review): presumably toggles the GORM callbacks — confirm.
	GormCallbackEnabled = true
)
View Source
var (
	// DefaultGormToipc defaults to "scm.gorm.saved".
	// NOTE(review): presumably the default topic name (the identifier is spelled "Toipc") — confirm.
	DefaultGormToipc     = "scm.gorm.saved"
	// GormMessagingEnabled defaults to true.
	GormMessagingEnabled = true
)
View Source
// ConsumerName is a package-level string with no initializer in this
// declaration, so it is empty until set.
var ConsumerName string
View Source
// ResetTopics is a package-level list of strings with no initializer in this
// declaration, so it is nil until set.
var ResetTopics []string
View Source
// SyncPageSize defaults to 1000.
// NOTE(review): presumably the page size used when querying entities — confirm.
var SyncPageSize = 1000

Functions

func GetMapping added in v0.10.15

func GetMapping() map[string]reflect.Type

func GetPayloadID added in v0.10.10

func GetPayloadID(payload any) (uint, bool)

GetPayloadID tries to read the payload's ID value as a uint. It returns false if the payload does not have an ID field.

func GetRegisted

func GetRegisted() []any

GetRegisted returns the list of registered entities.

func PubEntitiesSince added in v0.10.7

func PubEntitiesSince(ctx context.Context, keys []string, since time.Time, to time.Time) error

func QueryEntities added in v0.10.7

func QueryEntities[T any](ctx context.Context, db *gorm.DB, logger *zap.Logger, since time.Time, to time.Time, index int, queryDeleted bool) ([]any, error)

func Reg

func Reg[T any](payload T)

Never pass a value of type any to this function.

Types

type DefaultMessgingService

// DefaultMessgingService holds a zap logger, a Redis client, a
// PendingSchedule string and a map of limit settings. The methods listed for
// it are ProcessPendings, Pub and Sub.
type DefaultMessgingService struct {
	Logger          *zap.Logger
	Client          *redis.Client
	PendingSchedule string
	// Settings holds streaming limit settings.
	// NOTE(review): the original comment says "default 10000", but
	// DefaultMsgLimit is math.MaxInt16 (32767) — confirm which default applies.
	Settings        map[string]int64
}

func (*DefaultMessgingService) ProcessPendings

func (msg *DefaultMessgingService) ProcessPendings(ctx context.Context, topic, group string, processor Processor)

func (*DefaultMessgingService) Pub

func (msg *DefaultMessgingService) Pub(ctx context.Context, topic string, payload any) error

func (*DefaultMessgingService) Sub

func (msg *DefaultMessgingService) Sub(ctx context.Context, topic, group string, processor Processor) error

type DispathFn added in v0.10.14

// DispathFn is a function type with the same parameters and result as
// (*GormObjSyncService).ProcessGormObject.
type DispathFn func(ctx context.Context, topic, consumer string, kp *GormPayload, payload any, tt reflect.Type) error

type GormAction

// GormAction is a string-typed action name; it is the type of
// GormPayload.Action.
type GormAction string
// The GormAction values declared here.
const (
	GormActionSave   GormAction = "save"
	GormActionDelete GormAction = "delete"
)

type GormObjSyncService

// GormObjSyncService groups a MessagingService, a GORM database handle, a
// logger, a Sharding function and a DispathFn.
// NewGormObjSyncService takes only the messaging service, logger and DB; how
// Sharding and Dispath get populated is not visible from this listing.
type GormObjSyncService struct {
	MessageService MessagingService
	DB             *gorm.DB
	Logger         *zap.Logger
	Sharding       Sharding
	Dispath        DispathFn
}

func NewGormObjSyncService

func NewGormObjSyncService(ms MessagingService, logger *zap.Logger, db *gorm.DB) *GormObjSyncService

func (*GormObjSyncService) Abandoned added in v0.10.14

func (ss *GormObjSyncService) Abandoned(kp *GormPayload, errCode, topic, consumer string)

func (*GormObjSyncService) ProcessGormObject added in v0.10.14

func (ss *GormObjSyncService) ProcessGormObject(ctx context.Context, topic, consumer string, kp *GormPayload, payload any, tt reflect.Type) error

func (*GormObjSyncService) ReceiveGormObjectSaved

func (ss *GormObjSyncService) ReceiveGormObjectSaved(ctx context.Context, topic, consumer string, raw []byte) error

type GormPayload

// GormPayload carries a key, a GormAction, a string payload and a timestamp.
// ToKeyAndPayload produces a *GormPayload from raw bytes.
// NOTE(review): Payload presumably holds the serialized entity — confirm the encoding.
type GormPayload struct {
	Key     string
	Action  GormAction
	Payload string
	SynctAt time.Time
}

func ToKeyAndPayload added in v0.10.14

func ToKeyAndPayload(raw []byte) (*GormPayload, error)

type MessagingService

// MessagingService is the publish/subscribe interface of this package.
// NOTE(review): the third parameter of Sub is named "consumer" here but
// "group" in (*DefaultMessgingService).Sub — confirm the intended meaning.
type MessagingService interface {
	// Pub takes a topic and a payload and returns an error.
	Pub(ctx context.Context, topic string, payload any) error
	// Sub takes a topic, a consumer and a Processor and returns an error.
	Sub(ctx context.Context, topic, consumer string, processor Processor) error
}

MessagingService is the messaging interface. The default implementation is based on Redis streaming.

type MessagnePending added in v0.10.10

// MessagnePending groups a topic, a group, a schedule string and two integer
// settings (MaxBatch and Limit).
// NOTE(review): presumably describes pending-message processing for one
// topic/group pair — confirm against the code that uses it.
type MessagnePending struct {
	Topic    string
	Group    string
	Schedule string
	MaxBatch int
	Limit    int
}

type Processor

// Processor is the callback type accepted by MessagingService.Sub and
// (*DefaultMessgingService).ProcessPendings. It takes a topic, a consumer
// and a payload as bytes, and returns an error.
// (*GormObjSyncService).ReceiveGormObjectSaved has the same signature.
type Processor func(ctx context.Context, topic, consumer string, payload []byte) error

type QueryFn added in v0.10.7

// QueryFn is a function type with the same parameters and results as an
// instantiation of the generic QueryEntities function.
type QueryFn func(ctx context.Context, db *gorm.DB, logger *zap.Logger, since time.Time, to time.Time, index int, queryDeleted bool) ([]any, error)

type Sharding

// Sharding takes a GORM handle, a key and a payload, and returns a table name
// or an error.
type Sharding func(tx *gorm.DB, key string, payload any) (tablename string, err error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL