Documentation
¶
Index ¶
- Constants
- Variables
- func RegisterMonitoringServiceServer(s grpc.ServiceRegistrar, srv MonitoringServiceServer)
- func SetLogLevel(ll LogLevel)
- type Consumer
- type CurrentWorkingsMessages
- type CurrentWorkingsRequest
- type CurrentWorkingsResponse
- func (*CurrentWorkingsResponse) Descriptor() ([]byte, []int)deprecated
- func (x *CurrentWorkingsResponse) GetTasks() []*Task
- func (*CurrentWorkingsResponse) ProtoMessage()
- func (x *CurrentWorkingsResponse) ProtoReflect() protoreflect.Message
- func (x *CurrentWorkingsResponse) Reset()
- func (x *CurrentWorkingsResponse) String() string
- type FetcherParameter
- type Gateway
- type GatewayParameter
- type HTTPInvoker
- type Invoker
- type LogLevel
- type Message
- type MonitoringService
- type MonitoringServiceClient
- type MonitoringServiceServer
- type QueueResultStatus
- type System
- type SystemBuilder
- type Task
- func (*Task) Descriptor() ([]byte, []int)deprecated
- func (x *Task) GetId() string
- func (x *Task) GetReceipt() string
- func (x *Task) GetStartedAt() *timestamppb.Timestamp
- func (*Task) ProtoMessage()
- func (x *Task) ProtoReflect() protoreflect.Message
- func (x *Task) Reset()
- func (x *Task) String() string
- type UnimplementedMonitoringServiceServer
- type UnsafeMonitoringServiceServer
Constants ¶
const DisableMonitoring = -1
DisableMonitoring makes gRPC server disable to run.
Variables ¶
var File_sqsd_proto protoreflect.FileDescriptor
var MonitoringService_ServiceDesc = grpc.ServiceDesc{ ServiceName: "sqsd.MonitoringService", HandlerType: (*MonitoringServiceServer)(nil), Methods: []grpc.MethodDesc{ { MethodName: "CurrentWorkings", Handler: _MonitoringService_CurrentWorkings_Handler, }, }, Streams: []grpc.StreamDesc{}, Metadata: "sqsd.proto", }
MonitoringService_ServiceDesc is the grpc.ServiceDesc for MonitoringService service. It's only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy)
Functions ¶
func RegisterMonitoringServiceServer ¶
func RegisterMonitoringServiceServer(s grpc.ServiceRegistrar, srv MonitoringServiceServer)
func SetLogLevel ¶
func SetLogLevel(ll LogLevel)
SetLogLevel set supplied log.Level in actor, mailbox and our logger.
Types ¶
type Consumer ¶
type Consumer struct {
// contains filtered or unexported fields
}
Consumer manages Invoker's invokation from receiving queues.
func NewConsumer ¶
NewConsumer returns Consumer actor Prop.
func (*Consumer) NewDistributorActorProps ¶
NewDistributorActorProps returns actor properties of distributor.
type CurrentWorkingsMessages ¶
type CurrentWorkingsMessages struct{}
CurrentWorkingsMessages is message which MonitoringReceiver actor receives.
type CurrentWorkingsRequest ¶
type CurrentWorkingsRequest struct {
// contains filtered or unexported fields
}
func (*CurrentWorkingsRequest) Descriptor
deprecated
func (*CurrentWorkingsRequest) Descriptor() ([]byte, []int)
Deprecated: Use CurrentWorkingsRequest.ProtoReflect.Descriptor instead.
func (*CurrentWorkingsRequest) ProtoMessage ¶
func (*CurrentWorkingsRequest) ProtoMessage()
func (*CurrentWorkingsRequest) ProtoReflect ¶
func (x *CurrentWorkingsRequest) ProtoReflect() protoreflect.Message
func (*CurrentWorkingsRequest) Reset ¶
func (x *CurrentWorkingsRequest) Reset()
func (*CurrentWorkingsRequest) String ¶
func (x *CurrentWorkingsRequest) String() string
type CurrentWorkingsResponse ¶
type CurrentWorkingsResponse struct {
Tasks []*Task `protobuf:"bytes,1,rep,name=tasks,proto3" json:"tasks,omitempty"`
// contains filtered or unexported fields
}
func (*CurrentWorkingsResponse) Descriptor
deprecated
func (*CurrentWorkingsResponse) Descriptor() ([]byte, []int)
Deprecated: Use CurrentWorkingsResponse.ProtoReflect.Descriptor instead.
func (*CurrentWorkingsResponse) GetTasks ¶
func (x *CurrentWorkingsResponse) GetTasks() []*Task
func (*CurrentWorkingsResponse) ProtoMessage ¶
func (*CurrentWorkingsResponse) ProtoMessage()
func (*CurrentWorkingsResponse) ProtoReflect ¶
func (x *CurrentWorkingsResponse) ProtoReflect() protoreflect.Message
func (*CurrentWorkingsResponse) Reset ¶
func (x *CurrentWorkingsResponse) Reset()
func (*CurrentWorkingsResponse) String ¶
func (x *CurrentWorkingsResponse) String() string
type FetcherParameter ¶
type FetcherParameter func(*fetcher)
FetcherParameter sets parameter to fetcher by functional option pattern.
func FetcherDistributorInterval ¶
func FetcherDistributorInterval(d time.Duration) FetcherParameter
FetcherDistributorInterval sets interval duration of distributor request to fetcher. Fetcher watches distributor status because fetcher should be stopped when messages which distributor has is over capacity.
func FetcherInterval ¶
func FetcherInterval(d time.Duration) FetcherParameter
FetcherInterval sets interval duration of receiving queue request to fetcher.
func FetcherMaxMessages ¶ added in v1.2.1
func FetcherMaxMessages(n int64) FetcherParameter
FetcherMaxMessages sets MaxNumberOfMessages of SQS between 1 and 10. Fetcher's default value is 10. if supplied value is out of range, forcely sets 1 or 10. (if n is less than 1, set 1 and is more than 10, set 10)
func FetcherQueueLocker ¶
func FetcherQueueLocker(l locker.QueueLocker) FetcherParameter
FetcherQueueLocker sets QueueLocker in Fetcher to block duplicated queue.
type Gateway ¶
type Gateway struct {
// contains filtered or unexported fields
}
Gateway fetches and removes jobs from SQS.
func NewGateway ¶
func NewGateway(queue *sqs.SQS, qURL string, fns ...GatewayParameter) *Gateway
NewGateway returns Gateway object.
func (*Gateway) NewFetcherGroup ¶
NewFetcherGroup returns parallelized Fetcher properties which is provided as BroadcastGroup.
func (*Gateway) NewRemoverGroup ¶
NewRemoverGroup returns parallelized Remover properties which is provided as RoundRobinGroup.
type GatewayParameter ¶
type GatewayParameter func(*Gateway)
GatewayParameter sets parameter in Gateway.
func GatewayParallel ¶
func GatewayParallel(p int) GatewayParameter
GatewayParallel sets parallel size in Gateway.
func GatewayVisibilityTimeout ¶
func GatewayVisibilityTimeout(d time.Duration) GatewayParameter
GatewayVisibilityTimeout sets visibility timeout in Gateway to receive messages from SQS.
type HTTPInvoker ¶
type HTTPInvoker struct {
// contains filtered or unexported fields
}
HTTPInvoker invokes worker process by HTTP POST request.
func NewHTTPInvoker ¶
func NewHTTPInvoker(rawurl string, dur time.Duration) (*HTTPInvoker, error)
NewHTTPInvoker returns HTTPInvoker instance.
type LogLevel ¶
LogLevel sets log level for sqsd.
func (*LogLevel) UnmarshalText ¶
UnmarshalText scans levelMap from supplied text and set scanned level to LogLevel.
type Message ¶
type Message struct {
ID string
Payload string
Receipt string
ReceivedAt time.Time
ResultStatus QueueResultStatus
}
Message provides transition from sqs.Message
func (Message) ResultFailed ¶
ResultFailed returns Queue has RequestFail status.
func (Message) ResultSucceeded ¶
ResultSucceeded returns Queue has RequestSuccess status.
type MonitoringService ¶
type MonitoringService struct {
UnimplementedMonitoringServiceServer
// contains filtered or unexported fields
}
MonitoringService provides grpc handler for MonitoringService.
func NewMonitoringService ¶
func NewMonitoringService(ctx *actor.RootContext, consumer *actor.PID) *MonitoringService
NewMonitoringService returns new MonitoringService object.
func (*MonitoringService) CurrentWorkings ¶
func (s *MonitoringService) CurrentWorkings(context.Context, *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error)
CurrentWorkings handles CurrentWorkings grpc request using actor system.
func (*MonitoringService) WaitUntilAllEnds ¶
func (s *MonitoringService) WaitUntilAllEnds(timeout time.Duration) error
WaitUntilAllEnds waits until all worker tasks finishes.
type MonitoringServiceClient ¶
type MonitoringServiceClient interface {
CurrentWorkings(ctx context.Context, in *CurrentWorkingsRequest, opts ...grpc.CallOption) (*CurrentWorkingsResponse, error)
}
MonitoringServiceClient is the client API for MonitoringService service.
For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
func NewMonitoringServiceClient ¶
func NewMonitoringServiceClient(cc grpc.ClientConnInterface) MonitoringServiceClient
type MonitoringServiceServer ¶
type MonitoringServiceServer interface {
CurrentWorkings(context.Context, *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error)
// contains filtered or unexported methods
}
MonitoringServiceServer is the server API for MonitoringService service. All implementations must embed UnimplementedMonitoringServiceServer for forward compatibility
type QueueResultStatus ¶
type QueueResultStatus int
QueueResultStatus represents status for Queue result.
const ( // NotRequested represents queue has no result. NotRequested QueueResultStatus = iota // default // RequestSuccess represents queue request has succeeded. RequestSuccess // RequestFail represents queue request has failed. RequestFail )
type System ¶
type System struct {
// contains filtered or unexported fields
}
System controls actor system of sqsd.
type SystemBuilder ¶
type SystemBuilder func(*System)
SystemBuilder provides constructor for system object requirements.
func ConsumerBuilder ¶
func ConsumerBuilder(invoker Invoker, parallel int) SystemBuilder
ConsumerBuilder builds consumer for system.
func GatewayBuilder ¶
func GatewayBuilder(queue *sqs.SQS, queueURL string, parallel int, timeout time.Duration, params ...FetcherParameter) SystemBuilder
GatewayBuilder builds gateway for system.
func MonitorBuilder ¶
func MonitorBuilder(port int) SystemBuilder
MonitorBuilder sets monitor server port to system.
type Task ¶
type Task struct {
Id string `protobuf:"bytes,1,opt,name=id,proto3" json:"id,omitempty"`
Receipt string `protobuf:"bytes,2,opt,name=receipt,proto3" json:"receipt,omitempty"`
StartedAt *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=started_at,json=startedAt,proto3" json:"started_at,omitempty"`
// contains filtered or unexported fields
}
func (*Task) Descriptor
deprecated
func (*Task) GetReceipt ¶
func (*Task) GetStartedAt ¶
func (x *Task) GetStartedAt() *timestamppb.Timestamp
func (*Task) ProtoMessage ¶
func (*Task) ProtoMessage()
func (*Task) ProtoReflect ¶
func (x *Task) ProtoReflect() protoreflect.Message
type UnimplementedMonitoringServiceServer ¶
type UnimplementedMonitoringServiceServer struct {
}
UnimplementedMonitoringServiceServer must be embedded to have forward compatible implementations.
func (UnimplementedMonitoringServiceServer) CurrentWorkings ¶
func (UnimplementedMonitoringServiceServer) CurrentWorkings(context.Context, *CurrentWorkingsRequest) (*CurrentWorkingsResponse, error)
type UnsafeMonitoringServiceServer ¶
type UnsafeMonitoringServiceServer interface {
// contains filtered or unexported methods
}
UnsafeMonitoringServiceServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as added methods to MonitoringServiceServer will result in compilation errors.