mongodbio

package
v2.55.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Apr 2, 2024 License: Apache-2.0, BSD-3-Clause, MIT Imports: 15 Imported by: 2

Documentation

Overview

Package mongodbio contains transforms for reading from and writing to MongoDB.

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Read

func Read(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	t reflect.Type,
	opts ...ReadOptionFn,
) beam.PCollection

Read reads a MongoDB collection and returns a PCollection<T> for a given type T. T must be a struct with exported fields that should have a "bson" tag. By default, the transform uses the MongoDB internal splitVector command to split the collection into bundles. The transform can be configured to use the $bucketAuto aggregation instead to support reading from MongoDB Atlas where the splitVector command is not allowed. This is enabled by passing the ReadOptionFn WithReadBucketAuto(true).

The Read transform has the required parameters:

  • s: the scope of the pipeline
  • uri: the MongoDB connection string
  • database: the MongoDB database to read from
  • collection: the MongoDB collection to read from
  • t: the type of the elements in the collection

The Read transform takes a variadic number of ReadOptionFn which can set the ReadOption fields:

  • BucketAuto: whether to use the bucketAuto aggregation to split the collection into bundles. Defaults to false
  • Filter: a bson.M map that is used to filter the documents in the collection. Defaults to nil, which means no filter is applied
  • BundleSize: the size in bytes to bundle the documents into when reading. Defaults to 64 * 1024 * 1024 (64 MB)
Example (Default)
package main

import (
	"context"
	"log"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	col := mongodbio.Read(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		reflect.TypeOf(Event{}),
	)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

Example (Options)
package main

import (
	"context"
	"log"
	"reflect"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	col := mongodbio.Read(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		reflect.TypeOf(Event{}),
		mongodbio.WithReadBucketAuto(true),
		mongodbio.WithReadBundleSize(32*1024*1024),
		mongodbio.WithReadFilter(bson.M{"timestamp": bson.M{"$gt": 1640995200000}}),
	)
	debug.Print(s, col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

func Write

func Write(
	s beam.Scope,
	uri string,
	database string,
	collection string,
	col beam.PCollection,
	opts ...WriteOptionFn,
) beam.PCollection

Write writes a PCollection<T> of a type T to MongoDB. T must be a struct with exported fields that should have a "bson" tag. If the struct has a field with the bson tag "_id", the value of that field will be used as the id of the document. Otherwise, a new id field of type primitive.ObjectID will be generated for each document. Write returns a PCollection<K> of the inserted id values with type K.

The Write transform has the required parameters:

  • s: the scope of the pipeline
  • uri: the MongoDB connection string
  • database: the MongoDB database to write to
  • collection: the MongoDB collection to write to
  • col: the PCollection to write to MongoDB

The Write transform takes a variadic number of WriteOptionFn which can set the WriteOption fields:

  • BatchSize: the number of documents to write in a single batch. Defaults to 1000
  • Ordered: whether to execute the writes in order. Defaults to true
Example (Default)
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
			Timestamp: 1640995200002,
			EventType: 2,
		},
	}

	col := beam.CreateList(s, input)
	mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

Example (GenerateID)
package main

import (
	"context"
	"log"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug"
)

func main() {
	type Event struct {
		Timestamp int64 `bson:"timestamp"`
		EventType int32 `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			Timestamp: 1640995200002,
			EventType: 1,
		},
	}

	col := beam.CreateList(s, input)
	ids := mongodbio.Write(s, "mongodb://localhost:27017", "demo", "events", col)
	debug.Print(s, ids)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

Example (Options)
package main

import (
	"context"
	"log"
	"time"

	"github.com/apache/beam/sdks/v2/go/pkg/beam"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/io/mongodbio"
	"github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx"
	"go.mongodb.org/mongo-driver/bson/primitive"
)

func main() {
	type Event struct {
		ID        primitive.ObjectID `bson:"_id"`
		Timestamp int64              `bson:"timestamp"`
		EventType int32              `bson:"event_type"`
	}

	beam.Init()
	p, s := beam.NewPipelineWithRoot()

	input := []Event{
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200001)),
			Timestamp: 1640995200001,
			EventType: 1,
		},
		{
			ID:        primitive.NewObjectIDFromTimestamp(time.UnixMilli(1640995200002)),
			Timestamp: 1640995200002,
			EventType: 2,
		},
	}

	col := beam.CreateList(s, input)
	mongodbio.Write(
		s,
		"mongodb://localhost:27017",
		"demo",
		"events",
		col,
		mongodbio.WithWriteBatchSize(500),
		mongodbio.WithWriteOrdered(false),
	)

	if err := beamx.Run(context.Background(), p); err != nil {
		log.Fatalf("Failed to execute job: %v", err)
	}
}
Output:

Types

type ReadOption

type ReadOption struct {
	BucketAuto bool
	Filter     bson.M
	BundleSize int64
}

ReadOption represents options for reading from MongoDB.

type ReadOptionFn

type ReadOptionFn func(option *ReadOption) error

ReadOptionFn is a function that configures a ReadOption.

func WithReadBucketAuto

func WithReadBucketAuto(bucketAuto bool) ReadOptionFn

WithReadBucketAuto configures the ReadOption whether to use the bucketAuto aggregation stage.

func WithReadBundleSize

func WithReadBundleSize(bundleSize int64) ReadOptionFn

WithReadBundleSize configures the ReadOption to use the provided bundle size in bytes.

func WithReadFilter

func WithReadFilter(filter bson.M) ReadOptionFn

WithReadFilter configures the ReadOption to use the provided filter.

type WriteOption

type WriteOption struct {
	BatchSize int64
	Ordered   bool
}

WriteOption represents options for writing to MongoDB.

type WriteOptionFn

type WriteOptionFn func(option *WriteOption) error

WriteOptionFn is a function that configures a WriteOption.

func WithWriteBatchSize

func WithWriteBatchSize(batchSize int64) WriteOptionFn

WithWriteBatchSize configures the WriteOption to use the provided batch size when writing documents.

func WithWriteOrdered

func WithWriteOrdered(ordered bool) WriteOptionFn

WithWriteOrdered configures the WriteOption whether to apply an ordered bulk write.

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL