file

package
v0.10.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 22, 2020 License: MIT Imports: 18 Imported by: 25

Documentation

Index

Examples

Constants

This section is empty.

Variables

This section is empty.

Functions

func Glob

func Glob(pth string, opt *Options) ([]stat.Stats, error)

Glob will only match files and will not match recursively. Only files directly in pthDir are candidates for matching.

Supports the same globbing patterns as provided in *nix terminals.

Globbing in directories is not supported, i.e. s3://bucket/path/*/files.txt will not work but s3://bucket/path/to/*.txt will work.

func List

func List(pthDir string, opt *Options) ([]stat.Stats, error)

List is a generic List function that will call the correct type of implementation based on the file scheme, e.g. 's3://'. If there is no scheme or if the scheme is 'local://' then the local file List will be called.

pthDir is expected to be a dir.

func ReadLines

func ReadLines(ctx context.Context, r Reader, f func(ln []byte) error) (err error, cncl bool)

ReadLines is a high-level utility that will read all the lines of a reader and call f for each line whose number of bytes is > 0. err will never be EOF and if cncl == true then err will be nil.

func Stat added in v0.5.0

func Stat(path string, opt *Options) (stat.Stats, error)

Stat returns summary stats of a file or directory. It can be used to verify read permissions.

Types

type DateExtractor

type DateExtractor func([]byte) (time.Time, error)

DateExtractor defines a type that will parse raw bytes and attempt to extract a time.Time value.

The underlying bytes should not be modified.

If time.Time.IsZero() == true then a non-nil error should always be returned. Likewise if error != nil time.Time.IsZero() should always be true.

func CSVDateExtractor

func CSVDateExtractor(sep, format string, fieldIndex int) DateExtractor

CSVDateExtractor returns a DateExtractor for csv row date extraction.

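If fieldIndex is negative it is set to 0. sep and format (timeFmt) may be empty strings, as in the example below, where a comma-separated row with an RFC 3339 timestamp is still parsed.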

Example
os.Setenv("TZ", "UTC")

csvExtract := CSVDateExtractor("", "", 1)

t, err := csvExtract([]byte("test field,2007-02-03T16:05:06Z"))

fmt.Println(t.IsZero()) // output: false
fmt.Println(err)        // output: <nil>

// cleanup
os.Unsetenv("TZ")
Output:

false
<nil>

func JSONDateExtractor

func JSONDateExtractor(field, timeFmt string) DateExtractor

JSONDateExtractor returns a DateExtractor for json object date extraction.

Example
os.Setenv("TZ", "UTC")

jsonExtract := JSONDateExtractor("date-field", "")

t, err := jsonExtract([]byte(`{"date-field":"2007-02-03T16:05:06Z","other-field":"other-value"}`))

fmt.Println(err)        // output: <nil>
fmt.Println(t.IsZero()) // output: false
fmt.Println(t.Year())   // output: 2007
fmt.Println(t.Month())  // output: February
fmt.Println(t.Day())    // output: 3
fmt.Println(t.Hour())   // output: 16
fmt.Println(t.Minute()) // output: 5
fmt.Println(t.Second()) // output: 6

// cleanup
os.Unsetenv("TZ")
Output:

<nil>
false
2007
February
3
16
5
6

type Options

// Options holds the general options shared across all readers and writers.
// Fields carrying a toml tag are loadable from a toml config file.
type Options struct {
	// AccessKey and SecretKey are presumably the credentials used for
	// remote (e.g. s3://) paths — TODO confirm against the implementations.
	AccessKey string `toml:"access_key"`
	SecretKey string `toml:"secret_key"`

	// CompressionLevel is the gzip compression level; per the config
	// comment the accepted values are speed, size or default.
	CompressionLevel string `toml:"file_compression" commented:"true" comment:"gzip compression level (speed|size|default)"`

	// UseFileBuf specifies that a tmp file is used for the delayed writing
	// (intended for files too big to buffer in memory).
	// The tmp directory and tmp name prefix can optionally be specified
	// with FileBufDir and FileBufPrefix.
	UseFileBuf bool `toml:"use_file_buf" commented:"true" comment:"set as 'true' if files are too big to buffer in memory"`

	// FileBufDir optionally specifies the temp directory. If not specified then
	// the os default temp dir is used.
	FileBufDir string `` /* 179-byte string literal not displayed */

	// FileBufPrefix optionally specifies the temp file prefix.
	// The full tmp file name is randomly generated and guaranteed
	// not to conflict with existing files. A prefix can help one find
	// the tmp file.
	//
	// In an effort to encourage fewer application configuration options
	// this value is not made available to a toml config file and the default
	// is set to 'task-type_' by the application bootstrapper.
	//
	// If no prefix is provided then the temp file name is just a random
	// unique number.
	FileBufPrefix     string `toml:"-"` // default is usually 'task-type_'
	// FileBufKeepFailed keeps the local buffer file when an upload fails.
	FileBufKeepFailed bool   `toml:"file_buf_keep_failed" commented:"true" comment:"keep the local buffer file on a upload failure"`
}

Options presents general options across all stats readers and writers.

func NewOptions

func NewOptions() *Options

NewOptions returns a new *Options value.

type Reader

// Reader is an io.ReadCloser that also provides file statistics
// along with line-oriented reading.
type Reader interface {
	// Read should behave as defined in the io.Reader interface.
	// In this way we can take advantage of all standard library
	// functions that rely on Read, such as io.Copy.
	//
	// Close should do any necessary standard closing but also
	// do final syncing/flushing/cleanup, especially when reading
	// from a remote source.
	io.ReadCloser

	// ReadLine should return a whole line of bytes not including
	// the newline delimiter. When the end of the file is reached, it
	// should return the last line of bytes (if any) and an instance
	// of io.EOF for the error.
	//
	// A call to ReadLine after Close has undefined behavior.
	ReadLine() ([]byte, error)

	// Stats returns an instance of Stats describing the file being read.
	Stats() stat.Stats
}

Reader is an io.ReadCloser that also provides file statistics along with a few additional methods.

func NewReader

func NewReader(pth string, opt *Options) (r Reader, err error)

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

func NewScanner

func NewScanner(r Reader) *Scanner

func (*Scanner) Bytes

func (s *Scanner) Bytes() []byte

func (*Scanner) Err

func (s *Scanner) Err() error

func (*Scanner) Scan

func (s *Scanner) Scan() bool

func (*Scanner) Stats

func (s *Scanner) Stats() stat.Stats

func (*Scanner) Text

func (s *Scanner) Text() string

type WriteByHour

type WriteByHour struct {
	// contains filtered or unexported fields
}

WriteByHour writes to hourly files based on the extracted time.Time from the WriteLine bytes.

func NewWriteByHour

func NewWriteByHour(destTmpl string, opt *Options) *WriteByHour
Example
destTmpl := "./test/{HH}.csv"
wBy := NewWriteByHour(destTmpl, nil)
if wBy == nil {
	return
}

fmt.Println(wBy.opt != nil)     // output: true
fmt.Println(wBy.destTmpl)       // output: ./test/{HH}.csv
fmt.Println(wBy.writers != nil) // output: true
Output:

true
./test/{HH}.csv
true

func (*WriteByHour) Abort

func (w *WriteByHour) Abort() error

Abort will abort on all open files. If there are multiple non-nil errors it will return one of them.

Example
os.Setenv("TZ", "UTC")

destTmpl := "./test/{HH}.csv"
csvExtractor := CSVDateExtractor("", "", 0)
ln1 := []byte("2007-02-03T16:05:06Z,test field")
ln2 := []byte("2007-02-03T17:05:06Z,test field")
ln3 := []byte("2007-02-03T18:05:06Z,test field")
t1, _ := csvExtractor(ln1)
t2, _ := csvExtractor(ln2)
t3, _ := csvExtractor(ln3)

wBy := NewWriteByHour(destTmpl, nil)
if wBy == nil {
	return
}

wBy.WriteLine(ln1, t1)
wBy.WriteLine(ln2, t2)
wBy.WriteLine(ln3, t3)
err := wBy.Abort()

fmt.Println(err) // output: <nil>

// cleanup
os.Remove("./test")
os.Unsetenv("TZ")
Output:

<nil>

func (*WriteByHour) Close

func (w *WriteByHour) Close() error

Close will close all open files. If there are multiple non-nil errors it will return one of them.

All writers are closed simultaneously so if an error is returned it's possible that one or more writes succeeded. Therefore the result space could be mixed with successes and failures.

To know which ones succeeded, check through all the file stats by calling Stats and look for non-empty Stats.Created values. For this reason it is recommended that records should be written to destination files in such a way that re-running sort from the same data source will replace an existing sorted file instead of creating a new one.

Make sure writing is complete before calling Close.

func (*WriteByHour) CloseWithContext

func (w *WriteByHour) CloseWithContext(ctx context.Context) error

CloseWithContext is just like Close but accepts a context. ctx.Done is checked before starting each file close.

Returns an error with body "interrupted" if prematurely shut down by ctx.

func (*WriteByHour) LineCnt

func (w *WriteByHour) LineCnt() int64

LineCnt will provide the total number of lines written across all files.

Example
os.Setenv("TZ", "UTC")

destTmpl := "./test/{HH}.csv"
csvExtractor := CSVDateExtractor("", "", 0)
ln := []byte("2007-02-03T16:05:06Z,test field")
t, _ := csvExtractor(ln)

wBy := NewWriteByHour(destTmpl, nil)
if wBy == nil {
	return
}

wBy.WriteLine(ln, t)

fmt.Println(wBy.LineCnt()) // output: 1

// cleanup
os.Remove("./test")
os.Unsetenv("TZ")
Output:

1

func (*WriteByHour) Stats

func (w *WriteByHour) Stats() []stat.Stats

Stats provides stats for all files.

Example
os.Setenv("TZ", "UTC")

destTmpl := "./test/{HH}.csv"
csvExtractor := CSVDateExtractor("", "", 0)
ln1 := []byte("2007-02-03T16:05:06Z,test field")
ln2 := []byte("2007-02-03T17:05:06Z,test field")
ln3 := []byte("2007-02-03T18:05:06Z,test field")
t1, _ := csvExtractor(ln1)
t2, _ := csvExtractor(ln2)
t3, _ := csvExtractor(ln3)

wBy := NewWriteByHour(destTmpl, nil)
if wBy == nil {
	return
}

wBy.WriteLine(ln1, t1)
wBy.WriteLine(ln2, t2)
wBy.WriteLine(ln3, t3)
allSts := wBy.Stats()

for _, sts := range allSts {
	fmt.Println(sts.LineCnt) // output: 1
	fmt.Println(sts.ByteCnt) // output: 32
}
fmt.Println(len(allSts)) // output: 3

// cleanup
os.Remove("./test")
os.Unsetenv("TZ")
Output:

1
32
1
32
1
32
3

func (*WriteByHour) WriteLine

func (w *WriteByHour) WriteLine(ln []byte, t time.Time) (err error)

WriteLine will write the line bytes to a destination file path produced from the destTmpl file template using the provided time t. The date is extracted by the caller beforehand (e.g. with a DateExtractor, as in the example below).

An error is returned if there is a problem writing the line or if there is a problem extracting the date.

Write order is not guaranteed.

Example
os.Setenv("TZ", "UTC")

destTmpl := "./test/{HH}.csv"
csvExtractor := CSVDateExtractor("", "", 0)
ln1 := []byte("2007-02-03T16:05:06Z,test field")
t1, _ := csvExtractor(ln1)

wBy := NewWriteByHour(destTmpl, nil)
if wBy == nil {
	return
}

err := wBy.WriteLine(ln1, t1)
if err != nil {
	return
}
wBy.Close()

// read from file
pth := "./test/16.csv"
f, _ := os.Open(pth)
b := make([]byte, 32)
f.Read(b)

fmt.Println(err)                 // output: <nil>
fmt.Print(string(b))             // output: 2007-02-03T16:05:06Z,test field
fmt.Println(wBy.lineCnt.LineCnt) // output: 1

// cleanup
os.Remove(pth)
os.Remove("./test")
os.Unsetenv("TZ")
Output:

<nil>
2007-02-03T16:05:06Z,test field
1

type Writer

// Writer is an io.WriteCloser that also provides file statistics,
// line-oriented writing and the ability to abort.
type Writer interface {
	// Write should behave as defined in io.Writer so that it
	// is compatible with standard library tooling such as
	// io.Copy. Additionally concurrent calls to Write should
	// be safe and not corrupt the output. Order may
	// not be guaranteed.
	//
	// Close should do any necessary standard closing but also
	// do final copying/syncing/flushing to local and remote
	// locations. Should also gather final stats for a call
	// to the Stats method.
	io.WriteCloser

	// WriteLine will write a line of bytes.
	// The user should not need to add the newline;
	// the implementation should do that for the user.
	//
	// Should be safe to call concurrently and concurrent
	// calling should not corrupt the output. Concurrent calling
	// does not guarantee order but one record will not partially
	// overwrite another.
	WriteLine([]byte) error

	// Stats returns the file stats. Safe to call any time.
	Stats() stat.Stats

	// Abort can be called anytime before or during a call
	// to Close. Will block until abort cleanup is complete.
	Abort() error
}

Writer is an io.WriteCloser that also provides file statistics along with a few additional methods.

func NewWriter

func NewWriter(pth string, opt *Options) (w Writer, err error)

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL