Documentation
¶
Index ¶
- Variables
- func UnpackEventPayload[T proto.Message](event *pb.BaseEvent, empty T) (T, error)
- type Builder
- type CircuitBreaker
- type Config
- type Consumer
- type EventIDGenerator
- type MessageHandler
- type RetryableError
- type Service
- func (s *Service) GetMetrics() map[string]any
- func (s *Service) GetStatus() map[string]interface{}
- func (s *Service) PublishEvent(ctx context.Context, subject string, eventType string, payload proto.Message) error
- func (s *Service) RegisterHandler(eventType string, handler MessageHandler) error
- func (s *Service) Shutdown(ctx context.Context) error
- func (s *Service) Start(ctx context.Context) error
- type ServiceBuilder
- func (b *ServiceBuilder) AddConsumer(ctx context.Context, stream jetstream.Stream, cfg jetstream.ConsumerConfig, ...) error
- func (b *ServiceBuilder) AddStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)
- func (b *ServiceBuilder) Build(ctx context.Context) (*Service, error)
- func (b *ServiceBuilder) WithErrorBufferSize(size int) Builder
- func (b *ServiceBuilder) WithEventIDGenerator(generator EventIDGenerator) Builder
- func (b *ServiceBuilder) WithNATS(opts ...nats.Option) Builder
- func (b *ServiceBuilder) WithTracer(ctx context.Context) Builder
- type ServiceMetrics
- type ServiceState
- type UUIDGenerator
- type UnixIDGenerator
Constants ¶
This section is empty.
Variables ¶
var ( ErrValidation = errors.New("validation error") ErrEventProcessing = errors.New("event processing error") ErrEventPublishing = errors.New("event publishing error") ErrServiceShutdown = errors.New("service shutdown error") ErrNATSConnection = errors.New("NATS connection error") ErrConsumerSetup = errors.New("consumer setup error") ErrMessageUnmarshal = errors.New("message unmarshal error") )
Sentinel errors exposed by this package.
Functions ¶
Types ¶
type Builder ¶
type Builder interface { // Build creates and returns a new Service instance Build(ctx context.Context) (*Service, error) // Configuration methods WithNATS(opts ...nats.Option) Builder WithEventIDGenerator(generator EventIDGenerator) Builder WithErrorBufferSize(size int) Builder WithTracer(ctx context.Context) Builder // Stream and consumer management AddStream(ctx context.Context, config jetstream.StreamConfig) (jetstream.Stream, error) AddConsumer(ctx context.Context, stream jetstream.Stream, config jetstream.ConsumerConfig, workers int8) error }
Builder interface defines the contract for service construction
type CircuitBreaker ¶
type CircuitBreaker struct {
// contains filtered or unexported fields
}
CircuitBreaker implements the circuit breaker pattern
func NewCircuitBreaker ¶
func NewCircuitBreaker(threshold int32, resetAfter time.Duration) *CircuitBreaker
func (*CircuitBreaker) AllowRequest ¶
func (cb *CircuitBreaker) AllowRequest() bool
func (*CircuitBreaker) RecordFailure ¶
func (cb *CircuitBreaker) RecordFailure()
type Config ¶
type Config struct { ServiceName string // ServiceName is used to identify service inside jaeger NatsURL string RetryCount int // Number of retries for failed messages RetryDelay time.Duration }
Config holds the service configuration with validation
type EventIDGenerator ¶
type EventIDGenerator interface {
GenerateID() string
}
EventIDGenerator generates identifiers for events.
type MessageHandler ¶
MessageHandler is a function type for message handlers
type RetryableError ¶
RetryableError indicates whether an error should trigger a retry
func (*RetryableError) Error ¶
func (e *RetryableError) Error() string
type Service ¶
type Service struct {
// contains filtered or unexported fields
}
Service represents the microservice
func (*Service) GetMetrics ¶
GetMetrics returns current service metrics
func (*Service) PublishEvent ¶
func (s *Service) PublishEvent(ctx context.Context, subject string, eventType string, payload proto.Message) error
PublishEvent sends a protobuf event to NATS
func (*Service) RegisterHandler ¶
func (s *Service) RegisterHandler(eventType string, handler MessageHandler) error
RegisterHandler registers a message handler for a specific event type
type ServiceBuilder ¶
type ServiceBuilder struct {
// contains filtered or unexported fields
}
ServiceBuilder implements the Builder interface
func NewServiceBuilder ¶
func NewServiceBuilder(cfg Config) (*ServiceBuilder, error)
NewServiceBuilder creates a new ServiceBuilder instance
func (*ServiceBuilder) AddConsumer ¶
func (b *ServiceBuilder) AddConsumer(ctx context.Context, stream jetstream.Stream, cfg jetstream.ConsumerConfig, workers int8) error
AddConsumer creates or updates a consumer
func (*ServiceBuilder) AddStream ¶
func (b *ServiceBuilder) AddStream(ctx context.Context, cfg jetstream.StreamConfig) (jetstream.Stream, error)
AddStream creates or updates a NATS stream
func (*ServiceBuilder) Build ¶
func (b *ServiceBuilder) Build(ctx context.Context) (*Service, error)
Build creates the final Service instance
func (*ServiceBuilder) WithErrorBufferSize ¶
func (b *ServiceBuilder) WithErrorBufferSize(size int) Builder
WithErrorBufferSize sets the error channel buffer size
func (*ServiceBuilder) WithEventIDGenerator ¶
func (b *ServiceBuilder) WithEventIDGenerator(generator EventIDGenerator) Builder
WithEventIDGenerator sets a custom event ID generator
func (*ServiceBuilder) WithNATS ¶
func (b *ServiceBuilder) WithNATS(opts ...nats.Option) Builder
WithNATS configures NATS connection options
func (*ServiceBuilder) WithTracer ¶
func (b *ServiceBuilder) WithTracer(ctx context.Context) Builder
WithTracer configures OpenTelemetry tracing
type ServiceMetrics ¶
type ServiceMetrics struct { MessagesProcessed atomic.Int64 MessagesSuccessful atomic.Int64 MessagesFailed atomic.Int64 MessagesPublished atomic.Int64 ProcessingDuration atomic.Int64 // nanoseconds ActiveWorkers atomic.Int32 }
ServiceMetrics holds service-level metrics
type ServiceState ¶
type ServiceState int32
ServiceState represents the current state of the service
const ( StateInitial ServiceState = iota StateStarting StateRunning StateStopping StateStopped )
type UUIDGenerator ¶
type UUIDGenerator struct{}
func (*UUIDGenerator) GenerateID ¶
func (g *UUIDGenerator) GenerateID() string
type UnixIDGenerator ¶
type UnixIDGenerator struct{}
func (*UnixIDGenerator) GenerateID ¶
func (g *UnixIDGenerator) GenerateID() string