worker

package
v25.0.0-split-vector3 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 4, 2025 License: Apache-2.0 Imports: 78 Imported by: 1

README

To update the protocol buffer definitions, run this from one directory above:

protoc -I worker worker/payload.proto --gofast_out=plugins=grpc:worker

Documentation

Overview

Package worker contains code for pb.worker communication to perform queries and mutations.

Index

Constants

View Source
const (
	EventTypeDrop     = "drop"
	EventTypeMutation = "mutation"
	OpDropPred        = "predicate"
)
View Source
const (
	// AllowMutations is the mode allowing all mutations.
	AllowMutations int = iota
	// DisallowMutations is the mode that disallows all mutations.
	DisallowMutations
	// StrictMutations is the mode that allows mutations if and only if they contain known preds.
	StrictMutations
)
View Source
const (
	ErrGraphQLSchemaAlterFailed = "succeeded in saving GraphQL schema but failed to alter Dgraph schema - " +
		"GraphQL layer may exhibit unexpected behaviour, reapplying the old GraphQL schema may prevent any issues"

	GqlSchemaPred = "dgraph.graphql.schema"
)
View Source
const (
	// NOTE: SuperFlag defaults must include every possible option that can be used. This way, if a
	//       user makes a typo while defining a SuperFlag we can catch it and fail right away rather
	//       than fail during runtime while trying to retrieve an option that isn't there.
	//
	//       For easy readability, keep the options without default values (if any) at the end of
	//       the *Defaults string. Also, since these strings are printed in --help text, avoid line
	//       breaks.
	AuditDefaults  = `compress=false; days=10; size=100; dir=; output=; encrypt-file=;`
	BadgerDefaults = `compression=snappy; numgoroutines=8;`
	RaftDefaults   = `learner=false; snapshot-after-entries=10000; ` +
		`snapshot-after-duration=30m; pending-proposals=256; idx=; group=;`
	SecurityDefaults = `token=; whitelist=;`
	CDCDefaults      = `file=; kafka=; sasl_user=; sasl_password=; ca_cert=; client_cert=; ` +
		`client_key=; sasl-mechanism=PLAIN; tls=false;`
	LimitDefaults = `mutations=allow; query-edge=1000000; normalize-node=10000; ` +
		`mutations-nquad=1000000; disallow-drop=false; query-timeout=0ms; txn-abort-after=5m; ` +
		` max-retries=10;max-pending-queries=10000;shared-instance=false;type-filter-uid-limit=10`
	ZeroLimitsDefaults = `uid-lease=0; refill-interval=30s; disable-admin-http=false;`
	GraphQLDefaults    = `introspection=true; debug=false; extensions=true; poll-interval=1s; ` +
		`lambda-url=;`
	CacheDefaults        = `size-mb=1024; percentage=40,40,20; remove-on-update=false`
	FeatureFlagsDefaults = `normalize-compatibility-mode=; enable-detailed-metrics=false`
)
View Source
const (
	// UseTxnCache indicates the transaction cache should be used.
	UseTxnCache = iota
	// NoCache indicates no caches should be used.
	NoCache
)
View Source
const DefaultExportFormat = "rdf"

DefaultExportFormat stores the name of the default format for exports.

View Source
const DefaultMaxOpenFileLimit = 1024

Default limit on number of simultaneous open files on unix systems

View Source
const (
	// MB represents a megabyte.
	MB = 1 << 20
)

Variables

View Source
var AclCachePtr = &AclCache{
	loaded:        false,
	predPerms:     make(map[string]map[string]int32),
	userPredPerms: make(map[string]map[string]int32),
}
View Source
var AvailableMemory int64

AvailableMemory is the total size of the memory we were able to identify.

View Source
var (
	ErrMultipleGraphQLSchemaNodes = errors.New("found multiple nodes for GraphQL schema")
)
View Source
var (
	// ErrNonExistentTabletMessage is the error message sent when no tablet is serving a predicate.
	ErrNonExistentTabletMessage = "Requested predicate is not being served by any tablet"
)
View Source
var (
	// Tasks is a global persistent task queue.
	// Do not use this before calling InitTasks.
	Tasks *tasks
)

Functions

func ApplyCommited

func ApplyCommited(ctx context.Context, delta *pb.OracleDelta) error

func ApplyInitialSchema

func ApplyInitialSchema(ns, ts uint64) error

func ApplyMutations

func ApplyMutations(ctx context.Context, p *pb.Proposal) error

func AssignNsIdsOverNetwork

func AssignNsIdsOverNetwork(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)

AssignNsIdsOverNetwork sends a request to assign Namespace IDs to the current zero leader.

func AssignUidsOverNetwork

func AssignUidsOverNetwork(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)

AssignUidsOverNetwork sends a request to assign UIDs from the current zero leader.

func BackupGroup

func BackupGroup(ctx context.Context, in *pb.BackupRequest) (*pb.BackupResponse, error)

BackupGroup backs up the group specified in the backup request.

func BlockingStop

func BlockingStop()

BlockingStop stops all the nodes, server between other workers and syncs all marks.

func CommitOverNetwork

func CommitOverNetwork(ctx context.Context, tc *api.TxnContext) (uint64, error)

CommitOverNetwork makes a proxy call to Zero to commit or abort a transaction.

func CreateManifest

func CreateManifest(h UriHandler, uri *url.URL, manifest *MasterManifest) error

func FillRestoreCredentials

func FillRestoreCredentials(location string, req *pb.RestoreRequest) error

FillRestoreCredentials fills the empty values with the default credentials so that a restore request is sent to all the groups with the same credentials.

func GetCredentialsFromRequest

func GetCredentialsFromRequest(req *pb.BackupRequest) *x.MinioCredentials

GetCredentialsFromRequest extracts the credentials from a backup request.

func GetFeaturesList

func GetFeaturesList() []string

GetFeaturesList returns a list of Dgraph features that are available.

func GetMembershipState

func GetMembershipState() *pb.MembershipState

GetMembershipState returns the current membership state.

func GetOngoingTasks

func GetOngoingTasks() []string

GetOngoingTasks returns the list of ongoing tasks.

func GetSchemaOverNetwork

func GetSchemaOverNetwork(ctx context.Context, schema *pb.SchemaRequest) (
	[]*pb.SchemaNode, error)

GetSchemaOverNetwork checks which group should be serving the schema according to fingerprint of the predicate and sends it to that instance.

func GetTypes

func GetTypes(ctx context.Context, req *pb.SchemaRequest) ([]*pb.TypeUpdate, error)

GetTypes processes the type requests and retrieves the desired types.

func GroupId

func GroupId() uint32

GroupId returns the group to which this worker belongs to.

func HasAccessToAllPreds

func HasAccessToAllPreds(ns uint64, groups []string, operation *acl.Operation) bool

func InStream

InStream handles streaming of snapshots to a target group. It first checks the group associated with the incoming stream and, if it's the same as the current node's group, it flushes the data using FlushKvs. If the group is different, it establishes a connection with the leader of that group and streams data to it. The function returns an error if there are any issues in the process, such as a broken connection or failure to establish a stream with the leader.

func Init

func Init(ps *badger.DB)

Init initializes this package.

func InitForLite

func InitForLite(ps *badger.DB)

func InitServerState

func InitServerState()

InitServerState initializes this server's state.

func InitTablet

func InitTablet(pred string)

func InitTasks

func InitTasks()

InitTasks initializes the global Tasks variable.

func KnownGroups

func KnownGroups() []uint32

KnownGroups returns the known groups using the global groupi instance.

func LogDQLRequestEnabled

func LogDQLRequestEnabled() bool

LogDQLRequestEnabled returns true if logging of requests is enabled otherwise false.

func MaxLeaseId

func MaxLeaseId() uint64

MaxLeaseId returns the maximum UID that has been leased.

func MoveTabletOverNetwork

func MoveTabletOverNetwork(ctx context.Context, req *pb.MoveTabletRequest) (*pb.Status, error)

MoveTabletOverNetwork sends a request to move the given tablet to destination group to the current zero leader.

func MutateOverNetwork

func MutateOverNetwork(ctx context.Context, m *pb.Mutations) (*api.TxnContext, error)

MutateOverNetwork checks which group should be running the mutations according to the group config and sends it to that instance.

func NewFileHandler

func NewFileHandler(uri *url.URL) *fileHandler

func NewS3Handler

func NewS3Handler(uri *url.URL, creds *x.MinioCredentials) (*s3Handler, error)

NewS3Handler creates a new session, checks valid bucket at uri.Path, and configures a minio client. It also fills in values used by the handler in subsequent calls. Returns a new S3 minio client, otherwise a nil client with an error.

func NodeId

func NodeId() uint64

NodeId returns the raft id of the node.

func NormalizeExportFormat

func NormalizeExportFormat(format string) string

NormalizeExportFormat returns the normalized string for the export format if it is valid, an empty string otherwise.

func ProcessBackupRequest

func ProcessBackupRequest(ctx context.Context, req *pb.BackupRequest) error

func ProcessDeleteNsRequest

func ProcessDeleteNsRequest(ctx context.Context, ns uint64) error

func ProcessRestoreRequest

func ProcessRestoreRequest(ctx context.Context, req *pb.RestoreRequest, wg *sync.WaitGroup) error

ProcessRestoreRequest verifies the backup data and sends a restore proposal to each group.

func ProcessTaskOverNetwork

func ProcessTaskOverNetwork(ctx context.Context, q *pb.Query) (*pb.Result, error)

ProcessTaskOverNetwork is used to process the query and get the result from the instance which stores posting list corresponding to the predicate in the query.

func RegisterZeroProxyServer

func RegisterZeroProxyServer(s *grpc.Server)

RegisterZeroProxyServer forwards select GRPC calls over to Zero

func RemoveNodeOverNetwork

func RemoveNodeOverNetwork(ctx context.Context, req *pb.RemoveNodeRequest) (*pb.Status, error)

RemoveNodeOverNetwork sends a request to remove the given node from given group to a zero server. This operation doesn't necessarily require a zero leader.

func ResetAclCache

func ResetAclCache()

func ResetGQLSchemaStore

func ResetGQLSchemaStore()

func RunMapper

func RunMapper(req *pb.RestoreRequest, mapDir string) (*mapResult, error)

we create MAP files each of a limited size and write sorted data into it. We may end up creating many such files. Then we take all of these MAP files and read part of the data from each file, sort all of this data and then use streamwriter to write the sorted data into pstore badger. We store some sort of partition keys in the MAP file in the beginning of the file. The partition keys are just intermediate keys among the entries that we store in the map file. When we read data during reduce, we read in the chunks of these partition keys, meaning from one partition key to the next partition key. I am not sure if there is a value in having these partition keys. Maybe, we can live without them.

func RunReducer

func RunReducer(w Writer, mapDir string) error

func RunServer

func RunServer(bindall bool)

RunServer initializes a tcp server on port which listens to requests from other workers for pb.communication.

func SchemaExportKv

func SchemaExportKv(attr string, val []byte, skipZero bool) (*bpb.KV, error)

func SetConfiguration

func SetConfiguration(newConfig *Options)

SetConfiguration sets the server configuration to the given config.

func SetMaxNsID

func SetMaxNsID(nsId uint64)

func SetMaxUID

func SetMaxUID(uid uint64)

func SortOverNetwork

func SortOverNetwork(ctx context.Context, q *pb.SortMessage) (*pb.SortResult, error)

