gosd

Version: v1.0.1
Published: Nov 16, 2020 | License: MIT | Imports: 5 | Imported by: 0

README

gosd

go-schedulable-dispatcher (gosd) is a library for scheduling when to dispatch a message to a channel.

Implementation

The implementation provides an easy-to-use API with an ingress (ingest) channel and an egress (dispatch) channel. Messages are ingested and placed into a heap-based priority queue for dispatching. At most two goroutines are used: one processes messages from both the ingest channel and the heap, and the other acts as a timer. By default, order is not guaranteed when messages have the same scheduled time, but this can be changed through the config. Guaranteeing order makes performance slightly worse, so if strict ordering isn't critical to your application, it's recommended to keep the default setting.

Example

// create instance of dispatcher
dispatcher, err := gosd.NewDispatcher(&gosd.DispatcherConfig{
    IngressChannelSize:  100,
    DispatchChannelSize: 100,
    MaxMessages:         100,
    GuaranteeOrder:      false,
})
checkErr(err)

// spawn process
go dispatcher.Start()

// schedule a message
dispatcher.IngressChannel() <- &gosd.ScheduledMessage{
    At:      time.Now().Add(1 * time.Second),
    Message: "Hello World in 1 second!",
}

// wait for the message
msg := <-dispatcher.DispatchChannel()

// type assert
msgStr := msg.(string)
fmt.Println(msgStr)
// Hello World in 1 second!

// shut down without a deadline; Shutdown returns an error
checkErr(dispatcher.Shutdown(context.Background(), false))

More examples are available in the examples directory.

Benchmarking

Tested with Intel Core i7-8700K CPU @ 3.70GHz, DDR4 RAM and 1000 messages per iteration.

Benchmark_integration_unordered-12               	     142	   8654906 ns/op
Benchmark_integration_unorderedSmallBuffer-12    	     147	   9503403 ns/op
Benchmark_integration_unorderedSmallHeap-12      	     122	   8860732 ns/op
Benchmark_integration_ordered-12                 	      96	  13354174 ns/op
Benchmark_integration_orderedSmallBuffer-12      	     121	  10115702 ns/op
Benchmark_integration_orderedSmallHeap-12        	     129	  10441857 ns/op
Benchmark_integration_orderedSameTime-12         	      99	  12575961 ns/op

Documentation

Types

type Dispatcher

type Dispatcher struct {
	// contains filtered or unexported fields
}

Dispatcher processes the ingress and dispatching of scheduled messages.

func NewDispatcher

func NewDispatcher(config *DispatcherConfig) (*Dispatcher, error)

NewDispatcher creates a new instance of a Dispatcher.

func (*Dispatcher) DispatchChannel

func (d *Dispatcher) DispatchChannel() <-chan interface{}

DispatchChannel returns a receive-only channel of type `interface{}`.
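Because the channel carries `interface{}` values, a consumer has to recover the concrete type before using a message. The sketch below shows one way to do that with a type switch. It uses a plain buffered channel as a stand-in for the dispatch channel, and describe is a hypothetical helper, not part of gosd.

```go
package main

import "fmt"

// describe is a hypothetical helper that inspects a dispatched value and
// reports what it found, without panicking on unexpected types.
func describe(msg interface{}) string {
	switch v := msg.(type) {
	case string:
		return "string: " + v
	case int:
		return fmt.Sprintf("int: %d", v)
	default:
		return fmt.Sprintf("unhandled type %T", v)
	}
}

func main() {
	// A plain channel stands in for the value returned by DispatchChannel().
	dispatched := make(chan interface{}, 3)
	dispatched <- "hello"
	dispatched <- 42
	dispatched <- 1.5
	close(dispatched)

	for msg := range dispatched {
		fmt.Println(describe(msg))
	}
	// string: hello
	// int: 42
	// unhandled type float64
}
```

A type switch avoids the panic that a single-value assertion such as `msg.(string)` would raise if a message of another type were ever scheduled.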

func (*Dispatcher) IngressChannel

func (d *Dispatcher) IngressChannel() chan<- *ScheduledMessage

IngressChannel returns the send-only channel of type `*ScheduledMessage`.

func (*Dispatcher) Pause

func (d *Dispatcher) Pause() error

Pause sets the state of the Dispatcher to stop processing messages and closes the main process loop.

func (*Dispatcher) Resume

func (d *Dispatcher) Resume() error

Resume sets the state of the Dispatcher to start processing messages again, restarts the timer for the last message being processed, and blocks.

func (*Dispatcher) Shutdown

func (d *Dispatcher) Shutdown(ctx context.Context, drainImmediately bool) error

Shutdown attempts to shut down the Dispatcher within the context deadline; if the deadline passes first, the process is terminated ungracefully.

If drainImmediately is true, then all messages will be dispatched immediately regardless of the schedule set. Order can be lost if new messages are still being ingested.

func (*Dispatcher) Start

func (d *Dispatcher) Start() error

Start initializes the processing of scheduled messages and blocks.

type DispatcherConfig

type DispatcherConfig struct {
	IngressChannelSize  int
	DispatchChannelSize int
	MaxMessages         int
	GuaranteeOrder      bool
}

DispatcherConfig is the configuration for creating an instance of a Dispatcher.

type ScheduledMessage

type ScheduledMessage struct {
	At      time.Time
	Message interface{}
}

ScheduledMessage is a message to schedule with the Dispatcher's ingest channel. `At` is when the message will be dispatched. `Message` is the content of the message.

Directories

Path Synopsis
examples
