flight

package
v11.0.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jan 18, 2023 License: Apache-2.0, BSD-2-Clause, BSD-3-Clause, + 8 more Imports: 24 Imported by: 1

Documentation

Overview

Example (Server)
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"

	"github.com/apache/arrow/go/v11/arrow/flight"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/status"
)

// serverAuth is the example server-side auth handler that main passes to
// SetAuthHandler. It carries no state.
type serverAuth struct{}

// Authenticate reads the client's handshake payload from c, prints it, and
// answers with the token "foobar".
//
// It returns an Unauthenticated status when the read yields io.EOF (no payload
// was provided) and a FailedPrecondition status for any other read error.
// Otherwise it returns whatever c.Send returns.
func (sa *serverAuth) Authenticate(c flight.AuthConn) error {
	payload, readErr := c.Read()
	switch {
	case errors.Is(readErr, io.EOF):
		return status.Error(codes.Unauthenticated, "no auth info provided")
	case readErr != nil:
		return status.Error(codes.FailedPrecondition, "error reading auth handshake")
	}

	// A real handler would verify the payload; the example just echoes it.
	fmt.Println(string(payload))

	// Reply with the auth token the client should use from now on.
	token := []byte("foobar")
	return c.Send(token)
}

// IsValid accepts only the token "foobar", for which it returns the identity
// "foo" and a nil error. Any other token yields an empty string together with
// a PermissionDenied status error.
func (sa *serverAuth) IsValid(token string) (interface{}, error) {
	if token != "foobar" {
		return "", status.Error(codes.PermissionDenied, "invalid auth token")
	}
	return "foo", nil
}

// main starts a Flight server on an ephemeral localhost port with serverAuth
// installed as its auth handler, then dials it and performs a handshake,
// printing the payload the server sends back.
//
// Every step that can fail is checked: a failed Init would leave the server
// without a listener to report an address for, and a failed Recv would leave
// resp nil, so dereferencing resp.Payload would panic.
func main() {
	server := flight.NewFlightServer()
	if err := server.Init("localhost:0"); err != nil {
		log.Fatal(err)
	}
	svc := &flight.BaseFlightServer{}
	svc.SetAuthHandler(&serverAuth{})
	server.RegisterFlightService(svc)

	// Serve blocks, so run it in the background; Shutdown stops it on return.
	go server.Serve()
	defer server.Shutdown()

	conn, err := grpc.Dial(server.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := flight.NewFlightServiceClient(conn)
	stream, err := client.Handshake(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// Send the handshake payload; the server's Authenticate prints it.
	if err := stream.Send(&flight.HandshakeRequest{Payload: []byte("baz")}); err != nil {
		log.Fatal(err)
	}

	// Receive the token the server replies with.
	resp, err := stream.Recv()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(string(resp.Payload))
}
Output:

baz
foobar

Index

Examples

Constants

View Source
const (
	DescriptorUNKNOWN = flight.FlightDescriptor_UNKNOWN
	DescriptorPATH    = flight.FlightDescriptor_PATH
	DescriptorCMD     = flight.FlightDescriptor_CMD
)

Variables

View Source
var FlightService_ServiceDesc = flight.FlightService_ServiceDesc

FlightService_ServiceDesc is the grpc.ServiceDesc for the FlightService server. It should only be used for direct call of grpc.RegisterService, and not introspected or modified (even as a copy).

View Source
var NewFlightServiceClient = flight.NewFlightServiceClient

Functions

func AuthFromContext

func AuthFromContext(ctx context.Context) interface{}

AuthFromContext returns whatever object `IsValid` returned for the given request context, allowing handlers to retrieve identifying information for the current request.

func CreateServerBearerTokenAuthInterceptors deprecated

func CreateServerBearerTokenAuthInterceptors(validator BasicAuthValidator) (grpc.UnaryServerInterceptor, grpc.StreamServerInterceptor)

CreateServerBearerTokenAuthInterceptors returns grpc interceptors for basic auth handling via bearer tokens. validator cannot be nil

Deprecated: use CreateServerBasicAuthMiddleware instead

func DeserializeSchema

func DeserializeSchema(info []byte, mem memory.Allocator) (*arrow.Schema, error)

DeserializeSchema takes the schema bytes from FlightInfo or SchemaResult and returns the deserialized arrow schema.

func RegisterFlightServiceServer

func RegisterFlightServiceServer(s grpc.ServiceRegistrar, srv FlightServer)

RegisterFlightServiceServer registers an existing flight server onto an existing grpc server, or anything that is a grpc service registrar.

Example
package main

import (
	"context"
	"fmt"
	"net"

	"github.com/apache/arrow/go/v11/arrow/flight"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/health"

	healthgrpc "google.golang.org/grpc/health/grpc_health_v1"
)

// main registers a health service and a BaseFlightServer-backed Flight service
// on a plain grpc server listening on an ephemeral localhost port, then uses a
// single client connection to query the health status and to call
// GetFlightInfo, printing the result of each.
func main() {
	s := grpc.NewServer()
	healthSrv := health.NewServer()
	healthgrpc.RegisterHealthServer(s, healthSrv)

	// add methods to this to override the desired methods
	// like DoGet, DoPut, etc.
	server := struct {
		flight.BaseFlightServer
	}{}

	flight.RegisterFlightServiceServer(s, &server)
	healthSrv.SetServingStatus("test", healthgrpc.HealthCheckResponse_SERVING)

	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		panic(err)
	}
	// Serve blocks, so run it in the background; Stop ends it on return.
	go s.Serve(lis)
	defer s.Stop()

	conn, err := grpc.DialContext(context.Background(), lis.Addr().String(),
		grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	hc := healthgrpc.NewHealthClient(conn)
	rsp, err := hc.Check(context.Background(), &healthgrpc.HealthCheckRequest{Service: "test"})
	if err != nil {
		panic(err)
	}

	fmt.Println(rsp.Status)

	// NewClientFromConn returns only a Client and no error, so there is
	// nothing to check after this call.
	fc := flight.NewClientFromConn(conn, nil)

	// we didn't implement GetFlightInfo so we should get an Unimplemented
	// error, proving it did call into the base flight server. If we didn't
	// register the service, we'd get an error that says "unknown service arrow.flight.protocol.FlightService"
	_, err = fc.GetFlightInfo(context.Background(), &flight.FlightDescriptor{})
	fmt.Println(err)
}
Output:

SERVING
rpc error: code = Unimplemented desc = method GetFlightInfo not implemented

func SerializeSchema

func SerializeSchema(rec *arrow.Schema, mem memory.Allocator) []byte

SerializeSchema returns the serialized schema bytes for use in Arrow Flight protobuf messages.

func StreamChunksFromReader

func StreamChunksFromReader(rdr array.RecordReader, ch chan<- StreamChunk)

StreamChunksFromReader is a convenience function to populate a channel from a record reader. It is intended to be run using a separate goroutine by calling `go flight.StreamChunksFromReader(rdr, ch)`.

If the record reader panics, an error chunk will get sent on the channel.

This will close the channel and release the reader when it completes.

Types

type Action

type Action = flight.Action

type ActionType

type ActionType = flight.ActionType

type AuthConn

type AuthConn interface {
	Read() ([]byte, error)
	Send([]byte) error
}

AuthConn wraps the grpc handshake stream to simplify handling the handshake request and response defined in flight.proto: it forwards just the payloads and errors, so callers do not have to deal with the handshake request and response protos directly.

type BaseFlightServer

type BaseFlightServer struct {
	flight.UnimplementedFlightServiceServer
	// contains filtered or unexported fields
}

BaseFlightServer is the base flight server implementation and must be embedded in any server implementation to ensure forward compatibility with any modifications of the spec without compiler errors.

func (*BaseFlightServer) GetAuthHandler

func (s *BaseFlightServer) GetAuthHandler() ServerAuthHandler

func (*BaseFlightServer) Handshake

func (*BaseFlightServer) SetAuthHandler

func (s *BaseFlightServer) SetAuthHandler(handler ServerAuthHandler)

type BasicAuth

type BasicAuth = flight.BasicAuth

type BasicAuthValidator

type BasicAuthValidator interface {
	Validate(username, password string) (string, error)
	IsValid(bearerToken string) (interface{}, error)
}

type Client

type Client interface {
	// Authenticate uses the ClientAuthHandler that was used when creating the client
	// in order to use the Handshake endpoints of the service.
	Authenticate(context.Context, ...grpc.CallOption) error
	AuthenticateBasicToken(ctx context.Context, username string, password string, opts ...grpc.CallOption) (context.Context, error)
	Close() error
	// join the interface from the FlightServiceClient instead of re-defining all
	// the endpoints here.
	FlightServiceClient
}

Client is an interface wrapped around the FlightServiceClient generated from the grpc protobuf definitions. It hides the authentication handshake: callers invoke Authenticate, which uses the ClientAuthHandler, rather than manually implementing the grpc communication and the sending of the auth token.

func NewClientFromConn

func NewClientFromConn(cc grpc.ClientConnInterface, auth ClientAuthHandler) Client

func NewClientWithMiddleware

func NewClientWithMiddleware(addr string, auth ClientAuthHandler, middleware []ClientMiddleware, opts ...grpc.DialOption) (Client, error)

NewClientWithMiddleware takes a slice of middlewares in addition to the auth and address which will be used by grpc and chained, the first middleware will be the outer most with the last middleware being the inner most wrapper around the actual call. It also passes along the dialoptions passed in such as TLS certs and so on.

func NewFlightClient deprecated

func NewFlightClient(addr string, auth ClientAuthHandler, opts ...grpc.DialOption) (Client, error)

NewFlightClient takes in the address of the grpc server and an auth handler for the application-level handshake. If using TLS or other grpc configurations they can still be passed via the grpc.DialOption list just as if connecting manually without this helper function.

Alternatively, a grpc client can be constructed as normal without this helper as the grpc generated client code is still exported. This exists to add utility and helpers around the authentication and passing the token with requests.

Deprecated: prefer to use NewClientWithMiddleware

type ClientAuthHandler

type ClientAuthHandler interface {
	Authenticate(context.Context, AuthConn) error
	GetToken(context.Context) (string, error)
}

ClientAuthHandler defines an interface for the Flight client to perform the authentication handshake. The token that is retrieved from GetToken will be sent as part of the context metadata in subsequent requests after authentication is performed using the key "auth-token-bin".

type ClientHeadersMiddleware

type ClientHeadersMiddleware interface {
	HeadersReceived(ctx context.Context, md metadata.MD)
}

type ClientMiddleware

type ClientMiddleware struct {
	Stream grpc.StreamClientInterceptor
	Unary  grpc.UnaryClientInterceptor
}

func CreateClientMiddleware

func CreateClientMiddleware(middleware CustomClientMiddleware) ClientMiddleware

type ClientPostCallMiddleware

type ClientPostCallMiddleware interface {
	CallCompleted(ctx context.Context, err error)
}

type Criteria

type Criteria = flight.Criteria

type CustomClientMiddleware

type CustomClientMiddleware interface {
	StartCall(ctx context.Context) context.Context
}

type CustomServerMiddleware

type CustomServerMiddleware interface {
	// StartCall will be called with the current context of the call, grpc.SetHeader can be used to add outgoing headers
	// if the returned context is non-nil, then it will be used as the new context being passed through the calls
	StartCall(ctx context.Context) context.Context
	// CallCompleted is a callback which is called with the return from the handler
	// it will be nil if everything was successful or will be the error about to be returned
	// to grpc
	CallCompleted(ctx context.Context, err error)
}

CustomServerMiddleware is a helper interface for more easily defining custom grpc middleware without having to expose or understand all the grpc bells and whistles.

type DataStreamReader

type DataStreamReader interface {
	Recv() (*FlightData, error)
}

DataStreamReader is an interface for receiving flight data messages on a stream such as via grpc with Arrow Flight.

type DataStreamWriter

type DataStreamWriter interface {
	Send(*FlightData) error
}

DataStreamWriter is an interface that represents an Arrow Flight stream writer that writes FlightData objects

type Empty

type Empty = flight.Empty

type FlightData

type FlightData = flight.FlightData

type FlightDescriptor

type FlightDescriptor = flight.FlightDescriptor

type FlightEndpoint

type FlightEndpoint = flight.FlightEndpoint

type FlightInfo

type FlightInfo = flight.FlightInfo

type FlightServer

type FlightServer = flight.FlightServiceServer

type FlightServiceClient

type FlightServiceClient = flight.FlightServiceClient

type FlightService_DoActionClient

type FlightService_DoActionClient = flight.FlightService_DoActionClient

type FlightService_DoActionServer

type FlightService_DoActionServer = flight.FlightService_DoActionServer

type FlightService_DoExchangeClient

type FlightService_DoExchangeClient = flight.FlightService_DoExchangeClient

type FlightService_DoExchangeServer

type FlightService_DoExchangeServer = flight.FlightService_DoExchangeServer

type FlightService_DoGetClient

type FlightService_DoGetClient = flight.FlightService_DoGetClient

type FlightService_DoGetServer

type FlightService_DoGetServer = flight.FlightService_DoGetServer

type FlightService_DoPutClient

type FlightService_DoPutClient = flight.FlightService_DoPutClient

type FlightService_DoPutServer

type FlightService_DoPutServer = flight.FlightService_DoPutServer

type FlightService_HandshakeClient

type FlightService_HandshakeClient = flight.FlightService_HandshakeClient

type FlightService_HandshakeServer

type FlightService_HandshakeServer = flight.FlightService_HandshakeServer

type FlightService_ListActionsClient

type FlightService_ListActionsClient = flight.FlightService_ListActionsClient

type FlightService_ListActionsServer

type FlightService_ListActionsServer = flight.FlightService_ListActionsServer

type FlightService_ListFlightsClient

type FlightService_ListFlightsClient = flight.FlightService_ListFlightsClient

type FlightService_ListFlightsServer

type FlightService_ListFlightsServer = flight.FlightService_ListFlightsServer

type HandshakeRequest

type HandshakeRequest = flight.HandshakeRequest

type HandshakeResponse

type HandshakeResponse = flight.HandshakeResponse

type Location

type Location = flight.Location

type MessageReader

type MessageReader interface {
	array.RecordReader
	arrio.Reader
	Err() error
	Chunk() StreamChunk
	LatestFlightDescriptor() *FlightDescriptor
	LatestAppMetadata() []byte
}

MessageReader is an interface representing a RecordReader that also provides StreamChunks and/or the ability to retrieve FlightDescriptors and AppMetadata from the flight stream

type MetadataWriter

type MetadataWriter interface {
	WriteMetadata([]byte) error
}

type PutResult

type PutResult = flight.PutResult

type Reader

type Reader struct {
	*ipc.Reader
	// contains filtered or unexported fields
}

Reader is an ipc.Reader which also keeps track of the metadata from the FlightData messages as they come in, calling LatestAppMetadata will return the metadata bytes from the most recently read message.

func NewRecordReader

func NewRecordReader(r DataStreamReader, opts ...ipc.Option) (*Reader, error)

NewRecordReader constructs an ipc reader using the flight data stream reader as the source of the ipc messages, opts passed will be passed to the underlying ipc.Reader such as ipc.WithSchema and ipc.WithAllocator

func (*Reader) Chunk

func (r *Reader) Chunk() StreamChunk

Chunk is a convenience function to return a chunk of the flight stream returning the RecordBatch along with the FlightDescriptor and any AppMetadata. Each of these can be retrieved separately with their respective functions, this is just a convenience to retrieve all three with one function call.

func (*Reader) LatestAppMetadata

func (r *Reader) LatestAppMetadata() []byte

LatestAppMetadata returns the bytes from the AppMetadata field of the most recently read FlightData message that was processed by calling the Next function. The metadata returned would correspond to the record retrieved by calling Record().

func (*Reader) LatestFlightDescriptor

func (r *Reader) LatestFlightDescriptor() *FlightDescriptor

LatestFlightDescriptor returns a pointer to the last FlightDescriptor object that was received in the most recently read FlightData message that was processed by calling the Next function. The descriptor returned would correspond to the record retrieved by calling Record().

func (*Reader) Release

func (r *Reader) Release()

Release reduces the reference count for the underlying message reader and ipc.Reader, when the reference counts become zero, the allocated memory is released for the stored record and metadata.

func (*Reader) Retain

func (r *Reader) Retain()

Retain increases the reference count for the underlying message reader and ipc.Reader which are utilized by this Reader.

type Result

type Result = flight.Result

type SchemaResult

type SchemaResult = flight.SchemaResult

type Server

type Server interface {
	// Init takes in the address to bind to and creates the listener. If both this
	// and InitListener are called, then whichever was called last will be used.
	Init(addr string) error
	// InitListener initializes with an already created listener rather than
	// creating a new one like Init does. If both this and Init are called,
	// whichever was called last is what will be used as they both set a listener
	// into the server.
	InitListener(lis net.Listener)
	// Addr will return the address that was bound to for the service to listen on
	Addr() net.Addr
	// SetShutdownOnSignals sets notifications on the given signals to call GracefulStop
	// on the grpc service if any of those signals are received
	SetShutdownOnSignals(sig ...os.Signal)
	// Serve blocks until accepting a connection fails with a fatal error. It will return
	// a non-nil error unless it stopped due to calling Shutdown or receiving one of the
	// signals set in SetShutdownOnSignals
	Serve() error
	// Shutdown will call GracefulStop on the grpc server so that it stops accepting connections
	// and will wait until current methods complete
	Shutdown()
	// RegisterFlightService sets up the handler for the Flight Endpoints as per
	// normal Grpc setups
	RegisterFlightService(FlightServer)
	// ServiceRegistrar wraps a single method that supports service registration.
	// For example, it may be used to register health check provided by grpc-go.
	grpc.ServiceRegistrar
	// ServiceInfoProvider is an interface used to retrieve metadata about the services to expose.
	// If reflection is enabled on the server, all the endpoints can be invoked using grpcurl.
	reflection.ServiceInfoProvider
}

Server is an interface for hiding some of the grpc specifics to make it slightly easier to manage a flight service, slightly modeled after the C++ implementation

func NewFlightServer deprecated

func NewFlightServer(opt ...grpc.ServerOption) Server

NewFlightServer takes any grpc Server options desired, such as TLS certs and so on which will just be passed through to the underlying grpc server.

Alternatively, a grpc server can be created normally without this helper as the grpc server generated code is still being exported. This only exists to allow the utility of the helpers

Deprecated: prefer to use NewServerWithMiddleware, due to auth handler middleware this function will be problematic if any of the grpc options specify other middlewares.

func NewServerWithMiddleware

func NewServerWithMiddleware(middleware []ServerMiddleware, opts ...grpc.ServerOption) Server

NewServerWithMiddleware takes a slice of middleware which will be used by grpc and chained, the first middleware will be the outer most with the last middleware being the inner most wrapper around the actual call. It also takes any grpc Server options desired, such as TLS certs and so on which will just be passed through to the underlying grpc server.

Because of the usage of `ChainStreamInterceptor` and `ChainUnaryInterceptor`, do not specify any middleware using the grpc options; use the ServerMiddleware slice instead, as the auth middleware will be added to handle the case where a registered service handler uses the ServerAuthHandler.

Alternatively, a grpc server can be created normally without this helper as the grpc server generated code is still being exported. This only exists to allow the utility of the helpers.

type ServerAuthHandler

type ServerAuthHandler interface {
	Authenticate(AuthConn) error
	IsValid(token string) (interface{}, error)
}

ServerAuthHandler defines an interface for the server to perform the handshake. The token is expected to be sent as part of the context metadata in subsequent requests with a key of "auth-token-bin"; IsValid is then called to validate it.

type ServerMiddleware

type ServerMiddleware struct {
	Stream grpc.StreamServerInterceptor
	Unary  grpc.UnaryServerInterceptor
}

func CreateServerBasicAuthMiddleware

func CreateServerBasicAuthMiddleware(validator BasicAuthValidator) ServerMiddleware

CreateServerBasicAuthMiddleware returns a ServerMiddleware that can be passed to NewServerWithMiddleware in order to automatically add interceptors which will properly enforce auth validation as per the passed in BasicAuthValidator.

validator cannot be nil.

func CreateServerMiddleware

func CreateServerMiddleware(middleware CustomServerMiddleware) ServerMiddleware

CreateServerMiddleware constructs a ServerMiddleware object for the passed in custom middleware, generating both the Unary and Stream interceptors from the interface.

type StreamChunk

type StreamChunk struct {
	Data        arrow.Record
	Desc        *FlightDescriptor
	AppMetadata []byte
	Err         error
}

StreamChunk represents a single chunk of a FlightData stream

type Ticket

type Ticket = flight.Ticket

type Writer

type Writer struct {
	*ipc.Writer
	// contains filtered or unexported fields
}

Writer is an ipc.Writer which also adds a WriteWithAppMetadata function in order to allow adding AppMetadata to the FlightData messages which are written.

func NewRecordWriter

func NewRecordWriter(w DataStreamWriter, opts ...ipc.Option) *Writer

NewRecordWriter can be used to construct a writer for arrow flight via the grpc stream handler to write flight data objects and write record batches to the stream. Options passed here will be passed to ipc.NewWriter

func (*Writer) SetFlightDescriptor

func (w *Writer) SetFlightDescriptor(descr *FlightDescriptor)

SetFlightDescriptor sets the flight descriptor into the next payload that will be written by the flight writer. It will only be put into the very next payload, and afterwards the writer will no longer keep its pointer to the descriptor.

func (*Writer) Write

func (w *Writer) Write(rec arrow.Record) error

Write writes a recordbatch payload and returns any error, implementing the arrio.Writer interface

func (*Writer) WriteMetadata

func (w *Writer) WriteMetadata(appMetadata []byte) error

WriteMetadata writes a payload message to the stream containing only the specified app metadata.

func (*Writer) WriteWithAppMetadata

func (w *Writer) WriteWithAppMetadata(rec arrow.Record, appMeta []byte) error

WriteWithAppMetadata will write this record with the supplied application metadata attached in the flightData message.

Directories

Path Synopsis
example
Package example contains a FlightSQL Server implementation using sqlite as the backing engine.
schema_ref
Package schema_ref contains the expected reference Schemas to be used by FlightSQL servers and clients.
internal

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL