Back to

Package watcher

Latest Go to latest

The highest tagged major version is .

Published: Aug 13, 2020 | License: Apache-2.0 | Module:



var ErrBucketMismatch = errors.New("bucket mismatch")
var ErrWatchTimeout = errors.New("watcher timed out")

func New

func New(ctx context.Context, c *Config) (*serviceImpl, error)

func NewFilesystemWatcher

func NewFilesystemWatcher(ctx context.Context, config *FilesystemConfig) (*filesystemWatcher, error)

func NewMinioWatcher

func NewMinioWatcher(ctx context.Context, config *MinioConfig) (*minioWatcher, error)

type BlobEvent

type BlobEvent struct {
	// Name of the watcher that received this blob.
	WatcherName string

	// Name of the pipeline that the watcher targets.
	PipelineName string

	// Retention period for this blob.
	RetentionPeriod *time.Duration

	// Whether the top-level directory is meant to be stripped.
	StripTopLevelDir bool

	// Key of the blob.
	Key string

	// Bucket where the blob lives.
	Bucket string `json:"Bucket,omitempty"`

BlobEvent is a serializable event that describes a blob.

BlobEvents can be sent over the wire, i.e. they are serializable. Receivers, typically Cadence activities, can download the blob via the service implementation in this package.

TODO: use signed URLs to simplify access to buckets?

func NewBlobEvent

func NewBlobEvent(w Watcher, key string) *BlobEvent

func NewBlobEventWithBucket

func NewBlobEventWithBucket(w Watcher, bucket, key string) *BlobEvent

func (BlobEvent) String

func (e BlobEvent) String() string

type Config

type Config struct {
	Filesystem []*FilesystemConfig
	Minio      []*MinioConfig

type FilesystemConfig

type FilesystemConfig struct {
	Name    string
	Path    string
	Inotify bool
	Ignore  string

	Pipeline         string
	RetentionPeriod  *time.Duration
	StripTopLevelDir bool

See filesystem.go for more.

type MinioConfig

type MinioConfig struct {
	Name         string
	RedisAddress string
	RedisList    string
	Region       string
	Endpoint     string
	PathStyle    bool
	Profile      string
	Key          string
	Secret       string
	Token        string
	Bucket       string

	Pipeline         string
	RetentionPeriod  *time.Duration
	StripTopLevelDir bool

See minio.go for more.

type MinioEvent

type MinioEvent struct {
	Name string       `json:"eventName"`
	S3   MinioEventS3 `json:"s3"`

MinioEvent represents the event delivered by Minio (S3) via Redis.

For reference:


{
    "eventVersion": "2.0",
    "eventSource": "minio:s3",
    "awsRegion": "",
    "eventTime": "2019-10-01T15:28:22Z",
    "eventName": "s3:ObjectCreated:CompleteMultipartUpload",
    "userIdentity": {
        "principalId": "36J9X8EZI4KEV1G7EHXA"
    },
    "requestParameters": {
        "accessKey": "36J9X8EZI4KEV1G7EHXA",
        "region": "",
        "sourceIPAddress": ""
    },
    "responseElements": {
        "content-length": "291",
        "x-amz-request-id": "15C98F7AC9D60CA6",
        "x-minio-deployment-id": "bcc2f9ce-65f2-4558-a455-b8176012f89b",
        "x-minio-origin-endpoint": ""
    },
    "s3": {
        "s3SchemaVersion": "1.0",
        "configurationId": "Config",
        "bucket": {
            "name": "sips",
            "ownerIdentity": {
                "principalId": "36J9X8EZI4KEV1G7EHXA"
            },
            "arn": "arn:aws:s3:::sips"
        },
        "object": {
            "key": "y25.gif",
            "size": 100,
            "eTag": "b0814df70de0779da2b0b3f9c676c64d-1",
            "contentType": "image/gif",
            "userMetadata": {
                "X-Minio-Internal-actual-size": "100",
                "content-type": "image/gif"
            },
            "versionId": "1",
            "sequencer": "15C98F7ACA94598C"
        }
    },
    "source": {
        "host": "",
        "port": "",
        "userAgent": "MinIO (linux; amd64) minio-go/v6.0.32 mc/DEVELOPMENT.GOGET"
    }
}


func (MinioEvent) String

func (e MinioEvent) String() string

type MinioEventS3

type MinioEventS3 struct {
	Bucket MinioEventS3Bucket `json:"bucket"`
	Object MinioEventS3Object `json:"object"`

type MinioEventS3Bucket

type MinioEventS3Bucket struct {
	Name string `json:"name"`

type MinioEventS3Object

type MinioEventS3Object struct {
	Key string `json:"key"`

type MinioEventSet

type MinioEventSet struct {
	Event     []MinioEvent
	EventTime string

type Service

type Service interface {
	// Watchers returns all known watchers.
	Watchers() []Watcher

	// Download blob given an event.
	Download(ctx context.Context, w io.Writer, watcherName, key string) error

	// Delete blob given an event.
	Delete(ctx context.Context, watcherName, key string) error

type Watcher

type Watcher interface {
	// Watch waits until a blob is dispatched.
	Watch(ctx context.Context) (*BlobEvent, error)

	// OpenBucket returns the bucket where the blobs can be found.
	OpenBucket(ctx context.Context) (*blob.Bucket, error)

	// Every watcher targets a pipeline.
	Pipeline() string

	RetentionPeriod() *time.Duration

	StripTopLevelDir() bool

	fmt.Stringer // It should return the name of the watcher.

Package Files

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier