impalathing

package module
v0.0.0-...-091d07d Latest
Warning

This package is not in the latest version of its module.

Published: Jun 26, 2020 License: MIT Imports: 14 Imported by: 0

README

Impalathing is a small Go wrapper library for the Thrift interface to Impala.

It's based on hivething.

Working on this, you quickly realize that having strings delimited by tabs is an ugly API... (That's the Thrift side of things.)
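To illustrate why a tab-delimited row format is awkward for callers, here is a minimal sketch of decoding one such row by hand. The row layout (user_id, action, yyyymm) is borrowed from the usage example; the `engagement` type and `parseRow` helper are hypothetical and not part of impalathing.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// engagement is a hypothetical typed view of one result row.
type engagement struct {
	UserID string
	Action string
	YYYYMM int
}

// parseRow splits one tab-delimited row and converts each field by hand.
func parseRow(line string) (engagement, error) {
	fields := strings.Split(line, "\t")
	if len(fields) != 3 {
		return engagement{}, fmt.Errorf("expected 3 fields, got %d", len(fields))
	}
	yyyymm, err := strconv.Atoi(fields[2])
	if err != nil {
		return engagement{}, fmt.Errorf("bad yyyymm %q: %w", fields[2], err)
	}
	return engagement{UserID: fields[0], Action: fields[1], YYYYMM: yyyymm}, nil
}

func main() {
	row, err := parseRow("u42\tclick\t202006")
	if err != nil {
		fmt.Println("parse error:", err)
		return
	}
	fmt.Printf("%+v\n", row)
}
```

Every caller would have to repeat this splitting and conversion, which is the kind of work a Scan-style API takes over.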

Usage

Kerberos support requires the header files for the GSSAPI C library at build time. They can be installed with:

Ubuntu: sudo apt-get install libkrb5-dev
macOS: brew install homebrew/dupes/heimdal --without-x11
CentOS/RHEL: yum install -y krb5-devel

To use Kerberos, you need an extra dependency:

go get -tags kerberos github.com/beltran/gosasl

Then build with the kerberos tag:

go build --tags=kerberos

Before starting your application, run kinit first, for example:

kinit -k -t impala.keytab impala/host@DOMAIN.COM

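package main

import (
    "fmt"
    "log"
    "time"

    "github.com/KosyanMedia/impalathing"
)

func main() {
    host := "impala-host"
    port := 21000

    // NOTE: the API reference documents the last argument of Connect as
    // saslConfiguration map[string]string; this README call passes a bool
    // and may need adjusting for the version you build against.
    useKerberos := true
    con, err := impalathing.Connect(host, port, impalathing.DefaultOptions, useKerberos)
    if err != nil {
        log.Fatal("Error connecting: ", err)
    }
    defer con.Close()

    query, err := con.Query("SELECT user_id, action, yyyymm FROM engagements LIMIT 10000")
    if err != nil {
        log.Fatal("Error running query: ", err)
    }

    startTime := time.Now()
    total := 0
    for query.Next() {
        var (
            userID string
            action string
            yyyymm int
        )

        // Scan copies the current row into the destination variables.
        if err := query.Scan(&userID, &action, &yyyymm); err != nil {
            log.Fatal("Error scanning row: ", err)
        }
        total++

        fmt.Println(userID, action)
    }

    log.Printf("Fetched %d row(s) in %.2fs", total, time.Since(startTime).Seconds())
}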

Documentation


Constants

const (
	START    = 1
	OK       = 2
	BAD      = 3
	ERROR    = 4
	COMPLETE = 5
)
const DEFAULT_MAX_LENGTH = 16384000

Variables

var (
	DefaultOptions = Options{PollIntervalSeconds: 0.1, BatchSize: 10000}
)

Functions

This section is empty.

Types

type Connection

type Connection struct {
	// contains filtered or unexported fields
}

func Connect

func Connect(host string, port int, options Options, saslConfiguration map[string]string) (*Connection, error)

func (*Connection) Close

func (c *Connection) Close() error

func (*Connection) Query

func (c *Connection) Query(query string) (RowSet, error)

func (*Connection) QueryWithContext

func (c *Connection) QueryWithContext(ctx context.Context, query string) (RowSet, error)

type Options

type Options struct {
	PollIntervalSeconds float64
	BatchSize           int64
}

type RowSet

type RowSet interface {
	Columns() []string
	ColumnsWithContext(ctx context.Context) []string
	Next() bool
	NextWithContext(ctx context.Context) bool
	Scan(dest ...interface{}) error
	Poll() (*Status, error)
	PollWithContext(ctx context.Context) (*Status, error)
	Wait() (*Status, error)
	WaitWithContext(ctx context.Context) (*Status, error)
	FetchAll() []map[string]interface{}
	FetchAllWithContext(ctx context.Context) []map[string]interface{}
	MapScan(dest map[string]interface{}) error
}

A RowSet represents an asynchronous Hive operation. You can Reattach to a previously submitted Hive operation if you have a valid Thrift client and the serialized Handle() from the prior operation.

type Status

type Status struct {
	Error error
	// contains filtered or unexported fields
}

Status represents the job status, including the success state and the time the status was updated.

func (*Status) IsComplete

func (s *Status) IsComplete() bool

func (*Status) IsSuccess

func (s *Status) IsSuccess() bool

type TSaslTransport

type TSaslTransport struct {
	OpeningContext context.Context
	// contains filtered or unexported fields
}

TSaslTransport is a Thrift transport struct that uses SASL.

func NewTSaslTransport

func NewTSaslTransport(trans thrift.TTransport, host string, mechanismName string, configuration map[string]string) (*TSaslTransport, error)

NewTSaslTransport returns a TSaslTransport.

func (*TSaslTransport) Close

func (p *TSaslTransport) Close() (err error)

Close closes a SASL transport connection.

func (*TSaslTransport) Flush

func (p *TSaslTransport) Flush(ctx context.Context) (err error)

Flush flushes the bytes in the buffer.

func (*TSaslTransport) IsOpen

func (p *TSaslTransport) IsOpen() bool

IsOpen checks whether a SASL transport connection is open.

func (*TSaslTransport) Open

func (p *TSaslTransport) Open() (err error)

Open opens a SASL transport connection.

func (*TSaslTransport) Read

func (p *TSaslTransport) Read(buf []byte) (l int, err error)

func (*TSaslTransport) RemainingBytes

func (p *TSaslTransport) RemainingBytes() uint64

RemainingBytes returns the size of the unwrapped bytes.

func (*TSaslTransport) Write

func (p *TSaslTransport) Write(buf []byte) (int, error)
