ras-utils

module
v1.0.8 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: May 13, 2026 License: MIT

README

ras-utils

Go utility library providing shared helper functions for the Clinical+ ecosystem.

Installation

go get github.com/transactrx/ras-utils

Packages

rascache

Generic in-memory key-value cache with TTL expiration and thread-safe operations. Supports both local time and UTC-based expiration via functional options.

import "github.com/transactrx/ras-utils/rascache"

// Create a cache using local time (expired items removed on access)
c := rascache.NewCache[string, User]()

// Create a cache using UTC time for expiration
c := rascache.NewCache[string, User](rascache.WithUTC())

// Create a cache with background cleanup (removes expired items periodically)
c := rascache.NewCache[string, User](rascache.WithCleanup(5 * time.Minute))
defer c.Stop() // stop the cleanup goroutine when done

// Combine options
c := rascache.NewCache[string, User](rascache.WithCleanup(5*time.Minute), rascache.WithUTC())
defer c.Stop()

// Set with expiration time (assumes server time or UTC depending on cache init options)
c.Set("user:123", user, time.Now().Add(5*time.Minute))

// Get (returns zero value and false if expired/missing)
user, ok := c.Get("user:123")

// Cache-through pattern: fetch from source if not cached
user, ok := c.GetOrStore("user:123", func() (User, time.Time, bool) {
    user, err := db.GetUser(123)
    if err != nil {
        return User{}, time.Time{}, false
    }
    return user, time.Now().Add(5*time.Minute), true
})

// Delete and Clear
c.Delete("user:123")
c.Clear()

rasconfig

Database configuration and environment variable helpers.

import "github.com/transactrx/ras-utils/rasconfig"

// Environment variables with defaults
host := rasconfig.GetEnvironmentVariableOrDefault("DB_HOST", "localhost")
port := rasconfig.GetEnvironmentVariableOrDefaultInt("DB_PORT", 5432)
timeout := rasconfig.GetEnvironmentVariableOrDefaultDuration("DB_TIMEOUT", "30s")

// Required environment variables (panics if missing)
apiKey := rasconfig.GetEnvironmentVariableOrPanic("API_KEY", "API_KEY is required")

// Database connection pool
cfg := &rasconfig.DBConfig{
    Host:                  "localhost",
    ReadOnlyHost:          "readonly.localhost",
    Port:                  "5432",
    DatabaseName:          "mydb",
    User:                  "user",
    Password:              "pass",
    MaxConnections:        10,
    MinConnections:        2,
    MaxConnectionLifetime: time.Hour,
    MaxConnectionIdleTime: 30 * time.Minute,
    ConnectionTimeout:     5 * time.Second,
}

pool, err := rasconfig.InitDbPool(ctx, cfg)
readOnlyPool, err := rasconfig.InitReadOnlyDbPool(ctx, cfg)

rasconversion

Type conversion helpers for PostgreSQL (pgx/pgtype). Converts nullable Go types to pgtype equivalents with proper null handling.

import "github.com/transactrx/ras-utils/rasconversion"

// Convert nullable Go types to pgtype (logs errors, returns invalid on failure)
pgText := rasconversion.ConvertToPgtypeString(stringPtr)
pgInt8 := rasconversion.ConvertToPgtypeInt8(int64Ptr)
pgInt2 := rasconversion.ConvertToPgtypeInt2(int32Ptr)
pgBool := rasconversion.ConvertToPgtypeBool(boolPtr)
pgTime := rasconversion.ConvertToPgtypeTimestamp(timePtr)
pfTimez := rasconversion.ConvertToPgtypeTimestamptz(timePtr)
pfDate := rasconversion.ConvertToPgtypeDate(timePtr)
pfTime := rasconversion.ConvertToPgtypeTime(timePtr)

// Error-returning variants for explicit error handling
pgText, err := rasconversion.TryConvertToPgtypeString(stringPtr)
pgInt8, err := rasconversion.TryConvertToPgtypeInt8(int64Ptr)
pgInt2, err := rasconversion.TryConvertToPgtypeInt2(int32Ptr)
pgBool, err := rasconversion.TryConvertToPgtypeBool(boolPtr)
pgTime, err := rasconversion.TryConvertToPgtypeTimestamp(timePtr)
pfTimez, err := rasconversion.TryConvertToPgtypeTimestamptz(timePtr)
pfDate, err := rasconversion.TryConvertToPgtypeDate(timePtr)
pfTime, err := rasconversion.TryConvertToPgtypeTime(timePtr)

raslogging

HTTP request logging middleware with panic recovery and structured JSON logging.

import "github.com/transactrx/ras-utils/raslogging"

// Set up structured JSON logger (reads LOG_LEVEL env var)
raslogging.SetUpLogger()

// Logging middleware with panic recovery
logger := slog.Default()
loggingMw := raslogging.LoggingMiddleware(logger, "/health", "/ready") // skip paths optional

rasevents

Event publishing via NATS with sync/async support, worker pools, graceful shutdown, and observability hooks.

Environment Variables
Required
Variable Description
NATS_URL NATS server URL (e.g., nats://localhost:4222)
NATS_QUEUE_NAME Queue group name for load balancing (service only)
Optional
Variable Description
NATS_JWT JWT token for authenticated connections
NATS_KEY Private key for authenticated connections
NATS_DEBUG Enable debug logging (true/false)
APPID Application identifier for connection naming
MAX_SIZE_BEFORE_COMPRESS Client compression threshold (default: 2KB)
MAX_SIZE_BEFORE_CHUNK Client chunking threshold (default: 8KB)
import "github.com/transactrx/ras-utils/rasevents"

// Option 1: Use global functions with package-level handler
rasevents.Init(&rasevents.Config{
    DefaultNamespace: "MyService",
    Subject:          "custom.events.subject",
    Timeout:          30 * time.Second,
    WorkerPoolSize:   20,
    EventQueueSize:   500,
})

err := rasevents.SendEvent("PatientNotification", "Email", payload)
queued := rasevents.SendEventAsync("PatientNotification", "SMS", payload)

// Graceful shutdown (drains queue before stopping)
defer rasevents.Shutdown(context.Background())

// Option 2: Create independent handler instances
handler := rasevents.NewEventsHandler(rasevents.Config{
    DefaultNamespace: "MyService",
    Subject:          "custom.events.subject",
    Timeout:          10 * time.Second,
    WorkerPoolSize:   5,
    EventQueueSize:   100,
}, nil) // nil client = create lazily

err := handler.SendEvent("Namespace", "EventType", payload)
queued := handler.SendEventAsync("Namespace", "EventType", payload)

// Shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := handler.Shutdown(ctx); err != nil {
    log.Printf("Shutdown interrupted: %v", err)
}

Observability hooks:

handler := rasevents.NewEventsHandler(rasevents.Config{
    // ... config ...
    Hooks: &rasevents.Hooks{
        // Called after each synchronous send
        OnEventSent: func(namespace, eventType string, duration time.Duration, err error) {
            metrics.RecordLatency("event_send", duration)
            if err != nil {
                metrics.IncrCounter("event_send_errors")
            }
        },
        // Called when async event is queued (dropped=true if queue full)
        OnEventQueued: func(namespace, eventType string, dropped bool) {
            if dropped {
                metrics.IncrCounter("event_dropped")
            }
        },
        // Called after async worker processes an event
        OnEventProcessed: func(namespace, eventType string, duration time.Duration, err error) {
            metrics.RecordLatency("event_process", duration)
        },
    },
}, nil)

Testing:

// Inject mock client for testing
rasevents.SetNatsClient(mockClient)
defer rasevents.ResetNatsClient()

// Or with handler instances
handler := rasevents.NewEventsHandler(cfg, mockClient)

