Package consumertest provides utilities for in-process unit testing of Gazette consumer applications.





    func CreateShards

    func CreateShards(t require.TestingT, cmr *Consumer, specs ...*pc.ShardSpec)

      CreateShards creates shards using the Consumer Apply API, and waits for them to be allocated.

      func WaitForShards

      func WaitForShards(ctx context.Context, rjc pb.RoutedJournalClient, conn *grpc.ClientConn, sel pb.LabelSelector) error

        WaitForShards queries for shards matching LabelSelector |sel|, determines the current write-heads of journals being consumed by matched shards, and polls shards until each has caught up to the determined write-heads of its consumed journals.


        type Args

        type Args struct {
        	C        require.TestingT
        	Etcd     *clientv3.Client       // Etcd client instance.
        	Journals pb.RoutedJournalClient // Broker client instance.
        	App      consumer.Application   // Application of the consumer.
        	Root     string                 // Consumer root in Etcd. Defaults to "/consumertest".
        	Zone     string                 // Zone of the consumer. Defaults to "local".
        	Suffix   string                 // ID Suffix of the consumer. Defaults to "consumer".
        }

          Args of NewConsumer.
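
The documented defaults for Root, Zone, and Suffix suggest that zero-valued string fields are filled in before use. A minimal sketch of that idea follows, assuming empty strings are what trigger the defaults. The `testArgs` type and `withDefaults` function are hypothetical stand-ins and are not part of the package.

```go
package main

// testArgs is a hypothetical stand-in that mirrors only the three
// defaulted string fields of Args.
type testArgs struct {
	Root   string
	Zone   string
	Suffix string
}

// withDefaults returns a copy of a in which each empty field is replaced
// by the default value named in the Args field comments.
func withDefaults(a testArgs) testArgs {
	if a.Root == "" {
		a.Root = "/consumertest"
	}
	if a.Zone == "" {
		a.Zone = "local"
	}
	if a.Suffix == "" {
		a.Suffix = "consumer"
	}
	return a
}
```

With this sketch, a test that sets only `Zone` would still end up with the documented Root and Suffix values.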

          type Consumer

          type Consumer struct {
          	// Server is a loopback Server created for this Consumer, which is available
          	// for test applications to register APIs against.
          	Server *server.Server
          	// Service of the Consumer, which is available for test applications.
          	Service *consumer.Service
          	// Tasks of the Consumer.
          	Tasks *task.Group
          	// contains filtered or unexported fields
          }

            Consumer is a lightweight, embedded Gazette consumer runtime suitable for in-process testing of consumer applications.

            func NewConsumer

            func NewConsumer(args Args) *Consumer

              NewConsumer builds and returns a Consumer.

              func (*Consumer) Signal

              func (cmr *Consumer) Signal()

                Signal the Consumer. The test Consumer will eventually exit, assuming other Consumer(s) are available to take over the assignments.

                func (*Consumer) WaitForPrimary

                func (cmr *Consumer) WaitForPrimary(ctx context.Context, shard pc.ShardID, routeOut *pb.Route) error

                  WaitForPrimary waits for the identified shard to have a primary, or until the Context is cancelled. If no error occurs, then the shard has a primary *Consumer (which is not necessarily this *Consumer instance). If |routeOut| is non-nil, it's populated with the current shard Route.
