Documentation ¶
Overview ¶
Package fileio provides transforms for matching and reading files.
Index ¶
- func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection
- func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection
- func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
- type FileMetadata
- type MatchOptionFn
- type ReadOptionFn
- type ReadableFile
Examples ¶
- MatchAll
- MatchFiles
- ReadMatches
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func MatchAll ¶
func MatchAll(s beam.Scope, col beam.PCollection, opts ...MatchOptionFn) beam.PCollection
MatchAll finds all files matching the glob patterns given by the incoming PCollection<string> and returns a PCollection<FileMetadata> of the matching files. MatchAll accepts any number of MatchOptionFn values, which configure how empty matches are treated. By default, empty matches are allowed if the pattern contains a wildcard (the same behavior as MatchEmptyAllowIfWildcard).
Example ¶
package main import ( "context" "log" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { beam.Init() p, s := beam.NewPipelineWithRoot() globs := beam.Create(s, "gs://path/to/sub1/*.gz", "gs://path/to/sub2/*.gz") matches := fileio.MatchAll(s, globs) debug.Print(s, matches) if err := beamx.Run(context.Background(), p); err != nil { log.Fatalf("Failed to execute job: %v", err) } }
Output:
func MatchFiles ¶
func MatchFiles(s beam.Scope, glob string, opts ...MatchOptionFn) beam.PCollection
MatchFiles finds all files matching the glob pattern and returns a PCollection<FileMetadata> of the matching files. MatchFiles accepts any number of MatchOptionFn values, which configure how empty matches are treated. By default, empty matches are allowed if the pattern contains a wildcard (the same behavior as MatchEmptyAllowIfWildcard).
Example ¶
package main import ( "context" "log" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { beam.Init() p, s := beam.NewPipelineWithRoot() matches := fileio.MatchFiles(s, "gs://path/to/*.gz") debug.Print(s, matches) if err := beamx.Run(context.Background(), p); err != nil { log.Fatalf("Failed to execute job: %v", err) } }
Output:
func ReadMatches ¶
func ReadMatches(s beam.Scope, col beam.PCollection, opts ...ReadOptionFn) beam.PCollection
ReadMatches accepts the result of MatchFiles or MatchAll as a PCollection<FileMetadata> and converts it to a PCollection<ReadableFile>. Each ReadableFile can be used to retrieve the file's metadata, open the file for reading, or read the entire file into memory. ReadMatches accepts any number of ReadOptionFn values, which configure the compression type of the files and how directories are treated. By default, the compression type is determined by the file extension (ReadAutoCompression) and directories are skipped (ReadDirectorySkip).
Example ¶
package main import ( "context" "log" "github.com/apache/beam/sdks/v2/go/pkg/beam" "github.com/apache/beam/sdks/v2/go/pkg/beam/io/fileio" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/beamx" "github.com/apache/beam/sdks/v2/go/pkg/beam/x/debug" ) func main() { beam.Init() p, s := beam.NewPipelineWithRoot() pairFn := func(ctx context.Context, file fileio.ReadableFile, emit func(string, string)) error { contents, err := file.ReadString(ctx) if err != nil { return err } emit(file.Metadata.Path, contents) return nil } matches := fileio.MatchFiles(s, "gs://path/to/*.gz") files := fileio.ReadMatches(s, matches) pairs := beam.ParDo(s, pairFn, files) debug.Print(s, pairs) if err := beamx.Run(context.Background(), p); err != nil { log.Fatalf("Failed to execute job: %v", err) } }
Output:
Types ¶
type FileMetadata ¶
FileMetadata contains metadata about a file, namely its path and size in bytes.
type MatchOptionFn ¶
type MatchOptionFn func(*matchOption)
MatchOptionFn is a function that can be passed to MatchFiles or MatchAll to configure options for matching files.
func MatchEmptyAllow ¶
func MatchEmptyAllow() MatchOptionFn
MatchEmptyAllow specifies that empty matches are allowed.
func MatchEmptyAllowIfWildcard ¶
func MatchEmptyAllowIfWildcard() MatchOptionFn
MatchEmptyAllowIfWildcard specifies that empty matches are allowed if the pattern contains a wildcard. This is the default behavior of MatchFiles and MatchAll.
func MatchEmptyDisallow ¶
func MatchEmptyDisallow() MatchOptionFn
MatchEmptyDisallow specifies that empty matches are not allowed.
type ReadOptionFn ¶
type ReadOptionFn func(*readOption)
ReadOptionFn is a function that can be passed to ReadMatches to configure options for reading files.
func ReadAutoCompression ¶
func ReadAutoCompression() ReadOptionFn
ReadAutoCompression specifies that the compression type of files should be auto-detected from the file extension. This is the default behavior of ReadMatches.
func ReadDirectoryDisallow ¶
func ReadDirectoryDisallow() ReadOptionFn
ReadDirectoryDisallow specifies that directories are not allowed.
func ReadDirectorySkip ¶
func ReadDirectorySkip() ReadOptionFn
ReadDirectorySkip specifies that directories are skipped. This is the default behavior of ReadMatches.
func ReadGzip ¶
func ReadGzip() ReadOptionFn
ReadGzip specifies that files have been compressed using gzip.
func ReadUncompressed ¶
func ReadUncompressed() ReadOptionFn
ReadUncompressed specifies that files have not been compressed.
type ReadableFile ¶
type ReadableFile struct { Metadata FileMetadata Compression compressionType }
ReadableFile is a wrapper around a FileMetadata and a compressionType. It can be used to open the file for reading as an io.ReadCloser (Open) or to read the file's entire contents into memory (Read, ReadString).
func (ReadableFile) Open ¶
func (f ReadableFile) Open(ctx context.Context) (io.ReadCloser, error)
Open opens the file for reading. The compression type is determined by the Compression field of the ReadableFile. If Compression is compressionAuto, the compression type is auto-detected from the file extension. It is the caller's responsibility to close the returned reader.
func (ReadableFile) Read ¶
func (f ReadableFile) Read(ctx context.Context) (data []byte, err error)
Read reads the entire file into memory and returns the contents.
func (ReadableFile) ReadString ¶
func (f ReadableFile) ReadString(ctx context.Context) (string, error)
ReadString reads the entire file into memory and returns the contents as a string.