nifi

package module
v0.3.1 Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 26, 2021 License: Apache-2.0 Imports: 25 Imported by: 0

README

nifi-api-client

A simple NiFi API client with authentication

Requirements

  • Go 1.16 and above
  • NiFi username and password or server and client certificates

Documentation

Index

Constants

View Source
// Machine-oriented names of the six component kinds. Each value is the
// lower-case, space-free counterpart of the matching *Title constant
// (for example ProcessGroupName "processgroup" vs. ProcessGroupTitle
// "Process Group").
//
// NOTE(review): presumably these are the strings used for Component.TypeName
// and/or for selecting component kinds by name — confirm against callers.
const (
	ProcessGroupName       = "processgroup"
	RemoteProcessGroupName = "remoteprocessgroup"
	ProcessorName          = "processor"
	ConnectionName         = "connection"
	InputPortName          = "inputport"
	OutputPortName         = "outputport"
)
View Source
// Human-readable titles of the six component kinds, written with
// capitalised words separated by spaces. Each one pairs with the *Name
// constant of the same prefix.
//
// NOTE(review): presumably these are what NiFiType.String returns or what is
// shown to users — the code that consumes them is not visible here; confirm.
const (
	ProcessGroupTitle       = "Process Group"
	RemoteProcessGroupTitle = "Remote Process Group"
	ProcessorTitle          = "Processor"
	ConnectionTitle         = "Connection"
	InputPortTitle          = "Input Port"
	OutputPortTitle         = "Output Port"
)
View Source
// State strings. Every constant's value is identical to its own name.
//
// NOTE(review): presumably these are the values carried by
// RunningStatus.State and accepted by Client.SetState — confirm against the
// implementation. The ALL_CAPS identifiers are part of the exported API and
// are therefore left as they are.
const (
	RUNNING = "RUNNING"
	UNKNOWN = "UNKNOWN"
	STOPPED = "STOPPED"
)

Variables

View Source
var (
	// ErrInvalidFormat is a sentinel error whose message reports an invalid
	// response format. Callers should match it by identity, e.g. with
	// errors.Is(err, ErrInvalidFormat), rather than by comparing message text.
	//
	// NOTE(review): presumably returned when a server response cannot be
	// interpreted in the expected shape — confirm against the call sites.
	ErrInvalidFormat = fmt.Errorf("invalid response format")
)

Functions

This section is empty.

Types

type BoxType

// BoxType is an int-backed enumeration with a String method.
//
// NOTE(review): the names suggest the kinds of box-drawing segment used when
// a Tree is printed (see Tree.Fprint) — confirm against the implementation.
type BoxType int

// BoxType values, numbered consecutively from 0 via iota:
// Regular = 0, Last = 1, AfterLast = 2, Between = 3.
const (
	Regular BoxType = iota
	Last
	AfterLast
	Between
)

func (BoxType) String

func (boxType BoxType) String() string

type ByTime

// ByTime is a slice of time.Time whose Len, Less and Swap methods match the
// method set of sort.Interface, so a value can be handed to sort.Sort.
//
// NOTE(review): the resulting order (presumably chronological) is decided by
// Less, whose body is not visible here — confirm.
type ByTime []time.Time

func (ByTime) Len

func (a ByTime) Len() int

func (ByTime) Less

func (a ByTime) Less(i, j int) bool

func (ByTime) Swap

func (a ByTime) Swap(i, j int)

type ByType

// ByType is a slice of *Component whose Len, Less and Swap methods match the
// method set of sort.Interface, so a value can be handed to sort.Sort.
//
// NOTE(review): the name suggests ordering by the components' type, but the
// comparison lives in Less, whose body is not visible here — confirm.
type ByType []*Component

func (ByType) Len

func (a ByType) Len() int

func (ByType) Less

func (a ByType) Less(i, j int) bool

func (ByType) Swap

func (a ByType) Swap(i, j int)

type ByVersion

// ByVersion is a slice of ProcessGroupVersion whose Len, Less and Swap
// methods match the method set of sort.Interface, so a value can be handed
// to sort.Sort.
//
// NOTE(review): the name suggests ordering by ProcessGroupVersion.Version,
// but the comparison lives in Less, whose body is not visible here — confirm.
type ByVersion []ProcessGroupVersion

func (ByVersion) Len

func (a ByVersion) Len() int

func (ByVersion) Less

func (a ByVersion) Less(i, j int) bool

func (ByVersion) Swap

func (a ByVersion) Swap(i, j int)

type Client

type Client struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(server *url.URL, certFile string, password string, ca string, insecureSkipVerify bool) (*Client, error)

func Login

func Login(server *url.URL, username string, password string, ca string, options ...interface{}) (*Client, error)

func (*Client) All

func (c *Client) All(ids []string, types NiFiType, recursive bool) ([]*Component, error)

func (*Client) AllWith

func (c *Client) AllWith(ids []string, types NiFiType, recursive bool, filter ComponentFilter) ([]*Component, error)

func (*Client) Call

func (c *Client) Call(method Method, url *url.URL, data []byte) (string, error)

func (*Client) CallAPI added in v0.0.2

func (c *Client) CallAPI(method Method, path string, data []byte, query ...string) (string, error)

func (*Client) Cluster

func (c *Client) Cluster() (string, error)

func (*Client) Delete

func (c *Client) Delete(path string, query ...string) (string, error)

func (*Client) Get

func (c *Client) Get(path string, query ...string) (string, error)

func (*Client) GetInfo

func (c *Client) GetInfo(id string) (interface{}, error)

func (*Client) GetVersionControlInfo

func (c *Client) GetVersionControlInfo(id string) (*VersionControlInfo, *Revision, error)

func (*Client) GetVersions

func (c *Client) GetVersions(registry string, bucket string, flow string) ([]ProcessGroupVersion, error)

func (*Client) Post

func (c *Client) Post(path string, data []byte, query ...string) (string, error)

func (*Client) Put

func (c *Client) Put(path string, data []byte, query ...string) (string, error)

func (*Client) Root

func (c *Client) Root() (*Component, error)

func (*Client) SetState

func (c *Client) SetState(id string, state string) (string, error)

func (*Client) SetVersion

func (c *Client) SetVersion(versionInfo *VersionControlInfo, revision *Revision, version int) (interface{}, error)

func (*Client) Tree

func (c *Client) Tree(ids []string, types NiFiType) (Tree, error)

type Component

// Component describes a single component.
//
// JSON encoding, as fixed by the struct tags:
//   - ID, Name and Path are encoded under "id", "name" and "path".
//   - TypeName is encoded under "type".
//   - Type and Attributes are tagged `json:"-"` and are therefore never
//     encoded to or decoded from JSON.
//
// NOTE(review): presumably Type is the NiFiType flag for the component,
// TypeName its textual form, and Attributes the raw object the component was
// built from (compare the map parameter of NewComponent) — confirm against
// NewComponent.
type Component struct {
	ID         string                 `json:"id"`
	Name       string                 `json:"name"`
	Path       string                 `json:"path"`
	Type       NiFiType               `json:"-"`
	TypeName   string                 `json:"type"`
	Attributes map[string]interface{} `json:"-"`
}

func NewComponent

func NewComponent(t string, p string, o map[string]interface{}) *Component

func (*Component) String

func (c *Component) String() string

type ComponentFilter

// ComponentFilter is a predicate over a *Component. Client.AllWith takes one
// as its filter argument.
//
// NOTE(review): presumably returning true keeps the component in the result —
// confirm against AllWith.
type ComponentFilter func(*Component) bool

type FlowFile

// FlowFile describes one flow file; ListingRequest.List returns a slice of
// them.
//
// Every field is JSON-encoded under the lower-camel-case key given in its
// tag: "uuid", "filename", "size", "lineageDuration", "queuedDuration",
// "linageStart", "queuedStart", "penalized" and "node".
//
// NOTE(review): LinageStart and its "linageStart" key look like a
// misspelling of "lineage" (compare LineageDuration / "lineageDuration").
// Both the field name and the key are part of the exported API and wire
// format, so they are left untouched here — confirm before renaming.
//
// NOTE(review): the units of Size, LineageDuration and QueuedDuration are not
// visible here (presumably bytes and milliseconds) — confirm.
type FlowFile struct {
	Uuid            string    `json:"uuid"`
	Filename        string    `json:"filename"`
	Size            int       `json:"size"`
	LineageDuration float64   `json:"lineageDuration"`
	QueuedDuration  float64   `json:"queuedDuration"`
	LinageStart     time.Time `json:"linageStart"`
	QueuedStart     time.Time `json:"queuedStart"`
	Penalized       bool      `json:"penalized"`
	Node            string    `json:"node"`
}

type HttpClient

type HttpClient struct {
	// contains filtered or unexported fields
}

func NewHttpCertClient added in v0.0.2

func NewHttpCertClient(file string, password string, ca string, insecureSkipVerify bool) (*HttpClient, error)

func NewHttpClient

func NewHttpClient(ca string, insecureSkipVerify bool) (*HttpClient, error)

func (*HttpClient) Do

func (c *HttpClient) Do(req *http.Request) (*http.Response, error)

func (*HttpClient) LoadPemCertificate added in v0.2.1

func (c *HttpClient) LoadPemCertificate(path string) ([]*x509.Certificate, error)

func (*HttpClient) ReadPKCS12

func (c *HttpClient) ReadPKCS12(file string, password string) (*tls.Certificate, *x509.CertPool, error)

type ListingRequest

type ListingRequest struct {
	// contains filtered or unexported fields
}

func NewListingRequest

func NewListingRequest(client Client, connection string) (*ListingRequest, error)

func (*ListingRequest) Close

func (r *ListingRequest) Close() error

func (*ListingRequest) List

func (r *ListingRequest) List() ([]FlowFile, error)

type Method

// Method is a string-backed HTTP method name, as taken by Client.Call and
// Client.CallAPI.
type Method string

// Method values. The first four carry the corresponding net/http method
// constants; Unknown is the empty string, i.e. the zero value of Method.
const (
	Get     = Method(http.MethodGet)
	Post    = Method(http.MethodPost)
	Delete  = Method(http.MethodDelete)
	Put     = Method(http.MethodPut)
	Unknown = Method("")
)

type NiFiType

// NiFiType is a byte-sized bit set of component kinds. Each kind occupies
// its own bit, so several kinds can be combined with bitwise OR.
type NiFiType byte

// Component-kind flags and ready-made masks.
//
// The seven kinds take bits 0 through 6 (values 1, 2, 4, ..., 64). AllTypes
// has every bit of the byte set (0xff), which also includes bit 7 that no
// named kind uses. AllExceptConnections is AllTypes with only the Connection
// bit cleared (0xf7).
const (
	ProcessGroup NiFiType = 1 << iota
	RemoteProcessGroup
	Processor
	Connection
	InputPort
	OutputPort
	UnknownType

	AllTypes             = NiFiType(0xff)
	AllExceptConnections = AllTypes &^ Connection
)

func (NiFiType) String

func (t NiFiType) String() string

type ProcessGroupVersion

// ProcessGroupVersion is one entry of the version list returned by
// Client.GetVersions. Its fields are JSON-encoded as "version", "timestamp"
// and "comments".
//
// NOTE(review): Version is a float64 here, whereas Revision.Version and
// VersionControlInfo.Version are ints — presumably a consequence of decoding
// untyped JSON numbers; confirm before relying on fractional values.
type ProcessGroupVersion struct {
	Version   float64   `json:"version"`
	Timestamp time.Time `json:"timestamp"`
	Comments  string    `json:"comments"`
}

type Revision

// Revision pairs a client identifier with an integer version; the fields are
// JSON-encoded as "clientId" and "version". It is returned by
// Client.GetVersionControlInfo, passed to Client.SetVersion and embedded in
// VersionInfo under "processGroupRevision".
//
// NOTE(review): presumably this mirrors the revision object NiFi uses for
// optimistic locking — confirm against the NiFi REST API documentation.
type Revision struct {
	ClientId string `json:"clientId"`
	Version  int    `json:"version"`
}

func NewRevision

func NewRevision(data interface{}) (*Revision, error)

type RunningStatus

// RunningStatus carries an id, a state string and a
// disconnected-node-acknowledged flag, JSON-encoded as "id", "state" and
// "disconnectedNodeAcknowledged".
//
// NOTE(review): presumably this is the request body built by Client.SetState,
// with State holding one of RUNNING / STOPPED — confirm against SetState.
type RunningStatus struct {
	Id                           string `json:"id"`
	State                        string `json:"state"`
	DisconnectedNodeAcknowledged bool   `json:"disconnectedNodeAcknowledged"`
}

type Status

// Status groups the fields User, Token, Cookies, Expire, Aud, Insecure,
// Server and CA. None of the fields has a struct tag. The type has Load and
// Save methods that take a file name, a NewRequest method that builds an
// *http.Request, and a GetCookies method that returns []*http.Cookie.
//
// NOTE(review): judging by NewStatus's parameters (server, token, cookies,
// ca, insecure) this is the persisted login/session state; User, Expire and
// Aud are presumably derived from the token — confirm against NewStatus.
// NOTE(review): the on-disk format used by Load/Save is not visible here.
type Status struct {
	User     string
	Token    string
	Cookies  map[string]string
	Expire   time.Time
	Aud      string
	Insecure bool
	Server   string
	CA       string
}

func NewStatus

func NewStatus(server *url.URL, token string, cookies []*http.Cookie, ca string, insecure bool) (*Status, error)

func (*Status) GetCookies

func (s *Status) GetCookies() []*http.Cookie

func (*Status) Load

func (s *Status) Load(file string) error

func (*Status) NewRequest

func (s *Status) NewRequest(method, url string, body io.Reader) (*http.Request, error)

func (*Status) Save

func (s *Status) Save(file string) error

type Tree

// Tree is a recursive map: each key is a *Component and each value is
// another Tree. Because the keys are pointers, two entries are the same key
// only when they are the same *Component pointer, not merely equal
// components.
//
// NOTE(review): presumably the value under a key is the subtree of that
// component's children — confirm against Add and Merge.
type Tree map[*Component]Tree

func (Tree) Add

func (t Tree) Add(c *Component)

func (Tree) Fprint

func (t Tree) Fprint(w io.Writer, root bool, padding string)

func (Tree) Merge

func (t Tree) Merge(tree Tree)

type UpdateRequest

type UpdateRequest struct {
	// contains filtered or unexported fields
}

func NewUpdateRequest

func NewUpdateRequest(client *Client, url *url.URL) *UpdateRequest

func (*UpdateRequest) Close

func (u *UpdateRequest) Close() error

func (*UpdateRequest) Wait

func (u *UpdateRequest) Wait() (map[string]interface{}, error)

type Version

// Version holds build and runtime identification strings. The fields are
// JSON-encoded as "version", "commit" (for GitCommit), "buildTime",
// "treeState", "os", "arch" and "goVersion".
//
// NOTE(review): presumably the first group is filled in at build time and
// the second from the Go runtime — where the values come from is not visible
// here; confirm.
type Version struct {
	Version   string `json:"version"`
	GitCommit string `json:"commit"`
	BuildTime string `json:"buildTime"`
	TreeState string `json:"treeState"`

	OS        string `json:"os"`
	Arch      string `json:"arch"`
	GoVersion string `json:"goVersion"`
}

// ClientVersion is a package-level *Version, declared here without an
// initializer.
//
// NOTE(review): presumably assigned elsewhere (e.g. during initialisation);
// check for nil before dereferencing unless that is confirmed.
var ClientVersion *Version

type VersionControlInfo

// VersionControlInfo identifies a versioned flow by group, registry, bucket
// and flow, together with an integer version and a state string.
//
// JSON keys: "groupId", "registryId", "bucketId", "flowId", "version" and
// "state". Only GroupId also carries a yaml tag ("groupId"); the other
// fields have no yaml tag.
//
// It is returned by Client.GetVersionControlInfo, passed to
// Client.SetVersion and embedded in VersionInfo under
// "versionControlInformation".
type VersionControlInfo struct {
	GroupId  string `json:"groupId" yaml:"groupId"`
	Registry string `json:"registryId"`
	Bucket   string `json:"bucketId"`
	Flow     string `json:"flowId"`
	Version  int    `json:"version"`
	State    string `json:"state"`
}

func NewVersionControlInfo

func NewVersionControlInfo(data interface{}) (*VersionControlInfo, error)

type VersionInfo

// VersionInfo bundles a *Revision and a *VersionControlInfo with a
// disconnected-node-acknowledged flag. The fields are JSON-encoded as
// "processGroupRevision", "versionControlInformation" and
// "disconnectedNodeAcknowledged". Both pointer fields may be nil.
//
// NOTE(review): presumably this is the request/response envelope used by
// Client.SetVersion — confirm against the implementation.
type VersionInfo struct {
	Revision                     *Revision           `json:"processGroupRevision"`
	Version                      *VersionControlInfo `json:"versionControlInformation"`
	DisconnectedNodeAcknowledged bool                `json:"disconnectedNodeAcknowledged"`
}

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL