timequeue

Version: v0.2.0
Published: Dec 2, 2024 | License: MIT | Imports: 3 | Imported by: 0

README

timequeue provides a TimeQueue type that releases arbitrary messages at specified time.Time values.

Forked from the long-dormant github.com/gogolfing/timequeue.

Documentation

Constants

const (
	DefaultCapacity = 0
)

Variables

This section is empty.

Functions

This section is empty.

Types

type Message

type Message struct {
	// contains filtered or unexported fields
}

Message is a container type that associates a Time with some arbitrary data. A Message is "released" from a TimeQueue as close to the time returned by At as possible.

Message zero values are not in a valid state. You should use NewMessage to create Message instances.

func NewMessage added in v0.2.0

func NewMessage(at time.Time, data interface{}) *Message

NewMessage returns a Message with at and data set on their corresponding fields.

You should use this function to create Messages instead of using a struct initializer.

func (*Message) At added in v0.2.0

func (m *Message) At() time.Time

At returns the Time at which m is scheduled to be released.

func (*Message) Data

func (m *Message) Data() interface{}

Data returns the data associated with m.

This will usually be used after receiving a Message from a TimeQueue in order to process the Message appropriately.
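
Because Data returns interface{}, the receiver has to recover the concrete type before using the value. The following sketch shows one way to do that with a type switch. The describe function is hypothetical and stands in for whatever processing a consumer would perform on the payload.

```go
package main

import "fmt"

// describe is a hypothetical handler (not part of timequeue) that inspects
// an interface{} payload without risking a panic from a failed assertion.
func describe(data interface{}) string {
	switch v := data.(type) {
	case int:
		return fmt.Sprintf("int %d", v)
	case string:
		return fmt.Sprintf("string %q", v)
	default:
		// Unknown payload types are reported instead of causing a panic.
		return fmt.Sprintf("unhandled %T", v)
	}
}

func main() {
	fmt.Println(describe(7))      // int 7
	fmt.Println(describe("ping")) // string "ping"
	fmt.Println(describe(3.5))    // unhandled float64
}
```

A plain assertion such as m.Data().(int) is fine when every producer is known to push the same type. A type switch is the safer choice when payload types vary.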

func (Message) Len added in v0.2.0

func (mh Message) Len() int

Len is the heap.Interface implementation. It returns len(mh).

func (Message) Less added in v0.2.0

func (mh Message) Less(i, j int) bool

Less is the heap.Interface implementation.

func (Message) Pop added in v0.2.0

func (mh Message) Pop() interface{}

Pop is the heap.Interface implementation.

func (Message) Push added in v0.2.0

func (mh Message) Push(x interface{})

Push is the heap.Interface implementation.

func (Message) Swap added in v0.2.0

func (mh Message) Swap(i, j int)

Swap is the heap.Interface implementation.

type TimeQueue

type TimeQueue struct {
	// contains filtered or unexported fields
}

Example

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/aqua/timequeue"
)

func main() {
	now := time.Now()
	tq := timequeue.NewTimeQueue()

	// Bound the lifetime of the example to two seconds.
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	// Stop the queue once the context expires, then signal the consumer.
	stopped := make(chan struct{})
	go func() {
		defer close(stopped)

		<-ctx.Done()

		tq.Stop()
	}()

	// Producer: schedule ten messages one nanosecond apart, starting at now,
	// carrying the values 1 through 10.
	doneProducing := make(chan struct{})
	go func() {
		defer close(doneProducing)

		const count = 10

		toPush := make([]*timequeue.Message, count)
		for i := 0; i < count; i++ {
			m := timequeue.NewMessage(now.Add(time.Duration(i)), i+1)
			toPush[i] = m
		}

		tq.PushAll(toPush...)
	}()

	// Consumer: print each released message until the queue has been stopped.
	doneConsuming := make(chan struct{})
	go func() {
		defer close(doneConsuming)

		for {
			select {
			case <-stopped:
				return

			case m := <-tq.Messages():
				fmt.Println(m.Data().(int))
			}
		}
	}()

	// Wait for all three goroutines to finish.
	<-doneProducing
	<-stopped
	<-doneConsuming
}

Output:

1
2
3
4
5
6
7
8
9
10

func NewTimeQueue added in v0.2.0

func NewTimeQueue() *TimeQueue

func NewTimeQueueCapacity added in v0.2.0

func NewTimeQueueCapacity(c int) *TimeQueue

func (*TimeQueue) Drain added in v0.2.0

func (tq *TimeQueue) Drain() []Message

func (*TimeQueue) Messages

func (tq *TimeQueue) Messages() <-chan Message

func (*TimeQueue) Push

func (tq *TimeQueue) Push(at time.Time, data interface{}) *Message

func (*TimeQueue) PushAll added in v0.2.0

func (tq *TimeQueue) PushAll(messages ...*Message)

func (*TimeQueue) Remove

func (tq *TimeQueue) Remove(m *Message) bool

func (*TimeQueue) Start

func (tq *TimeQueue) Start() bool

func (*TimeQueue) Stop

func (tq *TimeQueue) Stop() bool
