Documentation

Overview

    Package mapper implements a simple Datastore mapper.

    It provides a way to apply a function to all datastore entities of a particular kind in parallel, but with bounded concurrency (to avoid burning through all CPU/Datastore quota at once). This is useful when examining or mutating large numbers of datastore entities.

    It works by splitting the range of keys into N shards and launching N worker tasks, each of which sequentially processes the shard assigned to it, slice by slice.
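As a rough illustration of the sharding idea described above, the sketch below splits a half-open range of integer IDs into N contiguous shards of near-equal size. It is a simplification under stated assumptions: real datastore keys are not plain integers, and the keyRange type and splitRange function are hypothetical names, not part of this package.

```go
package main

import "fmt"

// keyRange is a hypothetical half-open range [Start, End) of integer IDs.
type keyRange struct {
	Start, End int64
}

// splitRange divides [start, end) into at most n contiguous, non-overlapping
// shards whose sizes differ by at most one.
func splitRange(start, end int64, n int) []keyRange {
	if n <= 0 || end <= start {
		return nil
	}
	total := end - start
	if int64(n) > total {
		n = int(total) // never produce empty shards
	}
	base, extra := total/int64(n), total%int64(n)
	shards := make([]keyRange, 0, n)
	cur := start
	for i := 0; i < n; i++ {
		size := base
		if int64(i) < extra {
			size++ // spread the remainder over the first shards
		}
		shards = append(shards, keyRange{cur, cur + size})
		cur += size
	}
	return shards
}

func main() {
	for i, s := range splitRange(0, 10, 3) {
		fmt.Printf("shard %d: [%d, %d)\n", i, s.Start, s.End)
	}
}
```

Each resulting shard could then be handed to its own worker, which walks the shard sequentially.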

    Constants

    This section is empty.

    Variables

    var (
    	State_name = map[int32]string{
    		0: "STATE_UNSPECIFIED",
    		1: "STARTING",
    		2: "RUNNING",
    		3: "ABORTING",
    		4: "SUCCESS",
    		5: "FAIL",
    		6: "ABORTED",
    	}
    	State_value = map[string]int32{
    		"STATE_UNSPECIFIED": 0,
    		"STARTING":          1,
    		"RUNNING":           2,
    		"ABORTING":          3,
    		"SUCCESS":           4,
    		"FAIL":              5,
    		"ABORTED":           6,
    	}
    )

      Enum value maps for State.

      var ErrNoSuchJob = errors.New("no such mapping job")

        ErrNoSuchJob is returned by GetJob if there's no Job with requested ID.

        var File_go_chromium_org_luci_appengine_mapper_messages_proto protoreflect.FileDescriptor

        Functions

        This section is empty.

        Types

        type Controller

        type Controller struct {
        	// MapperQueue is a name of the GAE task queue to use for mapping jobs.
        	//
        	// This queue will perform all "heavy" tasks. It should be configured
        	// appropriately to allow desired number of shards to run in parallel.
        	//
        	// For example, if the largest submitted job is expected to have 128 shards,
        	// max_concurrent_requests setting of the mapper queue should be at least 128,
        	// otherwise some shards will be stalled waiting for others to finish
        	// (defeating the purpose of having large number of shards).
        	//
        	// If empty, "default" is used.
        	MapperQueue string
        
        	// ControlQueue is a name of the GAE task queue to use for control signals.
        	//
        	// This queue is used very lightly when starting and stopping jobs (roughly
        	// 2*Shards tasks overall per job). Default queue.yaml settings for such a
        	// queue should be sufficient (unless you run a lot of different jobs at
        	// once).
        	//
        	// If empty, "default" is used.
        	ControlQueue string
        	// contains filtered or unexported fields
        }

          Controller is responsible for starting, progressing and finishing mapping jobs.

          It should be treated as a global singleton object. Having more than one controller in the production application is a bad idea (they'll collide with each other since they use global datastore namespace). It's still useful to instantiate multiple controllers in unit tests.

          func (*Controller) AbortJob

          func (ctl *Controller) AbortJob(ctx context.Context, id JobID) (job *Job, err error)

            AbortJob aborts a job and returns its most recent state.

            Silently does nothing if the job is finished or already aborted.

            Returns ErrNoSuchJob if there's no such job at all. All other possible errors are transient and they are marked as such.

            func (*Controller) GetJob

            func (ctl *Controller) GetJob(ctx context.Context, id JobID) (*Job, error)

              GetJob fetches a previously launched job given its ID.

              Returns ErrNoSuchJob if not found. All other possible errors are transient and they are marked as such.
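A caller will typically want to tell "job does not exist" apart from every other failure. The sketch below shows one way to do that with errors.Is. It uses local stand-ins (errNoSuchJob, getJob, describe) instead of the real Controller, so everything except the sentinel's message is an assumption made for illustration.

```go
package main

import (
	"errors"
	"fmt"
)

// errNoSuchJob is a local stand-in for mapper.ErrNoSuchJob.
var errNoSuchJob = errors.New("no such mapping job")

// getJob is a hypothetical stand-in for Controller.GetJob, backed by a map
// from job ID to a state name.
func getJob(jobs map[int64]string, id int64) (string, error) {
	state, ok := jobs[id]
	if !ok {
		return "", errNoSuchJob
	}
	return state, nil
}

// describe distinguishes the "not found" sentinel from any other error.
func describe(jobs map[int64]string, id int64) string {
	state, err := getJob(jobs, id)
	switch {
	case errors.Is(err, errNoSuchJob):
		return "not found"
	case err != nil:
		return "other error, possibly worth retrying"
	default:
		return state
	}
}

func main() {
	jobs := map[int64]string{1: "RUNNING"}
	fmt.Println(describe(jobs, 1))
	fmt.Println(describe(jobs, 2))
}
```

errors.Is matches the sentinel whether it is returned directly or wrapped, so the check does not depend on how the error was produced.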

              func (*Controller) Install

              func (ctl *Controller) Install(disp *tq.Dispatcher)

                Install registers task queue task handlers in the given task queue dispatcher.

                This must be done before Controller is used.

                There can be at most one Controller installed into an instance of TQ dispatcher. Installing more will cause panics.

                If you need multiple different controllers for some reason, create multiple tq.Dispatchers (with different base URLs, so they don't conflict with each other) and install them all into the router.

                func (*Controller) LaunchJob

                func (ctl *Controller) LaunchJob(ctx context.Context, j *JobConfig) (JobID, error)

                  LaunchJob launches a new mapping job, returning its ID (that can be used to control it or query its status).

                  Launches a datastore transaction inside.

                  func (*Controller) RegisterFactory

                  func (ctl *Controller) RegisterFactory(id ID, m Factory)

                    RegisterFactory adds the given mapper factory to the internal registry.

                    Intended to be used during init() time or early during the process initialization. Panics if a factory with such ID has already been registered.

                    The mapper ID will be used internally to identify which mapper a job should be using. If a factory disappears while the job is running (e.g. if the service binary is updated and new binary doesn't have the mapper registered anymore), the job ends with a failure.
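The registration contract described above (panic on a duplicate ID, failure when an ID is no longer registered) can be pictured with a small in-memory registry. This is only a sketch: the registry type, its methods, and the simplified mapperFactory signature are hypothetical and do not reflect how the controller stores factories internally.

```go
package main

import (
	"fmt"
	"sync"
)

// mapperFactory is a simplified placeholder for the real Factory type.
type mapperFactory func(shardIdx int) string

// registry is a hypothetical ID -> factory table guarded by a mutex.
type registry struct {
	mu        sync.RWMutex
	factories map[string]mapperFactory
}

// register adds a factory and panics if the ID is already taken.
func (r *registry) register(id string, f mapperFactory) {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, dup := r.factories[id]; dup {
		panic(fmt.Sprintf("mapper %q is already registered", id))
	}
	if r.factories == nil {
		r.factories = make(map[string]mapperFactory)
	}
	r.factories[id] = f
}

// lookup returns an error for unknown IDs, e.g. when a newer binary no
// longer registers a mapper that a running job still refers to.
func (r *registry) lookup(id string) (mapperFactory, error) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	f, ok := r.factories[id]
	if !ok {
		return nil, fmt.Errorf("no mapper with ID %q registered", id)
	}
	return f, nil
}

func main() {
	var reg registry
	reg.register("backfill-v1", func(shardIdx int) string {
		return fmt.Sprintf("mapper for shard %d", shardIdx)
	})
	if f, err := reg.lookup("backfill-v1"); err == nil {
		fmt.Println(f(3))
	}
	_, err := reg.lookup("removed-mapper")
	fmt.Println(err)
}
```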

                    type Factory

                    type Factory func(ctx context.Context, j *Job, shardIdx int) (Mapper, error)

                      Factory knows how to construct instances of Mapper.

                      Factory is supplied by the users of the library and registered in the controller via RegisterFactory call.

                      It is used to get a mapper to process a set of pages within a shard. It takes a Job (including its Config and Params) and a shard index, so it can prepare the mapper for processing of this specific shard.

                      Returning a transient error triggers an eventual retry. Returning a fatal error causes the shard (eventually the entire job) to be marked as failed.

                      type ID

                      type ID string

                        ID identifies a mapper registered in the controller.

                        It will be passed across processes, so all processes that execute mapper jobs should register the same mappers under the same IDs.

                        The safest approach is to keep mapper IDs in the app unique, i.e. do NOT reuse them when adding new mappers or significantly changing existing ones.

                        type Job

                        type Job struct {
                        
                        	// ID is auto-generated unique identifier of the job.
                        	ID JobID `gae:"$id"`
                        	// Config is the configuration of this job. Doesn't change once set.
                        	Config JobConfig `gae:",noindex"`
                        	// State is used to track job's lifecycle, see the enum.
                        	State State
                        	// Created is when the job was created, FYI.
                        	Created time.Time
                        	// Updated is when the job was last touched, FYI.
                        	Updated time.Time
                        	// contains filtered or unexported fields
                        }

                          Job is datastore representation of a mapping job (either active or not).

                          It is a root entity with autogenerated key.

                          Use Controller and Job methods to work with jobs. Attempting to use the datastore API directly results in undefined behavior.

                          func (*Job) FetchInfo

                          func (j *Job) FetchInfo(ctx context.Context) (*JobInfo, error)

                            FetchInfo fetches information about the job (including all shards).

                            type JobConfig

                            type JobConfig struct {
                            	Query      Query  // a query identifying a set of entities
                            	Mapper     ID     // ID of a registered mapper to apply to entities
                            	Params     []byte // arbitrary user-provided data to pass to the mapper
                            	ShardCount int    // number of shards to split the key range into
                            	PageSize   int    // how many entities to process at once in each shard
                            
                            	// PagesPerTask is how many pages (each of PageSize entities) to process
                            	// inside a TQ task.
                            	//
                            	// Default is unlimited: process until the deadline.
                            	PagesPerTask int
                            
                            	// TaskDuration is how long to run a single mapping TQ task before
                            	// checkpointing the state and launching the next mapping TQ task.
                            	//
                            	// Small values (e.g. 1 min) make each processing TQ task relatively small,
                            	// so it doesn't eat a lot of memory or produce gigantic unreadable logs.
                            	// It also makes TQ's "Pause queue" button more handy.
                            	//
                            	// Default is 1 min.
                            	TaskDuration time.Duration
                            
                            	// TrackProgress enables calculating number of entities per shard before
                            	// launching mappers, and using it to calculate completion ETA.
                            	//
                            	// May be VERY slow if processing a large number of entities. Slowness manifests
                            	// as a delay between the job's launch and the actual start of shard processing.
                            	//
                            	// Enable only if shards are relatively small (< 100K entities per shard).
                            	TrackProgress bool
                            }

                              JobConfig defines what a new mapping job should do.

                              It should be supplied by the users of the mapper library.

                              func (*JobConfig) Validate

                              func (jc *JobConfig) Validate() error

                                Validate returns an error if the config is invalid.

                                Mapper existence is not checked.

                                type JobID

                                type JobID int64

                                  JobID identifies a mapping job.

                                  type JobInfo

                                  type JobInfo struct {
                                  	Id                int64                  `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`                                                        // unique job identifier
                                  	State             State                  `protobuf:"varint,2,opt,name=state,proto3,enum=appengine.mapper.messages.State" json:"state,omitempty"`             // overall state of the job
                                  	Created           *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=created,proto3" json:"created,omitempty"`                                               // when it was created
                                  	Updated           *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=updated,proto3" json:"updated,omitempty"`                                               // when it was updated last time
                                  	Eta               *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=eta,proto3" json:"eta,omitempty"`                                                       // when it finishes (if tracking progress)
                                  	ProcessedEntities int64                  `protobuf:"varint,6,opt,name=processed_entities,json=processedEntities,proto3" json:"processed_entities,omitempty"` // number of processed entities thus far
                                  	TotalEntities     int64                  `protobuf:"varint,7,opt,name=total_entities,json=totalEntities,proto3" json:"total_entities,omitempty"`             // total number of entities or -1 if unknown
                                  	EntitiesPerSec    float32                `protobuf:"fixed32,8,opt,name=entities_per_sec,json=entitiesPerSec,proto3" json:"entities_per_sec,omitempty"`       // rate of processing, entities per second
                                  	Shards            []*ShardInfo           `protobuf:"bytes,20,rep,name=shards,proto3" json:"shards,omitempty"`                                                // state of all job's shards
                                  	// contains filtered or unexported fields
                                  }

                                    Information about a job.

                                    func (*JobInfo) Descriptor

                                    func (*JobInfo) Descriptor() ([]byte, []int)

                                      Deprecated: Use JobInfo.ProtoReflect.Descriptor instead.

                                      func (*JobInfo) GetCreated

                                      func (x *JobInfo) GetCreated() *timestamppb.Timestamp

                                      func (*JobInfo) GetEntitiesPerSec

                                      func (x *JobInfo) GetEntitiesPerSec() float32

                                      func (*JobInfo) GetEta

                                      func (x *JobInfo) GetEta() *timestamppb.Timestamp

                                      func (*JobInfo) GetId

                                      func (x *JobInfo) GetId() int64

                                      func (*JobInfo) GetProcessedEntities

                                      func (x *JobInfo) GetProcessedEntities() int64

                                      func (*JobInfo) GetShards

                                      func (x *JobInfo) GetShards() []*ShardInfo

                                      func (*JobInfo) GetState

                                      func (x *JobInfo) GetState() State

                                      func (*JobInfo) GetTotalEntities

                                      func (x *JobInfo) GetTotalEntities() int64

                                      func (*JobInfo) GetUpdated

                                      func (x *JobInfo) GetUpdated() *timestamppb.Timestamp

                                      func (*JobInfo) ProtoMessage

                                      func (*JobInfo) ProtoMessage()

                                      func (*JobInfo) ProtoReflect

                                      func (x *JobInfo) ProtoReflect() protoreflect.Message

                                      func (*JobInfo) Reset

                                      func (x *JobInfo) Reset()

                                      func (*JobInfo) String

                                      func (x *JobInfo) String() string

                                      type Mapper

                                      type Mapper func(ctx context.Context, keys []*datastore.Key) error

                                        Mapper applies some function to the given slice of entities, given by their keys.

                                        May be called multiple times for the same key (thus should be idempotent).

                                        Returning a transient error indicates that the processing of this batch of keys should be retried (even if some keys were processed successfully).

                                        Returning a fatal error causes the entire shard (and eventually the entire job) to be marked as failed. The processing of the failed shard stops right away, but other shards are kept running until completion (or their own failure).

                                        The function is called outside of any transactions, so it can start its own if needed.

                                        type Query

                                        type Query struct {
                                        	Kind     string         // entity kind to limit the query, "" for kindless
                                        	Ancestor *datastore.Key // entity group to limit the query to (or nil)
                                        }

                                          Query is a representation of datastore queries supported by the mapper.

                                          A query defines a set of entities the mapper operates on.

                                          This struct can be embedded into entities as is.

                                          func (*Query) ToDatastoreQuery

                                          func (q *Query) ToDatastoreQuery() *datastore.Query

                                            ToDatastoreQuery returns corresponding datastore.Query.

                                            type ShardInfo

                                            type ShardInfo struct {
                                            	Index             int32                  `protobuf:"varint,1,opt,name=index,proto3" json:"index,omitempty"` // zero-based index of the shard
                                            	State             State                  `protobuf:"varint,2,opt,name=state,proto3,enum=appengine.mapper.messages.State" json:"state,omitempty"`
                                            	Error             string                 `protobuf:"bytes,3,opt,name=error,proto3" json:"error,omitempty"`                                                   // human readable error message, for failed shards only
                                            	Created           *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=created,proto3" json:"created,omitempty"`                                               // when it was created
                                            	Updated           *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=updated,proto3" json:"updated,omitempty"`                                               // when it was updated last time
                                            	Eta               *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=eta,proto3" json:"eta,omitempty"`                                                       // when it finishes (if tracking progress)
                                            	ProcessedEntities int64                  `protobuf:"varint,7,opt,name=processed_entities,json=processedEntities,proto3" json:"processed_entities,omitempty"` // number of processed entities thus far
                                            	TotalEntities     int64                  `protobuf:"varint,8,opt,name=total_entities,json=totalEntities,proto3" json:"total_entities,omitempty"`             // total number of entities or -1 if unknown
                                            	EntitiesPerSec    float32                `protobuf:"fixed32,9,opt,name=entities_per_sec,json=entitiesPerSec,proto3" json:"entities_per_sec,omitempty"`       // rate of processing, entities per second
                                            	// contains filtered or unexported fields
                                            }

                                              Information about some single shard of a job.

                                              func (*ShardInfo) Descriptor

                                              func (*ShardInfo) Descriptor() ([]byte, []int)

                                                Deprecated: Use ShardInfo.ProtoReflect.Descriptor instead.

                                                func (*ShardInfo) GetCreated

                                                func (x *ShardInfo) GetCreated() *timestamppb.Timestamp

                                                func (*ShardInfo) GetEntitiesPerSec

                                                func (x *ShardInfo) GetEntitiesPerSec() float32

                                                func (*ShardInfo) GetError

                                                func (x *ShardInfo) GetError() string

                                                func (*ShardInfo) GetEta

                                                func (x *ShardInfo) GetEta() *timestamppb.Timestamp

                                                func (*ShardInfo) GetIndex

                                                func (x *ShardInfo) GetIndex() int32

                                                func (*ShardInfo) GetProcessedEntities

                                                func (x *ShardInfo) GetProcessedEntities() int64

                                                func (*ShardInfo) GetState

                                                func (x *ShardInfo) GetState() State

                                                func (*ShardInfo) GetTotalEntities

                                                func (x *ShardInfo) GetTotalEntities() int64

                                                func (*ShardInfo) GetUpdated

                                                func (x *ShardInfo) GetUpdated() *timestamppb.Timestamp

                                                func (*ShardInfo) ProtoMessage

                                                func (*ShardInfo) ProtoMessage()

                                                func (*ShardInfo) ProtoReflect

                                                func (x *ShardInfo) ProtoReflect() protoreflect.Message

                                                func (*ShardInfo) Reset

                                                func (x *ShardInfo) Reset()

                                                func (*ShardInfo) String

                                                func (x *ShardInfo) String() string

                                                type State

                                                type State int32

                                                  State of a job or one of its shards.

                                                  const (
                                                  	State_STATE_UNSPECIFIED State = 0
                                                  	State_STARTING          State = 1
                                                  	State_RUNNING           State = 2
                                                  	State_ABORTING          State = 3
                                                  	State_SUCCESS           State = 4
                                                  	State_FAIL              State = 5
                                                  	State_ABORTED           State = 6
                                                  )
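Client code often needs to know whether a job can still change state. The sketch below mirrors the enum with a local stand-in type and adds an isFinal helper. Treating SUCCESS, FAIL and ABORTED as final is an assumption inferred from the state names and from the AbortJob description, and the lowercase identifiers are hypothetical, not part of the generated API.

```go
package main

import "fmt"

// state is a local stand-in for the generated State enum.
type state int32

const (
	stateUnspecified state = 0
	starting         state = 1
	running          state = 2
	aborting         state = 3
	success          state = 4
	fail             state = 5
	aborted          state = 6
)

// stateName mirrors the State_name map from the Variables section.
var stateName = map[state]string{
	stateUnspecified: "STATE_UNSPECIFIED",
	starting:         "STARTING",
	running:          "RUNNING",
	aborting:         "ABORTING",
	success:          "SUCCESS",
	fail:             "FAIL",
	aborted:          "ABORTED",
}

// String returns the enum name, or a numeric fallback for unknown values.
func (s state) String() string {
	if n, ok := stateName[s]; ok {
		return n
	}
	return fmt.Sprintf("state(%d)", int32(s))
}

// isFinal reports whether s is assumed to be a terminal state.
func isFinal(s state) bool {
	switch s {
	case success, fail, aborted:
		return true
	}
	return false
}

func main() {
	for _, s := range []state{running, aborting, aborted} {
		fmt.Printf("%s final=%v\n", s, isFinal(s))
	}
}
```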

                                                  func (State) Descriptor

                                                  func (State) Descriptor() protoreflect.EnumDescriptor

                                                  func (State) Enum

                                                  func (x State) Enum() *State

                                                  func (State) EnumDescriptor

                                                  func (State) EnumDescriptor() ([]byte, []int)

                                                    Deprecated: Use State.Descriptor instead.

                                                    func (State) Number

                                                    func (x State) Number() protoreflect.EnumNumber

                                                    func (State) String

                                                    func (x State) String() string

                                                    func (State) Type

                                                    func (State) Type() protoreflect.EnumType

                                                    Directories

                                                    Path Synopsis
                                                    Binary demo contains minimal demo for 'mapper' package.
                                                    Package splitter implements SplitIntoRanges function useful when splitting large datastore queries into a bunch of smaller queries with approximately evenly-sized result sets.
                                                    internal
                                                    tasks
                                                    Package tasks contains definition of task queue tasks used by the mapper.