Documentation ¶
Overview ¶
Package audit provides an in-memory publish-subscribe bus for managing and processing audit events. It targets high-throughput, low-latency event processing and includes features for resilience, observability, and data integrity: asynchronous event delivery, historical event storage, spillover to disk under backpressure, rate limiting, and circuit breaking.
Core Concepts:
The central component of this package is the `Bus`, which implements the publish-subscribe pattern.
Event: The `Event` interface represents a single auditable occurrence. It defines methods to access common attributes such as `ID()`, `Type()`, `Time()`, `Source()`, `ContextID()`, and `Payload()`. The `BasicEvent` struct provides a simple, concrete implementation of this interface. Events also carry `SpanContext()` for distributed tracing correlation.
EventType: An `EventType` is a string identifier for the kind of an audit event (e.g., "http_request_received", "auth_login"). The special `EventAny` constant allows subscribing to all events.
Handler: A `Handler` is a function type `func(evt Event) error` that processes an incoming `Event`. Handlers are subscribed to specific `EventType`s or to `EventAny`.
BusConfig: The `BusConfig` struct holds all configurable parameters for the `Bus`, allowing fine-grained control over its behavior, including buffer sizes, worker counts, asynchronous behavior, sampling rates, spillover, memory limits, circuit breaker settings, rate limiting, error handling, metrics, transport, and access control. `DefaultBusConfig()` provides sensible defaults.
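To make the relationship between `Event`, `EventType`, `Handler`, and `EventAny` concrete, here is a minimal, synchronous-only sketch. It does not import the package: `miniBus` and the struct-based `Event` are hypothetical stand-ins, and the delivery order (type-specific handlers first, then `EventAny` handlers) is an assumption rather than documented behavior.

```go
package main

import "fmt"

// EventType, Event, and Handler are simplified stand-ins for the package's
// types; the real Event is an interface with more accessors.
type EventType string

// EventAny mirrors the documented wildcard value "*".
const EventAny EventType = "*"

type Event struct {
	Type    EventType
	Payload any
}

type Handler func(evt Event) error

// miniBus is a hypothetical bus used only for illustration. It is not
// safe for concurrent use.
type miniBus struct {
	handlers map[EventType][]Handler
}

func newMiniBus() *miniBus {
	return &miniBus{handlers: make(map[EventType][]Handler)}
}

// Subscribe registers h for events of type et (or for all events if et is EventAny).
func (b *miniBus) Subscribe(et EventType, h Handler) {
	b.handlers[et] = append(b.handlers[et], h)
}

// PublishSync runs type-specific handlers, then EventAny handlers, in the
// calling goroutine. It returns the number of handlers that failed.
func (b *miniBus) PublishSync(evt Event) int {
	failed := 0
	run := func(hs []Handler) {
		for _, h := range hs {
			if err := h(evt); err != nil {
				failed++
			}
		}
	}
	run(b.handlers[evt.Type])
	run(b.handlers[EventAny])
	return failed
}

// deliveries publishes two events and returns the order in which handlers ran.
func deliveries() []string {
	var log []string
	bus := newMiniBus()
	bus.Subscribe("auth_login", func(e Event) error {
		log = append(log, "login:"+string(e.Type))
		return nil
	})
	bus.Subscribe(EventAny, func(e Event) error {
		log = append(log, "any:"+string(e.Type))
		return nil
	})
	bus.PublishSync(Event{Type: "auth_login"})
	bus.PublishSync(Event{Type: "db_query"})
	return log
}

func main() {
	fmt.Println(deliveries())
}
```

The wildcard handler sees both events, while the `auth_login` handler sees only the first.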
Key Features:
- Event Publishing: Events can be published to the bus using several methods:
  - `Publish(evt Event)`: Publishes an event. If the bus is configured for asynchronous delivery (`Async: true`), the event is placed into an internal queue for processing by workers. If the queue is full, the event may be dropped or spilled to disk. If `Async` is `false`, it acts like `PublishSync`.
  - `PublishSync(evt Event)`: Publishes an event synchronously, meaning all subscribed handlers for that event type and global handlers are executed immediately within the calling goroutine.
  - `PublishWithTimeout(evt Event, timeout time.Duration)`: Publishes an event asynchronously but blocks until the event is accepted into the internal queue or the specified timeout is reached. If the timeout is exceeded, an `ErrPublishTimeout` is returned, and the event may be spilled.
- Event Subscription: Handlers can be registered to receive events using `Subscribe(et EventType, h Handler)`. Multiple handlers can subscribe to the same event type. Handlers can also subscribe to `EventAny` to receive all events.
- Event History: The `Bus` maintains an in-memory history of recently published events, capped by `HistoryCap`. The history also respects a `MaxMemoryMB` limit to prevent excessive memory consumption, estimating event sizes to manage the memory footprint.
  - `History(ctx context.Context)`: Retrieves a slice of the stored historical events.
  - `SetHistoryCap(n int)`: Dynamically adjusts the capacity of the history buffer.
- Spillover to Disk: When the internal event queue is full or the circuit breaker is open, events can be spilled to disk if `SpilloverDir` is configured in `BusConfig`. The `spilloverHandler` writes events as JSON lines to a log file. The `RecoverSpillover()` method attempts to re-publish these spilled events when the bus is operating normally and the queue has capacity.
- Rate Limiting: The bus can enforce a publishing rate limit using the `RateLimit` and `RateBurst` parameters in `BusConfig`. Events exceeding this rate are dropped or spilled.
- Circuit Breaker: A `circuitBreaker` mechanism is integrated to protect the bus from continuously failing handlers or external transports. If `CircuitMaxFails` consecutive handler errors occur within a `CircuitTimeout`, the circuit opens, and events are dropped or spilled instead of being processed, allowing the failing component to recover.
- Metrics Integration: The `BusMetrics` interface defines a contract for reporting metrics such as published events, dropped events, and handler latency. `PrometheusMetrics` provides a Prometheus-compatible implementation, and `nopMetrics` is a no-operation implementation for when metrics are not needed. Metrics can be configured using `WithMetrics` or `WithMetricsRegisterer`.
- External Transport: The `Transport` interface allows integrating external systems (e.g., message queues like Kafka) for event persistence or further processing. `KafkaTransport` is provided as a concrete implementation for sending events to Kafka, supporting retries and asynchronous sending. A transport can be set via `WithTransport`.
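The difference between the queue-full behavior of `Publish` and the blocking behavior of `PublishWithTimeout` comes down to how an event is offered to a bounded queue. The sketch below shows both styles on a plain buffered channel. The function names and the `errPublishTimeout` value are illustrative assumptions, not the package's internals.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errPublishTimeout plays the role of the documented ErrPublishTimeout.
var errPublishTimeout = errors.New("publish timeout")

// tryEnqueue offers evt to the queue without blocking. A false result
// means the queue is full and the caller must drop or spill the event.
func tryEnqueue(queue chan<- string, evt string) bool {
	select {
	case queue <- evt:
		return true
	default:
		return false
	}
}

// enqueueWithTimeout blocks until the queue accepts evt or timeout elapses.
func enqueueWithTimeout(queue chan<- string, evt string, timeout time.Duration) error {
	timer := time.NewTimer(timeout)
	defer timer.Stop()
	select {
	case queue <- evt:
		return nil
	case <-timer.C:
		return errPublishTimeout
	}
}

// demo fills a one-slot queue that has no consumer and reports what
// happens to each subsequent publish attempt.
func demo() (first, second bool, err error) {
	queue := make(chan string, 1)
	first = tryEnqueue(queue, "evt-1")  // accepted: the queue has room
	second = tryEnqueue(queue, "evt-2") // rejected: the queue is full
	err = enqueueWithTimeout(queue, "evt-3", 10*time.Millisecond)
	return first, second, err
}

func main() {
	fmt.Println(demo())
}
```

In a real bus, worker goroutines would drain the queue, so the timeout path is reached only under sustained backpressure.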
Event Definition and Types:
The package defines a comprehensive set of predefined `EventType` constants and corresponding `New...` helper functions for common application events. These include:
- HTTP Events: `EventTypeHTTPRequestReceived`, `EventTypeHTTPResponseSent`, `EventTypeHTTPRouteNotFound`, `EventTypeHTTPMethodNotAllowed`.
- Authentication Events: `EventTypeAuthRegister`, `EventTypeAuthLogin`, `EventTypeAuthLogout`, `EventTypeAuthTokenIssued`, `EventTypeAuthTokenRevoked`, `EventTypeAuthCredentialsChecked`.
- Database Events: `EventTypeDBConnected`, `EventTypeDBInit`, `EventTypeDBError`, `EventTypeDBQuery`, `EventTypeDBExec`, `EventTypeDBTxStarted`, `EventTypeDBTxCommitted`, `EventTypeDBTxRolledBack`.
- Work Item Events: `EventTypeWorkItemCreated`, `EventTypeWorkItemUpdated`, `EventTypeWorkItemDeleted`, `EventTypeWorkItemAssigned`, `EventTypeWorkItemUnassigned`, `EventTypeCustomFieldSet`.
- Comment & Attachment Events: `EventTypeCommentAdded`, `EventTypeCommentDeleted`, `EventTypeAttachmentAdded`, `EventTypeAttachmentRemoved`.
- User Events: `EventTypeUserCreated`, `EventTypeUserUpdated`, `EventTypeUserDeleted`, `EventTypeUserLoggedIn`, `EventTypeUserLoggedOut`.
- Team Events: `EventTypeTeamCreated`, `EventTypeTeamUpdated`, `EventTypeTeamDeleted`, `EventTypeTeamMemberAdded`, `EventTypeTeamMemberRemoved`.
Generic Typed Events: The `EventT[T any]` interface and `BasicEventT[T any]` struct provide support for creating events with a specific, strongly-typed payload, enhancing compile-time safety and readability. For instance, `NewHTTPRequestReceivedT` creates an event with `HTTPRequestPayload`.
Persistence and Observability:
- Logging Integration: The `Logger` struct provides a high-level API for publishing various log-level events (Info, Warning, Debug, Error, Fatal, AssertionFailed) to the `Bus`. `SetupLogging` configures persistent storage for audit events, with `LogOption` functional options for detailed configuration of the file and database handlers:
  - File-based logging using `lumberjack` for log rotation, compression, and retention.
  - Database persistence to a SQL database (e.g., PostgreSQL, MySQL, SQLite) with batch insertion and retry logic. `SetupDatabase` initializes the necessary `audit` table and indexes.
- Schema Validation: `EventSchema` allows defining expected fields and their types for an event's payload. `RegisterSchema` is used to register these schemas, and `validatePayload` ensures that published events conform to their registered schema, catching data inconsistencies early. Predefined event types have their schemas registered during package initialization.
Security and Data Integrity:
- Access Control for History: The `AccessControlFunc` type defines a function that can be used to enforce permissions before allowing access to the event history. It can be configured using `WithAccessControl` in `BusConfig`. `CheckHistoryAccess` provides a default role-based check (`"admin"`) if no custom function is provided.
- Payload Sanitization: The `SanitizePayload` function automatically redacts sensitive information (e.g., "email", "password") from event payloads before they are stored or processed further. Custom `Sanitizer` functions can be defined and applied.
- Event Encryption: `EncryptEvent` encrypts event payloads using AES-256-GCM so that sensitive data is protected at rest or in transit. `GenerateAESKey` generates a suitable 32-byte encryption key.
Configuration:
The `Bus` is initialized using `NewBus` with variadic `BusOption` functions. These options cover various aspects such as history capacity (`WithHistoryCap`), async buffer size (`WithBufferSize`), worker count (`WithWorkerCount`), asynchronous delivery (`WithAsync`), sampling rate (`WithSampleRate`), spillover directory (`WithSpilloverDir`), maximum memory usage (`WithMaxMemoryMB`), circuit breaker parameters (`WithCircuitBreaker`), metrics implementation (`WithMetrics`, `WithMetricsRegisterer`), external transport (`WithTransport`), access control for history (`WithAccessControl`), and rate limiting (`WithRateLimit`).
Configuration can also be loaded from environment variables using `LoadConfigFromEnv()`.
Usage Patterns:
- Initializing the Audit Bus:

    bus, err := audit.NewBus(
        audit.WithHistoryCap(1000),
        audit.WithAsync(true),
        audit.WithBufferSize(500),
        audit.WithSpilloverDir("/var/log/audit"),
        audit.WithMaxMemoryMB(50),
        audit.WithMetricsRegisterer(prometheus.DefaultRegisterer),
    )
    if err != nil {
        log.Fatalf("Failed to create audit bus: %v", err)
    }
    defer bus.Close()

- Subscribing a Handler:

    bus.Subscribe(audit.EventTypeAuthLogin, func(evt audit.Event) error {
        fmt.Printf("User %s logged in at %s\n",
            evt.Payload().(map[string]interface{})["user_id"], evt.Time())
        return nil
    })

- Publishing an Event:

    ctx := context.Background()
    // Assuming a user ID and source are available
    loginEvent := audit.NewAuthLogin(ctx, "my-service", "user123")
    bus.Publish(loginEvent)

- Setting up Persistent Logging:

    db, err := sql.Open("sqlite3", "./audit.db") // Example using SQLite
    if err != nil {
        log.Fatalf("Failed to open database: %v", err)
    }
    if err := audit.SetupDatabase(db); err != nil {
        log.Fatalf("Failed to setup audit database: %v", err)
    }
    closers, err := audit.SetupLogging(bus, db,
        audit.WithFilePath("/var/log/app_audit.log"),
        audit.WithDBBatchSize(50),
    )
    if err != nil {
        log.Fatalf("Failed to setup logging: %v", err)
    }
    for _, closer := range closers {
        defer closer()
    }

- Using the Logger API:

    logger := audit.NewLogger(bus)
    logger.Info(ctx, "my-service", "Application started", map[string]string{"version": "1.0.0"})
    logger.Error(ctx, "my-service", "Database connection failed", fmt.Errorf("connection refused"), nil)
Concurrency:
The `Bus` and its components are designed to be safe for concurrent use. Internal synchronization primitives (mutexes, atomic operations, channels, and wait groups) are used to protect shared data and manage concurrent access. Asynchronous event processing is handled by a worker pool.
Error Handling:
Errors during event processing by handlers or internal bus operations are typically reported via the `ErrorFunc` configured in `BusConfig`, which defaults to logging the error. For methods like `PublishWithTimeout`, specific errors are returned to the caller.
This package aims to provide a comprehensive solution for managing audit events within Go applications, offering flexibility, performance, and resilience for critical operational insights.
Index ¶
- Variables
- func CheckHistoryAccess(ctx context.Context) error
- func ContextIDFrom(ctx context.Context) string
- func GenerateAESKey() ([]byte, error)
- func RegisterSchema(et EventType, s EventSchema)
- func SetupDatabase(db *sql.DB) error
- func SetupLogging(bus *Bus, db *sql.DB, opts ...LogOption) ([]func() error, error)
- func WithContextID(ctx context.Context, id string) context.Context
- type AccessControlFunc
- type AuthRegisterPayload
- type BasicEvent
- type BasicEventT
- func NewAuthRegisterT(ctx context.Context, source, userID, email string) BasicEventT[AuthRegisterPayload]
- func NewHTTPRequestReceivedT(ctx context.Context, source, method, path string) BasicEventT[HTTPRequestPayload]
- func NewHTTPResponseSentT(ctx context.Context, source string, status int, duration time.Duration) BasicEventT[HTTPResponsePayload]
- type Bus
- func (b *Bus) Close()
- func (b *Bus) History(ctx context.Context) ([]Event, error)
- func (b *Bus) Publish(evt Event)
- func (b *Bus) PublishSync(evt Event)
- func (b *Bus) PublishWithTimeout(evt Event, timeout time.Duration) error
- func (b *Bus) RecoverSpillover() error
- func (b *Bus) SetHistoryCap(n int)
- func (b *Bus) SetSampleRate(rate float64)
- func (b *Bus) Subscribe(et EventType, h Handler)
- type BusConfig
- type BusMetrics
- type BusOption
- func LoadConfigFromEnv() []BusOption
- func WithAccessControl(f AccessControlFunc) BusOption
- func WithAsync(async bool) BusOption
- func WithBufferSize(n int) BusOption
- func WithCircuitBreaker(timeout time.Duration, maxFails int) BusOption
- func WithHistoryCap(n int) BusOption
- func WithMaxMemoryMB(mb int) BusOption
- func WithMetrics(metrics BusMetrics) BusOption
- func WithMetricsRegisterer(registerer prometheus.Registerer) BusOption
- func WithRateLimit(rate, burst int) BusOption
- func WithSampleRate(rate float64) BusOption
- func WithSpilloverDir(dir string) BusOption
- func WithTransport(transport Transport) BusOption
- func WithWorkerCount(n int) BusOption
- type ContextIDKey
- type Event
- func EncryptEvent(evt Event, key []byte) (Event, error)
- func NewAssertionFailed(ctx context.Context, source, message, detail string) Event
- func NewAttachmentAdded(ctx context.Context, source, attachmentID, itemID, filename string) Event
- func NewAttachmentRemoved(ctx context.Context, source, attachmentID, itemID string) Event
- func NewAuthCredentialsChecked(ctx context.Context, source, email string, success bool) Event
- func NewAuthLogin(ctx context.Context, source, userID string) Event
- func NewAuthLogout(ctx context.Context, source, userID string) Event
- func NewAuthRegister(ctx context.Context, source, userID, email string) Event
- func NewAuthTokenIssued(ctx context.Context, source, userID string, expiresIn time.Duration) Event
- func NewAuthTokenRevoked(ctx context.Context, source, tokenID string) Event
- func NewCommentAdded(ctx context.Context, source, commentID, itemID, content string) Event
- func NewCommentDeleted(ctx context.Context, source, commentID, itemID string) Event
- func NewCustomFieldSet(ctx context.Context, source, itemID, field string, value interface{}) Event
- func NewDBConnected(ctx context.Context, source, driver, dsn string) Event
- func NewDBError(ctx context.Context, source, query string, err error) Event
- func NewDBExec(ctx context.Context, source, stmt string, rowsAffected int64) Event
- func NewDBInit(ctx context.Context, source, schema string) Event
- func NewDBQuery(ctx context.Context, source, query string, duration time.Duration) Event
- func NewDBTxCommitted(ctx context.Context, source, txID string) Event
- func NewDBTxRolledBack(ctx context.Context, source, txID, reason string) Event
- func NewDBTxStarted(ctx context.Context, source, txID string) Event
- func NewDebug(ctx context.Context, source, message string, fields map[string]string) Event
- func NewError(ctx context.Context, source, message string, err error, ...) Event
- func NewFatal(ctx context.Context, source, message string, fields map[string]string) Event
- func NewHTTPMethodNotAllowed(ctx context.Context, source, method string) Event
- func NewHTTPRequestReceived(ctx context.Context, source, method, path string) Event
- func NewHTTPResponseSent(ctx context.Context, source string, status int, duration time.Duration) Event
- func NewHTTPRouteNotFound(ctx context.Context, source, path string) Event
- func NewInfo(ctx context.Context, source, message string, fields map[string]string) Event
- func NewTeamCreated(ctx context.Context, source, teamID, name string) Event
- func NewTeamDeleted(ctx context.Context, source, teamID string) Event
- func NewTeamMemberAdded(ctx context.Context, source, teamID, userID string) Event
- func NewTeamMemberRemoved(ctx context.Context, source, teamID, userID string) Event
- func NewTeamUpdated(ctx context.Context, source, teamID string, changes map[string]interface{}) Event
- func NewUserCreated(ctx context.Context, source, userID, email string) Event
- func NewUserDeleted(ctx context.Context, source, userID string) Event
- func NewUserLoggedIn(ctx context.Context, source, userID string) Event
- func NewUserLoggedOut(ctx context.Context, source, userID string) Event
- func NewUserUpdated(ctx context.Context, source, userID string, changes map[string]interface{}) Event
- func NewWarning(ctx context.Context, source, message string, fields map[string]string) Event
- func NewWorkItemAssigned(ctx context.Context, source, itemID, assigneeID string) Event
- func NewWorkItemCreated(ctx context.Context, source, id string, metadata map[string]string) Event
- func NewWorkItemDeleted(ctx context.Context, source, id string) Event
- func NewWorkItemUnassigned(ctx context.Context, source, itemID, assigneeID string) Event
- func NewWorkItemUpdated(ctx context.Context, source, id string, changes map[string]interface{}) Event
- func SanitizePayload(evt Event) Event
- type EventSchema
- type EventT
- type EventType
- type HTTPRequestPayload
- type HTTPResponsePayload
- type Handler
- type KafkaOption
- type KafkaTransport
- type LogConfig
- type LogOption
- func WithCompress(compress bool) LogOption
- func WithDBBatchSize(size int) LogOption
- func WithFilePath(path string) LogOption
- func WithFlushInterval(interval time.Duration) LogOption
- func WithMaxAgeDays(days int) LogOption
- func WithMaxBackups(backups int) LogOption
- func WithMaxSizeMB(size int) LogOption
- func WithRetryCount(count int) LogOption
- func WithRetryDelay(delay time.Duration) LogOption
- type LogPayload
- type Logger
- func (l *Logger) AssertEqual(ctx context.Context, source, name string, got, want interface{})
- func (l *Logger) AssertNoError(ctx context.Context, source string, err error)
- func (l *Logger) AssertTrue(ctx context.Context, source, name string, cond bool)
- func (l *Logger) Debug(ctx context.Context, source, message string, fields map[string]string)
- func (l *Logger) Error(ctx context.Context, source, message string, err error, ...)
- func (l *Logger) Fatal(ctx context.Context, source, message string, fields map[string]string)
- func (l *Logger) Info(ctx context.Context, source, message string, fields map[string]string)
- func (l *Logger) Warning(ctx context.Context, source, message string, fields map[string]string)
- type PrometheusMetrics
- type Sanitizer
- type Transport
Constants ¶
This section is empty.
Variables ¶
var ErrPublishTimeout = fmt.Errorf("audit bus: publish timeout")
Functions ¶
func CheckHistoryAccess ¶
CheckHistoryAccess verifies access to history data.
func ContextIDFrom ¶
ContextIDFrom retrieves the correlation ID from the context, returning an empty string if not set.
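Correlation IDs of this kind are usually carried with `context.WithValue` under an unexported or empty-struct key. The sketch below shows that pattern with lower-case, hypothetical names. It illustrates the behavior described here (empty string when unset) and is not a copy of the package's implementation.

```go
package main

import (
	"context"
	"fmt"
)

// contextIDKey is an empty struct type used as the context key, so it
// cannot collide with keys defined by other packages.
type contextIDKey struct{}

// withContextID returns a child context carrying the correlation ID.
func withContextID(ctx context.Context, id string) context.Context {
	return context.WithValue(ctx, contextIDKey{}, id)
}

// contextIDFrom returns the stored correlation ID, or "" if none is set.
func contextIDFrom(ctx context.Context) string {
	id, _ := ctx.Value(contextIDKey{}).(string)
	return id
}

// lookups reads the ID from a context that has one and from one that does not.
func lookups() (set, unset string) {
	ctx := withContextID(context.Background(), "req-42")
	return contextIDFrom(ctx), contextIDFrom(context.Background())
}

func main() {
	set, unset := lookups()
	fmt.Printf("%q %q\n", set, unset)
}
```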
func GenerateAESKey ¶
GenerateAESKey generates a 32-byte AES key for encryption.
func RegisterSchema ¶
func RegisterSchema(et EventType, s EventSchema)
RegisterSchema registers a schema for an event type.
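The fields of `EventSchema` are not shown on this page, so the following is only a sketch of the general idea: a registry keyed by event type, and a check that each required payload field is present and has the expected kind. The `schema` type, the `fieldKind` constants, and the rule that unregistered event types pass validation are all assumptions made for illustration.

```go
package main

import "fmt"

// fieldKind is a hypothetical label for the expected type of a payload field.
type fieldKind string

const (
	kindString fieldKind = "string"
	kindNumber fieldKind = "number"
	kindBool   fieldKind = "bool"
)

// schema maps required field names to their expected kinds.
type schema map[string]fieldKind

// registry holds one schema per event type. It is not synchronized, so
// this sketch is not safe for concurrent registration.
var registry = map[string]schema{}

func registerSchema(eventType string, s schema) {
	registry[eventType] = s
}

// matches reports whether v has the Go type implied by kind.
func matches(v any, kind fieldKind) bool {
	switch kind {
	case kindString:
		_, ok := v.(string)
		return ok
	case kindNumber:
		switch v.(type) {
		case int, int64, float64:
			return true
		}
		return false
	case kindBool:
		_, ok := v.(bool)
		return ok
	}
	return false
}

// validatePayload checks payload against the registered schema, if any.
// Event types without a schema are accepted (an assumption of this sketch).
func validatePayload(eventType string, payload map[string]any) error {
	s, ok := registry[eventType]
	if !ok {
		return nil
	}
	for name, kind := range s {
		v, present := payload[name]
		if !present {
			return fmt.Errorf("%s: missing field %q", eventType, name)
		}
		if !matches(v, kind) {
			return fmt.Errorf("%s: field %q is not a %s", eventType, name, kind)
		}
	}
	return nil
}

func main() {
	registerSchema("auth_login", schema{"user_id": kindString})
	fmt.Println(validatePayload("auth_login", map[string]any{"user_id": "u1"}))
	fmt.Println(validatePayload("auth_login", map[string]any{"user_id": 7}))
}
```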
func SetupDatabase ¶
SetupDatabase initializes the audit table and indexes in the database.
func SetupLogging ¶
SetupLogging configures file and database persistence for audit events.
Types ¶
type AccessControlFunc ¶
AccessControlFunc defines a function to check history access permissions.
type AuthRegisterPayload ¶
AuthRegisterPayload is the payload for user registration events.
type BasicEvent ¶
type BasicEvent struct {
IDVal string
TypeVal EventType
TimeVal time.Time
SourceVal string
ContextIDVal string
PayloadVal interface{}
SpanCtx trace.SpanContext
}
BasicEvent is a simple implementation of Event.
func NewBasicEvent ¶
func NewBasicEvent(t EventType, source, contextID string, payload interface{}, spanCtx trace.SpanContext) BasicEvent
func (BasicEvent) ContextID ¶
func (e BasicEvent) ContextID() string
func (BasicEvent) ID ¶
func (e BasicEvent) ID() string
func (BasicEvent) Payload ¶
func (e BasicEvent) Payload() interface{}
func (BasicEvent) Source ¶
func (e BasicEvent) Source() string
func (BasicEvent) SpanContext ¶
func (e BasicEvent) SpanContext() trace.SpanContext
func (BasicEvent) Time ¶
func (e BasicEvent) Time() time.Time
func (BasicEvent) Type ¶
func (e BasicEvent) Type() EventType
type BasicEventT ¶
type BasicEventT[T any] struct { BasicEvent // contains filtered or unexported fields }
BasicEventT is a generic implementation of EventT.
func NewAuthRegisterT ¶
func NewAuthRegisterT(ctx context.Context, source, userID, email string) BasicEventT[AuthRegisterPayload]
NewAuthRegisterT creates a typed Event for user registration.
func NewHTTPRequestReceivedT ¶
func NewHTTPRequestReceivedT(ctx context.Context, source, method, path string) BasicEventT[HTTPRequestPayload]
NewHTTPRequestReceivedT creates a typed Event for an incoming HTTP request.
func NewHTTPResponseSentT ¶
func NewHTTPResponseSentT(ctx context.Context, source string, status int, duration time.Duration) BasicEventT[HTTPResponsePayload]
NewHTTPResponseSentT creates a typed Event for an HTTP response sent.
func (BasicEventT[T]) TypedPayload ¶
func (e BasicEventT[T]) TypedPayload() T
TypedPayload returns the typed payload.
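The benefit of a typed payload is that callers avoid a runtime type assertion on `Payload()`. The sketch below reproduces the embedding shape shown above with hypothetical lower-case types. The `Method` and `Path` fields of the payload struct are assumptions, since the real `HTTPRequestPayload` fields are not listed here.

```go
package main

import "fmt"

// basicEvent is a trimmed-down stand-in for BasicEvent.
type basicEvent struct {
	TypeVal    string
	PayloadVal any
}

// Payload returns the untyped payload, as the Event interface requires.
func (e basicEvent) Payload() any { return e.PayloadVal }

// basicEventT embeds basicEvent and keeps a typed copy of the payload.
type basicEventT[T any] struct {
	basicEvent
	typed T
}

// TypedPayload returns the payload without any type assertion.
func (e basicEventT[T]) TypedPayload() T { return e.typed }

// httpRequestPayload is a hypothetical payload shape.
type httpRequestPayload struct {
	Method string
	Path   string
}

// newHTTPRequestReceivedT builds a typed event; both the typed and the
// untyped accessors return the same payload value.
func newHTTPRequestReceivedT(method, path string) basicEventT[httpRequestPayload] {
	p := httpRequestPayload{Method: method, Path: path}
	return basicEventT[httpRequestPayload]{
		basicEvent: basicEvent{TypeVal: "http_request_received", PayloadVal: p},
		typed:      p,
	}
}

func main() {
	evt := newHTTPRequestReceivedT("GET", "/health")
	// Field access is checked at compile time.
	fmt.Println(evt.TypedPayload().Method, evt.TypedPayload().Path)
}
```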
type Bus ¶
type Bus struct {
// contains filtered or unexported fields
}
Bus is an in-memory publish/subscribe bus for audit events.
func DefaultBus ¶
func DefaultBus() *Bus
func (*Bus) PublishSync ¶
func (*Bus) PublishWithTimeout ¶
func (*Bus) RecoverSpillover ¶
func (*Bus) SetHistoryCap ¶
func (*Bus) SetSampleRate ¶
type BusConfig ¶
type BusConfig struct {
HistoryCap int
BufferSize int
WorkerCount int
Async bool
SampleRate float64
SpilloverDir string
MaxMemoryMB int
CircuitTimeout time.Duration
CircuitMaxFails int
RateLimit int
RateBurst int
ErrorFunc func(error, Event)
Metrics BusMetrics
Transport Transport
AccessControl AccessControlFunc
}
BusConfig holds parameters to configure a Bus.
func DefaultBusConfig ¶
func DefaultBusConfig() BusConfig
type BusMetrics ¶
type BusMetrics interface {
EventPublished(et EventType)
EventDropped(et EventType)
HandlerLatency(et EventType, d time.Duration)
}
BusMetrics defines the interface for audit bus metrics.
type BusOption ¶
type BusOption func(*BusConfig)
BusOption defines a functional option for configuring Bus.
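Because `BusOption` is a `func(*BusConfig)`, configuration is built by starting from defaults and applying each option in order. The sketch below shows that mechanism with a reduced, hypothetical config struct. The default values are invented for the example and are not the ones returned by `DefaultBusConfig()`.

```go
package main

import "fmt"

// busConfig is a reduced stand-in for BusConfig.
type busConfig struct {
	HistoryCap int
	BufferSize int
	Async      bool
}

// busOption mutates a config in place, mirroring the BusOption shape.
type busOption func(*busConfig)

func withHistoryCap(n int) busOption {
	return func(c *busConfig) { c.HistoryCap = n }
}

func withAsync(async bool) busOption {
	return func(c *busConfig) { c.Async = async }
}

// defaultConfig returns illustrative defaults (assumed values).
func defaultConfig() busConfig {
	return busConfig{HistoryCap: 100, BufferSize: 64}
}

// buildConfig applies options over the defaults; later options win.
func buildConfig(opts ...busOption) busConfig {
	cfg := defaultConfig()
	for _, opt := range opts {
		opt(&cfg)
	}
	return cfg
}

func main() {
	cfg := buildConfig(withHistoryCap(1000), withAsync(true))
	fmt.Printf("%+v\n", cfg)
}
```

Since `LoadConfigFromEnv` returns a `[]BusOption`, its result can be combined with explicit options using the same variadic call.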
func LoadConfigFromEnv ¶
func LoadConfigFromEnv() []BusOption
LoadConfigFromEnv loads configuration from environment variables.
func WithAccessControl ¶
func WithAccessControl(f AccessControlFunc) BusOption
WithAccessControl sets the access control function for history access.
func WithBufferSize ¶
WithBufferSize sets the async queue size.
func WithCircuitBreaker ¶
WithCircuitBreaker sets circuit breaker parameters.
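As an illustration of how the two parameters can interact, the sketch below trips after `maxFails` consecutive failures and then rejects work until `timeout` has elapsed. This cool-down interpretation of the timeout is an assumption, and the package's own `circuitBreaker` may count failures differently. Time is passed in explicitly so the behavior is deterministic.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// breaker is a hypothetical consecutive-failure circuit breaker.
// It is not safe for concurrent use.
type breaker struct {
	maxFails int
	timeout  time.Duration
	fails    int
	open     bool
	openedAt time.Time
}

// Allow reports whether work may proceed at time now. Once the cool-down
// has elapsed, the breaker closes again and the failure count resets.
func (b *breaker) Allow(now time.Time) bool {
	if b.open && now.Sub(b.openedAt) >= b.timeout {
		b.open = false
		b.fails = 0
	}
	return !b.open
}

// Record registers the outcome of one handler call. A success resets the
// consecutive-failure count; reaching maxFails opens the circuit.
func (b *breaker) Record(err error, now time.Time) {
	if err == nil {
		b.fails = 0
		return
	}
	b.fails++
	if b.fails >= b.maxFails {
		b.open = true
		b.openedAt = now
	}
}

// demo drives the breaker through closed, open, and recovered states.
func demo() (beforeTrip, afterTrip, afterCooldown bool) {
	b := &breaker{maxFails: 2, timeout: time.Second}
	start := time.Unix(0, 0)
	failure := errors.New("handler failed")

	b.Record(failure, start)
	beforeTrip = b.Allow(start) // one failure: still closed
	b.Record(failure, start)
	afterTrip = b.Allow(start.Add(500 * time.Millisecond)) // tripped
	afterCooldown = b.Allow(start.Add(time.Second))        // cool-down elapsed
	return beforeTrip, afterTrip, afterCooldown
}

func main() {
	fmt.Println(demo())
}
```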
func WithHistoryCap ¶
WithHistoryCap sets the history buffer size.
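A capped history keeps only the most recent entries and discards the oldest when full. The sketch below shows that eviction rule and what shrinking the cap at runtime does to existing entries. It stores plain strings and ignores the `MaxMemoryMB` limit, and the `history` type is hypothetical.

```go
package main

import "fmt"

// history keeps at most limit entries, oldest first.
// It is not safe for concurrent use.
type history struct {
	limit int
	buf   []string
}

// Add appends evt, evicting the oldest entries if the cap is reached.
func (h *history) Add(evt string) {
	if h.limit <= 0 {
		return // history disabled
	}
	if len(h.buf) >= h.limit {
		h.buf = h.buf[len(h.buf)-h.limit+1:]
	}
	h.buf = append(h.buf, evt)
}

// SetCap changes the cap and trims existing entries to fit.
func (h *history) SetCap(n int) {
	h.limit = n
	if n <= 0 {
		h.buf = nil
		return
	}
	if len(h.buf) > n {
		h.buf = h.buf[len(h.buf)-n:]
	}
}

// Snapshot returns a copy so callers cannot mutate the internal buffer.
func (h *history) Snapshot() []string {
	return append([]string(nil), h.buf...)
}

func main() {
	h := &history{limit: 2}
	for _, e := range []string{"a", "b", "c"} {
		h.Add(e)
	}
	fmt.Println(h.Snapshot())
	h.SetCap(1)
	fmt.Println(h.Snapshot())
}
```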
func WithMaxMemoryMB ¶
WithMaxMemoryMB sets the memory limit in MB.
func WithMetrics ¶
func WithMetrics(metrics BusMetrics) BusOption
WithMetrics sets the metrics implementation.
func WithMetricsRegisterer ¶
func WithMetricsRegisterer(registerer prometheus.Registerer) BusOption
WithMetricsRegisterer sets the Prometheus registerer for metrics.
func WithRateLimit ¶
WithRateLimit sets the rate limit for event publishing (events per second) and burst size.
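A rate plus a burst size is the parameter pair of a token bucket: the bucket holds up to `burst` tokens, refills at `rate` tokens per second, and each accepted event consumes one token. Whether the package uses a token bucket internally is not stated here, so treat the following as a sketch of that general technique with hypothetical names. Time is injected to keep the example deterministic.

```go
package main

import (
	"fmt"
	"math"
	"time"
)

// tokenBucket is a hypothetical limiter; it is not safe for concurrent use.
type tokenBucket struct {
	rate   float64 // tokens added per second
	burst  float64 // maximum tokens held
	tokens float64
	last   time.Time
}

// newTokenBucket starts with a full bucket.
func newTokenBucket(rate, burst int, now time.Time) *tokenBucket {
	return &tokenBucket{
		rate:   float64(rate),
		burst:  float64(burst),
		tokens: float64(burst),
		last:   now,
	}
}

// Allow refills the bucket for the time elapsed since the last call and
// then tries to take one token.
func (tb *tokenBucket) Allow(now time.Time) bool {
	if elapsed := now.Sub(tb.last).Seconds(); elapsed > 0 {
		tb.tokens = math.Min(tb.burst, tb.tokens+elapsed*tb.rate)
		tb.last = now
	}
	if tb.tokens < 1 {
		return false
	}
	tb.tokens--
	return true
}

// demo sends three events at the same instant, then one a second later.
func demo() (allowedInBurst int, allowedAfterRefill bool) {
	start := time.Unix(0, 0)
	tb := newTokenBucket(1, 2, start)
	for i := 0; i < 3; i++ {
		if tb.Allow(start) {
			allowedInBurst++
		}
	}
	return allowedInBurst, tb.Allow(start.Add(time.Second))
}

func main() {
	fmt.Println(demo())
}
```

With a rate of 1 and a burst of 2, two of the three simultaneous events pass, and one more passes after a second of refill.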
func WithSampleRate ¶
WithSampleRate sets the sampling rate.
func WithSpilloverDir ¶
WithSpilloverDir sets the spillover directory.
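Spillover as JSON lines means each event is appended to a file as one JSON object per line, which lets recovery read the file back line by line. The sketch below shows that round trip. The `spilledEvent` struct, its field names, and the file name are assumptions, since the on-disk format used by the package is not documented on this page.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// spilledEvent is a hypothetical on-disk representation of an event.
type spilledEvent struct {
	ID   string `json:"id"`
	Type string `json:"type"`
}

// spill appends evt to the file at path as a single JSON line.
func spill(path string, evt spilledEvent) error {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return err
	}
	// Encode writes the JSON object followed by a newline.
	if err := json.NewEncoder(f).Encode(evt); err != nil {
		f.Close()
		return err
	}
	return f.Close()
}

// recoverSpilled reads every JSON line back in the order it was written.
func recoverSpilled(path string) ([]spilledEvent, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	var events []spilledEvent
	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		var evt spilledEvent
		if err := json.Unmarshal(scanner.Bytes(), &evt); err != nil {
			return nil, err
		}
		events = append(events, evt)
	}
	return events, scanner.Err()
}

// demo spills two events into a temporary directory and reads them back.
func demo() ([]spilledEvent, error) {
	dir, err := os.MkdirTemp("", "spillover")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	path := filepath.Join(dir, "spillover.log")
	for _, evt := range []spilledEvent{
		{ID: "1", Type: "auth_login"},
		{ID: "2", Type: "db_query"},
	} {
		if err := spill(path, evt); err != nil {
			return nil, err
		}
	}
	return recoverSpilled(path)
}

func main() {
	events, err := demo()
	fmt.Println(events, err)
}
```

A production version would also need to truncate or rotate the file after successful recovery so that events are not re-published twice.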
func WithTransport ¶
WithTransport sets the external transport.
func WithWorkerCount ¶
WithWorkerCount sets the number of workers.
type ContextIDKey ¶
type ContextIDKey struct{}
ContextIDKey is used to store correlation IDs in context.
type Event ¶
type Event interface {
ID() string
Type() EventType
Time() time.Time
Source() string
ContextID() string
Payload() interface{}
SpanContext() trace.SpanContext
}
Event represents an occurrence to be audited.
func EncryptEvent ¶
EncryptEvent encrypts an event's payload using AES.
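For readers unfamiliar with AES-GCM in Go, the sketch below shows the standard-library calls involved: a random 32-byte key selects AES-256, a fresh random nonce is generated per message and stored in front of the ciphertext, and decryption fails if the data was modified. The helper names are hypothetical, and how `EncryptEvent` serializes the payload or lays out its output is not specified here.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// generateKey returns 32 random bytes, which selects AES-256.
func generateKey() ([]byte, error) {
	key := make([]byte, 32)
	if _, err := rand.Read(key); err != nil {
		return nil, err
	}
	return key, nil
}

func newGCM(key []byte) (cipher.AEAD, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	return cipher.NewGCM(block)
}

// seal encrypts plaintext and returns nonce || ciphertext.
func seal(key, plaintext []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	nonce := make([]byte, gcm.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	// Passing nonce as dst makes Seal append the ciphertext after it.
	return gcm.Seal(nonce, nonce, plaintext, nil), nil
}

// open splits off the nonce, then decrypts and authenticates the rest.
func open(key, sealed []byte) ([]byte, error) {
	gcm, err := newGCM(key)
	if err != nil {
		return nil, err
	}
	if len(sealed) < gcm.NonceSize() {
		return nil, errors.New("ciphertext too short")
	}
	nonce, ciphertext := sealed[:gcm.NonceSize()], sealed[gcm.NonceSize():]
	return gcm.Open(nil, nonce, ciphertext, nil)
}

// roundTrip encrypts and decrypts msg with a fresh key.
func roundTrip(msg string) (string, error) {
	key, err := generateKey()
	if err != nil {
		return "", err
	}
	sealed, err := seal(key, []byte(msg))
	if err != nil {
		return "", err
	}
	plain, err := open(key, sealed)
	return string(plain), err
}

// tamperDetected flips one bit of the ciphertext and reports whether
// decryption rejects it.
func tamperDetected() bool {
	key, err := generateKey()
	if err != nil {
		return false
	}
	sealed, err := seal(key, []byte("secret"))
	if err != nil {
		return false
	}
	sealed[len(sealed)-1] ^= 0x01
	_, err = open(key, sealed)
	return err != nil
}

func main() {
	out, err := roundTrip(`{"email":"user@example.com"}`)
	fmt.Println(out, err, tamperDetected())
}
```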
func NewAssertionFailed ¶
NewAssertionFailed creates an assertion-failed logging event.
func NewAttachmentAdded ¶
NewAttachmentAdded creates an Event when an attachment is added.
func NewAttachmentRemoved ¶
NewAttachmentRemoved creates an Event when an attachment is removed.
func NewAuthCredentialsChecked ¶
NewAuthCredentialsChecked creates an Event after credential verification.
func NewAuthLogin ¶
NewAuthLogin creates an Event for a successful login.
func NewAuthLogout ¶
NewAuthLogout creates an Event for user logout.
func NewAuthRegister ¶
NewAuthRegister creates an Event for user registration.
func NewAuthTokenIssued ¶
NewAuthTokenIssued creates an Event when a token is issued.
func NewAuthTokenRevoked ¶
NewAuthTokenRevoked creates an Event when a token is revoked.
func NewCommentAdded ¶
NewCommentAdded creates an Event for a new comment.
func NewCommentDeleted ¶
NewCommentDeleted creates an Event when a comment is deleted.
func NewCustomFieldSet ¶
NewCustomFieldSet creates an Event when a custom field is set.
func NewDBConnected ¶
NewDBConnected creates an Event for successful DB connection.
func NewDBError ¶
NewDBError creates an Event on database error.
func NewDBQuery ¶
NewDBQuery creates an Event after executing a query.
func NewDBTxCommitted ¶
NewDBTxCommitted creates an Event when a transaction commits.
func NewDBTxRolledBack ¶
NewDBTxRolledBack creates an Event when a transaction rolls back.
func NewDBTxStarted ¶
NewDBTxStarted creates an Event when a transaction begins.
func NewError ¶
func NewError(ctx context.Context, source, message string, err error, fields map[string]string) Event
NewError creates an error-level logging event.
func NewHTTPMethodNotAllowed ¶
NewHTTPMethodNotAllowed creates an Event when the HTTP method is not allowed.
func NewHTTPRequestReceived ¶
NewHTTPRequestReceived creates an Event for an incoming HTTP request.
func NewHTTPResponseSent ¶
func NewHTTPResponseSent(ctx context.Context, source string, status int, duration time.Duration) Event
NewHTTPResponseSent creates an Event when an HTTP response is sent.
func NewHTTPRouteNotFound ¶
NewHTTPRouteNotFound creates an Event when no route matches.
func NewTeamCreated ¶
NewTeamCreated creates an Event for a new team.
func NewTeamDeleted ¶
NewTeamDeleted creates an Event when a team is deleted.
func NewTeamMemberAdded ¶
NewTeamMemberAdded creates an Event when a user joins a team.
func NewTeamMemberRemoved ¶
NewTeamMemberRemoved creates an Event when a user leaves a team.
func NewTeamUpdated ¶
func NewTeamUpdated(ctx context.Context, source, teamID string, changes map[string]interface{}) Event
NewTeamUpdated creates an Event for team updates.
func NewUserCreated ¶
NewUserCreated creates an Event for a new user.
func NewUserDeleted ¶
NewUserDeleted creates an Event when a user is deleted.
func NewUserLoggedIn ¶
NewUserLoggedIn creates an Event when a user logs in.
func NewUserLoggedOut ¶
NewUserLoggedOut creates an Event when a user logs out.
func NewUserUpdated ¶
func NewUserUpdated(ctx context.Context, source, userID string, changes map[string]interface{}) Event
NewUserUpdated creates an Event for user updates.
func NewWarning ¶
NewWarning creates a warning-level logging event.
func NewWorkItemAssigned ¶
NewWorkItemAssigned creates an Event when a work item is assigned.
func NewWorkItemCreated ¶
NewWorkItemCreated creates an Event for a new work item.
func NewWorkItemDeleted ¶
NewWorkItemDeleted creates an Event for deletion of a work item.
func NewWorkItemUnassigned ¶
NewWorkItemUnassigned creates an Event when a work item is unassigned.
func NewWorkItemUpdated ¶
func NewWorkItemUpdated(ctx context.Context, source, id string, changes map[string]interface{}) Event
NewWorkItemUpdated creates an Event for a work item update.
func SanitizePayload ¶
SanitizePayload sanitizes sensitive fields in an event's payload.
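A common way to implement this kind of redaction is to copy the payload while replacing the values of known sensitive keys. The sketch below does that for map payloads, including nested maps. The key list, the `[REDACTED]` placeholder, and the case-insensitive match are assumptions, and the package's handling of struct payloads is not covered.

```go
package main

import (
	"fmt"
	"strings"
)

// sensitiveKeys lists field names to redact (illustrative set).
var sensitiveKeys = map[string]bool{
	"email":    true,
	"password": true,
}

// sanitize returns a copy of payload with sensitive values replaced.
// Nested maps are sanitized recursively; the input is left unchanged.
func sanitize(payload map[string]any) map[string]any {
	out := make(map[string]any, len(payload))
	for key, value := range payload {
		if sensitiveKeys[strings.ToLower(key)] {
			out[key] = "[REDACTED]"
			continue
		}
		if nested, ok := value.(map[string]any); ok {
			out[key] = sanitize(nested)
			continue
		}
		out[key] = value
	}
	return out
}

func main() {
	payload := map[string]any{
		"user_id":  "u1",
		"password": "hunter2",
		"profile":  map[string]any{"Email": "user@example.com"},
	}
	fmt.Println(sanitize(payload))
}
```

Returning a copy matters when the same event is also delivered to handlers that legitimately need the original values.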
type EventSchema ¶
EventSchema defines the schema for an event's payload.
type EventType ¶
type EventType string
EventType identifies the kind of an audit event.
const ( EventTypeHTTPRequestReceived EventType = "http_request_received" EventTypeHTTPResponseSent EventType = "http_response_sent" EventTypeHTTPRouteNotFound EventType = "http_route_not_found" EventTypeHTTPMethodNotAllowed EventType = "http_method_not_allowed" )
HTTP Event Types
const ( EventTypeAuthRegister EventType = "auth_register" EventTypeAuthLogin EventType = "auth_login" EventTypeAuthLogout EventType = "auth_logout" EventTypeAuthTokenIssued EventType = "auth_token_issued" EventTypeAuthTokenRevoked EventType = "auth_token_revoked" EventTypeAuthCredentialsChecked EventType = "auth_credentials_checked" )
Authentication Event Types
const ( EventTypeDBConnected EventType = "db_connected" EventTypeDBInit EventType = "db_init" EventTypeDBError EventType = "db_error" EventTypeDBQuery EventType = "db_query" EventTypeDBExec EventType = "db_exec" EventTypeDBTxStarted EventType = "db_tx_started" EventTypeDBTxCommitted EventType = "db_tx_committed" EventTypeDBTxRolledBack EventType = "db_tx_rolled_back" )
Database Event Types
const ( EventTypeWorkItemCreated EventType = "work_item_created" EventTypeWorkItemUpdated EventType = "work_item_updated" EventTypeWorkItemDeleted EventType = "work_item_deleted" EventTypeWorkItemAssigned EventType = "work_item_assigned" EventTypeWorkItemUnassigned EventType = "work_item_unassigned" EventTypeCustomFieldSet EventType = "custom_field_set" )
Work Item Event Types
const ( EventTypeCommentAdded EventType = "comment_added" EventTypeCommentDeleted EventType = "comment_deleted" EventTypeAttachmentAdded EventType = "attachment_added" EventTypeAttachmentRemoved EventType = "attachment_removed" )
Comment & Attachment Event Types
const ( EventTypeUserCreated EventType = "user_created" EventTypeUserUpdated EventType = "user_updated" EventTypeUserDeleted EventType = "user_deleted" EventTypeUserLoggedIn EventType = "user_logged_in" EventTypeUserLoggedOut EventType = "user_logged_out" )
User Event Types
const ( EventTypeTeamCreated EventType = "team_created" EventTypeTeamUpdated EventType = "team_updated" EventTypeTeamDeleted EventType = "team_deleted" EventTypeTeamMemberAdded EventType = "team_member_added" EventTypeTeamMemberRemoved EventType = "team_member_removed" )
Team Event Types
const ( EventTypeLogInfo EventType = "log_info" EventTypeLogWarning EventType = "log_warning" EventTypeLogDebug EventType = "log_debug" EventTypeLogError EventType = "log_error" EventTypeLogFatal EventType = "log_fatal" EventTypeLogAssertionFailed EventType = "log_assertion_failed" )
Logging Event Types
const EventAny EventType = "*"
EventAny is used to subscribe to all events.
type HTTPRequestPayload ¶
HTTPRequestPayload is the payload for HTTP request events.
type HTTPResponsePayload ¶
HTTPResponsePayload is the payload for HTTP response events.
type KafkaOption ¶
type KafkaOption func(*KafkaTransport)
KafkaOption configures KafkaTransport.
func WithKafkaAsync ¶
func WithKafkaAsync(async bool) KafkaOption
WithKafkaAsync enables asynchronous producing.
func WithKafkaRetries ¶
func WithKafkaRetries(n int) KafkaOption
WithKafkaRetries sets the number of retries.
func WithKafkaRetryDelay ¶
func WithKafkaRetryDelay(d time.Duration) KafkaOption
WithKafkaRetryDelay sets the initial retry delay.
type KafkaTransport ¶
type KafkaTransport struct {
// contains filtered or unexported fields
}
KafkaTransport implements Transport using Kafka.
func NewKafkaTransport ¶
func NewKafkaTransport(brokers []string, topic string, opts ...KafkaOption) (*KafkaTransport, error)
NewKafkaTransport creates a Kafka transport.
func (*KafkaTransport) Close ¶
func (t *KafkaTransport) Close() error
Close shuts down the transport.
func (*KafkaTransport) Send ¶
func (t *KafkaTransport) Send(evt Event) error
Send sends an event to Kafka with retry logic.
func (*KafkaTransport) Start ¶
func (t *KafkaTransport) Start() error
Start initializes the transport.
type LogConfig ¶
type LogConfig struct {
FilePath string
MaxSizeMB int
MaxBackups int
MaxAgeDays int
Compress bool
DBBatchSize int
FlushInterval time.Duration
RetryCount int
RetryDelay time.Duration
}
LogConfig holds configuration for logging handlers.
func DefaultLogConfig ¶
func DefaultLogConfig() LogConfig
DefaultLogConfig returns a default logging configuration.
type LogOption ¶
type LogOption func(*LogConfig)
LogOption is a functional option for configuring logging.
func WithCompress ¶
func WithDBBatchSize ¶
func WithFilePath ¶
func WithFlushInterval ¶
func WithMaxAgeDays ¶
func WithMaxBackups ¶
func WithMaxSizeMB ¶
func WithRetryCount ¶
func WithRetryDelay ¶
type LogPayload ¶
type LogPayload struct {
Message string
Fields map[string]string
Error string // Only for error and fatal events
Detail string // Only for assertion failed events
}
LogPayload is the payload structure for logging events.
type Logger ¶
type Logger struct {
Bus *Bus
Metrics BusMetrics
}
func (*Logger) AssertEqual ¶
func (*Logger) AssertNoError ¶
func (*Logger) AssertTrue ¶
type PrometheusMetrics ¶
type PrometheusMetrics struct {
// contains filtered or unexported fields
}
PrometheusMetrics implements BusMetrics with Prometheus.
func NewPrometheusMetrics ¶
func NewPrometheusMetrics(registerer prometheus.Registerer) *PrometheusMetrics
NewPrometheusMetrics creates a new PrometheusMetrics instance.
func (*PrometheusMetrics) EventDropped ¶
func (m *PrometheusMetrics) EventDropped(et EventType)
EventDropped increments the dropped counter.
func (*PrometheusMetrics) EventPublished ¶
func (m *PrometheusMetrics) EventPublished(et EventType)
EventPublished increments the published counter.
func (*PrometheusMetrics) HandlerLatency ¶
func (m *PrometheusMetrics) HandlerLatency(et EventType, d time.Duration)
HandlerLatency records the handler latency.