Back to godoc.org

Package fwk

v0.27.0
Latest Go to latest
Published: May 20, 2020 | License: BSD-3-Clause | Module: go-hep.org/x/hep

Overview

Package fwk provides a set of tools to process High Energy Physics events data. fwk is a components-based framework, a-la Gaudi, with builtin support for concurrency.

A fwk application consists of a set of components (fwk.Task) which are:

- (optionally) configured
- started
- given the chance to process each event
- stopped

Helper components (fwk.Svc) can provide additional features (such as a whiteboard/event-store service, a data-flow service, ...) but do not typically take (directly) part of the event processing.

Typically, users will implement fwk.Tasks, ie:

type MyTask struct {
  fwk.TaskBase
}

// Configure is called once, after having read the properties
// from the data-cards.
func (tsk *MyTask) Configure(ctx fwk.Context) error { return nil }

// StartTask is called once (sequentially), just before
// the main event-loop processing.
func (tsk *MyTask) StartTask(ctx fwk.Context) error { return nil }

// Process is called for each event, (quite) possibly concurrently.
func (tsk *MyTask) Process(ctx fwk.Context)   error { return nil }

// StopTask is called once (sequentially), just after the
// main event-loop processing finished.
func (tsk *MyTask) StopTask(ctx fwk.Context)  error { return nil }

A fwk application processes data and leverages concurrency at two different levels:

- event-level concurrency: multiple events are processed concurrently
  at any given time, during the event loop;
- task-level concurrency: during the event loop, multiple tasks are
  executing concurrently.

To ensure the proper self-consistency of the global processed event, components need to express their data dependencies (input(s)) as well as the data they produce (output(s)) for downstream components. This is achieved by the concept of a fwk.Port. A fwk.Port consists of a pair { Name string; Type reflect.Type } where 'Name' is the unique location in the event-store, and 'Type' the expected 'go' type of the data at that event-store location.

fwk.Ports can be either INPUT ports or OUTPUT ports. Components declare INPUT ports and OUTPUT ports during the 'Configure' stage of a fwk application, like so:

t := reflect.TypeOf([]Electron{})
err = component.DeclInPort("Electrons", t)
err = component.DeclOutPort("ReScaledElectrons", t)

Then, during the event processing, one gets and puts data from/to the store like so:

func (tsk *MyTask) Process(ctx fwk.Context) error {
   var err error

   // retrieve the store associated with this event / region-of-interest
   store := ctx.Store()

   v, err := store.Get("Electrons")
   if err != nil {
      return err
   }
   eles := v.([]Electron) // type-cast to the correct (underlying) type

   // create output collection
   out := make([]Electron, 0, len(eles))

   // make sure the collection will be put in the store
   defer func() {
      err = store.Put("ReScaledElectrons", out)
   }()

   // ... do some massaging with 'eles' and 'out'

   return err
}

Index

Package Files

func Register

func Register(t reflect.Type, fct FactoryFunc)

Register registers a type t with the FactoryFunc fct.

fwk.ComponentMgr will then be able to create new values of that type t using the associated FactoryFunc fct. If a type t was already registered, the previous FactoryFunc value will be silently overridden with the new FactoryFunc value.

func Registry

func Registry() []string

Registry returns the list of all registered and known components.

type App

type App interface {
	Component
	ComponentMgr
	SvcMgr
	TaskMgr
	PropMgr
	PortMgr

	FSMStater

	Runner
	Scripter() Scripter

	Msg() MsgStream
}

App is the component orchestrating all the other components in a coherent application to process physics events.

func NewApp

func NewApp() App

NewApp creates a (default) fwk application with (default and) sensible options.

type Component

type Component interface {
	Type() string // Type of the component (ex: "go-hep.org/x/hep/fads.MomentumSmearing")
	Name() string // Name of the component (ex: "MyPropagator")
}

Component is the interface satisfied by all values in fwk.

A component can be asked for: its Type() (ex: "go-hep.org/x/hep/fads.MomentumSmearing") its Name() (ex: "MyPropagator")

type ComponentMgr

type ComponentMgr interface {
	Component(n string) Component
	HasComponent(n string) bool
	Components() []Component
	New(t, n string) (Component, error)
}

ComponentMgr manages components. ComponentMgr creates and provides access to all the components in a fwk App.

type Configurer

type Configurer interface {
	Component
	Configure(ctx Context) error
}

Configurer are components which can be configured via properties declared or created by the job-options.

type Context

type Context interface {
	ID() int64      // id of this context (e.g. entry number or some kind of event number)
	Slot() int      // slot number in the pool of event sequences
	Store() Store   // data store corresponding to the id+slot
	Msg() MsgStream // messaging for this context (id+slot)

	Svc(n string) (Svc, error) // retrieve an already existing Svc by name
}

Context is the interface to access context-local data.

type DeclPorter

type DeclPorter interface {
	DeclInPort(name string, t reflect.Type) error
	DeclOutPort(name string, t reflect.Type) error
}

DeclPorter is the interface to declare input/output ports for the data flow.

type Deleter

type Deleter interface {
	Delete() error
}

Deleter prepares values to be GC-reclaimed

type FSMStater

type FSMStater interface {
	FSMState() fsm.State
}

FSMStater is the interface used to query the current state of the fwk application

type FactoryFunc

type FactoryFunc func(t, n string, mgr App) (Component, error)

FactoryFunc creates a Component of type t and name n, managed by the fwk.App mgr.

type H1D

type H1D struct {
	ID   HID // unique id
	Hist *hbook.H1D
}

H1D wraps a hbook.H1D for safe concurrent access

func (H1D) Name

func (h H1D) Name() string

func (H1D) Value

func (h H1D) Value() interface{}

type H2D

type H2D struct {
	ID   HID // unique id
	Hist *hbook.H2D
}

H2D wraps a hbook.H2D for safe concurrent access

func (H2D) Name

func (h H2D) Name() string

func (H2D) Value

func (h H2D) Value() interface{}

type HID

type HID string

HID is a histogram, scatter or profile identifier

type Hist

type Hist interface {
	Name() string
	Value() interface{}
}

Hist is a histogram, scatter or profile object that can be saved or loaded by the HistSvc.

type HistSvc

type HistSvc interface {
	Svc

	// BookH1D books a 1D histogram.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<histogram-name>"
	BookH1D(name string, nbins int, xmin, xmax float64) (H1D, error)

	// BookH2D books a 2D histogram.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<histogram-name>"
	BookH2D(name string, nx int, xmin, xmax float64, ny int, ymin, ymax float64) (H2D, error)

	// BookP1D books a 1D profile.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<profile-name>"
	BookP1D(name string, nbins int, xmin, xmax float64) (P1D, error)

	// BookS2D books a 2D scatter.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<scatter-name>"
	BookS2D(name string) (S2D, error)

	// FillH1D fills the 1D-histogram id with data x and weight w.
	FillH1D(id HID, x, w float64)

	// FillH2D fills the 2D-histogram id with data (x,y) and weight w.
	FillH2D(id HID, x, y, w float64)

	// FillP1D fills the 1D-profile id with data (x,y) and weight w.
	FillP1D(id HID, x, y, w float64)

	// FillS2D fills the 2D-scatter id with data (x,y).
	FillS2D(id HID, x, y float64)
}

HistSvc is the interface providing access to histograms

type InputStream

type InputStream struct {
	TaskBase
	// contains filtered or unexported fields
}

InputStream implements a task reading data from an InputStreamer.

InputStream is concurrent-safe.

InputStream declares a property 'Ports', a []fwk.Port, which will be used to declare the output ports the streamer will publish, loading in data from the underlying InputStreamer.

InputStream declares a property 'Streamer', a fwk.InputStreamer, which will be used to actually read data from.

func (*InputStream) Configure

func (tsk *InputStream) Configure(ctx Context) error

Configure declares the output ports defined by the 'Ports' property.

func (*InputStream) Process

func (tsk *InputStream) Process(ctx Context) error

Process loads data from the underlying InputStreamer and puts it in the event store.

func (*InputStream) StartTask

func (tsk *InputStream) StartTask(ctx Context) error

StartTask starts the InputStreamer task

func (*InputStream) StopTask

func (tsk *InputStream) StopTask(ctx Context) error

StopTask stops the InputStreamer task

type InputStreamer

type InputStreamer interface {

	// Connect connects the InputStreamer to the underlying io.Reader,
	// and configure it to only read-in the data specified in ports.
	Connect(ports []Port) error

	// Read reads the data from the underlying io.Reader
	// and puts it in the store associated with the fwk.Context ctx
	Read(ctx Context) error

	// Disconnect disconnects the InputStreamer from the underlying io.Reader,
	// possibly computing some statistics data.
	// It does not (and can not) close the underlying io.Reader.
	Disconnect() error
}

InputStreamer reads data from the underlying io.Reader and puts it into fwk's Context

type Level

type Level int

Level regulates the verbosity level of a component.

const (
	LvlDebug   Level = -10 // LvlDebug defines the DBG verbosity level
	LvlInfo    Level = 0   // LvlInfo defines the INFO verbosity level
	LvlWarning Level = 10  // LvlWarning defines the WARN verbosity level
	LvlError   Level = 20  // LvlError defines the ERR verbosity level
)

Default verbosity levels.

func (Level) String

func (lvl Level) String() string

String prints the human-readable representation of a Level value.

type MsgStream

type MsgStream interface {
	Debugf(format string, a ...interface{})
	Infof(format string, a ...interface{})
	Warnf(format string, a ...interface{})
	Errorf(format string, a ...interface{})

	Msg(lvl Level, format string, a ...interface{})
}

MsgStream provides access to verbosity-defined formated messages, a la fmt.Printf.

func NewMsgStream

func NewMsgStream(name string, lvl Level, w WriteSyncer) MsgStream

NewMsgStream creates a new MsgStream value with name name and minimum verbosity level lvl. This MsgStream will print messages into w.

type OutputStream

type OutputStream struct {
	TaskBase
	// contains filtered or unexported fields
}

OutputStream implements a task writing data to an OutputStreamer.

OutputStream is concurrent-safe.

OutputStream declares a property 'Ports', a []fwk.Port, which will be used to declare the input ports the task will access to, writing out data via the underlying OutputStreamer.

OutputStream declares a property 'Streamer', a fwk.OutputStreamer, which will be used to actually write data to.

func (*OutputStream) Configure

func (tsk *OutputStream) Configure(ctx Context) error

Configure declares the input ports defined by the 'Ports' property.

func (*OutputStream) Process

func (tsk *OutputStream) Process(ctx Context) error

Process gets data from the store and writes it out via the underlying OutputStreamer

func (*OutputStream) StartTask

func (tsk *OutputStream) StartTask(ctx Context) error

StartTask starts the OutputStreamer task

func (*OutputStream) StopTask

func (tsk *OutputStream) StopTask(ctx Context) error

StopTask stops the OutputStreamer task

type OutputStreamer

type OutputStreamer interface {

	// Connect connects the OutputStreamer to the underlying io.Writer,
	// and configure it to only write-out the data specified in ports.
	Connect(ports []Port) error

	// Write gets the data from the store associated with the fwk.Context ctx
	// and writes it to the underlying io.Writer
	Write(ctx Context) error

	// Disconnect disconnects the OutputStreamer from the underlying io.Writer,
	// possibly computing some statistics data.
	// It does not (and can not) close the underlying io.Writer.
	Disconnect() error
}

OutputStreamer gets data from the Context and writes it to the underlying io.Writer

type P1D

type P1D struct {
	ID      HID // unique id
	Profile *hbook.P1D
}

P1D wraps a hbook.P1D for safe concurrent access

func (P1D) Name

func (p P1D) Name() string

func (P1D) Value

func (p P1D) Value() interface{}

type Port

type Port struct {
	Name string
	Type reflect.Type
}

Port holds the name and type of a data item in a store

type PortMgr

type PortMgr interface {
	DeclInPort(c Component, name string, t reflect.Type) error
	DeclOutPort(c Component, name string, t reflect.Type) error
}

PortMgr is the interface to manage input/output ports for the data flow

type PropMgr

type PropMgr interface {
	DeclProp(c Component, name string, ptr interface{}) error
	SetProp(c Component, name string, value interface{}) error
	GetProp(c Component, name string) (interface{}, error)
	HasProp(c Component, name string) bool
}

PropMgr manages properties attached to components.

type Property

type Property interface {
	DeclProp(name string, ptr interface{}) error
	SetProp(name string, value interface{}) error
	GetProp(name string) (interface{}, error)
}

Property is a pair key/value, associated to a component. Properties of a given component can be modified by a job-option or by other components.

type Runner

type Runner interface {
	Run() error
}

Runner runs a fwk App in a batch fashion:

- Configure
- Start
- Run event loop
- Stop
- Shutdown

type S2D

type S2D struct {
	ID      HID // unique id
	Scatter *hbook.S2D
}

S2D wraps a hbook.S2D for safe concurrent access

func (S2D) Name

func (s S2D) Name() string

func (S2D) Value

func (s S2D) Value() interface{}

type Scripter

type Scripter interface {
	Configure() error
	Start() error
	Run(evtmax int64) error
	Stop() error
	Shutdown() error
}

Scripter gives finer control to running a fwk App

type Store

type Store interface {
	Get(key string) (interface{}, error)
	Put(key string, value interface{}) error
	Has(key string) bool
}

Store provides access to a concurrent-safe map[string]interface{} store.

type StreamControl

type StreamControl struct {
	Ports []Port        // list of ports streamers will read-from or write-to
	Ctx   chan Context  // contexts to read-from or write-to
	Err   chan error    // errors encountered during reading-from or writing-to
	Quit  chan struct{} // closed to signify in/out-streamers should stop reading-from/writing-to
}

StreamControl provides concurrency-safe control to input and output streamers.

type Svc

type Svc interface {
	Component

	StartSvc(ctx Context) error
	StopSvc(ctx Context) error
}

Svc is a component providing services or helper features. Services are started before the main event loop processing and stopped just after.

type SvcBase

type SvcBase struct {
	// contains filtered or unexported fields
}

SvcBase provides a base implementation for fwk.Svc

func NewSvc

func NewSvc(typ, name string, mgr App) SvcBase

NewSvc creates a new SvcBase of type typ and name name, managed by the fwk.App mgr.

func (*SvcBase) DeclProp

func (svc *SvcBase) DeclProp(n string, ptr interface{}) error

DeclProp declares this service has a property named n, and takes a pointer to the associated value.

func (*SvcBase) FSMState

func (svc *SvcBase) FSMState() fsm.State

FSMState returns the current state of the FSM

func (*SvcBase) GetProp

func (svc *SvcBase) GetProp(name string) (interface{}, error)

GetProp returns the value of the property named n.

func (*SvcBase) Name

func (svc *SvcBase) Name() string

Name returns the name of the underlying service. e.g. "my-service"

func (*SvcBase) SetProp

func (svc *SvcBase) SetProp(name string, value interface{}) error

SetProp sets the property name n with the value v.

func (*SvcBase) Type

func (svc *SvcBase) Type() string

Type returns the fully qualified type of the underlying service. e.g. "go-hep.org/x/hep/fwk/testdata.svc1"

type SvcMgr

type SvcMgr interface {
	AddSvc(svc Svc) error
	DelSvc(svc Svc) error
	HasSvc(n string) bool
	GetSvc(n string) Svc
	Svcs() []Svc
}

SvcMgr manages services.

type Task

type Task interface {
	Component

	StartTask(ctx Context) error
	Process(ctx Context) error
	StopTask(ctx Context) error
}

Task is a component processing event-level data. Task.Process is called for every component and for every input event.

type TaskBase

type TaskBase struct {
	// contains filtered or unexported fields
}

TaskBase provides a base implementation for fwk.Task

func NewTask

func NewTask(typ, name string, mgr App) TaskBase

NewTask creates a new TaskBase of type typ and name name, managed by the fwk.App mgr.

func (*TaskBase) DeclInPort

func (tsk *TaskBase) DeclInPort(n string, t reflect.Type) error

DeclInPort declares this task has an input Port with name n and type t.

func (*TaskBase) DeclOutPort

func (tsk *TaskBase) DeclOutPort(n string, t reflect.Type) error

DeclOutPort declares this task has an output Port with name n and type t.

func (*TaskBase) DeclProp

func (tsk *TaskBase) DeclProp(n string, ptr interface{}) error

DeclProp declares this task has a property named n, and takes a pointer to the associated value.

func (*TaskBase) FSMState

func (tsk *TaskBase) FSMState() fsm.State

FSMState returns the current state of the FSM

func (*TaskBase) GetProp

func (tsk *TaskBase) GetProp(n string) (interface{}, error)

GetProp returns the value of the property named n.

func (*TaskBase) Name

func (tsk *TaskBase) Name() string

Name returns the name of the underlying task. e.g. "my-task"

func (*TaskBase) SetProp

func (tsk *TaskBase) SetProp(n string, v interface{}) error

SetProp sets the property name n with the value v.

func (*TaskBase) Type

func (tsk *TaskBase) Type() string

Type returns the fully qualified type of the underlying task. e.g. "go-hep.org/x/hep/fwk/testdata.task1"

type TaskMgr

type TaskMgr interface {
	AddTask(tsk Task) error
	DelTask(tsk Task) error
	HasTask(n string) bool
	GetTask(n string) Task
	Tasks() []Task
}

TaskMgr manages tasks.

type WriteSyncer

type WriteSyncer interface {
	io.Writer
	Sync() error
}

WriteSyncer is an io.Writer which can be sync'ed/flushed.

Documentation was rendered with GOOS=linux and GOARCH=amd64.

Jump to identifier

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to identifier