package driver
Published: Dec 13, 2025 License: MIT Imports: 25 Imported by: 0

Documentation

Overview

Package driver implements a lightweight Bolt protocol client used to communicate with Neo4j and Memgraph databases.

Index

Constants

const (
	ColorReset  = "\033[0m"
	ColorRed    = "\033[31m"
	ColorYellow = "\033[33m"
	ColorBlue   = "\033[34m"
	ColorGray   = "\033[37m"
	ColorGreen  = "\033[32m"
)

Color codes for different log levels

Variables

This section is empty.

Functions

func IsRetriable

func IsRetriable(err error) bool

IsRetriable checks if an error should trigger a retry.

func Retry

func Retry[T any](ctx context.Context, policy *RetryPolicy, fn func() (T, error)) (T, error)

Retry executes fn with retry logic according to the policy.

func RetryVoid

func RetryVoid(ctx context.Context, policy *RetryPolicy, fn func() error) error

RetryVoid executes fn with retry logic for functions that don't return a value.

func UserAgent

func UserAgent() string

UserAgent returns the user agent string used in Bolt protocol communications

func Version

func Version() string

Version returns the current version of the gopher-cypher driver

Types

type BackpressureHandler

type BackpressureHandler struct {
	// contains filtered or unexported fields
}

BackpressureHandler manages backpressure in reactive streams

func NewBackpressureHandler

func NewBackpressureHandler(strategy BackpressureStrategy, bufferSize int) *BackpressureHandler

NewBackpressureHandler creates a new backpressure handler

func (*BackpressureHandler) DrainBuffer

func (bp *BackpressureHandler) DrainBuffer(ctx context.Context, output chan<- RecordEvent) error

DrainBuffer drains the backpressure buffer

func (*BackpressureHandler) GetDroppedCount

func (bp *BackpressureHandler) GetDroppedCount() int64

GetDroppedCount returns the number of events dropped due to backpressure

func (*BackpressureHandler) Handle

func (bp *BackpressureHandler) Handle(ctx context.Context, event RecordEvent, output chan<- RecordEvent) error

Handle applies backpressure strategy to an event

type BackpressureStrategy

type BackpressureStrategy int

BackpressureStrategy defines how to handle backpressure

const (
	// BackpressureBuffer buffers items when subscriber is slow
	BackpressureBuffer BackpressureStrategy = iota

	// BackpressureDrop drops items when subscriber is slow
	BackpressureDrop

	// BackpressureBlock blocks emission when subscriber is slow
	BackpressureBlock

	// BackpressureLatest keeps only the latest item when subscriber is slow
	BackpressureLatest
)

type BoltLogger

type BoltLogger interface {
	Logger
	// LogBoltMessage logs a Bolt protocol message with full details
	LogBoltMessage(direction string, messageType string, fields []interface{})
	// LogBoltHandshake logs Bolt handshake details
	LogBoltHandshake(version string, clientName string, authScheme string)
	// LogBoltError logs Bolt protocol errors
	LogBoltError(code string, message string, metadata map[string]interface{})
}

BoltLogger is a specialized logger for Bolt protocol tracing, similar to Neo4j's dedicated Bolt debug logger.

type CategorizedLogger

type CategorizedLogger interface {
	Logger
	// LogWithCategory logs a message with a specific category for granular control
	LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})
	// IsLevelEnabled returns true if the specified level is enabled
	IsLevelEnabled(level LogLevel) bool
	// IsCategoryEnabled returns true if the specified category is enabled
	IsCategoryEnabled(category LogCategory) bool
	// SetCategoryLevel sets the log level for a specific category
	SetCategoryLevel(category LogCategory, level LogLevel)
}

CategorizedLogger extends Logger with category-specific and leveled logging

type ChannelSubscriber

type ChannelSubscriber struct {
	RecordChan chan *Record
	ErrorChan  chan error
	DoneChan   chan *ResultSummary
}

ChannelSubscriber writes events to a channel

func NewChannelSubscriber

func NewChannelSubscriber(bufferSize int) *ChannelSubscriber

NewChannelSubscriber creates a new channel subscriber with buffered channels

func (*ChannelSubscriber) OnComplete

func (c *ChannelSubscriber) OnComplete(summary *ResultSummary)

func (*ChannelSubscriber) OnError

func (c *ChannelSubscriber) OnError(err error)

func (*ChannelSubscriber) OnNext

func (c *ChannelSubscriber) OnNext(record *Record)

type Config

type Config struct {
	// TLS holds TLS-specific configuration
	TLS *TLSConfig

	// ConnectionPool holds connection pool configuration
	ConnectionPool *PoolConfig

	// Observability holds telemetry configuration
	Observability *ObservabilityConfig

	// Logging holds logging configuration
	Logging *LoggingConfig
}

Config holds configuration options for the driver

func DefaultConfig

func DefaultConfig() *Config

DefaultConfig returns a Config with sensible defaults

type ConsoleLogger

type ConsoleLogger struct {
	// contains filtered or unexported fields
}

ConsoleLogger logs to stdout/stderr with configurable level and formatting

func NewConsoleLogger

func NewConsoleLogger(level LogLevel) *ConsoleLogger

NewConsoleLogger creates a new console logger with the specified level

func NewConsoleLoggerWithOutput

func NewConsoleLoggerWithOutput(level LogLevel, stdout, stderr io.Writer) *ConsoleLogger

NewConsoleLoggerWithOutput creates a console logger with custom output writers

func (*ConsoleLogger) Debug

func (c *ConsoleLogger) Debug(msg string, keysAndValues ...interface{})

func (*ConsoleLogger) Error

func (c *ConsoleLogger) Error(msg string, keysAndValues ...interface{})

func (*ConsoleLogger) Info

func (c *ConsoleLogger) Info(msg string, keysAndValues ...interface{})

func (*ConsoleLogger) IsDebugEnabled

func (c *ConsoleLogger) IsDebugEnabled() bool

func (*ConsoleLogger) IsInfoEnabled

func (c *ConsoleLogger) IsInfoEnabled() bool

func (*ConsoleLogger) SetLevel

func (c *ConsoleLogger) SetLevel(level LogLevel)

SetLevel updates the log level

func (*ConsoleLogger) SetTimeFormat

func (c *ConsoleLogger) SetTimeFormat(format string)

SetTimeFormat sets the time format for log messages

func (*ConsoleLogger) Warn

func (c *ConsoleLogger) Warn(msg string, keysAndValues ...interface{})

type DatabaseError

type DatabaseError struct {
	Code    string
	Message string
}

DatabaseError represents a database server error (Neo4j, Memgraph, etc.)

func (*DatabaseError) Error

func (e *DatabaseError) Error() string

func (*DatabaseError) IsAuthError

func (e *DatabaseError) IsAuthError() bool

IsAuthError returns true for authentication/authorization errors.

func (*DatabaseError) IsClusterError

func (e *DatabaseError) IsClusterError() bool

IsClusterError returns true for cluster/replication errors.

func (*DatabaseError) IsConflict

func (e *DatabaseError) IsConflict() bool

IsConflict returns true for transaction conflict/deadlock errors.

func (*DatabaseError) IsRetriable

func (e *DatabaseError) IsRetriable() bool

IsRetriable returns true if the error is transient and can be retried.

func (*DatabaseError) IsTransient

func (e *DatabaseError) IsTransient() bool

IsTransient returns true for transient/temporary errors.

type DedicatedBoltLogger

type DedicatedBoltLogger struct {
	Level  LogLevel
	Output io.Writer
	// contains filtered or unexported fields
}

DedicatedBoltLogger provides specialized Bolt protocol logging, similar to Neo4j's dedicated Bolt debug logger.

func (*DedicatedBoltLogger) Debug

func (l *DedicatedBoltLogger) Debug(msg string, keysAndValues ...interface{})

func (*DedicatedBoltLogger) Error

func (l *DedicatedBoltLogger) Error(msg string, keysAndValues ...interface{})

func (*DedicatedBoltLogger) Info

func (l *DedicatedBoltLogger) Info(msg string, keysAndValues ...interface{})

func (*DedicatedBoltLogger) IsDebugEnabled

func (l *DedicatedBoltLogger) IsDebugEnabled() bool

func (*DedicatedBoltLogger) IsInfoEnabled

func (l *DedicatedBoltLogger) IsInfoEnabled() bool

func (*DedicatedBoltLogger) LogBoltError

func (l *DedicatedBoltLogger) LogBoltError(code string, message string, metadata map[string]interface{})

func (*DedicatedBoltLogger) LogBoltHandshake

func (l *DedicatedBoltLogger) LogBoltHandshake(version string, clientName string, authScheme string)

func (*DedicatedBoltLogger) LogBoltMessage

func (l *DedicatedBoltLogger) LogBoltMessage(direction string, messageType string, fields []interface{})

func (*DedicatedBoltLogger) Warn

func (l *DedicatedBoltLogger) Warn(msg string, keysAndValues ...interface{})

type Driver

type Driver interface {
	// Close releases all resources associated with the driver.
	Close() error
	// Ping verifies the server is reachable and the Bolt protocol is
	// supported.
	Ping() error
	// Run executes a Cypher query with context, optional parameters and metadata.
	// It returns the column names and resulting rows.
	Run(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) ([]string, []map[string]interface{}, error)
	// RunWithContext executes a Cypher query with context support and returns detailed summary.
	// This is the recommended method for production use with observability.
	RunWithContext(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) ([]string, []map[string]interface{}, *ResultSummary, error)
}

Driver defines the minimal functionality required to communicate with a Cypher-compatible database. Implementations must manage their own connections and provide simple query execution utilities.

func NewDriver

func NewDriver(urlString string) (Driver, error)

NewDriver initializes a new Driver based on the provided connection URL. It validates the URL, establishes a connection pool and performs an initial ping to ensure the server is reachable.

func NewDriverWithConfig

func NewDriverWithConfig(urlString string, config *Config) (Driver, error)

NewDriverWithConfig creates a new Driver with custom configuration options. If config is nil, default configuration is used.

type EnhancedConsoleLogger

type EnhancedConsoleLogger struct {
	Level            LogLevel
	Output           io.Writer
	IncludeTimestamp bool
	IncludeSource    bool
	ColorEnabled     bool
	CategoryLevels   map[LogCategory]LogLevel
	// contains filtered or unexported fields
}

EnhancedConsoleLogger provides rich console logging with colors and formatting, similar to Neo4j's ConsoleLogger but with enhanced capabilities.

func (*EnhancedConsoleLogger) Debug

func (l *EnhancedConsoleLogger) Debug(msg string, keysAndValues ...interface{})

func (*EnhancedConsoleLogger) Error

func (l *EnhancedConsoleLogger) Error(msg string, keysAndValues ...interface{})

func (*EnhancedConsoleLogger) Info

func (l *EnhancedConsoleLogger) Info(msg string, keysAndValues ...interface{})

func (*EnhancedConsoleLogger) IsCategoryEnabled

func (l *EnhancedConsoleLogger) IsCategoryEnabled(category LogCategory) bool

func (*EnhancedConsoleLogger) IsDebugEnabled

func (l *EnhancedConsoleLogger) IsDebugEnabled() bool

func (*EnhancedConsoleLogger) IsInfoEnabled

func (l *EnhancedConsoleLogger) IsInfoEnabled() bool

func (*EnhancedConsoleLogger) IsLevelEnabled

func (l *EnhancedConsoleLogger) IsLevelEnabled(level LogLevel) bool

func (*EnhancedConsoleLogger) LogWithCategory

func (l *EnhancedConsoleLogger) LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})

LogWithCategory implements CategorizedLogger interface

func (*EnhancedConsoleLogger) SetCategoryLevel

func (l *EnhancedConsoleLogger) SetCategoryLevel(category LogCategory, level LogLevel)

func (*EnhancedConsoleLogger) Warn

func (l *EnhancedConsoleLogger) Warn(msg string, keysAndValues ...interface{})

type EnhancedStructuredLogger

type EnhancedStructuredLogger struct {
	Level            LogLevel
	Output           io.Writer
	IncludeTimestamp bool
	IncludeSource    bool
	RequestIDEnabled bool
	CategoryLevels   map[LogCategory]LogLevel
	// contains filtered or unexported fields
}

EnhancedStructuredLogger provides JSON structured logging output

func (*EnhancedStructuredLogger) Debug

func (l *EnhancedStructuredLogger) Debug(msg string, keysAndValues ...interface{})

func (*EnhancedStructuredLogger) Error

func (l *EnhancedStructuredLogger) Error(msg string, keysAndValues ...interface{})

func (*EnhancedStructuredLogger) Info

func (l *EnhancedStructuredLogger) Info(msg string, keysAndValues ...interface{})

func (*EnhancedStructuredLogger) IsCategoryEnabled

func (l *EnhancedStructuredLogger) IsCategoryEnabled(category LogCategory) bool

func (*EnhancedStructuredLogger) IsDebugEnabled

func (l *EnhancedStructuredLogger) IsDebugEnabled() bool

func (*EnhancedStructuredLogger) IsInfoEnabled

func (l *EnhancedStructuredLogger) IsInfoEnabled() bool

func (*EnhancedStructuredLogger) IsLevelEnabled

func (l *EnhancedStructuredLogger) IsLevelEnabled(level LogLevel) bool

func (*EnhancedStructuredLogger) LogStructured

func (l *EnhancedStructuredLogger) LogStructured(entry LogEntry)

func (*EnhancedStructuredLogger) LogWithCategory

func (l *EnhancedStructuredLogger) LogWithCategory(level LogLevel, category LogCategory, msg string, keysAndValues ...interface{})

func (*EnhancedStructuredLogger) SetCategoryLevel

func (l *EnhancedStructuredLogger) SetCategoryLevel(category LogCategory, level LogLevel)

func (*EnhancedStructuredLogger) Warn

func (l *EnhancedStructuredLogger) Warn(msg string, keysAndValues ...interface{})

type ErrorHandler

type ErrorHandler func(error) error

type FilterFunc

type FilterFunc func(*Record) bool

type FuncSubscriber

type FuncSubscriber struct {
	OnNextFunc     func(*Record)
	OnErrorFunc    func(error)
	OnCompleteFunc func(*ResultSummary)
}

FuncSubscriber allows using functions as subscribers

func (*FuncSubscriber) OnComplete

func (f *FuncSubscriber) OnComplete(summary *ResultSummary)

func (*FuncSubscriber) OnError

func (f *FuncSubscriber) OnError(err error)

func (*FuncSubscriber) OnNext

func (f *FuncSubscriber) OnNext(record *Record)

type LogCategory

type LogCategory string

LogCategory represents different categories of logging for granular control

const (
	// LogCategoryGeneral for general driver operations
	LogCategoryGeneral LogCategory = "driver"
	// LogCategoryConnection for connection pool and networking events
	LogCategoryConnection LogCategory = "connection"
	// LogCategoryQuery for query execution and timing
	LogCategoryQuery LogCategory = "query"
	// LogCategoryBolt for low-level Bolt protocol messages
	LogCategoryBolt LogCategory = "bolt"
	// LogCategoryAuth for authentication events
	LogCategoryAuth LogCategory = "auth"
	// LogCategoryTLS for TLS/SSL related events
	LogCategoryTLS LogCategory = "tls"
	// LogCategoryStreaming for streaming result processing
	LogCategoryStreaming LogCategory = "streaming"
	// LogCategoryReactive for reactive programming events
	LogCategoryReactive LogCategory = "reactive"
)

type LogEntry

type LogEntry struct {
	Timestamp time.Time              `json:"timestamp"`
	Level     LogLevel               `json:"level"`
	Category  LogCategory            `json:"category"`
	Message   string                 `json:"message"`
	Fields    map[string]interface{} `json:"fields,omitempty"`
	Error     string                 `json:"error,omitempty"`
	Source    string                 `json:"source,omitempty"`
	RequestID string                 `json:"request_id,omitempty"`
}

LogEntry represents a structured log entry with full metadata

type LogLevel

type LogLevel int

LogLevel represents the severity of a log message

const (
	// LogLevelDebug logs everything including detailed protocol messages
	LogLevelDebug LogLevel = iota
	// LogLevelInfo logs general information about driver operations
	LogLevelInfo
	// LogLevelWarn logs warning messages that don't stop execution
	LogLevelWarn
	// LogLevelError logs only error conditions
	LogLevelError
	// LogLevelOff disables all logging
	LogLevelOff
)

func ParseLogLevel

func ParseLogLevel(level string) LogLevel

ParseLogLevel parses a string into a LogLevel

func (LogLevel) String

func (l LogLevel) String() string

String returns the string representation of a log level

type Logger

type Logger interface {
	// Debug logs a debug message with optional key-value pairs
	Debug(msg string, keysAndValues ...interface{})
	// Info logs an info message with optional key-value pairs
	Info(msg string, keysAndValues ...interface{})
	// Warn logs a warning message with optional key-value pairs
	Warn(msg string, keysAndValues ...interface{})
	// Error logs an error message with optional key-value pairs
	Error(msg string, keysAndValues ...interface{})
	// IsDebugEnabled returns true if debug logging is enabled
	IsDebugEnabled() bool
	// IsInfoEnabled returns true if info logging is enabled
	IsInfoEnabled() bool
}

Logger defines the interface for pluggable logging in the driver. Compatible with Neo4j's log.Logger interface but with enhanced capabilities.

func NewLoggerAdapter

func NewLoggerAdapter(
	debugFunc, infoFunc, warnFunc, errorFunc func(msg string, args ...interface{}),
	debugEnabled, infoEnabled bool,
) Logger

NewLoggerAdapter creates a logger adapter from external logging functions, allowing easy integration with existing logging frameworks.

type LoggerAdapter

type LoggerAdapter struct {
	DebugFunc func(msg string, args ...interface{})
	InfoFunc  func(msg string, args ...interface{})
	WarnFunc  func(msg string, args ...interface{})
	ErrorFunc func(msg string, args ...interface{})

	DebugEnabled bool
	InfoEnabled  bool
}

LoggerAdapter wraps external loggers so they implement this package's Logger interface, similar to how Neo4j allows custom logger implementations.

func (*LoggerAdapter) Debug

func (a *LoggerAdapter) Debug(msg string, keysAndValues ...interface{})

func (*LoggerAdapter) Error

func (a *LoggerAdapter) Error(msg string, keysAndValues ...interface{})

func (*LoggerAdapter) Info

func (a *LoggerAdapter) Info(msg string, keysAndValues ...interface{})

func (*LoggerAdapter) IsDebugEnabled

func (a *LoggerAdapter) IsDebugEnabled() bool

func (*LoggerAdapter) IsInfoEnabled

func (a *LoggerAdapter) IsInfoEnabled() bool

func (*LoggerAdapter) Warn

func (a *LoggerAdapter) Warn(msg string, keysAndValues ...interface{})

type LoggingConfig

type LoggingConfig struct {
	// Logger is the pluggable logger implementation (compatible with Neo4j's interface)
	Logger Logger
	// BoltLogger is an optional specialized logger for Bolt protocol tracing
	BoltLogger BoltLogger
	// Level sets the global minimum log level to output
	Level LogLevel
	// CategoryLevels allows setting different log levels per category
	CategoryLevels map[LogCategory]LogLevel
	// EnabledCategories specifies which categories should be logged
	EnabledCategories map[LogCategory]bool
	// StructuredOutput enables JSON structured logging format
	StructuredOutput bool
	// IncludeTimestamp includes timestamp in log output
	IncludeTimestamp bool
	// IncludeSource includes source location in logs
	IncludeSource bool
	// RequestIDEnabled enables request ID tracking
	RequestIDEnabled bool

	// Specific feature flags for backward compatibility
	// LogBoltMessages enables detailed Bolt protocol message logging
	LogBoltMessages bool
	// LogConnectionPool enables connection pool event logging
	LogConnectionPool bool
	// LogQueryTiming enables query execution timing logs
	LogQueryTiming bool
	// LogAuthEvents enables authentication event logging
	LogAuthEvents bool
	// LogTLSEvents enables TLS/SSL event logging
	LogTLSEvents bool
	// LogStreamingEvents enables streaming result logging
	LogStreamingEvents bool
	// LogReactiveEvents enables reactive programming event logging
	LogReactiveEvents bool
}

LoggingConfig holds comprehensive logging configuration

func DefaultLoggingConfig

func DefaultLoggingConfig() *LoggingConfig

DefaultLoggingConfig returns a logging configuration with a no-op logger (silent by default), matching the Neo4j driver's default behavior.

func NewBoltTracingConfig

func NewBoltTracingConfig(level LogLevel, output io.Writer) *LoggingConfig

NewBoltTracingConfig creates a configuration with detailed Bolt protocol tracing, similar to Neo4j's Bolt debug logger.

func NewConsoleLoggingConfig

func NewConsoleLoggingConfig(level LogLevel) *LoggingConfig

NewConsoleLoggingConfig creates a console logging configuration similar to Neo4j's ConsoleLogger

func NewStructuredLoggingConfig

func NewStructuredLoggingConfig(level LogLevel, output io.Writer) *LoggingConfig

NewStructuredLoggingConfig creates a structured JSON logging configuration

type MapFunc

type MapFunc func(*Record) interface{}

type NoOpLogger

type NoOpLogger struct{}

NoOpLogger is a logger that does nothing (default behavior)

func (*NoOpLogger) Debug

func (l *NoOpLogger) Debug(msg string, keysAndValues ...interface{})

func (*NoOpLogger) Error

func (l *NoOpLogger) Error(msg string, keysAndValues ...interface{})

func (*NoOpLogger) Info

func (l *NoOpLogger) Info(msg string, keysAndValues ...interface{})

func (*NoOpLogger) IsDebugEnabled

func (l *NoOpLogger) IsDebugEnabled() bool

func (*NoOpLogger) IsInfoEnabled

func (l *NoOpLogger) IsInfoEnabled() bool

func (*NoOpLogger) Warn

func (l *NoOpLogger) Warn(msg string, keysAndValues ...interface{})

type Notification

type Notification struct {
	Code        string
	Title       string
	Description string
	Severity    string
	Position    *Position
}

Notification represents a server notification

type ObservabilityConfig

type ObservabilityConfig struct {
	// EnableTracing enables OpenTelemetry distributed tracing
	EnableTracing bool

	// EnableMetrics enables OpenTelemetry metrics collection
	EnableMetrics bool

	// TracingAttributes are additional attributes to add to all spans
	TracingAttributes []attribute.KeyValue

	// MetricAttributes are additional attributes to add to all metrics
	MetricAttributes []attribute.KeyValue
}

ObservabilityConfig controls telemetry collection

func DefaultObservabilityConfig

func DefaultObservabilityConfig() *ObservabilityConfig

DefaultObservabilityConfig returns default observability configuration

type PoolConfig

type PoolConfig struct {
	// MaxConnections specifies the maximum number of connections in the pool
	// Default: 100 (matching Neo4j driver)
	MaxConnections int

	// MaxIdleTime specifies how long connections can be idle before being closed
	// Default: 30 minutes
	MaxIdleTime time.Duration

	// ConnectionLifetime specifies the maximum lifetime of a connection
	// Default: 1 hour (matching Neo4j driver)
	ConnectionLifetime time.Duration

	// AcquisitionTimeout specifies how long to wait for a connection from the pool
	// Default: 30 seconds
	AcquisitionTimeout time.Duration

	// EnableLivenessCheck enables periodic connection health checks
	// Default: true
	EnableLivenessCheck bool
}

PoolConfig provides connection pool configuration options

type Position

type Position struct {
	Offset int
	Line   int
	Column int
}

Position represents a position in the query

type QueryPlan

type QueryPlan struct {
	OperatorType string
	Identifiers  []string
	Arguments    map[string]interface{}
	Children     []*QueryPlan
}

QueryPlan represents a query execution plan

type QueryProfile

type QueryProfile struct {
	OperatorType string
	Identifiers  []string
	Arguments    map[string]interface{}
	DbHits       int64
	Rows         int64
	Time         time.Duration
	Children     []*QueryProfile
}

QueryProfile represents query profiling information

type ReactiveConfig

type ReactiveConfig struct {
	// BufferSize sets the size of internal buffers
	BufferSize int

	// BackpressureStrategy defines how to handle slow consumers
	BackpressureStrategy BackpressureStrategy

	// MaxConcurrency limits concurrent processing
	MaxConcurrency int

	// ErrorRecovery enables automatic error recovery
	ErrorRecovery bool

	// Metrics enables reactive stream metrics collection
	Metrics bool
}

ReactiveConfig configures reactive stream behavior

func DefaultReactiveConfig

func DefaultReactiveConfig() *ReactiveConfig

DefaultReactiveConfig returns default reactive configuration

type ReactiveDriver

type ReactiveDriver interface {
	StreamingDriver
	// RunReactive executes a query and returns a ReactiveResult for non-blocking,
	// event-driven processing with composable operators
	RunReactive(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) (ReactiveResult, error)
}

ReactiveDriver extends StreamingDriver with reactive programming capabilities

type ReactiveMetrics

type ReactiveMetrics struct {
	RecordsProcessed   int64
	RecordsDropped     int64
	AverageLatency     time.Duration
	ThroughputPerSec   float64
	BackpressureEvents int64
	ErrorCount         int64
	OperatorCount      int
	// contains filtered or unexported fields
}

ReactiveMetrics tracks reactive stream performance

func NewReactiveMetrics

func NewReactiveMetrics() *ReactiveMetrics

NewReactiveMetrics creates a new metrics tracker

func (*ReactiveMetrics) GetSnapshot

func (m *ReactiveMetrics) GetSnapshot() ReactiveMetrics

GetSnapshot returns a snapshot of current metrics

func (*ReactiveMetrics) RecordDropped

func (m *ReactiveMetrics) RecordDropped()

RecordDropped increments the dropped records counter

func (*ReactiveMetrics) RecordError

func (m *ReactiveMetrics) RecordError()

RecordError increments the error counter

func (*ReactiveMetrics) RecordProcessed

func (m *ReactiveMetrics) RecordProcessed()

RecordProcessed increments the processed records counter

type ReactiveResult

type ReactiveResult interface {
	// Subscribe consumes the reactive stream with the provided subscriber
	Subscribe(ctx context.Context, subscriber Subscriber) error

	// Records returns a channel that emits RecordEvent items
	Records(ctx context.Context) <-chan RecordEvent

	// Transform applies a transformation function to each record
	Transform(fn TransformFunc) ReactiveResult

	// Filter filters records based on a predicate function
	Filter(fn FilterFunc) ReactiveResult

	// Map transforms records to a different type
	Map(fn MapFunc) ReactiveResult

	// Batch groups records into batches of specified size
	Batch(size int) ReactiveResult

	// BatchByTime groups records into time-based batches
	BatchByTime(duration time.Duration) ReactiveResult

	// Take limits the stream to the first n records
	Take(n int64) ReactiveResult

	// Skip skips the first n records
	Skip(n int64) ReactiveResult

	// Distinct removes duplicate records based on a key function
	Distinct(keyFunc func(*Record) string) ReactiveResult

	// Throttle limits the rate of record emission
	Throttle(rate time.Duration) ReactiveResult

	// OnError handles errors in the stream
	OnError(handler ErrorHandler) ReactiveResult

	// DoOnNext performs a side effect for each record without modifying the stream
	DoOnNext(action func(*Record)) ReactiveResult

	// DoOnComplete performs a side effect when the stream completes
	DoOnComplete(action func(*ResultSummary)) ReactiveResult

	// DoOnError performs a side effect when an error occurs
	DoOnError(action func(error)) ReactiveResult

	// Keys returns the column names for this result
	Keys() ([]string, error)

	// ToSlice collects all records into a slice (blocking operation)
	ToSlice(ctx context.Context) ([]*Record, error)

	// First returns the first record (blocking operation)
	First(ctx context.Context) (*Record, error)

	// Count counts all records in the stream (blocking operation)
	Count(ctx context.Context) (int64, error)
}

ReactiveResult provides reactive stream processing of query results with composable operators, backpressure handling, and event-driven consumption

func NewReactiveResult

func NewReactiveResult(source Result, query string, params map[string]interface{}, config *ReactiveConfig) ReactiveResult

NewReactiveResult creates a new reactive result from a streaming result

type Record

type Record map[string]interface{}

Record represents a single record in a result set

type RecordEvent

type RecordEvent struct {
	// Record contains the data (nil for error or completion events)
	Record *Record

	// Error contains any error that occurred (nil for successful record events)
	Error error

	// Complete indicates stream completion (true only for final event)
	Complete bool

	// Summary contains result summary (only present on completion)
	Summary *ResultSummary
}

RecordEvent represents an event in the reactive stream

type Result

type Result interface {
	// Keys returns the column names available in the result set
	Keys() ([]string, error)

	// Next advances to the next record and returns true if a record is available.
	// Returns false when no more records are available or an error occurs.
	Next(ctx context.Context) bool

	// NextRecord advances to the next record and sets the record parameter.
	// Returns true if a record is available, false otherwise.
	NextRecord(ctx context.Context, record **Record) bool

	// Record returns the current record. May be nil if Next() hasn't been called
	// or returned false.
	Record() *Record

	// Peek returns true if there is a record after the current one without advancing.
	// Useful for lookahead without consuming the record.
	Peek(ctx context.Context) bool

	// PeekRecord returns the next record without advancing the cursor.
	PeekRecord(ctx context.Context, record **Record) bool

	// Err returns any error that occurred during iteration
	Err() error

	// Collect fetches all remaining records and returns them as a slice.
	// This consumes the entire result stream.
	Collect(ctx context.Context) ([]*Record, error)

	// Single returns exactly one record from the stream.
	// Returns an error if zero or more than one record remains.
	Single(ctx context.Context) (*Record, error)

	// Consume discards all remaining records and returns the result summary.
	// This closes the result stream.
	Consume(ctx context.Context) (*ResultSummary, error)

	// IsOpen returns true if the result stream is still available for reading
	IsOpen() bool
}

Result provides streaming access to query results using cursor-style iteration. This interface follows Neo4j driver patterns for memory-efficient processing of large result sets.

type ResultSummary

type ResultSummary struct {
	// Query execution metrics
	QueryText     string
	Parameters    map[string]interface{}
	ExecutionTime time.Duration

	// Result metrics
	RecordsAvailable int64
	RecordsConsumed  int64

	// Server information
	ServerAddress string
	ServerVersion string
	Bookmark      string

	// Query classification
	QueryType string // READ, WRITE, READ_WRITE, SCHEMA_WRITE

	// Notifications from server (warnings, deprecations, etc.)
	Notifications []Notification

	// Query plan information (if available)
	Plan *QueryPlan

	// Profile information (if profiling enabled)
	Profile *QueryProfile

	// Database statistics from query execution
	NodesCreated          int64
	NodesDeleted          int64
	RelationshipsCreated  int64
	RelationshipsDeleted  int64
	PropertiesSet         int64
	LabelsAdded           int64
	LabelsRemoved         int64
	IndexesAdded          int64
	IndexesRemoved        int64
	ConstraintsAdded      int64
	ConstraintsRemoved    int64
	ContainsUpdates       bool
	ContainsSystemUpdates bool
}

ResultSummary contains query execution metadata

type RetryContext

type RetryContext struct {
	Attempt         int
	Error           error
	NextDelay       time.Duration
	CumulativeDelay time.Duration
}

RetryContext provides context to retry callbacks.

type RetryError

type RetryError struct {
	OriginalError   error
	Attempts        int
	CumulativeDelay time.Duration
}

RetryError wraps the original error with retry context.

func (*RetryError) Error

func (e *RetryError) Error() string

func (*RetryError) Unwrap

func (e *RetryError) Unwrap() error

type RetryPolicy

type RetryPolicy struct {
	MaxAttempts  int
	BaseDelay    time.Duration
	MaxDelay     time.Duration
	Multiplier   float64
	JitterFactor float64 // 0.0 = no jitter, 1.0 = full jitter

	// Callbacks for observability
	OnRetry   func(ctx RetryContext)
	OnSuccess func(attempts int)
	OnFailure func(err error, attempts int)
}

RetryPolicy defines retry behavior with exponential backoff and jitter.

func DefaultRetryPolicy

func DefaultRetryPolicy() *RetryPolicy

DefaultRetryPolicy returns a sensible default retry policy.

func NoRetryPolicy

func NoRetryPolicy() *RetryPolicy

NoRetryPolicy returns a policy that doesn't retry.

func (*RetryPolicy) CalculateDelay

func (p *RetryPolicy) CalculateDelay(attempt int) time.Duration

CalculateDelay computes the delay for a given attempt using exponential backoff with jitter. Uses the "full jitter" algorithm to prevent thundering herd.

type StreamConnection

type StreamConnection interface {
	// PullNext fetches the next record from the stream
	PullNext(ctx context.Context, batchSize int) (*Record, *ResultSummary, error)
	// GetKeys returns the column keys for this result stream
	GetKeys() ([]string, error)
	// Close closes the stream and releases resources
	Close() error
}

StreamConnection defines the interface for streaming connections.

type StreamingDriver

type StreamingDriver interface {
	Driver
	// RunStream executes a Cypher query and returns a streaming Result for lazy record processing.
	// This is memory-efficient for large result sets as records are fetched on-demand.
	RunStream(ctx context.Context, query string, params map[string]interface{}, metaData map[string]interface{}) (Result, error)
}

StreamingDriver extends Driver with streaming query capabilities for memory-efficient processing of large result sets.

type StreamingResult

type StreamingResult struct {
	// contains filtered or unexported fields
}

StreamingResult implements the Result interface with lazy loading.

func NewStreamingResult

func NewStreamingResult(conn StreamConnection, query string, params map[string]interface{}) *StreamingResult

NewStreamingResult creates a new streaming result.

func (*StreamingResult) Collect

func (r *StreamingResult) Collect(ctx context.Context) ([]*Record, error)

func (*StreamingResult) Consume

func (r *StreamingResult) Consume(ctx context.Context) (*ResultSummary, error)

func (*StreamingResult) Err

func (r *StreamingResult) Err() error

func (*StreamingResult) IsOpen

func (r *StreamingResult) IsOpen() bool

func (*StreamingResult) Keys

func (r *StreamingResult) Keys() ([]string, error)

func (*StreamingResult) Next

func (r *StreamingResult) Next(ctx context.Context) bool

func (*StreamingResult) NextRecord

func (r *StreamingResult) NextRecord(ctx context.Context, record **Record) bool

func (*StreamingResult) Peek

func (r *StreamingResult) Peek(ctx context.Context) bool

func (*StreamingResult) PeekRecord

func (r *StreamingResult) PeekRecord(ctx context.Context, record **Record) bool

func (*StreamingResult) Record

func (r *StreamingResult) Record() *Record

func (*StreamingResult) Single

func (r *StreamingResult) Single(ctx context.Context) (*Record, error)

type StructuredLogger

type StructuredLogger interface {
	Logger
	// LogStructured logs a structured entry with full metadata
	LogStructured(entry LogEntry)
}

StructuredLogger provides JSON structured logging output.

type Subscriber

type Subscriber interface {
	// OnNext handles a new record
	OnNext(record *Record)

	// OnError handles an error in the stream
	OnError(err error)

	// OnComplete handles stream completion
	OnComplete(summary *ResultSummary)
}

Subscriber defines the interface for consuming reactive streams.

type TLSConfig

type TLSConfig struct {
	// Config allows passing a custom tls.Config directly
	// If provided, this takes precedence over other TLS settings
	Config *tls.Config

	// InsecureSkipVerify disables certificate verification (equivalent to +ssc)
	InsecureSkipVerify bool

	// ServerName specifies the expected server name for certificate validation
	// If empty, it's derived from the connection URL
	ServerName string

	// ClientCertificates holds client certificates for mutual TLS
	ClientCertificates []tls.Certificate

	// RootCAs specifies the root certificate authorities to trust
	// If nil, system root CAs are used
	RootCAs *x509.CertPool

	// ClientCAs specifies certificate authorities for client certificate validation
	ClientCAs *x509.CertPool

	// MinVersion specifies the minimum TLS version (default: TLS 1.2)
	MinVersion uint16

	// MaxVersion specifies the maximum TLS version (default: latest)
	MaxVersion uint16

	// CipherSuites specifies allowed cipher suites
	// If empty, Go's default secure cipher suites are used
	CipherSuites []uint16
}

TLSConfig provides advanced TLS configuration options.

func NewTLSConfigFromCertFiles

func NewTLSConfigFromCertFiles(certFile, keyFile, caFile string) (*TLSConfig, error)

NewTLSConfigFromCertFiles creates a TLSConfig from certificate file paths.

type TransformFunc

type TransformFunc func(*Record) *Record

TransformFunc is one of the function types used by reactive operators; it maps an input Record to an output Record.

type UsageError

type UsageError struct {
	Message string
}

UsageError represents an error in how the Result is being used.

func NewUsageError

func NewUsageError(message string) *UsageError

func (*UsageError) Error

func (e *UsageError) Error() string
