redis_ipc

package module
v0.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 3, 2025 License: AGPL-3.0 Imports: 6 Imported by: 0

README

Rescoot Redis IPC Library

Redis-based IPC library implementing pub/sub messaging, queue processing, and atomic transactions. This is a clean-room reimplementation of the IPC functionality from unu's usk library.

Core Architecture

Single-client Redis multiplexer handling four IPC patterns:

  1. Pub/Sub Messaging: Topic-based message distribution via Redis PUBLISH/SUBSCRIBE
  2. Request Processing: Blocking queue consumers using BRPOP/LPUSH
  3. Atomic Transactions: Pipelined command groups with MULTI/EXEC semantics
  4. Direct Command Execution: Immediate execution of Redis commands with results

Usage Example

// Initialize with connection params
client := redis_ipc.New(redis_ipc.Config{
    Address: "localhost",
    Port:    6379,
})

// Subscribe to messages
sg := client.Subscribe("group")
sg.Handle("channel", func(msg []byte) error {
    log.Printf("Received: %s", msg)
    return nil
})

// Process queue items
client.HandleRequests("queue", func(data []byte) error {
    log.Printf("Processing: %s", data)
    return nil
})

// Direct command execution
value, err := client.Get("mykey")
if err != nil {
    log.Printf("Error: %v", err)
}
log.Printf("Value: %s", value)

// Using convenience methods
err = client.Set("mykey", "newvalue", 0)
count, err := client.Incr("counter")
hashValue, err := client.HGet("myhash", "field")

// Execute atomic transactions with results
txg := client.NewTxGroup("tx")
txg.Add("SET", "key", "value")
txg.Add("GET", "key")
txg.Add("INCR", "counter")

results, err := txg.Exec()
if err != nil {
    log.Printf("Transaction failed: %v", err)
}
log.Printf("SET result: %v", results[0])
log.Printf("GET result: %v", results[1])
log.Printf("INCR result: %v", results[2])

Concurrency Model

  • Multiplexed Redis connections (single pool)
  • Message routing via concurrent maps
  • Lock-free handler dispatch
  • Thread-safe transaction pipelining

Dependencies

  • Redis server
  • go-redis

Installation

go get github.com/rescoot/redis-ipc

License

AGPL-3.0, see LICENSE for details

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Client

type Client struct {
	// contains filtered or unexported fields
}

func New

func New(cfg Config) (*Client, error)

func (*Client) BLPop added in v0.2.0

func (c *Client) BLPop(timeout time.Duration, keys ...string) ([]string, error)

BLPop blocks until it can remove and return the first element of a list

func (*Client) BRPop added in v0.2.0

func (c *Client) BRPop(timeout time.Duration, keys ...string) ([]string, error)

BRPop blocks until it can remove and return the last element of a list

func (*Client) Close

func (c *Client) Close() error

func (*Client) Decr added in v0.2.0

func (c *Client) Decr(key string) (int64, error)

Decr decrements the integer value of a key by one

func (*Client) Del added in v0.2.0

func (c *Client) Del(keys ...string) (int64, error)

Del deletes a key

func (*Client) Do added in v0.2.0

func (c *Client) Do(cmd string, args ...interface{}) (interface{}, error)

Do executes a Redis command and returns the result

func (*Client) Exists added in v0.2.0

func (c *Client) Exists(keys ...string) (int64, error)

Exists checks if a key exists

func (*Client) Expire added in v0.2.0

func (c *Client) Expire(key string, expiration time.Duration) (bool, error)

Expire sets a key's time to live in seconds

func (*Client) Get added in v0.2.0

func (c *Client) Get(key string) (string, error)

Get retrieves the value of a key

func (*Client) HGet added in v0.2.0

func (c *Client) HGet(key, field string) (string, error)

HGet retrieves the value of a hash field

func (*Client) HGetAll added in v0.2.0

func (c *Client) HGetAll(key string) (map[string]string, error)

HGetAll retrieves all fields and values of a hash

func (*Client) HSet added in v0.2.0

func (c *Client) HSet(key, field string, value interface{}) error

HSet sets the value of a hash field

func (*Client) HandleRequests

func (c *Client) HandleRequests(name string, handler func([]byte) error) *RequestHandler

func (*Client) Incr added in v0.2.0

func (c *Client) Incr(key string) (int64, error)

Incr increments the integer value of a key by one

func (*Client) LPop added in v0.2.0

func (c *Client) LPop(key string) (string, error)

LPop removes and returns the first element of a list

func (*Client) LPush added in v0.2.0

func (c *Client) LPush(key string, values ...interface{}) (int64, error)

LPush inserts values at the head of a list

func (*Client) NewTxGroup

func (c *Client) NewTxGroup(name string) *TxGroup

func (*Client) Publish added in v0.2.0

func (c *Client) Publish(channel string, message interface{}) (int64, error)

Publish publishes a message to a channel

func (*Client) RPop added in v0.2.0

func (c *Client) RPop(key string) (string, error)

RPop removes and returns the last element of a list

func (*Client) RPush added in v0.2.0

func (c *Client) RPush(key string, values ...interface{}) (int64, error)

RPush inserts values at the tail of a list

func (*Client) Set added in v0.2.0

func (c *Client) Set(key string, value interface{}, expiration time.Duration) error

Set sets the value of a key

func (*Client) Subscribe

func (c *Client) Subscribe(name string) *SubscriberGroup

type Config

type Config struct {
	Address       string
	Port          int
	RetryInterval time.Duration
	MaxRetries    int
}

type RequestHandler

type RequestHandler struct {
	// contains filtered or unexported fields
}

type SubscriberGroup

type SubscriberGroup struct {
	// contains filtered or unexported fields
}

func (*SubscriberGroup) Handle

func (sg *SubscriberGroup) Handle(channel string, handler func([]byte) error) error

type TxGroup

type TxGroup struct {
	// contains filtered or unexported fields
}

func (*TxGroup) Add

func (g *TxGroup) Add(cmd string, args ...interface{}) error

func (*TxGroup) Exec

func (g *TxGroup) Exec() ([]interface{}, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL