Documentation
¶
Overview ¶
Package recorder contains logic to record data into a database. Any objects that implements the DataRecorder interface can be used in this system.
Recorders should ping their endpoint upon creation to make sure they can access. Otherwise they should return an error indicating they cannot start.
When the context is cancelled, the recorder should finish its job and return. The Time is used by the Engine for changing the index name. It is useful for cleaning up the old data.
Index ¶
- Constants
- Variables
- func TestRecorderConstruction(t *testing.T, setup setupFunc)
- func TestRecorderEssentials(t *testing.T, ...)
- type DataRecorder
- type ErrEndpointNotAvailable
- type ErrInvalidEndpoint
- type ErrLowBackoffValue
- type ErrParseTimeOut
- type MockConfig
- func (m *MockConfig) Backoff() int
- func (m *MockConfig) Endpoint() string
- func (m *MockConfig) IndexName() string
- func (m *MockConfig) Logger() logrus.FieldLogger
- func (m *MockConfig) Name() string
- func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob, ...) (DataRecorder, error)
- func (m *MockConfig) RoutePath() string
- func (m *MockConfig) Timeout() time.Duration
- type RecordJob
- type SimpleRecorder
- func (s *SimpleRecorder) Error() error
- func (s *SimpleRecorder) IndexName() string
- func (s *SimpleRecorder) Name() string
- func (s *SimpleRecorder) PayloadChan() chan *RecordJob
- func (s *SimpleRecorder) Start(ctx context.Context, stop communication.StopChannel)
- func (s *SimpleRecorder) Timeout() time.Duration
Examples ¶
Constants ¶
const ( // RecorderReceivesPayloadTestCase is for invoking TestRecorderReceivesPayload test RecorderReceivesPayloadTestCase = iota // RecorderSendsResultTestCase is for invoking TestRecorderSendsResult test RecorderSendsResultTestCase // RecorderClosesTestCase is for invoking TestRecorderCloses test RecorderClosesTestCase RecorderErrorsOnUnavailableEndpointTestCase )
Variables ¶
var ( // ErrEmptyName is the error when the package name is empty. ErrEmptyName = fmt.Errorf("name cannot be empty") // ErrEmptyEndpoint is the error when the given endpoint is empty. ErrEmptyEndpoint = fmt.Errorf("endpoint cannot be empty") // ErrEmptyIndexName is the error when the index_name is an empty string. ErrEmptyIndexName = fmt.Errorf("index_name cannot be empty") )
Functions ¶
func TestRecorderConstruction ¶ added in v0.5.0
TestRecorderConstruction runs all essential tests on object construction.
func TestRecorderEssentials ¶ added in v0.5.0
func TestRecorderEssentials(t *testing.T, setup func(testCase int) (ctx context.Context, rec DataRecorder, err error, errorChan chan communication.ErrorMessage, teardown func()))
TestRecorderEssentials runs all essential tests. The only case the error is needed is to check the endpoint on start up.
Types ¶
type DataRecorder ¶
type DataRecorder interface { // Timeout is required by the Engine so it can read the time-outs. Timeout() time.Duration // The Engine provides this channel and sends the payload through this channel. // Recorder should not block when RecordJob is sent to this channel. PayloadChan() chan *RecordJob // The recorder's loop should be inside a goroutine. // This channel should be closed once the worker receives a stop signal // and its work is finished. The response to the stop signal should happen // otherwise it will hang the Engine around. // When the context is timed-out or cancelled, the recorder should return. Start(ctx context.Context, stop communication.StopChannel) // Name should return the representation string for this recorder. // Choose a very simple and unique name. Name() string // IndexName comes from the configuration, but the engine takes over. // Recorders should not intercept the engine for its decision, unless they have a // valid reason. // The engine might add a date to this index name if the user has specified in the // configuration file. IndexName() string }
DataRecorder in an interface for shipping data to a repository. The repository should have the concept of index/database and type/table abstractions. See ElasticSearch for more information. Recorder should send the error error channel if any error occurs.
type ErrEndpointNotAvailable ¶ added in v0.5.0
ErrEndpointNotAvailable is the error when the endpoint is not available.
func (ErrEndpointNotAvailable) EndpointNotAvailable ¶ added in v0.5.0
func (ErrEndpointNotAvailable) EndpointNotAvailable()
EndpointNotAvailable defines the behaviour of the error
func (ErrEndpointNotAvailable) Error ¶ added in v0.5.0
func (e ErrEndpointNotAvailable) Error() string
type ErrInvalidEndpoint ¶ added in v0.4.0
type ErrInvalidEndpoint string
ErrInvalidEndpoint is the error when the endpoint is not a valid url
func (ErrInvalidEndpoint) Error ¶ added in v0.4.0
func (e ErrInvalidEndpoint) Error() string
func (ErrInvalidEndpoint) InvalidEndpoint ¶ added in v0.4.0
func (ErrInvalidEndpoint) InvalidEndpoint()
InvalidEndpoint defines the behaviour of the error
type ErrLowBackoffValue ¶ added in v0.4.0
type ErrLowBackoffValue int64
ErrLowBackoffValue is the error when the endpoint is not a valid url
func (ErrLowBackoffValue) Error ¶ added in v0.4.0
func (e ErrLowBackoffValue) Error() string
func (ErrLowBackoffValue) LowBackoffValue ¶ added in v0.4.0
func (ErrLowBackoffValue) LowBackoffValue()
LowBackoffValue defines the behaviour of the error
type ErrParseTimeOut ¶ added in v0.4.0
ErrParseTimeOut is for when the timeout cannot be parsed
func (ErrParseTimeOut) Error ¶ added in v0.4.0
func (e ErrParseTimeOut) Error() string
func (ErrParseTimeOut) ParseTimeOut ¶ added in v0.4.0
func (ErrParseTimeOut) ParseTimeOut()
ParseTimeOut defines the behaviour of the error
type MockConfig ¶ added in v0.2.1
type MockConfig struct { MockName string MockEndpoint string MockTimeout time.Duration MockBackoff int MockIndexName string MockLogger logrus.FieldLogger }
MockConfig holds the necessary configuration for setting up an elasticsearch reader endpoint.
func NewMockConfig ¶ added in v0.2.1
func NewMockConfig(name string, log logrus.FieldLogger, endpoint string, timeout time.Duration, backoff int, indexName string) (*MockConfig, error)
NewMockConfig returns a mocked object
func (*MockConfig) Backoff ¶ added in v0.2.1
func (m *MockConfig) Backoff() int
Backoff is the mocked version
func (*MockConfig) Endpoint ¶ added in v0.2.1
func (m *MockConfig) Endpoint() string
Endpoint is the mocked version
func (*MockConfig) IndexName ¶ added in v0.2.1
func (m *MockConfig) IndexName() string
IndexName is the mocked version
func (*MockConfig) Logger ¶ added in v0.2.1
func (m *MockConfig) Logger() logrus.FieldLogger
Logger is the mocked version
func (*MockConfig) Name ¶ added in v0.2.1
func (m *MockConfig) Name() string
Name is the mocked version
func (*MockConfig) NewInstance ¶ added in v0.2.1
func (m *MockConfig) NewInstance(ctx context.Context, payloadChan chan *RecordJob, errorChan chan<- communication.ErrorMessage) (DataRecorder, error)
NewInstance returns a mocked object
func (*MockConfig) RoutePath ¶ added in v0.2.1
func (m *MockConfig) RoutePath() string
RoutePath is the mocked version
func (*MockConfig) Timeout ¶ added in v0.2.1
func (m *MockConfig) Timeout() time.Duration
Timeout is the mocked version
type RecordJob ¶
type RecordJob struct { ID communication.JobID Ctx context.Context Payload datatype.DataContainer IndexName string TypeName string Time time.Time // Is used for time-series data }
RecordJob is sent with a context and a payload to be recorded. If the TypeName and IndexName are different than the previous one, the recorder should use the ones engine provides. Of any errors occurred, recorders should provide their errors through the provided errorChan, although it is not necessary to send a nil error value as the engine ignores it.
type SimpleRecorder ¶ added in v0.1.1
type SimpleRecorder struct { Pmu sync.RWMutex PayloadChanFunc func() chan *RecordJob ErrorFunc func() error Smu sync.RWMutex StartFunc func(communication.StopChannel) // contains filtered or unexported fields }
SimpleRecorder is designed to be used in tests
Example ¶
log := lib.DiscardLogger() ctx := context.Background() receivedPayload := make(chan string) ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { receivedPayload <- "I have received the payload!" })) defer ts.Close() payloadChan := make(chan *RecordJob) errorChan := make(chan communication.ErrorMessage) rec, _ := NewSimpleRecorder(ctx, log, payloadChan, errorChan, "reader_example", ts.URL, "intexName", time.Second) stop := make(communication.StopChannel) rec.Start(ctx, stop) payload := datatype.NewContainer([]datatype.DataType{ datatype.StringType{Key: "key", Value: "value"}, }) job := &RecordJob{ Ctx: ctx, Payload: payload, IndexName: "my index", Time: time.Now(), } // Issuing a job rec.PayloadChan() <- job fmt.Println(<-receivedPayload) // Lets check the errors select { case <-errorChan: panic("Wasn't expecting any errors") default: fmt.Println("No errors reported") } // Issuing another job rec.PayloadChan() <- job fmt.Println(<-receivedPayload) // The recorder should finish gracefully done := make(chan struct{}) stop <- done <-done fmt.Println("Reader has finished")
Output: I have received the payload! No errors reported I have received the payload! Reader has finished
Example (Start) ¶
log := lib.DiscardLogger() ctx := context.Background() ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { io.WriteString(w, `{"the key": "is the value!"}`) })) defer ts.Close() payloadChan := make(chan *RecordJob) errorChan := make(chan communication.ErrorMessage) rec, _ := NewSimpleRecorder(ctx, log, payloadChan, errorChan, "reader_example", ts.URL, "intexName", 10*time.Millisecond) stop := make(communication.StopChannel) rec.Start(ctx, stop) done := make(chan struct{}) stop <- done <-done fmt.Println("Recorder has stopped its event loop!")
Output: Recorder has stopped its event loop!
func NewSimpleRecorder ¶ added in v0.1.1
func NewSimpleRecorder(ctx context.Context, log logrus.FieldLogger, payloadChan chan *RecordJob, errorChan chan<- communication.ErrorMessage, name, endpoint, indexName string, timeout time.Duration) (*SimpleRecorder, error)
NewSimpleRecorder returns a SimpleRecorder instance
func (*SimpleRecorder) Error ¶ added in v0.1.1
func (s *SimpleRecorder) Error() error
func (*SimpleRecorder) IndexName ¶ added in v0.1.1
func (s *SimpleRecorder) IndexName() string
IndexName returns the indexname
func (*SimpleRecorder) Name ¶ added in v0.1.1
func (s *SimpleRecorder) Name() string
Name returns the name
func (*SimpleRecorder) PayloadChan ¶ added in v0.1.1
func (s *SimpleRecorder) PayloadChan() chan *RecordJob
PayloadChan returns the payload channel
func (*SimpleRecorder) Start ¶ added in v0.1.1
func (s *SimpleRecorder) Start(ctx context.Context, stop communication.StopChannel)
Start calls the StartFunc if exists, otherwise continues as normal
func (*SimpleRecorder) Timeout ¶ added in v0.1.1
func (s *SimpleRecorder) Timeout() time.Duration
Timeout returns the timeout
Source Files
¶
Directories
¶
Path | Synopsis |
---|---|
Package elasticsearch contains logic to record data to an elasticsearch index.
|
Package elasticsearch contains logic to record data to an elasticsearch index. |