flowfile

package module
v0.0.0-...-6ecc1a3
Published: Mar 1, 2023 License: BSD-3-Clause Imports: 34 Imported by: 0

README

go-flowfile

flowfile is a Go module providing a set of tools to interact with NiFi FlowFiles at a low level. It has been tuned for the streaming context, where memory and disk are often limited.

This module was built to be both simple to use and to operate at a low level (at the byte level) so that FlowFiles can be handled at wire speed. Here is an example of how a basic filtering and forwarding method would be written:

func main() {
  // Create an endpoint to send FlowFiles to:
  txn, err := flowfile.NewHTTPTransaction("http://target:8080/contentListener", nil)
  if err != nil {
    log.Fatal(err)
  }

  // Setup a receiver method to deal with incoming flowfiles
  myFilter := flowfile.NewHTTPFileReceiver(func(f *flowfile.File, w http.ResponseWriter, r *http.Request) error {
    if f.Attrs.Get("project") == "ProjectA" {
      return txn.Send(f) // Forward only ProjectA related FlowFiles
    }
    return nil // Drop the rest
  })
  http.Handle("/contentListener", myFilter) // Add the listener to a path
  http.ListenAndServe(":8080", nil)         // Start accepting connections
}

A slightly more complex example builds a NiFi routing program that forwards 1 of every 10 FlowFiles while keeping each bundle together in one POST:

func main() {
  txn, err := flowfile.NewHTTPTransaction("http://decimated:8080/contentListener", tlsConfig)
  if err != nil {
    log.Fatal(err)
  }

  var counter int
  myDecimator := flowfile.NewHTTPReceiver(func(s *flowfile.Scanner, w http.ResponseWriter, r *http.Request) {
    pw := txn.NewHTTPPostWriter()
    defer pw.Close() // Ensure the POST is sent when the transaction finishes.

    for s.Scan() {
      f := s.File()
      if counter++; counter%10 == 1 { // Forward only 1 of every 10 Files
        if _, err := pw.Write(f); err != nil { // Something unexpected happened; err is local to this request
          w.WriteHeader(http.StatusInternalServerError) // Return an error
          pw.Terminate()
          return
        }
      }
    }
    if err := s.Err(); err != nil {
      log.Println("Error:", err)
      w.WriteHeader(http.StatusInternalServerError)
      return
    }
    w.WriteHeader(http.StatusOK) // Drop the rest by claiming all is ok
  })

  http.Handle("/contentDecimator", myDecimator) // Add the listener to a path
  http.ListenAndServe(":8080", nil)             // Start accepting connections
}

More examples can be found: https://pkg.go.dev/github.com/pschou/go-flowfile#pkg-examples

Early logic is key! When an incoming FlowFile is presented to the program, what is presented first are the attributes, which are often contained in the first packet of the stream. By the time a decision is made on what to do with the FlowFile, the incoming and destination streams can be connected together, avoiding all local caches and enabling "fast-forwarding" of the original packets.

The decision logic can be as complex or as simple as one desires. One can consume on one or more ports/listening paths and send to as many downstream servers as desired, concurrently.

For more documentation, go to https://pkg.go.dev/github.com/pschou/go-flowfile .

Documentation

Overview

Go-flowfile is a lightweight connection handling tool for sending and receiving FlowFiles via an HTTP/HTTPS exchange. When the HTTPS method is used, the client MUST also present a valid client certificate.

About FlowFiles

FlowFiles are at the heart of Apache NiFi and its flow-based design. A FlowFile is a data record, which consists of a pointer to its content (payload) and attributes to support the content, that is associated with one or more provenance events. The attributes are key/value pairs that act as the metadata for the FlowFile, such as the FlowFile filename. The content is the actual data or the payload of the file.

More info: https://nifi.apache.org/docs/nifi-docs/html/nifi-in-depth.html

What is a FlowFile

A FlowFile is a logical notion that correlates a piece of data with a set of Attributes about that data. Such attributes include a FlowFile's unique identifier, as well as its name, size, and any number of other flow-specific values. While the contents and attributes of a FlowFile can change, the FlowFile object is immutable. Modifications to a FlowFile are made possible by the ProcessSession.

The core attributes for FlowFiles are defined in the org.apache.nifi.flowfile.attributes.CoreAttributes enum. The most common attributes you'll see are filename, path and uuid. The string in quotes is the value of the attribute within the CoreAttributes enum.

- Filename ("filename"): The filename of the FlowFile. The filename should not contain any directory structure.

- UUID ("uuid"): A universally unique identifier (UUID) assigned to this FlowFile.

- Path ("path"): The FlowFile's path indicates the relative directory to which a FlowFile belongs and does not contain the filename.

- Absolute Path ("absolute.path"): The FlowFile's absolute path indicates the absolute directory to which a FlowFile belongs and does not contain the filename.

- Priority ("priority"): A numeric value indicating the FlowFile priority.

- MIME Type ("mime.type"): The MIME Type of this FlowFile.

- Discard Reason ("discard.reason"): Specifies the reason that a FlowFile is being discarded.

- Alternate Identifier ("alternate.identifier"): Indicates an identifier other than the FlowFile's UUID that is known to refer to this FlowFile.

More info: https://docs.cloudera.com/HDPDocuments/HDF3/HDF-3.0.3/bk_developer-guide/content/flowfile.html

Implementations

The `File` entity represents the flowfile. The FlowFile Attributes can be inspected on the wire, before the payload is consumed. This way one can craft switching logic and apply rules before the payload has been brought in.

One can compare this design, of acting on the header alone, to that of a network switch/router, in which the Layer 2/3 packet header is inspected before the entire packet is consumed to determine which path the bytes should take. Because the entire file is not needed before logic can be applied, pipes can be connected and memory use can be kept to a minimum.

Standard usage has shown that the overall memory footprint of several utilities doing complex routing of FlowFiles remains around 10-20MB. By comparison, a standard NiFi package will take a minimum of approximately 800MB.

Constants

const (
	FlowFile3Header = "NiFiFF3"
	FlowFileEOF     = "NiFiEOF"
)

Variables

var (
	ErrorNoFlowFileHeader      = errors.New("No NiFiFF3 header found")
	ErrorInvalidFlowFileHeader = errors.New("Invalid of incomplete FlowFile header")
)
var (
	ErrorChecksumMismatch = errors.New("Mismatching checksum")
	ErrorChecksumMissing  = errors.New("Missing checksum")
	ErrorChecksumNoInit   = errors.New("Checksum was not initialized")
)
var (
	UserAgent   = "NiFi FlowFile Client (github.com/pschou/go-flowfile)"
	AboutString = "NiFi FlowFile Server (github.com/pschou/go-flowfile)"
	Debug       = false
)
var ErrorInconsistantSize = errors.New("Inconsistant flowfile size")
var ErrorUnmarshallingAttributes = errors.New("Error unmarshalling attributes")

Functions

This section is empty.

Types

type Attribute

type Attribute struct {
	Name, Value string
}

A single attribute in a FlowFile header

type Attributes

type Attributes []Attribute

A set of attributes in a FlowFile header

func (Attributes) Clone

func (h Attributes) Clone() Attributes

Clone the attributes for ease of duplication

func (*Attributes) CustodyChainAddHTTP

func (h *Attributes) CustodyChainAddHTTP(r *http.Request)

Add attributes related to an http request, such as remote host, request URI, and TLS details.

func (*Attributes) CustodyChainAddListen

func (h *Attributes) CustodyChainAddListen(listen string)

func (*Attributes) CustodyChainShift

func (h *Attributes) CustodyChainShift()

Update the custodyChain field to increment all the values one and add an additional time and hostname.

func (*Attributes) GenerateUUID

func (h *Attributes) GenerateUUID() string

Set a new UUID value for a FlowFile

func (*Attributes) Get

func (h *Attributes) Get(name string) string

Returns the first attribute's value with specified name

Example

This shows how to get an individual attribute

var a flowfile.Attributes
a.Set("path", "./")

fmt.Println("attribute:", a.Get("path"))
Output:

attribute: ./

func (Attributes) MarshalBinary

func (h Attributes) MarshalBinary() ([]byte, error)

Marshal the FlowFile attributes into a binary slice.

Example

This shows how to encode the attributes into a header for sending

var a flowfile.Attributes
a.Set("path", "./")
a.Set("filename", "abcd-efgh")

b, _ := a.MarshalBinary()
fmt.Printf("attributes: %q\n", b)
Output:

attributes: "NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh"

func (Attributes) MarshalJSON

func (h Attributes) MarshalJSON() ([]byte, error)

Provides a MarshalJSON interface

func (Attributes) NewChecksumHash

func (h Attributes) NewChecksumHash() hash.Hash

Create a new checksum for verifying payload.

func (*Attributes) ReadFrom

func (h *Attributes) ReadFrom(in io.Reader) (err error)

Parse the FlowFile attributes from binary Reader.

Example

This shows how to decode the attributes from a header for parsing

var a flowfile.Attributes
wire := bytes.NewBuffer([]byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh"))

a.ReadFrom(wire)

fmt.Printf("attributes: %v\n", a)
Output:

attributes: {"path":"./","filename":"abcd-efgh"}

func (*Attributes) Set

func (h *Attributes) Set(name, val string) *Attributes

Sets the attribute with the given value, takes two inputs the first is the attribute name and the second is the attribute value. It returns the attributes for function stacking.

Example

This shows how to set an individual attribute

var a flowfile.Attributes
fmt.Printf("attributes: %v\n", a)

a.Set("path", "./")
fmt.Printf("attributes: %v\n", a)
Output:

attributes: {}
attributes: {"path":"./"}

func (*Attributes) Sort

func (h *Attributes) Sort()

Sort the attributes by name

func (Attributes) String

func (h Attributes) String() string

func (*Attributes) UnmarshalBinary

func (h *Attributes) UnmarshalBinary(in []byte) (err error)

Parse the FlowFile attributes from a binary slice.

Example

This shows how to decode the attributes from a header for parsing

var a flowfile.Attributes
buf := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh")

err := a.UnmarshalBinary(buf)
if err != nil {
	log.Fatal("Error unmarshalling attributes:", err)
}

fmt.Printf("attributes: %v\n", a)
Output:

attributes: {"path":"./","filename":"abcd-efgh"}

func (*Attributes) UnmarshalJSON

func (h *Attributes) UnmarshalJSON(in []byte) error

func (*Attributes) Unset

func (h *Attributes) Unset(name string) (ok bool)

Removes the attribute with the specified name.

Example

This shows how to unset an individual attribute

var a flowfile.Attributes
a.Set("path", "./")
a.Set("junk", "cars")
a.Set("filename", "abcd-efgh")

a.Unset("junk")
fmt.Printf("attributes: %v\n", a)
Output:

attributes: {"path":"./","filename":"abcd-efgh"}

func (*Attributes) WriteTo

func (h *Attributes) WriteTo(out io.Writer) (err error)

Write the FlowFile attributes in binary form to a writer.

Example

This shows how to encode the attributes into a header for sending

var a flowfile.Attributes
a.Set("path", "./")
a.Set("filename", "abcd-efgh")

buf := bytes.NewBuffer([]byte{})
a.WriteTo(buf)

fmt.Printf("raw: %q\n", buf)
Output:

raw: "NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh"

type File

type File struct {
	Attrs Attributes

	Size int64 // total size
	// contains filtered or unexported fields
}

A File is a handler for either an incoming or an outgoing datafeed of the contents of a file over a File connection. The intent is for one to either provide a Reader to a flowfile sender or read from the File directly, as it implements the io.Reader interface. Neither the reader nor the counts are exported, to avoid accidental over-reads of the underlying reader interface.

func New

func New(r io.Reader, size int64) *File

Create a new File struct from an io.Reader with size. One should add attributes before writing it to a stream.

Example

A calling method should do the due diligence of closing the inner reader after the flowfile is done being used. A good way to do this is something like:

dir, filename := "./", "myfile.dat"
fh, err := os.Open(filename)
if err != nil {
	log.Fatal(err)
}
defer fh.Close() // Ensure the file is closed when the function exits
fileInfo, _ := fh.Stat()
f := flowfile.New(fh, fileInfo.Size()) // Construct a flowfile with size
f.Attrs.Set("path", dir)               // Specify the path for the file
f.Attrs.Set("filename", filename)      // Give the filename
f.Attrs.GenerateUUID()                 // Set a unique identifier to this file
Output:

func NewFromDisk

func NewFromDisk(filename string) (*File, error)

NewFromDisk creates a new File struct from a file on disk. One should add attributes before writing it to a stream.

Note that the file is not opened to keep the opened file pointers on the system at a minimum. However, once a file is used, the file handle remains open until Close() is called. It is recommended that a checksum is done on the file before sending.

func Segment

func Segment(in *File, count int64) (out []*File, err error)

Splits up a flowfile into count number of segments. The intended purpose here is to enable larger files to be sent in smaller chunks so as to avoid having to replay sending a whole file in case a connection gets dropped.

func SegmentBySize

func SegmentBySize(in *File, segmentSize int64) (out []*File, err error)

Splits up a flowfile into a number of segments with segmentSize. The intended purpose here is to enable larger files to be sent in smaller chunks so as to avoid having to replay sending a whole file in case a connection gets dropped.

func (*File) AddChecksum

func (f *File) AddChecksum(cksum string) error

Add checksum to flowfile, requires a ReadAt interface in the flowfile context.

Note: A checksum cannot be added to a streamed File (io.Reader). The checksum lives in the header, and by the time the payload has been read the header has already been sent on the wire. The content must therefore be read once to build the checksum, which is then added to the header, and read again for sending. This is why the io.ReaderAt interface is important.

func (*File) AddChecksumFromVerify

func (l *File) AddChecksumFromVerify() error

AddChecksumFromVerify will take the checksum computed in the verify step and set the checksum attribute to match. This effectively makes a FlowFile pass what may otherwise be a failed verification. Useful for updating the checksum of an existing flowfile after it has been fully read in.

func (*File) BufferFile

func (f *File) BufferFile(buf *bytes.Buffer) (err error)

Read the entire payload into a buffer, so as to complete the checksum and enable the ability to reset the File for multiple reads.

Note: This could create memory bloat if the buffers are not able to be cleared out due to the runtime keeping an unused pointer or the buffer isn't returned to a Pool.

func (*File) ChecksumInit

func (l *File) ChecksumInit() error

Function called before a file is read for setting up the hashing function.

func (*File) Close

func (l *File) Close() (err error)

Close the flowfile construct. Generally the FlowFile is acted upon in a streaming context, moving a file from one place to another. So, in this understanding, the action of closing a flowfile is effectively removing the current payload from consideration and moving the reader pointer forward, making the next flowfile available for reading.

func (*File) EncodedReader

func (f *File) EncodedReader() (rdr io.Reader)

Return an io.Reader that yields the encoded flowfile.

func (File) FilePath

func (f File) FilePath() string

If the File was created with NewFromDisk, return the filename referenced.

func (File) HeaderSize

func (f File) HeaderSize() (n int)

Return the size of the header for computations of the total flow file size.

Total Size = Header + Data

func (*File) MarshalBinary

func (f *File) MarshalBinary(dat []byte, err error)

Marshal a FlowFile into a byte slice.

Note: This is not preferred as it can cause memory bloat.

func (*File) Read

func (l *File) Read(p []byte) (n int, err error)

Read will read the content from a FlowFile

func (*File) Reset

func (f *File) Reset() error

If the flowfile has a ReaderAt interface, one can reset the reader to the start for reading again

func (*File) Save

func (f *File) Save(baseDir string) (outputFile string, err error)

Save will save the flowfile to a given directory, reconstructing the original directory tree with files in it while doing checksums on each file as they are laid down. It is up to the calling function to determine whether to delete or keep the file after an unsuccessful send.

func (*File) UnmarshalBinary

func (f *File) UnmarshalBinary(dat []byte) (err error)

Unmarshal parses a FlowFile formatted byte slice into a File struct for processing.

Note: This is not preferred as it can cause memory bloat.

Example

Sends files out a writer, making sure the headers are sent before each file is sent.

dat := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile")

var f flowfile.File
err := f.UnmarshalBinary(dat)
if err != nil {
	fmt.Println("Error unmarshalling:", err)
}
fmt.Printf("Attrs: %v\n", f.Attrs)

buf := bytes.NewBuffer([]byte{})
buf.ReadFrom(&f)
fmt.Printf("content: %q\n", buf.String())
Output:

Attrs: {"path":"./","filename":"abcd-efgh"}
content: "this is a custom string for flowfile"

func (*File) Verify

func (l *File) Verify() error

Verify the file sent was complete and accurate

func (*File) VerifyDetails

func (l *File) VerifyDetails() string

VerifyDetails describes why a match was successful or failed

func (*File) VerifyHash

func (l *File) VerifyHash(h hash.Hash) error

Verify a given hash against the file sent, to ensure a complete and accurate payload.

func (*File) VerifyParent

func (l *File) VerifyParent(fp string) error

Verify the file sent was complete and accurate

type HTTPPostWriter

type HTTPPostWriter struct {
	Header        http.Header
	FlushInterval time.Duration
	Sent          int64

	Response *http.Response
	// contains filtered or unexported fields
}

Writer encapsulates the ability to write one or more flow files in one POST request. This must be closed upon completion of the last File sent.

One needs to first create an HTTPTransaction before one can create an HTTPPostWriter, so the process looks like:

ff1 := flowfile.New(strings.NewReader("test1"), 5)
ff2 := flowfile.New(strings.NewReader("test2"), 5)
ht, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", nil) // second argument is a *tls.Config; nil for plain HTTP
if err != nil {
  log.Fatal(err)
}

w := ht.NewHTTPPostWriter() // Create the POST to the NiFi endpoint
w.Write(ff1)
w.Write(ff2)
err = w.Close() // Finalize the POST
Example
// Build two small files
ff1 := flowfile.New(strings.NewReader("test1"), 5)
ff2 := flowfile.New(strings.NewReader("test2"), 5)

// Prepare an HTTP transaction
ht, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}

// Post the files to the endpoint
w := ht.NewHTTPPostWriter()
w.Write(ff1)
w.Write(ff2)
err = w.Close() // Finalize the POST
Output:

Example (SendWithCustomHeader)
// Create a new HTTPTransaction, used for sending batches of flowfiles
hs, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}

dat := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile")
var ff flowfile.File
err = ff.UnmarshalBinary(dat)

hp := hs.NewHTTPPostWriter()
defer hp.Close()

hp.Header.Set("X-Forwarded-For", "1.2.3.4:5678")
_, err = hp.Write(&ff)
Output:

func (*HTTPPostWriter) Close

func (hw *HTTPPostWriter) Close() (err error)

Close the HTTPPostWriter and flush the data to the stream

func (*HTTPPostWriter) Terminate

func (hw *HTTPPostWriter) Terminate()

Terminate the HTTPPostWriter

func (*HTTPPostWriter) Write

func (hw *HTTPPostWriter) Write(f *File) (n int64, err error)

Write a flow file to the remote server and return any errors back. One cannot determine if there has been a successful send until the HTTPPostWriter is closed. Then the Response.StatusCode will be set with the reply from the server.

type HTTPReceiver

type HTTPReceiver struct {
	Server           string
	MaxPartitionSize int64

	MaxConnections int

	Metrics *Metrics
	// contains filtered or unexported fields
}

Implements http.Handler and can be used with the GoLang built-in http module:

https://pkg.go.dev/net/http#Handler

func NewHTTPFileReceiver

func NewHTTPFileReceiver(handler func(*File, http.ResponseWriter, *http.Request) error) *HTTPReceiver

NewHTTPFileReceiver interfaces with the built-in HTTP Handler and parses out the individual FlowFiles from a stream and sends them to a FlowFile handler.

Example
ffReceiver := flowfile.NewHTTPFileReceiver(func(f *flowfile.File, w http.ResponseWriter, r *http.Request) error {
	log.Println("Got file", f.Attrs.Get("filename"))
	// do stuff with file
	return nil
})

// Add this receiver to the path
http.Handle("/contentListener", ffReceiver)

// Start accepting files
http.ListenAndServe(":8080", nil)
Output:

func NewHTTPReceiver

func NewHTTPReceiver(handler func(*Scanner, http.ResponseWriter, *http.Request)) *HTTPReceiver

NewHTTPReceiver interfaces with the built-in HTTP Handler, parses out the FlowFile stream, and provides a FlowFile scanner to a FlowFile handler.

Example
ffReceiver := flowfile.NewHTTPReceiver(func(fs *flowfile.Scanner, w http.ResponseWriter, r *http.Request) {
	// Loop over all the files in the post payload
	count := 0
	for fs.Scan() {
		count++
		f := fs.File()
		log.Println("Got file", f.Attrs.Get("filename"))
		// do stuff with file
	}

	if err := fs.Err(); err != nil {
		log.Println("Error:", err)
		w.WriteHeader(http.StatusInternalServerError)
		return
	}

	log.Println(count, "file(s) in POST payload")
	w.WriteHeader(http.StatusOK)
})

http.Handle("/contentListener", ffReceiver) // Add this receiver to the path
http.ListenAndServe(":8080", nil)           // Start accepting files
Output:

func (*HTTPReceiver) MetricsHandler

func (hr *HTTPReceiver) MetricsHandler() http.Handler

func (*HTTPReceiver) ServeHTTP

func (f *HTTPReceiver) ServeHTTP(w http.ResponseWriter, r *http.Request)

Handler for accepting flow files through an HTTP web server. The handler here is intended to be used in a Listen Handler so as to make building out all the web endpoints seamless.

ffReceiver := flowfile.NewHTTPReceiver(post) // post is a func(*flowfile.Scanner, http.ResponseWriter, *http.Request)
http.Handle("/contentListener", ffReceiver)
log.Fatal(http.ListenAndServe(":8080", nil))

type HTTPTransaction

type HTTPTransaction struct {
	Server        string
	TransactionID string

	RetryCount int // When using a ReadAt reader, attempt multiple retries
	RetryDelay time.Duration
	OnRetry    func(ff []*File, retry int, err error)

	// Non-standard NiFi entities supported by this library
	MaxPartitionSize int64  // Maximum partition size for partitioned file
	CheckSumType     string // What kind of CheckSum to use for sent files

	MetricsHandshakeLatency time.Duration
	// contains filtered or unexported fields
}

The HTTP Sender will establish a NiFi handshake and ensure that the remote endpoint is listening and compatible with the current flow file format.

func NewHTTPTransaction

func NewHTTPTransaction(url string, cfg *tls.Config) (*HTTPTransaction, error)

Create the HTTP sender and verify that the remote side is listening.

Example
// Create a new HTTPTransaction, used for sending batches of flowfiles
hs, err := flowfile.NewHTTPTransaction("http://localhost:8080/contentListener", tlsConfig)
if err != nil {
	log.Fatal(err)
}

dat := []byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile")
var ff flowfile.File
err = ff.UnmarshalBinary(dat)

err = hs.Send(&ff)
Output:

func NewHTTPTransactionNoHandshake

func NewHTTPTransactionNoHandshake(url string, cfg *tls.Config) *HTTPTransaction

Create the HTTP sender without verifying remote is listening

func NewHTTPTransactionWithTransport

func NewHTTPTransactionWithTransport(url string, cfg *http.Transport) (*HTTPTransaction, error)

Create the HTTP sender and verify that the remote side is listening.

func (*HTTPTransaction) Handshake

func (hs *HTTPTransaction) Handshake() error

Establishes or re-establishes a transaction id with NiFi to begin the process of transferring flowfiles. This is a blocking call so no new files will be sent until this is completed.

func (*HTTPTransaction) NewHTTPBufferedPostWriter

func (hs *HTTPTransaction) NewHTTPBufferedPostWriter() (httpWriter *HTTPPostWriter)

NewHTTPBufferedPostWriter creates a POST to a NiFi listening endpoint and allows multiple files to be written to the endpoint at one time. This reduces additional overhead (with fewer HTTP responses) and decreases latency. Additionally, the added buffering helps with constructing larger packets, thus further reducing TCP overhead.

However, HTTPPostWriter increases the chances of failures, as all the sent files will be marked as failed if the HTTP POST is not a success.

func (*HTTPTransaction) NewHTTPPostWriter

func (hs *HTTPTransaction) NewHTTPPostWriter() (httpWriter *HTTPPostWriter)

NewHTTPPostWriter creates a POST to a NiFi listening endpoint and allows multiple files to be written to the endpoint at one time. This reduces additional overhead (with fewer HTTP responses) and decreases latency (by instead putting pressure on TCP with smaller payload sizes).

However, HTTPPostWriter increases the chances of failures, as all the sent files will be marked as failed if the HTTP POST is not a success.

func (*HTTPTransaction) Send

func (hs *HTTPTransaction) Send(ff ...*File) (err error)

Send one or more flow files to the remote server and return any errors back. A nil return for error is a successful send.

A failed send will be retried if HTTPTransaction.RetryCount is set and the File uses a ReadAt reader. Up to (1+retries) attempts will be made, with HTTPTransaction.RetryDelay between attempts.

// With one or more files:
err = hs.Send(file1)
err = hs.Send(file1, file2) // or more
// A slice of files:
err = hs.Send(files...)

This method of sending will make one POST-per-file which is not recommended for small files. To increase throughput on smaller files one should consider using either NewHTTPPostWriter or NewHTTPBufferedPostWriter.

type Metrics

type Metrics struct {

	// Custom buckets can be defined by setting new buckets before ingesting data
	// Note the BucketValues is always N+1 sized, as the last is overflow
	MetricsFlowFileTransferredBuckets      []int64
	MetricsFlowFileTransferredBucketValues []int64
	MetricsFlowFileTransferredSum          int64
	MetricsFlowFileTransferredCount        int64

	//MetricsFlowFileReceivedSum   *int64
	//MetricsFlowFileReceivedCount *int64
	MetricsThreadsActive     int64
	MetricsThreadsTerminated int64
	MetricsThreadsQueued     int64
	// contains filtered or unexported fields
}

func NewMetrics

func NewMetrics() *Metrics

func (*Metrics) BucketCounter

func (f *Metrics) BucketCounter(size int64)

func (Metrics) ServeHTTP

func (m Metrics) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (Metrics) String

func (f Metrics) String(keyValuePairs ...string) string

type Scanner

type Scanner struct {
	// contains filtered or unexported fields
}

A wrapper around an io.Reader which parses out the flow files.

func NewScanner

func NewScanner(in io.Reader) *Scanner

Create a new FlowFile reader, wrapping io.Reader for reading consecutive FlowFiles from a stream.

Example

This example shows how to write a FlowFile and then read in a stream to make a flowfile

wire := bytes.NewBuffer([]byte("NiFiFF3\x00\x02\x00\x04path\x00\x02./\x00\bfilename\x00\tabcd-efgh\x00\x00\x00\x00\x00\x00\x00$this is a custom string for flowfile"))

s := flowfile.NewScanner(wire)
for s.Scan() { // Scan for another FlowFile in the stream
	f := s.File()

	fmt.Printf("attributes: %v\n", f.Attrs)

	buf := bytes.NewBuffer([]byte{})
	buf.ReadFrom(f)
	fmt.Printf("content: %q\n", buf.String())
}
fmt.Println("Check for errors:", s.Err())
Output:

attributes: {"path":"./","filename":"abcd-efgh"}
content: "this is a custom string for flowfile"
Check for errors: <nil>

func NewScannerChan

func NewScannerChan(ch chan *File) *Scanner

Create a new FlowFile reader, using a (chan *File) for reading consecutive FlowFiles from a channel.

func NewScannerSlice

func NewScannerSlice(ff ...*File) *Scanner

Create a new FlowFile reader, for reading from a slice of FlowFiles.

func (*Scanner) Close

func (r *Scanner) Close() (err error)

Close out any file remaining (if any)

func (*Scanner) Err

func (r *Scanner) Err() error

If a scan was not able to proceed due to an error, get the last error seen.

func (*Scanner) File

func (r *Scanner) File() (f *File)

File returns the most recent token generated by a call to Scan.

func (*Scanner) Scan

func (r *Scanner) Scan() (more bool)

Scan advances the Scanner to the next token, which will then be available through the File method. It returns false when the scan stops, either by reaching the end of the input or an error. After Scan returns false, the Err method will return any error that occurred during scanning, except that if it was io.EOF, Err will return nil.

type Writer

type Writer struct {
	// contains filtered or unexported fields
}

func NewWriter

func NewWriter(w io.Writer) *Writer

func (*Writer) Write

func (e *Writer) Write(f *File) (n int64, err error)

Encode a flowfile into an io.Writer
