database

package
v1.2.0 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Mar 8, 2026 License: MIT Imports: 29 Imported by: 0

Documentation

Index

Constants

View Source
const (
	ErrGetAccountBalance = "failed to get account balance"
	ErrRecordLedgerEntry = "failed to record a ledger entry"
)
View Source
const (
	DefaultLimit = 10
	MaxLimit     = 100
)

Variables

View Source
var (
	ErrInvalidLedgerTransactionType = "invalid ledger transaction type"
	ErrRecordTransaction            = "failed to record transaction"
)
View Source
var ErrEventHasAlreadyBeenProcessed = errors.New("contract event has already been processed")

Functions

func ConnectToDB

func ConnectToDB(cnf DatabaseConfig, embedMigrations embed.FS) (*gorm.DB, error)

func SetupTestDB

func SetupTestDB(t testing.TB) (*gorm.DB, func())

SetupTestDB chooses SQLite or Postgres based on TEST_DB_DRIVER environment variable. This is exported so it can be used by tests in this package.

Types

type ActionLogEntryV1

type ActionLogEntryV1 struct {
	ID          uuid.UUID `gorm:"type:char(36);primaryKey"`
	UserWallet  string    `gorm:"column:user_wallet;not null"`
	GatedAction uint8     `gorm:"column:gated_action;not null"`
	CreatedAt   time.Time
}

func (ActionLogEntryV1) TableName

func (ActionLogEntryV1) TableName() string

type ActiveCountByLabel

type ActiveCountByLabel struct {
	Label string `gorm:"column:label"`
	Count uint64 `gorm:"column:count"`
}

CountActiveUsers returns the number of distinct users who had channel state updates within the given duration, grouped by asset. If asset is empty, counts across all assets. ActiveCountByLabel holds a count grouped by a label (asset or application_id).

type AppLedgerEntryV1

type AppLedgerEntryV1 struct {
	ID          uuid.UUID       `gorm:"type:char(36);primaryKey"`
	AccountID   string          `gorm:"column:account_id;not null;index:idx_account_asset_symbol;index:idx_account_wallet"`
	AssetSymbol string          `gorm:"column:asset_symbol;not null;index:idx_account_asset_symbol"`
	Wallet      string          `gorm:"column:wallet;not null;index:idx_account_wallet"`
	Credit      decimal.Decimal `gorm:"column:credit;type:varchar(78);not null"`
	Debit       decimal.Decimal `gorm:"column:debit;type:varchar(78);not null"`
	CreatedAt   time.Time
}

AppLedgerEntryV1 represents a ledger entry in the database

func (AppLedgerEntryV1) TableName

func (AppLedgerEntryV1) TableName() string

type AppParticipantV1

type AppParticipantV1 struct {
	AppSessionID    string `gorm:"column:app_session_id;not null;primaryKey;priority:1"`
	WalletAddress   string `gorm:"column:wallet_address;not null;primaryKey;priority:2"`
	SignatureWeight uint8  `gorm:"column:signature_weight;not null"`
}

AppParticipantV1 represents the definition for an app participant.

func (AppParticipantV1) TableName

func (AppParticipantV1) TableName() string

type AppSessionCount

type AppSessionCount struct {
	Application string               `gorm:"column:application_id"`
	Status      app.AppSessionStatus `gorm:"column:status"`
	Count       uint64               `gorm:"column:count"`
	LastUpdated time.Time            `gorm:"column:last_updated"`
}

AppSessionCount holds the result of a COUNT() GROUP BY query on app sessions.

type AppSessionKeyAppSessionIDV1

type AppSessionKeyAppSessionIDV1 struct {
	SessionKeyStateID string `gorm:"column:session_key_state_id;not null;primaryKey;priority:1"`
	AppSessionID      string `gorm:"column:app_session_id;not null;primaryKey;priority:2;index"`
}

AppSessionKeyAppSessionIDV1 links a session key state to an app session ID.

func (AppSessionKeyAppSessionIDV1) TableName

func (AppSessionKeyAppSessionIDV1) TableName() string

type AppSessionKeyApplicationV1

type AppSessionKeyApplicationV1 struct {
	SessionKeyStateID string `gorm:"column:session_key_state_id;not null;primaryKey;priority:1"`
	ApplicationID     string `gorm:"column:application_id;not null;primaryKey;priority:2;index"`
}

SessionKeyApplicationV1 links a session key state to an application ID.

func (AppSessionKeyApplicationV1) TableName

func (AppSessionKeyApplicationV1) TableName() string

type AppSessionKeyStateV1

type AppSessionKeyStateV1 struct {
	ID             string                        `gorm:"column:id;primaryKey"`
	UserAddress    string                        `gorm:"column:user_address;not null;uniqueIndex:idx_session_key_states_v1_user_key_ver,priority:1"`
	SessionKey     string                        `gorm:"column:session_key;not null;uniqueIndex:idx_session_key_states_v1_user_key_ver,priority:2"`
	Version        uint64                        `gorm:"column:version;not null;uniqueIndex:idx_session_key_states_v1_user_key_ver,priority:3"`
	ApplicationIDs []AppSessionKeyApplicationV1  `gorm:"foreignKey:SessionKeyStateID;references:ID"`
	AppSessionIDs  []AppSessionKeyAppSessionIDV1 `gorm:"foreignKey:SessionKeyStateID;references:ID"`
	ExpiresAt      time.Time                     `gorm:"column:expires_at;not null"`
	UserSig        string                        `gorm:"column:user_sig;not null"`
	CreatedAt      time.Time
}

AppSessionKeyStateV1 represents a session key state in the database. ID is Hash(user_address + session_key + version).

func (AppSessionKeyStateV1) TableName

func (AppSessionKeyStateV1) TableName() string

type AppSessionV1

type AppSessionV1 struct {
	ID            string               `gorm:"primaryKey"`
	ApplicationID string               `gorm:"column:application_id;not null"`
	Nonce         uint64               `gorm:"column:nonce;not null"`
	Participants  []AppParticipantV1   `gorm:"foreignKey:AppSessionID;references:ID"`
	SessionData   string               `gorm:"column:session_data;type:text;not null"`
	Quorum        uint8                `gorm:"column:quorum;default:100"`
	Version       uint64               `gorm:"column:version;default:1"`
	Status        app.AppSessionStatus `gorm:"column:status;not null"`
	CreatedAt     time.Time
	UpdatedAt     time.Time
}

AppSessionV1 represents a virtual payment application session between participants

func (AppSessionV1) TableName

func (AppSessionV1) TableName() string

type AppV1

type AppV1 struct {
	ID                          string `gorm:"primaryKey"`
	OwnerWallet                 string `gorm:"column:owner_wallet;not null"`
	Metadata                    string `gorm:"column:metadata;type:text;not null"`
	Version                     uint64 `gorm:"column:version;default:1"`
	CreationApprovalNotRequired bool   `gorm:"column:creation_approval_not_required"`
	CreatedAt                   time.Time
	UpdatedAt                   time.Time
}

AppV1 represents an application registry entry in the database.

func (AppV1) TableName

func (AppV1) TableName() string

type BlockchainAction

type BlockchainAction struct {
	ID           int64                  `gorm:"primary_key"`
	Type         BlockchainActionType   `gorm:"column:action_type;not null"`
	StateID      string                 `gorm:"column:state_id;size:66"`
	BlockchainID uint64                 `gorm:"column:blockchain_id;not null"`
	Data         datatypes.JSON         `gorm:"column:action_data;type:text"`
	Status       BlockchainActionStatus `gorm:"column:status;not null"`
	Retries      uint8                  `gorm:"column:retry_count;default:0"`
	Error        string                 `gorm:"column:last_error;type:text"`
	TxHash       string                 `gorm:"column:transaction_hash;size:66"`
	CreatedAt    time.Time              `gorm:"column:created_at"`
	UpdatedAt    time.Time              `gorm:"column:updated_at"`
}

func (BlockchainAction) TableName

func (BlockchainAction) TableName() string

type BlockchainActionStatus

type BlockchainActionStatus uint8
const (
	BlockchainActionStatusPending BlockchainActionStatus = iota
	BlockchainActionStatusCompleted
	BlockchainActionStatusFailed
)

type BlockchainActionType

type BlockchainActionType uint8
const (
	ActionTypeCheckpoint BlockchainActionType = 1

	ActionTypeInitiateEscrowDeposit BlockchainActionType = 10
	ActionTypeFinalizeEscrowDeposit BlockchainActionType = 11

	ActionTypeInitiateEscrowWithdrawal BlockchainActionType = 20
	ActionTypeFinalizeEscrowWithdrawal BlockchainActionType = 21
)

func (BlockchainActionType) String

func (t BlockchainActionType) String() string

type Channel

type Channel struct {
	ChannelID             string             `gorm:"column:channel_id;primaryKey;"`
	UserWallet            string             `gorm:"column:user_wallet;not null"`
	Asset                 string             `gorm:"column:asset;not null"`
	Type                  core.ChannelType   `gorm:"column:type;not null"`
	BlockchainID          uint64             `gorm:"column:blockchain_id;not null"`
	Token                 string             `gorm:"column:token;not null"`
	ChallengeDuration     uint32             `gorm:"column:challenge_duration;not null"`
	ChallengeExpiresAt    *time.Time         `gorm:"column:challenge_expires_at;default:null"`
	Nonce                 uint64             `gorm:"column:nonce;not null;"`
	ApprovedSigValidators string             `gorm:"column:approved_sig_validators;not null;"`
	Status                core.ChannelStatus `gorm:"column:status;not null;"`
	StateVersion          uint64             `gorm:"column:state_version;not null;"`
	CreatedAt             time.Time
	UpdatedAt             time.Time
}

Channel represents a state channel between participants

func (Channel) TableName

func (Channel) TableName() string

TableName specifies the table name for the Channel model

type ChannelCount

type ChannelCount struct {
	Asset       string             `gorm:"column:asset"`
	Status      core.ChannelStatus `gorm:"column:status"`
	Count       uint64             `gorm:"column:count"`
	LastUpdated time.Time          `gorm:"column:last_updated"`
}

ChannelCount holds the result of a COUNT() GROUP BY query on channels.

type ChannelSessionKeyAssetV1

type ChannelSessionKeyAssetV1 struct {
	SessionKeyStateID string `gorm:"column:session_key_state_id;not null;primaryKey;priority:1"`
	Asset             string `gorm:"column:asset;not null;primaryKey;priority:2;index"`
}

ChannelSessionKeyAssetV1 links a channel session key state to an asset.

func (ChannelSessionKeyAssetV1) TableName

func (ChannelSessionKeyAssetV1) TableName() string

type ChannelSessionKeyStateV1

type ChannelSessionKeyStateV1 struct {
	ID           string                     `gorm:"column:id;primaryKey"`
	UserAddress  string                     `gorm:"column:user_address;not null;uniqueIndex:idx_channel_session_key_states_v1_user_key_ver,priority:1"`
	SessionKey   string                     `gorm:"column:session_key;not null;uniqueIndex:idx_channel_session_key_states_v1_user_key_ver,priority:2"`
	Version      uint64                     `gorm:"column:version;not null;uniqueIndex:idx_channel_session_key_states_v1_user_key_ver,priority:3"`
	Assets       []ChannelSessionKeyAssetV1 `gorm:"foreignKey:SessionKeyStateID;references:ID"`
	MetadataHash string                     `gorm:"column:metadata_hash;type:char(66);not null"`
	ExpiresAt    time.Time                  `gorm:"column:expires_at;not null"`
	UserSig      string                     `gorm:"column:user_sig;not null"`
	CreatedAt    time.Time
}

ChannelSessionKeyStateV1 represents a channel session key state in the database.

func (ChannelSessionKeyStateV1) TableName

func (ChannelSessionKeyStateV1) TableName() string

type ContractEvent

type ContractEvent struct {
	ID              int64     `gorm:"primary_key;column:id"`
	ContractAddress string    `gorm:"column:contract_address"`
	BlockchainID    uint64    `gorm:"column:blockchain_id"`
	Name            string    `gorm:"column:name"`
	BlockNumber     uint64    `gorm:"column:block_number"`
	TransactionHash string    `gorm:"column:transaction_hash"`
	LogIndex        uint32    `gorm:"column:log_index"`
	CreatedAt       time.Time `gorm:"column:created_at"`
}

func (ContractEvent) TableName

func (ContractEvent) TableName() string

type DBStore

type DBStore struct {
	// contains filtered or unexported fields
}

func (*DBStore) CheckOpenChannel

func (s *DBStore) CheckOpenChannel(wallet, asset string) (string, bool, error)

CheckOpenChannel verifies if a user has an active channel for the given asset and returns the approved signature validators if such a channel exists.

func (*DBStore) Complete

func (s *DBStore) Complete(actionID int64, txHash string) error

func (*DBStore) CountActiveAppSessions

func (s *DBStore) CountActiveAppSessions(window time.Duration) ([]ActiveCountByLabel, error)

CountActiveAppSessions returns app session counts per application within the given window.

func (*DBStore) CountActiveUsers

func (s *DBStore) CountActiveUsers(window time.Duration) ([]ActiveCountByLabel, error)

CountActiveUsers returns distinct user counts per asset and an "all" aggregate for users with channel state updates within the given window.

func (*DBStore) CreateApp

func (s *DBStore) CreateApp(entry app.AppV1) error

CreateApp registers a new application. Returns an error if the app ID already exists.

func (*DBStore) CreateAppSession

func (s *DBStore) CreateAppSession(session app.AppSessionV1) error

CreateAppSession initializes a new application session.

func (*DBStore) CreateChannel

func (s *DBStore) CreateChannel(channel core.Channel) error

CreateChannel creates a new channel entity in the database.

func (*DBStore) EnsureNoOngoingStateTransitions

func (s *DBStore) EnsureNoOngoingStateTransitions(wallet, asset string) error

EnsureNoOngoingStateTransitions validates that no conflicting blockchain operations are pending. This method prevents race conditions by ensuring blockchain state versions match the user's last signed state version before accepting new transitions.

Validation logic by transition type:

  • home_deposit: Verify last_state.version == home_channel.state_version
  • mutual_lock: Verify last_state.version == home_channel.state_version == escrow_channel.state_version AND next transition must be escrow_deposit
  • escrow_lock: Verify last_state.version == escrow_channel.state_version AND next transition must be escrow_withdraw or migrate
  • escrow_withdraw: Verify last_state.version == escrow_channel.state_version
  • migrate: Verify last_state.version == home_channel.state_version

func (*DBStore) ExecuteInTransaction

func (s *DBStore) ExecuteInTransaction(txFunc StoreTxHandler) error

func (*DBStore) Fail

func (s *DBStore) Fail(actionID int64, err string) error

func (*DBStore) FailNoRetry

func (s *DBStore) FailNoRetry(actionID int64, err string) error

func (*DBStore) GetActions

func (s *DBStore) GetActions(limit uint8, blockchainID uint64) ([]BlockchainAction, error)

func (*DBStore) GetActiveHomeChannel

func (s *DBStore) GetActiveHomeChannel(wallet, asset string) (*core.Channel, error)

GetActiveHomeChannel retrieves the active home channel for a user's wallet and asset.

func (*DBStore) GetApp

func (s *DBStore) GetApp(appID string) (*app.AppInfoV1, error)

GetApp retrieves a single application by ID. Returns nil if not found.

func (*DBStore) GetAppCount

func (s *DBStore) GetAppCount(ownerWallet string) (uint64, error)

func (*DBStore) GetAppSession

func (s *DBStore) GetAppSession(sessionID string) (*app.AppSessionV1, error)

GetAppSession retrieves a specific session by ID.

func (*DBStore) GetAppSessionBalances

func (s *DBStore) GetAppSessionBalances(appSessionID string) (map[string]decimal.Decimal, error)

GetAppSessionBalances retrieves the total balances associated with a session.

func (*DBStore) GetAppSessionKeyOwner

func (s *DBStore) GetAppSessionKeyOwner(sessionKey, appSessionId string) (string, error)

GetAppSessionKeyOwner returns the user_address that owns the given session key authorized for the specified app session ID. Only the latest-version, non-expired key with matching permissions is considered. A newer version always supersedes older ones.

func (*DBStore) GetAppSessions

func (s *DBStore) GetAppSessions(appSessionID *string, participant *string, status app.AppSessionStatus, pagination *core.PaginationParams) ([]app.AppSessionV1, core.PaginationMetadata, error)

GetAppSessions retrieves filtered sessions with pagination.

func (*DBStore) GetAppSessionsCountByLabels

func (s *DBStore) GetAppSessionsCountByLabels() ([]AppSessionCount, error)

GetAppSessionsCountByLabels computes app session count deltas since last processed timestamp, upserts them as lifespan metrics, and returns the updated totals.

func (*DBStore) GetApps

func (s *DBStore) GetApps(appID *string, ownerWallet *string, pagination *core.PaginationParams) ([]app.AppInfoV1, core.PaginationMetadata, error)

GetApps retrieves applications with optional filtering by app ID, owner wallet, and pagination.

func (*DBStore) GetChannelByID

func (s *DBStore) GetChannelByID(channelID string) (*core.Channel, error)

GetChannelByID retrieves a channel by its unique identifier.

func (*DBStore) GetChannelsCountByLabels

func (s *DBStore) GetChannelsCountByLabels() ([]ChannelCount, error)

GetChannelsCountByLabels computes channel count deltas since last processed timestamp, upserts them as lifespan metrics, and returns the updated totals.

func (*DBStore) GetLastAppSessionKeyState

func (s *DBStore) GetLastAppSessionKeyState(wallet, sessionKey string) (*app.AppSessionKeyStateV1, error)

GetLastAppSessionKeyState retrieves the latest version of a specific session key for a user. A newer version always supersedes older ones, even if expired. Returns nil if no state exists.

func (*DBStore) GetLastAppSessionKeyStates

func (s *DBStore) GetLastAppSessionKeyStates(wallet string, sessionKey *string) ([]app.AppSessionKeyStateV1, error)

GetLastAppSessionKeyStates retrieves the latest session key states for a user with optional filtering. Returns only the highest-version row per session key that has not expired.

func (*DBStore) GetLastAppSessionKeyVersion

func (s *DBStore) GetLastAppSessionKeyVersion(wallet, sessionKey string) (uint64, error)

GetLastAppSessionKeyVersion returns the latest version of a session key state for a user. Returns 0 if no state exists.

func (*DBStore) GetLastChannelSessionKeyStates

func (s *DBStore) GetLastChannelSessionKeyStates(wallet string, sessionKey *string) ([]core.ChannelSessionKeyStateV1, error)

GetLastChannelSessionKeyStates retrieves the latest channel session key states for a user with optional filtering. Returns only the highest-version row per session key that has not expired.

func (*DBStore) GetLastChannelSessionKeyVersion

func (s *DBStore) GetLastChannelSessionKeyVersion(wallet, sessionKey string) (uint64, error)

GetLastChannelSessionKeyVersion returns the latest version of a channel session key state. Returns 0 if no state exists.

func (*DBStore) GetLastStateByChannelID

func (s *DBStore) GetLastStateByChannelID(channelID string, signed bool) (*core.State, error)

GetLastStateByChannelID retrieves the most recent state for a given channel. Uses UNION ALL of two indexed queries instead of OR for better performance.

func (*DBStore) GetLastUserState

func (s *DBStore) GetLastUserState(wallet, asset string, signed bool) (*core.State, error)

GetLastUserState retrieves the most recent state for a user's asset.

func (*DBStore) GetLatestEvent

func (s *DBStore) GetLatestEvent(contractAddress string, blockchainID uint64) (core.BlockchainEvent, error)

GetLatestEvent returns the latest block number and log index for a given contract. This function matches the signature required by pkg/blockchain/evm.GetLatestEvent.

func (*DBStore) GetLifetimeMetricLastTimestamp

func (s *DBStore) GetLifetimeMetricLastTimestamp(name string) (time.Time, error)

GetLifetimeMetricLastTimestamp returns the most recent last_timestamp among all metrics with the given name.

func (*DBStore) GetParticipantAllocations

func (s *DBStore) GetParticipantAllocations(appSessionID string) (map[string]map[string]decimal.Decimal, error)

GetParticipantAllocations retrieves specific asset allocations per participant. This will only return participants who have non-zero balances.

func (*DBStore) GetStateByChannelIDAndVersion

func (s *DBStore) GetStateByChannelIDAndVersion(channelID string, version uint64) (*core.State, error)

GetStateByChannelIDAndVersion retrieves a specific state version for a channel. Uses UNION ALL of two indexed queries instead of OR for better performance.

func (*DBStore) GetStateByID

func (s *DBStore) GetStateByID(stateID string) (*core.State, error)

GetStateByID retrieves a state by its deterministic ID.

func (*DBStore) GetTotalUserStaked

func (s *DBStore) GetTotalUserStaked(wallet string) (decimal.Decimal, error)

GetTotalUserStaked returns the total staked amount for a user across all blockchains.

func (*DBStore) GetTotalValueLocked

func (s *DBStore) GetTotalValueLocked() ([]TotalValueLocked, error)

func (*DBStore) GetUserActionCount

func (s *DBStore) GetUserActionCount(wallet string, gatedAction core.GatedAction, window time.Duration) (uint64, error)

GetUserActionCount returns the number of actions matching the given wallet and gated action within the specified time window (counting backwards from now).

func (*DBStore) GetUserActionCounts

func (s *DBStore) GetUserActionCounts(userWallet string, window time.Duration) (map[core.GatedAction]uint64, error)

func (*DBStore) GetUserBalances

func (s *DBStore) GetUserBalances(wallet string) ([]core.BalanceEntry, error)

GetUserBalances retrieves the balances for a user's wallet.

func (*DBStore) GetUserChannels

func (s *DBStore) GetUserChannels(wallet string, status *core.ChannelStatus, asset *string, channelType *core.ChannelType, limit, offset uint32) ([]core.Channel, uint32, error)

GetUserChannels retrieves all channels for a user with optional status, asset, and type filters.

func (*DBStore) GetUserTransactions

func (s *DBStore) GetUserTransactions(accountID string, asset *string, txType *core.TransactionType, fromTime *uint64, toTime *uint64, paginate *core.PaginationParams) ([]core.Transaction, core.PaginationMetadata, error)

GetUserTransactions retrieves transaction history for a user with optional filters.

func (*DBStore) LockUserState

func (s *DBStore) LockUserState(wallet, asset string) (decimal.Decimal, error)

LockUserState locks a user's balance row for update (postgres only, must be used within a transaction). Uses INSERT ... ON CONFLICT DO NOTHING to ensure the row exists, then SELECT ... FOR UPDATE to lock it. Returns the current balance or zero if the row was just inserted.

func (*DBStore) RecordAction

func (s *DBStore) RecordAction(wallet string, gatedAction core.GatedAction) error

RecordAction inserts a new action log entry for a user.

func (*DBStore) RecordAttempt

func (s *DBStore) RecordAttempt(actionID int64, err string) error

func (*DBStore) RecordLedgerEntry

func (s *DBStore) RecordLedgerEntry(userWallet, accountID, asset string, amount decimal.Decimal) error

RecordLedgerEntry logs a movement of funds within the internal ledger.

func (*DBStore) RecordTransaction

func (s *DBStore) RecordTransaction(tx core.Transaction) error

RecordTransaction creates a transaction record linking state transitions.

func (*DBStore) ScheduleCheckpoint

func (s *DBStore) ScheduleCheckpoint(stateID string, blockchainID uint64) error

ScheduleCheckpoint queues a blockchain action to checkpoint a state on home blockchain.

func (*DBStore) ScheduleFinalizeEscrowDeposit

func (s *DBStore) ScheduleFinalizeEscrowDeposit(stateID string, blockchainID uint64) error

ScheduleFinalizeEscrowDeposit schedules a finalize for an escrow deposit operation on non-home blockchain.

func (*DBStore) ScheduleFinalizeEscrowWithdrawal

func (s *DBStore) ScheduleFinalizeEscrowWithdrawal(stateID string, blockchainID uint64) error

ScheduleFinalizeEscrowWithdrawal schedules a finalize for an escrow withdrawal operation on non-home blockchain.

func (*DBStore) ScheduleInitiateEscrowDeposit

func (s *DBStore) ScheduleInitiateEscrowDeposit(stateID string, blockchainID uint64) error

ScheduleInitiateEscrowDeposit queues a blockchain action to initiate escrow deposit on home blockchain.

func (*DBStore) ScheduleInitiateEscrowWithdrawal

func (s *DBStore) ScheduleInitiateEscrowWithdrawal(stateID string, blockchainID uint64) error

ScheduleInitiateEscrowWithdrawal queues a blockchain action to initiate withdrawal on non-home blockchain.

func (*DBStore) StoreAppSessionKeyState

func (s *DBStore) StoreAppSessionKeyState(state app.AppSessionKeyStateV1) error

StoreAppSessionKeyState stores a new session key state version.

func (*DBStore) StoreChannelSessionKeyState

func (s *DBStore) StoreChannelSessionKeyState(state core.ChannelSessionKeyStateV1) error

StoreChannelSessionKeyState stores a new channel session key state version.

func (*DBStore) StoreContractEvent

func (s *DBStore) StoreContractEvent(ev core.BlockchainEvent) error

StoreContractEvent stores a blockchain event to the database. This function matches the signature required by pkg/blockchain/evm.StoreContractEvent.

func (*DBStore) StoreUserState

func (s *DBStore) StoreUserState(state core.State) error

StoreUserState persists a new user state to the database.

func (*DBStore) UpdateAppSession

func (s *DBStore) UpdateAppSession(session app.AppSessionV1) error

UpdateAppSession updates existing session data with optimistic locking.

func (*DBStore) UpdateChannel

func (s *DBStore) UpdateChannel(channel core.Channel) error

UpdateChannel persists changes to a channel's metadata (status, version, etc).

func (*DBStore) UpdateUserStaked

func (s *DBStore) UpdateUserStaked(wallet string, blockchainID uint64, amount decimal.Decimal) error

UpdateUserStaked upserts the staked amount for a user on a specific blockchain.

func (*DBStore) ValidateChannelSessionKeyForAsset

func (s *DBStore) ValidateChannelSessionKeyForAsset(wallet, sessionKey, asset, metadataHash string) (bool, error)

ValidateChannelSessionKeyForAsset checks in a single query that: - a session key state exists for the (wallet, sessionKey) pair, - it is the latest version, - it is not expired, - the asset is in the allowed list, - the metadata hash matches.

type DatabaseConfig

type DatabaseConfig struct {
	URL      string `env:"CLEARNODE_DATABASE_URL" env-default:""`
	Name     string `env:"CLEARNODE_DATABASE_NAME" env-default:""`
	Schema   string `env:"CLEARNODE_DATABASE_SCHEMA" env-default:""`
	Driver   string `env:"CLEARNODE_DATABASE_DRIVER" env-default:"postgres"`
	Username string `env:"CLEARNODE_DATABASE_USERNAME"  env-default:"postgres"`
	Password string `env:"CLEARNODE_DATABASE_PASSWORD" env-default:"your-super-secret-and-long-postgres-password"`
	Host     string `env:"CLEARNODE_DATABASE_HOST" env-default:"localhost"`
	Port     string `env:"CLEARNODE_DATABASE_PORT" env-default:"5432"`
	Retries  int    `env:"CLEARNODE_DATABASE_RETRIES" env-default:"5"`

	// Connection pool settings
	MaxOpenConns    int `env:"CLEARNODE_DATABASE_MAX_OPEN_CONNS" env-default:"100"`
	MaxIdleConns    int `env:"CLEARNODE_DATABASE_MAX_IDLE_CONNS" env-default:"25"`
	ConnMaxLifetime int `env:"CLEARNODE_DATABASE_CONN_MAX_LIFETIME_SEC" env-default:"300"`
	ConnMaxIdleTime int `env:"CLEARNODE_DATABASE_CONN_MAX_IDLE_TIME_SEC" env-default:"60"`
}

In order to connect to Postgresql you need to fill out all the fields.

To connect to sqlite, you just need to specify "sqlite" driver. By default it will use in-memory database. You can provide CLEARNODE_DATABASE_NAME to use the file.

type DatabaseStore

type DatabaseStore interface {
	// ExecuteInTransaction runs the provided handler within a database transaction.
	// If the handler returns an error, the transaction is rolled back.
	// If the handler completes successfully, the transaction is committed.
	ExecuteInTransaction(handler StoreTxHandler) error

	// GetUserBalances retrieves the balances for a user's wallet.
	GetUserBalances(wallet string) ([]core.BalanceEntry, error)

	// LockUserState locks a user's balance row for update (postgres only, must be used within a transaction).
	// Uses INSERT ... ON CONFLICT DO NOTHING to ensure the row exists, then SELECT ... FOR UPDATE to lock it.
	// Returns the current balance or zero if the row was just inserted.
	LockUserState(wallet, asset string) (decimal.Decimal, error)

	// GetUserTransactions retrieves transaction history for a user with optional filters.
	GetUserTransactions(wallet string, asset *string, txType *core.TransactionType, fromTime *uint64, toTime *uint64, paginate *core.PaginationParams) ([]core.Transaction, core.PaginationMetadata, error)

	// RecordTransaction creates a transaction record linking state transitions.
	RecordTransaction(tx core.Transaction) error

	// CreateChannel creates a new channel entity in the database.
	CreateChannel(channel core.Channel) error

	// GetChannelByID retrieves a channel by its unique identifier.
	GetChannelByID(channelID string) (*core.Channel, error)

	// GetActiveHomeChannel retrieves the active home channel for a user's wallet and asset.
	GetActiveHomeChannel(wallet, asset string) (*core.Channel, error)

	// CheckOpenChannel verifies if a user has an active channel for the given asset
	// and returns the approved signature validators if such a channel exists.
	CheckOpenChannel(wallet, asset string) (string, bool, error)

	// UpdateChannel persists changes to a channel's metadata (status, version, etc).
	UpdateChannel(channel core.Channel) error

	// GetUserChannels retrieves all channels for a user with optional status, asset, and type filters.
	GetUserChannels(wallet string, status *core.ChannelStatus, asset *string, channelType *core.ChannelType, limit, offset uint32) ([]core.Channel, uint32, error)

	// GetLastStateByChannelID retrieves the most recent state for a given channel.
	// If signed is true, only returns states with both user and node signatures.
	GetLastStateByChannelID(channelID string, signed bool) (*core.State, error)

	// GetStateByChannelIDAndVersion retrieves a specific state version for a channel.
	// Returns nil if the state with the specified version does not exist.
	GetStateByChannelIDAndVersion(channelID string, version uint64) (*core.State, error)

	// GetLastUserState retrieves the most recent state for a user's asset.
	GetLastUserState(wallet, asset string, signed bool) (*core.State, error)

	// StoreUserState persists a new user state to the database.
	StoreUserState(state core.State) error

	// EnsureNoOngoingStateTransitions validates that no conflicting blockchain operations are pending.
	EnsureNoOngoingStateTransitions(wallet, asset string) error

	// ScheduleInitiateEscrowWithdrawal queues a blockchain action to initiate withdrawal.
	// This queues the state to be submitted on-chain to initiate an escrow withdrawal.
	ScheduleInitiateEscrowWithdrawal(stateID string, chainID uint64) error

	// ScheduleCheckpoint schedules a checkpoint operation for a home channel state.
	// This queues the state to be submitted on-chain to update the channel's on-chain state.
	ScheduleCheckpoint(stateID string, chainID uint64) error

	// ScheduleFinalizeEscrowDeposit schedules a checkpoint for an escrow deposit operation.
	// This queues the state to be submitted on-chain to finalize an escrow deposit.
	ScheduleFinalizeEscrowDeposit(stateID string, chainID uint64) error

	// ScheduleFinalizeEscrowWithdrawal schedules a checkpoint for an escrow withdrawal operation.
	// This queues the state to be submitted on-chain to finalize an escrow withdrawal.
	ScheduleFinalizeEscrowWithdrawal(stateID string, chainID uint64) error

	// ScheduleInitiateEscrowDeposit schedules a checkpoint for an escrow deposit operation.
	// This queues the state to be submitted on-chain for an escrow deposit on home chain.
	ScheduleInitiateEscrowDeposit(stateID string, chainID uint64) error

	// Fail marks a blockchain action as failed and increments the retry counter.
	Fail(actionID int64, err string) error

	// FailNoRetry marks a blockchain action as failed without incrementing the retry counter.
	FailNoRetry(actionID int64, err string) error

	// RecordAttempt records a failed attempt for a blockchain action and increments the retry counter.
	// The action remains in pending status.
	RecordAttempt(actionID int64, err string) error

	// Complete marks a blockchain action as completed with the given transaction hash.
	Complete(actionID int64, txHash string) error

	// GetActions retrieves pending blockchain actions, optionally limited by count.
	GetActions(limit uint8, chainID uint64) ([]BlockchainAction, error)

	// GetStateByID retrieves a state by its deterministic ID.
	GetStateByID(stateID string) (*core.State, error)

	// CreateApp registers a new application. Returns an error if the app ID already exists.
	CreateApp(entry app.AppV1) error

	// GetApp retrieves a single application by ID. Returns nil if not found.
	GetApp(appID string) (*app.AppInfoV1, error)

	// GetApps retrieves applications with optional filtering by app ID, owner wallet, and pagination.
	GetApps(appID *string, ownerWallet *string, pagination *core.PaginationParams) ([]app.AppInfoV1, core.PaginationMetadata, error)

	// GetAppCount returns the total number of applications owned by a specific wallet.
	GetAppCount(ownerWallet string) (uint64, error)

	// CreateAppSession initializes a new application session.
	CreateAppSession(session app.AppSessionV1) error

	// GetAppSession retrieves a specific session by ID.
	GetAppSession(sessionID string) (*app.AppSessionV1, error)

	// GetAppSessions retrieves filtered sessions with pagination.
	GetAppSessions(appSessionID *string, participant *string, status app.AppSessionStatus, pagination *core.PaginationParams) ([]app.AppSessionV1, core.PaginationMetadata, error)

	// UpdateAppSession updates existing session data.
	UpdateAppSession(session app.AppSessionV1) error

	// GetAppSessionBalances retrieves the total balances associated with a session.
	GetAppSessionBalances(sessionID string) (map[string]decimal.Decimal, error)

	// GetParticipantAllocations retrieves specific asset allocations per participant.
	GetParticipantAllocations(sessionID string) (map[string]map[string]decimal.Decimal, error)

	// RecordLedgerEntry logs a movement of funds within the internal ledger.
	RecordLedgerEntry(userWallet, accountID, asset string, amount decimal.Decimal) error

	// StoreAppSessionKeyState stores or updates a session key state.
	StoreAppSessionKeyState(state app.AppSessionKeyStateV1) error

	GetAppSessionKeyOwner(sessionKey, appSessionId string) (string, error)

	// SessionKeyStateExists returns the latest version of a non-expired session key state for a user.
	// Returns 0 if no state exists.
	GetLastAppSessionKeyVersion(wallet, sessionKey string) (uint64, error)

	// GetLatestSessionKeyState retrieves the latest version of a specific session key for a user.
	// Returns nil if no state exists.
	GetLastAppSessionKeyState(wallet, sessionKey string) (*app.AppSessionKeyStateV1, error)

	// GetLastKeyStates retrieves the latest session key states for a user with optional filtering.
	GetLastAppSessionKeyStates(wallet string, sessionKey *string) ([]app.AppSessionKeyStateV1, error)

	// StoreChannelSessionKeyState stores or updates a channel session key state.
	StoreChannelSessionKeyState(state core.ChannelSessionKeyStateV1) error

	// GetLastChannelSessionKeyVersion returns the latest version for a (wallet, sessionKey) pair.
	// Returns 0 if no state exists.
	GetLastChannelSessionKeyVersion(wallet, sessionKey string) (uint64, error)

	// GetLastChannelSessionKeyStates retrieves the latest channel session key states for a user,
	// optionally filtered by session key.
	GetLastChannelSessionKeyStates(wallet string, sessionKey *string) ([]core.ChannelSessionKeyStateV1, error)

	// ValidateChannelSessionKeyForAsset checks that a valid, non-expired session key state
	// exists at its latest version for the (wallet, sessionKey) pair, includes the given asset,
	// and matches the metadata hash.
	ValidateChannelSessionKeyForAsset(wallet, sessionKey, asset, metadataHash string) (bool, error)

	// CountActiveUsers returns distinct user counts per asset within the given window.
	CountActiveUsers(window time.Duration) ([]ActiveCountByLabel, error)

	// CountActiveAppSessions returns app session counts per application within the given window.
	CountActiveAppSessions(window time.Duration) ([]ActiveCountByLabel, error)

	// GetLifetimeMetricLastTimestamp returns the most recent last_timestamp among all metrics with the given name.
	GetLifetimeMetricLastTimestamp(name string) (time.Time, error)

	// GetAppSessionsCountByLabels computes app session count deltas, upserts as lifespan metrics, and returns updated totals.
	GetAppSessionsCountByLabels() ([]AppSessionCount, error)

	// GetChannelsCountByLabels computes channel count deltas, upserts as lifespan metrics, and returns updated totals.
	GetChannelsCountByLabels() ([]ChannelCount, error)

	// GetTotalValueLocked computes TVL deltas by domain (channels, app_sessions) and asset, upserts as lifespan metrics, and returns updated totals.
	GetTotalValueLocked() ([]TotalValueLocked, error)

	// UpdateUserStaked upserts the staked amount for a user on a specific blockchain.
	UpdateUserStaked(wallet string, blockchainID uint64, amount decimal.Decimal) error

	// GetTotalUserStaked returns the total staked amount for a user across all blockchains.
	GetTotalUserStaked(wallet string) (decimal.Decimal, error)

	// RecordAction inserts a new action log entry for a user.
	RecordAction(wallet string, gatedAction core.GatedAction) error

	// GetUserActionCount returns the number of actions matching the given wallet, method, and path
	// within the specified time window.
	GetUserActionCount(wallet string, gatedAction core.GatedAction, window time.Duration) (uint64, error)

	// GetUserActionCounts returns a map of gated actions to their respective counts for a user within the specified time window.
	GetUserActionCounts(userWallet string, window time.Duration) (map[core.GatedAction]uint64, error)

	// StoreContractEvent stores a blockchain event to prevent duplicate processing.
	StoreContractEvent(ev core.BlockchainEvent) error

	// GetLatestEvent returns the latest block number and log index for a given contract.
	GetLatestEvent(contractAddress string, blockchainID uint64) (core.BlockchainEvent, error)
}

DatabaseStore defines the unified persistence layer.

func NewDBStore

func NewDBStore(db *gorm.DB) DatabaseStore

type LifespanMetric

type LifespanMetric struct {
	ID            string          `gorm:"column:id;primaryKey;size:66"`
	Name          string          `gorm:"column:name;not null"`
	Labels        datatypes.JSON  `gorm:"column:labels;type:text"`
	Value         decimal.Decimal `gorm:"column:value;type:varchar(78);not null"`
	LastTimestamp time.Time       `gorm:"column:last_timestamp;not null"`
	UpdatedAt     time.Time
}

func (LifespanMetric) TableName

func (LifespanMetric) TableName() string

type ListOptions

type ListOptions struct {
	Offset uint32    `json:"offset,omitempty"`
	Limit  uint32    `json:"limit,omitempty"`
	Sort   *SortType `json:"sort,omitempty"` // Optional sort type (asc/desc)
}

type SortType

type SortType string
const (
	SortTypeAscending  SortType = "asc"
	SortTypeDescending SortType = "desc"
)

func (SortType) ToString

func (s SortType) ToString() string

type State

type State struct {
	// ID is a 64-character deterministic hash
	ID         string `gorm:"column:id;primaryKey;size:64"`
	Asset      string `gorm:"column:asset;not null"`
	UserWallet string `gorm:"column:user_wallet;not null"`
	Epoch      uint64 `gorm:"column:epoch;not null"`
	Version    uint64 `gorm:"column:version;not null"`

	// Transition
	TransitionType      uint8           `gorm:"column:transition_type;not null"`
	TransitionTxID      string          `gorm:"column:transition_tx_id;size:66;not null"`
	TransitionAccountID string          `gorm:"column:transition_account_id;size:66;not null"`
	TransitionAmount    decimal.Decimal `gorm:"column:transition_amount;type:varchar(78);not null"`

	// Optional channel references
	HomeChannelID   *string `gorm:"column:home_channel_id"`
	EscrowChannelID *string `gorm:"column:escrow_channel_id"`

	// Home Channel balances and flows
	// Using decimal.Decimal for int256 values and int64 for flow values
	HomeUserBalance decimal.Decimal `gorm:"column:home_user_balance;type:varchar(78)"`
	HomeUserNetFlow decimal.Decimal `gorm:"column:home_user_net_flow;default:0"`
	HomeNodeBalance decimal.Decimal `gorm:"column:home_node_balance;type:varchar(78)"`
	HomeNodeNetFlow decimal.Decimal `gorm:"column:home_node_net_flow;default:0"`

	// Escrow Channel balances and flows
	EscrowUserBalance decimal.Decimal `gorm:"column:escrow_user_balance;type:varchar(78)"`
	EscrowUserNetFlow decimal.Decimal `gorm:"column:escrow_user_net_flow;default:0"`
	EscrowNodeBalance decimal.Decimal `gorm:"column:escrow_node_balance;type:varchar(78)"`
	EscrowNodeNetFlow decimal.Decimal `gorm:"column:escrow_node_net_flow;default:0"`

	UserSig *string `gorm:"column:user_sig;type:text"`
	NodeSig *string `gorm:"column:node_sig;type:text"`

	// Read-only fields populated from JOINs with channels table
	HomeBlockchainID   *uint64 `gorm:"->;column:home_blockchain_id"`
	HomeTokenAddress   *string `gorm:"->;column:home_token_address"`
	EscrowBlockchainID *uint64 `gorm:"->;column:escrow_blockchain_id"`
	EscrowTokenAddress *string `gorm:"->;column:escrow_token_address"`

	CreatedAt time.Time
}

State represents an immutable state in the system ID is deterministic: Hash(UserWallet, Asset, CycleIndex, Version)

func (State) TableName

func (State) TableName() string

TableName specifies the table name for the State model

type StoreTxHandler

type StoreTxHandler func(DatabaseStore) error

StoreTxHandler is a function that executes Store operations within a transaction.

type TotalValueLocked

type TotalValueLocked struct {
	Asset       string          `gorm:"column:asset"`
	Domain      string          `gorm:"column:domain"`
	Value       decimal.Decimal `gorm:"column:value"`
	LastUpdated time.Time       `gorm:"column:last_updated"`
}

TotalValueLocked holds the total value locked for a given asset, along with the last update timestamp.

type Transaction

type Transaction struct {
	// ID is a 64-character deterministic hash
	ID                 string               `gorm:"column:id;primaryKey;size:64"`
	Type               core.TransactionType `gorm:"column:tx_type;not null;index:idx_type;index:idx_from_to_account"`
	AssetSymbol        string               `gorm:"column:asset_symbol;not null"`
	FromAccount        string               `gorm:"column:from_account;not null;index:idx_from_account;index:idx_from_to_account"`
	ToAccount          string               `gorm:"column:to_account;not null;index:idx_to_account;index:idx_from_to_account"`
	SenderNewStateID   *string              `gorm:"column:sender_new_state_id;size:64"`
	ReceiverNewStateID *string              `gorm:"column:receiver_new_state_id;size:64"`
	Amount             decimal.Decimal      `gorm:"column:amount;type:decimal(38,18);not null"`
	CreatedAt          time.Time
}

Transaction represents an immutable transaction in the system ID is deterministic based on transaction initiation: 1) Initiated by User: Hash(ToAccount, SenderNewStateID) 2) Initiated by Node: Hash(FromAccount, ReceiverNewStateID)

func (Transaction) TableName

func (Transaction) TableName() string

type UserBalance

type UserBalance struct {
	UserWallet string          `gorm:"column:user_wallet;primaryKey;size:42"`
	Asset      string          `gorm:"column:asset;primaryKey;size:20"`
	Balance    decimal.Decimal `gorm:"column:balance;type:varchar(78);not null"`
	CreatedAt  time.Time       `gorm:"column:created_at"`
	UpdatedAt  time.Time       `gorm:"column:updated_at"`
}

UserBalance represents aggregated user balance for an asset

func (UserBalance) TableName

func (UserBalance) TableName() string

TableName specifies the table name for the UserBalance model

type UserStakedV1

type UserStakedV1 struct {
	UserWallet   string          `gorm:"column:user_wallet;primaryKey;not null"`
	BlockchainID uint64          `gorm:"column:blockchain_id;primaryKey;not null"`
	Amount       decimal.Decimal `gorm:"column:amount;type:varchar(78);not null"`
	CreatedAt    time.Time
	UpdatedAt    time.Time
}

func (UserStakedV1) TableName

func (UserStakedV1) TableName() string

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL