s3io

package module
v3.1.1
Published: Jun 18, 2025 License: MIT Imports: 21 Imported by: 1

README

S3IO

An abstraction layer on top of the S3 SDK for io read/write operations on S3 objects. The s3io reader and writer stream objects from and to your S3 instance while staying memory efficient.

// Note the "WithBucket..." are options
bucket, err := s3io.Open(ctx, "my-bucket-name", s3io.WithBucketCredentials(accessKey, secretKey))
if err != nil {
  return err
}

// Note the "WithWriter..." are options specifically for this writer session
writer := bucket.Put(ctx, "path/to/object.txt", s3io.WithWriteRetries(3))
defer writer.Close() // makes sure your upload won't keep hanging

if _, err := io.WriteString(writer, "Hello world!"); err != nil {
  return err
}

if err := writer.Close(); err != nil {
  return err 
}

reader := bucket.Get(ctx, "path/to/object.txt")

_, err = io.Copy(os.Stdout, reader) // err is already declared above, so assign with "="
...

Documentation

Overview

Package s3io is an abstraction layer on the s3 sdk.

The s3io package provides a Bucket that can interact with all objects within the bucket. It is configured with the "WithBucket..." options:

bucket, err := s3io.Open(ctx, "my-bucket-name", s3io.WithBucketCredentials("access-key", "secret-key"))

There is a Reader to perform read operations on an S3 object.

rd := bucket.Get(ctx, "path/to/object.txt", s3io.WithReaderConcurrency(10))

_, err := io.Copy(os.Stdout, rd)

And there is a Writer to perform write operations on an S3 object. Note: the writer MUST be closed to save the object.

wr := bucket.Put(ctx, "path/to/object.txt")
defer wr.Close()

_, err := io.WriteString(wr, "Hello world!")
if err != nil {
  return err
}

if err := wr.Close(); err != nil {
  return err
}

The s3io reader and writer stream the objects from and to your s3 instance while being memory efficient.

Constants

const (
	MinChunkSize     int64 = 1024 * 1024 * 5
	DefaultChunkSize int64 = MinChunkSize
)
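To make the constant concrete: 1024 * 1024 * 5 is 5 MiB, or 5,242,880 bytes. The sketch below shows the arithmetic for how many chunks of a given size cover an object. It is illustrative only; partCount is a hypothetical helper and minChunkSize is a local copy of the documented value, not an import from s3io.

```go
package main

import "fmt"

// minChunkSize is a local copy of the documented MinChunkSize value (5 MiB).
const minChunkSize int64 = 1024 * 1024 * 5

// partCount returns how many chunks of chunkSize bytes are needed to
// cover objectSize bytes (ceiling division). Hypothetical helper.
func partCount(objectSize, chunkSize int64) int64 {
	if objectSize <= 0 || chunkSize <= 0 {
		return 0
	}
	return (objectSize + chunkSize - 1) / chunkSize
}

func main() {
	fmt.Println(minChunkSize)                          // 5242880
	fmt.Println(partCount(12*1024*1024, minChunkSize)) // 3
}
```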

Variables

This section is empty.

Functions

func NewReader

func NewReader(ctx context.Context, s3 DownloadAPIClient, input *s3.GetObjectInput, opts ...ReaderOption) io.Reader

NewReader returns a new ObjectReader to do io.Reader operations on your S3 object.

func NewWriter

func NewWriter(ctx context.Context, s3 UploadAPIClient, input *s3.PutObjectInput, opts ...WriterOption) io.WriteCloser

NewWriter returns a new ObjectWriter to do io.Writer operations on your S3 object.

Types

type Bucket

func New

func New(
	name string,
	readChunkSize, writeChunkSize int64,
	concurrency int,
	logger *slog.Logger,
	cli BucketApiClient,
) Bucket

New returns a new bucket instance. For normal operations use Open instead as this will connect and verify the bucket.

func Open

func Open(ctx context.Context, name string, opts ...BucketOption) (Bucket, error)

Open returns a bucket to interact with.

func OpenURL

func OpenURL(ctx context.Context, u string, opts ...BucketOption) (Bucket, error)

OpenURL opens the bucket with all the connection options in the url. The url is written as: s3://access-key:access-secret@host/bucketname?region=us-east&pathstyle=true

The url assumes the host uses HTTPS unless the "insecure" query param is set to "true". To create the bucket if it doesn't exist, set the "create" query param to "true". To use path-style URLs, set "pathstyle" to "true".
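To illustrate how the parts of such a URL map to connection settings, here is a minimal sketch using net/url from the standard library. It is not the library's parser: bucketURL and parseBucketURL are hypothetical names, the error messages are only modelled on the documented ones, and credentials are treated as optional here, which may differ from OpenURL.

```go
package main

import (
	"errors"
	"fmt"
	"net/url"
	"strings"
)

// bucketURL holds the pieces of a bucket URL (hypothetical type).
type bucketURL struct {
	AccessKey, SecretKey string
	Host, Bucket, Region string
	PathStyle            bool
	Insecure             bool
	Create               bool
}

// parseBucketURL splits s3://key:secret@host/bucket?region=...&pathstyle=true
// into its components. Illustrative only.
func parseBucketURL(raw string) (bucketURL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return bucketURL{}, err
	}
	if u.Scheme != "s3" {
		return bucketURL{}, errors.New("url doesn't start with s3 scheme")
	}
	bucket := strings.Trim(u.Path, "/")
	if bucket == "" {
		return bucketURL{}, errors.New("no bucketname")
	}

	out := bucketURL{Host: u.Host, Bucket: bucket}
	if u.User != nil {
		out.AccessKey = u.User.Username()
		out.SecretKey, _ = u.User.Password()
	}

	q := u.Query()
	out.Region = q.Get("region")
	out.PathStyle = q.Get("pathstyle") == "true"
	out.Insecure = q.Get("insecure") == "true"
	out.Create = q.Get("create") == "true"
	return out, nil
}

func main() {
	b, err := parseBucketURL("s3://access-key:access-secret@host/bucketname?region=us-east&pathstyle=true")
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(b.Host, b.Bucket, b.Region, b.PathStyle)
}
```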

type BucketApiClient

type BucketApiClient interface {
	DownloadAPIClient
	UploadAPIClient
	s3.HeadObjectAPIClient
	s3.ListObjectsV2APIClient
	DeleteObject(ctx context.Context, input *s3.DeleteObjectInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectOutput, error)
	DeleteObjects(ctx context.Context, input *s3.DeleteObjectsInput, optFns ...func(*s3.Options)) (*s3.DeleteObjectsOutput, error)
	DeleteBucket(ctx context.Context, input *s3.DeleteBucketInput, optFns ...func(*s3.Options)) (*s3.DeleteBucketOutput, error)
}

type BucketOption

type BucketOption func(*bucketBuilder)

func BucketOptions

func BucketOptions(opts ...BucketOption) BucketOption

BucketOptions bundles bucket options
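The option types in this package follow the functional options pattern, and BucketOptions combines several options into one. The sketch below shows how such bundling can work in general. It uses local stand-in types (settings, option, options) that are assumptions for illustration, not the package's internals.

```go
package main

import "fmt"

// settings is a stand-in for the unexported builder that options mutate.
type settings struct {
	concurrency int
	retries     int
}

// option mirrors the shape of a functional option such as BucketOption.
type option func(*settings)

func withConcurrency(n int) option { return func(s *settings) { s.concurrency = n } }
func withRetries(n int) option     { return func(s *settings) { s.retries = n } }

// options bundles several options into a single option, applied in order.
func options(opts ...option) option {
	return func(s *settings) {
		for _, o := range opts {
			o(s)
		}
	}
}

// build applies the options on top of some defaults.
func build(opts ...option) settings {
	s := settings{concurrency: 1}
	for _, o := range opts {
		o(&s)
	}
	return s
}

func main() {
	shared := options(withConcurrency(4), withRetries(3))
	s := build(shared, withRetries(5)) // later options override earlier ones
	fmt.Println(s.concurrency, s.retries) // 4 5
}
```

A bundle like this is handy for sharing one set of defaults across several Open calls.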

func WithBucketCli

func WithBucketCli(cli *s3.Client) BucketOption

WithBucketCli directly sets the s3 client for the bucket.

func WithBucketCliLoaderOptions

func WithBucketCliLoaderOptions(opts ...func(*config.LoadOptions) error) BucketOption

WithBucketCliLoaderOptions sets the config.LoadOptions for the aws config. Only works if the cli is not already provided.

func WithBucketConcurrency

func WithBucketConcurrency(size int) BucketOption

WithBucketConcurrency sets the default read/write concurrency

func WithBucketCreateIfNotExists

func WithBucketCreateIfNotExists() BucketOption

WithBucketCreateIfNotExists will create the bucket if it doesn't already exist.

func WithBucketCredentials

func WithBucketCredentials(accessKey, secretKey string) BucketOption

WithBucketCredentials sets the access key and secret key for the cli. Only works if the cli is not already provided.

func WithBucketHost

func WithBucketHost(url, region string, usePathStyle bool) BucketOption

WithBucketHost sets the endpoint, the region, and whether path-style addressing is used for the cli. Only works if the cli is not already provided.

func WithBucketLogger

func WithBucketLogger(logger *slog.Logger) BucketOption

WithBucketLogger sets the default logger for any operation. Setting the logger provides debug logs.

func WithBucketReadChunkSize

func WithBucketReadChunkSize(size int64) BucketOption

WithBucketReadChunkSize sets the default read chunksize

func WithBucketRetries

func WithBucketRetries(i int) BucketOption

WithBucketRetries sets the default number of retries for any given operation.

func WithBucketS3Options

func WithBucketS3Options(opts ...func(*s3.Options)) BucketOption

WithBucketS3Options sets the s3 options for the s3 client. Only works if the cli is not already provided.

func WithBucketWriteChunkSize

func WithBucketWriteChunkSize(size int64) BucketOption

WithBucketWriteChunkSize sets the default write chunksize

type BucketURIError

type BucketURIError string

BucketURIError is a string-based error type for problems with a bucket URL. The predefined values are:

var (
	ErrInvalidScheme BucketURIError = "url doesn't start with s3 scheme"
	ErrNoBucketName  BucketURIError = "no bucketname"
	ErrNoCredentials BucketURIError = "missing credentials"
)

func (BucketURIError) Error

func (b BucketURIError) Error() string

type DeleteBucketAPI

type DeleteBucketAPI interface {
	Delete(ctx context.Context, keys ...string) error
}

type DeleteBucketBucketAPI

type DeleteBucketBucketAPI interface {
	DeleteBucket(ctx context.Context) error
}

type DownloadAPIClient

type DownloadAPIClient interface {
	GetObject(context.Context, *s3.GetObjectInput, ...func(*s3.Options)) (*s3.GetObjectOutput, error)
}

DownloadAPIClient is an S3 API client that can invoke the GetObject operation.

type ExistsBucketAPI

type ExistsBucketAPI interface {
	Exists(ctx context.Context, key string) (bool, error)
}

type GetBucketAPI

type GetBucketAPI interface {
	Get(ctx context.Context, key string, opts ...ReaderOption) io.Reader
}

type ListBucketAPI

type ListBucketAPI interface {
	List(ctx context.Context, prefix string) ([]types.Object, error)
}

type PutBucketAPI

type PutBucketAPI interface {
	Put(ctx context.Context, key string, opts ...WriterOption) io.WriteCloser
}

type Reader

type Reader struct {
	// contains filtered or unexported fields
}

Reader is an io.Reader implementation for an S3 Object.

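You can open a new reader on its own using the *s3.Client:

client := s3.NewFromConfig(cfg)
input := &s3.GetObjectInput{Bucket: aws.String("my-bucket-name"), Key: aws.String("path/to/object.txt")}
rd := s3io.NewReader(ctx, client, input, s3io.WithReaderConcurrency(3))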

Or you can open it using the BucketAPI:

bucket, err := s3io.Open(ctx, "my-bucket-name", s3io.WithBucketCredentials("access-key", "access-secret"))
if err != nil {
  log.Fatalf("unable to open bucket: %v", err)
}

rd := bucket.Get(ctx, "path/to/object.txt")

func (*Reader) Close

func (r *Reader) Close() error

Close is the io.Closer implementation for the ObjectReader.

func (*Reader) Read

func (r *Reader) Read(p []byte) (int, error)

Read is the io.Reader implementation for the ObjectReader.

It returns fs.ErrNotExist if the object doesn't exist in the given bucket, and io.EOF once all bytes have been read.

Example
ctx := context.Background()

bucket, err := Open(ctx, "bucket-name",
	WithBucketCredentials("access-key", "access-secret"),
	WithBucketCreateIfNotExists(),
)
if err != nil {
	log.Fatal(err)
}

rd := bucket.Get(ctx, "path/to/object.txt")

if _, err := io.Copy(os.Stdout, rd); err != nil {
	log.Fatal(err)
}

func (*Reader) Stat

func (r *Reader) Stat() (fs.FileInfo, error)

Stat is the fs.File implementation for the ObjectReader.

type ReaderOption

type ReaderOption func(*Reader)

ReaderOption is an option for the given read operation.

func ObjectReaderOptions

func ObjectReaderOptions(opts ...ReaderOption) ReaderOption

ObjectReaderOptions bundles multiple ReaderOptions into one.

func WithReaderChunkSize

func WithReaderChunkSize(size int64) ReaderOption

WithReaderChunkSize sets the chunksize for this reader

func WithReaderConcurrency

func WithReaderConcurrency(i int) ReaderOption

WithReaderConcurrency sets the concurrency for this reader.

func WithReaderLogger

func WithReaderLogger(logger *slog.Logger) ReaderOption

WithReaderLogger sets the logger for this reader

func WithReaderRetries

func WithReaderRetries(i int) ReaderOption

WithReaderRetries sets the retry count for this reader

func WithReaderS3Options

func WithReaderS3Options(opts ...func(*s3.Options)) ReaderOption

WithReaderS3Options adds s3 options to the reader operations.

type UploadAPIClient

type UploadAPIClient interface {
	PutObject(context.Context, *s3.PutObjectInput, ...func(*s3.Options)) (*s3.PutObjectOutput, error)
	UploadPart(context.Context, *s3.UploadPartInput, ...func(*s3.Options)) (*s3.UploadPartOutput, error)
	CreateMultipartUpload(context.Context, *s3.CreateMultipartUploadInput, ...func(*s3.Options)) (*s3.CreateMultipartUploadOutput, error)
	CompleteMultipartUpload(context.Context, *s3.CompleteMultipartUploadInput, ...func(*s3.Options)) (*s3.CompleteMultipartUploadOutput, error)
	AbortMultipartUpload(context.Context, *s3.AbortMultipartUploadInput, ...func(*s3.Options)) (*s3.AbortMultipartUploadOutput, error)
}

UploadAPIClient is an S3 API client that can invoke PutObject, UploadPart, CreateMultipartUpload, CompleteMultipartUpload, and AbortMultipartUpload operations.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

Writer is an io.WriteCloser implementation for an s3 Object.

You can open a new writer on its own using the *s3.Client:

client := s3.NewFromConfig(cfg)
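input := &s3.PutObjectInput{Bucket: aws.String("my-bucket-name"), Key: aws.String("path/to/object.txt")}
wr := s3io.NewWriter(ctx, client, input, s3io.WithWriterConcurrency(5))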

Or you can open it using the BucketAPI:

bucket, err := s3io.Open(ctx, "my-bucket-name", s3io.WithBucketCredentials("access-key", "access-secret"))
if err != nil {
  log.Fatalf("unable to open bucket: %v", err)
}

wr := bucket.Put(ctx, "path/to/object.txt")

func (*Writer) Close

func (w *Writer) Close() error

Close completes the write operation.

If the byte size is less than the writer's chunk size, a simple PutObject operation is performed. Otherwise a multipart upload complete operation is performed. The error returned is the error from this store operation.

If an error occurred while uploading parts, the returned error might also be an upload part error joined with an AbortMultipartUpload error.

func (*Writer) Write

func (w *Writer) Write(p []byte) (int, error)

Write is the io.Writer implementation of the ObjectWriter.

The object is stored when the Close method is called.

Example
ctx := context.Background()

bucket, err := Open(ctx, "bucket-name",
	WithBucketCredentials("access-key", "access-secret"),
	WithBucketCreateIfNotExists(),
)
if err != nil {
	log.Fatal(err)
}

wr := bucket.Put(ctx, "path/to/object.txt")

if _, err := io.WriteString(wr, "hello world"); err != nil {
	log.Fatal(err)
}

// Note: you must close to finalize the upload
if err := wr.Close(); err != nil {
	log.Fatal(err)
}

type WriterOption

type WriterOption func(*Writer)

WriterOption is an option for the given write operation

func ObjectWriterOptions

func ObjectWriterOptions(opts ...WriterOption) WriterOption

ObjectWriterOptions bundles multiple WriterOptions into one.

func WithWriteRetries

func WithWriteRetries(i int) WriterOption

WithWriteRetries sets the retry count for this writer.

func WithWriterACL

func WithWriterACL(acl types.ObjectCannedACL) WriterOption

WithWriterACL sets the canned ACL on the put input for this writer.

func WithWriterChunkSize

func WithWriterChunkSize(size int64) WriterOption

WithWriterChunkSize sets the chunk size for this writer. If set below the minimum chunk size of 5 MiB, it is raised to the minimum chunk size.
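The clamping rule can be written down in a few lines. This is a sketch of the documented behavior, not the library's code; clampChunkSize is a hypothetical helper and minChunkSize is a local copy of the documented MinChunkSize value.

```go
package main

import "fmt"

// minChunkSize is a local copy of the documented MinChunkSize value (5 MiB).
const minChunkSize int64 = 1024 * 1024 * 5

// clampChunkSize raises any requested size below the minimum to the minimum.
func clampChunkSize(size int64) int64 {
	if size < minChunkSize {
		return minChunkSize
	}
	return size
}

func main() {
	fmt.Println(clampChunkSize(1024))             // 5242880
	fmt.Println(clampChunkSize(8 * 1024 * 1024))  // 8388608
}
```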

func WithWriterClientOptions

func WithWriterClientOptions(opts ...func(*s3.Options)) WriterOption

WithWriterClientOptions adds s3 client options to the writer operations.

func WithWriterConcurrency

func WithWriterConcurrency(i int) WriterOption

WithWriterConcurrency sets the concurrency amount for this writer.

func WithWriterLogger

func WithWriterLogger(logger *slog.Logger) WriterOption

WithWriterLogger adds a logger for this writer.
