Package pstest provides a fake Cloud Pub/Sub service for testing. It implements a simplified form of the service, suitable for unit tests. It may behave differently from the actual service wherever the service is non-deterministic or unspecified, such as timing and delivery order.

This package is EXPERIMENTAL and is subject to change without notice.

See the example for usage.




type Message

type Message struct {
	ID          string
	Data        []byte
	Attributes  map[string]string
	PublishTime time.Time
	Deliveries  int // number of times delivery of the message was attempted
	Acks        int // number of acks received from clients
	// contains filtered or unexported fields
}

A Message is a message that was published to the server.

type Server

type Server struct {
	Addr string // The address that the server is listening on.
	// contains filtered or unexported fields
}

func NewServer

func NewServer() *Server

NewServer creates a new fake server running in the current process.

package main

import (
	"context"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsub/pstest"
	"google.golang.org/api/option"
	"google.golang.org/grpc"
)

func main() {
	ctx := context.Background()
	// Start a fake server running locally.
	srv := pstest.NewServer()
	// Connect to the server without using TLS.
	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
	if err != nil {
		// TODO: Handle error.
	}
	// Use the connection when creating a pubsub client.
	client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
	if err != nil {
		// TODO: Handle error.
	}
	defer client.Close()
	_ = client // TODO: Use the client.
}

func (*Server) Message

func (s *Server) Message(id string) *Message

Message returns the message with the given ID, or nil if no message with that ID was published.

func (*Server) Messages

func (s *Server) Messages() []*Message

Messages returns information about all messages ever published.

func (*Server) Publish

func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string

Publish behaves as if the Publish RPC had been called with a message containing the given data and attrs. It returns the ID of the message. The topic is created if it doesn't exist.

Publish panics if there is an error, which is appropriate for testing.

func (*Server) SetStreamTimeout

func (s *Server) SetStreamTimeout(d time.Duration)

SetStreamTimeout sets the amount of time a stream will be active before it shuts itself down. This mimics the real service's behavior of closing streams after 30 minutes. If SetStreamTimeout is never called or is passed zero, streams never shut down.

func (*Server) Wait

func (s *Server) Wait()

Wait blocks until all server activity has completed.
