diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 3fb2746d..d2de0437 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -47,6 +47,10 @@ type PersistenceService interface { // If no task is available, (nil, nil) is returned, as this is not an error situation. ScheduleTask(ctx context.Context, w *persistence.Worker) (*persistence.Task, error) AddWorkerToTaskFailedList(context.Context, *persistence.Task, *persistence.Worker) (numFailed int, err error) + // ClearFailureListOfTask clears the list of workers that failed this task. + ClearFailureListOfTask(context.Context, *persistence.Task) error + // ClearFailureListOfJob en masse, for all tasks of this job, clears the list of workers that failed those tasks. + ClearFailureListOfJob(context.Context, *persistence.Job) error // Database queries. QueryJobs(ctx context.Context, query api.JobsQuery) ([]*persistence.Job, error) diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index 59fefdf6..45da9242 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -131,6 +131,18 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { logger.Error().Err(err).Msg("error changing job status") return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing job status") } + + // Only in this function, i.e. only when changing the job from the web + // interface, does requeueing the job mean it should clear the failure list. + // This is why this is implemented here, and not in the Task State Machine. 
+ switch statusChange.Status { + case api.JobStatusRequeueing: + if err := f.persist.ClearFailureListOfJob(ctx, dbJob); err != nil { + logger.Error().Err(err).Msg("error clearing failure list") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the job's tasks' failure list") + } + } + return e.NoContent(http.StatusNoContent) } @@ -177,6 +189,18 @@ func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error { logger.Error().Err(err).Msg("error changing task status") return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing task status") } + + // Only in this function, i.e. only when changing the task from the web + // interface, does requeueing the task mean it should clear the failure list. + // This is why this is implemented here, and not in the Task State Machine. + switch statusChange.Status { + case api.TaskStatusQueued: + if err := f.persist.ClearFailureListOfTask(ctx, dbTask); err != nil { + logger.Error().Err(err).Msg("error clearing failure list") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error clearing the task's failure list") + } + } + return e.NoContent(http.StatusNoContent) } diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 57be017e..16506984 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -181,6 +181,8 @@ func TestSetJobStatus_happy(t *testing.T) { mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil) mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button") + // Going to Cancel Requested should NOT clear the failure list. + // Do the call. 
echoCtx := mf.prepareMockedJSONRequest(statusUpdate) err := mf.flamenco.SetJobStatus(echoCtx, jobID) @@ -189,6 +191,85 @@ func TestSetJobStatus_happy(t *testing.T) { assertResponseEmpty(t, echoCtx) } +func TestSetJobStatusFailedToRequeueing(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + statusUpdate := api.JobStatusChange{ + Status: api.JobStatusRequeueing, + Reason: "someone pushed a button", + } + dbJob := persistence.Job{ + UUID: jobID, + Name: "test job", + Status: api.JobStatusFailed, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + } + + // Set up expectations. + echoCtx := mf.prepareMockedJSONRequest(statusUpdate) + ctx := echoCtx.Request().Context() + mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil) + mf.stateMachine.EXPECT().JobStatusChange(ctx, &dbJob, statusUpdate.Status, "someone pushed a button") + mf.persistence.EXPECT().ClearFailureListOfJob(ctx, &dbJob) + + // Do the call. + err := mf.flamenco.SetJobStatus(echoCtx, jobID) + assert.NoError(t, err) + + assertResponseEmpty(t, echoCtx) +} + +func TestSetTaskStatusQueued(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + taskID := "22a2e6e6-13a3-40e7-befd-d4ec8d97049d" + statusUpdate := api.TaskStatusChange{ + Status: api.TaskStatusQueued, + Reason: "someone pushed a button", + } + dbJob := persistence.Job{ + Model: persistence.Model{ID: 47}, + UUID: jobID, + Name: "test job", + Status: api.JobStatusFailed, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + } + dbTask := persistence.Task{ + UUID: taskID, + Name: "test task", + Status: api.TaskStatusFailed, + Job: &dbJob, + JobID: dbJob.ID, + } + + // Set up expectations. 
+ echoCtx := mf.prepareMockedJSONRequest(statusUpdate) + ctx := echoCtx.Request().Context() + mf.persistence.EXPECT().FetchTask(ctx, taskID).Return(&dbTask, nil) + mf.stateMachine.EXPECT().TaskStatusChange(ctx, &dbTask, statusUpdate.Status) + mf.persistence.EXPECT().ClearFailureListOfTask(ctx, &dbTask) + + updatedTask := dbTask + updatedTask.Activity = "someone pushed a button" + mf.persistence.EXPECT().SaveTaskActivity(ctx, &updatedTask) + + // Do the call. + err := mf.flamenco.SetTaskStatus(echoCtx, taskID) + assert.NoError(t, err) + + assertResponseEmpty(t, echoCtx) +} + func TestFetchTaskLogTail(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 19a2104f..91ce3064 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -55,6 +55,34 @@ func (mr *MockPersistenceServiceMockRecorder) AddWorkerToTaskFailedList(arg0, ar return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "AddWorkerToTaskFailedList", reflect.TypeOf((*MockPersistenceService)(nil).AddWorkerToTaskFailedList), arg0, arg1, arg2) } +// ClearFailureListOfJob mocks base method. +func (m *MockPersistenceService) ClearFailureListOfJob(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ClearFailureListOfJob", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ClearFailureListOfJob indicates an expected call of ClearFailureListOfJob. +func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfJob(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfJob", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfJob), arg0, arg1) +} + +// ClearFailureListOfTask mocks base method. 
+func (m *MockPersistenceService) ClearFailureListOfTask(arg0 context.Context, arg1 *persistence.Task) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ClearFailureListOfTask", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// ClearFailureListOfTask indicates an expected call of ClearFailureListOfTask. +func (mr *MockPersistenceServiceMockRecorder) ClearFailureListOfTask(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearFailureListOfTask", reflect.TypeOf((*MockPersistenceService)(nil).ClearFailureListOfTask), arg0, arg1) +} + // CreateWorker mocks base method. func (m *MockPersistenceService) CreateWorker(arg0 context.Context, arg1 *persistence.Worker) error { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 1fa9fc5f..29267f77 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -459,3 +459,26 @@ func (db *DB) AddWorkerToTaskFailedList(ctx context.Context, t *Task, w *Worker) } return int(numFailed64), tx.Error } + +// ClearFailureListOfTask clears the list of workers that failed this task. +func (db *DB) ClearFailureListOfTask(ctx context.Context, t *Task) error { + tx := db.gormDB.WithContext(ctx). + Where("task_id = ?", t.ID). + Delete(&TaskFailure{}) + return tx.Error +} + +// ClearFailureListOfJob en masse, for all tasks of this job, clears the list of +// workers that failed those tasks. +func (db *DB) ClearFailureListOfJob(ctx context.Context, j *Job) error { + + // SQLite doesn't support JOIN in DELETE queries, so use a sub-query instead. + jobTasksQuery := db.gormDB.Model(&Task{}). + Select("id"). + Where("job_id = ?", j.ID) + + tx := db.gormDB.WithContext(ctx). + Where("task_id in (?)", jobTasksQuery). 
+ Delete(&TaskFailure{}) + return tx.Error +} diff --git a/internal/manager/persistence/jobs_test.go b/internal/manager/persistence/jobs_test.go index ea70ebce..920de094 100644 --- a/internal/manager/persistence/jobs_test.go +++ b/internal/manager/persistence/jobs_test.go @@ -4,6 +4,8 @@ package persistence // SPDX-License-Identifier: GPL-3.0-or-later import ( + "fmt" + "math" "testing" "time" @@ -11,6 +13,7 @@ import ( "golang.org/x/net/context" "git.blender.org/flamenco/internal/manager/job_compilers" + "git.blender.org/flamenco/internal/uuid" "git.blender.org/flamenco/pkg/api" ) @@ -304,6 +307,78 @@ func TestAddWorkerToTaskFailedList(t *testing.T) { assert.Zero(t, num) } +func TestClearFailureListOfTask(t *testing.T) { + ctx, close, db, _, authoredJob := jobTasksTestFixtures(t) + defer close() + + task1, _ := db.FetchTask(ctx, authoredJob.Tasks[1].UUID) + task2, _ := db.FetchTask(ctx, authoredJob.Tasks[2].UUID) + + worker1 := createWorker(ctx, t, db) + + // Create another worker, using the 1st as template: + newWorker := *worker1 + newWorker.ID = 0 + newWorker.UUID = "89ed2b02-b51b-4cd4-b44a-4a1c8d01db85" + newWorker.Name = "Worker 2" + assert.NoError(t, db.SaveWorker(ctx, &newWorker)) + worker2, err := db.FetchWorker(ctx, newWorker.UUID) + assert.NoError(t, err) + + // Store some failures for different tasks. + _, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker1) + _, _ = db.AddWorkerToTaskFailedList(ctx, task1, worker2) + _, _ = db.AddWorkerToTaskFailedList(ctx, task2, worker1) + + // Clearing should just update this one task. 
+ assert.NoError(t, db.ClearFailureListOfTask(ctx, task1)) + var failures = []TaskFailure{} + tx := db.gormDB.Model(&TaskFailure{}).Scan(&failures) + assert.NoError(t, tx.Error) + if assert.Len(t, failures, 1) { + assert.Equal(t, task2.ID, failures[0].TaskID) + assert.Equal(t, worker1.ID, failures[0].WorkerID) + } +} + +func TestClearFailureListOfJob(t *testing.T) { + ctx, close, db, dbJob1, authoredJob1 := jobTasksTestFixtures(t) + defer close() + + // Construct a cloned version of the job. + authoredJob2 := duplicateJobAndTasks(authoredJob1) + persistAuthoredJob(t, ctx, db, authoredJob2) + + task1_1, _ := db.FetchTask(ctx, authoredJob1.Tasks[1].UUID) + task1_2, _ := db.FetchTask(ctx, authoredJob1.Tasks[2].UUID) + task2_1, _ := db.FetchTask(ctx, authoredJob2.Tasks[1].UUID) + + worker1 := createWorker(ctx, t, db) + worker2 := createWorkerFrom(ctx, t, db, *worker1) + + // Store some failures for different tasks and jobs + _, _ = db.AddWorkerToTaskFailedList(ctx, task1_1, worker1) + _, _ = db.AddWorkerToTaskFailedList(ctx, task1_1, worker2) + _, _ = db.AddWorkerToTaskFailedList(ctx, task1_2, worker1) + _, _ = db.AddWorkerToTaskFailedList(ctx, task2_1, worker1) + _, _ = db.AddWorkerToTaskFailedList(ctx, task2_1, worker2) + + // Sanity check: there should be 5 failures registered now. + assert.Equal(t, 5, countTaskFailures(db)) + + // Clearing should be limited to the given job. 
+ assert.NoError(t, db.ClearFailureListOfJob(ctx, dbJob1)) + var failures = []TaskFailure{} + tx := db.gormDB.Model(&TaskFailure{}).Scan(&failures) + assert.NoError(t, tx.Error) + if assert.Len(t, failures, 2) { + assert.Equal(t, task2_1.ID, failures[0].TaskID) + assert.Equal(t, worker1.ID, failures[0].WorkerID) + assert.Equal(t, task2_1.ID, failures[1].TaskID) + assert.Equal(t, worker2.ID, failures[1].WorkerID) + } +} + func createTestAuthoredJobWithTasks() job_compilers.AuthoredJob { task1 := job_compilers.AuthoredTask{ Name: "render-1-3", @@ -384,6 +459,43 @@ func persistAuthoredJob(t *testing.T, ctx context.Context, db *DB, authoredJob j return dbJob } +// duplicateJobAndTasks constructs a copy of the given job and its tasks, ensuring new UUIDs. +// Does NOT copy settings, metadata, or commands. Just for testing with more than one job in the database. +func duplicateJobAndTasks(job job_compilers.AuthoredJob) job_compilers.AuthoredJob { + // The function call already made a new AuthoredJob copy. + // This function just needs to make sure the tasks are duplicated, make UUIDs + // unique, and ensure that task pointers are pointing to the copy. + + // Duplicate task arrays. + tasks := job.Tasks + job.Tasks = []job_compilers.AuthoredTask{} + job.Tasks = append(job.Tasks, tasks...) + + // Construct a mapping from old UUID to pointer-to-new-task + taskPtrs := map[string]*job_compilers.AuthoredTask{} + for idx := range job.Tasks { + taskPtrs[job.Tasks[idx].UUID] = &job.Tasks[idx] + } + + // Go over all task dependencies, as those are stored as pointers, and update them. + for taskIdx := range job.Tasks { + newDeps := make([]*job_compilers.AuthoredTask, len(job.Tasks[taskIdx].Dependencies)) + for depIdxs, oldTaskPtr := range job.Tasks[taskIdx].Dependencies { + depUUID := oldTaskPtr.UUID + newDeps[depIdxs] = taskPtrs[depUUID] + } + job.Tasks[taskIdx].Dependencies = newDeps + } + + // Assign new UUIDs to the job & tasks. 
+ job.JobID = uuid.New() + for idx := range job.Tasks { + job.Tasks[idx].UUID = uuid.New() + } + + return job +} + func jobTasksTestFixtures(t *testing.T) (context.Context, context.CancelFunc, *DB, *Job, job_compilers.AuthoredJob) { ctx, cancel, db := persistenceTestFixtures(t, schedulerTestTimeout) @@ -420,3 +532,35 @@ func createWorker(ctx context.Context, t *testing.T, db *DB) *Worker { return fetchedWorker } + +// createWorkerFrom duplicates the given worker, ensuring a new UUID. +func createWorkerFrom(ctx context.Context, t *testing.T, db *DB, worker Worker) *Worker { + worker.ID = 0 + worker.UUID = uuid.New() + worker.Name += " (copy)" + + err := db.SaveWorker(ctx, &worker) + if !assert.NoError(t, err) { + t.FailNow() + } + + dbWorker, err := db.FetchWorker(ctx, worker.UUID) + if !assert.NoError(t, err) { + t.FailNow() + } + + return dbWorker +} + +func countTaskFailures(db *DB) int { + var numFailures int64 + tx := db.gormDB.Model(&TaskFailure{}).Count(&numFailures) + if tx.Error != nil { + panic(tx.Error) + } + + if numFailures > math.MaxInt { + panic(fmt.Sprintf("too many failures: %v", numFailures)) + } + return int(numFailures) +}