kafkagosaur

package
v0.0.0-...-ef469da Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2022 License: MIT Imports: 14 Imported by: 0

Documentation

Index

Constants

This section is empty.

Variables

View Source
var NewDialerJsFunc = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
	dialConfigJs := args[0]

	kafkaDialer := NewKafkaDialer(dialConfigJs)

	return (&dialer{
		underlying: kafkaDialer,
	}).toJSObject()

})
View Source
var NewReaderJsFunc = js.FuncOf(func(this js.Value, args []js.Value) interface{} {

	readerConfigJs := args[0]

	kafkaDialer := NewKafkaDialer(readerConfigJs)

	kafkaReaderConfig := kafka.ReaderConfig{
		Dialer: kafkaDialer,
	}

	if brokers := readerConfigJs.Get("brokers"); !brokers.IsUndefined() {
		kafkaReaderConfig.Brokers = interop.MapToString(interop.ToSlice(brokers))
	}

	if groupId := readerConfigJs.Get("groupId"); !groupId.IsUndefined() {
		kafkaReaderConfig.GroupID = groupId.String()
	}

	if partition := readerConfigJs.Get("partition"); !partition.IsUndefined() {
		kafkaReaderConfig.Partition = partition.Int()
	}

	if topic := readerConfigJs.Get("topic"); !topic.IsUndefined() {
		kafkaReaderConfig.Topic = topic.String()
	}

	if partition := readerConfigJs.Get("partition"); !partition.IsUndefined() {
		kafkaReaderConfig.Partition = partition.Int()
	}

	if queueCapacity := readerConfigJs.Get("queueCapacity"); !queueCapacity.IsUndefined() {
		kafkaReaderConfig.QueueCapacity = queueCapacity.Int()
	}

	if minBytes := readerConfigJs.Get("minBytes"); !minBytes.IsUndefined() {
		kafkaReaderConfig.MinBytes = minBytes.Int()
	}

	if maxBytes := readerConfigJs.Get("maxBytes"); !maxBytes.IsUndefined() {
		kafkaReaderConfig.MaxBytes = maxBytes.Int()
	}

	if maxWait := readerConfigJs.Get("maxWait"); !maxWait.IsUndefined() {
		kafkaReaderConfig.MaxWait = JsNumberMillisToDuration(maxWait)
	}

	if readLagInterval := readerConfigJs.Get("readLagInterval"); !readLagInterval.IsUndefined() {
		kafkaReaderConfig.ReadLagInterval = JsNumberMillisToDuration(readLagInterval)
	}

	if heartbeatInterval := readerConfigJs.Get("heartbeatInterval"); !heartbeatInterval.IsUndefined() {
		kafkaReaderConfig.HeartbeatInterval = JsNumberMillisToDuration(heartbeatInterval)
	}

	if commitInterval := readerConfigJs.Get("commitInterval"); !commitInterval.IsUndefined() {
		kafkaReaderConfig.CommitInterval = JsNumberMillisToDuration(commitInterval)
	}

	if partitionWatchInterval := readerConfigJs.Get("partitionWatchInterval"); !partitionWatchInterval.IsUndefined() {
		kafkaReaderConfig.PartitionWatchInterval = JsNumberMillisToDuration(partitionWatchInterval)
	}

	if watchPartitionChanges := readerConfigJs.Get("watchPartitionChanges"); !watchPartitionChanges.IsUndefined() {
		kafkaReaderConfig.WatchPartitionChanges = watchPartitionChanges.Bool()
	}

	if sessionTimeout := readerConfigJs.Get("sessionTimeout"); !sessionTimeout.IsUndefined() {
		kafkaReaderConfig.SessionTimeout = JsNumberMillisToDuration(sessionTimeout)
	}

	if rebalanceTimeout := readerConfigJs.Get("rebalanceTimeout"); !rebalanceTimeout.IsUndefined() {
		kafkaReaderConfig.RebalanceTimeout = JsNumberMillisToDuration(rebalanceTimeout)
	}

	if joinGroupBackoff := readerConfigJs.Get("joinGroupBackoff"); !joinGroupBackoff.IsUndefined() {
		kafkaReaderConfig.JoinGroupBackoff = JsNumberMillisToDuration(joinGroupBackoff)
	}

	if retentionTime := readerConfigJs.Get("retentionTime"); !retentionTime.IsUndefined() {
		kafkaReaderConfig.RetentionTime = JsNumberMillisToDuration(retentionTime)
	}

	if startOffset := readerConfigJs.Get("startOffset"); !startOffset.IsUndefined() {
		kafkaReaderConfig.StartOffset = int64(startOffset.Int())
	}

	if readBackoffMin := readerConfigJs.Get("readBackoffMin"); !readBackoffMin.IsUndefined() {
		kafkaReaderConfig.ReadBackoffMin = JsNumberMillisToDuration(readBackoffMin)
	}

	if readBackoffMax := readerConfigJs.Get("readBackoffMax"); !readBackoffMax.IsUndefined() {
		kafkaReaderConfig.ReadBackoffMax = JsNumberMillisToDuration(readBackoffMax)
	}

	if logger := readerConfigJs.Get("logger"); !logger.IsUndefined() && logger.Bool() {
		kafkaReaderConfig.Logger = log.Default()
	}

	if maxAttempts := readerConfigJs.Get("maxAttempts"); !maxAttempts.IsUndefined() {
		kafkaReaderConfig.MaxAttempts = maxAttempts.Int()
	}

	kafkaReader := kafka.NewReader(kafkaReaderConfig)

	return (&reader{
		underlying: kafkaReader,
	}).toJSObject()

})
View Source
var NewWriterJsFunc = js.FuncOf(func(this js.Value, args []js.Value) interface{} {
	writerConfig := args[0]

	saslMechanism, err := SASLMechanism(writerConfig)
	if err != nil {
		panic(err)
	}

	tls, err := TLSConfig(writerConfig)
	if err != nil {
		panic(err)
	}

	var dialBackend = interop.NodeDialBackend

	if dialBackendJs := writerConfig.Get("dialBackend"); !dialBackendJs.IsUndefined() {
		dialBackend = interop.StringToDialBackend(dialBackendJs.String())
	}

	transport := &kafka.Transport{
		Dial: interop.NewDenoConn(dialBackend),
		SASL: saslMechanism,
		TLS:  tls,
	}

	if dialTimeout := writerConfig.Get("dialTimeout"); !dialTimeout.IsUndefined() {
		transport.DialTimeout = JsNumberMillisToDuration(dialTimeout)
	}

	if jsIdleTimeout := writerConfig.Get("idleTimeout"); !jsIdleTimeout.IsUndefined() {
		transport.IdleTimeout = JsNumberMillisToDuration(jsIdleTimeout)
	}

	if metadataTTL := writerConfig.Get("metadataTTL"); !metadataTTL.IsUndefined() {
		transport.MetadataTTL = JsNumberMillisToDuration(metadataTTL)
	}

	if clientId := writerConfig.Get("clientId"); !clientId.IsUndefined() {
		transport.ClientID = clientId.String()
	}

	kafkaWriter := kafka.Writer{
		Addr:      kafka.TCP(writerConfig.Get("address").String()),
		Transport: transport,
	}

	if jsTopic := writerConfig.Get("topic"); !jsTopic.IsUndefined() {
		kafkaWriter.Topic = jsTopic.String()
	}

	if maxAttempts := writerConfig.Get("maxAttempts"); !maxAttempts.IsUndefined() {
		kafkaWriter.MaxAttempts = maxAttempts.Int()
	}

	if batchSize := writerConfig.Get("batchSize"); !batchSize.IsUndefined() {
		kafkaWriter.BatchSize = batchSize.Int()
	}

	if batchBytes := writerConfig.Get("batchBytes"); !batchBytes.IsUndefined() {
		kafkaWriter.BatchBytes = int64(batchBytes.Int())
	}

	if batchTimeout := writerConfig.Get("batchTimeout"); !batchTimeout.IsUndefined() {
		kafkaWriter.BatchTimeout = JsNumberMillisToDuration(batchTimeout)
	}

	if readTimeout := writerConfig.Get("readTimeout"); !readTimeout.IsUndefined() {
		kafkaWriter.ReadTimeout = JsNumberMillisToDuration(readTimeout)
	}

	if writeTimeout := writerConfig.Get("writeTimeout"); !writeTimeout.IsUndefined() {
		kafkaWriter.WriteTimeout = JsNumberMillisToDuration(writeTimeout)
	}

	if async := writerConfig.Get("async"); !async.IsUndefined() {
		kafkaWriter.Async = async.Bool()
	}

	if logger := writerConfig.Get("logger"); !logger.IsUndefined() && logger.Bool() {
		kafkaWriter.Logger = log.Default()
	}

	return (&writer{
		underlying: &kafkaWriter,
		transport:  transport,
	}).toJSObject()

})

Functions

func JsNumberMillisToDuration

func JsNumberMillisToDuration(value js.Value) time.Duration

func NewKafkaDialer

func NewKafkaDialer(dialConfigJs js.Value) *kafka.Dialer

func SASLMechanism

func SASLMechanism(config js.Value) (sasl.Mechanism, error)

func TLSConfig

func TLSConfig(config js.Value) (*tls.Config, error)

Types

This section is empty.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL