msg

package
v0.30.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Feb 6, 2019 License: Apache-2.0 Imports: 25 Imported by: 0

README

Msg - Messaging Service

Google Cloud pubsub

The following workflow define simple topic/subscription producing and consuming example.

Example credentials 'am' is name of google secrets placed to ~/.secret/am.json

endly -r=pubsub.yaml

@pubsub.yaml

pipeline:
  create:
    action: msg:setupResource
    resources:
      - URL: myTopic
        type: topic
        vendor: gc
        credentials: am

      - URL: mySubscription
        type: subscription
        vendor: gc
        credentials: am
        config:
          topic:
            URL: /projects/${msg.projectID}/topics/myTopic

  setup:
    action: msg:push
    dest:
      URL: /projects/${msg.projectID}/topics/myTopic
      credentials: am
    messages:
      - data: "this is my 1st message"
        attributes:
          attr1: abc
      - data: "this is my 2nd message"
        attributes:
          attr1: xyz

  validate:
    action: msg:pull
    count: 2
    nack: true
    source:
      URL: /projects/${msg.projectID}/subscriptions/mySubscription
      credentials: am
    expect:
      - '@indexBy@': 'Attributes.attr1'
      - Data: "this is my 1st message"
        Attributes:
          attr1: abc
      - Data: "this is my 2nd message"
        Attributes:
          attr1: xyz
Amazon Simple Queue Service

The following workflow define simple topic/subscription producing and consuming example.

Example credentials 'am' is name of google secrets placed to ~/.secret/am.json

endly -r=topic.yaml

@topic.yaml

pipeline:
  setup:
    action: msg:setupResource
    credentials: $awsCredentials
    resources:
      - URL: mye2eQueue
        type: queue
        vendor: aws
  trigger:
    action: msg:push
    sleepTimeMs: 30000
    credentials: $awsCredentials
    dest:
      URL: mye2eQueue
      type: queue
      vendor: aws
    messages:
      - data: 'Test: this is my 1st message'
      - data: 'Test: this is my 2nd message'
    validate:
        action: msg:pull
        credentials: aws
        count: 2
        source:
          URL: mye2eQueue
          type: queue
          vendor: aws
        expect:
          - Data: "Test: this is my 1st message"
          - Data: "Test: this is my 2nd message"
Amazon Simple Notifiation Service

Documentation

Index

Constants

View Source
const (
	ResourceVendorGoogleCloud         = "gc"
	ResourceVendorGoogleCloudPlatform = "gcp"
	ResourceVendorAmazonWebService    = "aws"
)
View Source
const (
	ResourceTypeTopic        = "topic"
	ResourceTypeSubscription = "subscription"
	ResourceTypeQueue        = "queue"
)
View Source
const (
	//ServiceID represents gloud msg  pubsub service id.
	ServiceID = "msg"
)

Variables

This section is empty.

Functions

func New

func New() endly.Service

New creates a new NoOperation service.

Types

type Client

type Client interface {
	Push(dest *Resource, message *Message) (Result, error)

	PullN(source *Resource, count int, nack bool) ([]*Message, error)

	SetupResource(resource *ResourceSetup) (*Resource, error)

	DeleteResource(resource *Resource) error

	Close() error
}

func NewPubSubClient

func NewPubSubClient(context *endly.Context, dest *Resource, timeout time.Duration) (Client, error)

NewPubSubClient creates a new Client

type Config

type Config struct {
	Topic               *Resource
	Labels              map[string]string
	Attributes          map[string]string
	AckDeadline         time.Duration
	RetentionDuration   time.Duration
	RetainAckedMessages bool
}

Config represent a subscription config

func NewConfig

func NewConfig(topic string) *Config

NewConfig create new config

type CreateRequest

type CreateRequest struct {
	Credentials string
	Resources   []*ResourceSetup
}

CreateRequest represents a create resource request

func (*CreateRequest) Init

func (r *CreateRequest) Init() error

func (*CreateRequest) Validate

func (r *CreateRequest) Validate() error

type CreateResponse

type CreateResponse struct {
	Resources []*Resource
}

CreateResponse represents a create resource response

type DeleteRequest

type DeleteRequest struct {
	Credentials string
	Resources   []*Resource
}

DeleteRequest represents a delete resource request

func (*DeleteRequest) Init

func (r *DeleteRequest) Init() error

type DeleteResponse

type DeleteResponse struct{}

DeleteResponse represents a delete resource response

type Message

type Message struct {
	ID          string
	Subject     string
	Attributes  map[string]interface{}
	Data        interface{}
	Transformed interface{} `description:"udf transformed data"`
}

func (*Message) Expand

func (m *Message) Expand(state data.Map) *Message

type PullRequest

type PullRequest struct {
	Credentials string
	Source      *Resource
	TimeoutMs   int
	Count       int
	Nack        bool `description:"flag indicates that the client will not or cannot process a Message passed to the Subscriber.Receive callback."`
	UDF         string
	Expect      []map[string]interface{}
}

PullRequest represents a pull request

func (*PullRequest) Init

func (r *PullRequest) Init() error

func (*PullRequest) Validate

func (r *PullRequest) Validate() error

type PullResponse

type PullResponse struct {
	Messages []*Message
	Assert   *validator.AssertResponse
}

PullRequest represents a pull response

type PushRequest

type PushRequest struct {
	Credentials string
	Dest        *Resource
	Messages    []*Message
	Source      *url.Resource
	TimeoutMs   int
	UDF         string
	// contains filtered or unexported fields
}

PushRequest represents push request

func (*PushRequest) Init

func (r *PushRequest) Init() error

func (*PushRequest) Validate

func (r *PushRequest) Validate() error

type PushResponse

type PushResponse struct {
	Results []Result
}

PushResponse represents a push response

type Resource

type Resource struct {
	URL         string
	Credentials string
	ID          string
	Name        string
	Type        string `description:"resource type: topic, subscription"`
	Vendor      string
	Config      interface{} `description:"vendor client config"`
	// contains filtered or unexported fields
}

func NewResource

func NewResource(resourceType, URL, credentials string) *Resource

NewResource creates a new resource

func (*Resource) Init

func (r *Resource) Init() error

Init initializes resource

type ResourceSetup

type ResourceSetup struct {
	Resource
	Recreate bool
	Config   *Config
}

Resource represents resource setup

func NewResourceSetup

func NewResourceSetup(resourceType, URL, credentials string, recreate bool, config *Config) *ResourceSetup

NewResourceSetup creates a new URL

func (*ResourceSetup) Init

func (r *ResourceSetup) Init() error

Init initializes setup resource

func (*ResourceSetup) Validate

func (r *ResourceSetup) Validate() error

type Result

type Result interface{}

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL