Documentation
¶
Index ¶
- Variables
- func EnsureStateRepository(ctx context.Context, conn Connection) error
- func MakeMigrator(conn Connection, namespace string, steps migration.Steps[Connection]) migration.Migrator[Connection]
- type Connection
- type Locker
- type LockerFactory
- type MetaAccessor
- type Queue
- func (q Queue[Entity, JSONDTO]) Migrate(ctx context.Context) error
- func (q Queue[Entity, JSONDTO]) Publish(ctx context.Context, vs ...Entity) error
- func (q Queue[Entity, JSONDTO]) Purge(ctx context.Context) error
- func (q Queue[Entity, JSONDTO]) Subscribe(ctx context.Context) pubsub.Subscription[Entity]
- type QueueMapper
- type Repository
- func (r Repository[ENT, ID]) BeginTx(ctx context.Context) (context.Context, error)
- func (r Repository[ENT, ID]) CommitTx(ctx context.Context) error
- func (r Repository[ENT, ID]) Create(ctx context.Context, ptr *ENT) (rErr error)
- func (r Repository[ENT, ID]) DeleteAll(ctx context.Context) (rErr error)
- func (r Repository[ENT, ID]) DeleteByID(ctx context.Context, id ID) (rErr error)
- func (r Repository[ENT, ID]) FindAll(ctx context.Context) iterkit.ErrSeq[ENT]
- func (r Repository[ENT, ID]) FindByID(ctx context.Context, id ID) (ENT, bool, error)
- func (r Repository[ENT, ID]) FindByIDs(ctx context.Context, ids ...ID) iterkit.SeqE[ENT]
- func (r Repository[ENT, ID]) RollbackTx(ctx context.Context) error
- func (r Repository[ENT, ID]) Save(ctx context.Context, ptr *ENT) (rErr error)
- func (r Repository[ENT, ID]) Update(ctx context.Context, ptr *ENT) (rErr error)
- func (r Repository[ENT, ID]) Upsert(ctx context.Context, ptrs ...*ENT) (rErr error)deprecated
- type T
- type TaskerSchedulerLocks
- type TaskerSchedulerStateRepository
- func (r TaskerSchedulerStateRepository) Create(ctx context.Context, ptr *tasker.ScheduleState) error
- func (r TaskerSchedulerStateRepository) DeleteByID(ctx context.Context, id tasker.ScheduleID) error
- func (r TaskerSchedulerStateRepository) FindByID(ctx context.Context, id tasker.ScheduleID) (ent tasker.ScheduleState, found bool, err error)
- func (r TaskerSchedulerStateRepository) Migrate(ctx context.Context) error
- func (r TaskerSchedulerStateRepository) Update(ctx context.Context, ptr *tasker.ScheduleState) error
Examples ¶
Constants ¶
This section is empty.
Variables ¶
View Source
var ContextTxOptions contextkit.ValueHandler[ctxKeyTxOptions, pgx.TxOptions]
Functions ¶
func EnsureStateRepository ¶
func EnsureStateRepository(ctx context.Context, conn Connection) error
func MakeMigrator ¶
func MakeMigrator(conn Connection, namespace string, steps migration.Steps[Connection]) migration.Migrator[Connection]
Types ¶
type Connection ¶
type Connection struct { flsql.ConnectionAdapter[pgxpool.Pool, pgx.Tx] }
func Connect ¶
func Connect(dsn string) (Connection, error)
Example ¶
package main import ( "context" "go.llib.dev/frameless/adapter/postgresql" ) func main() { c, err := postgresql.Connect(`dsn`) if err != nil { panic(err) } defer c.Close() _, err = c.ExecContext(context.Background(), `SELECT VERSION()`) if err != nil { panic(err) } }
type Locker ¶
type Locker struct { Name string Connection Connection }
Locker is a PG-based shared mutex implementation. It depends on the existence of the frameless_locker_locks table. Locker is safe to call from different application instances, ensuring that only one of them can hold the lock concurrently.
Example ¶
package main import ( "context" "os" "go.llib.dev/frameless/adapter/postgresql" ) func main() { cm, err := postgresql.Connect(os.Getenv("DATABASE_URL")) if err != nil { panic(err) } l := postgresql.Locker{ Name: "my-lock", Connection: cm, } ctx, err := l.Lock(context.Background()) if err != nil { panic(err) } if err := l.Unlock(ctx); err != nil { panic(err) } }
type LockerFactory ¶
type LockerFactory[Key comparable] struct{ Connection Connection }
Example ¶
package main import ( "context" "log" "os" "go.llib.dev/frameless/adapter/postgresql" ) func main() { cm, err := postgresql.Connect(os.Getenv("DATABASE_URL")) if err != nil { log.Fatal(err) } lockerFactory := postgresql.LockerFactory[string]{Connection: cm} if err := lockerFactory.Migrate(context.Background()); err != nil { log.Fatal(err) } locker := lockerFactory.LockerFor("hello world") ctx, err := locker.Lock(context.Background()) if err != nil { log.Fatal(err) } if err := locker.Unlock(ctx); err != nil { log.Fatal(err) } }
func (LockerFactory[Key]) LockerFor ¶
func (lf LockerFactory[Key]) LockerFor(key Key) guard.Locker
func (LockerFactory[Key]) NonBlockingLockerFor ¶
func (lf LockerFactory[Key]) NonBlockingLockerFor(key Key) guard.NonBlockingLocker
type Queue ¶
type Queue[Entity, JSONDTO any] struct { Name string Connection Connection Mapping dtokit.MapperTo[Entity, JSONDTO] // EmptyQueueBreakTime is the time.Duration that the queue waits when the queue is empty for the given queue Name. EmptyQueueBreakTime time.Duration // Blocking flag will cause the Queue.Publish method to wait until the message is processed. Blocking bool // LIFO flag will set the queue to use a Last in First out ordering LIFO bool }
Example ¶
cm, err := postgresql.Connect(os.Getenv("DATABASE_URL")) if err != nil { panic(err) } defer cm.Close() q := postgresql.Queue[Entity, EntityDTO]{ Name: "queue_name", Connection: cm, Mapping: EntityJSONMapping{}, } ctx := context.Background() ent := Entity{Foo: "foo"} err = q.Publish(ctx, ent) if err != nil { panic(err) } for msg, err := range q.Subscribe(ctx) { if err != nil { break } fmt.Println(msg.Data()) _ = msg.ACK() }
type QueueMapper ¶
type Repository ¶
type Repository[ENT, ID any] struct { Connection Connection Mapping flsql.Mapping[ENT, ID] }
Repository is a frameless external resource supplier to store a certain entity type. The Repository supplier itself is a stateless entity.
SRP: DBA
Example ¶
package main import ( "context" "os" "go.llib.dev/frameless/adapter/postgresql" "go.llib.dev/frameless/pkg/flsql" ) func main() { type Entity struct { ID int `ext:"ID"` Value string } mapping := flsql.Mapping[Entity, int]{ TableName: "entities", QueryID: func(id int) (flsql.QueryArgs, error) { return flsql.QueryArgs{"id": id}, nil }, ToArgs: func(e Entity) (flsql.QueryArgs, error) { return flsql.QueryArgs{ `id`: e.ID, `value`: e.Value, }, nil }, ToQuery: func(ctx context.Context) ([]flsql.ColumnName, flsql.MapScan[Entity]) { return []flsql.ColumnName{"id", "value"}, func(v *Entity, s flsql.Scanner) error { return s.Scan(&v.ID, &v.Value) } }, ID: func(e *Entity) *int { return &e.ID }, } cm, err := postgresql.Connect(os.Getenv("DATABASE_URL")) if err != nil { panic(err) } defer cm.Close() repo := postgresql.Repository[Entity, int]{ Connection: cm, Mapping: mapping, } _ = repo }
func NewMigrationStateRepository ¶
func NewMigrationStateRepository(conn Connection) Repository[migration.State, migration.StateID]
func (Repository[ENT, ID]) CommitTx ¶
func (r Repository[ENT, ID]) CommitTx(ctx context.Context) error
func (Repository[ENT, ID]) Create ¶
func (r Repository[ENT, ID]) Create(ctx context.Context, ptr *ENT) (rErr error)
func (Repository[ENT, ID]) DeleteAll ¶
func (r Repository[ENT, ID]) DeleteAll(ctx context.Context) (rErr error)
func (Repository[ENT, ID]) DeleteByID ¶
func (r Repository[ENT, ID]) DeleteByID(ctx context.Context, id ID) (rErr error)
func (Repository[ENT, ID]) FindAll ¶
func (r Repository[ENT, ID]) FindAll(ctx context.Context) iterkit.ErrSeq[ENT]
func (Repository[ENT, ID]) FindByID ¶
func (r Repository[ENT, ID]) FindByID(ctx context.Context, id ID) (ENT, bool, error)
func (Repository[ENT, ID]) FindByIDs ¶
func (r Repository[ENT, ID]) FindByIDs(ctx context.Context, ids ...ID) iterkit.SeqE[ENT]
func (Repository[ENT, ID]) RollbackTx ¶
func (r Repository[ENT, ID]) RollbackTx(ctx context.Context) error
func (Repository[ENT, ID]) Save ¶
func (r Repository[ENT, ID]) Save(ctx context.Context, ptr *ENT) (rErr error)
func (Repository[ENT, ID]) Update ¶
func (r Repository[ENT, ID]) Update(ctx context.Context, ptr *ENT) (rErr error)
func (Repository[ENT, ID]) Upsert
deprecated
func (r Repository[ENT, ID]) Upsert(ctx context.Context, ptrs ...*ENT) (rErr error)
Upsert
Deprecated: use Repository.Save instead
type TaskerSchedulerLocks ¶
type TaskerSchedulerLocks struct{ Connection Connection }
func (TaskerSchedulerLocks) LockerFor ¶
func (lf TaskerSchedulerLocks) LockerFor(id tasker.ScheduleID) guard.Locker
func (TaskerSchedulerLocks) NonBlockingLockerFor ¶
func (lf TaskerSchedulerLocks) NonBlockingLockerFor(id tasker.ScheduleID) guard.NonBlockingLocker
type TaskerSchedulerStateRepository ¶
type TaskerSchedulerStateRepository struct{ Connection Connection }
Example ¶
package main import ( "context" "os" "time" "go.llib.dev/frameless/adapter/postgresql" "go.llib.dev/frameless/pkg/tasker" ) func main() { c, err := postgresql.Connect(os.Getenv("DATABASE_URL")) if err != nil { panic(err.Error()) } s := tasker.Scheduler{ Locks: postgresql.TaskerSchedulerLocks{Connection: c}, States: postgresql.TaskerSchedulerStateRepository{Connection: c}, } maintenance := s.WithSchedule("maintenance", tasker.Monthly{Day: 1, Hour: 12, Location: time.UTC}, func(ctx context.Context) error { // The monthly maintenance task return nil }) // form your main func _ = tasker.Main(context.Background(), maintenance) }
func (TaskerSchedulerStateRepository) Create ¶
func (r TaskerSchedulerStateRepository) Create(ctx context.Context, ptr *tasker.ScheduleState) error
func (TaskerSchedulerStateRepository) DeleteByID ¶
func (r TaskerSchedulerStateRepository) DeleteByID(ctx context.Context, id tasker.ScheduleID) error
func (TaskerSchedulerStateRepository) FindByID ¶
func (r TaskerSchedulerStateRepository) FindByID(ctx context.Context, id tasker.ScheduleID) (ent tasker.ScheduleState, found bool, err error)
func (TaskerSchedulerStateRepository) Migrate ¶
func (r TaskerSchedulerStateRepository) Migrate(ctx context.Context) error
func (TaskerSchedulerStateRepository) Update ¶
func (r TaskerSchedulerStateRepository) Update(ctx context.Context, ptr *tasker.ScheduleState) error
Source Files
¶
Click to show internal directories.
Click to hide internal directories.