Documentation ¶
Overview ¶
Package fwk provides a set of tools to process High Energy Physics event data. fwk is a component-based framework, à la Gaudi, with built-in support for concurrency.
A fwk application consists of a set of components (fwk.Task) which are:
- (optionally) configured
- started
- given the chance to process each event
- stopped
Helper components (fwk.Svc) can provide additional features (such as a whiteboard/event-store service, a data-flow service, ...) but typically do not take part directly in the event processing.
Typically, users will implement fwk.Tasks, i.e.:
type MyTask struct {
	fwk.TaskBase
}

// Configure is called once, after having read the properties
// from the data-cards.
func (tsk *MyTask) Configure(ctx fwk.Context) error { return nil }

// StartTask is called once (sequentially), just before
// the main event-loop processing.
func (tsk *MyTask) StartTask(ctx fwk.Context) error { return nil }

// Process is called for each event, (quite) possibly concurrently.
func (tsk *MyTask) Process(ctx fwk.Context) error { return nil }

// StopTask is called once (sequentially), just after the
// main event-loop processing finished.
func (tsk *MyTask) StopTask(ctx fwk.Context) error { return nil }
A fwk application processes data and leverages concurrency at two different levels:
- event-level concurrency: multiple events are processed concurrently at any given time, during the event loop;
- task-level concurrency: during the event loop, multiple tasks are executing concurrently.
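The two levels can be pictured with plain goroutines. The sketch below is not fwk's scheduler: `task`, `runEvents` and `demo` are hypothetical names, events are reduced to integers, and data dependencies between tasks are ignored. It only illustrates a bounded number of events in flight, with all tasks of one event running concurrently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// task is a hypothetical stand-in for the Process method of a fwk.Task.
type task func(evt int) error

// runEvents processes nevts events with at most nslots events in flight
// (event-level concurrency) and runs every task of an event in its own
// goroutine (task-level concurrency). nslots must be at least 1.
// It returns the number of task calls that failed.
func runEvents(nevts, nslots int, tasks []task) int {
	var (
		mu     sync.Mutex
		nfail  int
		events sync.WaitGroup
		slots  = make(chan struct{}, nslots)
	)
	for evt := 0; evt < nevts; evt++ {
		slots <- struct{}{} // wait for a free slot
		events.Add(1)
		go func(evt int) {
			defer events.Done()
			defer func() { <-slots }() // release the slot
			var wg sync.WaitGroup
			for _, tsk := range tasks {
				wg.Add(1)
				go func(tsk task) {
					defer wg.Done()
					if err := tsk(evt); err != nil {
						mu.Lock()
						nfail++
						mu.Unlock()
					}
				}(tsk)
			}
			wg.Wait() // the event is done once all its tasks are done
		}(evt)
	}
	events.Wait()
	return nfail
}

// demo runs two tasks over 10 events; the second task fails on odd events.
func demo() (ncalls, nfail int) {
	var mu sync.Mutex
	count := func(evt int) error {
		mu.Lock()
		ncalls++
		mu.Unlock()
		return nil
	}
	odd := func(evt int) error {
		mu.Lock()
		ncalls++
		mu.Unlock()
		if evt%2 == 1 {
			return errors.New("odd event")
		}
		return nil
	}
	nfail = runEvents(10, 4, []task{count, odd})
	return ncalls, nfail
}

func main() {
	fmt.Println(demo())
}
```

Because a task may run for several events at once, any state shared between calls (here the `ncalls` counter) has to be protected, which is why the sketch guards it with a mutex.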
To keep the processed event self-consistent, components need to declare their data dependencies (inputs) as well as the data they produce (outputs) for downstream components. This is expressed with a fwk.Port. A fwk.Port consists of a pair { Name string; Type reflect.Type } where 'Name' is the unique location in the event-store, and 'Type' is the expected Go type of the data at that event-store location.
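As an illustration of the name/type pair, here is a minimal sketch that re-declares a `Port` struct locally (so it runs without fwk) and checks a value against it. The `check` and `electronsPort` helpers and the `Electron` type are assumptions made for the example, not part of the fwk API.

```go
package main

import (
	"fmt"
	"reflect"
)

// Port mirrors the pair described above: a location in the event store
// and the Go type expected at that location.
type Port struct {
	Name string
	Type reflect.Type
}

// Electron is a hypothetical event-data type.
type Electron struct{ Pt float64 }

// electronsPort builds the port for a collection of electrons.
func electronsPort() Port {
	return Port{Name: "Electrons", Type: reflect.TypeOf([]Electron{})}
}

// check reports whether v has exactly the type announced by port p.
// (Hypothetical helper: how fwk itself validates types is not shown here.)
func check(p Port, v interface{}) error {
	if got := reflect.TypeOf(v); got != p.Type {
		return fmt.Errorf("port %q: got type %v, want %v", p.Name, got, p.Type)
	}
	return nil
}

func main() {
	p := electronsPort()
	fmt.Println(check(p, []Electron{{Pt: 10}})) // matching type
	fmt.Println(check(p, 42))                   // mismatching type
}
```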
fwk.Ports can be either INPUT ports or OUTPUT ports. Components declare INPUT ports and OUTPUT ports during the 'Configure' stage of a fwk application, like so:
t := reflect.TypeOf([]Electron{})
err = component.DeclInPort("Electrons", t)
err = component.DeclOutPort("ReScaledElectrons", t)
Then, during the event processing, one gets and puts data from/to the store like so:
func (tsk *MyTask) Process(ctx fwk.Context) (err error) {
	// retrieve the store associated with this event / region-of-interest
	store := ctx.Store()

	v, err := store.Get("Electrons")
	if err != nil {
		return err
	}
	eles := v.([]Electron) // type-assert to the correct (underlying) type

	// create output collection
	out := make([]Electron, 0, len(eles))

	// make sure the collection will be put in the store.
	// err is a named result so that the deferred Put can report its error.
	defer func() {
		if e := store.Put("ReScaledElectrons", out); e != nil && err == nil {
			err = e
		}
	}()

	// ... do some massaging with 'eles' and 'out'

	return err
}
Index ¶
- func Error(err error) error
- func Errorf(format string, args ...interface{}) error
- func Register(t reflect.Type, fct FactoryFunc)
- func Registry() []string
- type App
- type Component
- type ComponentMgr
- type Configurer
- type Context
- type DeclPorter
- type Deleter
- type FSMStater
- type FactoryFunc
- type H1D
- type H2D
- type HID
- type Hist
- type HistSvc
- type InputStream
- type InputStreamer
- type Level
- type MsgStream
- type OutputStream
- type OutputStreamer
- type P1D
- type Port
- type PortMgr
- type PropMgr
- type Property
- type Runner
- type S2D
- type Scripter
- type Store
- type StreamControl
- type Svc
- type SvcBase
- type SvcMgr
- type Task
- type TaskBase
- func (tsk *TaskBase) DeclInPort(n string, t reflect.Type) error
- func (tsk *TaskBase) DeclOutPort(n string, t reflect.Type) error
- func (tsk *TaskBase) DeclProp(n string, ptr interface{}) error
- func (tsk *TaskBase) FSMState() fsm.State
- func (tsk *TaskBase) GetProp(n string) (interface{}, error)
- func (tsk *TaskBase) Name() string
- func (tsk *TaskBase) SetProp(n string, v interface{}) error
- func (tsk *TaskBase) Type() string
- type TaskMgr
- type WriteSyncer
Constants ¶
This section is empty.
Variables ¶
This section is empty.
Functions ¶
func Errorf ¶
func Errorf(format string, args ...interface{}) error
Errorf formats according to a format specifier and returns the string as a value that satisfies error, together with the associated stack trace.
func Register ¶
func Register(t reflect.Type, fct FactoryFunc)
Register registers a type t with the FactoryFunc fct.
fwk.ComponentMgr will then be able to create new values of that type t using the associated FactoryFunc fct. If a type t was already registered, the previous FactoryFunc value will be silently overridden with the new FactoryFunc value.
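The override behaviour can be sketched with a small map-based registry. This is an assumption about the general technique, not fwk's implementation: `factory`, `register`, `create`, `registry` and the key format (package path plus type name) are all hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"sync"
)

// factory is a hypothetical stand-in for fwk.FactoryFunc.
type factory func(name string) interface{}

var (
	mu        sync.Mutex
	factories = map[string]factory{}
)

// key builds the registry key for a type (assumed format).
func key(t reflect.Type) string { return t.PkgPath() + "." + t.Name() }

// register associates t with fct, silently replacing any previous entry.
func register(t reflect.Type, fct factory) {
	mu.Lock()
	defer mu.Unlock()
	factories[key(t)] = fct
}

// create builds a new value of type t using its registered factory.
func create(t reflect.Type, name string) (interface{}, error) {
	mu.Lock()
	fct, ok := factories[key(t)]
	mu.Unlock()
	if !ok {
		return nil, fmt.Errorf("no factory registered for %v", t)
	}
	return fct(name), nil
}

// registry returns the sorted list of registered type names.
func registry() []string {
	mu.Lock()
	defer mu.Unlock()
	names := make([]string, 0, len(factories))
	for n := range factories {
		names = append(names, n)
	}
	sort.Strings(names)
	return names
}

// smearer is a hypothetical component type.
type smearer struct{ name, version string }

// demo registers the same type twice; the second factory wins.
func demo() (int, string) {
	t := reflect.TypeOf(smearer{})
	register(t, func(n string) interface{} { return smearer{name: n, version: "v1"} })
	register(t, func(n string) interface{} { return smearer{name: n, version: "v2"} })
	c, err := create(t, "MySmearer")
	if err != nil {
		return len(registry()), ""
	}
	return len(registry()), c.(smearer).version
}

func main() {
	fmt.Println(demo())
}
```

Since the replacement is silent, registering the same type from two places gives no warning; the factory registered last is the one used.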
Types ¶
type App ¶
type App interface {
	Component
	ComponentMgr
	SvcMgr
	TaskMgr
	PropMgr
	PortMgr
	FSMStater
	Runner

	Scripter() Scripter
	Msg() MsgStream
}
App is the component orchestrating all the other components in a coherent application to process physics events.
type Component ¶
type Component interface {
	Type() string // Type of the component (ex: "go-hep.org/x/hep/fads.MomentumSmearing")
	Name() string // Name of the component (ex: "MyPropagator")
}
Component is the interface satisfied by all values in fwk.
A component can be asked for:
- its Type() (ex: "go-hep.org/x/hep/fads.MomentumSmearing")
- its Name() (ex: "MyPropagator")
type ComponentMgr ¶
type ComponentMgr interface {
	Component(n string) Component
	HasComponent(n string) bool
	Components() []Component
	New(t, n string) (Component, error)
}
ComponentMgr manages components. ComponentMgr creates and provides access to all the components in a fwk App.
type Configurer ¶
Configurers are components which can be configured via properties declared or created by the job-options.
type Context ¶
type Context interface {
	ID() int64      // id of this context (e.g. entry number or some kind of event number)
	Slot() int      // slot number in the pool of event sequences
	Store() Store   // data store corresponding to the id+slot
	Msg() MsgStream // messaging for this context (id+slot)

	Svc(n string) (Svc, error) // retrieve an already existing Svc by name
}
Context is the interface to access context-local data.
type DeclPorter ¶
type DeclPorter interface {
	DeclInPort(name string, t reflect.Type) error
	DeclOutPort(name string, t reflect.Type) error
}
DeclPorter is the interface to declare input/output ports for the data flow.
type FactoryFunc ¶
FactoryFunc creates a Component of type t and name n, managed by the fwk.App mgr.
type Hist ¶
type Hist interface {
	Name() string
	Value() interface{}
}
Hist is a histogram, scatter or profile object that can be saved or loaded by the HistSvc.
type HistSvc ¶
type HistSvc interface {
	Svc

	// BookH1D books a 1D histogram.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<histogram-name>"
	BookH1D(name string, nbins int, xmin, xmax float64) (H1D, error)

	// BookH2D books a 2D histogram.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<histogram-name>"
	BookH2D(name string, nx int, xmin, xmax float64, ny int, ymin, ymax float64) (H2D, error)

	// BookP1D books a 1D profile.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<profile-name>"
	BookP1D(name string, nbins int, xmin, xmax float64) (P1D, error)

	// BookS2D books a 2D scatter.
	// name should be of the form: "/fwk/streams/<stream-name>/<path>/<scatter-name>"
	BookS2D(name string) (S2D, error)

	// FillH1D fills the 1D-histogram id with data x and weight w.
	FillH1D(id HID, x, w float64)

	// FillH2D fills the 2D-histogram id with data (x,y) and weight w.
	FillH2D(id HID, x, y, w float64)

	// FillP1D fills the 1D-profile id with data (x,y) and weight w.
	FillP1D(id HID, x, y, w float64)

	// FillS2D fills the 2D-scatter id with data (x,y).
	FillS2D(id HID, x, y float64)
}
HistSvc is the interface providing access to histograms
type InputStream ¶
type InputStream struct {
	TaskBase
	// contains filtered or unexported fields
}
InputStream implements a task reading data from an InputStreamer.
InputStream is concurrent-safe.
InputStream declares a property 'Ports', a []fwk.Port, which will be used to declare the output ports the streamer will publish, loading in data from the underlying InputStreamer.
InputStream declares a property 'Streamer', a fwk.InputStreamer, from which the data is actually read.
func (*InputStream) Configure ¶
func (tsk *InputStream) Configure(ctx Context) error
Configure declares the output ports defined by the 'Ports' property.
func (*InputStream) Process ¶
func (tsk *InputStream) Process(ctx Context) error
Process loads data from the underlying InputStreamer and puts it in the event store.
func (*InputStream) StartTask ¶
func (tsk *InputStream) StartTask(ctx Context) error
StartTask starts the InputStreamer task
func (*InputStream) StopTask ¶
func (tsk *InputStream) StopTask(ctx Context) error
StopTask stops the InputStreamer task
type InputStreamer ¶
type InputStreamer interface {
	// Connect connects the InputStreamer to the underlying io.Reader,
	// and configure it to only read-in the data specified in ports.
	Connect(ports []Port) error

	// Read reads the data from the underlying io.Reader
	// and puts it in the store associated with the fwk.Context ctx
	Read(ctx Context) error

	// Disconnect disconnects the InputStreamer from the underlying io.Reader,
	// possibly computing some statistics data.
	// It does not (and can not) close the underlying io.Reader.
	Disconnect() error
}
InputStreamer reads data from the underlying io.Reader and puts it into fwk's Context
type Level ¶
type Level int
Level regulates the verbosity level of a component.
type MsgStream ¶
type MsgStream interface {
	Debugf(format string, a ...interface{}) (int, error)
	Infof(format string, a ...interface{}) (int, error)
	Warnf(format string, a ...interface{}) (int, error)
	Errorf(format string, a ...interface{}) (int, error)

	Msg(lvl Level, format string, a ...interface{}) (int, error)
}
MsgStream provides access to verbosity-defined formatted messages, à la fmt.Printf.
func NewMsgStream ¶
func NewMsgStream(name string, lvl Level, w WriteSyncer) MsgStream
NewMsgStream creates a new MsgStream value with name name and minimum verbosity level lvl. This MsgStream will print messages into w.
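A minimum verbosity level amounts to dropping any message whose level is below the threshold. The sketch below shows that idea with locally defined types. The `level` constants, their ordering and the `name: message` output format are assumptions, and a plain io.Writer stands in for fwk's WriteSyncer.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// level is a hypothetical stand-in for fwk.Level; higher means more severe.
type level int

const (
	lvlDebug level = iota
	lvlInfo
	lvlWarn
	lvlError
)

// msgStream prints messages at or above its minimum level into w.
type msgStream struct {
	name string
	min  level
	w    io.Writer
}

// msg formats and writes the message if lvl passes the threshold.
// It returns the number of bytes written, like fmt.Fprintf.
func (m msgStream) msg(lvl level, format string, a ...interface{}) (int, error) {
	if lvl < m.min {
		return 0, nil // below the minimum verbosity: dropped
	}
	return fmt.Fprintf(m.w, "%s: %s", m.name, fmt.Sprintf(format, a...))
}

// demo sends one debug and one warning message to a stream whose
// minimum level is lvlInfo and returns what was actually written.
func demo() string {
	var buf bytes.Buffer
	m := msgStream{name: "app", min: lvlInfo, w: &buf}
	m.msg(lvlDebug, "dropped %d\n", 1)
	m.msg(lvlWarn, "kept %d\n", 2)
	return buf.String()
}

func main() {
	fmt.Print(demo())
}
```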
type OutputStream ¶
type OutputStream struct {
	TaskBase
	// contains filtered or unexported fields
}
OutputStream implements a task writing data to an OutputStreamer.
OutputStream is concurrent-safe.
OutputStream declares a property 'Ports', a []fwk.Port, which will be used to declare the input ports the task will access, writing out data via the underlying OutputStreamer.
OutputStream declares a property 'Streamer', a fwk.OutputStreamer, to which the data is actually written.
func (*OutputStream) Configure ¶
func (tsk *OutputStream) Configure(ctx Context) error
Configure declares the input ports defined by the 'Ports' property.
func (*OutputStream) Process ¶
func (tsk *OutputStream) Process(ctx Context) error
Process gets data from the store and writes it out via the underlying OutputStreamer
func (*OutputStream) StartTask ¶
func (tsk *OutputStream) StartTask(ctx Context) error
StartTask starts the OutputStreamer task
func (*OutputStream) StopTask ¶
func (tsk *OutputStream) StopTask(ctx Context) error
StopTask stops the OutputStreamer task
type OutputStreamer ¶
type OutputStreamer interface {
	// Connect connects the OutputStreamer to the underlying io.Writer,
	// and configure it to only write-out the data specified in ports.
	Connect(ports []Port) error

	// Write gets the data from the store associated with the fwk.Context ctx
	// and writes it to the underlying io.Writer
	Write(ctx Context) error

	// Disconnect disconnects the OutputStreamer from the underlying io.Writer,
	// possibly computing some statistics data.
	// It does not (and can not) close the underlying io.Writer.
	Disconnect() error
}
OutputStreamer gets data from the Context and writes it to the underlying io.Writer
type PortMgr ¶
type PortMgr interface {
	DeclInPort(c Component, name string, t reflect.Type) error
	DeclOutPort(c Component, name string, t reflect.Type) error
}
PortMgr is the interface to manage input/output ports for the data flow
type PropMgr ¶
type PropMgr interface {
	DeclProp(c Component, name string, ptr interface{}) error
	SetProp(c Component, name string, value interface{}) error
	GetProp(c Component, name string) (interface{}, error)
	HasProp(c Component, name string) bool
}
PropMgr manages properties attached to components.
type Property ¶
type Property interface {
	DeclProp(name string, ptr interface{}) error
	SetProp(name string, value interface{}) error
	GetProp(name string) (interface{}, error)
}
Property is a key/value pair associated with a component. Properties of a given component can be modified by a job-option or by other components.
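Declaring a property with a pointer lets a later SetProp write straight into the component's own field. One way to do this is with reflection, sketched below. The `props` type and its methods are hypothetical and only mimic the DeclProp/SetProp shape; they say nothing about how fwk stores properties.

```go
package main

import (
	"fmt"
	"reflect"
)

// props maps a property name to a pointer to the associated value.
type props map[string]interface{}

// declProp records ptr as the storage of property name.
func (p props) declProp(name string, ptr interface{}) error {
	rv := reflect.ValueOf(ptr)
	if rv.Kind() != reflect.Ptr || rv.IsNil() {
		return fmt.Errorf("property %q: expected a non-nil pointer, got %T", name, ptr)
	}
	p[name] = ptr
	return nil
}

// setProp assigns value through the pointer given to declProp.
func (p props) setProp(name string, value interface{}) error {
	ptr, ok := p[name]
	if !ok {
		return fmt.Errorf("no such property %q", name)
	}
	dst := reflect.ValueOf(ptr).Elem()
	src := reflect.ValueOf(value)
	if !src.IsValid() || !src.Type().AssignableTo(dst.Type()) {
		return fmt.Errorf("property %q: cannot assign %T to %v", name, value, dst.Type())
	}
	dst.Set(src)
	return nil
}

// smearing is a hypothetical task with one configurable field.
type smearing struct{ Sigma float64 }

// demo declares Sigma, sets it to 2.5, then tries an invalid assignment.
func demo() (float64, error) {
	tsk := smearing{Sigma: 1}
	p := props{}
	if err := p.declProp("Sigma", &tsk.Sigma); err != nil {
		return 0, err
	}
	if err := p.setProp("Sigma", 2.5); err != nil {
		return 0, err
	}
	bad := p.setProp("Sigma", "oops") // wrong type: rejected, Sigma unchanged
	return tsk.Sigma, bad
}

func main() {
	fmt.Println(demo())
}
```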
type Runner ¶
type Runner interface {
Run() error
}
Runner runs a fwk App in a batch fashion:
- Configure
- Start
- Run event loop
- Stop
- Shutdown
type Scripter ¶
type Scripter interface {
	Configure() error
	Start() error
	Run(evtmax int64) error
	Stop() error
	Shutdown() error
}
Scripter gives finer control to running a fwk App
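The batch sequence listed for Runner can be expressed as a loop over the Scripter stages. The sketch below re-declares the interface locally and uses a hypothetical `recorder` implementation. Stopping at the first error is an assumption of the sketch, not documented fwk behaviour.

```go
package main

import (
	"fmt"
	"strings"
)

// scripter mirrors the Scripter method set shown above.
type scripter interface {
	Configure() error
	Start() error
	Run(evtmax int64) error
	Stop() error
	Shutdown() error
}

// recorder is a hypothetical scripter that only records the stages it sees.
type recorder struct{ stages []string }

func (r *recorder) add(s string) error { r.stages = append(r.stages, s); return nil }

func (r *recorder) Configure() error       { return r.add("configure") }
func (r *recorder) Start() error           { return r.add("start") }
func (r *recorder) Run(evtmax int64) error { return r.add(fmt.Sprintf("run(%d)", evtmax)) }
func (r *recorder) Stop() error            { return r.add("stop") }
func (r *recorder) Shutdown() error        { return r.add("shutdown") }

// runBatch drives the stages in the order listed for Runner and
// returns the first error encountered (assumed policy).
func runBatch(s scripter, evtmax int64) error {
	steps := []func() error{
		s.Configure,
		s.Start,
		func() error { return s.Run(evtmax) },
		s.Stop,
		s.Shutdown,
	}
	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}
	return nil
}

// demo runs a 10-event batch and returns the recorded stage sequence.
func demo() string {
	r := &recorder{}
	if err := runBatch(r, 10); err != nil {
		return "error: " + err.Error()
	}
	return strings.Join(r.stages, ",")
}

func main() {
	fmt.Println(demo())
}
```

Calling the stages by hand instead of through a single batch call is what allows, for instance, inspecting or reconfiguring components between Configure and Start.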
type Store ¶
type Store interface {
	Get(key string) (interface{}, error)
	Put(key string, value interface{}) error
	Has(key string) bool
}
Store provides access to a concurrent-safe map[string]interface{} store.
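For intuition, a concurrent-safe map with this method set can be built with a read/write mutex. This is a sketch only: the `store` type is hypothetical, and the actual fwk store may synchronize differently (for example to make a Get wait for the matching Put).

```go
package main

import (
	"fmt"
	"sync"
)

// store is a mutex-protected map[string]interface{}.
type store struct {
	mu   sync.RWMutex
	data map[string]interface{}
}

func newStore() *store { return &store{data: make(map[string]interface{})} }

// Get returns the value stored under key, or an error if there is none.
func (s *store) Get(key string) (interface{}, error) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.data[key]
	if !ok {
		return nil, fmt.Errorf("store: no such key %q", key)
	}
	return v, nil
}

// Put stores value under key.
func (s *store) Put(key string, value interface{}) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.data[key] = value
	return nil
}

// Has reports whether key is present.
func (s *store) Has(key string) bool {
	s.mu.RLock()
	defer s.mu.RUnlock()
	_, ok := s.data[key]
	return ok
}

// demo writes 8 keys from 8 goroutines, then reads one back.
func demo() (int, bool) {
	s := newStore()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Put(fmt.Sprintf("key-%d", i), i)
		}(i)
	}
	wg.Wait()
	v, err := s.Get("key-3")
	if err != nil {
		return -1, false
	}
	return v.(int), s.Has("missing")
}

func main() {
	fmt.Println(demo())
}
```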
type StreamControl ¶
type StreamControl struct {
	Ports []Port        // list of ports streamers will read-from or write-to
	Ctx   chan Context  // contexts to read-from or write-to
	Err   chan error    // errors encountered during reading-from or writing-to
	Quit  chan struct{} // closed to signify in/out-streamers should stop reading-from/writing-to
}
StreamControl provides concurrency-safe control to input and output streamers.
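The three channels suggest a serving loop that handles one context at a time, reports the outcome on Err and exits when Quit is closed. The sketch below shows that pattern with a reduced, hypothetical `streamControl` (plain event ids instead of fwk.Context values, no Ports). How fwk's own streamers use these channels is not shown in this document, so treat the protocol as an assumption.

```go
package main

import "fmt"

// streamControl is a reduced, hypothetical version of StreamControl.
type streamControl struct {
	Ctx  chan int      // event ids stand in for fwk.Context values
	Err  chan error    // one result per handled context
	Quit chan struct{} // closed to ask the loop to stop
}

// serve handles contexts until Quit is closed, sending the result of
// read for each of them on ctrl.Err.
func serve(ctrl streamControl, read func(id int) error) {
	for {
		select {
		case id := <-ctrl.Ctx:
			ctrl.Err <- read(id)
		case <-ctrl.Quit:
			return
		}
	}
}

// demo feeds 4 events to the loop; reading odd events fails.
func demo() (nok, nfail int) {
	ctrl := streamControl{
		Ctx:  make(chan int),
		Err:  make(chan error),
		Quit: make(chan struct{}),
	}
	done := make(chan struct{})
	go func() {
		defer close(done)
		serve(ctrl, func(id int) error {
			if id%2 == 1 {
				return fmt.Errorf("event %d: read failed", id)
			}
			return nil
		})
	}()
	for id := 0; id < 4; id++ {
		ctrl.Ctx <- id
		if err := <-ctrl.Err; err != nil {
			nfail++
		} else {
			nok++
		}
	}
	close(ctrl.Quit) // signal the loop to stop
	<-done           // wait until it has returned
	return nok, nfail
}

func main() {
	fmt.Println(demo())
}
```

Closing Quit, rather than sending on it, lets any number of goroutines observe the stop request.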
type Svc ¶
Svc is a component providing services or helper features. Services are started before the main event loop processing and stopped just after.
type SvcBase ¶
type SvcBase struct {
// contains filtered or unexported fields
}
SvcBase provides a base implementation for fwk.Svc
func (*SvcBase) DeclProp ¶
DeclProp declares this service has a property named n, and takes a pointer to the associated value.
type SvcMgr ¶
type SvcMgr interface {
	AddSvc(svc Svc) error
	DelSvc(svc Svc) error
	HasSvc(n string) bool
	GetSvc(n string) Svc
	Svcs() []Svc
}
SvcMgr manages services.
type Task ¶
type Task interface {
	Component

	StartTask(ctx Context) error
	Process(ctx Context) error
	StopTask(ctx Context) error
}
Task is a component processing event-level data. Task.Process is called for every component and for every input event.
type TaskBase ¶
type TaskBase struct {
// contains filtered or unexported fields
}
TaskBase provides a base implementation for fwk.Task
func NewTask ¶
NewTask creates a new TaskBase of type typ and name name, managed by the fwk.App mgr.
func (*TaskBase) DeclInPort ¶
func (tsk *TaskBase) DeclInPort(n string, t reflect.Type) error
DeclInPort declares this task has an input Port with name n and type t.
func (*TaskBase) DeclOutPort ¶
func (tsk *TaskBase) DeclOutPort(n string, t reflect.Type) error
DeclOutPort declares this task has an output Port with name n and type t.
func (*TaskBase) DeclProp ¶
func (tsk *TaskBase) DeclProp(n string, ptr interface{}) error
DeclProp declares this task has a property named n, and takes a pointer to the associated value.
type TaskMgr ¶
type TaskMgr interface {
	AddTask(tsk Task) error
	DelTask(tsk Task) error
	HasTask(n string) bool
	GetTask(n string) Task
	Tasks() []Task
}
TaskMgr manages tasks.
type WriteSyncer ¶
WriteSyncer is an io.Writer which can be sync'ed/flushed.
Directories ¶
Path | Synopsis |
---|---|
cmd | |
examples | |
utils | |
builder | package builder builds a fwk-app binary from a list of go files. |
parallel | The parallel package provides a way of running functions concurrently while limiting the maximum number running at once. |
tarjan | package tarjan implements a graph loop detection algorithm called Tarjan's algorithm. |