pipe

package
v0.0.0-...-c5fb9eb Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 27, 2021 | License: AGPL-3.0 | Imports: 8 | Imported by: 0

Documentation

Index

Constants

View Source
const TickerInterval = time.Second * 10

Variables

This section is empty.

Functions

This section is empty.

Types

type Event

type Event struct {
	Time   time.Time
	Link   string
	Tag    string
	Cookie string
	UA     string
	IP     string
}

func NewEvent

func NewEvent(link, tag, cookie, ua, ip string) Event

type Pipe

type Pipe struct {
	Streams []*Stream
	Task
	Queue
	// contains filtered or unexported fields
}

func New

func New(conf *config.Config, db *pgxpool.Pool) *Pipe

func (*Pipe) Close

func (p *Pipe) Close()

func (*Pipe) Push

func (p *Pipe) Push(item interface{})

func (*Pipe) Start

func (p *Pipe) Start() *Pipe

func (*Pipe) Wait

func (p *Pipe) Wait() *Pipe

type Queue

type Queue chan chan interface{}

type Stream

type Stream struct {
	Task
	Queue
	Quit chan struct{}

	Batch *pgx.Batch
	// contains filtered or unexported fields
}

func NewStream

func NewStream(queue Queue, db *pgxpool.Pool, batchSize int) *Stream

func (*Stream) Add

func (s *Stream) Add(item interface{})

func (*Stream) Ingest

func (s *Stream) Ingest()

func (*Stream) Start

func (s *Stream) Start()

func (*Stream) Stop

func (s *Stream) Stop()

type Task

type Task chan interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL