README

apexovernsq

Overview

The apexovernsq package provides a mechanism to transfer structured log entries, generated with the Apex log package (github.com/apex/log), over NSQ (github.com/nsqio). Specifically, it allows Apex's log.Entry structs to be marshalled, published to an NSQ topic, and then unmarshalled at the other end and injected into a local Apex log handler.

Putting log messages onto an NSQ topic

To push log messages onto an NSQ topic, we provide a type that implements the github.com/apex/log.Handler interface. To create a new ApexLogNSQHandler instance, call apexovernsq.NewApexLogNSQHandler and pass it three things:

  • A function with a signature matching apexovernsq.MarshalFunc to convert an apex log.Entry into a slice of bytes.
  • A function with a signature matching apexovernsq.PublishFunc. This is typically github.com/nsqio/go-nsq.Producer.Publish, or a function that wraps it.
  • A string naming the nsq topic the log messages will be sent to.

Once you've got a handler, you can use it in apex/log by calling github.com/apex/log.SetHandler, with your handler instance as the only argument.

Partial Example
package main

import (
	"encoding/json"
	"os"

	alog "github.com/apex/log"
	nsq "github.com/nsqio/go-nsq"

	"code.avct.io/apexovernsq"
)

func main() {
	// This is a very minimal NSQ configuration; you'll need nsqd
	// running locally to make this work.
	cfg := nsq.NewConfig()
	nsqdAddress := "127.0.0.1:4150"
	producer, err := nsq.NewProducer(nsqdAddress, cfg)
	if err != nil {
		alog.WithError(err).Error("error setting up NSQ producer")
		os.Exit(1)
	}

	handler := apexovernsq.NewApexLogNSQHandler(json.Marshal, producer.Publish, "log")
	alog.SetHandler(handler)
	// From this point onward, logging via apex log will be forwarded over NSQ.
}

For a more detailed usage example, please look at the log-to-nsq program in the example directory.

Consuming apex log messages from NSQ

To consume apex log Entry structs from NSQ, an NSQ handler is provided. To construct an NSQApexLogHandler, call apexovernsq.NewNSQApexLogHandler with two arguments:

  • a github.com/apex/log.Handler implementation which will handle the log messages as they arrive. For example, if you use github.com/apex/log/handlers/cli.Default, the log messages will be output to os.Stderr on the consuming process.
  • a function with a signature that matches apexovernsq.UnmarshalFunc, for example json.Unmarshal. Note that this must match the function used to marshal the log entries before they are published on NSQ.

Partial Example
package main

import (
	"encoding/json"
	"os"
	"os/signal"
	"syscall"

	alog "github.com/apex/log"
	"github.com/apex/log/handlers/cli"
	nsq "github.com/nsqio/go-nsq"

	"code.avct.io/apexovernsq"
)

func main() {
	cfg := nsq.NewConfig()
	channel := "mychannel#ephemeral"

	// Note, it's important you consume from the same topic name
	// that you publish the messages to.
	consumer, err := nsq.NewConsumer("log", channel, cfg)
	if err != nil {
		alog.WithError(err).Error("error setting up NSQ consumer")
		os.Exit(1)
	}

	// We choose the apex log handler we'd like to pump our log
	// messages through.  They'll be alog.Entry instances passed
	// to the HandleLog function for the handler, exactly as if
	// they were produced locally.
	apexHandler := cli.Default

	// We create an NSQ handler that will unmarshal the entries
	// from NSQ and pump them through the provided apex log
	// handler.
	nsqHandler := apexovernsq.NewNSQApexLogHandler(apexHandler, json.Unmarshal)

	// .. and we tell the NSQ consumer to use our new handler.
	consumer.AddHandler(nsqHandler)

	// Note, this is a very simplistic NSQ setup.  You'll need
	// nsqd running on the localhost to make this work.
	err = consumer.ConnectToNSQD("127.0.0.1:4150")
	if err != nil {
		alog.WithError(err).Error("error connecting to NSQD")
		os.Exit(2)
	}

	// This block makes us loop listening to NSQ until the program is terminated.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	for {
		select {
		case <-consumer.StopChan:
			return
		case <-sigChan:
			consumer.Stop()
		}
	}

}

For another example look at the nsq-log-tail program in the apps sub-directory.

Niceties

We provide a few additional useful mechanisms.

NewApexLogServiceContext

The function apexovernsq.NewApexLogServiceContext returns an apex log Entry with some standard fields set:

  • "service" - the name of the process that is logging.
  • "hostname" - the hostname of the machine that created the log message.
  • "pid" - the process ID of the process that created the log message.

You can pass this Entry around and use it as a context for log calls (as per normal operation with apex log). Having these standard fields set is very helpful if, for example, you wish to aggregate the logs from multiple services and/or hosts.
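
As a rough illustration of the three fields listed above, the sketch below gathers them using only the Go standard library. The helper name serviceFields is hypothetical, and deriving "service" from os.Args[0] is an assumption made for this example; the package itself may obtain these values differently.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// serviceFields is a hypothetical helper (not part of apexovernsq) that
// collects the same three keys the README lists for the service context.
func serviceFields() map[string]interface{} {
	hostname, err := os.Hostname()
	if err != nil {
		// Fall back to a placeholder rather than failing to log at all.
		hostname = "unknown"
	}
	return map[string]interface{}{
		// Assumption: the service name is the base name of the executable.
		"service":  filepath.Base(os.Args[0]),
		"hostname": hostname,
		"pid":      os.Getpid(),
	}
}

func main() {
	fields := serviceFields()
	for _, key := range []string{"service", "hostname", "pid"} {
		fmt.Printf("%s=%v\n", key, fields[key])
	}
}
```

With apex log, a map of this shape could be converted to log.Fields and attached with WithFields, which is roughly the kind of Entry that NewApexLogServiceContext hands back.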

Protobuf

We provide a protobuf definition of the apex log Entry struct, which generates a Go library containing a Marshal and an Unmarshal function that can be used by the producing and consuming handlers in apexovernsq. You'll find these functions by importing code.avct.io/apexovernsq/protobuf.

Documentation

Overview

Package apexovernsq provides a handler for github.com/apex/log. It's intended to act as a transport to allow log.Entry structs to pass through nsq and be reconstructed and passed to another handler on the other side.

Functions

func NewApexLogServiceContext

func NewApexLogServiceContext() *log.Entry

Create a new logging context with service information.

func NewApexLogServiceContextWithHandler

func NewApexLogServiceContextWithHandler(handler log.Handler) *log.Entry

Create a new logging context, with a handler that isn't the default handler and with service information appended.

Types

type ApexLogNSQHandler

type ApexLogNSQHandler struct {
	// contains filtered or unexported fields
}

ApexLogNSQHandler is a handler that can be passed to github.com/apex/log.SetHandler.

func NewApexLogNSQHandler

func NewApexLogNSQHandler(marshalFunc MarshalFunc, publishFunc PublishFunc, topic string) *ApexLogNSQHandler

NewApexLogNSQHandler returns a pointer to an apexovernsq.ApexLogNSQHandler that can in turn be passed to github.com/apex/log.SetHandler.

The marshalFunc provided will be used to marshal a github.com/apex/log.Entry as the body of a message sent over nsq.

The publishFunc is used to push a message onto nsq. For simple cases with only one nsq endpoint, using github.com/nsqio/go-nsq.Producer.Publish is fine. For cases with multiple producers you'll want to wrap it. See the example directory for an implementation of this.

The topic is a string determining the nsq topic the messages will be published to.
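
One possible way to wrap several publish functions into a single PublishFunc is sketched below. The PublishFunc type is redeclared locally so the sketch compiles without external modules, and publishToAny and unreachable are hypothetical names; the failover policy (try each publisher in order, stop at the first success) is an assumption chosen for illustration, not necessarily what the example program does.

```go
package main

import (
	"errors"
	"fmt"
)

// PublishFunc mirrors the apexovernsq.PublishFunc signature documented here.
type PublishFunc func(topic string, body []byte) error

// publishToAny is a hypothetical wrapper: it tries each publisher in order
// and returns nil as soon as one of them accepts the message.
func publishToAny(publishers ...PublishFunc) PublishFunc {
	return func(topic string, body []byte) error {
		if len(publishers) == 0 {
			return errors.New("no publishers configured")
		}
		var lastErr error
		for _, publish := range publishers {
			if lastErr = publish(topic, body); lastErr == nil {
				return nil
			}
		}
		return fmt.Errorf("all %d publishers failed, last error: %w", len(publishers), lastErr)
	}
}

// unreachable stands in for the Publish method of a producer whose nsqd is down.
func unreachable(topic string, body []byte) error {
	return errors.New("nsqd unreachable")
}

func main() {
	// healthy stands in for the Publish method of a working producer.
	healthy := func(topic string, body []byte) error {
		fmt.Printf("published %d bytes to %q\n", len(body), topic)
		return nil
	}

	publish := publishToAny(unreachable, healthy)
	if err := publish("log", []byte(`{"message":"hello"}`)); err != nil {
		fmt.Println("publish failed:", err)
	}
}
```

In real use, the arguments would be the Publish methods of several nsq producers, and the returned function would be passed as the publishFunc argument.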

func (*ApexLogNSQHandler) HandleLog

func (h *ApexLogNSQHandler) HandleLog(e *log.Entry) error

HandleLog makes ApexLogNSQHandler fulfil the interface required by github.com/apex/log for handlers. Each individual log entry made in client programs will eventually invoke this function when using this ApexLogNSQHandler.

type AsyncApexLogNSQHandler

type AsyncApexLogNSQHandler struct {
	// contains filtered or unexported fields
}

AsyncApexLogNSQHandler is a handler that can be passed to github.com/apex/log.SetHandler and will publish log entries on NSQ asynchronously.

func NewAsyncApexLogNSQHandler

func NewAsyncApexLogNSQHandler(marshalFunc MarshalFunc, publishFunc PublishFunc, topic string, bufferSize int) *AsyncApexLogNSQHandler

NewAsyncApexLogNSQHandler returns a pointer to an apexovernsq.AsyncApexLogNSQHandler that can in turn be passed to github.com/apex/log.SetHandler. The AsyncApexLogNSQHandler uses a goroutine and a channel to make the publication of NSQ messages asynchronous to the act of logging.

The marshalFunc provided will be used to marshal a github.com/apex/log.Entry as the body of a message sent over nsq.

The publishFunc is used to push a message onto nsq. For simple cases with only one nsq endpoint, using github.com/nsqio/go-nsq.Producer.Publish is fine. For cases with multiple producers you'll want to wrap it. See the example directory for an implementation of this.

The topic is a string determining the nsq topic the messages will be published to.
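
The goroutine-and-channel arrangement described above can be sketched in a few lines. This is a minimal illustration under stated assumptions, not the package's actual implementation: asyncPublisher, newAsyncPublisher and Enqueue are hypothetical names, and the behaviour of Stop (close the queue, then wait for it to drain) is assumed.

```go
package main

import (
	"fmt"
	"sync"
)

// PublishFunc mirrors the apexovernsq.PublishFunc signature documented here.
type PublishFunc func(topic string, body []byte) error

// asyncPublisher is a hypothetical type showing the general pattern:
// callers enqueue message bodies and a single goroutine publishes them.
type asyncPublisher struct {
	publish PublishFunc
	topic   string
	queue   chan []byte
	done    sync.WaitGroup
}

func newAsyncPublisher(publish PublishFunc, topic string, bufferSize int) *asyncPublisher {
	p := &asyncPublisher{
		publish: publish,
		topic:   topic,
		queue:   make(chan []byte, bufferSize),
	}
	p.done.Add(1)
	go func() {
		defer p.done.Done()
		// Drain the queue until it is closed by Stop.
		for body := range p.queue {
			if err := p.publish(p.topic, body); err != nil {
				fmt.Println("publish failed:", err)
			}
		}
	}()
	return p
}

// Enqueue hands a message to the worker. It blocks only when the buffer is full.
func (p *asyncPublisher) Enqueue(body []byte) {
	p.queue <- body
}

// Stop closes the queue and waits for queued messages to be published.
// Enqueue must not be called after Stop, and Stop must be called only once.
func (p *asyncPublisher) Stop() {
	close(p.queue)
	p.done.Wait()
}

func main() {
	p := newAsyncPublisher(func(topic string, body []byte) error {
		fmt.Printf("%s <- %s\n", topic, body)
		return nil
	}, "log", 8)

	p.Enqueue([]byte("first"))
	p.Enqueue([]byte("second"))
	p.Stop()
}
```

In this sketch the bufferSize trades memory for tolerance of slow publishing: a larger buffer lets logging calls return immediately for longer while the publisher catches up.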

func (*AsyncApexLogNSQHandler) HandleLog

func (h *AsyncApexLogNSQHandler) HandleLog(e *log.Entry) error

func (*AsyncApexLogNSQHandler) Stop

func (h *AsyncApexLogNSQHandler) Stop()

type MarshalFunc

type MarshalFunc func(x interface{}) ([]byte, error)

MarshalFunc is a function signature for any function that can marshal an arbitrary struct to a slice of bytes.

type NSQApexLogHandler

type NSQApexLogHandler struct {
	// contains filtered or unexported fields
}

NSQApexLogHandler is a handler for NSQ that can consume messages whose Body is a marshalled github.com/apex/log.Entry.

func NewNSQApexLogHandler

func NewNSQApexLogHandler(handler alog.Handler, unmarshalFunc UnmarshalFunc) *NSQApexLogHandler

NewNSQApexLogHandler creates a new NSQApexLogHandler with a provided github.com/apex/log.Handler and any function that satisfies the UnmarshalFunc signature.

The provided UnmarshalFunc will be used to unmarshal the github.com/apex/log.Entry from the NSQ Message.Body field. It should match the marshal function used to publish the Message on the NSQ topic. If you don't have any special requirements, using the Marshal and Unmarshal functions from code.avct.io/apexovernsq/protobuf should work well.

When the handler is invoked to consume a message, the provided github.com/apex/log.Handler will have its HandleLog method called with the unmarshalled github.com/apex/log.Entry, just as it would if you made a logging call locally.

func (*NSQApexLogHandler) HandleMessage

func (alh *NSQApexLogHandler) HandleMessage(m *nsq.Message) error

HandleMessage makes NSQApexLogHandler implement the github.com/nsqio/go-nsq.Handler interface and therefore, NSQApexLogHandler can be passed to the AddHandler function of a github.com/nsqio/go-nsq.Consumer.

HandleMessage will unmarshal a github.com/apex/log.Entry from a github.com/nsqio/go-nsq.Message's Body and pass it into the github.com/apex/log.Handler provided when calling NewNSQApexLogHandler to construct the NSQApexLogHandler.
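
The flow described above (unmarshal the message body, then hand the entry to the wrapped handler) can be sketched with stand-in types. Here entry, message, consumerHandler and newJSONHandler are hypothetical simplifications of log.Entry, nsq.Message and NSQApexLogHandler, declared locally so the sketch runs without external modules.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// entry is a simplified stand-in for github.com/apex/log.Entry.
type entry struct {
	Message string                 `json:"message"`
	Fields  map[string]interface{} `json:"fields"`
}

// message is a stand-in for nsq.Message; only the Body field matters here.
type message struct {
	Body []byte
}

// UnmarshalFunc mirrors the apexovernsq.UnmarshalFunc signature documented here.
type UnmarshalFunc func(data []byte, v interface{}) error

// consumerHandler is a hypothetical type following the documented flow.
type consumerHandler struct {
	handle    func(*entry) error // plays the role of log.Handler.HandleLog
	unmarshal UnmarshalFunc
}

// newJSONHandler builds a consumerHandler that decodes bodies with json.Unmarshal.
func newJSONHandler(handle func(*entry) error) *consumerHandler {
	return &consumerHandler{handle: handle, unmarshal: json.Unmarshal}
}

// HandleMessage decodes the body into an entry and forwards it to the handler.
func (h *consumerHandler) HandleMessage(m *message) error {
	var e entry
	if err := h.unmarshal(m.Body, &e); err != nil {
		return fmt.Errorf("unmarshalling log entry: %w", err)
	}
	return h.handle(&e)
}

func main() {
	h := newJSONHandler(func(e *entry) error {
		fmt.Printf("%s %v\n", e.Message, e.Fields)
		return nil
	})

	body := []byte(`{"message":"upload complete","fields":{"service":"uploader"}}`)
	if err := h.HandleMessage(&message{Body: body}); err != nil {
		fmt.Println("handle failed:", err)
	}
}
```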

type PublishFunc

type PublishFunc func(topic string, body []byte) error

PublishFunc is a function signature for any function that publishes a message on a provided nsq topic. Typically this is github.com/nsqio/go-nsq.Producer.Publish, or something that wraps it.

type ServiceFilterApexLogHandler

type ServiceFilterApexLogHandler struct {
	// contains filtered or unexported fields
}

func NewApexLogServiceFilterHandler

func NewApexLogServiceFilterHandler(handler log.Handler, filter *[]string) *ServiceFilterApexLogHandler

func (*ServiceFilterApexLogHandler) HandleLog

func (h *ServiceFilterApexLogHandler) HandleLog(e *log.Entry) error

type UnmarshalFunc

type UnmarshalFunc func(data []byte, v interface{}) error

UnmarshalFunc is a function signature for any function that can unmarshal an arbitrary struct from a slice of bytes. Whilst we only ever want to support github.com/apex/log.Entry structs, we support this interface because it allows using 3rd party Marshal/Unmarshal functions simply.
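
To show how a matched marshal/unmarshal pair fits these two signatures, here is a small round-trip sketch using encoding/json. The MarshalFunc and UnmarshalFunc types are redeclared locally, and entry, roundTrip and jsonRoundTrip are hypothetical names; entry is a simplified stand-in for log.Entry.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// These mirror the apexovernsq function types documented on this page.
type MarshalFunc func(x interface{}) ([]byte, error)
type UnmarshalFunc func(data []byte, v interface{}) error

// Compile-time checks that the encoding/json functions fit the signatures.
var (
	_ MarshalFunc   = json.Marshal
	_ UnmarshalFunc = json.Unmarshal
)

// entry is a simplified stand-in for github.com/apex/log.Entry.
type entry struct {
	Level   string `json:"level"`
	Message string `json:"message"`
}

// roundTrip marshals an entry as a producer would, then unmarshals it as a
// consumer would. The two functions must agree on the wire format.
func roundTrip(marshal MarshalFunc, unmarshal UnmarshalFunc, in entry) (entry, error) {
	body, err := marshal(in)
	if err != nil {
		return entry{}, fmt.Errorf("marshal: %w", err)
	}
	var out entry
	if err := unmarshal(body, &out); err != nil {
		return entry{}, fmt.Errorf("unmarshal: %w", err)
	}
	return out, nil
}

// jsonRoundTrip uses the matching JSON pair on both sides.
func jsonRoundTrip(in entry) (entry, error) {
	return roundTrip(json.Marshal, json.Unmarshal, in)
}

func main() {
	out, err := jsonRoundTrip(entry{Level: "info", Message: "hello"})
	if err != nil {
		fmt.Println("round trip failed:", err)
		return
	}
	fmt.Printf("%+v\n", out)
}
```

Swapping only one side for a different format (for example, marshalling with protobuf but unmarshalling with JSON) would make the consumer fail to decode, which is why the producing and consuming handlers need a matching pair.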

Directories

Path                 Synopsis
apps/nsq-log-tail
example/log-to-nsq   log_to_nsq is an example program that demonstrates the use of apexovernsq.
protobuf             Package protobuf is a generated protocol buffer package.