README
¶
ras-utils
Go utility library providing shared helper functions for the Clinical+ ecosystem.
Installation
go get github.com/transactrx/ras-utils
Packages
rascache
Generic in-memory key-value cache with TTL expiration and thread-safe operations. Supports both local time and UTC-based expiration via functional options.
import "github.com/transactrx/ras-utils/rascache"
// Create a cache using local time (expired items removed on access)
c := rascache.NewCache[string, User]()
// Create a cache using UTC time for expiration
c := rascache.NewCache[string, User](rascache.WithUTC())
// Create a cache with background cleanup (removes expired items periodically)
c := rascache.NewCache[string, User](rascache.WithCleanup(5 * time.Minute))
defer c.Stop() // stop the cleanup goroutine when done
// Combine options
c := rascache.NewCache[string, User](rascache.WithCleanup(5*time.Minute), rascache.WithUTC())
defer c.Stop()
// Set with expiration time (assumes server time or UTC depending on cache init options)
c.Set("user:123", user, time.Now().Add(5*time.Minute))
// Get (returns zero value and false if expired/missing)
user, ok := c.Get("user:123")
// Cache-through pattern: fetch from source if not cached
user, ok := c.GetOrStore("user:123", func() (User, time.Time, bool) {
user, err := db.GetUser(123)
if err != nil {
return User{}, time.Time{}, false
}
return user, time.Now().Add(5*time.Minute), true
})
// Delete and Clear
c.Delete("user:123")
c.Clear()
rasconfig
Database configuration and environment variable helpers.
import "github.com/transactrx/ras-utils/rasconfig"
// Environment variables with defaults
host := rasconfig.GetEnvironmentVariableOrDefault("DB_HOST", "localhost")
port := rasconfig.GetEnvironmentVariableOrDefaultInt("DB_PORT", 5432)
timeout := rasconfig.GetEnvironmentVariableOrDefaultDuration("DB_TIMEOUT", "30s")
// Required environment variables (panics if missing)
apiKey := rasconfig.GetEnvironmentVariableOrPanic("API_KEY", "API_KEY is required")
// Database connection pool
cfg := &rasconfig.DBConfig{
Host: "localhost",
ReadOnlyHost: "readonly.localhost",
Port: "5432",
DatabaseName: "mydb",
User: "user",
Password: "pass",
MaxConnections: 10,
MinConnections: 2,
MaxConnectionLifetime: time.Hour,
MaxConnectionIdleTime: 30 * time.Minute,
ConnectionTimeout: 5 * time.Second,
}
pool, err := rasconfig.InitDbPool(ctx, cfg)
readOnlyPool, err := rasconfig.InitReadOnlyDbPool(ctx, cfg)
rasconversion
Type conversion helpers for PostgreSQL (pgx/pgtype). Converts nullable Go types to pgtype equivalents with proper null handling.
import "github.com/transactrx/ras-utils/rasconversion"
// Convert nullable Go types to pgtype (logs errors, returns invalid on failure)
pgText := rasconversion.ConvertToPgtypeString(stringPtr)
pgInt8 := rasconversion.ConvertToPgtypeInt8(int64Ptr)
pgInt2 := rasconversion.ConvertToPgtypeInt2(int32Ptr)
pgBool := rasconversion.ConvertToPgtypeBool(boolPtr)
pgTimestamp := rasconversion.ConvertToPgtypeTimestamp(timePtr)
pgTimestamptz := rasconversion.ConvertToPgtypeTimestamptz(timePtr)
pgDate := rasconversion.ConvertToPgtypeDate(timePtr)
pgTime := rasconversion.ConvertToPgtypeTime(timePtr)
// Error-returning variants for explicit error handling
pgText, err := rasconversion.TryConvertToPgtypeString(stringPtr)
pgInt8, err := rasconversion.TryConvertToPgtypeInt8(int64Ptr)
pgInt2, err := rasconversion.TryConvertToPgtypeInt2(int32Ptr)
pgBool, err := rasconversion.TryConvertToPgtypeBool(boolPtr)
pgTimestamp, err := rasconversion.TryConvertToPgtypeTimestamp(timePtr)
pgTimestamptz, err := rasconversion.TryConvertToPgtypeTimestamptz(timePtr)
pgDate, err := rasconversion.TryConvertToPgtypeDate(timePtr)
pgTime, err := rasconversion.TryConvertToPgtypeTime(timePtr)
raslogging
HTTP request logging middleware with panic recovery and structured JSON logging.
import "github.com/transactrx/ras-utils/raslogging"
// Set up structured JSON logger (reads LOG_LEVEL env var)
raslogging.SetUpLogger()
// Logging middleware with panic recovery
logger := slog.Default()
loggingMw := raslogging.LoggingMiddleware(logger, "/health", "/ready") // skip paths optional
rasevents
Event publishing via NATS with sync/async support, worker pools, graceful shutdown, and observability hooks.
Environment Variables
Required
| Variable | Description |
|---|---|
| `NATS_URL` | NATS server URL (e.g., nats://localhost:4222) |
| `NATS_QUEUE_NAME` | Queue group name for load balancing (service only) |
Optional
| Variable | Description |
|---|---|
| `NATS_JWT` | JWT token for authenticated connections |
| `NATS_KEY` | Private key for authenticated connections |
| `NATS_DEBUG` | Enable debug logging (true/false) |
| `APPID` | Application identifier for connection naming |
| `MAX_SIZE_BEFORE_COMPRESS` | Client compression threshold (default: 2KB) |
| `MAX_SIZE_BEFORE_CHUNK` | Client chunking threshold (default: 8KB) |
import "github.com/transactrx/ras-utils/rasevents"
// Option 1: Use global functions with package-level handler
rasevents.Init(&rasevents.Config{
DefaultNamespace: "MyService",
Subject: "custom.events.subject",
Timeout: 30 * time.Second,
WorkerPoolSize: 20,
EventQueueSize: 500,
})
err := rasevents.SendEvent("PatientNotification", "Email", payload)
queued := rasevents.SendEventAsync("PatientNotification", "SMS", payload)
// Graceful shutdown (drains queue before stopping)
defer rasevents.Shutdown(context.Background())
// Option 2: Create independent handler instances
handler := rasevents.NewEventsHandler(rasevents.Config{
DefaultNamespace: "MyService",
Subject: "custom.events.subject",
Timeout: 10 * time.Second,
WorkerPoolSize: 5,
EventQueueSize: 100,
}, nil) // nil client = create lazily
err := handler.SendEvent("Namespace", "EventType", payload)
queued := handler.SendEventAsync("Namespace", "EventType", payload)
// Shutdown with timeout
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := handler.Shutdown(ctx); err != nil {
log.Printf("Shutdown interrupted: %v", err)
}
Observability hooks:
handler := rasevents.NewEventsHandler(rasevents.Config{
// ... config ...
Hooks: &rasevents.Hooks{
// Called after each synchronous send
OnEventSent: func(namespace, eventType string, duration time.Duration, err error) {
metrics.RecordLatency("event_send", duration)
if err != nil {
metrics.IncrCounter("event_send_errors")
}
},
// Called when async event is queued (dropped=true if queue full)
OnEventQueued: func(namespace, eventType string, dropped bool) {
if dropped {
metrics.IncrCounter("event_dropped")
}
},
// Called after async worker processes an event
OnEventProcessed: func(namespace, eventType string, duration time.Duration, err error) {
metrics.RecordLatency("event_process", duration)
},
},
}, nil)
Testing:
// Inject mock client for testing
rasevents.SetNatsClient(mockClient)
defer rasevents.ResetNatsClient()
// Or with handler instances
handler := rasevents.NewEventsHandler(cfg, mockClient)
Environment variables:
- `EVENTS_DEFAULT_NAMESPACE` - Default namespace (required)
- `EVENTS_SUBJECT` - Base NATS subject (required)
- `EVENTS_TIMEOUT_SECONDS` - Request timeout in seconds (default: 60)
- `EVENTS_WORKER_POOL_SIZE` - Async worker count (default: 50)
- `EVENTS_QUEUE_SIZE` - Async queue size (default: 1000)
rashttp
HTTP helper functions for request parsing, response writing, and common patterns.
import "github.com/transactrx/ras-utils/rashttp"
// Request helpers
ip := rashttp.GetClientIP(r) // extracts from X-Forwarded-For, X-Real-IP, or RemoteAddr
url := rashttp.GetFullRequestURL(r) // reconstructs full URL including scheme/host from proxied requests
token := rashttp.GetBearerToken(r) // extracts bearer token from Authorization header
isHtmx := rashttp.IsHTMX(r) // checks HX-Request header
isAjax := rashttp.IsAjax(r) // checks X-Requested-With header
// Query parameter parsing with defaults
page := rashttp.QueryInt(r, "page", 1)
sort := rashttp.QueryString(r, "sort", "created_at")
// JSON request body decoding (with size limit)
type CreateUserRequest struct {
Name string `json:"name"`
Email string `json:"email"`
}
payload, err := rashttp.DecodeJSON[CreateUserRequest](r, rashttp.DefaultMaxBodySize)
// Response helpers — generic
rashttp.WriteJSON(w, http.StatusOK, data)
rashttp.WriteError(w, http.StatusBadRequest, "invalid input")
// Response helpers — status shorthands
rashttp.OK(w, data) // 200
rashttp.Created(w, data) // 201
rashttp.Accepted(w, data) // 202
rashttp.NoContent(w) // 204
rashttp.BadRequest(w, "missing field") // 400
rashttp.Unauthorized(w, "invalid token") // 401
rashttp.Forbidden(w, "not allowed") // 403
rashttp.NotFound(w, "resource not found") // 404
rashttp.Conflict(w, "already exists") // 409
rashttp.UnprocessableEntity(w, "validation error") // 422
rashttp.TooManyRequests(w, "rate limited") // 429
rashttp.InternalServerError(w, "unexpected error") // 500
rashttp.ServiceUnavailable(w, "try again later") // 503
// Health check handler
http.Handle("/health", rashttp.HealthHandler(func() error {
return db.Ping() // returns 200 if nil, 503 if error
}))
rasstack
Middleware composition utility for chaining HTTP middleware.
import "github.com/transactrx/ras-utils/rasstack"
// Compose multiple middleware
stack := rasstack.CreateStack(
raslogging.LoggingMiddleware(logger),
authMiddleware,
rateLimitMiddleware,
)
http.Handle("/", stack(myHandler))
rasworker
Generic worker pool for concurrent job execution with graceful shutdown and error handling.
import "github.com/transactrx/ras-utils/rasworker"
// Create pool with 10 workers and queue size of 100
pool := rasworker.NewPool(10, 100)
pool.Start()
// Submit jobs (returns false if queue is full)
ok := pool.Submit(func(ctx context.Context) error {
// do work
return nil
})
// Graceful shutdown - drains queue before returning
err := pool.Shutdown(context.Background())
// Shutdown with timeout - cancels in-flight jobs if deadline exceeded
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := pool.Shutdown(ctx); err != nil {
log.Printf("Shutdown timeout: %v", err)
}
Error handling:
// Create pool with error handler
pool := rasworker.NewPoolWithErrorHandler(10, 100, func(err error) {
log.Printf("Job failed: %v", err)
metrics.IncrCounter("worker_errors")
})
// Or add handlers after creation (thread-safe, can be called after Start)
pool := rasworker.NewPool(10, 100)
pool.AddErrorHandler(func(err error) {
slog.Error("job error", "error", err)
})
pool.AddErrorHandler(func(err error) {
alerting.Notify(err)
})
pool.Start()