feat: queues (#24142)

This commit is contained in:
Jason Rasmussen
2025-11-25 08:19:40 -05:00
committed by GitHub
parent 66ae07ee39
commit 104fa09f69
37 changed files with 2487 additions and 336 deletions

View File

@@ -2,6 +2,7 @@ import { BadRequestException } from '@nestjs/common';
import { defaults, SystemConfig } from 'src/config';
import { ImmichWorker, JobName, QueueCommand, QueueName } from 'src/enum';
import { QueueService } from 'src/services/queue.service';
import { factory } from 'test/small.factory';
import { newTestService, ServiceMocks } from 'test/utils';
describe(QueueService.name, () => {
@@ -52,80 +53,64 @@ describe(QueueService.name, () => {
describe('getAllJobStatus', () => {
it('should get all job statuses', async () => {
mocks.job.getJobCounts.mockResolvedValue({
active: 1,
completed: 1,
failed: 1,
delayed: 1,
waiting: 1,
paused: 1,
});
mocks.job.getQueueStatus.mockResolvedValue({
isActive: true,
isPaused: true,
});
const stats = factory.queueStatistics({ active: 1 });
const expected = { jobCounts: stats, queueStatus: { isActive: true, isPaused: true } };
const expectedJobStatus = {
jobCounts: {
active: 1,
completed: 1,
delayed: 1,
failed: 1,
waiting: 1,
paused: 1,
},
queueStatus: {
isActive: true,
isPaused: true,
},
};
mocks.job.getJobCounts.mockResolvedValue(stats);
mocks.job.isPaused.mockResolvedValue(true);
await expect(sut.getAll()).resolves.toEqual({
[QueueName.BackgroundTask]: expectedJobStatus,
[QueueName.DuplicateDetection]: expectedJobStatus,
[QueueName.SmartSearch]: expectedJobStatus,
[QueueName.MetadataExtraction]: expectedJobStatus,
[QueueName.Search]: expectedJobStatus,
[QueueName.StorageTemplateMigration]: expectedJobStatus,
[QueueName.Migration]: expectedJobStatus,
[QueueName.ThumbnailGeneration]: expectedJobStatus,
[QueueName.VideoConversion]: expectedJobStatus,
[QueueName.FaceDetection]: expectedJobStatus,
[QueueName.FacialRecognition]: expectedJobStatus,
[QueueName.Sidecar]: expectedJobStatus,
[QueueName.Library]: expectedJobStatus,
[QueueName.Notification]: expectedJobStatus,
[QueueName.BackupDatabase]: expectedJobStatus,
[QueueName.Ocr]: expectedJobStatus,
[QueueName.Workflow]: expectedJobStatus,
await expect(sut.getAllLegacy(factory.auth())).resolves.toEqual({
[QueueName.BackgroundTask]: expected,
[QueueName.DuplicateDetection]: expected,
[QueueName.SmartSearch]: expected,
[QueueName.MetadataExtraction]: expected,
[QueueName.Search]: expected,
[QueueName.StorageTemplateMigration]: expected,
[QueueName.Migration]: expected,
[QueueName.ThumbnailGeneration]: expected,
[QueueName.VideoConversion]: expected,
[QueueName.FaceDetection]: expected,
[QueueName.FacialRecognition]: expected,
[QueueName.Sidecar]: expected,
[QueueName.Library]: expected,
[QueueName.Notification]: expected,
[QueueName.BackupDatabase]: expected,
[QueueName.Ocr]: expected,
[QueueName.Workflow]: expected,
});
});
});
describe('handleCommand', () => {
it('should handle a pause command', async () => {
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false });
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false });
expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should handle a resume command', async () => {
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false });
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false });
expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should handle an empty command', async () => {
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false });
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false });
expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should not start a job that is already running', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false });
mocks.job.isActive.mockResolvedValue(true);
await expect(
sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }),
sut.runCommandLegacy(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }),
).rejects.toBeInstanceOf(BadRequestException);
expect(mocks.job.queue).not.toHaveBeenCalled();
@@ -133,33 +118,37 @@ describe(QueueService.name, () => {
});
it('should handle a start video conversion command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.VideoConversion, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } });
});
it('should handle a start storage template migration command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration });
});
it('should handle a start smart search command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.SmartSearch, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.SmartSearch, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } });
});
it('should handle a start metadata extraction command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetExtractMetadataQueueAll,
@@ -168,17 +157,19 @@ describe(QueueService.name, () => {
});
it('should handle a start sidecar command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.Sidecar, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.Sidecar, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } });
});
it('should handle a start thumbnail generation command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetGenerateThumbnailsQueueAll,
@@ -187,34 +178,37 @@ describe(QueueService.name, () => {
});
it('should handle a start face detection command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.FaceDetection, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.FaceDetection, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } });
});
it('should handle a start facial recognition command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } });
});
it('should handle a start backup database command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
mocks.job.getJobCounts.mockResolvedValue(factory.queueStatistics());
await sut.runCommand(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false });
await sut.runCommandLegacy(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } });
});
it('should throw a bad request when an invalid queue is used', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
mocks.job.isActive.mockResolvedValue(false);
await expect(
sut.runCommand(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }),
sut.runCommandLegacy(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }),
).rejects.toBeInstanceOf(BadRequestException);
expect(mocks.job.queue).not.toHaveBeenCalled();

View File

@@ -2,7 +2,21 @@ import { BadRequestException, Injectable } from '@nestjs/common';
import { ClassConstructor } from 'class-transformer';
import { SystemConfig } from 'src/config';
import { OnEvent } from 'src/decorators';
import { QueueCommandDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import {
mapQueueLegacy,
mapQueuesLegacy,
QueueResponseLegacyDto,
QueuesResponseLegacyDto,
} from 'src/dtos/queue-legacy.dto';
import {
QueueCommandDto,
QueueDeleteDto,
QueueJobResponseDto,
QueueJobSearchDto,
QueueResponseDto,
QueueUpdateDto,
} from 'src/dtos/queue.dto';
import {
BootstrapEventPriority,
CronJob,
@@ -86,7 +100,7 @@ export class QueueService extends BaseService {
this.services = services;
}
async runCommand(name: QueueName, dto: QueueCommandDto): Promise<QueueResponseDto> {
async runCommandLegacy(name: QueueName, dto: QueueCommandDto): Promise<QueueResponseLegacyDto> {
this.logger.debug(`Handling command: queue=${name},command=${dto.command},force=${dto.force}`);
switch (dto.command) {
@@ -117,28 +131,60 @@ export class QueueService extends BaseService {
}
}
const response = await this.getByName(name);
return mapQueueLegacy(response);
}
async getAll(_auth: AuthDto): Promise<QueueResponseDto[]> {
return Promise.all(Object.values(QueueName).map((name) => this.getByName(name)));
}
async getAllLegacy(auth: AuthDto): Promise<QueuesResponseLegacyDto> {
const responses = await this.getAll(auth);
return mapQueuesLegacy(responses);
}
get(auth: AuthDto, name: QueueName): Promise<QueueResponseDto> {
return this.getByName(name);
}
async getAll(): Promise<QueuesResponseDto> {
const response = new QueuesResponseDto();
for (const name of Object.values(QueueName)) {
response[name] = await this.getByName(name);
async update(auth: AuthDto, name: QueueName, dto: QueueUpdateDto): Promise<QueueResponseDto> {
if (dto.isPaused === true) {
if (name === QueueName.BackgroundTask) {
throw new BadRequestException(`The BackgroundTask queue cannot be paused`);
}
await this.jobRepository.pause(name);
}
return response;
if (dto.isPaused === false) {
await this.jobRepository.resume(name);
}
return this.getByName(name);
}
async getByName(name: QueueName): Promise<QueueResponseDto> {
const [jobCounts, queueStatus] = await Promise.all([
this.jobRepository.getJobCounts(name),
this.jobRepository.getQueueStatus(name),
]);
searchJobs(auth: AuthDto, name: QueueName, dto: QueueJobSearchDto): Promise<QueueJobResponseDto[]> {
return this.jobRepository.searchJobs(name, dto);
}
return { jobCounts, queueStatus };
async emptyQueue(auth: AuthDto, name: QueueName, dto: QueueDeleteDto) {
await this.jobRepository.empty(name);
if (dto.failed) {
await this.jobRepository.clear(name, QueueCleanType.Failed);
}
}
private async getByName(name: QueueName): Promise<QueueResponseDto> {
const [statistics, isPaused] = await Promise.all([
this.jobRepository.getJobCounts(name),
this.jobRepository.isPaused(name),
]);
return { name, isPaused, statistics };
}
private async start(name: QueueName, { force }: QueueCommandDto): Promise<void> {
const { isActive } = await this.jobRepository.getQueueStatus(name);
const isActive = await this.jobRepository.isActive(name);
if (isActive) {
throw new BadRequestException(`Job is already running`);
}