postgresql

package module
v0.9.1 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Jul 3, 2025 License: Apache-2.0 Imports: 33 Imported by: 0

README

Postgresql adapters for frameless

Welcome to the PostgreSQL adapters for frameless!

This package provides a set of adapters that allow you to use PostgreSQL database in your application through the frameless ports.

Features

  • Repository implementation for CRUD operations (Create, Read, Update, Delete)
  • Shared Locker implementation for locking across application instances
  • Message queueing system with publish/subscribe functionality
  • Support for transactional queries using the postgresql.Connection

Example Usage

Here are some brief examples of how to use this package:

repo := postgresql.Repository[domain.Ent, domain.EntID]{...}

// Create an entity in the repository
err := repo.Create(ctx, &ent)

// Find an entity by ID in the repository
ent, found, err := repo.FindByID(ctx, id)

// Update an entity in the repository
err := repo.Update(ctx, &ent)

// Delete an entity from the repository
err := repo.DeleteByID(ctx, id)

// Publish a message to a queue
err := queue.Publish(ctx, msg)

// Subscribe to a queue and receive messages
it, err := queue.Subscribe(ctx)
for it.Next() {
    msg := it.Value()
    // process message
}

Getting Started

To get started with this package, simply import it into your Go project:

import "go.llib.dev/frameless/adapter/postgresql"

Take a look at the documentation for more information on how to use each feature.

We hope you find this package useful! If you have any questions or issues, please don't hesitate to reach out.

Tasker Integration

This package also provides an implementation for the frameless/pkg/tasker package, allowing you to store and manage scheduled tasks in a PostgreSQL database.

Documentation

Index

Examples

Constants

This section is empty.

Variables

View Source
var ContextTxOptions contextkit.ValueHandler[ctxKeyTxOptions, pgx.TxOptions]

Functions

func EnsureStateRepository

func EnsureStateRepository(ctx context.Context, conn Connection) error

func MakeMigrator

func MakeMigrator(conn Connection, namespace string, steps migration.Steps[Connection]) migration.Migrator[Connection]

Types

type Connection

type Connection struct {
	flsql.ConnectionAdapter[pgxpool.Pool, pgx.Tx]
}

func Connect

func Connect(dsn string) (Connection, error)
Example
package main

import (
	"context"

	"go.llib.dev/frameless/adapter/postgresql"
)

func main() {
	c, err := postgresql.Connect(`dsn`)
	if err != nil {
		panic(err)
	}
	defer c.Close()

	_, err = c.ExecContext(context.Background(), `SELECT VERSION()`)
	if err != nil {
		panic(err)
	}
}

type Locker

type Locker struct {
	Name       string
	Connection Connection
}

Locker is a PG-based shared mutex implementation. It depends on the existence of the frameless_locker_locks table. Locker is safe to call from different application instances, ensuring that only one of them can hold the lock concurrently.

Example
package main

import (
	"context"
	"os"

	"go.llib.dev/frameless/adapter/postgresql"
)

func main() {
	cm, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}

	l := postgresql.Locker{
		Name:       "my-lock",
		Connection: cm,
	}

	ctx, err := l.Lock(context.Background())
	if err != nil {
		panic(err)
	}

	if err := l.Unlock(ctx); err != nil {
		panic(err)
	}
}

func (Locker) Lock

func (l Locker) Lock(ctx context.Context) (context.Context, error)

func (Locker) Migrate

func (l Locker) Migrate(ctx context.Context) error

func (Locker) TryLock

func (l Locker) TryLock(ctx context.Context) (_ context.Context, _ bool, rerr error)

func (Locker) Unlock

func (l Locker) Unlock(ctx context.Context) error

type LockerFactory

type LockerFactory[Key comparable] struct{ Connection Connection }
Example
package main

import (
	"context"
	"log"
	"os"

	"go.llib.dev/frameless/adapter/postgresql"
)

func main() {
	cm, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		log.Fatal(err)
	}

	lockerFactory := postgresql.LockerFactory[string]{Connection: cm}
	if err := lockerFactory.Migrate(context.Background()); err != nil {
		log.Fatal(err)
	}

	locker := lockerFactory.LockerFor("hello world")

	ctx, err := locker.Lock(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	if err := locker.Unlock(ctx); err != nil {
		log.Fatal(err)
	}
}

func (LockerFactory[Key]) LockerFor

func (lf LockerFactory[Key]) LockerFor(key Key) guard.Locker

func (LockerFactory[Key]) Migrate

func (lf LockerFactory[Key]) Migrate(ctx context.Context) error

func (LockerFactory[Key]) NonBlockingLockerFor

func (lf LockerFactory[Key]) NonBlockingLockerFor(key Key) guard.NonBlockingLocker

type MetaAccessor

type MetaAccessor struct{}

func (MetaAccessor) LookupMeta

func (ma MetaAccessor) LookupMeta(ctx context.Context, key string, ptr interface{}) (_found bool, _err error)

func (MetaAccessor) SetMeta

func (ma MetaAccessor) SetMeta(ctx context.Context, key string, value interface{}) (context.Context, error)

type Queue

type Queue[Entity, JSONDTO any] struct {
	Name       string
	Connection Connection
	Mapping    dtokit.MapperTo[Entity, JSONDTO]

	// EmptyQueueBreakTime is the time.Duration that the queue waits when the queue is empty for the given queue Name.
	EmptyQueueBreakTime time.Duration
	// Blocking flag will cause the Queue.Publish method to wait until the message is processed.
	Blocking bool

	// LIFO flag will set the queue to use a Last in First out ordering
	LIFO bool
}
Example
cm, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
if err != nil {
	panic(err)
}
defer cm.Close()

q := postgresql.Queue[Entity, EntityDTO]{
	Name:       "queue_name",
	Connection: cm,
	Mapping:    EntityJSONMapping{},
}

ctx := context.Background()
ent := Entity{Foo: "foo"}

err = q.Publish(ctx, ent)
if err != nil {
	panic(err)
}

for msg, err := range q.Subscribe(ctx) {
	if err != nil {
		break
	}
	fmt.Println(msg.Data())
	_ = msg.ACK()
}

func (Queue[Entity, JSONDTO]) Migrate

func (q Queue[Entity, JSONDTO]) Migrate(ctx context.Context) error

func (Queue[Entity, JSONDTO]) Publish

func (q Queue[Entity, JSONDTO]) Publish(ctx context.Context, vs ...Entity) error

func (Queue[Entity, JSONDTO]) Purge

func (q Queue[Entity, JSONDTO]) Purge(ctx context.Context) error

func (Queue[Entity, JSONDTO]) Subscribe

func (q Queue[Entity, JSONDTO]) Subscribe(ctx context.Context) pubsub.Subscription[Entity]

type QueueMapper

type QueueMapper[ENT, DTO any] interface {
	ToDTO(ent ENT) (DTO, error)
	ToEnt(dto DTO) (ENT, error)
}

type Repository

type Repository[ENT, ID any] struct {
	Connection Connection
	Mapping    flsql.Mapping[ENT, ID]
}

Repository is a frameless external resource supplier to store a certain entity type. The Repository supplier itself is a stateless entity.

SRP: DBA

Example
package main

import (
	"context"
	"os"

	"go.llib.dev/frameless/adapter/postgresql"
	"go.llib.dev/frameless/pkg/flsql"
)

func main() {
	type Entity struct {
		ID    int `ext:"ID"`
		Value string
	}

	mapping := flsql.Mapping[Entity, int]{
		TableName: "entities",

		QueryID: func(id int) (flsql.QueryArgs, error) {
			return flsql.QueryArgs{"id": id}, nil
		},

		ToArgs: func(e Entity) (flsql.QueryArgs, error) {
			return flsql.QueryArgs{
				`id`:    e.ID,
				`value`: e.Value,
			}, nil
		},

		ToQuery: func(ctx context.Context) ([]flsql.ColumnName, flsql.MapScan[Entity]) {
			return []flsql.ColumnName{"id", "value"},
				func(v *Entity, s flsql.Scanner) error {
					return s.Scan(&v.ID, &v.Value)
				}
		},

		ID: func(e *Entity) *int {
			return &e.ID
		},
	}

	cm, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err)
	}
	defer cm.Close()

	repo := postgresql.Repository[Entity, int]{
		Connection: cm,
		Mapping:    mapping,
	}

	_ = repo
}

func NewMigrationStateRepository

func NewMigrationStateRepository(conn Connection) Repository[migration.State, migration.StateID]

func (Repository[ENT, ID]) BeginTx

func (r Repository[ENT, ID]) BeginTx(ctx context.Context) (context.Context, error)

func (Repository[ENT, ID]) CommitTx

func (r Repository[ENT, ID]) CommitTx(ctx context.Context) error

func (Repository[ENT, ID]) Create

func (r Repository[ENT, ID]) Create(ctx context.Context, ptr *ENT) (rErr error)

func (Repository[ENT, ID]) DeleteAll

func (r Repository[ENT, ID]) DeleteAll(ctx context.Context) (rErr error)

func (Repository[ENT, ID]) DeleteByID

func (r Repository[ENT, ID]) DeleteByID(ctx context.Context, id ID) (rErr error)

func (Repository[ENT, ID]) FindAll

func (r Repository[ENT, ID]) FindAll(ctx context.Context) iterkit.ErrSeq[ENT]

func (Repository[ENT, ID]) FindByID

func (r Repository[ENT, ID]) FindByID(ctx context.Context, id ID) (ENT, bool, error)

func (Repository[ENT, ID]) FindByIDs

func (r Repository[ENT, ID]) FindByIDs(ctx context.Context, ids ...ID) iterkit.SeqE[ENT]

func (Repository[ENT, ID]) RollbackTx

func (r Repository[ENT, ID]) RollbackTx(ctx context.Context) error

func (Repository[ENT, ID]) Save

func (r Repository[ENT, ID]) Save(ctx context.Context, ptr *ENT) (rErr error)

func (Repository[ENT, ID]) Update

func (r Repository[ENT, ID]) Update(ctx context.Context, ptr *ENT) (rErr error)

func (Repository[ENT, ID]) Upsert deprecated

func (r Repository[ENT, ID]) Upsert(ctx context.Context, ptrs ...*ENT) (rErr error)

Upsert

Deprecated: use Repository.Save instead

type T

type T = interface{}

type TaskerSchedulerLocks

type TaskerSchedulerLocks struct{ Connection Connection }

func (TaskerSchedulerLocks) LockerFor

func (TaskerSchedulerLocks) NonBlockingLockerFor

func (lf TaskerSchedulerLocks) NonBlockingLockerFor(id tasker.ScheduleID) guard.NonBlockingLocker

type TaskerSchedulerStateRepository

type TaskerSchedulerStateRepository struct{ Connection Connection }
Example
package main

import (
	"context"
	"os"
	"time"

	"go.llib.dev/frameless/adapter/postgresql"
	"go.llib.dev/frameless/pkg/tasker"
)

func main() {
	c, err := postgresql.Connect(os.Getenv("DATABASE_URL"))
	if err != nil {
		panic(err.Error())
	}

	s := tasker.Scheduler{
		Locks:  postgresql.TaskerSchedulerLocks{Connection: c},
		States: postgresql.TaskerSchedulerStateRepository{Connection: c},
	}

	maintenance := s.WithSchedule("maintenance", tasker.Monthly{Day: 1, Hour: 12, Location: time.UTC},
		func(ctx context.Context) error {
			// The monthly maintenance task
			return nil
		})

	// form your main func
	_ = tasker.Main(context.Background(), maintenance)
}

func (TaskerSchedulerStateRepository) Create

func (TaskerSchedulerStateRepository) DeleteByID

func (TaskerSchedulerStateRepository) FindByID

func (TaskerSchedulerStateRepository) Migrate

func (TaskerSchedulerStateRepository) Update

Directories

Path Synopsis

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL