streaming

package
v0.0.0-...-798c737

This package is not in the latest version of its module.
Published: Feb 2, 2026 License: MIT Imports: 20 Imported by: 0

Documentation

Overview

Package streaming demonstrates server streaming gRPC support in proto-cli.

This example shows how to:

  • Define server streaming RPC methods (one request, multiple responses)
  • Stream responses with configurable output formats (JSON, YAML, Go)
  • Configure message delimiters for NDJSON and other formats
  • Use streaming in both local (direct) and remote (gRPC client) modes
  • Handle real-time data feeds like watch operations

The example includes:

  • ListItems: Streams a list of items matching filter criteria
  • WatchItems: Streams item change events in real-time

Streaming features:

  • Each message is formatted independently using the selected OutputFormat
  • Messages are separated by configurable delimiters (default: newline)
  • NDJSON output pipes directly into Unix tools (e.g., | jq ., | grep)
  • Supports both local execution and remote gRPC server calls

To run the example:

go run ./streamcli streaming-service list-items --category books --format json
go run ./streamcli streaming-service watch-items --start-id 1 --format yaml

Generated code in this package should not be edited manually. To regenerate after modifying streaming.proto, run:

go generate

Constants

const (
	StreamingService_ListItems_FullMethodName  = "/streaming.StreamingService/ListItems"
	StreamingService_WatchItems_FullMethodName = "/streaming.StreamingService/WatchItems"
)
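These constants follow the gRPC full method name shape "/package.Service/Method". As a small illustration, the sketch below splits such a name into its service and method parts. splitFullMethod is a hypothetical helper, not part of this package, and the constant is copied locally so the example stands alone.

```go
package main

import (
	"fmt"
	"strings"
)

// Local copy of the constant documented above, so the sketch is self-contained.
const listItemsFullMethodName = "/streaming.StreamingService/ListItems"

// splitFullMethod splits "/package.Service/Method" into the fully qualified
// service name and the method name. It reports ok=false for malformed input.
func splitFullMethod(full string) (service, method string, ok bool) {
	if !strings.HasPrefix(full, "/") {
		return "", "", false
	}
	pos := strings.LastIndex(full, "/")
	if pos == 0 {
		// Only the leading slash is present; there is no method part.
		return "", "", false
	}
	service, method = full[1:pos], full[pos+1:]
	if service == "" || method == "" {
		return "", "", false
	}
	return service, method, true
}

func main() {
	service, method, ok := splitFullMethod(listItemsFullMethodName)
	fmt.Println(service, method, ok)
	// prints: streaming.StreamingService ListItems true
}
```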

Variables

var File_examples_streaming_streaming_proto protoreflect.FileDescriptor
var StreamingService_ServiceDesc = grpc.ServiceDesc{
	ServiceName: "streaming.StreamingService",
	HandlerType: (*StreamingServiceServer)(nil),
	Methods:     []grpc.MethodDesc{},
	Streams: []grpc.StreamDesc{
		{
			StreamName:    "ListItems",
			Handler:       _StreamingService_ListItems_Handler,
			ServerStreams: true,
		},
		{
			StreamName:    "WatchItems",
			Handler:       _StreamingService_WatchItems_Handler,
			ServerStreams: true,
		},
	},
	Metadata: "examples/streaming/streaming.proto",
}

StreamingService_ServiceDesc is the grpc.ServiceDesc for the StreamingService service. It is only intended for direct use with grpc.RegisterService, and not to be introspected or modified (even as a copy).

Functions

func RegisterStreamingServiceServer

func RegisterStreamingServiceServer(s grpc.ServiceRegistrar, srv StreamingServiceServer)

func StreamingServiceCommand

func StreamingServiceCommand(ctx context.Context, implOrFactory interface{}, opts ...protocli.ServiceOption) *protocli.ServiceCLI

StreamingServiceCommand creates a CLI for StreamingService with the given options. The implOrFactory parameter can be either a direct service implementation or a factory function.

func StreamingServiceCommandsFlat

func StreamingServiceCommandsFlat(ctx context.Context, implOrFactory interface{}, opts ...protocli.ServiceOption) []*v3.Command

StreamingServiceCommandsFlat creates a flat command structure for StreamingService, intended for single-service CLIs. It returns the RPC commands directly at the root level instead of nesting them under a service command. The implOrFactory parameter can be either a direct service implementation or a factory function. The returned slice includes all RPC commands plus a daemonize command for starting a gRPC server.

Types

type Item

type Item struct {
	Id       int64  `protobuf:"varint,1,opt,name=id,proto3" json:"id,omitempty"`
	Name     string `protobuf:"bytes,2,opt,name=name,proto3" json:"name,omitempty"`
	Category string `protobuf:"bytes,3,opt,name=category,proto3" json:"category,omitempty"`
	// contains filtered or unexported fields
}

func (*Item) Descriptor deprecated

func (*Item) Descriptor() ([]byte, []int)

Deprecated: Use Item.ProtoReflect.Descriptor instead.

func (*Item) GetCategory

func (x *Item) GetCategory() string

func (*Item) GetId

func (x *Item) GetId() int64

func (*Item) GetName

func (x *Item) GetName() string

func (*Item) ProtoMessage

func (*Item) ProtoMessage()

func (*Item) ProtoReflect

func (x *Item) ProtoReflect() protoreflect.Message

func (*Item) Reset

func (x *Item) Reset()

func (*Item) String

func (x *Item) String() string

type ItemEvent

type ItemEvent struct {
	EventType string `protobuf:"bytes,1,opt,name=event_type,json=eventType,proto3" json:"event_type,omitempty"` // "created", "updated", "deleted"
	Item      *Item  `protobuf:"bytes,2,opt,name=item,proto3" json:"item,omitempty"`
	Timestamp int64  `protobuf:"varint,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
	// contains filtered or unexported fields
}

func (*ItemEvent) Descriptor deprecated

func (*ItemEvent) Descriptor() ([]byte, []int)

Deprecated: Use ItemEvent.ProtoReflect.Descriptor instead.

func (*ItemEvent) GetEventType

func (x *ItemEvent) GetEventType() string

func (*ItemEvent) GetItem

func (x *ItemEvent) GetItem() *Item

func (*ItemEvent) GetTimestamp

func (x *ItemEvent) GetTimestamp() int64

func (*ItemEvent) ProtoMessage

func (*ItemEvent) ProtoMessage()

func (*ItemEvent) ProtoReflect

func (x *ItemEvent) ProtoReflect() protoreflect.Message

func (*ItemEvent) Reset

func (x *ItemEvent) Reset()

func (*ItemEvent) String

func (x *ItemEvent) String() string

type ItemResponse

type ItemResponse struct {
	Item    *Item  `protobuf:"bytes,1,opt,name=item,proto3" json:"item,omitempty"`
	Message string `protobuf:"bytes,2,opt,name=message,proto3" json:"message,omitempty"`
	// contains filtered or unexported fields
}

func (*ItemResponse) Descriptor deprecated

func (*ItemResponse) Descriptor() ([]byte, []int)

Deprecated: Use ItemResponse.ProtoReflect.Descriptor instead.

func (*ItemResponse) GetItem

func (x *ItemResponse) GetItem() *Item

func (*ItemResponse) GetMessage

func (x *ItemResponse) GetMessage() string

func (*ItemResponse) ProtoMessage

func (*ItemResponse) ProtoMessage()

func (*ItemResponse) ProtoReflect

func (x *ItemResponse) ProtoReflect() protoreflect.Message

func (*ItemResponse) Reset

func (x *ItemResponse) Reset()

func (*ItemResponse) String

func (x *ItemResponse) String() string

type ListItemsRequest

type ListItemsRequest struct {
	Category string `protobuf:"bytes,1,opt,name=category,proto3" json:"category,omitempty"`
	Limit    int32  `protobuf:"varint,2,opt,name=limit,proto3" json:"limit,omitempty"`
	// Optional fields demonstrate explicit presence tracking
	Offset         *int32  `protobuf:"varint,3,opt,name=offset,proto3,oneof" json:"offset,omitempty"`
	SortBy         *string `protobuf:"bytes,4,opt,name=sort_by,json=sortBy,proto3,oneof" json:"sort_by,omitempty"`
	IncludeDeleted *bool   `protobuf:"varint,5,opt,name=include_deleted,json=includeDeleted,proto3,oneof" json:"include_deleted,omitempty"`
	// contains filtered or unexported fields
}
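The pointer-typed fields are what make presence observable: a nil pointer means the field was never set, while a pointer to a zero value means it was set explicitly. The sketch below shows the idea with a hypothetical stand-in type (listRequest) rather than the generated ListItemsRequest; its getter follows the usual nil-safe shape of generated getters, and describeOffset is an illustrative helper.

```go
package main

import "fmt"

// listRequest is a hypothetical stand-in for ListItemsRequest with one
// regular field and one optional field.
type listRequest struct {
	Limit  int32
	Offset *int32 // nil means "not set"
}

// GetOffset is nil-safe and returns the zero value when the field is unset,
// so it cannot distinguish "unset" from "set to 0" on its own.
func (x *listRequest) GetOffset() int32 {
	if x != nil && x.Offset != nil {
		return *x.Offset
	}
	return 0
}

// describeOffset checks the pointer directly to tell unset from explicit zero.
func describeOffset(r *listRequest) string {
	if r == nil || r.Offset == nil {
		return "offset not set"
	}
	return fmt.Sprintf("offset set to %d", *r.Offset)
}

func main() {
	zero := int32(0)
	fmt.Println(describeOffset(&listRequest{}))              // offset not set
	fmt.Println(describeOffset(&listRequest{Offset: &zero})) // offset set to 0
	fmt.Println((*listRequest)(nil).GetOffset())             // 0
}
```

Code that needs to know whether a caller supplied a value should inspect the pointer field; the getter is convenient when the zero value is an acceptable default.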

func (*ListItemsRequest) Descriptor deprecated

func (*ListItemsRequest) Descriptor() ([]byte, []int)

Deprecated: Use ListItemsRequest.ProtoReflect.Descriptor instead.

func (*ListItemsRequest) GetCategory

func (x *ListItemsRequest) GetCategory() string

func (*ListItemsRequest) GetIncludeDeleted

func (x *ListItemsRequest) GetIncludeDeleted() bool

func (*ListItemsRequest) GetLimit

func (x *ListItemsRequest) GetLimit() int32

func (*ListItemsRequest) GetOffset

func (x *ListItemsRequest) GetOffset() int32

func (*ListItemsRequest) GetSortBy

func (x *ListItemsRequest) GetSortBy() string

func (*ListItemsRequest) ProtoMessage

func (*ListItemsRequest) ProtoMessage()

func (*ListItemsRequest) ProtoReflect

func (x *ListItemsRequest) ProtoReflect() protoreflect.Message

func (*ListItemsRequest) Reset

func (x *ListItemsRequest) Reset()

func (*ListItemsRequest) String

func (x *ListItemsRequest) String() string

type StreamingService

type StreamingService struct {
	UnimplementedStreamingServiceServer
}

func NewStreamingService

func NewStreamingService() *StreamingService

func (*StreamingService) ListItems

func (*StreamingService) Register

func (s *StreamingService) Register(_ context.Context) error

func (*StreamingService) WatchItems

type StreamingServiceClient

type StreamingServiceClient interface {
	// Server streaming: list items as they're found
	ListItems(ctx context.Context, in *ListItemsRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ItemResponse], error)
	// Server streaming: watch for changes
	WatchItems(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (grpc.ServerStreamingClient[ItemEvent], error)
}

StreamingServiceClient is the client API for StreamingService service.

For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
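On the client side, a server stream is typically consumed by calling Recv in a loop until it returns io.EOF. The sketch below shows that loop without depending on gRPC: recvStream is a hypothetical, minimal stand-in for the Recv side of grpc.ServerStreamingClient, and sliceStream is a fake that replays a fixed slice. Both names are assumptions made for illustration.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// recvStream is a hypothetical stand-in for the Recv side of
// grpc.ServerStreamingClient[T].
type recvStream[T any] interface {
	Recv() (*T, error)
}

// sliceStream is a fake stream that replays a fixed slice, then reports io.EOF.
type sliceStream[T any] struct {
	msgs []T
	next int
}

func (s *sliceStream[T]) Recv() (*T, error) {
	if s.next >= len(s.msgs) {
		return nil, io.EOF
	}
	m := &s.msgs[s.next]
	s.next++
	return m, nil
}

// drain calls Recv until the stream ends, passing each message to fn.
// io.EOF marks a clean end of stream; any other error is returned.
func drain[T any](s recvStream[T], fn func(*T)) error {
	for {
		msg, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
		fn(msg)
	}
}

func main() {
	s := &sliceStream[string]{msgs: []string{"created", "updated", "deleted"}}
	if err := drain[string](s, func(m *string) { fmt.Println(*m) }); err != nil {
		panic(err)
	}
}
```

With the real client, the same loop would wrap the stream returned by ListItems or WatchItems, and cancelling the context passed to the call is the usual way to stop a long-running watch.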

type StreamingServiceServer

type StreamingServiceServer interface {
	// Server streaming: list items as they're found
	ListItems(*ListItemsRequest, grpc.ServerStreamingServer[ItemResponse]) error
	// Server streaming: watch for changes
	WatchItems(*WatchRequest, grpc.ServerStreamingServer[ItemEvent]) error
	// contains filtered or unexported methods
}

StreamingServiceServer is the server API for StreamingService service. All implementations must embed UnimplementedStreamingServiceServer for forward compatibility.
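A server-streaming handler receives one request and calls Send once per response, returning when the stream is complete. The sketch below illustrates that shape with stand-ins: sendStream mirrors only the Send side of grpc.ServerStreamingServer, collector is a fake stream for testing, and the filtering rules in listItems (empty category matches everything, limit 0 means no limit) are assumptions, not the behavior of this package's ListItems.

```go
package main

import "fmt"

// item is a hypothetical stand-in for the generated Item message.
type item struct {
	ID       int64
	Name     string
	Category string
}

// sendStream is a hypothetical stand-in for the Send side of
// grpc.ServerStreamingServer[T].
type sendStream[T any] interface {
	Send(*T) error
}

// collector is a fake stream that records every message sent to it.
type collector[T any] struct {
	sent []T
}

func (c *collector[T]) Send(m *T) error {
	c.sent = append(c.sent, *m)
	return nil
}

// listItems sends matching items one at a time and stops at the limit.
// Assumed semantics: an empty category matches all items, limit 0 is unlimited.
// The first Send error ends the stream and is returned to the caller.
func listItems(all []item, category string, limit int, out sendStream[item]) error {
	sent := 0
	for i := range all {
		if category != "" && all[i].Category != category {
			continue
		}
		if limit > 0 && sent >= limit {
			break
		}
		if err := out.Send(&all[i]); err != nil {
			return err
		}
		sent++
	}
	return nil
}

func main() {
	all := []item{
		{ID: 1, Name: "a", Category: "books"},
		{ID: 2, Name: "b", Category: "music"},
		{ID: 3, Name: "c", Category: "books"},
	}
	c := &collector[item]{}
	if err := listItems(all, "books", 0, c); err != nil {
		panic(err)
	}
	fmt.Println(len(c.sent)) // 2
}
```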

type StreamingService_ListItemsClient

type StreamingService_ListItemsClient = grpc.ServerStreamingClient[ItemResponse]

This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.

type StreamingService_ListItemsServer

type StreamingService_ListItemsServer = grpc.ServerStreamingServer[ItemResponse]

This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.

type StreamingService_WatchItemsClient

type StreamingService_WatchItemsClient = grpc.ServerStreamingClient[ItemEvent]

This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.

type StreamingService_WatchItemsServer

type StreamingService_WatchItemsServer = grpc.ServerStreamingServer[ItemEvent]

This type alias is provided for backwards compatibility with existing code that references the prior non-generic stream type by name.

type UnimplementedStreamingServiceServer

type UnimplementedStreamingServiceServer struct{}

UnimplementedStreamingServiceServer must be embedded to have forward compatible implementations.

NOTE: this should be embedded by value instead of pointer to avoid a nil pointer dereference when methods are called.
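The difference between embedding by value and by pointer can be seen in a small sketch. The types below are hypothetical stand-ins (a one-method server interface and an unimplemented struct with a value receiver), chosen to mirror the shape of the generated code rather than reproduce it. Embedding by value always yields a usable zero value; embedding a pointer leaves it nil unless it is set, and calling a promoted method then dereferences nil.

```go
package main

import (
	"errors"
	"fmt"
)

// server is a hypothetical one-method stand-in for a generated server interface.
type server interface {
	Ping() error
}

// unimplemented is a stand-in for an Unimplemented...Server type.
// Its methods use value receivers.
type unimplemented struct{}

func (unimplemented) Ping() error {
	return errors.New("method Ping not implemented")
}

// byValue embeds the stand-in by value: the zero value is ready to use.
type byValue struct {
	unimplemented
}

// byPointer embeds a pointer: it is nil unless explicitly initialized.
type byPointer struct {
	*unimplemented
}

// call invokes Ping and converts a panic into an error so both cases
// can be compared side by side.
func call(s server) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = fmt.Errorf("panic: %v", r)
		}
	}()
	return s.Ping()
}

func main() {
	fmt.Println(call(byValue{}))   // the ordinary "not implemented" error
	fmt.Println(call(byPointer{})) // a recovered nil pointer panic
}
```

The value-embedded case returns a normal error that a caller can handle, which is the behavior forward compatibility relies on when a new method is added to the service.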

func (UnimplementedStreamingServiceServer) ListItems

func (UnimplementedStreamingServiceServer) WatchItems

type UnsafeStreamingServiceServer

type UnsafeStreamingServiceServer interface {
	// contains filtered or unexported methods
}

UnsafeStreamingServiceServer may be embedded to opt out of forward compatibility for this service. Use of this interface is not recommended, as methods added to StreamingServiceServer will result in compilation errors.

type WatchRequest

type WatchRequest struct {
	StartId int64 `protobuf:"varint,1,opt,name=start_id,json=startId,proto3" json:"start_id,omitempty"`
	// contains filtered or unexported fields
}

func (*WatchRequest) Descriptor deprecated

func (*WatchRequest) Descriptor() ([]byte, []int)

Deprecated: Use WatchRequest.ProtoReflect.Descriptor instead.

func (*WatchRequest) GetStartId

func (x *WatchRequest) GetStartId() int64

func (*WatchRequest) ProtoMessage

func (*WatchRequest) ProtoMessage()

func (*WatchRequest) ProtoReflect

func (x *WatchRequest) ProtoReflect() protoreflect.Message

func (*WatchRequest) Reset

func (x *WatchRequest) Reset()

func (*WatchRequest) String

func (x *WatchRequest) String() string