SortOverNetwork sends sort query over the network.

func StartRaftNodes

func StartRaftNodes(walStore *raftwal.DiskStorage, bindall bool)

StartRaftNodes will read the WAL dir, create the RAFT groups, and either start or restart RAFT nodes. This function triggers RAFT nodes to be created, and is the entrance to the RAFT world from main.go.

func StoreExport

func StoreExport(request *pb.ExportRequest, dir string, key x.Sensitive) error

func StoreStats

func StoreStats() string

StoreStats returns stats for data store.

func SubscribeForUpdates

func SubscribeForUpdates(prefixes [][]byte, ignore string, cb func(kvs *badgerpb.KVList),
	group uint32, closer *z.Closer)

SubscribeForUpdates will listen for updates for the given group.

func TaskStatusOverNetwork

func TaskStatusOverNetwork(ctx context.Context, req *pb.TaskStatusRequest,
) (*pb.TaskStatusResponse, error)

TaskStatusOverNetwork fetches the status of a task over the network. Alphas only know about the tasks created by them, but this function would fetch the task from the correct Alpha.

func Timestamps

func Timestamps(ctx context.Context, num *pb.Num) (*pb.AssignedIds, error)

Timestamps sends a request to assign startTs for a new transaction to the current zero leader.

func ToExportKvList

func ToExportKvList(pk x.ParsedKey, pl *posting.List, in *pb.ExportRequest) (*bpb.KVList, error)

func TypeExportKv

func TypeExportKv(attr string, val []byte) (*bpb.KV, error)

func UpdateCacheMb

func UpdateCacheMb(memoryMB int64) error

UpdateCacheMb updates the value of cache_mb and updates the corresponding cache sizes.

func UpdateGQLSchemaOverNetwork

func UpdateGQLSchemaOverNetwork(ctx context.Context, req *pb.UpdateGraphQLSchemaRequest) (*pb.
	UpdateGraphQLSchemaResponse, error)

UpdateGQLSchemaOverNetwork sends the request to the group one leader for execution.

func UpdateLogDQLRequest

func UpdateLogDQLRequest(val bool)

UpdateLogDQLRequest updates value of x.WorkerConfig.LogDQLRequest.

func UpdateMembershipState

func UpdateMembershipState(ctx context.Context) error

UpdateMembershipState contacts zero for an update on membership state.

func ValidateAndConvert

func ValidateAndConvert(edge *pb.DirectedEdge, su *pb.SchemaUpdate) error

ValidateAndConvert checks compatibility or converts to the schema type if the storage type is specified. If no storage type is specified then it converts to the schema type.

func VerifyBackup

func VerifyBackup(req *pb.RestoreRequest, creds *x.MinioCredentials, currentGroups []uint32) error

VerifyBackup will access the backup location and verify that the specified backup can be restored to the cluster.

func WaitForIndexing

func WaitForIndexing(ctx context.Context, shouldWait bool) error

WaitForIndexing does a busy wait for indexing to finish or the context to error out, if the input flag shouldWait is true. Otherwise, it just returns nil straight away. If the context errors, it returns that error. TODO(aman): we should return an error if the indexing fails

func WriteExport

func WriteExport(writers *Writers, kv *bpb.KV, format string) error

Types

type AclCache

type AclCache struct {
	sync.RWMutex
	// contains filtered or unexported fields
}

AclCache is the cache mapping group names to the corresponding group acls

func (*AclCache) AuthorizePredicate

func (cache *AclCache) AuthorizePredicate(groups []string, predicate string,
	operation *acl.Operation) error

func (*AclCache) GetUserPredPerms

func (cache *AclCache) GetUserPredPerms(userId string) map[string]int32

func (*AclCache) Loaded

func (cache *AclCache) Loaded() bool

func (*AclCache) Set

func (cache *AclCache) Set()

func (*AclCache) Update

func (cache *AclCache) Update(ns uint64, groups []acl.Group)

type BackupProcessor

type BackupProcessor struct {
	// DB is the Badger pstore managed by this node.
	DB *badger.DB
	// Request stores the backup request containing the parameters for this backup.
	Request *pb.BackupRequest
	// contains filtered or unexported fields
}

BackupProcessor handles the different stages of the backup process.

func NewBackupProcessor

func NewBackupProcessor(db *badger.DB, req *pb.BackupRequest) *BackupProcessor

func (*BackupProcessor) Close

func (pr *BackupProcessor) Close()

func (*BackupProcessor) CompleteBackup

func (pr *BackupProcessor) CompleteBackup(ctx context.Context, m *Manifest) error

CompleteBackup will finalize a backup by writing the manifest at the backup destination.

func (*BackupProcessor) WriteBackup

func (pr *BackupProcessor) WriteBackup(ctx context.Context) (*pb.BackupResponse, error)

WriteBackup uses the request values to create a stream writer then hand off the data retrieval to stream.Orchestrate. The writer will create all the fd's needed to collect the data and later move to the target. Returns errors on failure, nil on success.

type BackupRes

type BackupRes struct {
	// contains filtered or unexported fields
}

BackupRes is used to represent the response and error of the Backup gRPC call together to be transported via a channel.

type CDC

type CDC struct {
	sync.Mutex
	// contains filtered or unexported fields
}

CDC struct is being used to send out change data capture events. There are two ways to do this: 1. Use Badger Subscribe. 2. Use Raft WAL. We chose to go with Raft WAL because in case we lose connection to the sink (say Kafka), we can resume from the last sent event and ensure there's continuity in event sending. Note the events would sent in the same order as they're being committed. With Badger Subscribe, if we lose the connection, we would have no way to send over the "missed" events. Even if we scan over Badger, we'd still not get those events in the right order, i.e. order of their commit timestamp. So, this approach would be tricky to get right.

func (*CDC) Close

func (cdc *CDC) Close()

type CDCEvent

type CDCEvent struct {
	Meta  *EventMeta  `json:"meta"`
	Type  string      `json:"type"`
	Event interface{} `json:"event"`
}

type DropEvent

type DropEvent struct {
	Operation string `json:"operation"`
	Type      string `json:"type"`
	Pred      string `json:"pred"`
}

type EventMeta

type EventMeta struct {
	RaftIndex uint64 `json:"-"`
	Namespace []byte `json:"-"`
	CommitTs  uint64 `json:"commit_ts"`
}

type ExportStorage

type ExportStorage interface {
	OpenFile(relativePath string) (*ExportWriter, error)
	FinishWriting(w *Writers) (ExportedFiles, error)
}

func NewExportStorage

func NewExportStorage(in *pb.ExportRequest, backupName string) (ExportStorage, error)

type ExportWriter

type ExportWriter struct {
	// contains filtered or unexported fields
}

func (*ExportWriter) Close

func (writer *ExportWriter) Close() error

type ExportedFiles

type ExportedFiles []string

ExportedFiles has the relative path of files that were written during export

func ExportOverNetwork

func ExportOverNetwork(ctx context.Context, input *pb.ExportRequest) (ExportedFiles, error)

ExportOverNetwork sends export requests to all the known groups.

type FuncType

type FuncType int

FuncType represents the type of a query function (aggregation, has, etc).

type GQLSchemaStore

type GQLSchemaStore struct {
	// contains filtered or unexported fields
}

func NewGQLSchemaStore

func NewGQLSchemaStore() *GQLSchemaStore

func (*GQLSchemaStore) GetCurrent

func (gs *GQLSchemaStore) GetCurrent(ns uint64) (*GqlSchema, bool)

func (*GQLSchemaStore) Set

func (gs *GQLSchemaStore) Set(ns uint64, sch *GqlSchema)

type GqlSchema

type GqlSchema struct {
	ID              string `json:"id,omitempty"`
	Schema          string `json:"schema,omitempty"`
	Version         uint64
	GeneratedSchema string
	Loaded          bool // This indicate whether the schema has been loaded into graphql server

}

type LoadResult

type LoadResult struct {
	// Version is the timestamp at which the database is after loading a backup.
	Version uint64
	// MaxLeaseUid is the max UID seen by the load operation. Needed to request zero
	// for the proper number of UIDs.
	MaxLeaseUid uint64
	// MaxLeaseNsId is the max namespace ID seen by the load operation.
	MaxLeaseNsId uint64
	// The error, if any, of the load operation.
	Err error
}

LoadResult holds the output of a Load operation.

func RunOfflineRestore

func RunOfflineRestore(dir, location, backupId, keyFile string, key x.Sensitive,
	ctype options.CompressionType, clevel int) LoadResult

RunOfflineRestore creates required DBs and streams the backups to them.

type Manifest

type Manifest struct {
	// Type is the type of backup, either full or incremental.
	Type string `json:"type"`
	// SinceTsDeprecated is kept for backward compatibility. Use readTs instead of sinceTs.
	SinceTsDeprecated uint64 `json:"since"`
	// ReadTs is the timestamp at which this backup was taken. This would be
	// the since timestamp for the next incremental backup.
	ReadTs uint64 `json:"read_ts"`
	// Groups is the map of valid groups to predicates at the time the backup was created.
	Groups map[uint32][]string `json:"groups"`
	// BackupId is a unique ID assigned to all the backups in the same series
	// (from the first full backup to the last incremental backup).
	BackupId string `json:"backup_id"`
	// BackupNum is a monotonically increasing number assigned to each backup in
	// a series. The full backup as BackupNum equal to one and each incremental
	// backup gets assigned the next available number. Used to verify the integrity
	// of the data during a restore.
	BackupNum uint64 `json:"backup_num"`
	// Version specifies the Dgraph version, the backup was taken on. For the backup taken on older
	// versions (<= 20.11), the predicates in Group map do not have namespace. Version will be zero
	// for older versions.
	Version int `json:"version"`
	// Path is the name of the backup directory to which this manifest belongs to.
	Path string `json:"path"`
	// Encrypted indicates whether this backup was encrypted or not.
	Encrypted bool `json:"encrypted"`
	// DropOperations lists the various DROP operations that took place since the last backup.
	// These are used during restore to redo those operations before applying the backup.
	DropOperations []*pb.DropOperation `json:"drop_operations"`
	// Compression keeps track of the compression that was used for the data.
	Compression string `json:"compression"`
}

Manifest records backup details, these are values used during restore. Since is the timestamp from which the next incremental backup should start (it's set to the readTs of the current backup). Groups are the IDs of the groups involved.

func GetLatestManifest

func GetLatestManifest(h UriHandler, uri *url.URL) (*Manifest, error)

func ListBackupManifests

func ListBackupManifests(l string, creds *x.MinioCredentials) ([]*Manifest, error)

ListBackupManifests scans location l for backup files and returns the list of manifests.

func ProcessListBackups

func ProcessListBackups(ctx context.Context, location string, creds *x.MinioCredentials) (
	[]*Manifest, error)

func (*Manifest) GoString

func (m *Manifest) GoString() string

GoString implements the GoStringer interface for Manifest.

func (*Manifest) ValidReadTs

func (m *Manifest) ValidReadTs() uint64

ValidReadTs function returns the valid read timestamp. The backup can have the readTs=0 if the backup was done on an older version of dgraph. The SinceTsDecprecated is kept for backward compatibility.

type MasterManifest

type MasterManifest struct {
	Manifests []*Manifest
}

func GetManifest

func GetManifest(h UriHandler, uri *url.URL) (*MasterManifest, error)

GetManifest returns the master manifest using the given handler and uri. Additionally, it also upgrades the manifest for the in-memory processing. Note: This function must not be used when using the returned manifest for the purpose of overwriting the old manifest.

func GetManifestNoUpgrade

func GetManifestNoUpgrade(h UriHandler, uri *url.URL) (*MasterManifest, error)

GetManifestNoUpgrade returns the master manifest using the given handler and uri.

type MutationEvent

type MutationEvent struct {
	Operation string      `json:"operation"`
	Uid       uint64      `json:"uid"`
	Attr      string      `json:"attr"`
	Value     interface{} `json:"value"`
	ValueType string      `json:"value_type"`
}

type Options

type Options struct {
	// PostingDir is the path to the directory storing the postings..
	PostingDir string
	// WALDir is the path to the directory storing the write-ahead log.
	WALDir string
	// MutationsMode is the mode used to handle mutation requests.
	MutationsMode int
	// AuthToken is the token to be passed for Alter HTTP requests.
	AuthToken string

	// AclJwtAlg stores the JWT signing algorithm.
	AclJwtAlg jwt.SigningMethod
	// AclSecretKey stores the secret used to sign JSON Web Tokens (JWT).
	// It could be a either a RSA or ECDSA PrivateKey or HMAC symmetric key.
	// depending upon the JWT signing algorithm. Public key can be derived
	// from the private key to verify the signatures when needed.
	AclSecretKey      interface{}
	AclSecretKeyBytes x.Sensitive
	// AccessJwtTtl is the TTL for the access JWT.
	AccessJwtTtl time.Duration
	// RefreshJwtTtl is the TTL of the refresh JWT.
	RefreshJwtTtl time.Duration

	// CachePercentage is the comma-separated list of cache percentages
	// used to split the total cache size among the multiple caches.
	CachePercentage string
	// CacheMb is the total memory allocated between all the caches.
	CacheMb int64
	// RemoveOnUpdate is the parameter that allows the user to set if the cache should keep the items that were
	// just mutated. Keeping these items are good when there is a mixed workload where you are updating the
	// same element multiple times. However, for a heavy mutation workload, not keeping these items would be better
	// , as keeping these elements bloats the cache making it slow.
	RemoveOnUpdate bool

	Audit *x.LoggerConf

	// Define different ChangeDataCapture configurations
	ChangeDataConf string

	// TypeFilterUidLimit decides how many elements would be searched directly
	// vs searched via type index. If the number of elements are too low, then querying the
	// index might be slower. This would allow people to set their limit according to
	// their use case.
	TypeFilterUidLimit uint64
}

Options contains options for the Dgraph server.

var Config Options

Config holds an instance of the server options..

type ServerState

type ServerState struct {
	FinishCh chan struct{} // channel to wait for all pending reqs to finish.

	Pstore   *badger.DB
	WALstore *raftwal.DiskStorage
	// contains filtered or unexported fields
}

ServerState holds the state of the Dgraph server.

var State ServerState

State is the instance of ServerState used by the current server.

func (*ServerState) Dispose

func (s *ServerState) Dispose()

Dispose stops and closes all the resources inside the server state.

func (*ServerState) GetTimestamp

func (s *ServerState) GetTimestamp(readOnly bool) uint64

func (*ServerState) InitStorage

func (s *ServerState) InitStorage()

type Sink

type Sink interface {
	// send in bulk to the sink
	Send(messages []SinkMessage) error
	// close sink
	Close() error
}

func GetSink

func GetSink(conf *z.SuperFlag) (Sink, error)

type SinkMessage

type SinkMessage struct {
	Meta  SinkMeta
	Key   []byte
	Value []byte
}

type SinkMeta

type SinkMeta struct {
	Topic string
}

type TaskKind

type TaskKind uint64
const (
	// Reserve the zero value for errors.
	TaskKindBackup TaskKind = iota + 1
	TaskKindExport
)

func (TaskKind) String

func (k TaskKind) String() string

type TaskMeta

type TaskMeta uint64

TaskMeta stores a timestamp, a TaskKind and a Status.

The format of this is: 32 bits: UNIX timestamp (overflows on 2106-02-07) 16 bits: TaskKind 16 bits: TaskStatus

func (TaskMeta) Kind

func (t TaskMeta) Kind() TaskKind

Kind returns the type of the task.

func (TaskMeta) Status

func (t TaskMeta) Status() TaskStatus

Status returns the current status of the task.

func (TaskMeta) Timestamp

func (t TaskMeta) Timestamp() time.Time

Timestamp returns the timestamp of the last status change of the task.

type TaskStatus

type TaskStatus uint64
const (
	// Reserve the zero value for errors.
	TaskStatusQueued TaskStatus = iota + 1
	TaskStatusRunning
	TaskStatusFailed
	TaskStatusSuccess
)

func (TaskStatus) String

func (status TaskStatus) String() string

type UriHandler

type UriHandler interface {
	// CreateDir creates a directory relative to the root path of the handler.
	CreateDir(path string) error
	// CreateFile creates a file relative to the root path of the handler. It also makes the
	// handler's descriptor to point to this file.
	CreateFile(path string) (io.WriteCloser, error)
	// DirExists returns true if the directory relative to the root path of the handler exists.
	DirExists(path string) bool
	// FileExists returns true if the file relative to the root path of the handler exists.
	FileExists(path string) bool
	// JoinPath appends the given path to the root path of the handler.
	JoinPath(path string) string
	// ListPaths returns a list of all the valid paths from the given root path. The given root path
	// should be relative to the handler's root path.
	ListPaths(path string) []string
	// Read reads the file at given relative path and returns the read bytes.
	Read(path string) ([]byte, error)
	// Rename renames the src file to the destination file.
	Rename(src, dst string) error
	// Stream would stream the path via an instance of io.ReadCloser. Close must be called at the
	// end to release resources appropriately.
	Stream(path string) (io.ReadCloser, error)
}

UriHandler interface is implemented by URI scheme handlers. When adding new scheme handles, for example 'azure://', an object will implement this interface to supply Dgraph with a way to create or load backup files into DB. For all methods below, the URL object is parsed as described in `newHandler' and the Processor object has the DB, estimated tablets size, and backup parameters.

func NewUriHandler

func NewUriHandler(uri *url.URL, creds *x.MinioCredentials) (UriHandler, error)

NewUriHandler parses the requested URI and finds the corresponding UriHandler. If the passed credentials are not nil, they will be used to override the default credentials (only for backups to minio or S3). Target URI formats:

[scheme]://[host]/[path]?[args]
[scheme]:///[path]?[args]
/[path]?[args] (only for local or NFS)

Target URI parts:

scheme - service handler, one of: "file", "s3", "minio"
  host - remote address. ex: "dgraph.s3.amazonaws.com"
  path - directory, bucket or container at target. ex: "/dgraph/backups/"
  args - specific arguments that are ok to appear in logs.

Global args (if supported by the handler):

  secure - true|false turn on/off TLS.
   trace - true|false turn on/off HTTP tracing.
compress - true|false turn on/off data compression.
 encrypt - true|false turn on/off data encryption.

Examples:

s3://dgraph.s3.amazonaws.com/dgraph/backups?secure=true
minio://localhost:9000/dgraph?secure=true
file:///tmp/dgraph/backups
/tmp/dgraph/backups?compress=gzip

type Writer

type Writer interface {
	Write(buf *z.Buffer) error
}

type Writers

type Writers struct {
	DataWriter      *ExportWriter
	SchemaWriter    *ExportWriter
	GqlSchemaWriter *ExportWriter
}

func InitWriters

func InitWriters(s ExportStorage, in *pb.ExportRequest) (*Writers, error)

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL