message

package
v0.6.4
Published: May 19, 2020 | License: Apache-2.0, MIT | Imports: 19 | Imported by: 0

Documentation

Constants

const InboxMaxAgeTipsets = 6

InboxMaxAgeTipsets is the maximum age (in non-empty tipsets) to permit messages to stay in the pool after reception. It should be a little shorter than the outbox max age so that messages expire from mining pools a little before the sender gives up on them.

const OutboxMaxAgeRounds = 10

OutboxMaxAgeRounds is the maximum age (in consensus rounds) to permit messages to stay in the outbound message queue. This should be a little longer than the message pool's timeout so that messages expire from mining pools a little before the sending node gives up on them.
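The ordering of the two limits can be illustrated with a small sketch. It assumes, purely for illustration, that every round produces a non-empty tipset (so both ages are counted in the same unit) and that something expires once its age strictly exceeds the limit; the package's exact comparison is not stated here. The `expired` helper is hypothetical and not part of the package.

```go
package main

import "fmt"

// Values copied from the package constants documented above.
const (
	InboxMaxAgeTipsets = 6
	OutboxMaxAgeRounds = 10
)

// expired is a hypothetical helper: it reports whether something stamped at
// `stamp` is older than maxAge when the chain is at `now`.
// Assumption: expiry happens when the age strictly exceeds maxAge.
func expired(stamp, now, maxAge uint64) bool {
	return now > stamp && now-stamp > maxAge
}

func main() {
	const sentAt = 100
	for now := uint64(sentAt); now <= sentAt+12; now += 2 {
		fmt.Printf("round %d: pool expired=%v, outbox expired=%v\n",
			now,
			expired(sentAt, now, InboxMaxAgeTipsets),
			expired(sentAt, now, OutboxMaxAgeRounds))
	}
}
```

Under these assumptions there is a window (ages 7 through 10) in which receiving pools have already dropped the message while the sender's outbox still tracks it.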

Variables

This section is empty.

Functions

This section is empty.

Types

type DefaultPublisher

type DefaultPublisher struct {
	// contains filtered or unexported fields
}

DefaultPublisher adds messages to a message pool and can publish them to its topic. This is wiring for message publication from the outbox.

func NewDefaultPublisher

func NewDefaultPublisher(pubsub networkPublisher, pool *Pool) *DefaultPublisher

NewDefaultPublisher creates a new publisher.

func (*DefaultPublisher) Publish

func (p *DefaultPublisher) Publish(ctx context.Context, message *types.SignedMessage, height abi.ChainEpoch, bcast bool) error

Publish marshals and publishes a message to the core message pool, and if bcast is true, broadcasts it to the network with the publisher's topic.

type DefaultQueuePolicy

type DefaultQueuePolicy struct {
	// contains filtered or unexported fields
}

DefaultQueuePolicy manages a target message queue state in response to changes on the blockchain. Messages are removed from the queue as soon as they appear in a block that's part of a heaviest chain. At this point, messages are highly likely to be valid and known to a large number of nodes, even if the block ends up as an abandoned fork. There is no special handling for re-orgs and messages do not revert to the queue if the block ends up childless (in contrast to the message pool).

func NewMessageQueuePolicy

func NewMessageQueuePolicy(messages messageProvider, maxAge uint) *DefaultQueuePolicy

NewMessageQueuePolicy returns a new policy which removes mined messages from the queue and expires messages older than `maxAge` rounds.

func (*DefaultQueuePolicy) HandleNewHead

func (p *DefaultQueuePolicy) HandleNewHead(ctx context.Context, target PolicyTarget, oldTips, newTips []block.TipSet) error

HandleNewHead removes from the queue all messages that have now been mined in new blocks.

type FakeProvider

type FakeProvider struct {
	*chain.Builder
	// contains filtered or unexported fields
}

FakeProvider is a chain and actor provider for testing. The provider extends a chain.Builder for providing tipsets and maintains an explicit head CID. The provider can provide an actor for a single (head, address) pair.

func NewFakeProvider

func NewFakeProvider(t *testing.T) *FakeProvider

NewFakeProvider creates a new builder and wraps it with a provider. The builder may be accessed by `provider.Builder`.

func (*FakeProvider) GetActorAt

func (p *FakeProvider) GetActorAt(ctx context.Context, key block.TipSetKey, addr address.Address) (*actor.Actor, error)

GetActorAt returns the actor corresponding to (key, addr) if they match those last set.

func (*FakeProvider) GetHead

func (p *FakeProvider) GetHead() block.TipSetKey

GetHead returns the head tipset key.

func (*FakeProvider) Head

func (p *FakeProvider) Head() block.TipSetKey

Head fulfills the ChainReaderAPI interface

func (*FakeProvider) SetActor

func (p *FakeProvider) SetActor(addr address.Address, act *actor.Actor)

SetActor sets an actor to be mocked on chain

func (*FakeProvider) SetHead

func (p *FakeProvider) SetHead(head block.TipSetKey)

SetHead sets the head tipset

func (*FakeProvider) SetHeadAndActor

func (p *FakeProvider) SetHeadAndActor(t *testing.T, head block.TipSetKey, addr address.Address, actor *actor.Actor)

SetHeadAndActor sets the head tipset, along with the from address and actor to be provided.

type FakeValidator

type FakeValidator struct {
	RejectMessages bool
}

FakeValidator is a validator which configurably accepts or rejects messages.

func (FakeValidator) ValidateSignedMessageSyntax

func (v FakeValidator) ValidateSignedMessageSyntax(ctx context.Context, msg *types.SignedMessage) error

ValidateSignedMessageSyntax returns an error only if `RejectMessages` is true.
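A configurable validator of this kind is useful for driving both the accept and reject paths of code under test. The sketch below mirrors the idea with local stand-in types: `signedMessage`, `poolValidator`, `fakeValidator` and `admit` are all hypothetical, and the `context.Context` parameter of the real method is omitted to keep the example short.

```go
package main

import (
	"errors"
	"fmt"
)

// signedMessage is a stand-in for types.SignedMessage.
type signedMessage struct{ Nonce uint64 }

// poolValidator mirrors the shape of PoolValidator, without the context argument.
type poolValidator interface {
	ValidateSignedMessageSyntax(msg *signedMessage) error
}

// fakeValidator rejects every message when RejectMessages is set, and accepts
// every message otherwise.
type fakeValidator struct{ RejectMessages bool }

func (v fakeValidator) ValidateSignedMessageSyntax(msg *signedMessage) error {
	if v.RejectMessages {
		return errors.New("message rejected by fake validator")
	}
	return nil
}

// admit is hypothetical code under test: it only lets validated messages through.
func admit(v poolValidator, msg *signedMessage) error {
	if err := v.ValidateSignedMessageSyntax(msg); err != nil {
		return fmt.Errorf("not admitted: %w", err)
	}
	return nil
}

func main() {
	msg := &signedMessage{Nonce: 1}
	fmt.Println("accepting validator:", admit(fakeValidator{}, msg))
	fmt.Println("rejecting validator:", admit(fakeValidator{RejectMessages: true}, msg))
}
```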

type HeadHandler

type HeadHandler struct {
	// Inbox and outbox exported for testing.
	Inbox  *Inbox
	Outbox *Outbox
	// contains filtered or unexported fields
}

HeadHandler wires up new head tipset handling to the message inbox and outbox.

func NewHeadHandler

func NewHeadHandler(inbox *Inbox, outbox *Outbox, chain chainProvider, head block.TipSet) *HeadHandler

NewHeadHandler builds a new new-head handler.

func (*HeadHandler) HandleNewHead

func (h *HeadHandler) HandleNewHead(ctx context.Context, newHead block.TipSet) error

HandleNewHead computes the chain delta implied by a new head and updates the inbox and outbox.
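One way to compute such a chain delta is to walk both heads back towards their common ancestor, always stepping the higher side first. The sketch below is an illustration under simplifying assumptions, not the package's traversal: `tip` is a hypothetical stand-in for a tipset with a single parent pointer, and heights strictly decrease along parent links.

```go
package main

import "fmt"

// tip is a hypothetical, simplified tipset: an ID, a height, and one parent.
type tip struct {
	ID     string
	Height uint64
	Parent *tip
}

// chainDelta returns the tips only on the old chain and the tips only on the
// new chain, each in descending height order, excluding the common ancestor.
func chainDelta(oldHead, newHead *tip) (oldTips, newTips []*tip) {
	a, b := oldHead, newHead
	for a != b {
		switch {
		case b == nil || (a != nil && a.Height > b.Height):
			// The old side is higher (or the new side ran out): step it back.
			oldTips = append(oldTips, a)
			a = a.Parent
		case a == nil || b.Height > a.Height:
			// The new side is higher (or the old side ran out): step it back.
			newTips = append(newTips, b)
			b = b.Parent
		default:
			// Same height but different tips: both belong to the delta.
			oldTips = append(oldTips, a)
			newTips = append(newTips, b)
			a, b = a.Parent, b.Parent
		}
	}
	return oldTips, newTips
}

func main() {
	g := &tip{ID: "g", Height: 0}
	a1 := &tip{ID: "a1", Height: 1, Parent: g}
	a2 := &tip{ID: "a2", Height: 2, Parent: a1}
	b1 := &tip{ID: "b1", Height: 1, Parent: g}
	b2 := &tip{ID: "b2", Height: 2, Parent: b1}
	b3 := &tip{ID: "b3", Height: 3, Parent: b2}

	oldTips, newTips := chainDelta(a2, b3)
	for _, t := range oldTips {
		fmt.Println("dropped:", t.ID)
	}
	for _, t := range newTips {
		fmt.Println("adopted:", t.ID)
	}
}
```

When the new head simply extends the old one, the old list is empty and the new list holds only the added tips.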

type Inbox

type Inbox struct {
	// contains filtered or unexported fields
}

Inbox maintains a pool of received messages.

func NewInbox

func NewInbox(pool *Pool, maxAgeRounds uint, chain chainProvider, messages messageProvider) *Inbox

NewInbox constructs a new inbox.

func (*Inbox) Add

func (ib *Inbox) Add(ctx context.Context, msg *types.SignedMessage) (cid.Cid, error)

Add adds a message to the pool, tagged with the current block height. An error probably means the message failed to validate, but it could indicate a more serious problem with the system.

func (*Inbox) HandleNewHead

func (ib *Inbox) HandleNewHead(ctx context.Context, oldChain, newChain []block.TipSet) error

HandleNewHead updates the message pool in response to a new head tipset. This removes messages from the pool that are found in the newly adopted chain and adds back those from the removed chain (if any) that do not appear in the new chain. The `oldChain` and `newChain` lists are expected in descending height order, and each may be empty.
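The re-org rule described above can be sketched on plain maps. Everything here is a simplified stand-in: `tipset` is just a list of message CIDs (strings), the pool is a set keyed by CID, and `applyReorg` is a hypothetical function, not the package's implementation.

```go
package main

import "fmt"

// tipset is a stand-in for block.TipSet: only the CIDs of its messages.
type tipset []string

// applyReorg removes from the pool every message mined on the new chain, then
// restores messages from the abandoned chain that the new chain did not mine.
func applyReorg(pool map[string]bool, oldChain, newChain []tipset) {
	inNew := make(map[string]bool)
	for _, ts := range newChain {
		for _, c := range ts {
			inNew[c] = true
			delete(pool, c) // mined on the new chain: no longer pending
		}
	}
	for _, ts := range oldChain {
		for _, c := range ts {
			if !inNew[c] {
				pool[c] = true // un-mined by the re-org: pending again
			}
		}
	}
}

func main() {
	pool := map[string]bool{"m3": true}
	oldChain := []tipset{{"m1", "m2"}}
	newChain := []tipset{{"m2", "m3"}}

	applyReorg(pool, oldChain, newChain)
	fmt.Println("pending after re-org:", pool)
}
```

In this example only `m1` ends up pending: `m3` was mined on the new chain, and `m2` appears in both chains so it is not restored.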

func (*Inbox) Pool

func (ib *Inbox) Pool() *Pool

Pool returns the inbox's message pool.

type MockNetworkPublisher

type MockNetworkPublisher struct {
	Data []byte
}

MockNetworkPublisher records the last message published.

func (*MockNetworkPublisher) Publish

func (p *MockNetworkPublisher) Publish(ctx context.Context, data []byte) error

Publish records the published data in `Data`.

type MockPublisher

type MockPublisher struct {
	ReturnError error                // Error to be returned by Publish()
	Message     *types.SignedMessage // Message received by Publish()
	Height      abi.ChainEpoch       // Height received by Publish()
	Bcast       bool                 // was this broadcast?
}

MockPublisher is a publisher which just stores the last message published.

func (*MockPublisher) Publish

func (p *MockPublisher) Publish(ctx context.Context, message *types.SignedMessage, height abi.ChainEpoch, bcast bool) error

Publish records the message etc for subsequent inspection.

type NullPolicy

type NullPolicy struct {
}

NullPolicy is a policy that does nothing.

func (NullPolicy) HandleNewHead

func (NullPolicy) HandleNewHead(ctx context.Context, target PolicyTarget, oldChain, newChain []block.TipSet) error

HandleNewHead does nothing.

type Outbox

type Outbox struct {
	// contains filtered or unexported fields
}

Outbox validates and marshals messages for sending and maintains the outbound message queue. The code arrangement here is not quite right. We probably want to factor out the bits that build and sign a message from those that add to the local queue/pool and broadcast it. See discussion in https://github.com/filecoin-project/go-filecoin/pull/3178#discussion_r311593312 and https://github.com/filecoin-project/go-filecoin/issues/3052#issuecomment-513643661

func NewOutbox

func NewOutbox(signer types.Signer, validator messageValidator, queue *Queue,
	publisher publisher, policy QueuePolicy, chains chainProvider, actors actorProvider, jw journal.Writer) *Outbox

NewOutbox creates a new outbox

func (*Outbox) HandleNewHead

func (ob *Outbox) HandleNewHead(ctx context.Context, oldTips, newTips []block.TipSet) error

HandleNewHead maintains the message queue in response to a new head tipset.

func (*Outbox) Queue

func (ob *Outbox) Queue() *Queue

Queue returns the outbox's outbound message queue.

func (*Outbox) Send

func (ob *Outbox) Send(ctx context.Context, from, to address.Address, value types.AttoFIL,
	gasPrice types.AttoFIL, gasLimit gas.Unit, bcast bool, method abi.MethodNum, params interface{}) (out cid.Cid, pubErrCh chan error, err error)

Send marshals and sends a message, retaining it in the outbound message queue. If bcast is true, the publisher broadcasts the message to the network at the current block height.

func (*Outbox) SendEncoded

func (ob *Outbox) SendEncoded(ctx context.Context, from, to address.Address, value types.AttoFIL,
	gasPrice types.AttoFIL, gasLimit gas.Unit, bcast bool, method abi.MethodNum, encodedParams []byte) (out cid.Cid, pubErrCh chan error, err error)

SendEncoded sends an encoded message, retaining it in the outbound message queue. If bcast is true, the publisher broadcasts the message to the network at the current block height.

func (*Outbox) SignedSend

func (ob *Outbox) SignedSend(ctx context.Context, signed *types.SignedMessage, bcast bool) (out cid.Cid, pubErrCh chan error, err error)

SignedSend sends a signed message, retaining it in the outbound message queue. If bcast is true, the publisher broadcasts the message to the network at the current block height.

type PolicyTarget

type PolicyTarget interface {
	RemoveNext(ctx context.Context, sender address.Address, expectedNonce uint64) (msg *types.SignedMessage, found bool, err error)
	Requeue(ctx context.Context, msg *types.SignedMessage, stamp uint64) error
	ExpireBefore(ctx context.Context, stamp uint64) map[address.Address][]*types.SignedMessage
}

PolicyTarget is the outbound queue object on which the policy acts.

type Pool

type Pool struct {
	// contains filtered or unexported fields
}

Pool keeps an unordered, de-duplicated set of Messages and supports removal by CID. By 'de-duplicated' we mean that inserting a message whose CID already exists is a no-op. We use a Pool to store all messages received by this node via network or directly created via user command that have yet to be included in a block. Messages are removed as they are processed.

Pool is safe for concurrent access.
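A minimal sketch of a de-duplicated, concurrency-safe set keyed by CID follows. The `message` and `pool` types are hypothetical stand-ins (CIDs and addresses are plain strings), and guarding the maps with a `sync.RWMutex` is one possible design, not necessarily the one the package uses.

```go
package main

import (
	"fmt"
	"sync"
)

// message is a stand-in for types.SignedMessage.
type message struct {
	CID   string
	From  string
	Nonce uint64
}

// pool is a hypothetical model of Pool: a set of messages keyed by CID, plus
// the height at which each message was first added.
type pool struct {
	mu      sync.RWMutex
	pending map[string]message
	addedAt map[string]uint64
}

func newPool() *pool {
	return &pool{pending: map[string]message{}, addedAt: map[string]uint64{}}
}

// add inserts m unless a message with the same CID is already present.
func (p *pool) add(m message, height uint64) {
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.pending[m.CID]; ok {
		return // duplicate: keep the original entry and its height
	}
	p.pending[m.CID] = m
	p.addedAt[m.CID] = height
}

// get looks a message up by CID.
func (p *pool) get(cid string) (message, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	m, ok := p.pending[cid]
	return m, ok
}

// remove deletes a message by CID; removing an unknown CID does nothing.
func (p *pool) remove(cid string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, cid)
	delete(p.addedAt, cid)
}

// size reports how many messages are pending.
func (p *pool) size() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.pending)
}

func main() {
	p := newPool()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(h uint64) {
			defer wg.Done()
			p.add(message{CID: "cid-1", From: "alice"}, h) // same CID every time
		}(uint64(i))
	}
	wg.Wait()
	fmt.Println("pending:", p.size())
}
```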

func NewPool

func NewPool(cfg *config.MessagePoolConfig, validator PoolValidator) *Pool

NewPool constructs a new Pool.

func (*Pool) Add

func (pool *Pool) Add(ctx context.Context, msg *types.SignedMessage, height abi.ChainEpoch) (cid.Cid, error)

Add adds a message to the pool, tagged with the block height at which it was received. Does nothing if the message is already in the pool.

func (*Pool) Get

func (pool *Pool) Get(c cid.Cid) (*types.SignedMessage, bool)

Get retrieves a message from the pool by CID.

func (*Pool) LargestNonce

func (pool *Pool) LargestNonce(address address.Address) (largest uint64, found bool)

LargestNonce returns the largest nonce used by a message from address in the pool. If no messages from address are found, found will be false.
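The `found` result matters because 0 is itself a valid nonce, so the numeric result alone cannot distinguish "largest nonce is 0" from "no messages". A sketch over a plain slice, with a hypothetical `message` stand-in type and string addresses:

```go
package main

import "fmt"

// message is a stand-in for types.SignedMessage.
type message struct {
	From  string
	Nonce uint64
}

// largestNonce scans the pending messages for the highest nonce sent by from.
// found is false when from has no pending messages at all.
func largestNonce(pending []message, from string) (largest uint64, found bool) {
	for _, m := range pending {
		if m.From != from {
			continue
		}
		if !found || m.Nonce > largest {
			largest, found = m.Nonce, true
		}
	}
	return largest, found
}

func main() {
	pending := []message{{"alice", 0}, {"bob", 4}, {"bob", 7}}
	for _, addr := range []string{"alice", "bob", "carol"} {
		n, ok := largestNonce(pending, addr)
		fmt.Printf("%s: largest=%d found=%v\n", addr, n, ok)
	}
}
```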

func (*Pool) Pending

func (pool *Pool) Pending() []*types.SignedMessage

Pending returns all pending messages.

func (*Pool) PendingBefore

func (pool *Pool) PendingBefore(minimumHeight abi.ChainEpoch) []cid.Cid

PendingBefore returns the CIDs of messages added with height less than `minimumHeight`.
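A height-based query like this lends itself to age-based pruning. The sketch below assumes, hypothetically, that a caller derives the threshold as the head height minus a maximum age and then removes every CID returned; `pendingBefore` operates on a plain map from CID to reception height rather than on the real Pool.

```go
package main

import (
	"fmt"
	"sort"
)

// pendingBefore returns the CIDs whose recorded height is strictly less than
// minimumHeight. The result is sorted only to make the example deterministic.
func pendingBefore(addedAt map[string]uint64, minimumHeight uint64) []string {
	var cids []string
	for cid, h := range addedAt {
		if h < minimumHeight {
			cids = append(cids, cid)
		}
	}
	sort.Strings(cids)
	return cids
}

func main() {
	addedAt := map[string]uint64{"a": 10, "b": 15, "c": 20}

	// Hypothetical pruning step: the head is at height 21 and the maximum age
	// is 6, so anything received before height 15 is dropped.
	const headHeight, maxAge = 21, 6
	for _, cid := range pendingBefore(addedAt, headHeight-maxAge) {
		delete(addedAt, cid)
		fmt.Println("expired:", cid)
	}
	fmt.Println("remaining:", len(addedAt))
}
```

Because the comparison is strict, a message received exactly at the threshold height (`b` here) is kept.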

func (*Pool) Remove

func (pool *Pool) Remove(c cid.Cid)

Remove removes the message by CID from the pending pool.

type PoolValidator

type PoolValidator interface {
	ValidateSignedMessageSyntax(ctx context.Context, msg *types.SignedMessage) error
}

PoolValidator defines a validator that ensures a message can go through the pool.

type Queue

type Queue struct {
	// contains filtered or unexported fields
}

Queue stores an ordered list of messages (per actor) and enforces that their nonces form a contiguous sequence. Each message is associated with a "stamp" (an opaque integer), and the queue supports expiring any list of messages where the first message has a stamp below some threshold. The relative order of stamps in a queue is not enforced. A message queue is intended to record outbound messages that have been transmitted but not yet appeared in a block, where the stamp could be block height. Queue is safe for concurrent access.

func NewQueue

func NewQueue() *Queue

NewQueue constructs a new, empty queue.

func (*Queue) Clear

func (mq *Queue) Clear(ctx context.Context, sender address.Address) bool

Clear removes all messages for a single sender address. Returns whether the queue was non-empty before being cleared.

func (*Queue) Enqueue

func (mq *Queue) Enqueue(ctx context.Context, msg *types.SignedMessage, stamp uint64) error

Enqueue appends a new message for an address. If the queue already contains any messages from the same address, the new message's nonce must be exactly one greater than the largest nonce present.
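The nonce rule can be sketched with a small per-sender queue. The `queued` and `queue` types below are hypothetical simplifications (string addresses, no message body) and the error text is invented for the example.

```go
package main

import (
	"fmt"
	"sync"
)

// queued is a simplified stand-in for Queued: only the nonce and stamp are kept.
type queued struct {
	Nonce uint64
	Stamp uint64
}

// queue is a hypothetical, minimal model of Queue keyed by sender address.
type queue struct {
	mu       sync.Mutex
	bySender map[string][]queued
}

func newQueue() *queue {
	return &queue{bySender: make(map[string][]queued)}
}

// enqueue appends a message for sender. If the sender already has queued
// messages, the nonce must directly follow the last one.
func (q *queue) enqueue(sender string, nonce, stamp uint64) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	msgs := q.bySender[sender]
	if n := len(msgs); n > 0 {
		if want := msgs[n-1].Nonce + 1; nonce != want {
			return fmt.Errorf("nonce %d for %s: expected %d", nonce, sender, want)
		}
	}
	q.bySender[sender] = append(msgs, queued{Nonce: nonce, Stamp: stamp})
	return nil
}

func main() {
	q := newQueue()
	fmt.Println(q.enqueue("alice", 5, 100)) // empty queue: no nonce constraint
	fmt.Println(q.enqueue("alice", 6, 101)) // contiguous: accepted
	fmt.Println(q.enqueue("alice", 8, 102)) // gap: rejected
}
```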

func (*Queue) ExpireBefore

func (mq *Queue) ExpireBefore(ctx context.Context, stamp uint64) map[address.Address][]*types.SignedMessage

ExpireBefore clears the queue of any sender where the first message in the queue has a stamp less than `stamp`. Returns a map containing any expired address queues.
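Note that only the first message's stamp is inspected, and the sender's whole list is dropped with it. One plausible reading is that later nonces cannot be mined once an earlier one has been given up on. A sketch on plain maps, with a hypothetical `queued` stand-in and string addresses:

```go
package main

import "fmt"

// queued is a simplified stand-in for Queued.
type queued struct {
	Nonce uint64
	Stamp uint64
}

// expireBefore removes every sender whose first queued message has a stamp
// below the threshold and returns the removed lists keyed by sender.
func expireBefore(queues map[string][]queued, stamp uint64) map[string][]queued {
	expired := make(map[string][]queued)
	for sender, msgs := range queues {
		if len(msgs) > 0 && msgs[0].Stamp < stamp {
			expired[sender] = msgs
			delete(queues, sender) // deleting during range is safe in Go
		}
	}
	return expired
}

func main() {
	queues := map[string][]queued{
		"alice": {{Nonce: 1, Stamp: 90}, {Nonce: 2, Stamp: 120}},
		"bob":   {{Nonce: 7, Stamp: 110}},
	}
	expired := expireBefore(queues, 100)
	fmt.Println("expired for alice:", len(expired["alice"]))
	fmt.Println("still queued for bob:", len(queues["bob"]))
}
```

Here both of alice's messages expire, including the one stamped 120, because her first message is stamped below 100.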

func (*Queue) LargestNonce

func (mq *Queue) LargestNonce(sender address.Address) (largest uint64, found bool)

LargestNonce returns the largest nonce of any message in the queue for an address. If the queue for the address is empty, returns (0, false).

func (*Queue) List

func (mq *Queue) List(sender address.Address) []*Queued

List returns a copy of the list of messages queued for an address.

func (*Queue) Oldest

func (mq *Queue) Oldest() (oldest uint64)

Oldest returns the oldest message stamp in the Queue. Oldest returns 0 if the queue is empty. Exported for testing only.

func (*Queue) Queues

func (mq *Queue) Queues() []address.Address

Queues returns the addresses associated with each non-empty queue. The order of returned addresses is neither defined nor stable.

func (*Queue) RemoveNext

func (mq *Queue) RemoveNext(ctx context.Context, sender address.Address, expectedNonce uint64) (msg *types.SignedMessage, found bool, err error)

RemoveNext removes and returns a single message from the queue, if it bears the expected nonce value, with found = true. Returns found = false if the queue is empty or the expected nonce is less than any in the queue for that address (indicating the message had already been removed). Returns an error if the expected nonce is greater than the smallest in the queue. The caller may wish to check that the returned message is equal to that expected (not just in nonce value).
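The three outcomes (removed, already gone, out of order) can be sketched as follows. The types are hypothetical stand-ins and the error message is invented; only the branching mirrors the description above.

```go
package main

import "fmt"

// queued is a simplified stand-in for Queued.
type queued struct {
	Nonce uint64
	Stamp uint64
}

// removeNext pops the first message for sender if it carries expectedNonce.
func removeNext(queues map[string][]queued, sender string, expectedNonce uint64) (queued, bool, error) {
	msgs := queues[sender]
	if len(msgs) == 0 {
		return queued{}, false, nil // nothing queued for this sender
	}
	first := msgs[0]
	switch {
	case expectedNonce < first.Nonce:
		return queued{}, false, nil // below everything queued: already removed
	case expectedNonce > first.Nonce:
		return queued{}, false, fmt.Errorf("nonce %d expected, but queue for %s starts at %d",
			expectedNonce, sender, first.Nonce)
	}
	if len(msgs) == 1 {
		delete(queues, sender)
	} else {
		queues[sender] = msgs[1:]
	}
	return first, true, nil
}

func main() {
	queues := map[string][]queued{"alice": {{Nonce: 3, Stamp: 50}, {Nonce: 4, Stamp: 51}}}
	for _, nonce := range []uint64{2, 4, 3, 4, 5} {
		msg, found, err := removeNext(queues, "alice", nonce)
		fmt.Printf("expect %d: msg=%+v found=%v err=%v\n", nonce, msg, found, err)
	}
}
```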

func (*Queue) Requeue

func (mq *Queue) Requeue(ctx context.Context, msg *types.SignedMessage, stamp uint64) error

Requeue prepends a message for an address. If the queue already contains any messages from the same address, the message's nonce must be exactly one *less than* the smallest nonce present.
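Requeue is the mirror image of Enqueue: it extends the contiguous nonce run at the front instead of the back. A sketch with hypothetical stand-in types and an invented error message:

```go
package main

import "fmt"

// queued is a simplified stand-in for Queued.
type queued struct {
	Nonce uint64
	Stamp uint64
}

// requeue puts m back at the front of sender's queue. If the sender already
// has queued messages, m's nonce must directly precede the first one.
func requeue(queues map[string][]queued, sender string, m queued) error {
	msgs := queues[sender]
	if len(msgs) > 0 {
		first := msgs[0].Nonce
		// first == 0 means no nonce can precede it; this also avoids underflow.
		if first == 0 || m.Nonce != first-1 {
			return fmt.Errorf("nonce %d for %s does not precede %d", m.Nonce, sender, first)
		}
	}
	queues[sender] = append([]queued{m}, msgs...)
	return nil
}

func main() {
	queues := map[string][]queued{"alice": {{Nonce: 5, Stamp: 80}, {Nonce: 6, Stamp: 81}}}
	fmt.Println(requeue(queues, "alice", queued{Nonce: 4, Stamp: 79})) // accepted
	fmt.Println(requeue(queues, "alice", queued{Nonce: 2, Stamp: 70})) // gap: rejected
	fmt.Println("front nonce:", queues["alice"][0].Nonce)
}
```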

func (*Queue) Size

func (mq *Queue) Size() int64

Size returns the total number of messages in the Queue.

type QueuePolicy

type QueuePolicy interface {
	// HandleNewHead updates a message queue in response to a new chain head. The new head may be based
	// directly on the previous head, or it may be based on a prior tipset (aka a re-org).
	// - `oldTips` is a list of tipsets that used to be on the main chain but are no longer.
	// - `newTips` is a list of tipsets that now form the head of the main chain.
	// Both lists are in descending height order, down to but not including the common ancestor tipset.
	HandleNewHead(ctx context.Context, target PolicyTarget, oldTips, newTips []block.TipSet) error
}

QueuePolicy manages a message queue state in response to changes on the blockchain.

type Queued

type Queued struct {
	Msg   *types.SignedMessage
	Stamp uint64
}

Queued is a message and the stamp it was enqueued with.
