pgoutbox implements a simple transactional outbox for pgx. New messages can be added to a Postgres table using AddMessages and can be flushed to a destination via ProcessMessages.
Here's an example of flushing messages on topic1 by simply printing them to the console:
type printFlusher struct{}
func (printFlusher) Flush(_ context.Context, msgs []*sqlc.Message) error {
for _, m := range msgs {
fmt.Printf(" flushed id=%d topic=%s payload=%s\n", m.ID, m.Topic, string(m.Payload))
}
return nil
}
outbox, err := pgoutbox.NewOutbox(pool)
if err != nil {
panic(err)
}
outbox.AddFlusher("topic1", printFlusher{})Then, within a transaction, messages can be added via:
if err := outbox.AddMessages(ctx, tx, "topic1", msgs); err != nil {
panic(err)
}And messages can be flushed after the transaction commits using:
_, err := outbox.ProcessMessages(ctx, "topic1");
if err != nil {
panic(err)
}By default, NewOutbox runs migrations and creates an outbox table in the schema outbox.messages. This can be overwritten via:
outbox, err := pgoutbox.NewOutbox(pool, pgoutbox.WithSchema("my_schema"))If you'd rather run migrations yourself (for example, as part of a separate release step), disable the auto-migration and invoke Migrate explicitly:
outbox, err := pgoutbox.NewOutbox(pool,
pgoutbox.WithSchema("my_schema"),
pgoutbox.WithAutoMigrate(false),
)
if err := pgoutbox.Migrate(ctx, pool, pgoutbox.WithSchema("my_schema")); err != nil {
panic(err)
}It's easy to configure multiple destinations using topics registered for each flusher:
outbox.AddFlusher("orders", ordersFlusher{})
outbox.AddFlusher("shipments", shipmentsFlusher{})
// within a single transaction, write to whichever topics you need
tx, err := pool.Begin(ctx)
if err != nil {
panic(err)
}
if err := outbox.AddMessages(ctx, tx, "orders", orderMsgs); err != nil {
panic(err)
}
if err := outbox.AddMessages(ctx, tx, "shipments", shipmentMsgs); err != nil {
panic(err)
}
if err := tx.Commit(ctx); err != nil {
panic(err)
}
// each topic is drained independently by its registered flusher
outbox.ProcessMessages(ctx, "orders")
outbox.ProcessMessages(ctx, "shipments")You can run benchmarks locally; for example, to write and flush 100k messages, you can run:
go test -bench=. -benchtime=100000x
On a local Macbook with an M3 Max core, this results in 8492 msgs/sec:
$ go test -bench=. -benchtime=100000x
goos: darwin
goarch: arm64
pkg: github.com/hatchet-dev/pgoutbox
cpu: Apple M3 Max
BenchmarkOutbox_WriteAndPublishThroughput-14 100000 117757 ns/op 8492 msgs/sec