pipeline

package module
v0.0.0-...-83d1243 Latest
Published: Mar 28, 2026 License: MIT Imports: 5 Imported by: 3

README

A framework for processing messages in structured stages, similar to the way network switches and routers handle packets. It supports horizontal scaling and distributed workloads through pluggable message queue drivers (local channels, Redis, AMQP/RabbitMQ). You can run thousands of goroutines within one process, or distribute nodes across servers to load-balance pipeline stages.

Basic Structures & Usage:

1- pipeline.PipelineMessage:
type PipelineDirection string

const (
	PipelineInDirection  PipelineDirection = "IN"
	PipelineOutDirection PipelineDirection = "OUT"
)

type PipelineMessage struct {
	LastProcess int
	Direction   PipelineDirection
	Content     []byte
	Finished    bool
	Drop        bool
}

The standard object passed between pipeline stages. Because pipeline components often communicate over the network, Content is a raw byte slice ([]byte). Stages are expected to serialize and parse their own structures (for example with json.Marshal and json.Unmarshal) inside their process logic. A message enters the first queue via func (pipe *Pipeline) SendMessage(msg PipelineMessage).
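As a minimal sketch of the serialization step described above, the following program round-trips a payload through the Content field. The PipelineMessage type here is a local mirror of the struct so the example compiles on its own, and LoginEvent, EncodeEvent, and DecodeEvent are hypothetical names introduced only for illustration; they are not part of the package.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// PipelineMessage mirrors the struct shown above so this sketch compiles on
// its own; real code would use pipeline.PipelineMessage instead.
type PipelineMessage struct {
	LastProcess int
	Direction   string
	Content     []byte
	Finished    bool
	Drop        bool
}

// LoginEvent is a hypothetical payload type used only for illustration.
type LoginEvent struct {
	User  string `json:"user"`
	Valid bool   `json:"valid"`
}

// EncodeEvent serializes the payload into a message's Content field.
func EncodeEvent(ev LoginEvent) (PipelineMessage, error) {
	raw, err := json.Marshal(ev)
	if err != nil {
		return PipelineMessage{}, err
	}
	return PipelineMessage{Direction: "IN", Content: raw}, nil
}

// DecodeEvent parses Content back into the payload, as a stage would.
func DecodeEvent(msg PipelineMessage) (LoginEvent, error) {
	var ev LoginEvent
	err := json.Unmarshal(msg.Content, &ev)
	return ev, err
}

func main() {
	msg, err := EncodeEvent(LoginEvent{User: "alice", Valid: true})
	if err != nil {
		panic(err)
	}
	ev, err := DecodeEvent(msg)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(msg.Content), ev.User, ev.Valid)
}
```

Keeping the encoding inside the stage functions means the queue drivers only ever see opaque bytes, which is what allows the same message to travel over a local channel, Redis, or RabbitMQ.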

2- pipeline.Queue Drivers:

The pipeline routes messages between processes only through a driver that implements the pipeline.Queue interface. First-party implementations include:

  • LocalQueue (github.com/m-motawea/pipeline/localqueue): Uses buffered Go channels for fast concurrent processing within a single process.
  • RedisQueue (github.com/m-motawea/pipeline/redisqueue): Uses Redis lists with BLPOP, so each message is delivered to a single worker and load is distributed across multiple servers.
  • AMQPQueue (github.com/m-motawea/pipeline/amqpqueue): Connects to RabbitMQ and consumes messages through AMQP consumer channels.

3- pipeline.Pipeline:

A pipeline is composed of multiple processes that run on a message sequentially, in order, until one of the following happens:

  1. A process sets the message as Finished.
  2. A process sets the message as Drop.
  3. No more processes remain in the pipeline.

Messages traverse the pipeline either one way (up through the stages) or two ways (up, then back down through the stages in reverse).
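The three stop conditions above can be sketched for the one-way case as a plain loop. This is not the package's implementation, which routes messages through queues; Stage and RunOneWay are hypothetical names, PipelineMessage is a local mirror of the struct, and recording the stage index in LastProcess is an assumption about how that field is used.

```go
package main

import "fmt"

// PipelineMessage mirrors the relevant fields of the struct shown earlier.
type PipelineMessage struct {
	LastProcess int
	Content     []byte
	Finished    bool
	Drop        bool
}

// Stage is a hypothetical stand-in for a process's handler function.
type Stage func(msg PipelineMessage) PipelineMessage

// RunOneWay applies the stages in order and stops at the first stage that
// marks the message Finished or Drop, or when no stages remain.
func RunOneWay(stages []Stage, msg PipelineMessage) PipelineMessage {
	for i, stage := range stages {
		msg = stage(msg)
		msg.LastProcess = i // assumption: index of the last stage that ran
		if msg.Finished || msg.Drop {
			break
		}
	}
	return msg
}

func main() {
	tag := func(m PipelineMessage) PipelineMessage {
		m.Content = append(m.Content, 'x')
		return m
	}
	finish := func(m PipelineMessage) PipelineMessage {
		m.Finished = true
		return m
	}
	// The third stage never runs because the second marks the message Finished.
	out := RunOneWay([]Stage{tag, finish, tag}, PipelineMessage{Content: []byte("a")})
	fmt.Println(string(out.Content), out.LastProcess, out.Finished)
}
```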

To initialize a pipeline, pass your configured queue driver to NewPipeline:

import (
	"github.com/m-motawea/pipeline"
	"github.com/m-motawea/pipeline/localqueue"
	"sync"
)

var wg sync.WaitGroup
driver := localqueue.NewLocalQueue()

// Create a one-way pipeline (twoWay is false).
pipe1, err := pipeline.NewPipeline("production_pipeline", false, &wg, driver)
if err != nil {
	panic(err)
}

4- pipeline.PipelineProcess:
type PipelineProcess struct {
	Id           int
	Name         string
	Concurrency  int
	// ... unexported parameters
}

A pipeline process represents one configured stage of the pipeline. Concurrency sets the number of worker goroutines started to consume messages from the stage's queue asynchronously.
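To illustrate what a Concurrency setting of this kind typically means, here is a minimal worker-pool sketch in which several goroutines read from one shared channel. It is an assumption about the general pattern, not the package's internal code; RunWorkers and CountHandled are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// RunWorkers starts `concurrency` goroutines that all read from the same
// channel, so each message is handled by exactly one worker. It returns
// once the channel is closed and every worker has finished.
func RunWorkers(concurrency int, in <-chan []byte, handle func([]byte)) {
	var wg sync.WaitGroup
	for i := 0; i < concurrency; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for msg := range in {
				handle(msg)
			}
		}()
	}
	wg.Wait()
}

// CountHandled feeds n messages to the pool and reports how many were handled.
func CountHandled(concurrency, n int) int64 {
	in := make(chan []byte, n)
	for i := 0; i < n; i++ {
		in <- []byte{byte(i)}
	}
	close(in)

	var handled int64
	RunWorkers(concurrency, in, func([]byte) {
		atomic.AddInt64(&handled, 1) // handlers run concurrently, so count atomically
	})
	return handled
}

func main() {
	fmt.Println(CountHandled(10, 100))
}
```

Because the workers share one channel, raising the worker count increases throughput without delivering any message twice, provided the handler itself is safe to call concurrently.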

To construct a process, define a function that handles the msg.Content bytes:

inFunc := func(proc pipeline.PipelineProcess, msg pipeline.PipelineMessage) pipeline.PipelineMessage {
	// Process the input data.
	log.Printf("Worker processing on stage %s", proc.Name)
	// Unmarshal msg.Content, apply your updates, and marshal the result back.
	return msg
}

// Create the processing stage.
proc1, _ := pipeline.NewPipelineProcess("stage_1_authentication", inFunc)

// Run 10 worker goroutines for this stage.
proc1.Concurrency = 10

pipe1.AddProcess(&proc1)
pipe1.Start() // Starts the asynchronous workers polling their queues.

Running Distributed Tests

For working examples of distributed deployments, in which separate CLI applications communicate through Redis or RabbitMQ, see the examples/distributed directory.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

This section is empty.

Types

type Pipeline

type Pipeline struct {
	Name string
	// contains filtered or unexported fields
}

func NewPipeline

func NewPipeline(name string, twoWay bool, wg *sync.WaitGroup, queue Queue, consumerChannel ...PipelineChannel) (Pipeline, error)

func (*Pipeline) AddProcess

func (pipe *Pipeline) AddProcess(proc *PipelineProcess)

func (*Pipeline) SendMessage

func (pipe *Pipeline) SendMessage(msg PipelineMessage)

func (*Pipeline) Start

func (pipe *Pipeline) Start()

func (*Pipeline) Stop

func (pipe *Pipeline) Stop()

type PipelineChannel

type PipelineChannel chan PipelineMessage

type PipelineDirection

type PipelineDirection string
const (
	PipelineInDirection  PipelineDirection = "IN"
	PipelineOutDirection PipelineDirection = "OUT"
)

type PipelineMessage

type PipelineMessage struct {
	LastProcess int
	Direction   PipelineDirection
	Content     []byte
	Finished    bool
	Drop        bool
}

type PipelineProcess

type PipelineProcess struct {
	Id   int
	Name string

	Concurrency int
	Pipe        *Pipeline
	// contains filtered or unexported fields
}

func (*PipelineProcess) Close

func (proc *PipelineProcess) Close()

func (*PipelineProcess) InProcess

func (proc *PipelineProcess) InProcess(ctx context.Context, queue Queue)

func (*PipelineProcess) InQueue

func (proc *PipelineProcess) InQueue(msg PipelineMessage)

func (*PipelineProcess) OutProcess

func (proc *PipelineProcess) OutProcess(ctx context.Context, queue Queue)

func (*PipelineProcess) OutQueue

func (proc *PipelineProcess) OutQueue(msg PipelineMessage)

type Queue

type Queue interface {
	// Publish sends a PipelineMessage to the specified topic
	Publish(ctx context.Context, topic string, msg PipelineMessage) error

	// Subscribe starts a consumer on the given topic. The consumerGroup
	// allows multiple nodes to process messages from the same topic
	// in a load-balanced way.
	// Returns a channel that receives messages and an error if subscription fails.
	Subscribe(ctx context.Context, topic string, consumerGroup string) (<-chan PipelineMessage, error)

	// Close terminates the connection to the queue and cleans up resources.
	Close() error
}

Queue is the interface that abstracts the message queue backend for scaling the pipeline across goroutines and nodes.
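As a rough sketch of what a custom driver might look like, the following in-memory type satisfies the interface using one buffered channel per topic. MemQueue, NewMemQueue, and RoundTrip are hypothetical names, PipelineMessage and Queue are local mirrors of the declarations above so the example compiles on its own, and ignoring consumerGroup is a simplification made for this example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
)

// PipelineMessage and Queue mirror the declarations above; real code would
// import them from the pipeline package.
type PipelineMessage struct {
	Content []byte
}

type Queue interface {
	Publish(ctx context.Context, topic string, msg PipelineMessage) error
	Subscribe(ctx context.Context, topic string, consumerGroup string) (<-chan PipelineMessage, error)
	Close() error
}

// MemQueue is a hypothetical in-process driver: one buffered channel per topic.
type MemQueue struct {
	mu     sync.Mutex
	topics map[string]chan PipelineMessage
	closed bool
}

var _ Queue = (*MemQueue)(nil) // compile-time interface check

func NewMemQueue() *MemQueue {
	return &MemQueue{topics: make(map[string]chan PipelineMessage)}
}

// topic returns the channel for a topic, creating it on first use.
func (q *MemQueue) topic(name string) (chan PipelineMessage, error) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return nil, errors.New("memqueue: closed")
	}
	ch, ok := q.topics[name]
	if !ok {
		ch = make(chan PipelineMessage, 64)
		q.topics[name] = ch
	}
	return ch, nil
}

func (q *MemQueue) Publish(ctx context.Context, topic string, msg PipelineMessage) error {
	ch, err := q.topic(topic)
	if err != nil {
		return err
	}
	select {
	case ch <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Subscribe ignores consumerGroup: all subscribers to a topic share one
// channel, so each message is received by exactly one of them.
func (q *MemQueue) Subscribe(ctx context.Context, topic string, consumerGroup string) (<-chan PipelineMessage, error) {
	ch, err := q.topic(topic)
	if err != nil {
		return nil, err
	}
	return ch, nil
}

// Close closes every topic channel. Callers are assumed to have stopped
// publishing before calling Close.
func (q *MemQueue) Close() error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.closed {
		return nil
	}
	q.closed = true
	for _, ch := range q.topics {
		close(ch)
	}
	return nil
}

// RoundTrip publishes one payload and reads it back from the same topic.
func RoundTrip(q Queue, topic, payload string) (string, error) {
	ctx := context.Background()
	if err := q.Publish(ctx, topic, PipelineMessage{Content: []byte(payload)}); err != nil {
		return "", err
	}
	ch, err := q.Subscribe(ctx, topic, "workers")
	if err != nil {
		return "", err
	}
	return string((<-ch).Content), nil
}

func main() {
	q := NewMemQueue()
	defer q.Close()
	fmt.Println(RoundTrip(q, "stage_1", "hello"))
}
```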

Directories

Path                             Synopsis
examples/distributed (command)
