normfs_go

package module
v0.0.0-...-7fe72db Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 23, 2026 License: MIT Imports: 13 Imported by: 0

README

NormFS Go Client 🐹

Go client library for connecting to NormFS servers.

Installation

go get github.com/norma-core/normfs/normfs_go

Quick Start

package main

import (
    "fmt"
    "log"
    "log/slog"
    "os"

    normfs "github.com/norma-core/normfs/normfs_go"
)

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))

    // Connect to NormFS server
    client, err := normfs.NewClient("localhost:8888", logger)
    if err != nil {
        log.Fatal(err)
    }

    // Write data
    queuePath := "sensors/imu"
    data := []byte("sensor reading")

    id, err := client.Enqueue(queuePath, data)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Printf("Written entry with ID: %s\n", id.String())

    // Read data
    qr := client.ReadFromOffset(queuePath, id, 1, 1, 10)
    for entry := range qr.Data {
        fmt.Printf("Entry %s: %s\n", entry.ID.ID.String(), string(entry.Data))
    }

    if qr.Err != nil {
        log.Fatal(qr.Err)
    }
}

Features

  • 🔌 Simple API: Easy-to-use client for all NormFS operations
  • 📖 Flexible Reads: Absolute, tail-based, and subscription modes
  • High Performance: Zero-copy operations where possible
  • 🤖 Multi-Client: Support for parallel connections with connection pooling

API Overview

Creating a Client
// Basic connection with connection pooling (4 read connections, 1 write connection)
logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
client, err := normfs.NewClient("localhost:8888", logger)
Writing Data
// Single write
id, err := client.Enqueue("queue/path", []byte("data"))

// Batch write
ids, err := client.EnqueuePack("queue/path", [][]byte{
    []byte("data1"),
    []byte("data2"),
    []byte("data3"),
})

Examples

Sensor Data Logger
package main

import (
    "encoding/json"
    "log"
    "log/slog"
    "os"
    "time"

    normfs "github.com/norma-core/normfs/normfs_go"
)

type SensorReading struct {
    Timestamp time.Time
    Value     float64
    SensorID  string
}

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
    client, _ := normfs.NewClient("localhost:8888", logger)

    // Write sensor data
    reading := SensorReading{
        Timestamp: time.Now(),
        Value:     42.5,
        SensorID:  "imu-001",
    }

    data, _ := json.Marshal(reading)
    id, _ := client.Enqueue("sensors/imu", data)

    println("Logged sensor reading:", id.String())
}
Real-Time Subscriber
package main

import (
    "fmt"
    "log/slog"
    "os"

    normfs "github.com/norma-core/normfs/normfs_go"
)

func main() {
    logger := slog.New(slog.NewTextHandler(os.Stdout, nil))
    client, _ := normfs.NewClient("localhost:8888", logger)

    // Subscribe to real-time updates
    entryChan := make(chan normfs.StreamEntry, 100)
    errChan := client.Follow("events/system", entryChan)

    fmt.Println("Listening for events...")

    go func() {
        for entry := range entryChan {
            fmt.Printf("[%s] %s\n", entry.ID.ID.String(), string(entry.Data))
        }
    }()

    if err := <-errChan; err != nil {
        panic(err)
    }
}

Protocol

The Go client uses NormFS's Protocol Buffers-based wire protocol over TCP. The protocol specification is defined in the proto/ directory of the main repository.

Building

# Run tests
go test ./...

# Generate protobuf code (if needed)
go run gen.go

Requirements

License

MIT

Documentation

Index

Constants

Variables

View Source
var (
	ErrNotConnected     = errors.New("client not connected or setup not complete")
	ErrBufferFull       = errors.New("client request buffer is full")
	ErrRequestTimeout   = errors.New("request timed out waiting for server response")
	ErrConnectionClosed = errors.New("connection closed while waiting for response")
	ErrInvalidResponse  = errors.New("invalid or unexpected response from server")
	ErrServerSide       = errors.New("server returned an error")
	ErrQueueNotFound    = errors.New("queue not found on server")

	ErrReadStreamClosed = errors.New("read stream closed by server or connection error")
	ErrEntryNotFound    = errors.New("entry not found")
)

Functions

This section is empty.

Types

type Client

type Client interface {
	Enqueue(queueID string, data []byte) (uintn.UintN, error)
	EnqueuePack(queueID string, data [][]byte) ([]uintn.UintN, error)
	ReadFromOffset(queueID string, offset uintn.UintN, limit uint64, step uint64, bufSize uint) *QueueRead
	ReadFromTail(queueID string, offset uintn.UintN, limit uint64, step uint64, bufSize uint) *QueueRead
	Follow(queueID string, target chan<- StreamEntry) <-chan error
}

func NewClient

func NewClient(addr string, logger *slog.Logger) (Client, error)

NewClient creates a new client with multiple connections

type ConnectionStats

type ConnectionStats struct {
	TotalConnections        int
	HealthyReadConnections  int
	HealthyWriteConnections int
}

ConnectionStats holds statistics about the multi-client connections

type DataSource

type DataSource = pb.ReadResponse_DataSource

DataSource re-exported from protobuf

type QueueRead

type QueueRead struct {
	Id   uuid.UUID
	Data chan StreamEntry
	Err  error
}

QueueRead represents a read operation from the queue (client-side)

type StreamEntry

type StreamEntry struct {
	ID         StreamEntryId
	Data       []byte
	DataSource DataSource
	Err        error
}

StreamEntry represents an entry in a stream with its associated metadata.

type StreamEntryId

type StreamEntryId struct {
	ID uintn.UintN
}

Directories

Path Synopsis
pb
Package uintn provides a flexible unsigned integer type system that can represent u8, u16, u32, u64, or arbitrary precision values.
Package uintn provides a flexible unsigned integer type system that can represent u8, u16, u32, u64, or arbitrary precision values.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL