From 85d53de1f99f0ccb904dc7c140a75bf4b96b326b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sybren=20A=2E=20St=C3=BCvel?= Date: Fri, 30 Sep 2022 16:29:11 +0200 Subject: [PATCH] Manager: implement API endpoint for changing job priority The priority of an existing can now be changed. It will be taken into account when assigning tasks to workers, but it will not reassign tasks that are already active. --- CHANGELOG.md | 1 + internal/manager/api_impl/interfaces.go | 2 + internal/manager/api_impl/jobs.go | 47 ++++++++++++++ internal/manager/api_impl/jobs_test.go | 61 +++++++++++++++++++ .../api_impl/mocks/api_impl_mock.gen.go | 26 ++++++++ internal/manager/persistence/jobs.go | 11 ++++ 6 files changed, 148 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b23fed64..7a3b16ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ bugs in actually-released versions. ## 3.1 - in development - Web interface: make the worker IP address clickable; it will be copied to the clipboard when clicked. +- Add API operation to change the priority of an existing job. ## 3.0 - released 2022-09-12 diff --git a/internal/manager/api_impl/interfaces.go b/internal/manager/api_impl/interfaces.go index 103b7c93..0b9661a3 100644 --- a/internal/manager/api_impl/interfaces.go +++ b/internal/manager/api_impl/interfaces.go @@ -31,6 +31,7 @@ type PersistenceService interface { StoreAuthoredJob(ctx context.Context, authoredJob job_compilers.AuthoredJob) error // FetchJob fetches a single job, without fetching its tasks. FetchJob(ctx context.Context, jobID string) (*persistence.Job, error) + SaveJobPriority(ctx context.Context, job *persistence.Job) error // FetchTask fetches the given task and the accompanying job. FetchTask(ctx context.Context, taskID string) (*persistence.Task, error) FetchTaskFailureList(context.Context, *persistence.Task) ([]*persistence.Worker, error) @@ -97,6 +98,7 @@ var _ TaskStateMachine = (*task_state_machine.StateMachine)(nil) type ChangeBroadcaster interface { // BroadcastNewJob sends a 'new job' notification to all SocketIO clients. BroadcastNewJob(jobUpdate api.SocketIOJobUpdate) + BroadcastJobUpdate(jobUpdate api.SocketIOJobUpdate) BroadcastLastRenderedImage(update api.SocketIOLastRenderedUpdate) // Note that there is no BroadcastNewTask. The 'new job' broadcast is sent diff --git a/internal/manager/api_impl/jobs.go b/internal/manager/api_impl/jobs.go index b032f9c3..25ebadfe 100644 --- a/internal/manager/api_impl/jobs.go +++ b/internal/manager/api_impl/jobs.go @@ -178,6 +178,53 @@ func (f *Flamenco) SetJobStatus(e echo.Context, jobID string) error { return e.NoContent(http.StatusNoContent) } +// SetJobPriority is used by the web interface to change a job's priority. +func (f *Flamenco) SetJobPriority(e echo.Context, jobID string) error { + logger := requestLogger(e) + ctx := e.Request().Context() + + logger = logger.With().Str("job", jobID).Logger() + + var prioChange api.SetJobPriorityJSONRequestBody + if err := e.Bind(&prioChange); err != nil { + logger.Warn().Err(err).Msg("bad request received") + return sendAPIError(e, http.StatusBadRequest, "invalid format") + } + + dbJob, err := f.persist.FetchJob(ctx, jobID) + if err != nil { + if errors.Is(err, persistence.ErrJobNotFound) { + return sendAPIError(e, http.StatusNotFound, "no such job") + } + logger.Error().Err(err).Msg("error fetching job") + return sendAPIError(e, http.StatusInternalServerError, "error fetching job") + } + + logger = logger.With(). + Str("jobName", dbJob.Name). + Int("prioCurrent", dbJob.Priority). + Int("prioRequested", prioChange.Priority). + Logger() + logger.Info().Msg("job priority change requested") + + // From here on, the request can be handled even when the client disconnects. + bgCtx, bgCtxCancel := bgContext() + defer bgCtxCancel() + + dbJob.Priority = prioChange.Priority + err = f.persist.SaveJobPriority(bgCtx, dbJob) + if err != nil { + logger.Error().Err(err).Msg("error changing job priority") + return sendAPIError(e, http.StatusInternalServerError, "unexpected error changing job priority") + } + + // Broadcast this change to the SocketIO clients. + jobUpdate := webupdates.NewJobUpdate(dbJob) + f.broadcaster.BroadcastJobUpdate(jobUpdate) + + return e.NoContent(http.StatusNoContent) +} + // SetTaskStatus is used by the web interface to change a task's status. func (f *Flamenco) SetTaskStatus(e echo.Context, taskID string) error { logger := requestLogger(e) diff --git a/internal/manager/api_impl/jobs_test.go b/internal/manager/api_impl/jobs_test.go index 140d4716..eef218af 100644 --- a/internal/manager/api_impl/jobs_test.go +++ b/internal/manager/api_impl/jobs_test.go @@ -353,6 +353,67 @@ func TestSetJobStatus_happy(t *testing.T) { assertResponseNoContent(t, echoCtx) } +func TestSetJobPrio_nonexistentJob(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + prioUpdate := api.JobPriorityChange{Priority: 47} + + mf.persistence.EXPECT().FetchJob(gomock.Any(), jobID).Return(nil, persistence.ErrJobNotFound) + + // Do the call. + echoCtx := mf.prepareMockedJSONRequest(prioUpdate) + err := mf.flamenco.SetJobStatus(echoCtx, jobID) + assert.NoError(t, err) + + assertResponseAPIError(t, echoCtx, http.StatusNotFound, "no such job") +} + +func TestSetJobPrio(t *testing.T) { + mockCtrl := gomock.NewController(t) + defer mockCtrl.Finish() + + mf := newMockedFlamenco(mockCtrl) + + jobID := "18a9b096-d77e-438c-9be2-74397038298b" + prioUpdate := api.JobPriorityChange{Priority: 47} + dbJob := persistence.Job{ + UUID: jobID, + Name: "test job", + Priority: 50, + Settings: persistence.StringInterfaceMap{}, + Metadata: persistence.StringStringMap{}, + } + + echoCtx := mf.prepareMockedJSONRequest(prioUpdate) + + // Set up expectations. + ctx := echoCtx.Request().Context() + mf.persistence.EXPECT().FetchJob(ctx, jobID).Return(&dbJob, nil).AnyTimes() + jobWithNewPrio := dbJob + jobWithNewPrio.Priority = 47 + mf.persistence.EXPECT().SaveJobPriority(gomock.Not(ctx), &jobWithNewPrio) + + // Expect the change to be broadcast over SocketIO. + expectUpdate := api.SocketIOJobUpdate{ + Id: dbJob.UUID, + Name: &dbJob.Name, + RefreshTasks: false, + Priority: prioUpdate.Priority, + Status: dbJob.Status, + Updated: dbJob.UpdatedAt, + } + mf.broadcaster.EXPECT().BroadcastJobUpdate(expectUpdate) + + err := mf.flamenco.SetJobPriority(echoCtx, jobID) + assert.NoError(t, err) + + assertResponseNoContent(t, echoCtx) +} + func TestSetJobStatusFailedToRequeueing(t *testing.T) { mockCtrl := gomock.NewController(t) defer mockCtrl.Finish() diff --git a/internal/manager/api_impl/mocks/api_impl_mock.gen.go b/internal/manager/api_impl/mocks/api_impl_mock.gen.go index 3a5b5858..b98f9bd5 100644 --- a/internal/manager/api_impl/mocks/api_impl_mock.gen.go +++ b/internal/manager/api_impl/mocks/api_impl_mock.gen.go @@ -319,6 +319,20 @@ func (mr *MockPersistenceServiceMockRecorder) RemoveFromJobBlocklist(arg0, arg1, return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RemoveFromJobBlocklist", reflect.TypeOf((*MockPersistenceService)(nil).RemoveFromJobBlocklist), arg0, arg1, arg2, arg3) } +// SaveJobPriority mocks base method. +func (m *MockPersistenceService) SaveJobPriority(arg0 context.Context, arg1 *persistence.Job) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "SaveJobPriority", arg0, arg1) + ret0, _ := ret[0].(error) + return ret0 +} + +// SaveJobPriority indicates an expected call of SaveJobPriority. +func (mr *MockPersistenceServiceMockRecorder) SaveJobPriority(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SaveJobPriority", reflect.TypeOf((*MockPersistenceService)(nil).SaveJobPriority), arg0, arg1) +} + // SaveTask mocks base method. func (m *MockPersistenceService) SaveTask(arg0 context.Context, arg1 *persistence.Task) error { m.ctrl.T.Helper() @@ -484,6 +498,18 @@ func (m *MockChangeBroadcaster) EXPECT() *MockChangeBroadcasterMockRecorder { return m.recorder } +// BroadcastJobUpdate mocks base method. +func (m *MockChangeBroadcaster) BroadcastJobUpdate(arg0 api.SocketIOJobUpdate) { + m.ctrl.T.Helper() + m.ctrl.Call(m, "BroadcastJobUpdate", arg0) +} + +// BroadcastJobUpdate indicates an expected call of BroadcastJobUpdate. +func (mr *MockChangeBroadcasterMockRecorder) BroadcastJobUpdate(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BroadcastJobUpdate", reflect.TypeOf((*MockChangeBroadcaster)(nil).BroadcastJobUpdate), arg0) +} + // BroadcastLastRenderedImage mocks base method. func (m *MockChangeBroadcaster) BroadcastLastRenderedImage(arg0 api.SocketIOLastRenderedUpdate) { m.ctrl.T.Helper() diff --git a/internal/manager/persistence/jobs.go b/internal/manager/persistence/jobs.go index 43a0c640..e40bdf76 100644 --- a/internal/manager/persistence/jobs.go +++ b/internal/manager/persistence/jobs.go @@ -234,6 +234,17 @@ func (db *DB) SaveJobStatus(ctx context.Context, j *Job) error { return nil } +// SaveJobPriority saves the job's Priority field. +func (db *DB) SaveJobPriority(ctx context.Context, j *Job) error { + tx := db.gormDB.WithContext(ctx). + Model(j). + Updates(Job{Priority: j.Priority}) + if tx.Error != nil { + return jobError(tx.Error, "saving job priority") + } + return nil +} + func (db *DB) FetchTask(ctx context.Context, taskUUID string) (*Task, error) { dbTask := Task{} tx := db.gormDB.WithContext(ctx).