natscracker

package module
v0.3.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Nov 25, 2024 License: GPL-3.0 Imports: 23 Imported by: 0

README

natscracker

Overview

Данный пакет позволяет реализовать event-driven архитектуру совместно с NATS и трейсингом OTLP.

Для обмена сообщениями используется protobuf и базовая структура BaseEvent, которая содержит в себе необходимый payload.

При проведении тестов OTLP в качестве трейсинга использовался Jaeger.

Features

  • Обмен сообщениями в формате protobuf
  • Единая структура для обмена данными
  • Возможность масштабирования увеличивая количество consumers и workers
  • Публикация сообщений в NATS
  • Интеграция OTLP
  • Graceful shutdown, пока workers не обработают сообщения, которые они обрабатывают в данный момент времени, программа не будет завершена
  • Возможность повтора тех сообщений, при отработке которых произошла ошибка, а также их сохранение для дальнейшего изучения, при исчерпывании количества повторов.

Under development

TO DO:
  • Implement retries on failed messages
  • Managing consumers depending on working policies
  • FIX: When adding more than one consumer only one subscription initialized
  • Implement full DI support: logging, error handling, etc.
  • Ability to make a service that just publishes messages

Documentation

Index

Constants

This section is empty.

Variables

View Source
var (
	ErrValidation       = errors.New("validation error")
	ErrEventProcessing  = errors.New("event processing error")
	ErrEventPublishing  = errors.New("event publishing error")
	ErrServiceShutdown  = errors.New("service shutdown error")
	ErrNATSConnection   = errors.New("NATS connection error")
	ErrConsumerSetup    = errors.New("consumer setup error")
	ErrMessageUnmarshal = errors.New("message unmarshal error")
)

Custom errors

Functions

func UnpackEventPayload

func UnpackEventPayload[T proto.Message](event *pb.BaseEvent, empty T) (T, error)

Helper function to unpack event payload

Types

type Builder

type Builder interface {
	// Build creates and returns a new Service instance
	Build(ctx context.Context) (*Service, error)

	// Configuration methods
	WithNATS(opts ...nats.Option) Builder
	WithEventIDGenerator(generator EventIDGenerator) Builder
	WithErrorBufferSize(size int) Builder
	WithTracer(ctx context.Context) Builder

	// Stream and consumer management
	AddStream(ctx context.Context, config jetstream.StreamConfig) (jetstream.Stream, error)
	AddConsumer(ctx context.Context, stream jetstream.Stream, config jetstream.ConsumerConfig, workers int8) error
}

Builder interface defines the contract for service construction

type CircuitBreaker

type CircuitBreaker struct {
	// contains filtered or unexported fields
}

CircuitBreaker implements the circuit breaker pattern

func NewCircuitBreaker

func NewCircuitBreaker(threshold int32, resetAfter time.Duration) *CircuitBreaker

func (*CircuitBreaker) AllowRequest

func (cb *CircuitBreaker) AllowRequest() bool

func (*CircuitBreaker) RecordFailure

func (cb *CircuitBreaker) RecordFailure()

type Config

type Config struct {
	ServiceName string // ServiceName is used to identify service inside jaeger
	NatsURL     string
	RetryCount  int // Number of retries for failed messages
	RetryDelay  time.Duration
}

Config holds the service configuration with validation

func (*Config) Validate

func (c *Config) Validate() error

Validate configuration

type Consumer

type Consumer struct {
	Name    string
	Cons    jetstream.Consumer
	Workers int8
}

type EventIDGenerator

type EventIDGenerator interface {
	GenerateID() string
}

EventIDGenerator implementations remain the same

type MessageHandler

type MessageHandler func(ctx context.Context, event *pb.BaseEvent) error

MessageHandler is a function type for message handlers

type RetryableError

type RetryableError struct {
	Err       error
	Retryable bool
}

RetryableError indicates if an error should trigger a retry

func (*RetryableError) Error

func (e *RetryableError) Error() string

type Service

type Service struct {
	// contains filtered or unexported fields
}

Service represents the microservice

func (*Service) GetMetrics

func (s *Service) GetMetrics() map[string]any

GetMetrics returns current service metrics

func (*Service) GetStatus

func (s *Service) GetStatus() map[string]interface{}

GetStatus returns the current service status

func (*Service) PublishEvent

func (s *Service) PublishEvent(ctx context.Context, subject string, eventType string, payload proto.Message) error

PublishEvent sends a protobuf event to NATS

func (*Service) RegisterHandler

func (s *Service) RegisterHandler(eventType string, handler MessageHandler) error

RegisterHandler registers a message handler for a specific event type

func (*Service) Shutdown

func (s *Service) Shutdown(ctx context.Context) error

Shutdown gracefully stops the service

func (*Service) Start

func (s *Service) Start(ctx context.Context) error

Start begins processing messages

type ServiceBuilder

type ServiceBuilder struct {
	// contains filtered or unexported fields
}

ServiceBuilder implements the Builder interface

func NewServiceBuilder

func NewServiceBuilder(cfg Config) (*ServiceBuilder, error)

NewServiceBuilder creates a new ServiceBuilder instance

func (*ServiceBuilder) AddConsumer

func (b *ServiceBuilder) AddConsumer(ctx context.Context, stream jetstream.Stream, cfg jetstream.ConsumerConfig, workers int8) error

AddConsumer creates or updates a consumer

func (*ServiceBuilder) AddStream

AddStream creates or updates a NATS stream

func (*ServiceBuilder) Build

func (b *ServiceBuilder) Build(ctx context.Context) (*Service, error)

Build creates the final Service instance

func (*ServiceBuilder) WithErrorBufferSize

func (b *ServiceBuilder) WithErrorBufferSize(size int) Builder

WithErrorBufferSize sets the error channel buffer size

func (*ServiceBuilder) WithEventIDGenerator

func (b *ServiceBuilder) WithEventIDGenerator(generator EventIDGenerator) Builder

WithEventIDGenerator sets a custom event ID generator

func (*ServiceBuilder) WithNATS

func (b *ServiceBuilder) WithNATS(opts ...nats.Option) Builder

WithNATS configures NATS connection options

func (*ServiceBuilder) WithTracer

func (b *ServiceBuilder) WithTracer(ctx context.Context) Builder

WithTracer configures OpenTelemetry tracing

type ServiceMetrics

type ServiceMetrics struct {
	MessagesProcessed  atomic.Int64
	MessagesSuccessful atomic.Int64
	MessagesFailed     atomic.Int64
	MessagesPublished  atomic.Int64
	ProcessingDuration atomic.Int64 // nanoseconds
	ActiveWorkers      atomic.Int32
}

ServiceMetrics holds service-level metrics

type ServiceState

type ServiceState int32

ServiceState represents the current state of the service

const (
	StateInitial ServiceState = iota
	StateStarting
	StateRunning
	StateStopping
	StateStopped
)

type UUIDGenerator

type UUIDGenerator struct{}

func (*UUIDGenerator) GenerateID

func (g *UUIDGenerator) GenerateID() string

type UnixIDGenerator

type UnixIDGenerator struct{}

func (*UnixIDGenerator) GenerateID

func (g *UnixIDGenerator) GenerateID() string

Directories

Path Synopsis
pkg
pb

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL