Documentation ¶
Index ¶
- Constants
- type DefaultPublisher
- type DefaultQueuePolicy
- type FakeProvider
- func (p *FakeProvider) GetActorAt(ctx context.Context, key block.TipSetKey, addr address.Address) (*actor.Actor, error)
- func (p *FakeProvider) GetHead() block.TipSetKey
- func (p *FakeProvider) Head() block.TipSetKey
- func (p *FakeProvider) SetActor(addr address.Address, act *actor.Actor)
- func (p *FakeProvider) SetHead(head block.TipSetKey)
- func (p *FakeProvider) SetHeadAndActor(t *testing.T, head block.TipSetKey, addr address.Address, actor *actor.Actor)
- type FakeValidator
- type HeadHandler
- type Inbox
- type MockNetworkPublisher
- type MockPublisher
- type NullPolicy
- type Outbox
- func (ob *Outbox) HandleNewHead(ctx context.Context, oldTips, newTips []block.TipSet) error
- func (ob *Outbox) Queue() *Queue
- func (ob *Outbox) Send(ctx context.Context, from, to address.Address, value types.AttoFIL, ...) (out cid.Cid, pubErrCh chan error, err error)
- func (ob *Outbox) SendEncoded(ctx context.Context, from, to address.Address, value types.AttoFIL, ...) (out cid.Cid, pubErrCh chan error, err error)
- func (ob *Outbox) SignedSend(ctx context.Context, signed *types.SignedMessage, bcast bool) (out cid.Cid, pubErrCh chan error, err error)
- type PolicyTarget
- type Pool
- func (pool *Pool) Add(ctx context.Context, msg *types.SignedMessage, height abi.ChainEpoch) (cid.Cid, error)
- func (pool *Pool) Get(c cid.Cid) (*types.SignedMessage, bool)
- func (pool *Pool) LargestNonce(address address.Address) (largest uint64, found bool)
- func (pool *Pool) Pending() []*types.SignedMessage
- func (pool *Pool) PendingBefore(minimumHeight abi.ChainEpoch) []cid.Cid
- func (pool *Pool) Remove(c cid.Cid)
- type PoolValidator
- type Queue
- func (mq *Queue) Clear(ctx context.Context, sender address.Address) bool
- func (mq *Queue) Enqueue(ctx context.Context, msg *types.SignedMessage, stamp uint64) error
- func (mq *Queue) ExpireBefore(ctx context.Context, stamp uint64) map[address.Address][]*types.SignedMessage
- func (mq *Queue) LargestNonce(sender address.Address) (largest uint64, found bool)
- func (mq *Queue) List(sender address.Address) []*Queued
- func (mq *Queue) Oldest() (oldest uint64)
- func (mq *Queue) Queues() []address.Address
- func (mq *Queue) RemoveNext(ctx context.Context, sender address.Address, expectedNonce uint64) (msg *types.SignedMessage, found bool, err error)
- func (mq *Queue) Requeue(ctx context.Context, msg *types.SignedMessage, stamp uint64) error
- func (mq *Queue) Size() int64
- type QueuePolicy
- type Queued
Constants ¶
const InboxMaxAgeTipsets = 6
InboxMaxAgeTipsets is the maximum age (in non-empty tipsets) to permit messages to stay in the pool after reception. It should be a little shorter than the outbox max age so that messages expire from mining pools a little before the sender gives up on them.
const OutboxMaxAgeRounds = 10
OutboxMaxAgeRounds is the maximum age (in consensus rounds) to permit messages to stay in the outbound message queue. This should be a little longer than the message pool's timeout so that messages expire from mining pools a little before the sending node gives up on them.
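The relationship between the two limits can be illustrated with a small sketch. The `expired` helper is hypothetical and not part of this package; it assumes a message expires once its age strictly exceeds the limit, and it ignores the difference in units (non-empty tipsets versus consensus rounds).

```go
package main

import "fmt"

// Values copied from the constants documented above.
const (
	InboxMaxAgeTipsets = 6
	OutboxMaxAgeRounds = 10
)

// expired is a hypothetical helper (not part of the package). It assumes a
// message expires once its age strictly exceeds the configured maximum.
func expired(age, maxAge uint64) bool {
	return age > maxAge
}

func main() {
	// For ages between the two limits, the inbox has already dropped the
	// message while the sender's outbox still retains it.
	for _, age := range []uint64{5, 8, 11} {
		fmt.Printf("age=%d inboxExpired=%t outboxExpired=%t\n",
			age, expired(age, InboxMaxAgeTipsets), expired(age, OutboxMaxAgeRounds))
	}
}
```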
Variables ¶
This section is empty.
Functions ¶
This section is empty.
Types ¶
type DefaultPublisher ¶
type DefaultPublisher struct {
// contains filtered or unexported fields
}
DefaultPublisher adds messages to a message pool and can publish them to its topic. This is wiring for message publication from the outbox.
func NewDefaultPublisher ¶
func NewDefaultPublisher(pubsub networkPublisher, pool *Pool) *DefaultPublisher
NewDefaultPublisher creates a new publisher.
func (*DefaultPublisher) Publish ¶
func (p *DefaultPublisher) Publish(ctx context.Context, message *types.SignedMessage, height abi.ChainEpoch, bcast bool) error
Publish marshals and publishes a message to the core message pool, and if bcast is true, broadcasts it to the network with the publisher's topic.
type DefaultQueuePolicy ¶
type DefaultQueuePolicy struct {
// contains filtered or unexported fields
}
DefaultQueuePolicy manages a target message queue state in response to changes on the blockchain. Messages are removed from the queue as soon as they appear in a block that's part of a heaviest chain. At this point, messages are highly likely to be valid and known to a large number of nodes, even if the block ends up as an abandoned fork. There is no special handling for re-orgs and messages do not revert to the queue if the block ends up childless (in contrast to the message pool).
func NewMessageQueuePolicy ¶
func NewMessageQueuePolicy(messages messageProvider, maxAge uint) *DefaultQueuePolicy
NewMessageQueuePolicy returns a new policy which removes mined messages from the queue and expires messages older than `maxAge` rounds.
func (*DefaultQueuePolicy) HandleNewHead ¶
func (p *DefaultQueuePolicy) HandleNewHead(ctx context.Context, target PolicyTarget, oldTips, newTips []block.TipSet) error
HandleNewHead removes from the queue all messages that have now been mined in new blocks.
type FakeProvider ¶
FakeProvider is a chain and actor provider for testing. The provider extends a chain.Builder for providing tipsets and maintains an explicit head CID. The provider can provide an actor for a single (head, address) pair.
func NewFakeProvider ¶
func NewFakeProvider(t *testing.T) *FakeProvider
NewFakeProvider creates a new builder and wraps it with a provider. The builder may be accessed by `provider.Builder`.
func (*FakeProvider) GetActorAt ¶
func (p *FakeProvider) GetActorAt(ctx context.Context, key block.TipSetKey, addr address.Address) (*actor.Actor, error)
GetActorAt returns the actor corresponding to (key, addr) if they match those last set.
func (*FakeProvider) GetHead ¶
func (p *FakeProvider) GetHead() block.TipSetKey
GetHead returns the head tipset key.
func (*FakeProvider) Head ¶
func (p *FakeProvider) Head() block.TipSetKey
Head fulfills the ChainReaderAPI interface.
func (*FakeProvider) SetActor ¶
func (p *FakeProvider) SetActor(addr address.Address, act *actor.Actor)
SetActor sets an actor to be mocked on chain.
func (*FakeProvider) SetHead ¶
func (p *FakeProvider) SetHead(head block.TipSetKey)
SetHead sets the head tipset.
func (*FakeProvider) SetHeadAndActor ¶
func (p *FakeProvider) SetHeadAndActor(t *testing.T, head block.TipSetKey, addr address.Address, actor *actor.Actor)
SetHeadAndActor sets the head tipset, along with the from address and actor to be provided.
type FakeValidator ¶
type FakeValidator struct {
RejectMessages bool
}
FakeValidator is a validator which configurably accepts or rejects messages.
func (FakeValidator) ValidateSignedMessageSyntax ¶
func (v FakeValidator) ValidateSignedMessageSyntax(ctx context.Context, msg *types.SignedMessage) error
ValidateSignedMessageSyntax returns an error only if `RejectMessages` is true.
type HeadHandler ¶
type HeadHandler struct {
	// Inbox and outbox exported for testing.
	Inbox  *Inbox
	Outbox *Outbox
	// contains filtered or unexported fields
}
HeadHandler wires up new head tipset handling to the message inbox and outbox.
func NewHeadHandler ¶
func NewHeadHandler(inbox *Inbox, outbox *Outbox, chain chainProvider, head block.TipSet) *HeadHandler
NewHeadHandler builds a new new-head handler.
func (*HeadHandler) HandleNewHead ¶
HandleNewHead computes the chain delta implied by a new head and updates the inbox and outbox.
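One way to compute such a chain delta is to walk both heads back to their common ancestor. The sketch below is an assumption about the general technique, not the package's implementation: `node` is a hypothetical single-parent stand-in for a tipset, and `chainDelta` is an illustrative name.

```go
package main

import "fmt"

// node is a hypothetical stand-in for a tipset with a single parent.
type node struct {
	Name   string
	Height uint64
	Parent *node
}

// chainDelta walks both heads back until they meet. It returns the nodes
// dropped from the old chain and the nodes added by the new chain, each in
// descending height order and excluding the common ancestor.
func chainDelta(oldHead, newHead *node) (oldTips, newTips []*node) {
	a, b := oldHead, newHead
	for a != b {
		// Step back on the old side when it is at least as high as the
		// new side; otherwise step back on the new side.
		if b == nil || (a != nil && a.Height >= b.Height) {
			oldTips = append(oldTips, a)
			a = a.Parent
		} else {
			newTips = append(newTips, b)
			b = b.Parent
		}
	}
	return oldTips, newTips
}

func main() {
	g := &node{Name: "genesis"}
	a1 := &node{Name: "a1", Height: 1, Parent: g}
	a2 := &node{Name: "a2", Height: 2, Parent: a1}
	b1 := &node{Name: "b1", Height: 1, Parent: g}
	b2 := &node{Name: "b2", Height: 2, Parent: b1}
	b3 := &node{Name: "b3", Height: 3, Parent: b2}

	// Re-org from the a-branch to the b-branch.
	oldTips, newTips := chainDelta(a2, b3)
	for _, n := range oldTips {
		fmt.Println("removed:", n.Name)
	}
	for _, n := range newTips {
		fmt.Println("added:", n.Name)
	}
}
```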
type Inbox ¶
type Inbox struct {
// contains filtered or unexported fields
}
Inbox maintains a pool of received messages.
func (*Inbox) Add ¶
Add adds a message to the pool, tagged with the current block height. An error probably means the message failed to validate, but it could indicate a more serious problem with the system.
func (*Inbox) HandleNewHead ¶
HandleNewHead updates the message pool in response to a new head tipset. This removes messages from the pool that are found in the newly adopted chain and adds back those from the removed chain (if any) that do not appear in the new chain. The `oldChain` and `newChain` lists are expected in descending height order, and each may be empty.
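The update rule described above can be sketched with plain maps. This is a simplified illustration, not the package's code: `tipset` is a hypothetical stand-in that carries only message IDs, the pool is modelled as a set of IDs, and `applyNewHead` is an illustrative name.

```go
package main

import "fmt"

// tipset is a hypothetical stand-in for block.TipSet that carries only the
// IDs of the messages it includes.
type tipset struct {
	Height   uint64
	Messages []string
}

// applyNewHead removes from pool every message found in newChain, then adds
// back messages from oldChain that do not appear in newChain.
func applyNewHead(pool map[string]bool, oldChain, newChain []tipset) {
	inNew := make(map[string]bool)
	for _, ts := range newChain {
		for _, m := range ts.Messages {
			inNew[m] = true
			delete(pool, m)
		}
	}
	for _, ts := range oldChain {
		for _, m := range ts.Messages {
			if !inNew[m] {
				pool[m] = true
			}
		}
	}
}

func main() {
	pool := map[string]bool{"c": true, "d": true}
	// Both chains are listed in descending height order.
	oldChain := []tipset{{Height: 2, Messages: []string{"b"}}, {Height: 1, Messages: []string{"a"}}}
	newChain := []tipset{{Height: 2, Messages: []string{"c"}}, {Height: 1, Messages: []string{"b"}}}
	applyNewHead(pool, oldChain, newChain)
	fmt.Println(pool)
}
```

In this example, `a` returns to the pool because only the abandoned chain included it, while `b` does not because the new chain includes it as well.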
type MockNetworkPublisher ¶
type MockNetworkPublisher struct {
Data []byte
}
MockNetworkPublisher records the last message published.
type MockPublisher ¶
type MockPublisher struct {
	ReturnError error                // Error to be returned by Publish()
	Message     *types.SignedMessage // Message received by Publish()
	Height      abi.ChainEpoch       // Height received by Publish()
	Bcast       bool                 // was this broadcast?
}
MockPublisher is a publisher which just stores the last message published.
func (*MockPublisher) Publish ¶
func (p *MockPublisher) Publish(ctx context.Context, message *types.SignedMessage, height abi.ChainEpoch, bcast bool) error
Publish records the message etc for subsequent inspection.
type NullPolicy ¶
type NullPolicy struct { }
NullPolicy is a policy that does nothing.
func (NullPolicy) HandleNewHead ¶
func (NullPolicy) HandleNewHead(ctx context.Context, target PolicyTarget, oldChain, newChain []block.TipSet) error
HandleNewHead does nothing.
type Outbox ¶
type Outbox struct {
// contains filtered or unexported fields
}
Outbox validates and marshals messages for sending and maintains the outbound message queue. The code arrangement here is not quite right. We probably want to factor out the bits that build and sign a message from those that add to the local queue/pool and broadcast it. See discussion in https://github.com/filecoin-project/go-filecoin/pull/3178#discussion_r311593312 and https://github.com/filecoin-project/go-filecoin/issues/3052#issuecomment-513643661
func NewOutbox ¶
func NewOutbox(signer types.Signer, validator messageValidator, queue *Queue, publisher publisher, policy QueuePolicy, chains chainProvider, actors actorProvider, jw journal.Writer) *Outbox
NewOutbox creates a new outbox.
func (*Outbox) HandleNewHead ¶
HandleNewHead maintains the message queue in response to a new head tipset.
func (*Outbox) Send ¶
func (ob *Outbox) Send(ctx context.Context, from, to address.Address, value types.AttoFIL, gasPrice types.AttoFIL, gasLimit gas.Unit, bcast bool, method abi.MethodNum, params interface{}) (out cid.Cid, pubErrCh chan error, err error)
Send marshals and sends a message, retaining it in the outbound message queue. If bcast is true, the publisher broadcasts the message to the network at the current block height.
func (*Outbox) SendEncoded ¶
func (ob *Outbox) SendEncoded(ctx context.Context, from, to address.Address, value types.AttoFIL, gasPrice types.AttoFIL, gasLimit gas.Unit, bcast bool, method abi.MethodNum, encodedParams []byte) (out cid.Cid, pubErrCh chan error, err error)
SendEncoded sends an encoded message, retaining it in the outbound message queue. If bcast is true, the publisher broadcasts the message to the network at the current block height.
func (*Outbox) SignedSend ¶
func (ob *Outbox) SignedSend(ctx context.Context, signed *types.SignedMessage, bcast bool) (out cid.Cid, pubErrCh chan error, err error)
SignedSend sends a signed message, retaining it in the outbound message queue. If bcast is true, the publisher broadcasts the message to the network at the current block height.
type PolicyTarget ¶
type PolicyTarget interface {
	RemoveNext(ctx context.Context, sender address.Address, expectedNonce uint64) (msg *types.SignedMessage, found bool, err error)
	Requeue(ctx context.Context, msg *types.SignedMessage, stamp uint64) error
	ExpireBefore(ctx context.Context, stamp uint64) map[address.Address][]*types.SignedMessage
}
PolicyTarget is the outbound queue object on which the policy acts.
type Pool ¶
type Pool struct {
// contains filtered or unexported fields
}
Pool keeps an unordered, de-duplicated set of Messages and supports removal by CID. By 'de-duplicated' we mean that inserting a message whose CID already exists in the pool is a no-op. We use a Pool to store all messages received by this node via the network or created directly via user command that have yet to be included in a block. Messages are removed as they are processed.
Pool is safe for concurrent access.
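A de-duplicated, concurrency-safe set of this kind can be sketched with a map guarded by a mutex. This is an illustration only, not the package's implementation: messages are raw byte slices, and a hex-encoded SHA-256 digest stands in for the CID.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
	"sync"
)

// pool is a hypothetical, simplified message pool keyed by content hash.
type pool struct {
	mu      sync.RWMutex
	pending map[string][]byte
}

func newPool() *pool {
	return &pool{pending: make(map[string][]byte)}
}

// keyOf derives a content key; a SHA-256 digest stands in for a CID here.
func keyOf(msg []byte) string {
	sum := sha256.Sum256(msg)
	return hex.EncodeToString(sum[:])
}

// Add inserts msg and returns its key. Adding a message that is already
// present leaves the pool unchanged.
func (p *pool) Add(msg []byte) string {
	k := keyOf(msg)
	p.mu.Lock()
	defer p.mu.Unlock()
	if _, ok := p.pending[k]; !ok {
		p.pending[k] = msg
	}
	return k
}

// Get looks a message up by key.
func (p *pool) Get(k string) ([]byte, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	msg, ok := p.pending[k]
	return msg, ok
}

// Remove deletes a message by key; removing a missing key does nothing.
func (p *pool) Remove(k string) {
	p.mu.Lock()
	defer p.mu.Unlock()
	delete(p.pending, k)
}

// Len reports the number of pending messages.
func (p *pool) Len() int {
	p.mu.RLock()
	defer p.mu.RUnlock()
	return len(p.pending)
}

func main() {
	p := newPool()
	k := p.Add([]byte("transfer 1 FIL"))
	p.Add([]byte("transfer 1 FIL")) // duplicate insert
	fmt.Println("pending after duplicate add:", p.Len())
	p.Remove(k)
	fmt.Println("pending after remove:", p.Len())
}
```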
func NewPool ¶
func NewPool(cfg *config.MessagePoolConfig, validator PoolValidator) *Pool
NewPool constructs a new Pool.
func (*Pool) Add ¶
func (pool *Pool) Add(ctx context.Context, msg *types.SignedMessage, height abi.ChainEpoch) (cid.Cid, error)
Add adds a message to the pool, tagged with the block height at which it was received. Does nothing if the message is already in the pool.
func (*Pool) Get ¶
func (pool *Pool) Get(c cid.Cid) (*types.SignedMessage, bool)
Get retrieves a message from the pool by CID.
func (*Pool) LargestNonce ¶
LargestNonce returns the largest nonce used by a message from address in the pool. If no messages from address are found, found will be false.
func (*Pool) Pending ¶
func (pool *Pool) Pending() []*types.SignedMessage
Pending returns all pending messages.
func (*Pool) PendingBefore ¶
func (pool *Pool) PendingBefore(minimumHeight abi.ChainEpoch) []cid.Cid
PendingBefore returns the CIDs of messages added with height less than `minimumHeight`.
type PoolValidator ¶
type PoolValidator interface {
ValidateSignedMessageSyntax(ctx context.Context, msg *types.SignedMessage) error
}
PoolValidator defines a validator that ensures a message can go through the pool.
type Queue ¶
type Queue struct {
// contains filtered or unexported fields
}
Queue stores an ordered list of messages (per actor) and enforces that their nonces form a contiguous sequence. Each message is associated with a "stamp" (an opaque integer), and the queue supports expiring any list of messages where the first message has a stamp below some threshold. The relative order of stamps in a queue is not enforced. A message queue is intended to record outbound messages that have been transmitted but not yet appeared in a block, where the stamp could be block height. Queue is safe for concurrent access.
func (*Queue) Clear ¶
Clear removes all messages for a single sender address. Returns whether the queue was non-empty before being cleared.
func (*Queue) Enqueue ¶
Enqueue appends a new message for an address. If the queue already contains any messages from the same address, the new message's nonce must be exactly one greater than the largest nonce present.
func (*Queue) ExpireBefore ¶
func (mq *Queue) ExpireBefore(ctx context.Context, stamp uint64) map[address.Address][]*types.SignedMessage
ExpireBefore clears the queue of any sender where the first message in the queue has a stamp less than `stamp`. Returns a map containing any expired address queues.
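The nonce rule for Enqueue and the expiry rule for ExpireBefore can be sketched together. This is a simplified model, not the package's code: senders are plain strings, messages are reduced to a nonce and a stamp, and locking is omitted even though the real Queue is safe for concurrent access.

```go
package main

import "fmt"

// queued is a hypothetical, reduced form of a queued message.
type queued struct {
	Nonce uint64
	Stamp uint64
}

// queue is a hypothetical per-sender message queue (no locking).
type queue struct {
	bySender map[string][]queued
}

func newQueue() *queue {
	return &queue{bySender: make(map[string][]queued)}
}

// Enqueue appends a message. If the sender already has queued messages, the
// nonce must be exactly one greater than the last nonce queued.
func (q *queue) Enqueue(sender string, nonce, stamp uint64) error {
	msgs := q.bySender[sender]
	if n := len(msgs); n > 0 && nonce != msgs[n-1].Nonce+1 {
		return fmt.Errorf("nonce %d does not follow %d", nonce, msgs[n-1].Nonce)
	}
	q.bySender[sender] = append(msgs, queued{Nonce: nonce, Stamp: stamp})
	return nil
}

// ExpireBefore removes and returns the whole queue of every sender whose
// first message has a stamp below the threshold.
func (q *queue) ExpireBefore(stamp uint64) map[string][]queued {
	expired := make(map[string][]queued)
	for sender, msgs := range q.bySender {
		if len(msgs) > 0 && msgs[0].Stamp < stamp {
			expired[sender] = msgs
			delete(q.bySender, sender)
		}
	}
	return expired
}

func main() {
	q := newQueue()
	_ = q.Enqueue("alice", 1, 100)
	_ = q.Enqueue("alice", 2, 105)
	_ = q.Enqueue("bob", 7, 110)
	fmt.Println("gap rejected:", q.Enqueue("alice", 4, 106) != nil)

	// Only the first message's stamp is consulted, so both of alice's
	// messages expire together while bob's queue is untouched.
	expired := q.ExpireBefore(105)
	fmt.Println("expired for alice:", len(expired["alice"]))
	fmt.Println("remaining for bob:", len(q.bySender["bob"]))
}
```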
func (*Queue) LargestNonce ¶
LargestNonce returns the largest nonce of any message in the queue for an address. If the queue for the address is empty, returns (0, false).
func (*Queue) Oldest ¶
Oldest returns the oldest message stamp in the Queue. Oldest returns 0 if the queue is empty. Exported for testing only.
func (*Queue) Queues ¶
func (mq *Queue) Queues() []address.Address
Queues returns the addresses associated with each non-empty queue. The order of returned addresses is neither defined nor stable.
func (*Queue) RemoveNext ¶
func (mq *Queue) RemoveNext(ctx context.Context, sender address.Address, expectedNonce uint64) (msg *types.SignedMessage, found bool, err error)
RemoveNext removes and returns a single message from the queue, if it bears the expected nonce value, with found = true. Returns found = false if the queue is empty or the expected nonce is less than any in the queue for that address (indicating the message had already been removed). Returns an error if the expected nonce is greater than the smallest in the queue. The caller may wish to check that the returned message is equal to that expected (not just in nonce value).
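The three outcomes described above can be sketched on a single sender's queue. This is an illustration under stated assumptions: the queue is reduced to an ascending, contiguous slice of nonces, and `removeNext` is a hypothetical free function rather than the package's method.

```go
package main

import "fmt"

// removeNext models one sender's queue as an ascending slice of nonces.
// It returns the remaining queue, whether a message was removed, and an
// error when the expected nonce lies beyond the front of the queue.
func removeNext(nonces []uint64, expected uint64) (rest []uint64, found bool, err error) {
	if len(nonces) == 0 || expected < nonces[0] {
		// Empty queue, or the message was already removed earlier.
		return nonces, false, nil
	}
	if expected > nonces[0] {
		// An earlier queued message has been skipped.
		return nonces, false, fmt.Errorf("expected nonce %d exceeds queue front %d", expected, nonces[0])
	}
	return nonces[1:], true, nil
}

func main() {
	q := []uint64{5, 6, 7}
	for _, expected := range []uint64{4, 6, 5} {
		rest, found, err := removeNext(q, expected)
		fmt.Printf("expected=%d found=%t err=%v rest=%v\n", expected, found, err, rest)
		q = rest
	}
}
```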
type QueuePolicy ¶
type QueuePolicy interface {
	// HandleNewHead updates a message queue in response to a new chain head. The new head may be based
	// directly on the previous head, or it may be based on a prior tipset (aka a re-org).
	// - `oldTips` is a list of tipsets that used to be on the main chain but are no longer.
	// - `newTips` is a list of tipsets that now form the head of the main chain.
	// Both lists are in descending height order, down to but not including the common ancestor tipset.
	HandleNewHead(ctx context.Context, target PolicyTarget, oldTips, newTips []block.TipSet) error
}
QueuePolicy manages a message queue state in response to changes on the blockchain.
type Queued ¶
type Queued struct {
	Msg   *types.SignedMessage
	Stamp uint64
}
Queued is a message and the stamp it was enqueued with.