tlq

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 22, 2025 License: MIT Imports: 7 Imported by: 0

README

TLQ Client for Go

A lightweight, efficient Go client library for TLQ (Tiny Little Queue) - a simple, in-memory message queue for development and testing.

Features

  • Simple and intuitive API
  • Automatic retry with exponential backoff
  • Context-aware operations for proper cancellation
  • Configurable timeout and retry settings
  • Zero external dependencies (uses only Go standard library)
  • Thread-safe operations
  • Comprehensive error handling

Installation

go get github.com/skyaktech/tlq-client-go

Quick Start

package main

import (
    "context"
    "fmt"
    "log"
    
    tlq "github.com/skyaktech/tlq-client-go"
)

// main demonstrates the basic TLQ workflow: check that the server is healthy,
// add one message, fetch it back, and delete it after it has been handled.
func main() {
    // Create a new client with default settings
    client := tlq.NewClient()

    ctx := context.Background()

    // Check server health.
    // Fatalf is used rather than Fatal: Fatal formats like fmt.Print, which
    // only inserts a space between operands when neither is a string, so
    // the message and the error would be printed run together.
    if err := client.HealthCheck(ctx); err != nil {
        log.Fatalf("Server is not healthy: %v", err)
    }

    // Add a message
    message, err := client.AddMessage(ctx, "Hello, TLQ!")
    if err != nil {
        log.Fatalf("Failed to add message: %v", err)
    }
    fmt.Printf("Added message: %s\n", message.ID)

    // Get messages
    messages, err := client.GetMessages(ctx, 1)
    if err != nil {
        log.Fatalf("Failed to get messages: %v", err)
    }

    for _, msg := range messages {
        fmt.Printf("Got message: %s - %s\n", msg.ID, msg.Body)

        // Delete the message after processing. A failed delete is logged
        // but does not abort the loop, so remaining messages are still handled.
        if err := client.DeleteMessage(ctx, msg.ID); err != nil {
            log.Printf("Failed to delete message: %v", err)
        }
    }
}

Configuration

The client can be configured using functional options:

// Build a client with a custom host, port, request timeout, and retry policy.
client := tlq.NewClient(
    tlq.WithHost("custom.host"),
    tlq.WithPort(8080),
    tlq.WithTimeout(60 * time.Second),
    tlq.WithMaxRetries(5),
    tlq.WithRetryDelay(200 * time.Millisecond),
)
Available Options
  • WithHost(host string) - Set the TLQ server host (default: "localhost")
  • WithPort(port int) - Set the TLQ server port (default: 1337)
  • WithTimeout(timeout time.Duration) - Set request timeout (default: 30s)
  • WithMaxRetries(maxRetries int) - Set maximum retry attempts (default: 3)
  • WithRetryDelay(delay time.Duration) - Set base retry delay (default: 100ms)
  • WithHTTPClient(client *http.Client) - Use a custom HTTP client

API Reference

Client Methods
NewClient(opts ...Option) *Client

Creates a new TLQ client with the specified options.

HealthCheck(ctx context.Context) error

Checks if the TLQ server is healthy and responsive.

AddMessage(ctx context.Context, body string) (*Message, error)

Adds a new message to the queue. Returns the created message with its ID.

  • Message body is limited to 64KB
GetMessages(ctx context.Context, count int) ([]*Message, error)

Retrieves up to `count` messages from the queue.

DeleteMessage(ctx context.Context, messageID string) error

Deletes a single message from the queue.

DeleteMessages(ctx context.Context, messageIDs []string) error

Deletes multiple messages from the queue.

RetryMessage(ctx context.Context, messageID string) error

Returns a single message to the queue for retry.

RetryMessages(ctx context.Context, messageIDs []string) error

Returns multiple messages to the queue for retry.

PurgeQueue(ctx context.Context) error

Removes all messages from the queue.

Message Structure
// Message is a single queue entry, as returned by AddMessage and GetMessages.
// The struct tags give the JSON field names used when it is encoded or decoded.
type Message struct {
    ID         string `json:"id"`          // UUID v7 message identifier
    Body       string `json:"body"`        // Message content
    State      string `json:"state"`       // Message state (Ready, Processing, etc.)
    RetryCount int    `json:"retry_count"` // Number of retry attempts
}

Advanced Usage

Worker Pattern
// worker repeatedly fetches up to 10 messages, processes each one, and then
// deletes it on success or returns it to the queue on failure. It runs until
// ctx is cancelled.
//
// workerID is used only to label log output. Failures to delete or retry a
// message are logged rather than ignored, and every pause is cut short when
// ctx is cancelled so shutdown is not delayed by a sleeping worker.
func worker(ctx context.Context, client *tlq.Client, workerID int) {
    for {
        select {
        case <-ctx.Done():
            return
        default:
        }

        messages, err := client.GetMessages(ctx, 10)
        if err != nil {
            log.Printf("Worker %d: Failed to get messages: %v", workerID, err)
            // Back off before polling again, unless shutdown was requested.
            if !sleepOrDone(ctx, time.Second) {
                return
            }
            continue
        }

        for _, msg := range messages {
            // Process message
            if err := processMessage(msg); err != nil {
                // Retry on failure
                if retryErr := client.RetryMessage(ctx, msg.ID); retryErr != nil {
                    log.Printf("Worker %d: Failed to retry message %s: %v", workerID, msg.ID, retryErr)
                }
                continue
            }

            // Delete on success
            if err := client.DeleteMessage(ctx, msg.ID); err != nil {
                log.Printf("Worker %d: Failed to delete message %s: %v", workerID, msg.ID, err)
            }
        }

        // Nothing was returned: pause briefly instead of polling in a tight loop.
        if len(messages) == 0 {
            if !sleepOrDone(ctx, 100*time.Millisecond) {
                return
            }
        }
    }
}

// sleepOrDone waits for d to elapse or for ctx to be cancelled, whichever
// happens first. It reports true if the full duration elapsed and false if
// ctx was cancelled.
func sleepOrDone(ctx context.Context, d time.Duration) bool {
    timer := time.NewTimer(d)
    defer timer.Stop()

    select {
    case <-ctx.Done():
        return false
    case <-timer.C:
        return true
    }
}
Batch Processing
// processBatch fetches up to 100 messages, processes each one, then deletes
// the successfully processed messages and returns the failed ones to the
// queue, using one batch call for each group.
//
// Both batch calls are always attempted: a failure to delete must not prevent
// the failed messages from being handed back for retry. It returns a non-nil
// error if fetching, deleting, or retrying fails. When both delete and retry
// fail, the delete error is wrapped and the retry error is included in the text.
func processBatch(ctx context.Context, client *tlq.Client) error {
    messages, err := client.GetMessages(ctx, 100)
    if err != nil {
        return fmt.Errorf("failed to get messages: %w", err)
    }

    var successIDs, failedIDs []string

    for _, msg := range messages {
        if err := processMessage(msg); err != nil {
            failedIDs = append(failedIDs, msg.ID)
            continue
        }
        successIDs = append(successIDs, msg.ID)
    }

    var deleteErr, retryErr error

    // Delete successful messages
    if len(successIDs) > 0 {
        deleteErr = client.DeleteMessages(ctx, successIDs)
    }

    // Retry failed messages, regardless of whether the delete succeeded
    if len(failedIDs) > 0 {
        retryErr = client.RetryMessages(ctx, failedIDs)
    }

    // errors.Join is avoided because the stated minimum Go version is 1.18.
    switch {
    case deleteErr != nil && retryErr != nil:
        return fmt.Errorf("failed to delete messages: %w (also failed to retry messages: %v)", deleteErr, retryErr)
    case deleteErr != nil:
        return fmt.Errorf("failed to delete messages: %w", deleteErr)
    case retryErr != nil:
        return fmt.Errorf("failed to retry messages: %w", retryErr)
    }

    return nil
}
Graceful Shutdown
// main runs five workers against a shared client and blocks until all of
// them have returned. An interrupt or SIGTERM cancels the shared context,
// which is what tells the workers to stop.
func main() {
    const workerCount = 5

    ctx, stop := context.WithCancel(context.Background())
    client := tlq.NewClient()

    // The channel is buffered by one so a signal delivered before the
    // goroutine below starts receiving is not lost.
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, os.Interrupt, syscall.SIGTERM)

    go func() {
        <-signals
        log.Println("Shutting down...")
        stop()
    }()

    // Launch the workers, each identified by its index.
    var running sync.WaitGroup
    running.Add(workerCount)
    for id := 0; id < workerCount; id++ {
        go func(n int) {
            defer running.Done()
            worker(ctx, client, n)
        }(id)
    }

    running.Wait()
    log.Println("All workers stopped")
}

Running Tests

# Run all tests
go test ./...

# Run tests with coverage
go test -cover ./...

# Run tests with verbose output
go test -v ./...

Requirements

  • Go 1.18 or higher
  • TLQ server running (default: localhost:1337)

TLQ Server Installation

Install TLQ using Cargo:

cargo install tlq
tlq

Or run with Docker:

docker run -p 1337:1337 nebojsa/tlq:latest

Contributing

Contributions are welcome! Please feel free to submit a Pull Request.

  1. Fork the repository
  2. Create your feature branch (git checkout -b feature/amazing-feature)
  3. Commit your changes (git commit -m 'Add some amazing feature')
  4. Push to the branch (git push origin feature/amazing-feature)
  5. Open a Pull Request

License

This project is licensed under the MIT License - see the LICENSE file for details.

Support

For issues, questions, or contributions, please visit the GitHub repository at https://github.com/skyaktech/tlq-client-go.

Documentation

Index

Constants

View Source
// Default client settings and the message size limit.
const (
	// DefaultHost is the TLQ server host used unless WithHost is given.
	DefaultHost       = "localhost"
	// DefaultPort is the TLQ server port used unless WithPort is given.
	DefaultPort       = 1337
	// DefaultTimeout is the request timeout used unless WithTimeout is given.
	DefaultTimeout    = 30 * time.Second
	// DefaultMaxRetries is the maximum number of retry attempts used unless
	// WithMaxRetries is given.
	DefaultMaxRetries = 3
	// DefaultRetryDelay is the base retry delay used unless WithRetryDelay
	// is given.
	DefaultRetryDelay = 100 * time.Millisecond
	// MaxMessageSize is the limit on a message body.
	MaxMessageSize    = 64 * 1024 // 64KB
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

// Client is a TLQ client. Create one with NewClient.
type Client struct {
	// contains filtered or unexported fields
}

func NewClient

func NewClient(opts ...Option) *Client

func (*Client) AddMessage

func (c *Client) AddMessage(ctx context.Context, body string) (*Message, error)

func (*Client) DeleteMessage

func (c *Client) DeleteMessage(ctx context.Context, messageID string) error

func (*Client) DeleteMessages

func (c *Client) DeleteMessages(ctx context.Context, messageIDs []string) error

func (*Client) GetMessages

func (c *Client) GetMessages(ctx context.Context, count int) ([]*Message, error)

func (*Client) HealthCheck

func (c *Client) HealthCheck(ctx context.Context) error

func (*Client) PurgeQueue

func (c *Client) PurgeQueue(ctx context.Context) error

func (*Client) RetryMessage

func (c *Client) RetryMessage(ctx context.Context, messageID string) error

func (*Client) RetryMessages

func (c *Client) RetryMessages(ctx context.Context, messageIDs []string) error

type Config

// Config holds the client settings that Option functions modify.
type Config struct {
	// Host is the TLQ server host (set by WithHost).
	Host       string
	// Port is the TLQ server port (set by WithPort).
	Port       int
	// Timeout is the request timeout (set by WithTimeout).
	Timeout    time.Duration
	// MaxRetries is the maximum number of retry attempts (set by WithMaxRetries).
	MaxRetries int
	// RetryDelay is the base retry delay (set by WithRetryDelay).
	RetryDelay time.Duration
	// HTTPClient is a custom HTTP client to use (set by WithHTTPClient).
	HTTPClient *http.Client
}

type Message

// Message is a single queue entry, as returned by AddMessage and GetMessages.
type Message struct {
	// ID is the UUID v7 message identifier.
	ID         string `json:"id"`
	// Body is the message content.
	Body       string `json:"body"`
	// State is the message state (Ready, Processing, etc.).
	State      string `json:"state"`
	// RetryCount is the number of retry attempts.
	RetryCount int    `json:"retry_count"`
}

type Option

// Option is a function that modifies a Config. Options are passed to
// NewClient to customize the client.
type Option func(*Config)

func WithHTTPClient

func WithHTTPClient(client *http.Client) Option

func WithHost

func WithHost(host string) Option

func WithMaxRetries

func WithMaxRetries(maxRetries int) Option

func WithPort

func WithPort(port int) Option

func WithRetryDelay

func WithRetryDelay(delay time.Duration) Option

func WithTimeout

func WithTimeout(timeout time.Duration) Option

Directories

Path Synopsis
examples
basic command
worker command

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL