Documentation ¶
Index ¶
- func Glob(pth string, opt *Options) ([]stat.Stats, error)
- func List(pthDir string, opt *Options) ([]stat.Stats, error)
- func ReadLines(ctx context.Context, r Reader, f func(ln []byte) error) (err error, cncl bool)
- func Stat(path string, opt *Options) (stat.Stats, error)
- type DateExtractor
- type Options
- type Reader
- type Scanner
- type WriteByHour
- type Writer
Examples ¶
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Glob ¶
Glob will only match to files and will not match recursively. Only files directly in pthDir are candidates for matching.
Supports the same globing patterns as provided in *nix terminals.
Globing in directories is not supported. ie - s3://bucket/path/*/files.txt will not work but s3://bucket/path/to/*.txt will work.
func List ¶
List is a generic List function that will call the correct type of implementation based on the file schema, aka 's3://'. If there is no schema or if the schema is 'local://' then the local file List will be called.
pthDir is expected to be a dir.
Types ¶
type DateExtractor ¶
DateExtractor defines a type that will parse raw bytes and attempt to extract a time.Time value.
The underlying bytes should not be modified.
If time.Time.IsZero() == true then a non-nil error should always be returned. Likewise if error != nil time.Time.IsZero() should always be true.
func CSVDateExtractor ¶
func CSVDateExtractor(sep, format string, fieldIndex int) DateExtractor
CSVDateExtractor returns a DateExtractor for csv row date extraction.
If negative field index is set to 0. sep and timeFmt
Example ¶
os.Setenv("TZ", "UTC") csvExtract := CSVDateExtractor("", "", 1) t, err := csvExtract([]byte("test field,2007-02-03T16:05:06Z")) fmt.Println(t.IsZero()) // output: false fmt.Println(err) // output: <nil> // cleanup os.Unsetenv("TZ")
Output: false <nil>
func JSONDateExtractor ¶
func JSONDateExtractor(field, timeFmt string) DateExtractor
JSONDateExtractor returns a DateExtractor for json object date extraction.
Example ¶
os.Setenv("TZ", "UTC") jsonExtract := JSONDateExtractor("date-field", "") t, err := jsonExtract([]byte(`{"date-field":"2007-02-03T16:05:06Z","other-field":"other-value"}`)) fmt.Println(err) // output: <nil> fmt.Println(t.IsZero()) // output: false fmt.Println(t.Year()) // output: 2007 fmt.Println(t.Month()) // output: February fmt.Println(t.Day()) // output: 3 fmt.Println(t.Hour()) // output: 16 fmt.Println(t.Minute()) // output: 5 fmt.Println(t.Second()) // output: 6 // cleanup os.Unsetenv("TZ")
Output: <nil> false 2007 February 3 16 5 6
type Options ¶
type Options struct { AccessKey string `toml:"access_key"` SecretKey string `toml:"secret_key"` CompressionLevel string `toml:"file_compression" commented:"true" comment:"gzip compression level (speed|size|default)"` // UseFileBuf specifies to use a tmp file for the delayed writing. // Can optionally also specify the tmp directory and tmp name // prefix. UseFileBuf bool `toml:"use_file_buf" commented:"true" comment:"set as 'true' if files are too big to buffer in memory"` // FileBufDir optionally specifies the temp directory. If not specified then // the os default temp dir is used. FileBufDir string `` /* 179-byte string literal not displayed */ // FileBufPrefix optionally specifies the temp file prefix. // The full tmp file name is randomly generated and guaranteed // not to conflict with existing files. A prefix can help one find // the tmp file. // // In an effort to encourage fewer application configuration options // this value not made available to a toml config file and the default // is set to 'task-type_' by the application bootstrapper. // // If no prefix is provided then the temp file name is just a random // unique number. FileBufPrefix string `toml:"-"` // default is usually 'task-type_' FileBufKeepFailed bool `toml:"file_buf_keep_failed" commented:"true" comment:"keep the local buffer file on a upload failure"` }
Options presents general options across all stats readers and writers.
type Reader ¶
type Reader interface { // Read should behave as defined in the io.Read interface. // In this way we can take advantage of all standard library // methods that rely on Read such as copy. // // Close should do any necessary standard closing but also // do final syncing/flushing/cleanup esp when reading // from a remote source. io.ReadCloser // ReadLine should return a whole line of bytes not including // the newline delimiter. When the end of the file is reached, it // should return the last line of bytes (if any) and an instance // of io.EOF for the error. // // A call to ReadLine after Close has undefined behavior. ReadLine() ([]byte, error) // Stats returns an instance of Stats. Stats() stat.Stats }
Reader is an io.ReadCloser that also provides file statistics along with a few additional methods.
type WriteByHour ¶
type WriteByHour struct {
// contains filtered or unexported fields
}
WriteByHour writes to hourly files based on the extracted time.Time from the WriteLine bytes.
func NewWriteByHour ¶
func NewWriteByHour(destTmpl string, opt *Options) *WriteByHour
Example ¶
destTmpl := "./test/{HH}.csv" wBy := NewWriteByHour(destTmpl, nil) if wBy == nil { return } fmt.Println(wBy.opt != nil) // output: true fmt.Println(wBy.destTmpl) // output: ./test/{HH}.csv fmt.Println(wBy.writers != nil) // output: true
Output: true ./test/{HH}.csv true
func (*WriteByHour) Abort ¶
func (w *WriteByHour) Abort() error
Abort will abort on all open files. If there are multiple non-nil errors it will return one of them.
Example ¶
os.Setenv("TZ", "UTC") destTmpl := "./test/{HH}.csv" csvExtractor := CSVDateExtractor("", "", 0) ln1 := []byte("2007-02-03T16:05:06Z,test field") ln2 := []byte("2007-02-03T17:05:06Z,test field") ln3 := []byte("2007-02-03T18:05:06Z,test field") t1, _ := csvExtractor(ln1) t2, _ := csvExtractor(ln2) t3, _ := csvExtractor(ln3) wBy := NewWriteByHour(destTmpl, nil) if wBy == nil { return } wBy.WriteLine(ln1, t1) wBy.WriteLine(ln2, t2) wBy.WriteLine(ln3, t3) err := wBy.Abort() fmt.Println(err) // output: <nil> // cleanup os.Remove("./test") os.Unsetenv("TZ")
Output: <nil>
func (*WriteByHour) Close ¶
func (w *WriteByHour) Close() error
Close will close all open files. If there are multiple non-nil errors it will return one of them.
All writers are closed simultaneously so if an error is returned it's possible that one or more writes succeeded. Therefore the result space could be mixed with successes and failures.
To know which ones succeeded, check through all the file stats by calling Stats and look for non-empty Stats.Created values. For this reason it is recommended that records should be written to destination files in such a way that re-running sort from the same data source will replace an existing sorted file instead of creating a new one.
Make sure writing is complete before calling Close.
func (*WriteByHour) CloseWithContext ¶
func (w *WriteByHour) CloseWithContext(ctx context.Context) error
CloseWContext is just like close but accepts a context. ctx.Done is checked before starting each file close.
Returns an error with body "interrupted" if prematurely shutdown by ctx.
func (*WriteByHour) LineCnt ¶
func (w *WriteByHour) LineCnt() int64
LineCnt will provide the totals number of lines written across all files.
Example ¶
os.Setenv("TZ", "UTC") destTmpl := "./test/{HH}.csv" csvExtractor := CSVDateExtractor("", "", 0) ln := []byte("2007-02-03T16:05:06Z,test field") t, _ := csvExtractor(ln) wBy := NewWriteByHour(destTmpl, nil) if wBy == nil { return } wBy.WriteLine(ln, t) fmt.Println(wBy.LineCnt()) // output: 1 // cleanup os.Remove("./test") os.Unsetenv("TZ")
Output: 1
func (*WriteByHour) Stats ¶
func (w *WriteByHour) Stats() []stat.Stats
Stats provides stats for all files.
Example ¶
os.Setenv("TZ", "UTC") destTmpl := "./test/{HH}.csv" csvExtractor := CSVDateExtractor("", "", 0) ln1 := []byte("2007-02-03T16:05:06Z,test field") ln2 := []byte("2007-02-03T17:05:06Z,test field") ln3 := []byte("2007-02-03T18:05:06Z,test field") t1, _ := csvExtractor(ln1) t2, _ := csvExtractor(ln2) t3, _ := csvExtractor(ln3) wBy := NewWriteByHour(destTmpl, nil) if wBy == nil { return } wBy.WriteLine(ln1, t1) wBy.WriteLine(ln2, t2) wBy.WriteLine(ln3, t3) allSts := wBy.Stats() for _, sts := range allSts { fmt.Println(sts.LineCnt) // output: 1 fmt.Println(sts.ByteCnt) // output: 32 } fmt.Println(len(allSts)) // output: 3 // cleanup os.Remove("./test") os.Unsetenv("TZ")
Output: 1 32 1 32 1 32 3
func (*WriteByHour) WriteLine ¶
func (w *WriteByHour) WriteLine(ln []byte, t time.Time) (err error)
WriteLine will attempt to extract a date from the line bytes and write to a destination file path from the parsed destTmpl file template.
An error is returned if there is a problem writing the line or if there is a problem extracting the date.
Write order is not guaranteed.
Example ¶
os.Setenv("TZ", "UTC") destTmpl := "./test/{HH}.csv" csvExtractor := CSVDateExtractor("", "", 0) ln1 := []byte("2007-02-03T16:05:06Z,test field") t1, _ := csvExtractor(ln1) wBy := NewWriteByHour(destTmpl, nil) if wBy == nil { return } err := wBy.WriteLine(ln1, t1) if err != nil { return } wBy.Close() // read from file pth := "./test/16.csv" f, _ := os.Open(pth) b := make([]byte, 32) f.Read(b) fmt.Println(err) // output: <nil> fmt.Print(string(b)) // output: 2007-02-03T16:05:06Z,test field fmt.Println(wBy.lineCnt.LineCnt) // output: 1 // cleanup os.Remove(pth) os.Remove("./test") os.Unsetenv("TZ")
Output: <nil> 2007-02-03T16:05:06Z,test field 1
type Writer ¶
type Writer interface { // Write should behave as defined in io.Writer so that it // is compatible with standard library tooling such as // io.Copy. Additionally concurrent calls to Write should // be safe and not corrupt the output. Order may // not be guaranteed. // // Close should do any necessary standard closing but also // do final copying/syncing/flushing to local and remote // locations. Should also gather final stats for a call // to the Stats method. io.WriteCloser // WriteLine will write a line of bytes. // The user should not need to add the newline, // the implementation should do that for the user. // // Should be safe to call concurrently and concurrent // calling should not corrupt the output. Concurrent calling // does not guarantee order but one record will not partially // over-write another. WriteLine([]byte) error // Stats returns the file stats. Safe to call any time. Stats() stat.Stats // Abort can be called anytime before or during a call // to Close. Will block until abort cleanup is complete. Abort() error }
Writer is a io.WriteCloser that also provides file statistics along with a few additional methods.