Documentation
¶
Overview ¶
Package driver implements a lightweight Bolt protocol client used to communicate with Neo4j and Memgraph databases.
Index ¶
- Constants
- func IsRetriable(err error) bool
- func Retry[T any](ctx context.Context, policy *RetryPolicy, fn func() (T, error)) (T, error)
- func RetryVoid(ctx context.Context, policy *RetryPolicy, fn func() error) error
- func UserAgent() string
- func Version() string
- type BackpressureHandler
- type BackpressureStrategy
- type BoltLogger
- type CategorizedLogger
- type ChannelSubscriber
- type Config
- type ConsoleLogger
- func (c *ConsoleLogger) Debug(msg string, keysAndValues ...interface{})
- func (c *ConsoleLogger) Error(msg string, keysAndValues ...interface{})
- func (c *ConsoleLogger) Info(msg string, keysAndValues ...interface{})
- func (c *ConsoleLogger) IsDebugEnabled() bool
- func (c *ConsoleLogger) IsInfoEnabled() bool
- func (c *ConsoleLogger) SetLevel(level LogLevel)
- func (c *ConsoleLogger) SetTimeFormat(format string)
- func (c *ConsoleLogger) Warn(msg string, keysAndValues ...interface{})
- type DatabaseError
- type DedicatedBoltLogger
- func (l *DedicatedBoltLogger) Debug(msg string, keysAndValues ...interface{})
- func (l *DedicatedBoltLogger) Error(msg string, keysAndValues ...interface{})
- func (l *DedicatedBoltLogger) Info(msg string, keysAndValues ...interface{})
- func (l *DedicatedBoltLogger) IsDebugEnabled() bool
- func (l *DedicatedBoltLogger) IsInfoEnabled() bool
- func (l *DedicatedBoltLogger) LogBoltError(code string, message string, metadata map[string]interface{})
- func (l *DedicatedBoltLogger) LogBoltHandshake(version string, clientName string, authScheme string)
- func (l *DedicatedBoltLogger) LogBoltMessage(direction string, messageType string, fields []interface{})
- func (l *DedicatedBoltLogger) Warn(msg string, keysAndValues ...interface{})
- type Driver
- type EnhancedConsoleLogger
- func (l *EnhancedConsoleLogger) Debug(msg string, keysAndValues ...interface{})
- func (l *EnhancedConsoleLogger) Error(msg string, keysAndValues ...interface{})
- func (l *EnhancedConsoleLogger) Info(msg string, keysAndValues ...interface{})
- func (l *EnhancedConsoleLogger) IsCategoryEnabled(category LogCategory) bool
- func (l *EnhancedConsoleLogger) IsDebugEnabled() bool
- func (l *EnhancedConsoleLogger) IsInfoEnabled() bool
- func (l *EnhancedConsoleLogger) IsLevelEnabled(level LogLevel) bool
- func (l *EnhancedConsoleLogger) LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})
- func (l *EnhancedConsoleLogger) SetCategoryLevel(category LogCategory, level LogLevel)
- func (l *EnhancedConsoleLogger) Warn(msg string, keysAndValues ...interface{})
- type EnhancedStructuredLogger
- func (l *EnhancedStructuredLogger) Debug(msg string, keysAndValues ...interface{})
- func (l *EnhancedStructuredLogger) Error(msg string, keysAndValues ...interface{})
- func (l *EnhancedStructuredLogger) Info(msg string, keysAndValues ...interface{})
- func (l *EnhancedStructuredLogger) IsCategoryEnabled(category LogCategory) bool
- func (l *EnhancedStructuredLogger) IsDebugEnabled() bool
- func (l *EnhancedStructuredLogger) IsInfoEnabled() bool
- func (l *EnhancedStructuredLogger) IsLevelEnabled(level LogLevel) bool
- func (l *EnhancedStructuredLogger) LogStructured(entry LogEntry)
- func (l *EnhancedStructuredLogger) LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})
- func (l *EnhancedStructuredLogger) SetCategoryLevel(category LogCategory, level LogLevel)
- func (l *EnhancedStructuredLogger) Warn(msg string, keysAndValues ...interface{})
- type ErrorHandler
- type FilterFunc
- type FuncSubscriber
- type LogCategory
- type LogEntry
- type LogLevel
- type Logger
- type LoggerAdapter
- func (a *LoggerAdapter) Debug(msg string, keysAndValues ...interface{})
- func (a *LoggerAdapter) Error(msg string, keysAndValues ...interface{})
- func (a *LoggerAdapter) Info(msg string, keysAndValues ...interface{})
- func (a *LoggerAdapter) IsDebugEnabled() bool
- func (a *LoggerAdapter) IsInfoEnabled() bool
- func (a *LoggerAdapter) Warn(msg string, keysAndValues ...interface{})
- type LoggingConfig
- type MapFunc
- type NoOpLogger
- func (l *NoOpLogger) Debug(msg string, keysAndValues ...interface{})
- func (l *NoOpLogger) Error(msg string, keysAndValues ...interface{})
- func (l *NoOpLogger) Info(msg string, keysAndValues ...interface{})
- func (l *NoOpLogger) IsDebugEnabled() bool
- func (l *NoOpLogger) IsInfoEnabled() bool
- func (l *NoOpLogger) Warn(msg string, keysAndValues ...interface{})
- type Notification
- type ObservabilityConfig
- type PoolConfig
- type Position
- type QueryPlan
- type QueryProfile
- type ReactiveConfig
- type ReactiveDriver
- type ReactiveMetrics
- type ReactiveResult
- type Record
- type RecordEvent
- type Result
- type ResultSummary
- type RetryContext
- type RetryError
- type RetryPolicy
- type StreamConnection
- type StreamingDriver
- type StreamingResult
- func (r *StreamingResult) Collect(ctx context.Context) ([]*Record, error)
- func (r *StreamingResult) Consume(ctx context.Context) (*ResultSummary, error)
- func (r *StreamingResult) Err() error
- func (r *StreamingResult) IsOpen() bool
- func (r *StreamingResult) Keys() ([]string, error)
- func (r *StreamingResult) Next(ctx context.Context) bool
- func (r *StreamingResult) NextRecord(ctx context.Context, record **Record) bool
- func (r *StreamingResult) Peek(ctx context.Context) bool
- func (r *StreamingResult) PeekRecord(ctx context.Context, record **Record) bool
- func (r *StreamingResult) Record() *Record
- func (r *StreamingResult) Single(ctx context.Context) (*Record, error)
- type StructuredLogger
- type Subscriber
- type TLSConfig
- type TransformFunc
- type UsageError
Constants ¶
const ( ColorReset = "\033[0m" ColorRed = "\033[31m" ColorYellow = "\033[33m" ColorBlue = "\033[34m" ColorGray = "\033[37m" ColorGreen = "\033[32m" )
Color codes for different log levels
Variables ¶
This section is empty.
Functions ¶
func IsRetriable ¶
IsRetriable checks if an error should trigger a retry.
func RetryVoid ¶
func RetryVoid(ctx context.Context, policy *RetryPolicy, fn func() error) error
RetryVoid executes fn with retry logic for functions that don't return a value.
Types ¶
type BackpressureHandler ¶
type BackpressureHandler struct {
// contains filtered or unexported fields
}
BackpressureHandler manages backpressure in reactive streams
func NewBackpressureHandler ¶
func NewBackpressureHandler(strategy BackpressureStrategy, bufferSize int) *BackpressureHandler
NewBackpressureHandler creates a new backpressure handler
func (*BackpressureHandler) DrainBuffer ¶
func (bp *BackpressureHandler) DrainBuffer(ctx context.Context, output chan<- RecordEvent) error
DrainBuffer drains the backpressure buffer
func (*BackpressureHandler) GetDroppedCount ¶
func (bp *BackpressureHandler) GetDroppedCount() int64
GetDroppedCount returns the number of events dropped due to backpressure
func (*BackpressureHandler) Handle ¶
func (bp *BackpressureHandler) Handle(ctx context.Context, event RecordEvent, output chan<- RecordEvent) error
Handle applies backpressure strategy to an event
type BackpressureStrategy ¶
type BackpressureStrategy int
BackpressureStrategy defines how to handle backpressure
const ( // BackpressureBuffer buffers items when subscriber is slow BackpressureBuffer BackpressureStrategy = iota // BackpressureDrop drops items when subscriber is slow BackpressureDrop // BackpressureBlock blocks emission when subscriber is slow BackpressureBlock // BackpressureLatest keeps only the latest item when subscriber is slow BackpressureLatest )
type BoltLogger ¶
type BoltLogger interface {
Logger
// LogBoltMessage logs a Bolt protocol message with full details
LogBoltMessage(direction string, messageType string, fields []interface{})
// LogBoltHandshake logs Bolt handshake details
LogBoltHandshake(version string, clientName string, authScheme string)
// LogBoltError logs Bolt protocol errors
LogBoltError(code string, message string, metadata map[string]interface{})
}
BoltLogger is a specialized logger for Bolt protocol tracing, similar to Neo4j's dedicated Bolt debug logger.
type CategorizedLogger ¶
type CategorizedLogger interface {
Logger
// LogWithCategory logs a message with a specific category for granular control
LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})
// IsLevelEnabled returns true if the specified level is enabled
IsLevelEnabled(level LogLevel) bool
// IsCategoryEnabled returns true if the specified category is enabled
IsCategoryEnabled(category LogCategory) bool
// SetCategoryLevel sets the log level for a specific category
SetCategoryLevel(category LogCategory, level LogLevel)
}
CategorizedLogger extends Logger with category-specific and leveled logging
type ChannelSubscriber ¶
type ChannelSubscriber struct {
RecordChan chan *Record
ErrorChan chan error
DoneChan chan *ResultSummary
}
ChannelSubscriber writes events to channels; it exposes RecordChan, ErrorChan, and DoneChan.
func NewChannelSubscriber ¶
func NewChannelSubscriber(bufferSize int) *ChannelSubscriber
NewChannelSubscriber creates a new channel subscriber with buffered channels
func (*ChannelSubscriber) OnComplete ¶
func (c *ChannelSubscriber) OnComplete(summary *ResultSummary)
func (*ChannelSubscriber) OnError ¶
func (c *ChannelSubscriber) OnError(err error)
func (*ChannelSubscriber) OnNext ¶
func (c *ChannelSubscriber) OnNext(record *Record)
type Config ¶
type Config struct {
// TLS holds TLS-specific configuration
TLS *TLSConfig
// ConnectionPool holds connection pool configuration
ConnectionPool *PoolConfig
// Observability holds telemetry configuration
Observability *ObservabilityConfig
// Logging holds logging configuration
Logging *LoggingConfig
}
Config holds configuration options for the driver
func DefaultConfig ¶
func DefaultConfig() *Config
DefaultConfig returns a Config with sensible defaults
type ConsoleLogger ¶
type ConsoleLogger struct {
// contains filtered or unexported fields
}
ConsoleLogger logs to stdout/stderr with configurable level and formatting
func NewConsoleLogger ¶
func NewConsoleLogger(level LogLevel) *ConsoleLogger
NewConsoleLogger creates a new console logger with the specified level
func NewConsoleLoggerWithOutput ¶
func NewConsoleLoggerWithOutput(level LogLevel, stdout, stderr io.Writer) *ConsoleLogger
NewConsoleLoggerWithOutput creates a console logger with custom output writers
func (*ConsoleLogger) Debug ¶
func (c *ConsoleLogger) Debug(msg string, keysAndValues ...interface{})
func (*ConsoleLogger) Error ¶
func (c *ConsoleLogger) Error(msg string, keysAndValues ...interface{})
func (*ConsoleLogger) Info ¶
func (c *ConsoleLogger) Info(msg string, keysAndValues ...interface{})
func (*ConsoleLogger) IsDebugEnabled ¶
func (c *ConsoleLogger) IsDebugEnabled() bool
func (*ConsoleLogger) IsInfoEnabled ¶
func (c *ConsoleLogger) IsInfoEnabled() bool
func (*ConsoleLogger) SetLevel ¶
func (c *ConsoleLogger) SetLevel(level LogLevel)
SetLevel updates the log level
func (*ConsoleLogger) SetTimeFormat ¶
func (c *ConsoleLogger) SetTimeFormat(format string)
SetTimeFormat sets the time format for log messages
func (*ConsoleLogger) Warn ¶
func (c *ConsoleLogger) Warn(msg string, keysAndValues ...interface{})
type DatabaseError ¶
DatabaseError represents a database server error (Neo4j, Memgraph, etc.)
func (*DatabaseError) Error ¶
func (e *DatabaseError) Error() string
func (*DatabaseError) IsAuthError ¶
func (e *DatabaseError) IsAuthError() bool
IsAuthError returns true for authentication/authorization errors.
func (*DatabaseError) IsClusterError ¶
func (e *DatabaseError) IsClusterError() bool
IsClusterError returns true for cluster/replication errors.
func (*DatabaseError) IsConflict ¶
func (e *DatabaseError) IsConflict() bool
IsConflict returns true for transaction conflict/deadlock errors.
func (*DatabaseError) IsRetriable ¶
func (e *DatabaseError) IsRetriable() bool
IsRetriable returns true if the error is transient and can be retried.
func (*DatabaseError) IsTransient ¶
func (e *DatabaseError) IsTransient() bool
IsTransient returns true for transient/temporary errors.
type DedicatedBoltLogger ¶
type DedicatedBoltLogger struct {
Level LogLevel
Output io.Writer
// contains filtered or unexported fields
}
DedicatedBoltLogger provides specialized Bolt protocol logging, similar to Neo4j's dedicated Bolt debug logger.
func (*DedicatedBoltLogger) Debug ¶
func (l *DedicatedBoltLogger) Debug(msg string, keysAndValues ...interface{})
func (*DedicatedBoltLogger) Error ¶
func (l *DedicatedBoltLogger) Error(msg string, keysAndValues ...interface{})
func (*DedicatedBoltLogger) Info ¶
func (l *DedicatedBoltLogger) Info(msg string, keysAndValues ...interface{})
func (*DedicatedBoltLogger) IsDebugEnabled ¶
func (l *DedicatedBoltLogger) IsDebugEnabled() bool
func (*DedicatedBoltLogger) IsInfoEnabled ¶
func (l *DedicatedBoltLogger) IsInfoEnabled() bool
func (*DedicatedBoltLogger) LogBoltError ¶
func (l *DedicatedBoltLogger) LogBoltError(code string, message string, metadata map[string]interface{})
func (*DedicatedBoltLogger) LogBoltHandshake ¶
func (l *DedicatedBoltLogger) LogBoltHandshake(version string, clientName string, authScheme string)
func (*DedicatedBoltLogger) LogBoltMessage ¶
func (l *DedicatedBoltLogger) LogBoltMessage(direction string, messageType string, fields []interface{})
func (*DedicatedBoltLogger) Warn ¶
func (l *DedicatedBoltLogger) Warn(msg string, keysAndValues ...interface{})
type Driver ¶
type Driver interface {
// Close releases all resources associated with the driver.
Close() error
// Ping verifies the server is reachable and the Bolt protocol is
// supported.
Ping() error
// Run executes a Cypher query with context, optional parameters and metadata.
// It returns the column names and resulting rows.
Run(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) ([]string, []map[string]interface{}, error)
// RunWithContext executes a Cypher query with context support and returns detailed summary.
// This is the recommended method for production use with observability.
RunWithContext(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) ([]string, []map[string]interface{}, *ResultSummary, error)
}
Driver defines the minimal functionality required to communicate with a Cypher-compatible database. Implementations must manage their own connections and provide simple query execution utilities.
type EnhancedConsoleLogger ¶
type EnhancedConsoleLogger struct {
Level LogLevel
Output io.Writer
IncludeTimestamp bool
IncludeSource bool
ColorEnabled bool
CategoryLevels map[LogCategory]LogLevel
// contains filtered or unexported fields
}
EnhancedConsoleLogger provides rich console logging with colors and formatting. It is similar to Neo4j's ConsoleLogger but with enhanced capabilities.
func (*EnhancedConsoleLogger) Debug ¶
func (l *EnhancedConsoleLogger) Debug(msg string, keysAndValues ...interface{})
func (*EnhancedConsoleLogger) Error ¶
func (l *EnhancedConsoleLogger) Error(msg string, keysAndValues ...interface{})
func (*EnhancedConsoleLogger) Info ¶
func (l *EnhancedConsoleLogger) Info(msg string, keysAndValues ...interface{})
func (*EnhancedConsoleLogger) IsCategoryEnabled ¶
func (l *EnhancedConsoleLogger) IsCategoryEnabled(category LogCategory) bool
func (*EnhancedConsoleLogger) IsDebugEnabled ¶
func (l *EnhancedConsoleLogger) IsDebugEnabled() bool
func (*EnhancedConsoleLogger) IsInfoEnabled ¶
func (l *EnhancedConsoleLogger) IsInfoEnabled() bool
func (*EnhancedConsoleLogger) IsLevelEnabled ¶
func (l *EnhancedConsoleLogger) IsLevelEnabled(level LogLevel) bool
func (*EnhancedConsoleLogger) LogWithCategory ¶
func (l *EnhancedConsoleLogger) LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})
LogWithCategory implements the CategorizedLogger interface.
func (*EnhancedConsoleLogger) SetCategoryLevel ¶
func (l *EnhancedConsoleLogger) SetCategoryLevel(category LogCategory, level LogLevel)
func (*EnhancedConsoleLogger) Warn ¶
func (l *EnhancedConsoleLogger) Warn(msg string, keysAndValues ...interface{})
type EnhancedStructuredLogger ¶
type EnhancedStructuredLogger struct {
Level LogLevel
Output io.Writer
IncludeTimestamp bool
IncludeSource bool
RequestIDEnabled bool
CategoryLevels map[LogCategory]LogLevel
// contains filtered or unexported fields
}
EnhancedStructuredLogger provides JSON structured logging output
func (*EnhancedStructuredLogger) Debug ¶
func (l *EnhancedStructuredLogger) Debug(msg string, keysAndValues ...interface{})
func (*EnhancedStructuredLogger) Error ¶
func (l *EnhancedStructuredLogger) Error(msg string, keysAndValues ...interface{})
func (*EnhancedStructuredLogger) Info ¶
func (l *EnhancedStructuredLogger) Info(msg string, keysAndValues ...interface{})
func (*EnhancedStructuredLogger) IsCategoryEnabled ¶
func (l *EnhancedStructuredLogger) IsCategoryEnabled(category LogCategory) bool
func (*EnhancedStructuredLogger) IsDebugEnabled ¶
func (l *EnhancedStructuredLogger) IsDebugEnabled() bool
func (*EnhancedStructuredLogger) IsInfoEnabled ¶
func (l *EnhancedStructuredLogger) IsInfoEnabled() bool
func (*EnhancedStructuredLogger) IsLevelEnabled ¶
func (l *EnhancedStructuredLogger) IsLevelEnabled(level LogLevel) bool
func (*EnhancedStructuredLogger) LogStructured ¶
func (l *EnhancedStructuredLogger) LogStructured(entry LogEntry)
func (*EnhancedStructuredLogger) LogWithCategory ¶
func (l *EnhancedStructuredLogger) LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})
func (*EnhancedStructuredLogger) SetCategoryLevel ¶
func (l *EnhancedStructuredLogger) SetCategoryLevel(category LogCategory, level LogLevel)
func (*EnhancedStructuredLogger) Warn ¶
func (l *EnhancedStructuredLogger) Warn(msg string, keysAndValues ...interface{})
type ErrorHandler ¶
type FilterFunc ¶
type FuncSubscriber ¶
type FuncSubscriber struct {
OnNextFunc func(*Record)
OnErrorFunc func(error)
OnCompleteFunc func(*ResultSummary)
}
FuncSubscriber allows using functions as subscribers
func (*FuncSubscriber) OnComplete ¶
func (f *FuncSubscriber) OnComplete(summary *ResultSummary)
func (*FuncSubscriber) OnError ¶
func (f *FuncSubscriber) OnError(err error)
func (*FuncSubscriber) OnNext ¶
func (f *FuncSubscriber) OnNext(record *Record)
type LogCategory ¶
type LogCategory string
LogCategory represents different categories of logging for granular control
const ( // LogCategoryGeneral for general driver operations LogCategoryGeneral LogCategory = "driver" // LogCategoryConnection for connection pool and networking events LogCategoryConnection LogCategory = "connection" // LogCategoryQuery for query execution and timing LogCategoryQuery LogCategory = "query" // LogCategoryBolt for low-level Bolt protocol messages LogCategoryBolt LogCategory = "bolt" // LogCategoryAuth for authentication events LogCategoryAuth LogCategory = "auth" // LogCategoryTLS for TLS/SSL related events LogCategoryTLS LogCategory = "tls" // LogCategoryStreaming for streaming result processing LogCategoryStreaming LogCategory = "streaming" // LogCategoryReactive for reactive programming events LogCategoryReactive LogCategory = "reactive" )
type LogEntry ¶
type LogEntry struct {
Timestamp time.Time `json:"timestamp"`
Level LogLevel `json:"level"`
Category LogCategory `json:"category"`
Message string `json:"message"`
Fields map[string]interface{} `json:"fields,omitempty"`
Error string `json:"error,omitempty"`
Source string `json:"source,omitempty"`
RequestID string `json:"request_id,omitempty"`
}
LogEntry represents a structured log entry with full metadata
type LogLevel ¶
type LogLevel int
LogLevel represents the severity of a log message
const ( // LogLevelDebug logs everything including detailed protocol messages LogLevelDebug LogLevel = iota // LogLevelInfo logs general information about driver operations LogLevelInfo // LogLevelWarn logs warning messages that don't stop execution LogLevelWarn // LogLevelError logs only error conditions LogLevelError // LogLevelOff disables all logging LogLevelOff )
func ParseLogLevel ¶
ParseLogLevel parses a string into a LogLevel
type Logger ¶
type Logger interface {
// Debug logs a debug message with optional key-value pairs
Debug(msg string, keysAndValues ...interface{})
// Info logs an info message with optional key-value pairs
Info(msg string, keysAndValues ...interface{})
// Warn logs a warning message with optional key-value pairs
Warn(msg string, keysAndValues ...interface{})
// Error logs an error message with optional key-value pairs
Error(msg string, keysAndValues ...interface{})
// IsDebugEnabled returns true if debug logging is enabled
IsDebugEnabled() bool
// IsInfoEnabled returns true if info logging is enabled
IsInfoEnabled() bool
}
Logger defines the interface for pluggable logging in the driver. Compatible with Neo4j's log.Logger interface but with enhanced capabilities.
func NewLoggerAdapter ¶
func NewLoggerAdapter( debugFunc, infoFunc, warnFunc, errorFunc func(msg string, args ...interface{}), debugEnabled, infoEnabled bool, ) Logger
NewLoggerAdapter creates a logger adapter from external logging functions. This allows easy integration with existing logging frameworks.
type LoggerAdapter ¶
type LoggerAdapter struct {
DebugFunc func(msg string, args ...interface{})
InfoFunc func(msg string, args ...interface{})
WarnFunc func(msg string, args ...interface{})
ErrorFunc func(msg string, args ...interface{})
DebugEnabled bool
InfoEnabled bool
}
LoggerAdapter allows wrapping external loggers so that they implement the Logger interface, similar to how Neo4j allows custom logger implementations.
func (*LoggerAdapter) Debug ¶
func (a *LoggerAdapter) Debug(msg string, keysAndValues ...interface{})
func (*LoggerAdapter) Error ¶
func (a *LoggerAdapter) Error(msg string, keysAndValues ...interface{})
func (*LoggerAdapter) Info ¶
func (a *LoggerAdapter) Info(msg string, keysAndValues ...interface{})
func (*LoggerAdapter) IsDebugEnabled ¶
func (a *LoggerAdapter) IsDebugEnabled() bool
func (*LoggerAdapter) IsInfoEnabled ¶
func (a *LoggerAdapter) IsInfoEnabled() bool
func (*LoggerAdapter) Warn ¶
func (a *LoggerAdapter) Warn(msg string, keysAndValues ...interface{})
type LoggingConfig ¶
type LoggingConfig struct {
// Logger is the pluggable logger implementation (compatible with Neo4j's interface)
Logger Logger
// BoltLogger is an optional specialized logger for Bolt protocol tracing
BoltLogger BoltLogger
// Level sets the global minimum log level to output
Level LogLevel
// CategoryLevels allows setting different log levels per category
CategoryLevels map[LogCategory]LogLevel
// EnabledCategories specifies which categories should be logged
EnabledCategories map[LogCategory]bool
// StructuredOutput enables JSON structured logging format
StructuredOutput bool
// IncludeTimestamp includes timestamp in log output
IncludeTimestamp bool
// IncludeSource includes source location in logs
IncludeSource bool
// RequestIDEnabled enables request ID tracking
RequestIDEnabled bool
// Specific feature flags for backward compatibility
// LogBoltMessages enables detailed Bolt protocol message logging
LogBoltMessages bool
// LogConnectionPool enables connection pool event logging
LogConnectionPool bool
// LogQueryTiming enables query execution timing logs
LogQueryTiming bool
// LogAuthEvents enables authentication event logging
LogAuthEvents bool
// LogTLSEvents enables TLS/SSL event logging
LogTLSEvents bool
// LogStreamingEvents enables streaming result logging
LogStreamingEvents bool
// LogReactiveEvents enables reactive programming event logging
LogReactiveEvents bool
}
LoggingConfig holds comprehensive logging configuration
func DefaultLoggingConfig ¶
func DefaultLoggingConfig() *LoggingConfig
DefaultLoggingConfig returns a logging configuration with a no-op logger (silent by default), similar to the Neo4j driver's default behavior.
func NewBoltTracingConfig ¶
func NewBoltTracingConfig(level LogLevel, output io.Writer) *LoggingConfig
NewBoltTracingConfig creates a configuration with detailed Bolt protocol tracing, similar to Neo4j's Bolt debug logger.
func NewConsoleLoggingConfig ¶
func NewConsoleLoggingConfig(level LogLevel) *LoggingConfig
NewConsoleLoggingConfig creates a console logging configuration similar to Neo4j's ConsoleLogger
func NewStructuredLoggingConfig ¶
func NewStructuredLoggingConfig(level LogLevel, output io.Writer) *LoggingConfig
NewStructuredLoggingConfig creates a structured JSON logging configuration
type NoOpLogger ¶
type NoOpLogger struct{}
NoOpLogger is a logger that does nothing (default behavior)
func (*NoOpLogger) Debug ¶
func (l *NoOpLogger) Debug(msg string, keysAndValues ...interface{})
func (*NoOpLogger) Error ¶
func (l *NoOpLogger) Error(msg string, keysAndValues ...interface{})
func (*NoOpLogger) Info ¶
func (l *NoOpLogger) Info(msg string, keysAndValues ...interface{})
func (*NoOpLogger) IsDebugEnabled ¶
func (l *NoOpLogger) IsDebugEnabled() bool
func (*NoOpLogger) IsInfoEnabled ¶
func (l *NoOpLogger) IsInfoEnabled() bool
func (*NoOpLogger) Warn ¶
func (l *NoOpLogger) Warn(msg string, keysAndValues ...interface{})
type Notification ¶
type Notification struct {
Code string
Title string
Description string
Severity string
Position *Position
}
Notification represents a server notification
type ObservabilityConfig ¶
type ObservabilityConfig struct {
// EnableTracing enables OpenTelemetry distributed tracing
EnableTracing bool
// EnableMetrics enables OpenTelemetry metrics collection
EnableMetrics bool
// TracingAttributes are additional attributes to add to all spans
TracingAttributes []attribute.KeyValue
// MetricAttributes are additional attributes to add to all metrics
MetricAttributes []attribute.KeyValue
}
ObservabilityConfig controls telemetry collection
func DefaultObservabilityConfig ¶
func DefaultObservabilityConfig() *ObservabilityConfig
DefaultObservabilityConfig returns default observability configuration
type PoolConfig ¶
type PoolConfig struct {
// MaxConnections specifies the maximum number of connections in the pool
// Default: 100 (matching Neo4j driver)
MaxConnections int
// MaxIdleTime specifies how long connections can be idle before being closed
// Default: 30 minutes
MaxIdleTime time.Duration
// ConnectionLifetime specifies the maximum lifetime of a connection
// Default: 1 hour (matching Neo4j driver)
ConnectionLifetime time.Duration
// AcquisitionTimeout specifies how long to wait for a connection from the pool
// Default: 30 seconds
AcquisitionTimeout time.Duration
// EnableLivenessCheck enables periodic connection health checks
// Default: true
EnableLivenessCheck bool
}
PoolConfig provides connection pool configuration options
type QueryPlan ¶
type QueryPlan struct {
OperatorType string
Identifiers []string
Arguments map[string]interface{}
Children []*QueryPlan
}
QueryPlan represents a query execution plan
type QueryProfile ¶
type QueryProfile struct {
OperatorType string
Identifiers []string
Arguments map[string]interface{}
DbHits int64
Rows int64
Time time.Duration
Children []*QueryProfile
}
QueryProfile represents query profiling information
type ReactiveConfig ¶
type ReactiveConfig struct {
// BufferSize sets the size of internal buffers
BufferSize int
// BackpressureStrategy defines how to handle slow consumers
BackpressureStrategy BackpressureStrategy
// MaxConcurrency limits concurrent processing
MaxConcurrency int
// ErrorRecovery enables automatic error recovery
ErrorRecovery bool
// Metrics enables reactive stream metrics collection
Metrics bool
}
ReactiveConfig configures reactive stream behavior
func DefaultReactiveConfig ¶
func DefaultReactiveConfig() *ReactiveConfig
DefaultReactiveConfig returns default reactive configuration
type ReactiveDriver ¶
type ReactiveDriver interface {
StreamingDriver
// RunReactive executes a query and returns a ReactiveResult for non-blocking,
// event-driven processing with composable operators
RunReactive(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) (ReactiveResult, error)
}
ReactiveDriver extends StreamingDriver with reactive programming capabilities
type ReactiveMetrics ¶
type ReactiveMetrics struct {
RecordsProcessed int64
RecordsDropped int64
AverageLatency time.Duration
ThroughputPerSec float64
BackpressureEvents int64
ErrorCount int64
OperatorCount int
// contains filtered or unexported fields
}
ReactiveMetrics tracks reactive stream performance
func NewReactiveMetrics ¶
func NewReactiveMetrics() *ReactiveMetrics
NewReactiveMetrics creates a new metrics tracker
func (*ReactiveMetrics) GetSnapshot ¶
func (m *ReactiveMetrics) GetSnapshot() ReactiveMetrics
GetSnapshot returns a snapshot of current metrics
func (*ReactiveMetrics) RecordDropped ¶
func (m *ReactiveMetrics) RecordDropped()
RecordDropped increments the dropped records counter
func (*ReactiveMetrics) RecordError ¶
func (m *ReactiveMetrics) RecordError()
RecordError increments the error counter
func (*ReactiveMetrics) RecordProcessed ¶
func (m *ReactiveMetrics) RecordProcessed()
RecordProcessed increments the processed records counter
type ReactiveResult ¶
type ReactiveResult interface {
// Subscribe consumes the reactive stream with the provided subscriber
Subscribe(ctx context.Context, subscriber Subscriber) error
// Records returns a channel that emits RecordEvent items
Records(ctx context.Context) <-chan RecordEvent
// Transform applies a transformation function to each record
Transform(fn TransformFunc) ReactiveResult
// Filter filters records based on a predicate function
Filter(fn FilterFunc) ReactiveResult
// Map transforms records to a different type
Map(fn MapFunc) ReactiveResult
// Batch groups records into batches of specified size
Batch(size int) ReactiveResult
// BatchByTime groups records into time-based batches
BatchByTime(duration time.Duration) ReactiveResult
// Take limits the stream to the first n records
Take(n int64) ReactiveResult
// Skip skips the first n records
Skip(n int64) ReactiveResult
// Distinct removes duplicate records based on a key function
Distinct(keyFunc func(*Record) string) ReactiveResult
// Throttle limits the rate of record emission
Throttle(rate time.Duration) ReactiveResult
// OnError handles errors in the stream
OnError(handler ErrorHandler) ReactiveResult
// DoOnNext performs a side effect for each record without modifying the stream
DoOnNext(action func(*Record)) ReactiveResult
// DoOnComplete performs a side effect when the stream completes
DoOnComplete(action func(*ResultSummary)) ReactiveResult
// DoOnError performs a side effect when an error occurs
DoOnError(action func(error)) ReactiveResult
// Keys returns the column names for this result
Keys() ([]string, error)
// ToSlice collects all records into a slice (blocking operation)
ToSlice(ctx context.Context) ([]*Record, error)
// First returns the first record (blocking operation)
First(ctx context.Context) (*Record, error)
// Count counts all records in the stream (blocking operation)
Count(ctx context.Context) (int64, error)
}
ReactiveResult provides reactive stream processing of query results with composable operators, backpressure handling, and event-driven consumption
func NewReactiveResult ¶
func NewReactiveResult(source Result, query string, params map[string]interface{}, config *ReactiveConfig) ReactiveResult
NewReactiveResult creates a new reactive result from a streaming result
type RecordEvent ¶
type RecordEvent struct {
// Record contains the data (nil for error or completion events)
Record *Record
// Error contains any error that occurred (nil for successful record events)
Error error
// Complete indicates stream completion (true only for final event)
Complete bool
// Summary contains result summary (only present on completion)
Summary *ResultSummary
}
RecordEvent represents an event in the reactive stream
type Result ¶
type Result interface {
// Keys returns the column names available in the result set
Keys() ([]string, error)
// Next advances to the next record and returns true if a record is available.
// Returns false when no more records are available or an error occurs.
Next(ctx context.Context) bool
// NextRecord advances to the next record and sets the record parameter.
// Returns true if a record is available, false otherwise.
NextRecord(ctx context.Context, record **Record) bool
// Record returns the current record. May be nil if Next() hasn't been called
// or returned false.
Record() *Record
// Peek reports, without advancing the cursor, whether there is a record after the current one.
// Useful for lookahead without consuming the record.
Peek(ctx context.Context) bool
// PeekRecord returns the next record without advancing the cursor.
PeekRecord(ctx context.Context, record **Record) bool
// Err returns any error that occurred during iteration
Err() error
// Collect fetches all remaining records and returns them as a slice.
// This consumes the entire result stream.
Collect(ctx context.Context) ([]*Record, error)
// Single returns exactly one record from the stream.
// Returns an error if zero or more than one record remains.
Single(ctx context.Context) (*Record, error)
// Consume discards all remaining records and returns the result summary.
// This closes the result stream.
Consume(ctx context.Context) (*ResultSummary, error)
// IsOpen returns true if the result stream is still available for reading
IsOpen() bool
}
Result provides streaming access to query results using cursor-style iteration. This interface follows Neo4j driver patterns for memory-efficient processing of large result sets.
type ResultSummary ¶
type ResultSummary struct {
// Query execution metrics
QueryText string
Parameters map[string]interface{}
ExecutionTime time.Duration
// Result metrics
RecordsAvailable int64
RecordsConsumed int64
// Server information
ServerAddress string
ServerVersion string
Bookmark string
// Query classification
QueryType string // READ, WRITE, READ_WRITE, SCHEMA_WRITE
// Notifications from server (warnings, deprecations, etc.)
Notifications []Notification
// Query plan information (if available)
Plan *QueryPlan
// Profile information (if profiling enabled)
Profile *QueryProfile
// Database statistics from query execution
NodesCreated int64
NodesDeleted int64
RelationshipsCreated int64
RelationshipsDeleted int64
PropertiesSet int64
LabelsAdded int64
LabelsRemoved int64
IndexesAdded int64
IndexesRemoved int64
ConstraintsAdded int64
ConstraintsRemoved int64
ContainsUpdates bool
ContainsSystemUpdates bool
}
ResultSummary contains query execution metadata
type RetryContext ¶
type RetryContext struct {
Attempt int
Error error
NextDelay time.Duration
CumulativeDelay time.Duration
}
RetryContext provides context to retry callbacks.
type RetryError ¶
RetryError wraps the original error with retry context.
func (*RetryError) Error ¶
func (e *RetryError) Error() string
func (*RetryError) Unwrap ¶
func (e *RetryError) Unwrap() error
type RetryPolicy ¶
type RetryPolicy struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
Multiplier float64
JitterFactor float64 // 0.0 = no jitter, 1.0 = full jitter
// Callbacks for observability
OnRetry func(ctx RetryContext)
OnSuccess func(attempts int)
OnFailure func(err error, attempts int)
}
RetryPolicy defines retry behavior with exponential backoff and jitter.
func DefaultRetryPolicy ¶
func DefaultRetryPolicy() *RetryPolicy
DefaultRetryPolicy returns a sensible default retry policy.
func NoRetryPolicy ¶
func NoRetryPolicy() *RetryPolicy
NoRetryPolicy returns a policy that doesn't retry.
func (*RetryPolicy) CalculateDelay ¶
func (p *RetryPolicy) CalculateDelay(attempt int) time.Duration
CalculateDelay computes the delay for a given attempt using exponential backoff with jitter. Uses the "full jitter" algorithm to avoid the thundering herd problem.
type StreamConnection ¶
type StreamConnection interface {
// PullNext fetches the next record from the stream
PullNext(ctx context.Context, batchSize int) (*Record, *ResultSummary, error)
// GetKeys returns the column keys for this result stream
GetKeys() ([]string, error)
// Close closes the stream and releases resources
Close() error
}
StreamConnection defines the interface for streaming connections
type StreamingDriver ¶
type StreamingDriver interface {
Driver
// RunStream executes a Cypher query and returns a streaming Result for lazy record processing.
// This is memory-efficient for large result sets as records are fetched on-demand.
RunStream(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) (Result, error)
}
StreamingDriver extends Driver with streaming query capabilities for memory-efficient processing of large result sets.
type StreamingResult ¶
type StreamingResult struct {
// contains filtered or unexported fields
}
StreamingResult implements the Result interface with lazy loading
func NewStreamingResult ¶
func NewStreamingResult(conn StreamConnection, query string, params map[string]interface{}) *StreamingResult
NewStreamingResult creates a new streaming result
func (*StreamingResult) Collect ¶
func (r *StreamingResult) Collect(ctx context.Context) ([]*Record, error)
func (*StreamingResult) Consume ¶
func (r *StreamingResult) Consume(ctx context.Context) (*ResultSummary, error)
func (*StreamingResult) Err ¶
func (r *StreamingResult) Err() error
func (*StreamingResult) IsOpen ¶
func (r *StreamingResult) IsOpen() bool
func (*StreamingResult) Keys ¶
func (r *StreamingResult) Keys() ([]string, error)
func (*StreamingResult) NextRecord ¶
func (r *StreamingResult) NextRecord(ctx context.Context, record **Record) bool
func (*StreamingResult) PeekRecord ¶
func (r *StreamingResult) PeekRecord(ctx context.Context, record **Record) bool
func (*StreamingResult) Record ¶
func (r *StreamingResult) Record() *Record
type StructuredLogger ¶
type StructuredLogger interface {
Logger
// LogStructured logs a structured entry with full metadata
LogStructured(entry LogEntry)
}
StructuredLogger extends Logger with JSON structured logging output.
type Subscriber ¶
type Subscriber interface {
// OnNext handles a new record
OnNext(record *Record)
// OnError handles an error in the stream
OnError(err error)
// OnComplete handles stream completion
OnComplete(summary *ResultSummary)
}
Subscriber defines the interface for consuming reactive streams
type TLSConfig ¶
type TLSConfig struct {
// Config allows passing a custom tls.Config directly.
// If provided, it takes precedence over the other TLS settings.
Config *tls.Config
// InsecureSkipVerify disables certificate verification (equivalent to the "+ssc" URI scheme suffix, e.g. bolt+ssc://)
InsecureSkipVerify bool
// ServerName specifies the expected server name for certificate validation.
// If empty, it is derived from the connection URL.
ServerName string
// ClientCertificates holds client certificates for mutual TLS
ClientCertificates []tls.Certificate
// RootCAs specifies the root certificate authorities to trust.
// If nil, the system root CAs are used.
RootCAs *x509.CertPool
// ClientCAs specifies certificate authorities for client certificate validation
ClientCAs *x509.CertPool
// MinVersion specifies the minimum TLS version (default: TLS 1.2)
MinVersion uint16
// MaxVersion specifies the maximum TLS version (default: latest)
MaxVersion uint16
// CipherSuites specifies the allowed cipher suites.
// If empty, Go's default secure cipher suites are used.
CipherSuites []uint16
}
TLSConfig provides advanced TLS configuration options
func NewTLSConfigFromCertFiles ¶
NewTLSConfigFromCertFiles creates a TLSConfig from certificate file paths
type UsageError ¶
type UsageError struct {
Message string
}
UsageError represents an error in how the Result is being used.
func NewUsageError ¶
func NewUsageError(message string) *UsageError
func (*UsageError) Error ¶
func (e *UsageError) Error() string