Skip to content

hatchet-dev/pgoutbox

Repository files navigation

pgoutbox - a transactional outbox for pgx

pgoutbox implements a simple transactional outbox for pgx. New messages can be added to a Postgres table using AddMessages and can be flushed to a destination via ProcessMessages.

Here's an example of flushing messages on topic1 by simply printing them to the console:

type printFlusher struct{}

func (printFlusher) Flush(_ context.Context, msgs []*sqlc.Message) error {
	for _, m := range msgs {
		fmt.Printf("  flushed id=%d topic=%s payload=%s\n", m.ID, m.Topic, string(m.Payload))
	}
	return nil
}

outbox, err := pgoutbox.NewOutbox(pool)

if err != nil {
    panic(err)
}

outbox.AddFlusher("topic1", printFlusher{})

Then, within a transaction, messages can be added via:

if err := outbox.AddMessages(ctx, tx, "topic1", msgs); err != nil {
    panic(err)
}

And messages can be flushed after the transaction commits using:

_, err := outbox.ProcessMessages(ctx, "topic1");

if err != nil {
    panic(err)
}

Schema

By default, NewOutbox runs migrations and creates an outbox table in the schema outbox.messages. This can be overwritten via:

outbox, err := pgoutbox.NewOutbox(pool, pgoutbox.WithSchema("my_schema"))

If you'd rather run migrations yourself (for example, as part of a separate release step), disable the auto-migration and invoke Migrate explicitly:

outbox, err := pgoutbox.NewOutbox(pool,
    pgoutbox.WithSchema("my_schema"),
    pgoutbox.WithAutoMigrate(false),
)

if err := pgoutbox.Migrate(ctx, pool, pgoutbox.WithSchema("my_schema")); err != nil {
    panic(err)
}

Multiple topics and flushers

It's easy to configure multiple destinations using topics registered for each flusher:

outbox.AddFlusher("orders", ordersFlusher{})
outbox.AddFlusher("shipments", shipmentsFlusher{})

// within a single transaction, write to whichever topics you need
tx, err := pool.Begin(ctx)
if err != nil {
    panic(err)
}

if err := outbox.AddMessages(ctx, tx, "orders", orderMsgs); err != nil {
    panic(err)
}
if err := outbox.AddMessages(ctx, tx, "shipments", shipmentMsgs); err != nil {
    panic(err)
}

if err := tx.Commit(ctx); err != nil {
    panic(err)
}

// each topic is drained independently by its registered flusher
outbox.ProcessMessages(ctx, "orders")
outbox.ProcessMessages(ctx, "shipments")

Benchmarks

You can run benchmarks locally; for example, to write and flush 100k messages, you can run:

go test -bench=. -benchtime=100000x

On a local Macbook with an M3 Max core, this results in 8492 msgs/sec:

$ go test -bench=. -benchtime=100000x
goos: darwin
goarch: arm64
pkg: github.com/hatchet-dev/pgoutbox
cpu: Apple M3 Max
BenchmarkOutbox_WriteAndPublishThroughput-14              100000            117757 ns/op              8492 msgs/sec

About

A transactional outbox for Go applications which use pgx.

Resources

License

Stars

Watchers

Forks

Releases

No releases published

Packages

 
 
 

Contributors

Languages