Environment variables:

  • EVENTS_DEFAULT_NAMESPACE - Default namespace (required)
  • EVENTS_SUBJECT - Base NATS subject (required)
  • EVENTS_TIMEOUT_SECONDS - Request timeout in seconds (default: 60)
  • EVENTS_WORKER_POOL_SIZE - Async worker count (default: 50)
  • EVENTS_QUEUE_SIZE - Async queue size (default: 1000)

rashttp

HTTP helper functions for request parsing, response writing, and common patterns.

import "github.com/transactrx/ras-utils/rashttp"

// Request helpers
ip := rashttp.GetClientIP(r)              // extracts from X-Forwarded-For, X-Real-IP, or RemoteAddr
url := rashttp.GetFullRequestURL(r)       // reconstructs full URL including scheme/host from proxied requests
token := rashttp.GetBearerToken(r)        // extracts bearer token from Authorization header
isHtmx := rashttp.IsHTMX(r)               // checks HX-Request header
isAjax := rashttp.IsAjax(r)               // checks X-Requested-With header

// Query parameter parsing with defaults
page := rashttp.QueryInt(r, "page", 1)
sort := rashttp.QueryString(r, "sort", "created_at")

// JSON request body decoding (with size limit)
type CreateUserRequest struct {
    Name  string `json:"name"`
    Email string `json:"email"`
}
payload, err := rashttp.DecodeJSON[CreateUserRequest](r, rashttp.DefaultMaxBodySize)

// Response helpers — generic
rashttp.WriteJSON(w, http.StatusOK, data)
rashttp.WriteError(w, http.StatusBadRequest, "invalid input")

// Response helpers — status shorthands
rashttp.OK(w, data)                              // 200
rashttp.Created(w, data)                          // 201
rashttp.Accepted(w, data)                         // 202
rashttp.NoContent(w)                              // 204
rashttp.BadRequest(w, "missing field")            // 400
rashttp.Unauthorized(w, "invalid token")          // 401
rashttp.Forbidden(w, "not allowed")               // 403
rashttp.NotFound(w, "resource not found")         // 404
rashttp.Conflict(w, "already exists")             // 409
rashttp.UnprocessableEntity(w, "validation error")// 422
rashttp.TooManyRequests(w, "rate limited")        // 429
rashttp.InternalServerError(w, "unexpected error")// 500
rashttp.ServiceUnavailable(w, "try again later")  // 503

// Health check handler
http.Handle("/health", rashttp.HealthHandler(func() error {
    return db.Ping() // returns 200 if nil, 503 if error
}))

rasstack

Middleware composition utility for chaining HTTP middleware.

import "github.com/transactrx/ras-utils/rasstack"

// Compose multiple middleware
stack := rasstack.CreateStack(
    raslogging.LoggingMiddleware(logger),
    authMiddleware,
    rateLimitMiddleware,
)

http.Handle("/", stack(myHandler))

rasworker

Generic worker pool for concurrent job execution with graceful shutdown and error handling.

import "github.com/transactrx/ras-utils/rasworker"

// Create pool with 10 workers and queue size of 100
pool := rasworker.NewPool(10, 100)
pool.Start()

// Submit jobs (returns false if queue is full)
ok := pool.Submit(func(ctx context.Context) error {
    // do work
    return nil
})

// Graceful shutdown - drains queue before returning
err := pool.Shutdown(context.Background())

// Shutdown with timeout - cancels in-flight jobs if deadline exceeded
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := pool.Shutdown(ctx); err != nil {
    log.Printf("Shutdown timeout: %v", err)
}

Error handling:

// Create pool with error handler
pool := rasworker.NewPoolWithErrorHandler(10, 100, func(err error) {
    log.Printf("Job failed: %v", err)
    metrics.IncrCounter("worker_errors")
})

// Or add handlers after creation (thread-safe, can be called after Start)
pool := rasworker.NewPool(10, 100)
pool.AddErrorHandler(func(err error) {
    slog.Error("job error", "error", err)
})
pool.AddErrorHandler(func(err error) {
    alerting.Notify(err)
})
pool.Start()

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL