package module
v0.0.0-...-c8ac0fd Latest Latest

Published: Apr 4, 2020 License: MIT Imports: 12 Imported by: 0




Forward raw HTTP requests to a Kafka cluster

[Figure: data transformation in rawkafka]


The Kafka REST protocol requires a specific format for its requests, as described in the API Spec. This service is a simple Go HTTP server that listens on all methods and paths, formats each incoming request accordingly, and sends it to a Kafka REST endpoint.
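
As a rough illustration of the "all methods and paths" part, the sketch below registers a single handler on the "/" pattern of a standard net/http mux, which receives every request that no more specific pattern claims. The names newCatchAllMux and demoCatchAll are hypothetical and not part of rawkafka; the forwarding step is only indicated by a comment.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
)

// newCatchAllMux returns a mux whose single "/" pattern receives every request,
// whatever its method or path. The last request seen is written to *seen.
func newCatchAllMux(seen *string) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
		*seen = r.Method + " " + r.URL.Path
		// A forwarding service would encode the request here and POST it
		// to the Kafka REST endpoint before answering the client.
		w.WriteHeader(http.StatusAccepted)
	})
	return mux
}

// demoCatchAll sends one in-memory request through the mux and reports
// what the handler saw together with the response status code.
func demoCatchAll(method, path string) string {
	var seen string
	rec := httptest.NewRecorder()
	newCatchAllMux(&seen).ServeHTTP(rec, httptest.NewRequest(method, path, nil))
	return fmt.Sprintf("%s -> %d", seen, rec.Code)
}

func main() {
	fmt.Println(demoCatchAll("DELETE", "/any/deep/path"))
}
```

Using one catch-all pattern keeps routing out of the picture, which suits a service whose only job is to record whatever arrives.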

At startup, the configured schema is automatically registered at the Schema Registry endpoint.
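
The registration step might look roughly like the sketch below. It assumes the Confluent Schema Registry REST API (POST /subjects/<subject>/versions, answering with a JSON object that carries the schema id) and a subject named "<topic>-value"; both are assumptions, since the source does not say which subject rawkafka uses. The helpers registerSchema and registerAgainstFake are hypothetical, and the registry here is an in-process fake.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// registerSchema posts an Avro schema to a Schema Registry and returns the
// schema ID from the response. The "<topic>-value" subject is an assumption.
func registerSchema(registryURL, topic, schema string) (int, error) {
	payload, err := json.Marshal(map[string]string{"schema": schema})
	if err != nil {
		return 0, err
	}
	url := fmt.Sprintf("%s/subjects/%s-value/versions", registryURL, topic)
	resp, err := http.Post(url, "application/vnd.schemaregistry.v1+json", bytes.NewReader(payload))
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return 0, fmt.Errorf("schema registry returned %s", resp.Status)
	}
	var out struct {
		ID int `json:"id"`
	}
	if err := json.NewDecoder(resp.Body).Decode(&out); err != nil {
		return 0, err
	}
	return out.ID, nil
}

// registerAgainstFake runs registerSchema against a fake registry that
// records the request path and always answers with the given ID.
func registerAgainstFake(topic string, id int) (gotID int, gotPath string, err error) {
	fake := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		gotPath = r.URL.Path
		fmt.Fprintf(w, `{"id": %d}`, id)
	}))
	defer fake.Close()
	gotID, err = registerSchema(fake.URL, topic, `{"type": "string"}`)
	return gotID, gotPath, err
}

func main() {
	id, path, err := registerAgainstFake("RawRequest", 7)
	fmt.Println(id, path, err)
}
```

The returned ID is what a producer would later reference when sending records, which is presumably why registration happens before the server starts accepting traffic.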

Link to the current Avro schema: https://github.com/diogenes1oliveira/rawkafka/blob/master/request.avsc


Run it with Docker:

$ docker run -it --rm -e RAWKAFKA_TOPIC=SampleTopic -p 7000:7000 diogenes1oliveira/rawkafka:1.0.0 --port 7000


  rawkafka [OPTIONS]

Application Options:
      --port=                Port to bind to (default: 9000) [$RAWKAFKA_PORT]
      --host=                Host IP to bind to (default: [$RAWKAFKA_HOST]
      --topic=               Name of the topic to publish the messages to (default: RawRequest)
      --rest-endpoint=       Kafka REST endpoint [$RAWKAFKA_REST_ENDPOINT]
      --schema-location=     Avro schema location (default: ./request.avsc) [$RAWKAFKA_SCHEMA_LOCATION]
      --schema-registry-url= Schema registry URL [$RAWKAFKA_SCHEMA_REGISTRY_URL]

Help Options:
  -h, --help                 Show this help message


Building from source requires:

  • Go >= 1.13
  • GCC
  • GNU Make

In the project root, just run:

$ make install






var HeadersExcluded = getHeaderListFromEnv("KAFKA_RAW_HEADERS_HTTP_EXCLUDED", "Cookie")

HeadersExcluded contains a map of the HTTP headers that should not be stored or processed. It is read from the environment variable KAFKA_RAW_HEADERS_HTTP_EXCLUDED and by default excludes the 'Cookie' header.

var HeadersIPForwarding = getHeaderListFromEnv("KAFKA_RAW_HTTP_HEADERS_IP_FORWARDING", "X-Forwarded-For,X-Real-Ip")

HeadersIPForwarding contains a map of the HTTP headers that should be considered for IP forwarding. It is read from the environment variable KAFKA_RAW_HTTP_HEADERS_IP_FORWARDING and by default considers the headers X-Forwarded-For and X-Real-Ip, in this order.
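
The helper getHeaderListFromEnv is unexported and its body is not shown here, so the following is only a guess at how such a helper could work: read a comma-separated list from the environment, fall back to a default, and canonicalize each name. The functions parseHeaderList and headerListFromEnv are hypothetical, and this sketch returns an ordered slice rather than a map so that the documented precedence order is preserved.

```go
package main

import (
	"fmt"
	"net/http"
	"os"
	"strings"
)

// parseHeaderList splits a comma-separated list of header names, using
// fallback when value is blank, and canonicalizes each name
// (for example "x-real-ip" becomes "X-Real-Ip").
func parseHeaderList(value, fallback string) []string {
	if strings.TrimSpace(value) == "" {
		value = fallback
	}
	var names []string
	for _, part := range strings.Split(value, ",") {
		if name := strings.TrimSpace(part); name != "" {
			names = append(names, http.CanonicalHeaderKey(name))
		}
	}
	return names
}

// headerListFromEnv reads the list from the environment variable key.
func headerListFromEnv(key, fallback string) []string {
	return parseHeaderList(os.Getenv(key), fallback)
}

func main() {
	fmt.Println(headerListFromEnv("KAFKA_RAW_HTTP_HEADERS_IP_FORWARDING", "X-Forwarded-For,X-Real-Ip"))
}
```

Canonicalizing the names up front means later lookups can compare against http.Header keys directly.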

var SchemaDefaultLocation = "./request.avsc"

SchemaDefaultLocation contains the default path to the request schema


func GetIPFromRequest

func GetIPFromRequest(req *http.Request) (string, error)

GetIPFromRequest extracts the user IP from the headers, falling back to the IP from the request itself
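
A minimal sketch of this lookup order, assuming the default forwarding headers and assuming that the first entry of a comma-separated X-Forwarded-For value is the client. The functions clientIP and demoClientIP are hypothetical stand-ins, not the package's implementation, which may differ in validation and error handling.

```go
package main

import (
	"fmt"
	"net"
	"net/http"
	"net/http/httptest"
	"strings"
)

// forwardingHeaders mirrors the documented default order.
var forwardingHeaders = []string{"X-Forwarded-For", "X-Real-Ip"}

// clientIP returns the first valid IP found in the forwarding headers,
// falling back to the host part of req.RemoteAddr.
func clientIP(req *http.Request) (string, error) {
	for _, name := range forwardingHeaders {
		value := req.Header.Get(name)
		if value == "" {
			continue
		}
		// X-Forwarded-For may hold "client, proxy1, proxy2"; take the first entry.
		first := strings.TrimSpace(strings.Split(value, ",")[0])
		if net.ParseIP(first) != nil {
			return first, nil
		}
	}
	host, _, err := net.SplitHostPort(req.RemoteAddr)
	if err != nil {
		return "", fmt.Errorf("cannot parse remote address %q: %w", req.RemoteAddr, err)
	}
	return host, nil
}

// demoClientIP builds an in-memory request with the given values and resolves its IP.
func demoClientIP(forwardedFor, realIP, remoteAddr string) (string, error) {
	req := httptest.NewRequest(http.MethodGet, "/", nil)
	req.RemoteAddr = remoteAddr
	if forwardedFor != "" {
		req.Header.Set("X-Forwarded-For", forwardedFor)
	}
	if realIP != "" {
		req.Header.Set("X-Real-Ip", realIP)
	}
	return clientIP(req)
}

func main() {
	fmt.Println(demoClientIP("203.0.113.7, 10.0.0.1", "198.51.100.2", "192.0.2.1:1234"))
}
```

Forwarding headers are set by clients and proxies, so a value taken from them is only as trustworthy as the proxy chain in front of the service.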


type KafkaCodec

type KafkaCodec struct {
	ValueSchema   string
	ValueSchemaID int
}

KafkaCodec represents an Avro codec used to encode the data of raw HTTP requests

func LoadKafkaCodec

func LoadKafkaCodec(schemaPath string) (*KafkaCodec, error)

LoadKafkaCodec loads a new codec from the schema path

func (*KafkaCodec) Register

func (codec *KafkaCodec) Register(endpoint, topic string) error

Register registers this Kafka codec with the Schema Registry

func (*KafkaCodec) Restify

func (codec *KafkaCodec) Restify(req *RequestInfo) ([]byte, error)

Restify encodes the data of the raw request into a Kafka-REST compatible format
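
For orientation, a Kafka-REST compatible payload could be shaped as in the sketch below. It assumes the Confluent REST Proxy v2 produce format for Avro (a JSON object with value_schema_id and a records array, sent with the content type application/vnd.kafka.avro.v2+json); the source does not confirm which API version rawkafka targets. The types restRecord and restEnvelope and the functions wrapForKafkaREST and demoEnvelope are hypothetical, and the record value is a plain map rather than the Avro JSON encoding of a real RequestInfo.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// restRecord is one entry of the "records" array.
type restRecord struct {
	Value interface{} `json:"value"`
}

// restEnvelope is the assumed top-level body of a produce request.
type restEnvelope struct {
	ValueSchemaID int          `json:"value_schema_id"`
	Records       []restRecord `json:"records"`
}

// wrapForKafkaREST wraps a single value in the produce envelope,
// referencing a schema that was registered beforehand.
func wrapForKafkaREST(schemaID int, value interface{}) ([]byte, error) {
	return json.Marshal(restEnvelope{
		ValueSchemaID: schemaID,
		Records:       []restRecord{{Value: value}},
	})
}

// demoEnvelope builds an envelope for a tiny stand-in record.
func demoEnvelope(schemaID int, method, url string) string {
	payload, err := wrapForKafkaREST(schemaID, map[string]string{"method": method, "url": url})
	if err != nil {
		return "error: " + err.Error()
	}
	return string(payload)
}

func main() {
	fmt.Println(demoEnvelope(3, "GET", "/ping"))
}
```

Referencing the schema by ID keeps each request small compared with embedding the full schema text in every payload.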

type RequestInfo

type RequestInfo struct {
	Headers     http.Header `json:"headers"`
	IP          string      `json:"ip" faker:"ipv4"`
	Method      string      `json:"method"`
	ServerTime  time.Time   `json:"server_time"`
	URL         string      `json:"url" faker:"url"`
	Body        []byte      `json:"body"`
	ParseErrors []string
}

RequestInfo contains the information for a raw HTTP request

func (*RequestInfo) Parse

func (reqInfo *RequestInfo) Parse(req *http.Request)

Parse fills the RequestInfo struct with information parsed from the HTTP request
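
Since Parse returns nothing and RequestInfo has a ParseErrors field, one plausible reading is that problems are recorded on the struct instead of aborting. The sketch below follows that reading using a hypothetical capturedRequest type and capture function; it is not the package's code, and the IP field is left out for brevity.

```go
package main

import (
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
	"time"
)

// capturedRequest is a hypothetical stand-in for RequestInfo.
type capturedRequest struct {
	Headers     http.Header
	Method      string
	ServerTime  time.Time
	URL         string
	Body        []byte
	ParseErrors []string
}

// capture copies the interesting parts of req, drops the excluded headers,
// and records any read error instead of returning it.
func capture(req *http.Request, excluded ...string) capturedRequest {
	info := capturedRequest{
		Headers:    req.Header.Clone(),
		Method:     req.Method,
		ServerTime: time.Now().UTC(),
		URL:        req.URL.String(),
	}
	for _, name := range excluded {
		info.Headers.Del(name)
	}
	// io.ReadAll needs Go 1.16 or later; older toolchains offer ioutil.ReadAll.
	body, err := io.ReadAll(req.Body)
	if err != nil {
		info.ParseErrors = append(info.ParseErrors, "body: "+err.Error())
	}
	info.Body = body
	return info
}

// demoCapture captures an in-memory request and summarizes the result.
func demoCapture(method, target, body, cookie string) string {
	req := httptest.NewRequest(method, target, strings.NewReader(body))
	req.Header.Set("Cookie", cookie)
	info := capture(req, "Cookie")
	_, hasCookie := info.Headers["Cookie"]
	return fmt.Sprintf("%s %s %q cookie=%t errors=%d",
		info.Method, info.URL, info.Body, hasCookie, len(info.ParseErrors))
}

func main() {
	fmt.Println(demoCapture("POST", "/events?x=1", "hello", "session=abc"))
}
```

Collecting errors on the struct lets a partially parsed request still be forwarded, with the failures visible to downstream consumers.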

