bamboo

package module
v0.0.3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Oct 9, 2023 License: MIT Imports: 24 Imported by: 5

README

bamboo

bamboo is a library for distributing work across machines with asynchronous communication.

Overview

  • Workers are applications that execute time-consuming processes.
  • The App combines the processing done by the Workers.
  • The App and the Workers communicate asynchronously through a message broker such as Redis.
```mermaid
sequenceDiagram

participant App
participant Worker1
participant Redis_Worker1

Worker1 -->> Redis_Worker1: BRPOP to wait a message for Worker1

App -->> Redis_Request1: SUBSCRIBE result for request1
App -->> Redis_Worker1: LPUSH message
Redis_Worker1 ->> Worker1: Fetch message
Worker1 ->> Worker1: process
Worker1 ->> Redis_Request1: Conn
Worker1 -->> Redis_Request1: PUBLISH result of request1
Worker1 ->> Redis_Request1: Close
Redis_Request1 ->> App: Fetch result
```

Installation

Example

Development

Documentation

Index

Constants

This section is empty.

Variables

View Source
// RequestIDKey and LoggerNameKey are package-level string variables (not
// constants, so callers can reassign them).
// NOTE(review): presumably used as attribute names for the request ID and
// logger name in log records — confirm against the logging code.
var (
	RequestIDKey  = "bamboo_request_id"
	LoggerNameKey = "bamboo_logger_name"
)
View Source
// BambooLoggers associates a ContextKey with a *slog.Logger.
// NOTE(review): the declaration itself leaves the map nil; presumably it is
// populated elsewhere (e.g. by Init) before being written to — confirm.
var (
	BambooLoggers map[ContextKey]*slog.Logger
)
View Source
// ErrAborted is a sentinel error whose message is "Aborted".
// NOTE(review): presumably reported when a job is aborted — confirm at the return sites.
var ErrAborted = errors.New("Aborted")
View Source
// ErrContextCanceled is a sentinel error whose message is "ContextCanceled".
// NOTE(review): presumably reported when the caller's context is canceled — confirm at the return sites.
var ErrContextCanceled = errors.New("ContextCanceled")
View Source
// ErrTimedout is a sentinel error whose message is "Timedout".
// NOTE(review): presumably reported when a job exceeds its timeout — confirm at the return sites.
var ErrTimedout = errors.New("Timedout")

Functions

func GetLoggerFromContext added in v0.0.2

func GetLoggerFromContext(ctx context.Context, key ContextKey) *slog.Logger

GetLoggerFromContext gets the logger from the context.

func Init added in v0.0.2

func Init(ctx context.Context) context.Context

func MetricsServerProcess added in v0.0.2

func MetricsServerProcess(ctx context.Context, port int, gracefulShutdownTimeSec int) error

func WithLoggerName added in v0.0.2

func WithLoggerName(ctx context.Context, val ContextKey) context.Context

func WithValue added in v0.0.2

func WithValue(ctx context.Context, key ContextKey, val any) context.Context

Types

type BambooHeartbeatPublisher added in v0.0.2

// BambooHeartbeatPublisher is the interface returned by
// NewGoroutineBambooHeartbeatPublisher and NewRedisBambooHeartbeatPublisher.
//
// The channel element type is spelled any, matching WithValue; any is an alias
// of interface{}, so existing implementations continue to satisfy the interface.
type BambooHeartbeatPublisher interface {
	// Ping takes only a context and returns an error.
	// NOTE(review): presumably a connectivity check against the backend — confirm.
	Ping(ctx context.Context) error
	// Run receives the result channel name, a heartbeat interval (the MSec
	// suffix suggests milliseconds — confirm) and two receive-only channels,
	// done and aborted.
	// NOTE(review): presumably heartbeats are published on resultChannel until
	// done or aborted fires — confirm against the implementations.
	Run(ctx context.Context, resultChannel string, heartbeatIntervalMSec int, done <-chan any, aborted <-chan any) error
}

func NewGoroutineBambooHeartbeatPublisher added in v0.0.2

func NewGoroutineBambooHeartbeatPublisher(pubsubMap GoroutineBambooPubSubMap) BambooHeartbeatPublisher

func NewRedisBambooHeartbeatPublisher

func NewRedisBambooHeartbeatPublisher(publisherOptions *redis.UniversalOptions) BambooHeartbeatPublisher

type BambooLogHandler added in v0.0.2

// BambooLogHandler embeds a slog.Handler. The package declares its own
// Handle, WithAttrs and WithGroup methods on *BambooLogHandler; any other
// slog.Handler method is promoted from the embedded handler.
type BambooLogHandler struct {
	slog.Handler
}

func (*BambooLogHandler) Handle added in v0.0.2

func (h *BambooLogHandler) Handle(ctx context.Context, record slog.Record) error

func (*BambooLogHandler) WithAttrs added in v0.0.2

func (h *BambooLogHandler) WithAttrs(attrs []slog.Attr) slog.Handler

func (*BambooLogHandler) WithGroup added in v0.0.2

func (h *BambooLogHandler) WithGroup(name string) slog.Handler

type BambooRequestConsumer added in v0.0.2

// BambooRequestConsumer is the interface returned by the Goroutine, Kafka and
// Redis NewXxxBambooRequestConsumer constructors.
type BambooRequestConsumer interface {
	// Ping takes only a context and returns an error.
	// NOTE(review): presumably a connectivity check — confirm.
	Ping(ctx context.Context) error
	// Consume returns a *pb.WorkerParameter or an error.
	// NOTE(review): presumably blocks until a request arrives or the
	// constructor's requestWaitTimeout elapses — confirm.
	Consume(ctx context.Context) (*pb.WorkerParameter, error)
	// Close returns an error; presumably releases the underlying connection — confirm.
	Close(ctx context.Context) error
}

func NewGoroutineBambooRequestConsumer added in v0.0.2

func NewGoroutineBambooRequestConsumer(queue <-chan []byte) BambooRequestConsumer

func NewKafkaBambooRequestConsumer added in v0.0.2

func NewKafkaBambooRequestConsumer(consumerOptions kafka.ReaderConfig, requestWaitTimeout time.Duration) BambooRequestConsumer

func NewRedisBambooRequestConsumer added in v0.0.2

func NewRedisBambooRequestConsumer(consumerOptions *redis.UniversalOptions, consumerChannel string, requestWaitTimeout time.Duration) BambooRequestConsumer

type BambooRequestProducer

// BambooRequestProducer is the interface returned by the Goroutine, Kafka and
// Redis NewXxxBambooRequestProducer constructors and accepted by
// NewBambooWorkerClient.
type BambooRequestProducer interface {
	// Produce takes the result channel name, a heartbeat interval and a job
	// timeout (the MSec suffix suggests milliseconds — confirm), string
	// headers and the raw payload, and returns an error.
	Produce(ctx context.Context, resultChannel string, heartbeatIntervalMSec int, jobTimeoutMSec int, headers map[string]string, data []byte) error
	// Close returns an error; presumably releases the underlying connection — confirm.
	Close(ctx context.Context) error
}

func NewGoroutineBambooRequestProducer added in v0.0.2

func NewGoroutineBambooRequestProducer(ctx context.Context, workerName string, queue chan<- []byte) BambooRequestProducer

func NewKafkaBambooRequestProducer

func NewKafkaBambooRequestProducer(ctx context.Context, workerName string, kafkaWriter *kafka.Writer) BambooRequestProducer

func NewRedisBambooRequestProducer

func NewRedisBambooRequestProducer(ctx context.Context, workerName string, producerOptions redis.UniversalOptions, producerChannel string) BambooRequestProducer

type BambooResultPublisher added in v0.0.2

// BambooResultPublisher is the interface returned by
// NewGoroutineBambooResultPublisher and NewRedisBambooResultPublisher and
// accepted by NewBambooWorker and NewWorkerJob.
type BambooResultPublisher interface {
	// Ping takes only a context and returns an error.
	// NOTE(review): presumably a connectivity check — confirm.
	Ping(ctx context.Context) error
	// Publish takes a result channel name and the raw result bytes and returns an error.
	Publish(ctx context.Context, resultChannel string, result []byte) error
}

func NewGoroutineBambooResultPublisher added in v0.0.2

func NewGoroutineBambooResultPublisher(pubsubMap GoroutineBambooPubSubMap) BambooResultPublisher

func NewRedisBambooResultPublisher added in v0.0.2

func NewRedisBambooResultPublisher(publisherOptions *redis.UniversalOptions) BambooResultPublisher

type BambooResultSubscriber

// BambooResultSubscriber is the interface returned by
// NewGoroutineBambooResultSubscriber and NewRedisBambooResultSubscriber and
// accepted by NewBambooWorkerClient.
type BambooResultSubscriber interface {
	// Ping takes only a context and returns an error.
	// NOTE(review): presumably a connectivity check — confirm.
	Ping(ctx context.Context) error

	// OpenSubscribeConnection takes a result channel name and returns a
	// SubscribeFunc, a CloseSubscribeConnectionFunc and an error.
	// NOTE(review): presumably the caller must invoke the close function when
	// finished — confirm.
	OpenSubscribeConnection(ctx context.Context, resultChannel string) (SubscribeFunc, CloseSubscribeConnectionFunc, error)
}

func NewGoroutineBambooResultSubscriber added in v0.0.2

func NewGoroutineBambooResultSubscriber(ctx context.Context, workerName string, pubsubMap GoroutineBambooPubSubMap) BambooResultSubscriber

func NewRedisBambooResultSubscriber added in v0.0.2

func NewRedisBambooResultSubscriber(ctx context.Context, workerName string, subscriberConfig *redis.UniversalOptions) BambooResultSubscriber

type BambooWorker

// BambooWorker is the interface returned by NewBambooWorker.
type BambooWorker interface {
	// Run takes a context and returns an error.
	// NOTE(review): presumably blocks while the worker serves requests — confirm.
	Run(ctx context.Context) error
}

func NewBambooWorker added in v0.0.2

func NewBambooWorker(createRequestConsumerFunc CreateBambooRequestConsumerFunc, resultPublisher BambooResultPublisher, heartbeatPublisher BambooHeartbeatPublisher, workerFunc WorkerFunc, numWorkers int, logConfigFunc LogConfigFunc, metricsEventHandler MetricsEventHandler) (BambooWorker, error)

type BambooWorkerClient added in v0.0.2

// BambooWorkerClient is the interface returned by NewBambooWorkerClient, which
// builds it from a BambooRequestProducer and a BambooResultSubscriber.
type BambooWorkerClient interface {
	// Ping takes only a context and returns an error.
	// NOTE(review): presumably a connectivity check — confirm.
	Ping(ctx context.Context) error
	// Close takes a context and, unlike the other Close methods in this
	// package, returns nothing.
	Close(ctx context.Context)
	// Call takes a heartbeat interval and a job timeout (the MSec suffix
	// suggests milliseconds — confirm), string headers and the raw parameter
	// bytes, and returns result bytes or an error.
	Call(ctx context.Context, heartbeatIntervalMSec int, jobTimeoutMSec int, headers map[string]string, param []byte) ([]byte, error)
}

func NewBambooWorkerClient added in v0.0.2

func NewBambooWorkerClient(requestProducer BambooRequestProducer, resultSubscriber BambooResultSubscriber) BambooWorkerClient

type ByteArreayResult

// ByteArreayResult pairs a byte-slice value with an error.
// NOTE(review): the name looks like a misspelling of "ByteArrayResult"; it is
// exported, so renaming it would break callers.
type ByteArreayResult struct {
	Value []byte
	Error error
}

type CloseSubscribeConnectionFunc added in v0.0.2

// CloseSubscribeConnectionFunc is the second value returned by
// BambooResultSubscriber.OpenSubscribeConnection; it takes a context and
// returns an error.
type CloseSubscribeConnectionFunc func(ctx context.Context) error

type ContextKey added in v0.0.2

// ContextKey is a string type used as the key parameter of WithValue and
// GetLoggerFromContext, the value parameter of WithLoggerName, and the key
// type of BambooLoggers.
type ContextKey string
// One key per bamboo component.
// NOTE(review): the names suggest these identify per-component loggers (e.g.
// entries of BambooLoggers) — confirm.
const (
	BambooWorkerLoggerContextKey             ContextKey = "BambooWorker"
	BambooWorkerClientLoggerContextKey       ContextKey = "BambooWorkerClient"
	BambooWorkerJobLoggerContextKey          ContextKey = "BambooWorkerJob"
	BambooHeartbeatPublisherLoggerContextKey ContextKey = "BambooHeartbeatPublisher"
	BambooRequestConsumerLoggerContextKey    ContextKey = "BambooRequestConsumer"
	BambooRequestProducerLoggerContextKey    ContextKey = "BambooRequestProducer"
	BambooResultPublisherLoggerContextKey    ContextKey = "BambooResultPublisher"
	BambooResultSubscriberLoggerContextKey   ContextKey = "BambooResultSubscriber"
)
// Keys for the request ID and the logger name.
// NOTE(review): presumably the context keys under which those values are
// stored — confirm against WithValue/WithLoggerName.
const (
	RequestIDContextKey  ContextKey = "RequestIDContextKey"
	LoggerNameContextKey ContextKey = "LoggerNameContextKey"
)

type CreateBambooRequestConsumerFunc added in v0.0.2

// CreateBambooRequestConsumerFunc is a factory that takes a context and
// returns a BambooRequestConsumer; NewBambooWorker accepts one as its first
// argument.
type CreateBambooRequestConsumerFunc func(ctx context.Context) BambooRequestConsumer

type GoroutineBambooPubSubMap added in v0.0.2

// GoroutineBambooPubSubMap is the interface returned by
// NewGoroutineBambooPubSubMap and accepted by the Goroutine heartbeat
// publisher, result publisher and result subscriber constructors. Its methods
// address chan []byte values by channel name.
type GoroutineBambooPubSubMap interface {
	// CreateChannel takes a channel name and returns a chan []byte.
	CreateChannel(channelName string) chan []byte
	// GetChannel takes a channel name and returns a chan []byte or an error.
	// NOTE(review): presumably errors when the name is unknown — confirm.
	GetChannel(channelName string) (chan []byte, error)
	// ClosePublishChannel and CloseSubscribeChannel each take a channel name
	// and return an error.
	// NOTE(review): how the two close operations differ is not visible here — confirm.
	ClosePublishChannel(channelName string) error
	CloseSubscribeChannel(channelName string) error
}

func NewGoroutineBambooPubSubMap added in v0.0.2

func NewGoroutineBambooPubSubMap() GoroutineBambooPubSubMap

type LogConfigFunc

// LogConfigFunc takes a context and the request headers and returns a context;
// NewBambooWorker and NewWorkerJob accept one.
// NOTE(review): presumably it attaches logging configuration derived from the
// headers to the returned context — confirm.
type LogConfigFunc func(ctx context.Context, headers map[string]string) context.Context

type MetricsEventHandler added in v0.0.2

// MetricsEventHandler is a set of argument-less, result-less callbacks.
// NewEmptyEventHandler and NewPrometheusEventHandler return implementations,
// and NewBambooWorker and NewWorkerJob accept one.
// NOTE(review): the method names suggest each call records one event (request
// received, job outcome, running-worker count up/down) — confirm.
type MetricsEventHandler interface {
	OnReceiveRequest()
	OnSuccessJob()
	OnInternalErrorJob()
	OnInvalidArgumentJob()
	OnIncrNumRunningWorkers()
	OnDecrNumRunningWorkers()
}

func NewEmptyEventHandler added in v0.0.2

func NewEmptyEventHandler() MetricsEventHandler

func NewPrometheusEventHandler added in v0.0.2

func NewPrometheusEventHandler() MetricsEventHandler

type SubscribeFunc added in v0.0.2

// SubscribeFunc is the first value returned by
// BambooResultSubscriber.OpenSubscribeConnection; it takes a context and
// returns a *pb.WorkerResponse or an error.
type SubscribeFunc func(ctx context.Context) (*pb.WorkerResponse, error)

type WorkerFunc

// WorkerFunc is the job callback accepted by NewBambooWorker and NewWorkerJob.
// It receives a context, the request headers, the raw request payload and a
// receive-only aborted channel, and returns the result bytes or an error.
//
// The channel element type is spelled any, matching WithValue; any is an alias
// of interface{}, so functions written against the old spelling still assign.
// NOTE(review): presumably the function should stop work once aborted
// fires — confirm against the worker implementation.
type WorkerFunc func(ctx context.Context, headers map[string]string, data []byte, aborted <-chan any) ([]byte, error)

type WorkerJob added in v0.0.2

// WorkerJob is the interface returned by NewWorkerJob.
type WorkerJob interface {
	// Run takes a context and returns an error.
	// NOTE(review): presumably executes the WorkerFunc supplied to
	// NewWorkerJob and publishes its result — confirm.
	Run(ctx context.Context) error
}

func NewWorkerJob added in v0.0.2

func NewWorkerJob(ctx context.Context, carrier propagation.MapCarrier, workerFunc WorkerFunc, headers map[string]string, parameter []byte, resultPublisher BambooResultPublisher, resultChannel string, done chan<- interface{}, aborted <-chan interface{}, logConfigFunc LogConfigFunc, metricsEventHandler MetricsEventHandler) WorkerJob

Directories

Path Synopsis
example
calc-app Module
goroutine-app Module
workflow-app Module

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL