kafka

package
v0.0.13
Warning

This package is not in the latest version of its module.

Published: May 24, 2023 | License: Apache-2.0 | Imports: 22 | Imported by: 0

Documentation

Overview

Copyright 2023 The acquirecloud Authors

Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func NewInMem added in v0.0.5

func NewInMem(cfg QueueConfig) *queue

NewInMem creates a queue backed by in-memory clients. The resulting object may be used for testing.

func NewKafkaKVS added in v0.0.8

func NewKafkaKVS(cfg QueueConfig, storage kvs.Storage) *queue

NewKafkaKVS returns a queue object connected to Kafka and kvs.Storage according to the provided configuration. The returned object must be initialized with Init() and closed with Shutdown().

NOTE: the result object *queue supports JobController, Queue and TaskPublisher interfaces

func NewKafkaRedis

func NewKafkaRedis(cfg QueueConfig, redisOpts *redis.Options) *queue

NewKafkaRedis returns a queue object connected to Kafka and Redis according to the provided configuration. The returned object must be initialized with Init() and closed with Shutdown().

NOTE: the result object *queue supports JobController, Queue and TaskPublisher interfaces

Types

type QueueConfig

type QueueConfig struct {
	// Brokers contains the list of Kafka brokers
	Brokers []string

	// GroupID is the name of the consumer group used to access the Kafka topics
	GroupID string

	// Topic is the main Kafka topic where all initial tasks are placed
	Topic string

	// WaitTopic is the backup topic where tasks scheduled for processing are
	// placed so that they can be re-executed if a retry is needed
	WaitTopic string

	// Timeout defines the interval between scheduling a job execution and
	// retrying it if the job has not completed.
	Timeout time.Duration

	// DegradationTimeout defines how long the state of a job may be stored
	// in the system. If a task is retried after a period longer than this
	// timeout, the system cannot guarantee that the job executes only once,
	// because the job state has been lost. In this case the task may be
	// processed more than once
	DegradationTimeout time.Duration

	// PublishOnly creates the queue for publishing tasks only. A queue object
	// created with this configuration does not support subscribing
	PublishOnly bool
}

QueueConfig defines the configuration used by the queue object to process tasks. QueueConfig extends PublisherConfig.

func GetDefaultQueueConfig

func GetDefaultQueueConfig() QueueConfig

GetDefaultQueueConfig returns a default config for the Queue object
