Documentation ¶
Overview ¶
Package qsvc contains the EntroQ service implementation for gRPC. It provides a service that can be registered with a grpc.Server:
	import (
		"context"
		"log"
		"net"

		"github.com/shiblon/entroq/backend/eqpg"
		"github.com/shiblon/entroq/qsvc"
		pb "github.com/shiblon/entroq/proto"
		"google.golang.org/grpc"
	)

	func main() {
		ctx := context.Background()

		// Listen on the address the gRPC server will be served from.
		listener, err := net.Listen("tcp", "localhost:54321")
		if err != nil {
			log.Fatalf("Failed to listen: %v", err)
		}

		// Create the service on top of a PostgreSQL backend.
		svc, err := qsvc.New(ctx, eqpg.Opener("localhost:5432", "postgres", "postgres", false))
		if err != nil {
			log.Fatalf("Failed to open service backends: %v", err)
		}

		// Register the service and serve until the listener fails or is closed.
		s := grpc.NewServer()
		pb.RegisterEntroQServer(s, svc)
		if err := s.Serve(listener); err != nil {
			log.Fatalf("Failed to serve: %v", err)
		}
	}
Index ¶
- Constants
- type Option
- type QSvc
- func (s *QSvc) Authorize(ctx context.Context, req *authz.Request) error
- func (s *QSvc) Claim(ctx context.Context, req *pb.ClaimRequest) (*pb.ClaimResponse, error)
- func (s *QSvc) Close() error
- func (s *QSvc) Modify(ctx context.Context, req *pb.ModifyRequest) (*pb.ModifyResponse, error)
- func (s *QSvc) QueueStats(ctx context.Context, req *pb.QueuesRequest) (*pb.QueuesResponse, error)
- func (s *QSvc) Queues(ctx context.Context, req *pb.QueuesRequest) (*pb.QueuesResponse, error)
- func (s *QSvc) RefreshMetrics(ctx context.Context) error
- func (s *QSvc) StreamTasks(req *pb.TasksRequest, stream pb.EntroQ_StreamTasksServer) error
- func (s *QSvc) Tasks(ctx context.Context, req *pb.TasksRequest) (*pb.TasksResponse, error)
- func (s *QSvc) Time(ctx context.Context, req *pb.TimeRequest) (*pb.TimeResponse, error)
- func (s *QSvc) TryClaim(ctx context.Context, req *pb.ClaimRequest) (*pb.ClaimResponse, error)
Constants ¶
const (
	// MetricNS is the prometheus namespace for all metrics for this module.
	MetricNS = "entroq"
)
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type Option ¶
type Option func(*QSvc)
Option is a functional option that configures a QSvc at creation time.
func WithAuthorizationHeader ¶
WithAuthorizationHeader sets the name of the header containing an authorization token. Default is "authorization".
func WithAuthorizer ¶
func WithAuthorizer(az authz.Authorizer) Option
WithAuthorizer sets the authorization implementation.
func WithMetricInterval ¶
WithMetricInterval sets the interval between (potentially expensive) queue stats requests for the purpose of providing metrics. It cannot be set to less than 1 minute.
type QSvc ¶
	type QSvc struct {
		pb.UnimplementedEntroQServer
		// contains filtered or unexported fields
	}
QSvc is an EntroQServer.
func (*QSvc) Claim ¶
func (s *QSvc) Claim(ctx context.Context, req *pb.ClaimRequest) (*pb.ClaimResponse, error)
Claim is the blocking version of TryClaim.
func (*QSvc) Modify ¶
func (s *QSvc) Modify(ctx context.Context, req *pb.ModifyRequest) (*pb.ModifyResponse, error)
Modify attempts to apply the modification described by the given ModifyRequest. On success, it returns a ModifyResponse. If the modification fails due to a dependency error (one of the specified tasks was not present), it returns a gRPC status whose details slice contains *pb.ModifyDep values. These can be used to reconstruct an entroq.DependencyError, or inspected directly to find out which IDs caused the dependency failure. Other errors are returned with code UNKNOWN.
func (*QSvc) QueueStats ¶
func (s *QSvc) QueueStats(ctx context.Context, req *pb.QueuesRequest) (*pb.QueuesResponse, error)
QueueStats returns a mapping from queue names to queue stats.
func (*QSvc) Queues ¶
func (s *QSvc) Queues(ctx context.Context, req *pb.QueuesRequest) (*pb.QueuesResponse, error)
Queues returns a mapping from queue names to queue sizes.
func (*QSvc) RefreshMetrics ¶
func (s *QSvc) RefreshMetrics(ctx context.Context) error
RefreshMetrics collects stats and exports them as prometheus metrics. Intended to be called periodically in the background. Beware of calling too frequently, as this may deny service to users.
func (*QSvc) StreamTasks ¶
func (s *QSvc) StreamTasks(req *pb.TasksRequest, stream pb.EntroQ_StreamTasksServer) error
func (*QSvc) Tasks ¶
func (s *QSvc) Tasks(ctx context.Context, req *pb.TasksRequest) (*pb.TasksResponse, error)
func (*QSvc) Time ¶
func (s *QSvc) Time(ctx context.Context, req *pb.TimeRequest) (*pb.TimeResponse, error)
Time returns the current time in milliseconds since the Epoch.
func (*QSvc) TryClaim ¶
func (s *QSvc) TryClaim(ctx context.Context, req *pb.ClaimRequest) (*pb.ClaimResponse, error)
TryClaim attempts to claim a task, returning immediately. If no tasks are available, it returns a nil response and a nil error.
If req.Wait is present, TryClaim may not return immediately, but may hold onto the connection until either the context expires or a task becomes available to claim. Callers can check for context cancelation codes to know that this has happened, and may opt to immediately re-send the request.