diff --git a/internal/worker/upstream_buffer.go b/internal/worker/upstream_buffer.go index 7f29c4b6..37f4ba5b 100644 --- a/internal/worker/upstream_buffer.go +++ b/internal/worker/upstream_buffer.go @@ -227,15 +227,15 @@ func (ub *UpstreamBufferDB) QueueSize() (int, error) { } func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { - ub.dbMutex.Lock() - defer ub.dbMutex.Unlock() - if ub.db == nil { log.Panic().Msg("no database opened, unable to queue task updates") } // See if we need to flush at all. + ub.dbMutex.Lock() queueSize, err := ub.queueSize() + ub.dbMutex.Unlock() + switch { case err != nil: return fmt.Errorf("unable to determine queue size: %w", err) @@ -247,7 +247,9 @@ func (ub *UpstreamBufferDB) Flush(ctx context.Context) error { // Keep flushing until the queue is empty or there is an error. var done bool for !done { + ub.dbMutex.Lock() done, err = ub.flushFirstItem(ctx) + ub.dbMutex.Unlock() if err != nil { return err