Version: v0.0.0-...-5e91714

Published: Nov 7, 2015 License: MIT Imports: 19 Imported by: 2


REST Operation Log

OpLog is used as a real-time data synchronization layer between a producer and consumers. Basically, it's a generic database replication system for micro-services or an easy way to add a public streaming capability to a public API.

Typical use cases:

  • A component handles an authoritative database and several independent components need to keep a local, up-to-date, read-only view of the data (e.g.: search engine indexing, recommendation engines, multi-region architectures, etc.) or to react to certain changes (e.g.: spam detection, analytics, etc.).
  • Implement a public streaming API to monitor changes to objects within the service's model. With the use of Server Sent Event and filtering, it might be used directly from the browser to monitor changes that occur on objects displayed on the page (à la Meteor) or from a native mobile application to keep an up-to-date offline view of the data.

The agent may run locally on the same nodes/containers as the API/application producing updates to the data model. The agent receives notifications of updates (operations) via UDP from the producer application and forwards them to the central oplog's data store (a MongoDB cluster). If the central data store is not available, operations are buffered in memory until the database cluster is available again.


The agent also exposes a Server Sent Event API for consumers to be notified in real time about model changes. Thanks to the SSE protocol, a consumer can recover from a connection breakage without losing any updates by using the Last-Event-ID HTTP header (see [Server Sent Event API] below).

A [full replication] is also supported for freshly spawned consumers that need to have a full view of the data.

Change operations are stored on a central MongoDB server. A tailable cursor on a MongoDB capped collection is used for real-time updates, and the final states of objects are maintained in a secondary collection for full replication support. The actual data is not stored in the OpLog's data store; the monitored API continues to serve as the authoritative source of data and as the gatekeeper for it (authentication/authorization). Only the modified object's type and id are stored, together with the timestamp of the update and some related "parent" object references, useful for filtering. What you put in type, id and parents is up to the service, and must be meaningful enough to fetch the actual object data from its API. An optional reference to the modified object can be provided by the OpLog API if the URL schema is set up (see [Starting the agent] below for more info).

As the OpLog is expected to miss some operations from the API, a [Periodical Source Synchronization] mechanism is available.

A typical deployment includes an oplogd agent running locally on every node of a cluster serving the API to monitor. The agent serves both roles: 1) ingesting operations coming from the API and 2) streaming the aggregated operations of all agents to consumers. The same load balancer used to serve the API can expose the OpLog SSE endpoint.


Another deployment choice is to separate the SSE API from the operations ingestion. This is easily done by running a separate cluster of oplogd daemons:

Architecture 2


To install the project, execute the following commands:

go get -u github.com/dailymotion/oplog
go build -a -o /usr/local/bin/oplogd github.com/dailymotion/oplog/cmd/oplogd
go build -a -o /usr/local/bin/oplog-sync github.com/dailymotion/oplog/cmd/oplog-sync
go build -a -o /usr/local/bin/oplog-tail github.com/dailymotion/oplog/cmd/oplog-tail

Starting the agent

To start the agent, run the following command:

oplogd --mongo-url mongodb://[username:password@]host1[:port1][,host2[:port2],...[,hostN[:portN]]]/database[?options]

The oplog_ops and oplog_states collections will be created in the specified database. It is advised to dedicate a database to this service in order not to share lock contention with other services.

Available options:

  • --capped-collection-size=10485760: Size of the created MongoDB capped collection in bytes (default 10MB).
  • --debug=false: Show debug log messages.
  • --listen=":8042": The address to listen on. The same address is used for both the SSE (HTTP) and UDP APIs.
  • --max-queued-events=100000: Number of events to queue before starting to drop UDP messages.
  • --mongo-url: MongoDB URL to connect to.
  • --object-url: A URL template to reference objects. If this option is set, SSE events will have a "ref" field with the URL to the object. The URL should contain {{type}} and {{id}} variables (e.g.: http://api.mydomain.com/{{type}}/{{id}}).
  • --password: Password protecting the global SSE stream.
  • --ingest-password: Password protecting the HTTP ingest endpoint.
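
As an illustration of the --object-url option, the sketch below expands the {{type}} and {{id}} placeholders into a "ref" URL. The helper name expandObjectURL is hypothetical and the plain string substitution is an assumption; the agent's actual implementation may differ (for instance in how it escapes values).

```go
package main

import (
	"fmt"
	"strings"
)

// expandObjectURL is a hypothetical helper, not part of oplogd. It substitutes
// the {{type}} and {{id}} placeholders of tmpl and returns "" when no template
// is configured, in which case no "ref" field would be emitted.
func expandObjectURL(tmpl, objType, objID string) string {
	if tmpl == "" {
		return ""
	}
	return strings.NewReplacer("{{type}}", objType, "{{id}}", objID).Replace(tmpl)
}

func main() {
	fmt.Println(expandObjectURL("http://api.mydomain.com/{{type}}/{{id}}", "video", "xekw"))
}
```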

Available environment variables:

  • OPLOGD_MONGO_URL: See --mongo-url.
  • OPLOGD_PASSWORD: See --password
  • OPLOGD_INGEST_PASSWORD: See --ingest-password
  • OPLOGD_OBJECT_URL: See --object-url

Producer API: UDP and HTTP

To send operations to the agent, you can either send a UDP datagram or an HTTP POST request containing a JSON object.

The default port for both protocols is 8042.

The HTTP request must be a POST on / with application/json as Content-Type.

The format of the JSON object is as follows:

    {
        "event": "insert",
        "parents": ["video/xk32jd", "user/xkjdi"],
        "type": "video",
        "id": "xk32jd"
    }

The following keys are required:

  • event: The type of event. Can be insert, update or delete.
  • type: The object type (e.g.: video, user, playlist, …)
  • id: The object id of the impacted object as string.

The following keys are optional:

  • parents: The list of parent objects of the modified object. The advised format for items of this list is type/id but any format is acceptable. It is generally a good idea to put a reference to the modified object itself in this list in order to easily let the consumers filter on any updates performed on the object.
  • timestamp: The date when the object was updated, as an RFC 3339 representation. If not provided, the time when the operation was received by the agent is used instead.

See examples/ directory for implementation examples in different languages.

Consumer API: Server Sent Event

The SSE API runs on the same port as the UDP API, but over TCP. Agents therefore have both input and output roles, which makes the service easy to scale: put an agent on every node of the source API cluster and expose their HTTP port via the same load balancer as the API, while each node sends its updates to the UDP port on its localhost.

The agent follows the W3C SSE specification. To connect to the API, perform a GET on / with the Accept: text/event-stream header. If no Last-Event-ID HTTP header is passed, the OpLog server will start sending all future operations with no backlog. For each received operation, the client must store the associated "event id" once the operation has been processed. This event id is used to resume the stream where it was left off in case of a disconnect: the client just has to send the last consumed "event id" using the Last-Event-ID HTTP header.

In case the id defined by Last-Event-ID is no longer available in the underlying oplog_ops capped collection, the agent will automatically fall back to oplog_states by converting the oplog event id into a timestamp.

The following filters can be passed as a query-string:

  • types: A list of object types to filter on, separated by commas (e.g.: types=video,user).
  • parents: A comma-separated list of parents to filter on (e.g.: parents=video/xk32jd,user/xkjdi).

GET / HTTP/1.1
Accept: text/event-stream

HTTP/1.1 200 OK
Content-Type: text/event-stream; charset=utf-8

id: 545b55c7f095528dd0f3863c
event: insert
data: {"timestamp":"2014-11-06T03:04:39.041-08:00","parents":["x3kd2"],"type":"video","id":"xekw","ref":"http://api.mydomain.com/video/xekw"}

id: 545b55c8f095528dd0f3863d
event: delete
data: {"timestamp":"2014-11-06T03:04:40.091-08:00","parents":["x3kd2"],"type":"video","id":"xekw","ref":"http://api.mydomain.com/video/xekw"}
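
A consumer written in Go could build the corresponding request as sketched here. The function name newStreamRequest and the base URL are assumptions made for the example; only the types and parents query parameters and the Accept and Last-Event-ID headers come from the description above.

```go
package main

import (
	"fmt"
	"net/http"
	"net/url"
	"strings"
)

// newStreamRequest prepares a GET request for the SSE endpoint with optional
// filters. An empty lastEventID means "future operations only".
func newStreamRequest(baseURL string, types, parents []string, lastEventID string) (*http.Request, error) {
	u, err := url.Parse(baseURL)
	if err != nil {
		return nil, err
	}
	q := u.Query()
	if len(types) > 0 {
		q.Set("types", strings.Join(types, ","))
	}
	if len(parents) > 0 {
		q.Set("parents", strings.Join(parents, ","))
	}
	u.RawQuery = q.Encode()

	req, err := http.NewRequest(http.MethodGet, u.String(), nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Accept", "text/event-stream")
	if lastEventID != "" {
		req.Header.Set("Last-Event-ID", lastEventID)
	}
	return req, nil
}

func main() {
	req, err := newStreamRequest("http://localhost:8042/", []string{"video", "user"}, nil, "")
	if err != nil {
		fmt.Println(err)
		return
	}
	fmt.Println(req.Method, req.URL.String())
}
```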


Full Replication

If required, a full replication with all (not deleted) objects can be performed before streaming live updates. To perform a full replication, pass 0 as the value for the Last-Event-ID HTTP header. Numeric event ids with 13 digits or less are considered replication ids, which represent a UNIX timestamp in milliseconds. By passing a millisecond timestamp, you are asking to replicate all objects that have been modified after this date. Passing 0 thus ensures that every object will be replicated.
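
The 13-digit rule can be expressed as a small classifier. This is a sketch of the rule as stated above, with a hypothetical function name; it is not taken from the agent's source.

```go
package main

import (
	"fmt"
	"strconv"
	"time"
)

// parseReplicationID reports whether id is a replication id (only digits,
// at most 13 of them) and, if so, the time it designates.
func parseReplicationID(id string) (time.Time, bool) {
	if id == "" || len(id) > 13 {
		return time.Time{}, false
	}
	for _, c := range id {
		if c < '0' || c > '9' {
			return time.Time{}, false
		}
	}
	ms, err := strconv.ParseInt(id, 10, 64)
	if err != nil {
		return time.Time{}, false
	}
	return time.UnixMilli(ms).UTC(), true
}

func main() {
	for _, id := range []string{"0", "1415271879000", "545b55c7f095528dd0f3863c"} {
		since, ok := parseReplicationID(id)
		fmt.Println(id, ok, since.Format(time.RFC3339))
	}
}
```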

If a full replication is interrupted during the transfer, the same mechanism as for live updates is used. Once replication is complete, the stream will automatically switch to the live events stream so that the consumer does not miss any updates.

When a full replication starts, a special reset event with no data is sent to inform the consumer that it should reset its database before applying the subsequent operations.

Once the replication is complete and the OpLog switches back to the live updates, a special live event with no data is sent. This event can be useful for a consumer to know when it is safe for the consumer's service to be activated in production for instance.
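
On the consumer side, the reset and live events can drive a simple state machine, as in the sketch below. The replica and event types are hypothetical, and the local store is reduced to a set of "type/id" keys to keep the example short.

```go
package main

import "fmt"

// event is a simplified view of a received SSE event.
type event struct {
	Name string // "reset", "live", "insert", "update" or "delete"
	Type string
	ID   string
}

// replica is a toy local view of the replicated objects.
type replica struct {
	objects map[string]bool // keyed by "type/id"
	live    bool            // true once the full replication is complete
}

func newReplica() *replica {
	return &replica{objects: map[string]bool{}}
}

// apply updates the local view according to the received event.
func (r *replica) apply(ev event) {
	key := ev.Type + "/" + ev.ID
	switch ev.Name {
	case "reset":
		// A full replication starts: drop everything known so far.
		r.objects = map[string]bool{}
		r.live = false
	case "live":
		// Replication is complete, the stream is now real time.
		r.live = true
	case "insert", "update":
		r.objects[key] = true
	case "delete":
		delete(r.objects, key)
	}
}

func main() {
	r := newReplica()
	for _, ev := range []event{
		{Name: "reset"},
		{Name: "insert", Type: "video", ID: "xekw"},
		{Name: "live"},
	} {
		r.apply(ev)
	}
	fmt.Println(len(r.objects), r.live)
}
```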

Periodical Source Synchronization

There are many ways for the OpLog to miss some updates and thus have an incorrect view of the current state of the source data. In order to cope with this issue, a regular synchronization process with the source data content can be performed. The sync is a separate process which compares a dump of the real data with what the OpLog has stored within its own database. For any discrepancy in the OpLog's database that predates the dump, the sync process will generate an appropriate operation in the OpLog to fix the delta both in its own database and for all consumers.

The dump must be in a streamable JSON format. Each line is a JSON object with the same schema as the data part of the SSE API response. Dump example:

{"timestamp":"2014-11-06T03:04:39.041-08:00", "parents": ["user/xl2d"], "type":"video", "id":"x34cd"}
{"timestamp":"2014-12-24T02:03:05.167+01:00", "parents": ["user/xkwek"], "type":"video", "id":"x12ab"}
{"timestamp":"2014-12-24T01:03:05.167Z", "parents": ["user/xkwek"], "type":"video", "id":"x54cd"}

The timestamp must represent the last modification date of the object as an RFC 3339 representation.

The oplog-sync command is used with this dump in order to perform the sync. This command will connect to the database, do the comparisons and generate the necessary oplog events to fix the deltas. This command does not need an oplogd agent to be running in order to perform its task.

Note that the oplog-sync command is the perfect tool to bootstrap an OpLog with an existing API.

BE CAREFUL: any object absent from the dump and having a timestamp lower than the most recent timestamp present in the dump will be deleted from the OpLog.

Status Endpoint

The agent exposes a /status endpoint over HTTP to show some statistics about itself. A JSON object is returned with the following fields:

  • events_received: Total number of events received on the UDP interface
  • events_sent: Total number of events sent through the SSE interface
  • events_ingested: Total number of events ingested into MongoDB with success
  • events_error: Total number of events received on the UDP interface with an invalid format
  • events_discarded: Total number of events discarded because the queue was full
  • queue_size: Current number of events in the ingestion queue
  • queue_max_size: Maximum number of events allowed in the ingestion queue before discarding events
  • clients: Number of clients connected to the SSE API
  • connections: Total number of connections established on the SSE API

GET /status

HTTP/1.1 200 OK
Content-Length: 144
Content-Type: application/json
Date: Thu, 06 Nov 2014 10:40:25 GMT

{
    "clients": 0,
    "connections": 0,
    "events_discarded": 0,
    "events_error": 0,
    "events_ingested": 0,
    "events_received": 0,
    "events_sent": 0,
    "queue_max_size": 100000,
    "queue_size": 0,
    "status": "OK"
}


To write a consumer you may use any SSE library and consume the API yourself. If your consumer is written in Go, a dedicated consumer library is available (see github.com/dailymotion/oplogc).


All source code is licensed under the MIT License.



Package oplog provides a generic oplog/replication system for micro-services.

Most of the time, the oplog service is used through the oplogd agent, which uses this package. But if your application is written in Go, you may want to integrate at the code level.

You can find more information on the oplog service here: https://github.com/dailymotion/oplog






var Version = "1.1.6"

Version contains the current version of OpLog




type Event

type Event struct {
	ID    string
	Event string
}

Event is used to send "technical" events with no data like "reset" or "live"

func (Event) GetEventID

func (e Event) GetEventID() LastID

GetEventID returns an SSE event id

func (Event) WriteTo

func (e Event) WriteTo(w io.Writer) (int64, error)

WriteTo serializes an event as a SSE compatible message

type Filter

type Filter struct {
	Types   []string
	Parents []string
}

Filter contains filter query

type GenericEvent

type GenericEvent interface {
	GetEventID() LastID
}

GenericEvent is an interface used by the oplog to send different kinds of SSE compatible events

type LastID

type LastID interface {
	// String returns the string representation of their value
	String() string
	// Time returns the embedded time
	Time() time.Time
}

LastID defines an interface for different kinds of oplog id representations

func NewLastID

func NewLastID(id string) (LastID, error)

NewLastID creates a last id from a string containing either an operation id or a replication id.

type OpLog

type OpLog struct {
	Stats *Stats
	// ObjectURL is a template URL used to generate reference URLs to the operation's objects.
	// The URL can use the {{type}} and {{id}} templates as follows: http://api.mydomain.com/{{type}}/{{id}}.
	// If not provided, no "ref" field will be included in oplog events.
	ObjectURL string
	// Number of objects to fetch from the states collection on each iteration.
	// Too large pages may create lock contention on MongoDB, too small may slow
	// down the iteration.
	PageSize int
	// contains filtered or unexported fields
}

OpLog stores events in, and streams events from, a Mongo database

func New

func New(mongoURL string, maxBytes int) (*OpLog, error)

New returns an OpLog connected to the given mongo URL. If the capped collection does not exist, it will be created with the max size defined by the maxBytes parameter.

func (*OpLog) Append

func (oplog *OpLog) Append(op *Operation)

Append appends an operation into the OpLog

package main

import (
	"log"
	"time"

	"github.com/dailymotion/oplog"
)

func main() {
	ol, err := oplog.New("mongodb://localhost/oplog", 1048576)
	if err != nil {
		log.Fatal(err)
	}
	op := oplog.NewOperation("insert", time.Now(), "123", "user", nil)
	ol.Append(op)
}

func (*OpLog) Diff

func (oplog *OpLog) Diff(createMap map[string]OperationData, updateMap map[string]OperationData, deleteMap map[string]OperationData) error

Diff finds which objects must be created or deleted in order to fix the delta

The createMap is a map pointing to all objects present in the source database. The function searches for differences between the passed map and the oplog database. It removes objects that are identical on both sides from the createMap, and populates the deleteMap with objects that are present in the oplog database but not in the source database. If an object is present in both the createMap and the oplog database but the timestamp of the oplog object is earlier than the createMap's, the object is added to the updateMap.

func (*OpLog) HasID

func (oplog *OpLog) HasID(id LastID) (bool, error)

HasID checks if an operation id is present in the capped collection.

func (*OpLog) Ingest

func (oplog *OpLog) Ingest(ops <-chan *Operation, done <-chan bool)

Ingest appends operations received through a channel into the OpLog

package main

import (
	"log"
	"strconv"
	"time"

	"github.com/dailymotion/oplog"
)

func main() {
	ol, err := oplog.New("mongodb://localhost/oplog", 1048576)
	if err != nil {
		log.Fatal(err)
	}
	ops := make(chan *oplog.Operation)
	done := make(chan bool, 1)
	go ol.Ingest(ops, done)
	// Insert a large number of operations
	for i := 0; i < 1000; i++ {
		ops <- oplog.NewOperation("insert", time.Now(), strconv.FormatInt(int64(i), 10), "user", nil)
	}
	done <- true
}

func (*OpLog) LastID

func (oplog *OpLog) LastID() (LastID, error)

LastID returns the most recently inserted operation id if any or nil if oplog is empty

func (*OpLog) Tail

func (oplog *OpLog) Tail(lastID LastID, filter Filter, out chan<- GenericEvent, stop <-chan bool)

Tail tails all the new operations in the oplog and sends them to the given channel. If the lastID parameter is given, all operations posted after this event will be returned.

If the lastID is a ReplicationLastID (unix timestamp in milliseconds), the tailing will start by replicating all the objects last updated after the timestamp.

Giving a lastID of 0 means replicating all the stored objects before tailing the live updates.

The filter argument can be used to filter on some types of objects or on objects with given parents.
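
The effect of a filter can be pictured with the sketch below. The combination rule is an assumption made for illustration: an event passes when its type is listed in Types (if any are given) and at least one of its parents is listed in Parents (if any are given). The package may combine the criteria differently, and the filter type here is a local stand-in for oplog.Filter.

```go
package main

import "fmt"

// filter is a local stand-in for oplog.Filter.
type filter struct {
	Types   []string
	Parents []string
}

func contains(list []string, s string) bool {
	for _, item := range list {
		if item == s {
			return true
		}
	}
	return false
}

// matches reports whether an object passes the filter, under the assumption
// stated above. An empty filter accepts everything.
func (f filter) matches(objType string, parents []string) bool {
	if len(f.Types) > 0 && !contains(f.Types, objType) {
		return false
	}
	if len(f.Parents) == 0 {
		return true
	}
	for _, p := range parents {
		if contains(f.Parents, p) {
			return true
		}
	}
	return false
}

func main() {
	f := filter{Types: []string{"video"}, Parents: []string{"user/xkjdi"}}
	fmt.Println(f.matches("video", []string{"video/xk32jd", "user/xkjdi"}))
	fmt.Println(f.matches("playlist", []string{"user/xkjdi"}))
}
```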

The insert, update and delete events are streamed back to the caller through the out channel

package main

import (
	"log"

	"github.com/dailymotion/oplog"
)

func main() {
	ol, err := oplog.New("mongodb://localhost/oplog", 1048576)
	if err != nil {
		log.Fatal(err)
	}
	ops := make(chan oplog.GenericEvent)
	stop := make(chan bool)
	// Tail all future events with no filters
	go ol.Tail(nil, oplog.Filter{}, ops, stop)
	// Read 100 events
	for i := 0; i < 100; i++ {
		op := <-ops
		log.Println(op.GetEventID())
	}
	// Stop the tail
	stop <- true
}

type Operation

type Operation struct {
	ID    *bson.ObjectId `bson:"_id,omitempty"`
	Event string         `bson:"event"`
	Data  *OperationData `bson:"data"`
}

Operation represents an operation stored in the OpLog, ready to be exposed as SSE.

func NewOperation

func NewOperation(event string, time time.Time, objID, objType string, objParents []string) *Operation

NewOperation creates a new operation from the given information.

The event argument can be one of "insert", "update" or "delete". The time defines the exact modification date of the object (must be the exact same time as stored in the database).

func (Operation) GetEventID

func (op Operation) GetEventID() LastID

GetEventID returns an SSE last event id for the operation

func (*Operation) Info

func (op *Operation) Info() string

Info returns a human readable version of the operation

func (Operation) Validate

func (op Operation) Validate() error

Validate ensures an operation has the proper syntax

func (Operation) WriteTo

func (op Operation) WriteTo(w io.Writer) (int64, error)

WriteTo serializes an Operation as a SSE compatible message

type OperationData

type OperationData struct {
	Timestamp time.Time `bson:"ts" json:"timestamp"`
	Parents   []string  `bson:"p" json:"parents"`
	Type      string    `bson:"t" json:"type"`
	ID        string    `bson:"id" json:"id"`
	Ref       string    `bson:"-,omitempty" json:"ref,omitempty"`
}

OperationData is the data part of the SSE event for the operation.

func (OperationData) GetID

func (obd OperationData) GetID() string

GetID returns the operation id

func (OperationData) Validate

func (obd OperationData) Validate() error

Validate ensures an operation data has the right syntax

type OperationLastID

type OperationLastID struct {
}

OperationLastID represents an actual stored operation id

func (*OperationLastID) Fallback

func (oid *OperationLastID) Fallback() LastID

Fallback tries to convert an "event" id into a "replication" id by extracting the timestamp part of the Mongo ObjectId. If the id is not a valid ObjectId, an error is returned.

func (OperationLastID) String

func (oid OperationLastID) String() string

type ReplicationLastID

type ReplicationLastID struct {
	// contains filtered or unexported fields
}

ReplicationLastID represents a timestamp id allowing to hook into operation feed by time

func (ReplicationLastID) String

func (rid ReplicationLastID) String() string

func (ReplicationLastID) Time

func (rid ReplicationLastID) Time() time.Time

Time extracts the time from the replication id

type SSEDaemon

type SSEDaemon struct {
	// Password is the shared secret to connect to a password protected oplog.
	Password string
	// IngestPassword is the shared secret to connect to the HTTP ingest endpoint.
	IngestPassword string
	// FlushInterval defines the interval between flushes of the HTTP socket.
	FlushInterval time.Duration
	// HeartbeatTickerCount defines the number of FlushInterval periods with nothing
	// to flush that must elapse before a heartbeat is sent.
	HeartbeatTickerCount int8
	// contains filtered or unexported fields
}

SSEDaemon listens for events and sends them to the oplog MongoDB capped collection

func NewSSEDaemon

func NewSSEDaemon(addr string, ol *OpLog) *SSEDaemon

NewSSEDaemon creates a new HTTP server configured to serve oplog stream over HTTP using Server Sent Event protocol.

func (*SSEDaemon) GetOps

func (daemon *SSEDaemon) GetOps(w http.ResponseWriter, r *http.Request)

GetOps exposes an SSE endpoint to stream operations

func (*SSEDaemon) PostOps

func (daemon *SSEDaemon) PostOps(w http.ResponseWriter, r *http.Request)

PostOps exposes an endpoint to POST operations

func (*SSEDaemon) Run

func (daemon *SSEDaemon) Run() error

Run starts the SSE server

func (*SSEDaemon) ServeHTTP

func (daemon *SSEDaemon) ServeHTTP(w http.ResponseWriter, r *http.Request)

func (*SSEDaemon) Status

func (daemon *SSEDaemon) Status(w http.ResponseWriter, r *http.Request)

Status exposes expvar data

type Stats

type Stats struct {
	Status string
	// Total number of events received on the UDP interface
	EventsReceived *expvar.Int
	// Total number of events sent through the SSE interface
	EventsSent *expvar.Int
	// Total number of events ingested into MongoDB with success
	EventsIngested *expvar.Int
	// Total number of events received on the UDP interface with an invalid format
	EventsError *expvar.Int
	// Total number of events discarded because the queue was full
	EventsDiscarded *expvar.Int
	// Current number of events in the ingestion queue
	QueueSize *expvar.Int
	// Maximum number of events allowed in the ingestion queue before discarding events
	QueueMaxSize *expvar.Int
	// Number of clients connected to the SSE API
	Clients *expvar.Int
	// Total number of SSE connections
	Connections *expvar.Int
}

Stats stores all the statistics about the oplog

type UDPDaemon

type UDPDaemon struct {
	// contains filtered or unexported fields
}

UDPDaemon listens for events and sends them to the oplog MongoDB capped collection

func NewUDPDaemon

func NewUDPDaemon(addr string, ol *OpLog) *UDPDaemon

NewUDPDaemon creates a daemon listening for operations over UDP

func (*UDPDaemon) Run

func (daemon *UDPDaemon) Run(queueMaxSize int) error

Run reads every datagram and sends it to the oplog

The queueMaxSize parameter defines the number of operations that can be queued before the UDP server starts dropping messages. This is particularly important to handle underlying MongoDB slowdowns or unavailability.


The oplog-sync command performs a maintenance operation on the oplog database to keep it in sync with the source data.
The oplogd command is an agent listening on a UDP port for operations and exposing an HTTP SSE API.
