func ErrorEqual

func ErrorEqual(got, want error) bool

    ErrorEqual compares two errors for equivalence.

    func ErrorHasCode

    func ErrorHasCode(got error, wantCode codes.Code) bool

      ErrorHasCode returns true if an error has the desired canonical code.

      func ErrorHasMsg

      func ErrorHasMsg(got error, wantStr string) bool

        ErrorHasMsg returns true if an error message contains the desired substring.

        func RandomLiteZone

        func RandomLiteZone() string

          RandomLiteZone chooses a random Pub/Sub Lite zone for integration tests.


          type Barrier

          type Barrier struct {
          	// contains filtered or unexported fields
          }

            Barrier performs two-way synchronization between the server and the client (test) so that tests are deterministic.

            func (*Barrier) Release

            func (b *Barrier) Release()

              Release should be called by the test.

              type DuplicateMsgDetector

              type DuplicateMsgDetector struct {
              	// contains filtered or unexported fields
              }

                DuplicateMsgDetector can be used to detect duplicate messages, either due to duplicate publishes or receives.

                func NewDuplicateMsgDetector

                func NewDuplicateMsgDetector() *DuplicateMsgDetector

                  NewDuplicateMsgDetector creates a new DuplicateMsgDetector.

                  func (*DuplicateMsgDetector) HasPublishDuplicates

                  func (dm *DuplicateMsgDetector) HasPublishDuplicates() bool

                    HasPublishDuplicates returns true if duplicate published messages were detected.

                    func (*DuplicateMsgDetector) HasReceiveDuplicates

                    func (dm *DuplicateMsgDetector) HasReceiveDuplicates() bool

                      HasReceiveDuplicates returns true if duplicate received messages were detected.

                      func (*DuplicateMsgDetector) Receive

                      func (dm *DuplicateMsgDetector) Receive(data string, offset int64)

                        Receive checks the given message data and offset.

                        func (*DuplicateMsgDetector) Status

                        func (dm *DuplicateMsgDetector) Status() string

                          Status returns a non-empty status string if there were duplicates detected.
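The reference above does not say how publish duplicates are told apart from receive duplicates. The following is a minimal sketch, not the package's implementation. It assumes that the same data seen again at the same offset is a duplicate receive, and the same data seen at a different offset is a duplicate publish. The type `dupDetector` and its fields are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// dupDetector is a hypothetical sketch of a duplicate detector; it is not the
// package's implementation.
type dupDetector struct {
	mu          sync.Mutex
	offsets     map[string]int64 // first offset observed for each payload
	publishDups int
	receiveDups int
}

func newDupDetector() *dupDetector {
	return &dupDetector{offsets: make(map[string]int64)}
}

// Receive classifies a repeated payload using the assumed rule: the same
// offset means a duplicate receive, a different offset a duplicate publish.
func (d *dupDetector) Receive(data string, offset int64) {
	d.mu.Lock()
	defer d.mu.Unlock()
	first, seen := d.offsets[data]
	switch {
	case !seen:
		d.offsets[data] = offset
	case first == offset:
		d.receiveDups++
	default:
		d.publishDups++
	}
}

// HasPublishDuplicates reports whether any duplicate publish was counted.
func (d *dupDetector) HasPublishDuplicates() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.publishDups > 0
}

// HasReceiveDuplicates reports whether any duplicate receive was counted.
func (d *dupDetector) HasReceiveDuplicates() bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.receiveDups > 0
}

// Status returns an empty string when no duplicates were seen.
func (d *dupDetector) Status() string {
	d.mu.Lock()
	defer d.mu.Unlock()
	if d.publishDups == 0 && d.receiveDups == 0 {
		return ""
	}
	return fmt.Sprintf("duplicate publishes: %d, duplicate receives: %d", d.publishDups, d.receiveDups)
}

func main() {
	d := newDupDetector()
	d.Receive("hello", 0)
	d.Receive("world", 1)
	d.Receive("hello", 0) // redelivery of the same stored message
	d.Receive("world", 2) // same payload stored at a new offset
	fmt.Println(d.Status())
}
```

A test would typically call `Receive` from the subscriber callback and check `Status` once at the end, rather than failing on the first duplicate.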

                          type FakeSource

                          type FakeSource struct {
                          	Ret int64
                          }

                            FakeSource is a fake source that returns a configurable constant.

                            func (*FakeSource) Int63

                            func (f *FakeSource) Int63() int64

                              Int63 returns the configured fake random number.

                              func (*FakeSource) Seed

                              func (f *FakeSource) Seed(seed int64)

                                Seed is unimplemented.
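The `Int63` and `Seed` methods match the standard `math/rand.Source` interface, so a fake like this can presumably be passed to `rand.New` to make randomized code deterministic. The sketch below illustrates the idea with a local stand-in type. `fakeSource`, `newFakeRand`, and `pickPartition` are hypothetical names and are not part of the documented package.

```go
package main

import (
	"fmt"
	"math/rand"
)

// fakeSource is a hypothetical stand-in mirroring the documented FakeSource:
// it satisfies math/rand.Source and always returns Ret.
type fakeSource struct {
	Ret int64
}

// Int63 returns the configured constant instead of a random number.
func (f *fakeSource) Int63() int64 { return f.Ret }

// Seed is a no-op, matching the "unimplemented" note in the reference.
func (f *fakeSource) Seed(seed int64) {}

// newFakeRand wraps the fake source in a *rand.Rand.
func newFakeRand(ret int64) *rand.Rand {
	return rand.New(&fakeSource{Ret: ret})
}

// pickPartition is a hypothetical function under test that depends on
// randomness supplied by the caller.
func pickPartition(r *rand.Rand, numPartitions int64) int64 {
	return r.Int63() % numPartitions
}

func main() {
	r := newFakeRand(7)
	fmt.Println(pickPartition(r, 4)) // 7 % 4 = 3
}
```

Injecting the `*rand.Rand` instead of using the global generator is what lets a test substitute the fake.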

                                type MockServer

                                type MockServer interface {
                                	// OnTestStart must be called at the start of each test to clear any existing
                                	// state and set the test verifiers.
                                	OnTestStart(*Verifiers)
                                	// OnTestEnd should be called at the end of each test to flush the verifiers
                                	// (i.e. check whether any expected requests were not sent to the server).
                                	OnTestEnd()
                                }

                                  MockServer is an in-memory mock implementation of a Pub/Sub Lite service, which allows unit tests to inspect requests received by the server and send fake responses. This is the interface that should be used by tests.

                                  type MsgTracker

                                  type MsgTracker struct {
                                  	// contains filtered or unexported fields
                                  }

                                    MsgTracker is a helper for checking whether a set of messages make a full round trip from publisher to subscriber.

                                    Add() registers published messages. Remove() should be called when messages are received by subscribers. Call Wait() to block until all tracked messages are received. The same MsgTracker instance can be reused to repeat this sequence for multiple test cycles.

                                    Add() and Remove() calls should not be interleaved.

                                    func NewMsgTracker

                                    func NewMsgTracker() *MsgTracker

                                      NewMsgTracker creates a new message tracker.

                                      func (*MsgTracker) Add

                                      func (mt *MsgTracker) Add(msgs ...string)

                                        Add registers a set of messages to track.

                                        func (*MsgTracker) Empty

                                        func (mt *MsgTracker) Empty() bool

                                          Empty returns true if there are no tracked messages remaining.

                                          func (*MsgTracker) Remove

                                          func (mt *MsgTracker) Remove(msg string) bool

                                            Remove removes `msg` and returns true if it was tracked. It signals the `done` channel once all messages have been received.

                                            func (*MsgTracker) Status

                                            func (mt *MsgTracker) Status() error

                                              Status returns an error if there are tracked messages remaining.

                                              func (*MsgTracker) Wait

                                              func (mt *MsgTracker) Wait(timeout time.Duration) error

                                                Wait up to `timeout` to receive all tracked messages.
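The Add, Remove, Wait cycle described for MsgTracker can be illustrated with a small standalone sketch. This is not the package's code: `msgTracker` and its fields are hypothetical, and the buffered `done` channel is an assumption based on the mention of a `done` channel in the Remove documentation.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// msgTracker is a hypothetical sketch of a round-trip tracker.
type msgTracker struct {
	mu   sync.Mutex
	msgs map[string]bool
	done chan struct{} // buffered so that Remove never blocks
}

func newMsgTracker() *msgTracker {
	return &msgTracker{msgs: make(map[string]bool), done: make(chan struct{}, 1)}
}

// Add registers messages that are expected to be received later.
func (mt *msgTracker) Add(msgs ...string) {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	// Drop a stale completion signal left over from a previous cycle.
	select {
	case <-mt.done:
	default:
	}
	for _, m := range msgs {
		mt.msgs[m] = true
	}
}

// Remove reports whether msg was tracked and signals done when none remain.
func (mt *msgTracker) Remove(msg string) bool {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	if !mt.msgs[msg] {
		return false
	}
	delete(mt.msgs, msg)
	if len(mt.msgs) == 0 {
		select {
		case mt.done <- struct{}{}:
		default:
		}
	}
	return true
}

// Empty reports whether no tracked messages remain.
func (mt *msgTracker) Empty() bool {
	mt.mu.Lock()
	defer mt.mu.Unlock()
	return len(mt.msgs) == 0
}

// Wait blocks until the completion signal arrives or the timeout expires.
func (mt *msgTracker) Wait(timeout time.Duration) error {
	select {
	case <-mt.done:
		return nil
	case <-time.After(timeout):
		mt.mu.Lock()
		defer mt.mu.Unlock()
		return fmt.Errorf("%d messages not received within %v", len(mt.msgs), timeout)
	}
}

func main() {
	mt := newMsgTracker()
	mt.Add("m1", "m2")
	go func() { // stands in for a subscriber callback
		mt.Remove("m1")
		mt.Remove("m2")
	}()
	fmt.Println("wait error:", mt.Wait(5*time.Second))
}
```

In this sketch, interleaving Add and Remove would be unsafe for the same reason the reference warns against it: the completion signal fires as soon as the tracked set becomes empty, even if more messages are about to be added.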

                                                type OrderingReceiver

                                                type OrderingReceiver struct {
                                                	// contains filtered or unexported fields
                                                }

                                                  OrderingReceiver consumes a message string generated by OrderingSender and verifies that messages in a partition are ordered. It is used in conjunction with Subscribers.

                                                  func NewOrderingReceiver

                                                  func NewOrderingReceiver() *OrderingReceiver

                                                    NewOrderingReceiver creates a new OrderingReceiver.

                                                    func (*OrderingReceiver) Receive

                                                    func (or *OrderingReceiver) Receive(data, key string) error

                                                      Receive checks the given message data and key and returns an error if unordered messages are detected.

                                                      Note: a normal scenario resulting in unordered messages is when the Publish stream breaks while there are in-flight batches, which are resent upon stream reconnect. Use DuplicateMsgDetector if it is undesirable to fail a test.

                                                      type OrderingSender

                                                      type OrderingSender struct {
                                                      	TotalMsgCount int64
                                                      }

                                                        OrderingSender generates strings containing a message index to use for verifying message ordering. It is used in conjunction with Publishers.

                                                        func NewOrderingSender

                                                        func NewOrderingSender() *OrderingSender

                                                          NewOrderingSender creates a new OrderingSender.

                                                          func (*OrderingSender) Next

                                                          func (os *OrderingSender) Next(prefix string) string

                                                            Next generates the next string to publish.
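To make the sender and receiver pairing concrete, here is a minimal standalone sketch. The wire format (`prefix/index`), the per-key bookkeeping, and the names `orderingSender` and `orderingReceiver` are assumptions made for illustration, not details confirmed by this reference. The sketch is also not safe for concurrent use.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// orderingSender is a hypothetical sender that embeds an increasing index.
type orderingSender struct {
	total int64
}

// Next returns "<prefix>/<index>", where index increases with every call.
func (s *orderingSender) Next(prefix string) string {
	s.total++
	return fmt.Sprintf("%s/%d", prefix, s.total)
}

// orderingReceiver is a hypothetical receiver that remembers the last index
// seen for each ordering key.
type orderingReceiver struct {
	last map[string]int64
}

func newOrderingReceiver() *orderingReceiver {
	return &orderingReceiver{last: make(map[string]int64)}
}

// Receive parses the index from data and fails if it does not increase for key.
func (r *orderingReceiver) Receive(data, key string) error {
	i := strings.LastIndex(data, "/")
	if i < 0 {
		return fmt.Errorf("malformed message %q", data)
	}
	idx, err := strconv.ParseInt(data[i+1:], 10, 64)
	if err != nil {
		return fmt.Errorf("malformed message %q: %v", data, err)
	}
	if last, ok := r.last[key]; ok && idx <= last {
		return fmt.Errorf("message %q out of order for key %q: index %d after %d", data, key, idx, last)
	}
	r.last[key] = idx
	return nil
}

func main() {
	s := &orderingSender{}
	r := newOrderingReceiver()
	first := s.Next("msg")
	second := s.Next("msg")
	fmt.Println(r.Receive(first, "key-a"))
	fmt.Println(r.Receive(second, "key-a"))
	// Replaying an earlier message is reported as unordered.
	fmt.Println(r.Receive(first, "key-a"))
}
```

A resent batch after a stream reconnect would trip this check in the same way as the replay in `main`, which is the scenario the note on Receive warns about.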

                                                            type RPCVerifier

                                                            type RPCVerifier struct {
                                                            	// contains filtered or unexported fields
                                                            }

                                                              RPCVerifier stores a queue of requests expected from the client, and the corresponding response or error to return.

                                                              func NewRPCVerifier

                                                              func NewRPCVerifier(t *testing.T) *RPCVerifier

                                                                NewRPCVerifier creates a new verifier for requests received by the server.

                                                                func (*RPCVerifier) Flush

                                                                func (v *RPCVerifier) Flush()

                                                                  Flush logs an error for any remaining {request, response, error} tuples, in case the client terminated early.

                                                                  func (*RPCVerifier) Pop

                                                                  func (v *RPCVerifier) Pop(gotRequest interface{}) (interface{}, error)

                                                                    Pop validates the received request with the next {request, response, error} tuple.

                                                                    func (*RPCVerifier) Push

                                                                    func (v *RPCVerifier) Push(wantRequest interface{}, retResponse interface{}, retErr error)

                                                                      Push appends a new {request, response, error} tuple.

                                                                      Valid combinations for unary and streaming RPCs:
                                                                      - {request, response, nil}
                                                                      - {request, nil, error}

                                                                      Additional combinations for streams only:
                                                                      - {nil, response, nil}: send a response without a request (e.g. messages).
                                                                      - {nil, nil, error}: break the stream without a request.
                                                                      - {request, nil, nil}: expect a request, but don't send any response.
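The queue of {request, response, error} tuples can be pictured with the standalone sketch below. It is an illustration under assumptions, not the package's implementation. The type `verifier`, the use of `reflect.DeepEqual` for request comparison, and returning an error on mismatch (rather than reporting through `*testing.T`) are all choices made for this example.

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
)

// rpcTuple pairs an expected request with the response or error to return.
type rpcTuple struct {
	wantRequest interface{}
	retResponse interface{}
	retErr      error
}

// verifier is a hypothetical FIFO queue of expected RPC exchanges.
type verifier struct {
	mu    sync.Mutex
	queue []rpcTuple
}

// Push appends a new {request, response, error} tuple.
func (v *verifier) Push(wantRequest, retResponse interface{}, retErr error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	v.queue = append(v.queue, rpcTuple{wantRequest, retResponse, retErr})
}

// Pop checks gotRequest against the head tuple and returns its response or error.
func (v *verifier) Pop(gotRequest interface{}) (interface{}, error) {
	v.mu.Lock()
	defer v.mu.Unlock()
	if len(v.queue) == 0 {
		return nil, fmt.Errorf("unexpected request: %v", gotRequest)
	}
	head := v.queue[0]
	v.queue = v.queue[1:]
	if !reflect.DeepEqual(gotRequest, head.wantRequest) {
		return nil, fmt.Errorf("got request %v, want %v", gotRequest, head.wantRequest)
	}
	return head.retResponse, head.retErr
}

// Remaining returns the number of tuples that were never consumed, which is
// what a Flush-style check would report at the end of a test.
func (v *verifier) Remaining() int {
	v.mu.Lock()
	defer v.mu.Unlock()
	return len(v.queue)
}

func main() {
	v := &verifier{}
	v.Push("create topic", "topic created", nil)                 // {request, response, nil}
	v.Push("delete topic", nil, errors.New("permission denied")) // {request, nil, error}

	fmt.Println(v.Pop("create topic"))
	fmt.Println(v.Pop("delete topic"))
	fmt.Println("remaining:", v.Remaining())
}
```

Because the queue is strictly ordered, a test written this way also asserts the order in which the client issues requests.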

                                                                      func (*RPCVerifier) PushWithBarrier

                                                                      func (v *RPCVerifier) PushWithBarrier(wantRequest interface{}, retResponse interface{}, retErr error) *Barrier

                                                                        PushWithBarrier is like Push, but returns a Barrier. The test should call Release on it when it wants the response to be sent to the client. This is useful for synchronizing with work that needs to be done on the client.
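The two-way synchronization behind a barrier can be sketched with two channels: one that tells the test the server has reached the barrier, and one that tells the server it may proceed. This is a minimal illustration under assumptions. `barrier`, its `wait` method, and `runScenario` are hypothetical and do not reflect the package's internals.

```go
package main

import (
	"fmt"
	"sync"
)

// barrier is a hypothetical two-way synchronization point.
type barrier struct {
	serverReady chan struct{} // closed when the server holds a response
	released    chan struct{} // closed when the test allows it to be sent
}

func newBarrier() *barrier {
	return &barrier{serverReady: make(chan struct{}), released: make(chan struct{})}
}

// Release blocks until the server reaches the barrier, then unblocks it.
func (b *barrier) Release() {
	<-b.serverReady
	close(b.released)
}

// wait is called by the fake server just before sending the held response.
func (b *barrier) wait() {
	close(b.serverReady)
	<-b.released
}

// runScenario records the order of events for one held response.
func runScenario() []string {
	var mu sync.Mutex
	var events []string
	record := func(e string) {
		mu.Lock()
		defer mu.Unlock()
		events = append(events, e)
	}

	b := newBarrier()
	done := make(chan struct{})
	go func() { // fake server handling a single request
		defer close(done)
		b.wait()
		record("server: response sent")
	}()

	record("test: client-side work finished")
	b.Release()
	<-done
	return events
}

func main() {
	for _, e := range runScenario() {
		fmt.Println(e)
	}
}
```

The response is always recorded after the client-side work, regardless of goroutine scheduling, which is the determinism the barrier is meant to provide.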

                                                                        func (*RPCVerifier) TryPop

                                                                        func (v *RPCVerifier) TryPop() (bool, interface{}, error)

                                                                          TryPop should be used only for streams. It checks whether the request in the next tuple is nil, in which case the response or error should be returned to the client without waiting for a request. Useful for streams where the server continuously sends data (e.g. subscribe stream).
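As a rough illustration of the TryPop behavior described above, the sketch below lets a fake stream handler drain every leading tuple whose request is nil before it waits for the next client request. The names `streamQueue` and `drainUnsolicited` are hypothetical. The sketch is single-goroutine and omits the locking a real verifier would need.

```go
package main

import "fmt"

// streamTuple pairs an optional expected request with a response or error.
type streamTuple struct {
	wantRequest interface{}
	retResponse interface{}
	retErr      error
}

// streamQueue is a hypothetical queue of tuples for one stream.
type streamQueue struct {
	tuples []streamTuple
}

// Push appends a new {request, response, error} tuple.
func (q *streamQueue) Push(wantRequest, retResponse interface{}, retErr error) {
	q.tuples = append(q.tuples, streamTuple{wantRequest, retResponse, retErr})
}

// TryPop consumes the head tuple only if it expects no request, and reports
// whether it did so together with the response or error to send.
func (q *streamQueue) TryPop() (bool, interface{}, error) {
	if len(q.tuples) == 0 || q.tuples[0].wantRequest != nil {
		return false, nil, nil
	}
	head := q.tuples[0]
	q.tuples = q.tuples[1:]
	return true, head.retResponse, head.retErr
}

// drainUnsolicited collects consecutive responses that need no request,
// stopping at the first tuple that expects one or that carries an error.
func drainUnsolicited(q *streamQueue) []interface{} {
	var out []interface{}
	for {
		ok, resp, err := q.TryPop()
		if !ok || err != nil {
			return out
		}
		out = append(out, resp)
	}
}

func main() {
	q := &streamQueue{}
	q.Push(nil, "msg1", nil)                 // sent without a request
	q.Push(nil, "msg2", nil)                 // sent without a request
	q.Push("flow control request", nil, nil) // must wait for the client
	fmt.Println(drainUnsolicited(q), len(q.tuples))
}
```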

                                                                          type Server

                                                                          type Server struct {
                                                                          	LiteServer MockServer
                                                                          	// contains filtered or unexported fields
                                                                          }

                                                                            Server is a mock Pub/Sub Lite server that can be used for unit testing.

                                                                            func NewServer

                                                                            func NewServer() (*Server, error)

                                                                              NewServer creates a new mock Pub/Sub Lite server.

                                                                              func (*Server) ClientConn

                                                                              func (s *Server) ClientConn() option.ClientOption

                                                                                ClientConn creates a client connection to the gRPC test server.

                                                                                func (*Server) Close

                                                                                func (s *Server) Close()

                                                                                  Close shuts down the server and releases all resources.

                                                                                  type Verifiers

                                                                                  type Verifiers struct {
                                                                                  	// Global list of verifiers for all unary RPCs.
                                                                                  	GlobalVerifier *RPCVerifier
                                                                                  	// contains filtered or unexported fields
                                                                                  }

                                                                                    Verifiers contains RPCVerifiers for unary RPCs and streaming RPCs.

                                                                                    func NewVerifiers

                                                                                    func NewVerifiers(t *testing.T) *Verifiers

                                                                                      NewVerifiers creates a new instance of Verifiers for a test.

                                                                                      func (*Verifiers) AddAssignmentStream

                                                                                      func (tv *Verifiers) AddAssignmentStream(subscription string, streamVerifier *RPCVerifier)

                                                                                        AddAssignmentStream adds verifiers for an assignment stream.

                                                                                        func (*Verifiers) AddCommitStream

                                                                                        func (tv *Verifiers) AddCommitStream(subscription string, partition int, streamVerifier *RPCVerifier)

                                                                                          AddCommitStream adds verifiers for a commit stream.

                                                                                          func (*Verifiers) AddPublishStream

                                                                                          func (tv *Verifiers) AddPublishStream(topic string, partition int, streamVerifier *RPCVerifier)

                                                                                            AddPublishStream adds verifiers for a publish stream.

                                                                                            func (*Verifiers) AddSubscribeStream

                                                                                            func (tv *Verifiers) AddSubscribeStream(subscription string, partition int, streamVerifier *RPCVerifier)

                                                                                              AddSubscribeStream adds verifiers for a subscribe stream.