hatchery

v0.0.2-0...-6da4d8c
Published: Mar 21, 2025 License: Apache-2.0 Imports: 12 Imported by: 5

README

hatchery

A code-based audit log collector for SaaS services

Motivation

Many SaaS services offer APIs for accessing data and logs, but managing those logs is challenging for several reasons:

  • Audit logs are often set to expire after a few months.
  • The built-in log search console provided by the service is not user-friendly and lacks centralized functionality for searching and analysis.

As a result, security administrators are required to gather logs from multiple services and store them in object storage for long-term retention and analysis. However, this process is complicated by the fact that each service has its own APIs and data formats, making it difficult to implement and maintain a tool to gather logs.

hatchery is a solution designed to address these challenges by collecting data and logs from SaaS services and storing them in object storage. This facilitates log retention and prepares the data for analysis by security administrators.

How it works

[Figure: design overview]

hatchery is not a standalone tool but an SDK. You build your own binary with the hatchery SDK and run it in your environment. You define the source and destination of the data you want to collect, and hatchery handles the data collection and storage for you.

In hatchery, the data collection and storage pipeline is called a "stream". A stream consists of a source and a destination. The source is the data provider (e.g., Slack, 1Password, Falcon Data Replicator), and the destination is the data storage (e.g., Google Cloud Storage, Amazon S3). You can define multiple streams and run them in parallel.

A stream also has an ID and tags. The ID is a unique identifier for the stream, and the tags are used to categorize streams. You can use these identifiers to run specific streams or filter them by tags.

Here is an example of how to define streams that follow the design above.

streams := []*hatchery.Stream{
	hatchery.NewStream(
		// Source: Slack Audit API
		slack.New(secret.NewString(os.Getenv("SLACK_TOKEN"))),
		// Destination: Google Cloud Storage
		gcs.New("mizutani-test"),

		// Identifiers
		hatchery.WithID("slack-to-gcs"),
		hatchery.WithTags("hourly"),
	),

	hatchery.NewStream(
		// Source: 1Password
		one_password.New(secret.NewString(os.Getenv("ONE_PASSWORD_TOKEN"))),
		// Destination: Amazon S3
		s3.New("ap-northeast-1", "mizutani-test"),

		// Identifiers
		hatchery.WithID("1pw-to-s3"),
		hatchery.WithTags("daily"),
	),
}

You can run hatchery with the streams you defined. The following example shows how to run hatchery as a command-line tool. It parses the command-line arguments and runs the streams you specified.

if err := hatchery.New(streams).CLI(os.Args); err != nil {
	panic(err)
}

$ go build -o myhatchery main.go
$ ./myhatchery -i slack-to-gcs # Run the stream with ID "slack-to-gcs"
$ ./myhatchery -t hourly       # Run the streams with tag "hourly"

Documentation

License

Apache License 2.0

Documentation

Index

Constants

This section is empty.

Variables

var (
	ErrStreamConflicted = errors.New("stream id conflicted")
	ErrNoStreamFound    = errors.New("no stream found")
	ErrInvalidStream    = errors.New("invalid stream")
)

Functions

This section is empty.

Types

type Destination

type Destination func(ctx context.Context, md metadata.MetaData) (io.WriteCloser, error)

Destination is a function type that opens a writer to data storage, a messaging queue, or a similar sink. It receives the metadata of the data to be written and returns an io.WriteCloser.

type Hatchery

type Hatchery struct {
	// contains filtered or unexported fields
}

Hatchery is the main manager of this tool.

func New

func New(streams []*Stream, opts ...Option) *Hatchery

New creates a new Hatchery instance.

func (*Hatchery) CLI

func (h *Hatchery) CLI(argv []string) error

func (*Hatchery) Run

func (h *Hatchery) Run(ctx context.Context, selectors ...Selector) error

type Option

type Option func(*Hatchery)

func WithLogger

func WithLogger(logger *slog.Logger) Option

WithLogger is an option to set a logger for the hatchery. The logger is used to log messages from the hatchery. This option takes priority over other settings (e.g., CLI options).

type Pipe

type Pipe struct {
	// contains filtered or unexported fields
}

Pipe is a struct that contains a destination. It is the middle layer between the source and the destination, and the source function receives the Pipe object as an argument. It has a Spout method that outputs the data from the source to the destination.

Example:

func someSource(ctx context.Context, p *hatchery.Pipe) error {
  var r io.ReadCloser
  //
  // Load data from somewhere into r
  //
  defer r.Close()
  return p.Spout(ctx, r, metadata.MetaData{})
}

func NewPipe

func NewPipe(dst Destination) *Pipe

NewPipe creates a new Pipe object with the destination. It is intended for testing.

func (*Pipe) Spout

func (p *Pipe) Spout(ctx context.Context, src io.Reader, md metadata.MetaData) error

Spout outputs the data from the source to the destination.

type Selector

type Selector func(*Stream) bool

func SelectAll

func SelectAll() Selector

func SelectByID

func SelectByID(ids ...string) Selector

func SelectByTag

func SelectByTag(tags ...string) Selector

type Source

type Source func(ctx context.Context, p *Pipe) error

Source is a function type that loads data from a source and writes it to a destination through the given Pipe. It should be called periodically to get data from the source. The calling interval depends on how the hatchery command is executed.

type Stream

type Stream struct {
	// contains filtered or unexported fields
}

Stream is a pipeline of data processing.

func NewStream

func NewStream(src Source, dst Destination, options ...StreamOption) *Stream

NewStream creates a new Stream object with source and destination. It can be customized by options.

func (*Stream) Run

func (x *Stream) Run(ctx context.Context) error

Run executes the stream: it invokes the Source and saves the data via the Destination.

func (*Stream) Validate

func (x *Stream) Validate() error

Validate checks whether the stream is valid.

type StreamOption

type StreamOption func(*Stream)

func WithID

func WithID(id string) StreamOption

WithID is an option to set the ID of the stream. If not set, a UUID is generated as the ID.

func WithTags

func WithTags(tags ...string) StreamOption

WithTags is an option to set tags on the stream.
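
WithID and WithTags follow Go's functional options pattern: each returns a function that mutates the stream during construction. The sketch below shows the pattern in isolation. It is not the library's code: the Stream fields are assumed, NewStream omits the source and destination arguments for brevity, and randomID is a hypothetical stand-in that returns random hex rather than a real UUID.

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
)

// Stream is a local stand-in with assumed fields.
type Stream struct {
	id   string
	tags []string
}

// StreamOption mirrors the documented option type.
type StreamOption func(*Stream)

func WithID(id string) StreamOption {
	return func(s *Stream) { s.id = id }
}

func WithTags(tags ...string) StreamOption {
	return func(s *Stream) { s.tags = append(s.tags, tags...) }
}

// randomID is a stand-in for UUID generation: 16 random bytes as hex.
func randomID() string {
	b := make([]byte, 16)
	if _, err := rand.Read(b); err != nil {
		panic(err)
	}
	return hex.EncodeToString(b)
}

// NewStream applies the options in order, then falls back to a
// generated ID when none was set.
func NewStream(options ...StreamOption) *Stream {
	s := &Stream{}
	for _, opt := range options {
		opt(s)
	}
	if s.id == "" {
		s.id = randomID()
	}
	return s
}

func main() {
	named := NewStream(WithID("slack-to-gcs"), WithTags("hourly"))
	fmt.Println(named.id, named.tags)

	anonymous := NewStream()
	fmt.Println(len(anonymous.id) > 0)
}
```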

type Streams

type Streams []*Stream

func (Streams) Validate

func (s Streams) Validate() error

Directories

Path Synopsis
destination
gcs
s3
example
cli Module
extend Module
lambda Module
readme Module
tag Module
pkg
source