Go Best Practices

Transactional outbox

Guaranteeing async work happens if and only if the transaction commits.

Events inside the transaction

When a write operation succeeds, the outbox events must be emitted in the same database transaction. Not after the commit. Not in a separate call. In the same transaction.

If the transaction rolls back, the events never fire. If it commits, the workers pick them up. Delivery is at-least-once, so workers should be idempotent, and no extra coordination between the database and the queue is needed.
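The commit-or-nothing behavior can be modeled without a database. The following is a minimal sketch using only the standard library; the store and tx types and the createContent function are hypothetical stand-ins for Postgres and the real write path, not part of any library.

```go
package main

import (
	"errors"
	"fmt"
)

// store is a toy stand-in for the database: one table of content rows
// and one table of queued jobs. Hypothetical, for illustration only.
type store struct {
	rows []string
	jobs []string
}

// tx buffers writes until Commit; nothing reaches the store before that.
type tx struct {
	s    *store
	rows []string
	jobs []string
	done bool
}

func (s *store) Begin() *tx { return &tx{s: s} }

// InsertRow stages a data write.
func (t *tx) InsertRow(id string) { t.rows = append(t.rows, id) }

// Emit stages a job in the same transaction as the data write.
func (t *tx) Emit(job string) { t.jobs = append(t.jobs, job) }

// Commit publishes the staged rows and jobs together.
func (t *tx) Commit() error {
	if t.done {
		return errors.New("tx closed")
	}
	t.done = true
	t.s.rows = append(t.s.rows, t.rows...)
	t.s.jobs = append(t.s.jobs, t.jobs...)
	return nil
}

// Rollback discards everything staged; after Commit it does nothing.
func (t *tx) Rollback() {
	if t.done {
		return
	}
	t.done = true
	t.rows, t.jobs = nil, nil
}

// createContent mimics the write path. When fail is true it returns
// before Commit, so the deferred Rollback discards the row and the job.
func createContent(s *store, id string, fail bool) error {
	t := s.Begin()
	defer t.Rollback()

	t.InsertRow(id)
	t.Emit("index:" + id)
	if fail {
		return errors.New("simulated failure before commit")
	}
	return t.Commit()
}

func main() {
	s := &store{}
	_ = createContent(s, "a", false) // commits: row and job both appear
	_ = createContent(s, "b", true)  // rolls back: neither appears
	fmt.Println(s.rows, s.jobs)      // prints "[a] [index:a]"
}
```

The failed write leaves no row and no job behind, which is the property the real implementation gets from inserting jobs inside the Postgres transaction.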

The outbox interface is generic. Every domain uses it the same way:

type Outbox[T any] interface {
    Emit(ctx context.Context, tx pgx.Tx, events ...T) error
}

The domain layer calls Emit inside the transaction, passing the active tx. The implementation inserts RiverQueue jobs using InsertTx, which binds them to the same Postgres transaction:

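// riverOutbox implements Outbox[ContentEvent] on top of a River client.
type riverOutbox struct {
    client *river.Client[pgx.Tx]
}

func NewRiverOutbox(client *river.Client[pgx.Tx]) Outbox[ContentEvent] {
    return &riverOutbox{client: client}
}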

func (o *riverOutbox) Emit(ctx context.Context, tx pgx.Tx, events ...ContentEvent) error {
    for _, event := range events {
        args := mapEvent(event)
        if _, err := o.client.InsertTx(ctx, tx, args, nil); err != nil {
            return err
        }
    }
    return nil
}

The mapEvent function maps domain events to the right worker job args:

func mapEvent(event ContentEvent) river.JobArgs {
    switch event.Type {
    case EventCreated:
        return EventIndexArgs{ContentID: event.ContentID, Action: "index"}
    case EventUpdated:
        return EventIndexArgs{ContentID: event.ContentID, Action: "reindex"}
    case EventDeleted:
        return EventIndexArgs{ContentID: event.ContentID, Action: "remove"}
    default:
        return EventAuditArgs{ContentID: event.ContentID, Action: event.Type}
    }
}

One worker per concern

Each async concern gets its own worker file: event_audit.go, event_index.go, event_analytics.go, event_graph.go. Workers are independent. A failing index projection doesn’t block audit logging.

A worker is a struct that implements RiverQueue’s Worker interface. Each one declares its own JobArgs type with a Kind identifier:

// event_index.go
type EventIndexArgs struct {
    ContentID string `json:"content_id"`
    Action    string `json:"action"`
}

func (EventIndexArgs) Kind() string { return "content.event_index" }

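type EventIndexWorker struct {
    // WorkerDefaults supplies the remaining Worker interface methods.
    river.WorkerDefaults[EventIndexArgs]
    search SearchClient
}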

func (w *EventIndexWorker) Work(ctx context.Context, job *river.Job[EventIndexArgs]) error {
    switch job.Args.Action {
    case "index", "reindex":
        return w.search.Index(ctx, job.Args.ContentID)
    case "remove":
        return w.search.Remove(ctx, job.Args.ContentID)
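    default:
        // Fail loudly instead of silently dropping an unrecognized action.
        return fmt.Errorf("event_index: unknown action %q", job.Args.Action)
    }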
}

Register workers with RiverQueue at startup. Each worker handles one concern, one file, one responsibility.
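Why independent workers matter can be shown with a toy dispatcher. This is a sketch using only the standard library, not River's API: the registry type, the handler functions, and the content.event_audit kind are all hypothetical, chosen to mirror the naming above.

```go
package main

import (
	"errors"
	"fmt"
)

// job is a toy queued job: Kind selects the worker, Payload carries a content ID.
type job struct {
	Kind    string
	Payload string
}

// handler processes one job; a non-nil error marks the job as failed.
type handler func(j job) error

// registry maps a job kind to the single handler responsible for it.
type registry map[string]handler

// runAll hands each job to the handler registered for its kind and
// returns the jobs that failed. One failure never stops the others.
func (r registry) runAll(jobs []job) (failed []job) {
	for _, j := range jobs {
		h, ok := r[j.Kind]
		if !ok {
			failed = append(failed, j) // no worker registered for this kind
			continue
		}
		if err := h(j); err != nil {
			failed = append(failed, j)
		}
	}
	return failed
}

// failingIndex simulates an index worker whose backend is unavailable.
func failingIndex(j job) error { return errors.New("search backend down") }

// recordingAudit returns an audit handler that appends payloads to dst.
func recordingAudit(dst *[]string) handler {
	return func(j job) error {
		*dst = append(*dst, j.Payload)
		return nil
	}
}

func main() {
	var audited []string
	r := registry{
		"content.event_index": failingIndex,
		"content.event_audit": recordingAudit(&audited),
	}

	failed := r.runAll([]job{
		{Kind: "content.event_index", Payload: "c1"},
		{Kind: "content.event_audit", Payload: "c1"},
	})

	fmt.Println(len(failed), audited) // prints "1 [c1]"
}
```

The index job fails and stays eligible for retry, while the audit job for the same content completes. A real queue adds persistence and retry scheduling on top of this shape.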

Consistent write path

Every write operation follows the same sequence: begin tx, execute query, emit outbox events, commit, update cache. No exceptions. When agents know the pattern, every domain operation looks the same.

func (s *Service) CreateContent(ctx context.Context, input CreateInput) (*Content, error) {
    tx, err := s.pool.Begin(ctx)
    if err != nil {
        return nil, err
    }
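    // Safe to defer: after a successful Commit, Rollback returns
    // pgx.ErrTxClosed and changes nothing.
    defer tx.Rollback(ctx)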

    content, err := s.queries.WithTx(tx).InsertContent(ctx, toInsertParams(input))
    if err != nil {
        return nil, err
    }

    err = s.outbox.Emit(ctx, tx, ContentEvent{
        Type:      EventCreated,
        ContentID: content.ID,
    })
    if err != nil {
        return nil, err
    }

    if err = tx.Commit(ctx); err != nil {
        return nil, err
    }

    // Runs only after a successful commit, so the cache never holds
    // uncommitted data.
    s.cache.Set(ctx, content.ID, content)
    return content, nil
}

Begin, query, emit, commit, cache. Same shape in every domain.
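If repeating the begin, commit, and rollback steps by hand becomes error-prone, one option is to factor them into a helper. The sketch below is an assumption, not part of the pattern described above: the Tx interface is a hypothetical two-method subset of what a pgx transaction offers, and inTx, fakeTx, beginFake, and demo exist only so the example runs without a database.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// Tx is the minimal transaction surface the helper needs (hypothetical).
type Tx interface {
	Commit(ctx context.Context) error
	Rollback(ctx context.Context) error
}

// inTx begins a transaction, runs fn inside it, and commits only if fn
// succeeds. Any earlier return leaves the deferred Rollback to clean up.
func inTx(ctx context.Context, begin func(context.Context) (Tx, error), fn func(Tx) error) error {
	tx, err := begin(ctx)
	if err != nil {
		return fmt.Errorf("begin: %w", err)
	}
	defer tx.Rollback(ctx)

	if err := fn(tx); err != nil {
		return err // query or emit failed: nothing is committed
	}
	if err := tx.Commit(ctx); err != nil {
		return fmt.Errorf("commit: %w", err)
	}
	return nil
}

// fakeTx records what happened so the example runs without a database.
type fakeTx struct{ committed, rolledBack bool }

func (t *fakeTx) Commit(context.Context) error { t.committed = true; return nil }

// Rollback only counts as a rollback if the transaction was not committed.
func (t *fakeTx) Rollback(context.Context) error {
	if !t.committed {
		t.rolledBack = true
	}
	return nil
}

// beginFake returns a begin function that always hands out t.
func beginFake(t *fakeTx) func(context.Context) (Tx, error) {
	return func(context.Context) (Tx, error) { return t, nil }
}

// demo runs one write through inTx; fail simulates an emit error.
func demo(fail bool) (*fakeTx, error) {
	t := &fakeTx{}
	err := inTx(context.Background(), beginFake(t), func(Tx) error {
		if fail {
			return errors.New("emit failed")
		}
		return nil
	})
	return t, err
}

func main() {
	ok, err := demo(false)
	fmt.Println(err, ok.committed, ok.rolledBack) // prints "<nil> true false"

	bad, err := demo(true)
	fmt.Println(err, bad.committed, bad.rolledBack) // prints "emit failed false true"
}
```

The cache update stays outside the helper on purpose: the caller performs it only after inTx returns nil, which keeps the same ordering as the handwritten version.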