sparkypmtatracking

Version: v0.1.0
Published: Mar 3, 2020 License: Apache-2.0 Imports: 30 Imported by: 0

README

SparkPost, Inc.

Sign up for a SparkPost account and visit our Developer Hub for even more content.

sparkypmtatracking

Open and click tracking modules for PMTA and SparkPost Signals:

app (link)  description
feeder      Takes open & click events, adds message attributes from Redis where found, and feeds them to the SparkPost Signals Ingest API
tracker     Web service that decodes client email opens and clicks
acct_etl    Extract, transform, load piped PMTA custom accounting stream message attributes into Redis
wrapper     SMTP proxy service that adds engagement information to email
linktool    Command-line tool to encode and decode link URLs

Click above links for command-specific README.

Install

First, check you have the pre-requisites (see Pre-requisites below for installation tips):

  • Git & Golang
  • Redis
  • NGINX

If you don't have $GOPATH set already:

cd /home/ec2-user/ 
mkdir go
export GOPATH=/home/ec2-user/go # change this to suit the directory you just made

Build project from source

Get this project (and its dependent libraries):

go get github.com/tuck1s/sparkypmtatracking

Compile binaries, which will be placed in the main project folder:

cd $GOPATH/src/github.com/tuck1s/sparkypmtatracking
./build.sh

You can now run a command with (e.g.) ./linktool -h.

Run

Script start.sh is provided as a starting point for you to customise, along with an example cronfile that can be used to start services on boot:

crontab cronfile

or crontab -e then paste in cronfile contents.

CI code tests

The project includes built-in tests as per usual Go / Travis CI / Coveralls practices.


Pre-requisites

Git and Golang

Your package manager should provide easy installation for these, e.g.

sudo yum install -y git go

Redis on Amazon Linux

sudo amazon-linux-extras install epel
sudo yum install -y redis
sudo service redis start

This project assumes the usual port 6379 on your host. Check you now have redis installed and working.

redis-cli --version

You should see redis-cli 5.0.5 or similar.

redis-cli PING

You should see PONG.


NGINX

This can be used to protect your open/click tracking server. The example config file in this project uses the following Nginx features/modules:

  • http-ssl
  • http-v2
  • headers-more

yum/EPEL/webtatic install on Amazon Linux

If you have access to the EPEL and Webtatic repos on your platform, you can use yum-based install to get Nginx with added modules:

sudo yum update -y
sudo amazon-linux-extras install epel
wget http://repo.webtatic.com/yum/el7/x86_64/RPMS/webtatic-release-7-3.noarch.rpm
sudo rpm -Uvh webtatic-release-7-3.noarch.rpm
sudo yum --enablerepo=webtatic install nginx1w nginx1w-module-headers-more
sudo service nginx start
nginx -V

dhparam

As per the article referred to in the example .conf file, the .conf file expects DH params set up. You can create these with openssl and keep them in the usual place for certs. Needs sudo to write to this directory.

sudo openssl dhparam -out /etc/pki/tls/certs/dhparam.pem 2048

Standard ports

If you wish to use standard ports (80, 443) for tracking:

  • Check the main config file nginx.conf is not serving ordinary files by default on those ports.
  • If it is, you may need to change or remove the existing server { .. } stanza.

Check the endpoint is active from another host, using curl with your external host address and port number.

Alternative to yum install: source-based install

The standard Nginx version available via yum does not have all the needed features. You can build from source, provided you have gcc and git installed.

sudo yum install -y gcc git # pre-requisites

wget http://nginx.org/download/nginx-1.16.1.tar.gz
tar -xzvf nginx-1.16.1.tar.gz
wget https://github.com/openresty/headers-more-nginx-module/archive/v0.33.tar.gz
tar -xzvf v0.33.tar.gz
git clone https://github.com/openssl/openssl.git
cd openssl
git branch -a
# Choose the following as I found 1_1_1 gave problems
git checkout remotes/origin/OpenSSL_1_0_2-stable
cd ../nginx-1.16.1
./configure --prefix=/opt/nginx --with-http_ssl_module --with-openssl=../openssl --add-module=../headers-more-nginx-module-0.33 --with-http_v2_module
make
sudo make install

This places the freshly built code into /opt/nginx/.

Start, stop and reload config as follows:

# start
sudo /opt/nginx/sbin/nginx

# stop
sudo /opt/nginx/sbin/nginx -s stop

# reload config
sudo /opt/nginx/sbin/nginx -s reload

Ensure nginx starts on boot. There are various ways to do this; the simplest is to append a start command to the file /etc/rc.local.

Documentation

Constants

const MsgIDTTL = time.Duration(time.Hour * 24 * 10)

MsgIDTTL defines the time-to-live for augmentation data

const RedisAcctHeaders = "acct_headers"

RedisAcctHeaders holds the PowerMTA accounting file headers

const RedisQueue = "trk_queue"

RedisQueue connects the tracker and feeder tasks

const SparkPostIngestBatchMaxAge = 30 * time.Second

SparkPostIngestBatchMaxAge sets the maximum time we wait before forwarding a batch of events

const SparkPostIngestMaxPayload = 5 * 1024 * 1024

SparkPostIngestMaxPayload is set in accordance with https://developers.sparkpost.com/api/events-ingest/#header-event-format

const SparkPostMessageIDHeader = "X-Sp-Message-Id"

SparkPostMessageIDHeader is the email header name that carries the unique message ID

const TrackingPrefix = "msgID_"

TrackingPrefix is the prefix for keys holding augmentation data

const XRealIPHeader = "X-Real-Ip"

XRealIPHeader is a header that will be used, if provided (e.g. from NGINX)
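As a rough illustration of how a header like this is typically consumed behind a reverse proxy, the sketch below prefers the proxy-supplied value and falls back to the socket peer address. The helper names clientIP and newReq are hypothetical, and the fallback behaviour is an assumption rather than the package's confirmed logic.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
)

// xRealIPHeader mirrors the package constant XRealIPHeader.
const xRealIPHeader = "X-Real-Ip"

// clientIP is a hypothetical helper: use the header if the proxy set it,
// otherwise fall back to the host part of the connection's remote address.
func clientIP(req *http.Request) string {
	if ip := req.Header.Get(xRealIPHeader); ip != "" {
		return ip
	}
	host, _, err := net.SplitHostPort(req.RemoteAddr)
	if err != nil {
		return req.RemoteAddr // not in host:port form, return as-is
	}
	return host
}

// newReq builds a minimal request for demonstration purposes.
func newReq(remoteAddr, realIP string) *http.Request {
	req := &http.Request{Header: http.Header{}, RemoteAddr: remoteAddr}
	if realIP != "" {
		req.Header.Set(xRealIPHeader, realIP)
	}
	return req
}

func main() {
	fmt.Println(clientIP(newReq("192.0.2.10:54321", "")))
	fmt.Println(clientIP(newReq("127.0.0.1:54321", "203.0.113.7")))
}
```

Only trust this header when the tracker is reachable solely through your own proxy, since a client can otherwise set it freely.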

Variables

var TransparentGif = []byte("GIF89a\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff" +
	"\xff\xff\xff\x21\xf9\x04\x01\x0a\x00\x01\x00\x2c\x00\x00\x00\x00" +
	"\x01\x00\x01\x00\x00\x02\x02\x4c\x01\x00\x3b\x00")

TransparentGif contains the bytes that should be served back to the client for an open pixel
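A minimal sketch of serving these bytes from an HTTP handler follows. The handler name servePixel and the response headers it sets are assumptions for illustration; the package's own TrackingServer may respond differently.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// transparentGif mirrors the package variable TransparentGif.
var transparentGif = []byte("GIF89a\x01\x00\x01\x00\x80\x00\x00\xff\xff\xff" +
	"\xff\xff\xff\x21\xf9\x04\x01\x0a\x00\x01\x00\x2c\x00\x00\x00\x00" +
	"\x01\x00\x01\x00\x00\x02\x02\x4c\x01\x00\x3b\x00")

// servePixel is a hypothetical handler that answers with the pixel bytes.
// The headers chosen here are assumptions, not taken from the package.
func servePixel(w http.ResponseWriter, req *http.Request) {
	w.Header().Set("Content-Type", "image/gif")
	w.Header().Set("Cache-Control", "no-cache, max-age=0")
	w.Write(transparentGif)
}

// pixelResponse exercises the handler in memory and returns what a client would see.
func pixelResponse() (contentType string, body []byte) {
	rec := httptest.NewRecorder()
	servePixel(rec, httptest.NewRequest(http.MethodGet, "/", nil))
	return rec.Header().Get("Content-Type"), rec.Body.Bytes()
}

func main() {
	ct, body := pixelResponse()
	fmt.Println(ct, len(body), "bytes")
}
```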

Functions

func AccountETL

func AccountETL(f io.Reader) error

AccountETL extracts and transforms accounting data from PowerMTA, and loads it into Redis records

func ActionToType

func ActionToType(a string) string

ActionToType maps the short "action" string used in URLs to SparkPost event type
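The mapping might look something like the sketch below. The short codes come from the WrapperData comment ("c", "o", "i"), and the event type names are the SparkPost ones; the lower-case function name and the empty-string result for an unknown action are assumptions.

```go
package main

import "fmt"

// actionToType is an illustrative stand-in for ActionToType.
func actionToType(a string) string {
	switch a {
	case "i":
		return "initial_open"
	case "o":
		return "open"
	case "c":
		return "click"
	default:
		return "" // assumption: unknown actions map to an empty type
	}
}

func main() {
	for _, a := range []string{"i", "o", "c", "x"} {
		fmt.Printf("%q -> %q\n", a, actionToType(a))
	}
}
```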

func ConsoleAndLogFatal

func ConsoleAndLogFatal(s ...interface{})

ConsoleAndLogFatal writes error to both log and stdout

func Contains

func Contains(arr []string, val string) bool

Contains returns whether arr contains an element exactly matching val

func DecodePath

func DecodePath(s string) ([]byte, error)

DecodePath returns the zlib-decoded, base64-decoded version of a url path string as []byte

func EncodeLink

func EncodeLink(encodeTrackingURL, encodeAction, encodeMessageID, encodeRcptTo, encodeTargetLinkURL string, trackOpen, trackInitialOpen, trackLink bool) (string, error)

EncodeLink - convenience function

func EncodePath

func EncodePath(data []byte) (string, error)

EncodePath returns the base64-encoded, zlib-encoded version of data as a URL path string
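To make the encoding concrete, here is a minimal round-trip sketch: compress with zlib, then encode with URL-safe base64, and reverse the steps to decode. The lower-case function names are stand-ins, and details such as the compression level and base64 padding are assumptions that may differ from the package's actual implementation.

```go
package main

import (
	"bytes"
	"compress/zlib"
	"encoding/base64"
	"fmt"
	"io"
)

// encodePath zlib-compresses data, then base64 (URL-safe) encodes the result.
func encodePath(data []byte) (string, error) {
	var buf bytes.Buffer
	zw := zlib.NewWriter(&buf)
	if _, err := zw.Write(data); err != nil {
		return "", err
	}
	if err := zw.Close(); err != nil { // Close flushes the compressed stream
		return "", err
	}
	return base64.URLEncoding.EncodeToString(buf.Bytes()), nil
}

// decodePath reverses encodePath: base64-decode, then zlib-decompress.
func decodePath(s string) ([]byte, error) {
	raw, err := base64.URLEncoding.DecodeString(s)
	if err != nil {
		return nil, err
	}
	zr, err := zlib.NewReader(bytes.NewReader(raw))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	path, err := encodePath([]byte(`{"act":"o","msg_id":"example"}`))
	if err != nil {
		panic(err)
	}
	back, err := decodePath(path)
	if err != nil {
		panic(err)
	}
	fmt.Println(path)
	fmt.Println(string(back))
}
```

The URL-safe alphabet avoids "+" and "/", so the result can be used as a single path segment.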

func FeedEvents

func FeedEvents(client *redis.Client, host string, apiKey string, maxAge time.Duration) error

FeedEvents sends data arriving via Redis queue to SparkPost ingest API. Send a batch periodically, or every X MB, whichever comes first.
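The "periodically, or every X MB, whichever comes first" rule can be sketched as a small batching type. This is an illustration under stated assumptions, not the package's code: the batch type and its methods are hypothetical, and the real limits are SparkPostIngestMaxPayload and SparkPostIngestBatchMaxAge rather than the tiny values used in the demo.

```go
package main

import (
	"fmt"
	"time"
)

// batch is a hypothetical accumulator for NDJSON event lines.
type batch struct {
	content []byte
	started time.Time
}

// add appends one event line. If the line would push the batch past limit bytes,
// the existing content is returned for sending and a new batch is started.
func (b *batch) add(line []byte, now time.Time, limit int) (toSend []byte) {
	if len(b.content) > 0 && len(b.content)+len(line) > limit {
		toSend = b.content
		b.content = nil
	}
	if len(b.content) == 0 {
		b.started = now // age is measured from the first event in the batch
	}
	b.content = append(b.content, line...)
	return toSend
}

// flushIfAged returns the pending content once it has been held for maxAge.
func (b *batch) flushIfAged(now time.Time, maxAge time.Duration) []byte {
	if len(b.content) > 0 && now.Sub(b.started) >= maxAge {
		out := b.content
		b.content = nil
		return out
	}
	return nil
}

// demo shows one flush triggered by size and one triggered by age.
func demo() (bySize, byAge string) {
	var b batch
	t0 := time.Now()
	b.add([]byte("event-1\n"), t0, 12)
	bySize = string(b.add([]byte("event-2\n"), t0, 12)) // 8+8 > 12, so event-1 is flushed
	byAge = string(b.flushIfAged(t0.Add(31*time.Second), 30*time.Second))
	return bySize, byAge
}

func main() {
	bySize, byAge := demo()
	fmt.Printf("flushed by size: %q\nflushed by age: %q\n", bySize, byAge)
}
```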

func FeedForever

func FeedForever(client *redis.Client, host string, apiKey string, maxAge time.Duration)

FeedForever processes events forever

func GetenvDefault

func GetenvDefault(k string, d string) string

GetenvDefault returns an environment variable, with default if unset
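A helper of this kind is commonly written as below. This sketch uses os.LookupEnv, so a variable that is set but empty is returned as empty; whether the package treats an empty value as unset is not stated, so that choice is an assumption. The variable names in main are made up for the demo.

```go
package main

import (
	"fmt"
	"os"
)

// getenvDefault is an illustrative stand-in for GetenvDefault.
func getenvDefault(k string, d string) string {
	if v, ok := os.LookupEnv(k); ok {
		return v
	}
	return d
}

func main() {
	os.Setenv("EXAMPLE_TRACKER_PORT", "8888") // hypothetical variable name
	fmt.Println(getenvDefault("EXAMPLE_TRACKER_PORT", "8080"))
	fmt.Println(getenvDefault("EXAMPLE_SURELY_UNSET_VARIABLE", "fallback"))
}
```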

func HostCleanup

func HostCleanup(host string) string

HostCleanup returns a SparkPost host address in canonical form (with schema, without /api/v1 path)
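One plausible reading of "canonical form" is sketched below: add a scheme if none is given and strip any trailing /api/v1. The exact rules HostCleanup applies are not documented here, so treat the defaulting to https and the trimming steps as assumptions.

```go
package main

import (
	"fmt"
	"strings"
)

// hostCleanup is an illustrative stand-in for HostCleanup.
func hostCleanup(host string) string {
	if !strings.HasPrefix(host, "https://") && !strings.HasPrefix(host, "http://") {
		host = "https://" + host // assumption: default to https when no scheme is given
	}
	host = strings.TrimSuffix(host, "/")
	host = strings.TrimSuffix(host, "/api/v1")
	return host
}

func main() {
	for _, h := range []string{
		"api.sparkpost.com",
		"api.sparkpost.com/api/v1/",
		"https://api.eu.sparkpost.com/api/v1",
	} {
		fmt.Println(hostCleanup(h))
	}
}
```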

func MakeSession

func MakeSession(c *smtpproxy.Client, bkd *Backend) smtpproxy.Session

MakeSession returns a session for this client and backend

func MyLogger

func MyLogger(filename string)

MyLogger sets up a custom logger. If filename is given, output goes to that file and to stdout as well. If filename is a blank string, output goes to stdout only.

func MyRedis

func MyRedis() (client *redis.Client)

MyRedis returns a client handle for Redis, for a server on the standard port

func PositionIn

func PositionIn(arr []string, val string) (int, bool)

PositionIn returns the position of a value within an array of strings, if an element exactly matches val
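Both slice helpers amount to a linear scan. The sketch below shows one way to write them, with Contains built on PositionIn; the value returned for the position when nothing matches (zero here) is an assumption.

```go
package main

import "fmt"

// positionIn returns the index of the first element equal to val, and whether it was found.
func positionIn(arr []string, val string) (int, bool) {
	for i, v := range arr {
		if v == val {
			return i, true
		}
	}
	return 0, false // assumption: position is zero when not found
}

// contains reports whether arr holds an element exactly matching val.
func contains(arr []string, val string) bool {
	_, found := positionIn(arr, val)
	return found
}

func main() {
	fields := []string{"type", "rcpt", "header_x-sp-message-id"}
	fmt.Println(positionIn(fields, "rcpt"))
	fmt.Println(contains(fields, "RCPT")) // matching is exact, so case matters
}
```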

func SafeStringToInt

func SafeStringToInt(s string) int

SafeStringToInt logs an error and returns zero if it can't convert
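A conversion with this behaviour can be sketched with strconv.Atoi. The log message format is an assumption.

```go
package main

import (
	"fmt"
	"log"
	"strconv"
)

// safeStringToInt is an illustrative stand-in for SafeStringToInt.
func safeStringToInt(s string) int {
	i, err := strconv.Atoi(s)
	if err != nil {
		log.Println("Warning: cannot convert", s, "to int") // message wording is hypothetical
		return 0
	}
	return i
}

func main() {
	fmt.Println(safeStringToInt("42"))
	fmt.Println(safeStringToInt("not-a-number"))
}
```

Note that a result of zero is ambiguous: it can mean the input was "0" or that conversion failed, so callers that need to tell the two apart should check the log or validate the input first.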

func SparkPostEventNDJSON

func SparkPostEventNDJSON(eStr string, client *redis.Client) ([]byte, error)

SparkPostEventNDJSON formats a SparkPost event into NDJSON, augmenting with Redis data

func SparkPostIngest

func SparkPostIngest(ingestData []byte, client *redis.Client, host string, apiKey string) error

SparkPostIngest POSTs a batch of ingestData to SparkPost Ingest API

func StoreEvent

func StoreEvent(r []string, client *redis.Client) error

StoreEvent puts a single accounting event r into redis, based on previously seen header format

func StoreHeaders

func StoreHeaders(r []string, client *redis.Client) error

StoreHeaders stores an accounting header record (sent at PowerMTA startup).

It checks for required and optional fields.
It writes these into persistent storage, so that "d" records can be decoded by future, separate process invocations.

func TrackingServer

func TrackingServer(w http.ResponseWriter, req *http.Request)

TrackingServer expects URL paths of the form /xyzzy, where xyzzy is a base64 (URL-safe) encoded, zlib-compressed []byte. These are written to the Redis queue.

func UniqMessageID

func UniqMessageID() string

UniqMessageID returns a SparkPost formatted unique messageID

Types

type Backend

type Backend struct {
	// contains filtered or unexported fields
}

The Backend implements SMTP server methods.

func NewBackend

func NewBackend(outHostPort string, verbose bool, upstreamDataDebug *os.File, wrap *Wrapper, insecureSkipVerify bool) *Backend

NewBackend init function

func (*Backend) Init

func (bkd *Backend) Init() (smtpproxy.Session, error)

Init the backend. Here we establish the upstream connection

func (*Backend) SetVerbose

func (bkd *Backend) SetVerbose(v bool)

SetVerbose allows changing logging options on-the-fly

func (*Backend) SetWrapper

func (bkd *Backend) SetWrapper(wrap *Wrapper)

SetWrapper allows changing the wrapper on-the-fly

type GeoIP

type GeoIP struct {
	Country    string
	Region     string
	City       string
	Latitude   float64
	Longitude  float64
	Zip        int
	PostalCode string
}

GeoIP data expected by SparkPost .. will be blank for now

type IngestResult

type IngestResult struct {
	Results struct {
		ID string `json:"id"`
	} `json:"results"`
	Errors []struct {
		Message string `json:"message"`
	} `json:"errors"`
}

IngestResult object coming back from SparkPost

type Session

type Session struct {
	// contains filtered or unexported fields
}

A Session is returned after successful login. It holds information that needs to persist across message phases.

func (*Session) Auth

func (s *Session) Auth(expectcode int, cmd, arg string) (int, string, error)

Auth command backend handler

func (*Session) Data

func (s *Session) Data(r io.Reader, w io.WriteCloser) (int, string, error)

Data passes the (dot-delimited) message body upstream, returning the usual responses

func (*Session) DataCommand

func (s *Session) DataCommand() (io.WriteCloser, int, string, error)

DataCommand passes the command upstream, returning a place to write the data AND the usual responses

func (*Session) Greet

func (s *Session) Greet(helotype string) ([]string, int, string, error)

Greet the upstream host and report capabilities back.

func (*Session) Mail

func (s *Session) Mail(expectcode int, cmd, arg string) (int, string, error)

Mail command backend handler

func (*Session) Passthru

func (s *Session) Passthru(expectcode int, cmd, arg string) (int, string, error)

Passthru a command to the upstream server, logging

func (*Session) Quit

func (s *Session) Quit(expectcode int, cmd, arg string) (int, string, error)

Quit command backend handler

func (*Session) Rcpt

func (s *Session) Rcpt(expectcode int, cmd, arg string) (int, string, error)

Rcpt command backend handler

func (*Session) Reset

func (s *Session) Reset(expectcode int, cmd, arg string) (int, string, error)

Reset command backend handler

func (*Session) StartTLS

func (s *Session) StartTLS() (int, string, error)

StartTLS command

func (*Session) Unknown

func (s *Session) Unknown(expectcode int, cmd, arg string) (int, string, error)

Unknown command backend handler

type SparkPostEvent

type SparkPostEvent struct {
	EventWrapper struct {
		EventGrouping struct {
			Type          string `json:"type"`
			DelvMethod    string `json:"delv_method"`
			EventID       string `json:"event_id"`
			IPAddress     string `json:"ip_address"`
			GeoIP         GeoIP  `json:"geo_ip"`
			MessageID     string `json:"message_id"`
			RcptTo        string `json:"rcpt_to"`
			TimeStamp     string `json:"timestamp"`
			TargetLinkURL string `json:"target_link_url"`
			UserAgent     string `json:"user_agent"`
			SubaccountID  int    `json:"subaccount_id"`
		} `json:"track_event"`
	} `json:"msys"`
}

SparkPostEvent is the structure for the SparkPost Ingest API. Note the nesting. There are some fields we're not populating here, as they will automatically be "enriched" by SparkPost, provided there is an injection event matching this message_id:

ab_test_id, ab_test_version, amp_enabled, binding, binding_group, campaign_id, click_tracking,
friendly_from, initial_pixel, injection_time, ip_pool, ip_pool_raw, msg_from, msg_size, open_tracking,
rcpt_meta, rcpt_tags, rcpt_type, recv_method, routing_domain, sending_ip, subject, template_id, template_version, transactional,
transmission_id

We are also not populating: num_retries, queue_time, raw_rcpt_to, target_link_name. A future implementation could usefully populate target_link_name and geo_ip if desired.
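To show what the nesting looks like on the wire, the sketch below marshals a cut-down copy of the struct (three fields only) into one NDJSON line. The type name trimmedEvent, the function eventLine and the sample values are made up for illustration; the JSON tags are those of the full struct.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// trimmedEvent keeps the same nesting and tags as SparkPostEvent, with fewer fields.
type trimmedEvent struct {
	EventWrapper struct {
		EventGrouping struct {
			Type      string `json:"type"`
			MessageID string `json:"message_id"`
			RcptTo    string `json:"rcpt_to"`
		} `json:"track_event"`
	} `json:"msys"`
}

// eventLine returns one NDJSON line: a JSON object followed by a newline.
func eventLine(evType, msgID, rcpt string) (string, error) {
	var e trimmedEvent
	g := &e.EventWrapper.EventGrouping
	g.Type, g.MessageID, g.RcptTo = evType, msgID, rcpt
	b, err := json.Marshal(e)
	if err != nil {
		return "", err
	}
	return string(b) + "\n", nil
}

func main() {
	line, err := eventLine("click", "0000123456789abcdef0", "recipient@example.com")
	if err != nil {
		panic(err)
	}
	fmt.Print(line)
}
```

Each event occupies exactly one line, so a batch is simply the concatenation of such lines.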

type TimedBuffer

type TimedBuffer struct {
	Content     []byte
	TimeStarted time.Time
	MaxAge      time.Duration
}

TimedBuffer associates content with a time started and a maximum age it should be held for

func (*TimedBuffer) AgedContent

func (t *TimedBuffer) AgedContent() bool

AgedContent returns true if the buffer has non-nil contents that are older than the specified maxAge
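The check described above can be sketched in a few lines. The lower-case type and field names are stand-ins for TimedBuffer and its fields, and the strict "older than" comparison is an assumption about the boundary case.

```go
package main

import (
	"fmt"
	"time"
)

// timedBuffer is an illustrative copy of TimedBuffer.
type timedBuffer struct {
	content     []byte
	timeStarted time.Time
	maxAge      time.Duration
}

// agedContent reports whether there is content and it has been held longer than maxAge.
func (t *timedBuffer) agedContent() bool {
	return len(t.content) > 0 && time.Since(t.timeStarted) > t.maxAge
}

// demoAged evaluates a fresh buffer, an old buffer, and an old but empty buffer.
func demoAged() (fresh, aged, empty bool) {
	old := time.Now().Add(-time.Minute)
	fresh = (&timedBuffer{content: []byte("x"), timeStarted: time.Now(), maxAge: 30 * time.Second}).agedContent()
	aged = (&timedBuffer{content: []byte("x"), timeStarted: old, maxAge: 30 * time.Second}).agedContent()
	empty = (&timedBuffer{timeStarted: old, maxAge: 30 * time.Second}).agedContent()
	return fresh, aged, empty
}

func main() {
	fmt.Println(demoAged())
}
```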

type TrackEvent

type TrackEvent struct {
	WD        WrapperData
	TimeStamp string `json:"ts"`
	UserAgent string `json:"ua"`
	IPAddress string `json:"ip"`
}

TrackEvent is the augmented info passed from "tracker" via the Redis event queue to "feeder"

type Wrapper

type Wrapper struct {
	URL url.URL
	// contains filtered or unexported fields
}

Wrapper carries the per-message information as each message is processed

func NewWrapper

func NewWrapper(URL string, trackOpen, trackInitialOpen, trackLink bool) (*Wrapper, error)

NewWrapper returns a tracker with the persistent info set up from params

func (*Wrapper) Active

func (wrap *Wrapper) Active() bool

Active returns true when wrapping/tracking is active.

func (*Wrapper) HandleMessagePart

func (wrap *Wrapper) HandleMessagePart(dst io.Writer, part io.Reader, cType string, cte string) error

HandleMessagePart walks the MIME structure, and may be called recursively. The incoming content type and cte (content transfer encoding) are passed separately

func (*Wrapper) InitialOpenPixel

func (wrap *Wrapper) InitialOpenPixel() string

InitialOpenPixel returns an html fragment with pixel for initial open tracking. If there are problems, empty string is returned.

func (*Wrapper) MailCopy

func (wrap *Wrapper) MailCopy(dst io.Writer, src io.Reader) error

MailCopy transfers the mail body from downstream (client) to upstream (server), using the engagement wrapper. The writer should be closed by the parent function.

func (*Wrapper) OpenPixel

func (wrap *Wrapper) OpenPixel() string

OpenPixel returns an html fragment with pixel for bottom open tracking. If there are problems, empty string is returned.

func (*Wrapper) ProcessMessageHeaders

func (wrap *Wrapper) ProcessMessageHeaders(h mail.Header) error

ProcessMessageHeaders reads the message's current headers and updates/inserts any new ones required. The RCPT TO address is grabbed

func (*Wrapper) SetMessageInfo

func (wrap *Wrapper) SetMessageInfo(msgID string, rcpt string)

SetMessageInfo sets the per-message specifics

func (*Wrapper) TrackHTML

func (wrap *Wrapper) TrackHTML(w io.Writer, r io.Reader) (int, error)

TrackHTML streams content to w from r (a la io.Copy), adding engagement tracking by wrapping links and inserting open pixel(s). Returns the count of bytes written and the error status. If wrapping is inactive, it just does a copy.

func (*Wrapper) WrapURL

func (wrap *Wrapper) WrapURL(url string) string

WrapURL returns the wrapped, encoded version of the URL for engagement tracking. If there are problems, the original unwrapped url is returned.

type WrapperData

type WrapperData struct {
	Action        string `json:"act"` // carries "c" = click, "o" = open, "i" = initial open
	TargetLinkURL string `json:"t_url"`
	MessageID     string `json:"msg_id"`
	RcptTo        string `json:"rcpt"`
}

WrapperData is used to build the tracking URL
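The short JSON tags keep the intermediate form compact before it is encoded into a URL path. A minimal sketch of that intermediate JSON, using a copy of the struct and made-up sample values:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// wrapperData is a copy of WrapperData, with the same JSON tags.
type wrapperData struct {
	Action        string `json:"act"` // "c" = click, "o" = open, "i" = initial open
	TargetLinkURL string `json:"t_url"`
	MessageID     string `json:"msg_id"`
	RcptTo        string `json:"rcpt"`
}

// toJSON returns the compact JSON form of w.
func toJSON(w wrapperData) (string, error) {
	b, err := json.Marshal(w)
	return string(b), err
}

func main() {
	s, err := toJSON(wrapperData{
		Action:        "c",
		TargetLinkURL: "https://example.com/",
		MessageID:     "0000123456789abcdef0",
		RcptTo:        "recipient@example.com",
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(s)

	// Decoding reverses the step, as a tracker would after unpacking the path.
	var back wrapperData
	if err := json.Unmarshal([]byte(s), &back); err != nil {
		panic(err)
	}
	fmt.Println(back.Action, back.TargetLinkURL)
}
```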

func DecodeLink

func DecodeLink(urlStr string) ([]byte, WrapperData, string, error)

DecodeLink - convenience function. Returns the JSON intermediate form, the decoded WrapperData, and the tracking domain.

Directories

Path Synopsis
cmd
