Documentation ¶
Index ¶
- Variables
- func NewOrderCodec() goka.Codec
- func NewWaitCodec() goka.Codec
- type Config
- func (c *Config) WithMxExecuteRoundTrips(h Observe) *Config
- func (c *Config) WithMxExecutionDropped(cnt Count) *Config
- func (c *Config) WithMxExecutionTimeDeviation(h Observe) *Config
- func (c *Config) WithMxPlaceOrderLag(o Observe) *Config
- func (c *Config) WithMxRescheduled(cnt CountForType) *Config
- func (c *Config) WithMxThrottleDuplicate(cnt Count) *Config
- func (c *Config) WithMxThrottleFirstRescheduled(cnt Count) *Config
- func (c *Config) WithMxZombieEvicted(cnt Count) *Config
- func (c *Config) WithOrderCatchupTimeout(timeout time.Duration) *Config
- func (c *Config) WithOrderZombieTTL(ttl time.Duration) *Config
- type Count
- type CountForType
- type Emitter
- type EmitterCreator
- type Observe
- type Order
- func (*Order) Descriptor() ([]byte, []int)deprecated
- func (x *Order) GetDelayMs() int64
- func (x *Order) GetExecutionTime() *timestamp.Timestamp
- func (x *Order) GetNoCatchup() bool
- func (x *Order) GetOrderType() OrderType
- func (x *Order) GetPayload() *Order_Payload
- func (*Order) ProtoMessage()
- func (x *Order) ProtoReflect() protoreflect.Message
- func (x *Order) Reset()
- func (x *Order) String() string
- type OrderClient
- type OrderCodec
- type OrderType
- type Order_Payload
- func (*Order_Payload) Descriptor() ([]byte, []int)deprecated
- func (x *Order_Payload) GetKey() []byte
- func (x *Order_Payload) GetMessage() []byte
- func (x *Order_Payload) GetTopic() string
- func (*Order_Payload) ProtoMessage()
- func (x *Order_Payload) ProtoReflect() protoreflect.Message
- func (x *Order_Payload) Reset()
- func (x *Order_Payload) String() string
- type Scheduler
- type Wait
- func (*Wait) Descriptor() ([]byte, []int)deprecated
- func (x *Wait) GetEnterQueueTime() *timestamp.Timestamp
- func (x *Wait) GetExecutionTime() *timestamp.Timestamp
- func (x *Wait) GetIterations() int32
- func (w *Wait) Inc() *Wait
- func (*Wait) ProtoMessage()
- func (x *Wait) ProtoReflect() protoreflect.Message
- func (x *Wait) Reset()
- func (x *Wait) String() string
- type WaitCodec
Examples ¶
Constants ¶
This section is empty.
Variables ¶
var ( OrderType_name = map[int32]string{ 0: "Invalid", 1: "Delay", 2: "ThrottleFirst", 3: "ThrottleFirstReschedule", } OrderType_value = map[string]int32{ "Invalid": 0, "Delay": 1, "ThrottleFirst": 2, "ThrottleFirstReschedule": 3, } )
Enum value maps for OrderType.
var File_order_proto protoreflect.FileDescriptor
Functions ¶
func NewOrderCodec ¶
NewOrderCodec creates a new codec for encoding/decoding a Order
func NewWaitCodec ¶
NewWaitCodec creates a new codec for encoding/decoding a Wait
Types ¶
type Config ¶
type Config struct {
// contains filtered or unexported fields
}
Config configures the running scheduler
func (*Config) WithMxExecuteRoundTrips ¶
WithMxExecuteRoundTrips sets an observer for measuring the number of round trips an order has done before finally being executed. This will usually be a histogram or summary
func (*Config) WithMxExecutionDropped ¶
WithMxExecutionDropped adds a counter to track dropped executions due to retention or rescheduling issues.
func (*Config) WithMxExecutionTimeDeviation ¶
WithMxExecutionTimeDeviation sets an observer for measuring the seconds of deviation between planned execution and actual execution. This will usually be a histogram or summary. Times can also be negative in case there are no waiters small enough for the last iteration, which means the order will be executed before its actual deadline.
func (*Config) WithMxPlaceOrderLag ¶
WithMxPlaceOrderLag sets an observer for measuring the lag in seconds for order placement
func (*Config) WithMxRescheduled ¶
func (c *Config) WithMxRescheduled(cnt CountForType) *Config
WithMxRescheduled sets a counter for measuring the number of reschedules in total
func (*Config) WithMxThrottleDuplicate ¶
WithMxThrottleDuplicate sets a counter for measuring the number of duplicates/throttles for a throttling order
func (*Config) WithMxThrottleFirstRescheduled ¶
WithMxThrottleFirstRescheduled sets a counter for measuring the number of reschedules for orders configured with ThrottleFirstReschedule
func (*Config) WithMxZombieEvicted ¶
WithMxZombieEvicted sets a counter for measuring the number of zombie orders being evicted.
func (*Config) WithOrderCatchupTimeout ¶
WithOrderCatchupTimeout sets a counter for measuring the number of catchups after restarting the scheduler with existing delay-orders
type CountForType ¶
CountForType allows to count events belonging to a specific type
type Emitter ¶
Emitter is a generic emitter, but probably a *goka.Emitter Note that the emitter must be closed (by calling Finish) by the creator, the scheduler is not capable of closing the emitters, because it does not have an internal state.
type EmitterCreator ¶
EmitterCreator is a callback that will be used by the scheduler to create emitters on the fly while receiving orders.
type Observe ¶
type Observe func(float64)
Observe metric allows to observe multiple values, e.g. a histogram or a summary
type Order ¶
type Order struct { Payload *Order_Payload `protobuf:"bytes,1,opt,name=payload,proto3" json:"payload,omitempty"` OrderType OrderType `` /* 143-byte string literal not displayed */ DelayMs int64 `protobuf:"varint,3,opt,name=delay_ms,json=delayMs,proto3" json:"delay_ms,omitempty"` // marks the next or last execution time of the order. ExecutionTime *timestamp.Timestamp `protobuf:"bytes,4,opt,name=execution_time,json=executionTime,proto3" json:"execution_time,omitempty"` NoCatchup bool `protobuf:"varint,5,opt,name=no_catchup,json=noCatchup,proto3" json:"no_catchup,omitempty"` // contains filtered or unexported fields }
func CreateDelayedOrderWithPrefix ¶
func CreateDelayedOrderWithPrefix(targetKey []byte, targetTopic string, variant string, message []byte, orderType OrderType, delay time.Duration) (string, *Order, error)
CreateDelayedOrderWithPrefix allows to create an order to be sent to the scheduler. The order's destination time is defined by a delay, so there will be an implicit catchup.
func (*Order) Descriptor
deprecated
func (*Order) GetDelayMs ¶
func (*Order) GetExecutionTime ¶
func (*Order) GetNoCatchup ¶
func (*Order) GetOrderType ¶
func (*Order) GetPayload ¶
func (x *Order) GetPayload() *Order_Payload
func (*Order) ProtoMessage ¶
func (*Order) ProtoMessage()
func (*Order) ProtoReflect ¶
func (x *Order) ProtoReflect() protoreflect.Message
type OrderClient ¶
type OrderClient interface { // NewOrder creates a new order NewOrder(targetKey []byte, targetTopic string, variant string, message []byte, orderType OrderType, delay time.Duration) (string, *Order, error) // OrderEdge returns the group graph edge that can be used to client procesors // to be able to place orders into the scheduler. Like this: // goka.NewProcessor(..., goka.DefineGroup(..., om.OrderEdge())) OrderEdge() goka.Edge // OrderTopic returns the topic to be used for the emitting orders to the scheduler. OrderTopic() goka.Stream }
OrderClient helps communicating with a scheduler to place orders
func NewOrderClient ¶
func NewOrderClient(topicPrefix string) OrderClient
type OrderCodec ¶
type OrderCodec struct{}
OrderCodec allows to marshal and unmarshal items of type Order
func (*OrderCodec) Decode ¶
func (c *OrderCodec) Decode(data []byte) (interface{}, error)
Decode provides unmarshals a Order json into the struct.
func (*OrderCodec) Encode ¶
func (c *OrderCodec) Encode(value interface{}) ([]byte, error)
Encode marshals a Order
type OrderType ¶
type OrderType int32
const ( // invalid to avoid default type misunderstandings OrderType_Invalid OrderType = 0 // simple delay OrderType_Delay OrderType = 1 // delay ignoring intermediate orders // Send the first order's value OrderType_ThrottleFirst OrderType = 2 // delay taking the shortest delay while waiting for the event. // delays in future will be ignored and if the wait gets rescheduled, // the later executions will get dropped OrderType_ThrottleFirstReschedule OrderType = 3 )
func (OrderType) Descriptor ¶
func (OrderType) Descriptor() protoreflect.EnumDescriptor
func (OrderType) EnumDescriptor
deprecated
func (OrderType) Number ¶
func (x OrderType) Number() protoreflect.EnumNumber
func (OrderType) Type ¶
func (OrderType) Type() protoreflect.EnumType
type Order_Payload ¶
type Order_Payload struct { Topic string `protobuf:"bytes,1,opt,name=topic,proto3" json:"topic,omitempty"` Key []byte `protobuf:"bytes,2,opt,name=key,proto3" json:"key,omitempty"` Message []byte `protobuf:"bytes,3,opt,name=message,proto3" json:"message,omitempty"` // contains filtered or unexported fields }
func (*Order_Payload) Descriptor
deprecated
func (*Order_Payload) Descriptor() ([]byte, []int)
Deprecated: Use Order_Payload.ProtoReflect.Descriptor instead.
func (*Order_Payload) GetKey ¶
func (x *Order_Payload) GetKey() []byte
func (*Order_Payload) GetMessage ¶
func (x *Order_Payload) GetMessage() []byte
func (*Order_Payload) GetTopic ¶
func (x *Order_Payload) GetTopic() string
func (*Order_Payload) ProtoMessage ¶
func (*Order_Payload) ProtoMessage()
func (*Order_Payload) ProtoReflect ¶
func (x *Order_Payload) ProtoReflect() protoreflect.Message
func (*Order_Payload) Reset ¶
func (x *Order_Payload) Reset()
func (*Order_Payload) String ¶
func (x *Order_Payload) String() string
type Scheduler ¶
type Scheduler struct {
// contains filtered or unexported fields
}
Scheduler implements a scheduler
Example ¶
// create new scheduler config cfg := NewConfig() // e.g. replace selected metrics to be exported cfg.WithMxZombieEvicted(func(value float64) { // handle metric }) sched := CreateScheduler(cfg, []time.Duration{ 1 * time.Hour, 30 * time.Minute, 1 * time.Minute, 10 * time.Second, 3 * time.Second, }, func(topic goka.Stream, codec goka.Codec) (Emitter, error) { return goka.NewEmitter([]string{"localhost:9092"}, topic, codec) }, "my-scheduler", // some prefix if we have multiple schedulers running in the same kafka cluster ) errg, ctx := multierr.NewErrGroup(context.Background()) for _, graph := range sched.CreateGraphs() { proc, err := goka.NewProcessor([]string{"localhost:9092"}, graph) if err != nil { log.Fatalf("Error creating processor for graph %#v: %v", graph, err) } errg.Go(func() error { return proc.Run(ctx) }) } if err := errg.Wait().ErrorOrNil(); err != nil { log.Printf("Error running scheduler: %v", err) } if err := sched.Close(); err != nil { log.Printf("Error closing scheduler: %v", err) }
Output:
func CreateScheduler ¶
func CreateScheduler(config *Config, waitIntervals []time.Duration, creator EmitterCreator, topicPrefix string) *Scheduler
CreateScheduler creates a scheduler type that provides the group graphs for all included components. To be independent of the processor handling boilerplate code, creating the group graphs is sufficient waitIntervals is a slice of time.Duration, creating a wait-processor for each wait interval. Note that the duration will be truncated to milliseconds and the list deduplicated automatically.
func (*Scheduler) Close ¶
Close finishes all created emitters. Note that the processors are not stopped, because it is started by the client
func (*Scheduler) CreateGraphs ¶
func (s *Scheduler) CreateGraphs() []*goka.GroupGraph
CreateGraphs creates the group graphs for all components used for the scheduler: one for the scheduler itself (that does the deduplication and store the payload) one for each waiter, i.e. one for each interval.
type Wait ¶
type Wait struct { ExecutionTime *timestamp.Timestamp `protobuf:"bytes,1,opt,name=execution_time,json=executionTime,proto3" json:"execution_time,omitempty"` EnterQueueTime *timestamp.Timestamp `protobuf:"bytes,3,opt,name=enter_queue_time,json=enterQueueTime,proto3" json:"enter_queue_time,omitempty"` Iterations int32 `protobuf:"varint,2,opt,name=iterations,proto3" json:"iterations,omitempty"` // contains filtered or unexported fields }
func (*Wait) Descriptor
deprecated
func (*Wait) GetEnterQueueTime ¶
func (*Wait) GetExecutionTime ¶
func (*Wait) GetIterations ¶
func (*Wait) ProtoMessage ¶
func (*Wait) ProtoMessage()
func (*Wait) ProtoReflect ¶
func (x *Wait) ProtoReflect() protoreflect.Message