Transactional outbox
Guaranteeing async work happens if and only if the transaction commits .
Events inside the transaction
When a write operation succeeds , the outbox events must be emitted in the same database transaction . Not after the commit . Not in a separate call . In the same transaction .
If the transaction rolls back , the events never fire . If it commits , the workers pick them up . At-least-once delivery , zero coordination overhead .
The outbox interface is generic . Every domain uses it the same way :
type Outbox[T any] interface {
Emit(ctx context.Context, tx pgx.Tx, events ...T) error
}
The domain layer calls Emit inside the transaction , passing the active tx . The implementation inserts RiverQueue jobs using InsertTx , which binds them to the same postgres transaction :
func NewRiverOutbox(client *river.Client[pgx.Tx]) Outbox[ContentEvent] {
return &riverOutbox{client: client}
}
func (o *riverOutbox) Emit(ctx context.Context, tx pgx.Tx, events ...ContentEvent) error {
for _, event := range events {
args := mapEvent(event)
if _, err := o.client.InsertTx(ctx, tx, args, nil); err != nil {
return err
}
}
return nil
}
The mapEvent function maps domain events to the right worker job args :
func mapEvent(event ContentEvent) river.JobArgs {
switch event.Type {
case EventCreated:
return EventIndexArgs{ContentID: event.ContentID, Action: "index"}
case EventUpdated:
return EventIndexArgs{ContentID: event.ContentID, Action: "reindex"}
case EventDeleted:
return EventIndexArgs{ContentID: event.ContentID, Action: "remove"}
default:
return EventAuditArgs{ContentID: event.ContentID, Action: event.Type}
}
}
One worker per concern
Each async concern gets its own worker file : event_audit.go , event_index.go , event_analytics.go , event_graph.go . Workers are independent . A failing index projection doesn’t block audit logging .
A worker is a struct that implements RiverQueue’s Worker interface . Each one declares its own JobArgs type with a Kind identifier :
// event_index.go
type EventIndexArgs struct {
ContentID string `json:"content_id"`
Action string `json:"action"`
}
func (EventIndexArgs) Kind() string { return "content.event_index" }
type EventIndexWorker struct {
search SearchClient
}
func (w *EventIndexWorker) Work(ctx context.Context, job *river.Job[EventIndexArgs]) error {
switch job.Args.Action {
case "index", "reindex":
return w.search.Index(ctx, job.Args.ContentID)
case "remove":
return w.search.Remove(ctx, job.Args.ContentID)
}
return nil
}
Register workers with RiverQueue at startup . Each worker handles one concern , one file , one responsibility .
Consistent write path
Every write operation follows the same sequence : begin tx , execute query , emit outbox events , commit , update cache . No exceptions . When agents know the pattern , every domain operation looks the same .
func (s *Service) CreateContent(ctx context.Context, input CreateInput) (*Content, error) {
tx, err := s.pool.Begin(ctx)
if err != nil {
return nil, err
}
defer tx.Rollback(ctx)
content, err := s.queries.WithTx(tx).InsertContent(ctx, toInsertParams(input))
if err != nil {
return nil, err
}
err = s.outbox.Emit(ctx, tx, ContentEvent{
Type: EventCreated,
ContentID: content.ID,
})
if err != nil {
return nil, err
}
if err = tx.Commit(ctx); err != nil {
return nil, err
}
s.cache.Set(ctx, content.ID, content)
return content, nil
}
Begin , query , emit , commit , cache . Same shape in every domain .