feedx

Version: v0.13.0
Published: Aug 9, 2022 License: Apache-2.0 Imports: 15 Imported by: 1

README

Feedx

Feed-based data exchange between services.

Usage (Ruby)

require 'bfs/s3'
require 'feedx'

# Init a new producer with an S3 destination
relation = Post.includes(:author)
producer = Feedx::Producer.new relation, 's3://my-bucket/feeds/users.json.gz'

# Push a new feed every hour
loop do
  producer.perform
  sleep(3600)
end

Documentation

Constants

This section is empty.

Variables

var ErrNotModified = errors.New("feedx: not modified")

ErrNotModified is used to signal that something has not been modified.

var FlateCompression = flateCompression{}

FlateCompression supports flate compression format.

var GZipCompression = gzipCompression{}

GZipCompression supports gzip compression format.

var JSONFormat = jsonFormat{}

JSONFormat provides a Format implementation for JSON.

var NoCompression = noCompression{}

NoCompression is just a pass-through without compression.

var ProtobufFormat = protobufFormat{}

ProtobufFormat provides a Format implementation for Protobuf.

Functions

This section is empty.

Types

type Compression

type Compression interface {
	// NewReader wraps a reader.
	NewReader(io.Reader) (io.ReadCloser, error)
	// NewWriter wraps a writer.
	NewWriter(io.Writer) (io.WriteCloser, error)
}

Compression represents the data compression.
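
As an illustration of what satisfying this interface involves, here is a minimal sketch backed by the standard library's compress/gzip. The interface is re-declared locally so the example runs on its own; stdGzip and roundTrip are hypothetical names, and this is not the package's own GZipCompression implementation.

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// Compression mirrors the interface documented above (local copy for the sketch).
type Compression interface {
	NewReader(io.Reader) (io.ReadCloser, error)
	NewWriter(io.Writer) (io.WriteCloser, error)
}

// stdGzip is a hypothetical implementation backed by compress/gzip.
type stdGzip struct{}

// NewReader wraps r in a gzip decompressor.
func (stdGzip) NewReader(r io.Reader) (io.ReadCloser, error) {
	zr, err := gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	return zr, nil
}

// NewWriter wraps w in a gzip compressor.
func (stdGzip) NewWriter(w io.Writer) (io.WriteCloser, error) {
	return gzip.NewWriter(w), nil
}

// roundTrip compresses s with c and decompresses it again.
func roundTrip(c Compression, s string) (string, error) {
	var buf bytes.Buffer
	w, err := c.NewWriter(&buf)
	if err != nil {
		return "", err
	}
	if _, err := io.WriteString(w, s); err != nil {
		return "", err
	}
	// Close flushes the remaining compressed data into buf.
	if err := w.Close(); err != nil {
		return "", err
	}
	r, err := c.NewReader(&buf)
	if err != nil {
		return "", err
	}
	defer r.Close()
	out, err := io.ReadAll(r)
	return string(out), err
}

func main() {
	out, err := roundTrip(stdGzip{}, "hello feed")
	fmt.Println(out, err)
}
```

Any type with these two methods can be assigned to the Compression field of ReaderOptions or WriterOptions, which is how a custom codec would be plugged in.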

func DetectCompression

func DetectCompression(name string) Compression

DetectCompression detects the compression type from a URL path or file name.

type ConsumeFunc added in v0.4.0

type ConsumeFunc func(*Reader) (data interface{}, err error)

ConsumeFunc is a parsing callback which is run by the consumer every sync interval.

type Consumer

type Consumer interface {
	// Data returns the data as returned by ConsumeFunc on last sync.
	Data() interface{}
	// LastSync returns time of last sync attempt.
	LastSync() time.Time
	// LastConsumed returns time of last feed consumption.
	LastConsumed() time.Time
	// LastModified returns time at which the remote feed was last modified.
	LastModified() time.Time
	// NumRead returns the number of values consumed during the last sync.
	NumRead() int
	// Close stops the underlying sync process.
	Close() error
}

Consumer manages data retrieval from a remote feed. It queries the feed at regular intervals, continuously retrieving new updates.
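
The polling behaviour described above can be pictured with a small stand-alone sketch. Everything here is hypothetical (the remote and consumer types, the sync method, the errNotModified value standing in for ErrNotModified); it only illustrates the idea of skipping a sync when the remote's last-modified time has not changed, and makes no claim about how the package implements it.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// errNotModified stands in for the package's ErrNotModified.
var errNotModified = errors.New("not modified")

// published is an arbitrary timestamp used by the demo.
var published = time.Date(2022, 8, 9, 0, 0, 0, 0, time.UTC)

// remote is a hypothetical in-memory feed with last-modified metadata.
type remote struct {
	lastMod time.Time
	items   []string
}

// consumer holds the kind of state a Consumer exposes through its accessors.
type consumer struct {
	data         []string
	lastSync     time.Time
	lastConsumed time.Time
	lastModified time.Time
}

// sync performs one poll: it always records the attempt, but only
// re-reads the feed when the remote changed since the last consumption.
func (c *consumer) sync(r *remote, now time.Time) error {
	c.lastSync = now
	if !c.lastConsumed.IsZero() && r.lastMod.Equal(c.lastModified) {
		return errNotModified
	}
	c.data = append([]string(nil), r.items...)
	c.lastModified = r.lastMod
	c.lastConsumed = now
	return nil
}

func main() {
	r := &remote{lastMod: published, items: []string{"a", "b"}}
	c := &consumer{}

	err := c.sync(r, published.Add(time.Minute))
	fmt.Println(err, len(c.data))

	// Nothing changed remotely, so this attempt is skipped.
	err = c.sync(r, published.Add(2*time.Minute))
	fmt.Println(err, len(c.data))

	// A newer last-modified time triggers another consumption.
	r.lastMod, r.items = published.Add(time.Hour), []string{"a", "b", "c"}
	err = c.sync(r, published.Add(61*time.Minute))
	fmt.Println(err, len(c.data))
}
```

A long-running consumer would call such a step from a time.Ticker loop at the configured Interval and stop when its context is cancelled or Close is called.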

func NewConsumer

func NewConsumer(ctx context.Context, remoteURL string, opt *ConsumerOptions, cfn ConsumeFunc) (Consumer, error)

NewConsumer starts a new feed consumer.

func NewConsumerForRemote added in v0.4.0

func NewConsumerForRemote(ctx context.Context, remote *bfs.Object, opt *ConsumerOptions, cfn ConsumeFunc) (Consumer, error)

NewConsumerForRemote starts a new feed consumer with a remote.

type ConsumerOptions

type ConsumerOptions struct {
	ReaderOptions

	// The interval at which the consumer checks the remote for changes.
	// Default: 1m
	Interval time.Duration

	// AfterSync callbacks are triggered after each sync, receiving
	// the sync state and error (if occurred).
	AfterSync func(*ConsumerSync, error)
}

ConsumerOptions configure the consumer instance.

type ConsumerSync added in v0.6.3

type ConsumerSync struct {
	// Consumer exposes the current consumer state.
	Consumer
	// Updated indicates whether the sync resulted in an update.
	Updated bool
	// PreviousData references the data before the update.
	// It allows finalizers to be applied to data structures created by ConsumeFunc.
	// This is only set when an update happened.
	PreviousData interface{}
}

ConsumerSync contains the state of the last sync.

type Format

type Format interface {
	// NewDecoder wraps a decoder around a reader.
	NewDecoder(io.Reader) (FormatDecoder, error)
	// NewEncoder wraps an encoder around a writer.
	NewEncoder(io.Writer) (FormatEncoder, error)
}

Format represents the data format.

func DetectFormat

func DetectFormat(name string) Format

DetectFormat detects the data format from a URL path or file name. May return nil.
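
To make name-based detection concrete, here is a rough sketch of how a format and a compression could both be derived from a name such as users.json.gz. The detect helper is hypothetical and the extension lists are assumptions for illustration; the extensions actually recognised by DetectFormat and DetectCompression are not stated on this page.

```go
package main

import (
	"fmt"
	"path"
	"strings"
)

// detect is a hypothetical helper that returns a format name and a
// compression name for a URL path or file name. An empty format means
// "unknown", mirroring the documented "may return nil" behaviour.
func detect(name string) (format, compression string) {
	base := path.Base(name)

	// Assumed compression suffixes: strip one if present.
	compression = "none"
	switch path.Ext(base) {
	case ".gz":
		compression = "gzip"
		base = strings.TrimSuffix(base, ".gz")
	case ".flate":
		compression = "flate"
		base = strings.TrimSuffix(base, ".flate")
	}

	// Assumed format suffixes, checked on what remains.
	switch path.Ext(base) {
	case ".json":
		format = "json"
	case ".pb", ".proto", ".protobuf":
		format = "protobuf"
	}
	return format, compression
}

func main() {
	for _, name := range []string{
		"s3://my-bucket/feeds/users.json.gz",
		"feed.pb",
		"data.csv",
	} {
		f, c := detect(name)
		fmt.Printf("%s: format=%q compression=%q\n", name, f, c)
	}
}
```

Because DetectFormat may return nil, callers that rely on auto-detection should check the result, or set Format explicitly in ReaderOptions or WriterOptions.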

type FormatDecoder

type FormatDecoder interface {
	// Decode decodes the next message into an interface.
	Decode(v interface{}) error

	io.Closer
}

FormatDecoder decodes successive messages from a stream and is closed when no longer needed.

type FormatEncoder

type FormatEncoder interface {
	// Encode encodes the value to the stream.
	Encode(v interface{}) error

	io.Closer
}

FormatEncoder encodes successive values to a stream and is closed when no longer needed.
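
The encoder and decoder interfaces can be illustrated with a small sketch built on encoding/json. The interfaces are re-declared locally, and the ndjsonEncoder, ndjsonDecoder and post types are hypothetical. The read loop assumes the end of the stream is signalled by io.EOF, which is what encoding/json does; this is not the package's own JSONFormat.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Local copies of the documented interfaces, for a self-contained sketch.
type FormatDecoder interface {
	Decode(v interface{}) error
	io.Closer
}

type FormatEncoder interface {
	Encode(v interface{}) error
	io.Closer
}

// ndjsonEncoder writes one JSON document per line.
type ndjsonEncoder struct{ enc *json.Encoder }

func (e ndjsonEncoder) Encode(v interface{}) error { return e.enc.Encode(v) }
func (ndjsonEncoder) Close() error                 { return nil }

// ndjsonDecoder reads consecutive JSON documents from a stream.
type ndjsonDecoder struct{ dec *json.Decoder }

func (d ndjsonDecoder) Decode(v interface{}) error { return d.dec.Decode(v) }
func (ndjsonDecoder) Close() error                 { return nil }

// post is a hypothetical record type.
type post struct {
	ID    int    `json:"id"`
	Title string `json:"title"`
}

// roundTrip encodes the posts, then decodes them back until io.EOF.
func roundTrip(in []post) ([]post, error) {
	var buf bytes.Buffer

	var enc FormatEncoder = ndjsonEncoder{json.NewEncoder(&buf)}
	for _, p := range in {
		if err := enc.Encode(p); err != nil {
			return nil, err
		}
	}
	if err := enc.Close(); err != nil {
		return nil, err
	}

	var dec FormatDecoder = ndjsonDecoder{json.NewDecoder(&buf)}
	defer dec.Close()

	var out []post
	for {
		var p post
		err := dec.Decode(&p)
		if err == io.EOF {
			return out, nil
		}
		if err != nil {
			return nil, err
		}
		out = append(out, p)
	}
}

func main() {
	out, err := roundTrip([]post{{1, "first"}, {2, "second"}})
	fmt.Println(out, err)
}
```

A ConsumeFunc would typically contain the same kind of decode-until-EOF loop, calling Decode on the Reader it receives.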

type ProduceFunc added in v0.6.0

type ProduceFunc func(*Writer) error

ProduceFunc is a callback which is run by the producer on every iteration.

type Producer added in v0.6.0

type Producer struct {
	// contains filtered or unexported fields
}

Producer (continuously) produces a feed.

func NewProducer added in v0.6.0

func NewProducer(ctx context.Context, remoteURL string, opt *ProducerOptions, pfn ProduceFunc) (*Producer, error)

NewProducer inits a new feed producer.

func NewProducerForRemote added in v0.6.0

func NewProducerForRemote(ctx context.Context, remote *bfs.Object, opt *ProducerOptions, pfn ProduceFunc) (*Producer, error)

NewProducerForRemote starts a new feed producer with a remote.

func (*Producer) Close added in v0.6.0

func (p *Producer) Close() error

Close stops the producer.

func (*Producer) LastModified added in v0.6.0

func (p *Producer) LastModified() time.Time

LastModified returns time at which the remote feed was last modified.

func (*Producer) LastPush added in v0.6.0

func (p *Producer) LastPush() time.Time

LastPush returns time of last push attempt.

func (*Producer) NumWritten added in v0.6.0

func (p *Producer) NumWritten() int

NumWritten returns the number of values produced during the last push.

type ProducerOptions added in v0.6.0

type ProducerOptions struct {
	WriterOptions

	// The interval at which the producer initiates a push cycle.
	// Default: 1m
	Interval time.Duration

	// LastModCheck is called before each push attempt
	// to dynamically determine the last modified time.
	LastModCheck func(context.Context) (time.Time, error)

	// AfterPush callbacks are triggered after each push cycle, receiving
	// the push state and error (if occurred).
	AfterPush func(*ProducerPush, error)
}

ProducerOptions configure the producer instance.
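
One way to picture how LastModCheck and the push cycle fit together is the sketch below. The store type and the push and demo functions are hypothetical, and the rule "skip the push when the timestamp is unchanged" is an assumption made for illustration, not a statement about the package's internals.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// store is a hypothetical in-memory remote holding a payload plus
// last-modified metadata.
type store struct {
	lastMod time.Time
	items   []string
	pushes  int
}

// push runs one producer cycle: ask lastModCheck for the current
// modification time, skip the cycle if it matches what was pushed
// before, otherwise produce a fresh payload and record the new time.
func push(
	ctx context.Context,
	s *store,
	lastModCheck func(context.Context) (time.Time, error),
	produce func() []string,
) (updated bool, err error) {
	mod, err := lastModCheck(ctx)
	if err != nil {
		return false, err
	}
	if s.pushes > 0 && mod.Equal(s.lastMod) {
		return false, nil
	}
	s.items = produce()
	s.lastMod = mod
	s.pushes++
	return true, nil
}

// demo pushes twice with an unchanged timestamp, then once after the
// source data has been modified.
func demo() (results []bool, pushes int) {
	ctx := context.Background()
	s := &store{}
	mod := time.Date(2022, 8, 9, 0, 0, 0, 0, time.UTC)
	check := func(context.Context) (time.Time, error) { return mod, nil }
	produce := func() []string { return []string{"a", "b"} }

	for i := 0; i < 3; i++ {
		if i == 2 {
			mod = mod.Add(time.Hour) // simulate a change in the source data
		}
		updated, err := push(ctx, s, check, produce)
		if err != nil {
			panic(err)
		}
		results = append(results, updated)
	}
	return results, s.pushes
}

func main() {
	results, pushes := demo()
	fmt.Println(results, pushes)
}
```

In this picture, the boolean returned by push corresponds to the Updated field that an AfterPush callback receives through ProducerPush.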

type ProducerPush added in v0.6.3

type ProducerPush struct {
	// Producer exposes the current producer state.
	*Producer
	// Updated indicates whether the push resulted in an update.
	Updated bool
}

ProducerPush contains the state of the last push.

type Reader added in v0.4.0

type Reader struct {
	// contains filtered or unexported fields
}

Reader reads data from a remote feed.

func NewReader added in v0.4.0

func NewReader(ctx context.Context, remote *bfs.Object, opt *ReaderOptions) (*Reader, error)

NewReader inits a new reader.

func (*Reader) Close added in v0.4.0

func (r *Reader) Close() error

Close closes the reader.

func (*Reader) Decode added in v0.4.0

func (r *Reader) Decode(v interface{}) error

Decode decodes the next formatted value from the feed.

func (*Reader) LastModified added in v0.4.0

func (r *Reader) LastModified() (time.Time, error)

LastModified returns the last modified time of the remote feed.

func (*Reader) NumRead added in v0.4.0

func (r *Reader) NumRead() int

NumRead returns the number of read values.

func (*Reader) Read added in v0.8.0

func (r *Reader) Read(p []byte) (int, error)

Read reads raw bytes from the feed.

type ReaderOptions added in v0.4.0

type ReaderOptions struct {
	// Format specifies the format
	// Default: auto-detected from URL path.
	Format Format

	// Compression specifies the compression type.
	// Default: auto-detected from URL path.
	Compression Compression
}

ReaderOptions configure the reader instance.

type Writer added in v0.4.0

type Writer struct {
	// contains filtered or unexported fields
}

Writer encodes feeds to remote locations.

func NewWriter added in v0.4.0

func NewWriter(ctx context.Context, remote *bfs.Object, opt *WriterOptions) *Writer

NewWriter inits a new feed writer.

func (*Writer) Commit added in v0.5.0

func (w *Writer) Commit() error

Commit closes the writer and persists the contents.

func (*Writer) Discard added in v0.5.0

func (w *Writer) Discard() error

Discard closes the writer and discards the contents.
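
The difference between Commit and Discard can be sketched with a writer that stages its output and only publishes on Commit. The remoteObject and stagedWriter types are hypothetical, and buffering in memory is an assumption made for the example; how the package and its storage backend stage data is not described on this page.

```go
package main

import (
	"bytes"
	"errors"
	"fmt"
)

// errClosed is returned when a writer is used after Commit or Discard.
var errClosed = errors.New("writer already closed")

// remoteObject is a hypothetical stand-in for a remote blob.
type remoteObject struct{ content []byte }

// stagedWriter buffers everything written to it and only replaces the
// remote content when Commit is called.
type stagedWriter struct {
	dst    *remoteObject
	buf    bytes.Buffer
	closed bool
}

// WriteString appends a raw string to the staged content.
func (w *stagedWriter) WriteString(s string) (int, error) {
	if w.closed {
		return 0, errClosed
	}
	return w.buf.WriteString(s)
}

// Commit closes the writer and publishes the staged content.
func (w *stagedWriter) Commit() error {
	if w.closed {
		return errClosed
	}
	w.closed = true
	w.dst.content = append([]byte(nil), w.buf.Bytes()...)
	return nil
}

// Discard closes the writer and drops the staged content, leaving the
// remote object untouched.
func (w *stagedWriter) Discard() error {
	if w.closed {
		return errClosed
	}
	w.closed = true
	w.buf.Reset()
	return nil
}

func main() {
	obj := &remoteObject{content: []byte("old")}

	w := &stagedWriter{dst: obj}
	_, _ = w.WriteString("partial")
	_ = w.Discard()
	fmt.Println(string(obj.content))

	w = &stagedWriter{dst: obj}
	_, _ = w.WriteString("new")
	_ = w.Commit()
	fmt.Println(string(obj.content))
}
```

A common pattern with this kind of API is to call Discard in the error path and Commit only after every value has been encoded successfully, so readers never observe a half-written feed.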

func (*Writer) Encode added in v0.4.0

func (w *Writer) Encode(v interface{}) error

Encode appends a value to the feed.

func (*Writer) NumWritten added in v0.4.0

func (w *Writer) NumWritten() int

NumWritten returns the number of written values.

func (*Writer) Write added in v0.8.0

func (w *Writer) Write(p []byte) (int, error)

Write writes raw bytes to the feed.

func (*Writer) WriteString added in v0.8.0

func (w *Writer) WriteString(s string) (int, error)

WriteString writes a raw string to the feed.

type WriterOptions added in v0.4.0

type WriterOptions struct {
	// Format specifies the format
	// Default: auto-detected from URL path.
	Format Format

	// Compression specifies the compression type.
	// Default: auto-detected from URL path.
	Compression Compression

	// Provides an optional last modified timestamp which is stored with the remote metadata.
	// Default: time.Time{}.
	LastMod time.Time
}

WriterOptions configure the writer instance.

Directories

Path         Synopsis
ext/parquet  Module
