pubsubio

package module
v0.0.0-...-57b8937 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 18, 2024 License: BSD-3-Clause Imports: 4 Imported by: 0

README

arqbeam-pubsubio

An Apache Beam sink for arqbeam-app.

This implementation uses the Google Cloud Pub/Sub Go SDK to publish messages to a Pub/Sub topic.

TL;DR

package main

import (
	"context"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/textio"
	"github.com/arquivei/beamapp"

	pubsubio "github.com/arquivei/arqbeam-pubsubio"
	errorpubsubio "github.com/arquivei/arqbeam-pubsubio/error"
)

// Package-level state shared by main and getPipeline.
var (
	// pipeline holds the Beam pipeline; main assigns it from getPipeline
	// before running, and getPipeline returns it early when it is non-nil.
	pipeline *beam.Pipeline
	// version is passed to beamapp.Bootstrap; it defaults to "dev".
	version  = "dev"
)

// config is the application configuration; a pointer to it is handed to
// beamapp.Bootstrap in main.
var config struct {
	// Embedded beamapp configuration.
	beamapp.Config

	// GCSInputFile is the path passed to textio.Read as the pipeline input
	// (presumably a Google Cloud Storage location, judging by the name).
	GCSInputFile string
	// Pubsub groups the arguments forwarded to pubsubio.Publish.
	Pubsub       struct {
		// Project is passed as the project argument of pubsubio.Publish.
		Project   string
		// Topic is passed as the topic argument of pubsubio.Publish.
		Topic     string
		// BatchSize is forwarded to both pubsubio.Publish and
		// errorpubsubio.LogHandler.
		BatchSize int
	}
}

// main bootstraps the application with the build version and configuration,
// builds the pipeline, stores it in the package-level variable, and runs it.
func main() {
	beamapp.Bootstrap(version, &config)

	ctx := context.Background()
	pipeline = getPipeline(ctx)

	beamapp.Run(pipeline)
}

// getPipeline returns the Beam pipeline for this application.
//
// If the package-level pipeline has already been set, it is returned as is.
// Otherwise a new pipeline is built that reads lines from config.GCSInputFile,
// publishes each line through pubsubio.Publish, and hands the publish results
// to errorpubsubio.LogHandler. The new pipeline is returned but not stored;
// the caller (main) assigns it to the package-level variable.
//
// The context parameter is currently unused.
func getPipeline(_ context.Context) *beam.Pipeline {
	if pipeline != nil {
		return pipeline
	}

	// Use a distinct local name so the package-level pipeline checked above
	// is not shadowed by the freshly built one.
	p := beam.NewPipeline()
	s := p.Root()

	// Read some files with textio default from apache beam go sdk
	readRows := textio.Read(s, config.GCSInputFile)

	// Send each line to pubsub with pubsubio from arqbeam-pubsubio
	pbResult := pubsubio.Publish(s, config.Pubsub.Project, config.Pubsub.Topic, config.Pubsub.BatchSize, readRows)

	// Log if any error happened in publish step
	errorpubsubio.LogHandler(s, config.Pubsub.BatchSize, pbResult)

	return p
}

Comments, discussions, issues, and pull requests are welcome.

Documentation

Index

Constants

This section is empty.

Variables

This section is empty.

Functions

func Publish

func Publish(s beam.Scope, project string, topic string, batchSize int, targetData beam.PCollection) beam.PCollection

Types

type Publisher

type Publisher struct {
	Project   string
	Topic     string
	BatchSize int
	// contains filtered or unexported fields
}

func (*Publisher) ProcessElement

func (fn *Publisher) ProcessElement(ctx context.Context, value []byte) *pubsub.PublishResult

func (*Publisher) Setup

func (fn *Publisher) Setup(ctx context.Context)

type PubsubResultError

type PubsubResultError struct {
	MessageID    string
	ErrorMessage string
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL