refactor: job vs queue naming (#23902)

This commit is contained in:
Jason Rasmussen
2025-11-14 14:42:00 -05:00
committed by GitHub
parent 1200bfad13
commit d784d431d0
36 changed files with 1356 additions and 1325 deletions

View File

@@ -7,7 +7,6 @@ import { ONE_HOUR } from 'src/constants';
import { ConfigRepository } from 'src/repositories/config.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { AuthService } from 'src/services/auth.service';
import { JobService } from 'src/services/job.service';
import { SharedLinkService } from 'src/services/shared-link.service';
import { VersionService } from 'src/services/version.service';
import { OpenGraphTags } from 'src/utils/misc';
@@ -40,7 +39,6 @@ const render = (index: string, meta: OpenGraphTags) => {
export class ApiService {
constructor(
private authService: AuthService,
private jobService: JobService,
private sharedLinkService: SharedLinkService,
private versionService: VersionService,
private configRepository: ConfigRepository,

View File

@@ -23,6 +23,7 @@ import { NotificationService } from 'src/services/notification.service';
import { OcrService } from 'src/services/ocr.service';
import { PartnerService } from 'src/services/partner.service';
import { PersonService } from 'src/services/person.service';
import { QueueService } from 'src/services/queue.service';
import { SearchService } from 'src/services/search.service';
import { ServerService } from 'src/services/server.service';
import { SessionService } from 'src/services/session.service';
@@ -69,6 +70,7 @@ export const services = [
OcrService,
PartnerService,
PersonService,
QueueService,
SearchService,
ServerService,
SessionService,

View File

@@ -1,6 +1,4 @@
import { BadRequestException } from '@nestjs/common';
import { defaults, SystemConfig } from 'src/config';
import { ImmichWorker, JobCommand, JobName, JobStatus, QueueName } from 'src/enum';
import { ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum';
import { JobService } from 'src/services/job.service';
import { JobItem } from 'src/types';
import { assetStub } from 'test/fixtures/asset.stub';
@@ -20,209 +18,6 @@ describe(JobService.name, () => {
expect(sut).toBeDefined();
});
describe('onConfigUpdate', () => {
it('should update concurrency', () => {
sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig });
expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(9, QueueName.StorageTemplateMigration, 1);
});
});
describe('handleNightlyJobs', () => {
it('should run the scheduled jobs', async () => {
await sut.handleNightlyJobs();
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.AssetDeleteCheck },
{ name: JobName.UserDeleteCheck },
{ name: JobName.PersonCleanup },
{ name: JobName.MemoryCleanup },
{ name: JobName.SessionCleanup },
{ name: JobName.AuditTableCleanup },
{ name: JobName.AuditLogCleanup },
{ name: JobName.MemoryGenerate },
{ name: JobName.UserSyncUsage },
{ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } },
{ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } },
]);
});
});
describe('getAllJobStatus', () => {
it('should get all job statuses', async () => {
mocks.job.getJobCounts.mockResolvedValue({
active: 1,
completed: 1,
failed: 1,
delayed: 1,
waiting: 1,
paused: 1,
});
mocks.job.getQueueStatus.mockResolvedValue({
isActive: true,
isPaused: true,
});
const expectedJobStatus = {
jobCounts: {
active: 1,
completed: 1,
delayed: 1,
failed: 1,
waiting: 1,
paused: 1,
},
queueStatus: {
isActive: true,
isPaused: true,
},
};
await expect(sut.getAllJobsStatus()).resolves.toEqual({
[QueueName.BackgroundTask]: expectedJobStatus,
[QueueName.DuplicateDetection]: expectedJobStatus,
[QueueName.SmartSearch]: expectedJobStatus,
[QueueName.MetadataExtraction]: expectedJobStatus,
[QueueName.Search]: expectedJobStatus,
[QueueName.StorageTemplateMigration]: expectedJobStatus,
[QueueName.Migration]: expectedJobStatus,
[QueueName.ThumbnailGeneration]: expectedJobStatus,
[QueueName.VideoConversion]: expectedJobStatus,
[QueueName.FaceDetection]: expectedJobStatus,
[QueueName.FacialRecognition]: expectedJobStatus,
[QueueName.Sidecar]: expectedJobStatus,
[QueueName.Library]: expectedJobStatus,
[QueueName.Notification]: expectedJobStatus,
[QueueName.BackupDatabase]: expectedJobStatus,
[QueueName.Ocr]: expectedJobStatus,
});
});
});
describe('handleCommand', () => {
it('should handle a pause command', async () => {
await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Pause, force: false });
expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should handle a resume command', async () => {
await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Resume, force: false });
expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should handle an empty command', async () => {
await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Empty, force: false });
expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should not start a job that is already running', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false });
await expect(
sut.handleCommand(QueueName.VideoConversion, { command: JobCommand.Start, force: false }),
).rejects.toBeInstanceOf(BadRequestException);
expect(mocks.job.queue).not.toHaveBeenCalled();
expect(mocks.job.queueAll).not.toHaveBeenCalled();
});
it('should handle a start video conversion command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.VideoConversion, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } });
});
it('should handle a start storage template migration command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.StorageTemplateMigration, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration });
});
it('should handle a start smart search command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.SmartSearch, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } });
});
it('should handle a start metadata extraction command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.MetadataExtraction, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetExtractMetadataQueueAll,
data: { force: false },
});
});
it('should handle a start sidecar command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.Sidecar, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } });
});
it('should handle a start thumbnail generation command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.ThumbnailGeneration, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetGenerateThumbnailsQueueAll,
data: { force: false },
});
});
it('should handle a start face detection command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.FaceDetection, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } });
});
it('should handle a start facial recognition command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.FacialRecognition, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } });
});
it('should handle a start backup database command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.handleCommand(QueueName.BackupDatabase, { command: JobCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } });
});
it('should throw a bad request when an invalid queue is used', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await expect(
sut.handleCommand(QueueName.BackgroundTask, { command: JobCommand.Start, force: false }),
).rejects.toBeInstanceOf(BadRequestException);
expect(mocks.job.queue).not.toHaveBeenCalled();
expect(mocks.job.queueAll).not.toHaveBeenCalled();
});
});
describe('onJobRun', () => {
it('should process a successful job', async () => {
mocks.job.run.mockResolvedValue(JobStatus.Success);

View File

@@ -1,28 +1,12 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { ClassConstructor } from 'class-transformer';
import { SystemConfig } from 'src/config';
import { OnEvent } from 'src/decorators';
import { mapAsset } from 'src/dtos/asset-response.dto';
import { AllJobStatusResponseDto, JobCommandDto, JobCreateDto, JobStatusDto } from 'src/dtos/job.dto';
import {
AssetType,
AssetVisibility,
BootstrapEventPriority,
CronJob,
DatabaseLock,
ImmichWorker,
JobCommand,
JobName,
JobStatus,
ManualJobName,
QueueCleanType,
QueueName,
} from 'src/enum';
import { ArgOf, ArgsOf } from 'src/repositories/event.repository';
import { JobCreateDto } from 'src/dtos/job.dto';
import { AssetType, AssetVisibility, JobName, JobStatus, ManualJobName } from 'src/enum';
import { ArgsOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import { ConcurrentQueueName, JobItem } from 'src/types';
import { JobItem } from 'src/types';
import { hexOrBufferToBase64 } from 'src/utils/bytes';
import { handlePromiseError } from 'src/utils/misc';
const asJobItem = (dto: JobCreateDto): JobItem => {
switch (dto.name) {
@@ -56,196 +40,12 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
}
};
const asNightlyTasksCron = (config: SystemConfig) => {
const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number);
return `${minutes} ${hours} * * *`;
};
@Injectable()
export class JobService extends BaseService {
private services: ClassConstructor<unknown>[] = [];
private nightlyJobsLock = false;
@OnEvent({ name: 'ConfigInit' })
async onConfigInit({ newConfig: config }: ArgOf<'ConfigInit'>) {
if (this.worker === ImmichWorker.Microservices) {
this.updateQueueConcurrency(config);
return;
}
this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs);
if (this.nightlyJobsLock) {
const cronExpression = asNightlyTasksCron(config);
this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`);
this.cronRepository.create({
name: CronJob.NightlyJobs,
expression: cronExpression,
start: true,
onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger),
});
}
}
@OnEvent({ name: 'ConfigUpdate', server: true })
onConfigUpdate({ newConfig: config }: ArgOf<'ConfigUpdate'>) {
if (this.worker === ImmichWorker.Microservices) {
this.updateQueueConcurrency(config);
return;
}
if (this.nightlyJobsLock) {
const cronExpression = asNightlyTasksCron(config);
this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`);
this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true });
}
}
@OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.JobService })
onBootstrap() {
this.jobRepository.setup(this.services);
if (this.worker === ImmichWorker.Microservices) {
this.jobRepository.startWorkers();
}
}
private updateQueueConcurrency(config: SystemConfig) {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
if (this.isConcurrentQueue(queueName)) {
concurrency = config.job[queueName].concurrency;
}
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
this.jobRepository.setConcurrency(queueName, concurrency);
}
}
setServices(services: ClassConstructor<unknown>[]) {
this.services = services;
}
async create(dto: JobCreateDto): Promise<void> {
await this.jobRepository.queue(asJobItem(dto));
}
async handleCommand(queueName: QueueName, dto: JobCommandDto): Promise<JobStatusDto> {
this.logger.debug(`Handling command: queue=${queueName},command=${dto.command},force=${dto.force}`);
switch (dto.command) {
case JobCommand.Start: {
await this.start(queueName, dto);
break;
}
case JobCommand.Pause: {
await this.jobRepository.pause(queueName);
break;
}
case JobCommand.Resume: {
await this.jobRepository.resume(queueName);
break;
}
case JobCommand.Empty: {
await this.jobRepository.empty(queueName);
break;
}
case JobCommand.ClearFailed: {
const failedJobs = await this.jobRepository.clear(queueName, QueueCleanType.Failed);
this.logger.debug(`Cleared failed jobs: ${failedJobs}`);
break;
}
}
return this.getJobStatus(queueName);
}
async getJobStatus(queueName: QueueName): Promise<JobStatusDto> {
const [jobCounts, queueStatus] = await Promise.all([
this.jobRepository.getJobCounts(queueName),
this.jobRepository.getQueueStatus(queueName),
]);
return { jobCounts, queueStatus };
}
async getAllJobsStatus(): Promise<AllJobStatusResponseDto> {
const response = new AllJobStatusResponseDto();
for (const queueName of Object.values(QueueName)) {
response[queueName] = await this.getJobStatus(queueName);
}
return response;
}
private async start(name: QueueName, { force }: JobCommandDto): Promise<void> {
const { isActive } = await this.jobRepository.getQueueStatus(name);
if (isActive) {
throw new BadRequestException(`Job is already running`);
}
await this.eventRepository.emit('QueueStart', { name });
switch (name) {
case QueueName.VideoConversion: {
return this.jobRepository.queue({ name: JobName.AssetEncodeVideoQueueAll, data: { force } });
}
case QueueName.StorageTemplateMigration: {
return this.jobRepository.queue({ name: JobName.StorageTemplateMigration });
}
case QueueName.Migration: {
return this.jobRepository.queue({ name: JobName.FileMigrationQueueAll });
}
case QueueName.SmartSearch: {
return this.jobRepository.queue({ name: JobName.SmartSearchQueueAll, data: { force } });
}
case QueueName.DuplicateDetection: {
return this.jobRepository.queue({ name: JobName.AssetDetectDuplicatesQueueAll, data: { force } });
}
case QueueName.MetadataExtraction: {
return this.jobRepository.queue({ name: JobName.AssetExtractMetadataQueueAll, data: { force } });
}
case QueueName.Sidecar: {
return this.jobRepository.queue({ name: JobName.SidecarQueueAll, data: { force } });
}
case QueueName.ThumbnailGeneration: {
return this.jobRepository.queue({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force } });
}
case QueueName.FaceDetection: {
return this.jobRepository.queue({ name: JobName.AssetDetectFacesQueueAll, data: { force } });
}
case QueueName.FacialRecognition: {
return this.jobRepository.queue({ name: JobName.FacialRecognitionQueueAll, data: { force } });
}
case QueueName.Library: {
return this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: { force } });
}
case QueueName.BackupDatabase: {
return this.jobRepository.queue({ name: JobName.DatabaseBackup, data: { force } });
}
case QueueName.Ocr: {
return this.jobRepository.queue({ name: JobName.OcrQueueAll, data: { force } });
}
default: {
throw new BadRequestException(`Invalid job name: ${name}`);
}
}
}
@OnEvent({ name: 'JobRun' })
async onJobRun(...[queueName, job]: ArgsOf<'JobRun'>) {
try {
@@ -262,50 +62,6 @@ export class JobService extends BaseService {
}
}
private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName {
return ![
QueueName.FacialRecognition,
QueueName.StorageTemplateMigration,
QueueName.DuplicateDetection,
QueueName.BackupDatabase,
].includes(name);
}
async handleNightlyJobs() {
const config = await this.getConfig({ withCache: false });
const jobs: JobItem[] = [];
if (config.nightlyTasks.databaseCleanup) {
jobs.push(
{ name: JobName.AssetDeleteCheck },
{ name: JobName.UserDeleteCheck },
{ name: JobName.PersonCleanup },
{ name: JobName.MemoryCleanup },
{ name: JobName.SessionCleanup },
{ name: JobName.AuditTableCleanup },
{ name: JobName.AuditLogCleanup },
);
}
if (config.nightlyTasks.generateMemories) {
jobs.push({ name: JobName.MemoryGenerate });
}
if (config.nightlyTasks.syncQuotaUsage) {
jobs.push({ name: JobName.UserSyncUsage });
}
if (config.nightlyTasks.missingThumbnails) {
jobs.push({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } });
}
if (config.nightlyTasks.clusterNewFaces) {
jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } });
}
await this.jobRepository.queueAll(jobs);
}
/**
* Queue follow up jobs
*/

View File

@@ -0,0 +1,223 @@
import { BadRequestException } from '@nestjs/common';
import { defaults, SystemConfig } from 'src/config';
import { ImmichWorker, JobName, QueueCommand, QueueName } from 'src/enum';
import { QueueService } from 'src/services/queue.service';
import { newTestService, ServiceMocks } from 'test/utils';
describe(QueueService.name, () => {
let sut: QueueService;
let mocks: ServiceMocks;
beforeEach(() => {
({ sut, mocks } = newTestService(QueueService));
mocks.config.getWorker.mockReturnValue(ImmichWorker.Microservices);
});
it('should work', () => {
expect(sut).toBeDefined();
});
describe('onConfigUpdate', () => {
it('should update concurrency', () => {
sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig });
expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5);
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(9, QueueName.StorageTemplateMigration, 1);
});
});
describe('handleNightlyJobs', () => {
it('should run the scheduled jobs', async () => {
await sut.handleNightlyJobs();
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.AssetDeleteCheck },
{ name: JobName.UserDeleteCheck },
{ name: JobName.PersonCleanup },
{ name: JobName.MemoryCleanup },
{ name: JobName.SessionCleanup },
{ name: JobName.AuditTableCleanup },
{ name: JobName.AuditLogCleanup },
{ name: JobName.MemoryGenerate },
{ name: JobName.UserSyncUsage },
{ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } },
{ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } },
]);
});
});
describe('getAllJobStatus', () => {
it('should get all job statuses', async () => {
mocks.job.getJobCounts.mockResolvedValue({
active: 1,
completed: 1,
failed: 1,
delayed: 1,
waiting: 1,
paused: 1,
});
mocks.job.getQueueStatus.mockResolvedValue({
isActive: true,
isPaused: true,
});
const expectedJobStatus = {
jobCounts: {
active: 1,
completed: 1,
delayed: 1,
failed: 1,
waiting: 1,
paused: 1,
},
queueStatus: {
isActive: true,
isPaused: true,
},
};
await expect(sut.getAll()).resolves.toEqual({
[QueueName.BackgroundTask]: expectedJobStatus,
[QueueName.DuplicateDetection]: expectedJobStatus,
[QueueName.SmartSearch]: expectedJobStatus,
[QueueName.MetadataExtraction]: expectedJobStatus,
[QueueName.Search]: expectedJobStatus,
[QueueName.StorageTemplateMigration]: expectedJobStatus,
[QueueName.Migration]: expectedJobStatus,
[QueueName.ThumbnailGeneration]: expectedJobStatus,
[QueueName.VideoConversion]: expectedJobStatus,
[QueueName.FaceDetection]: expectedJobStatus,
[QueueName.FacialRecognition]: expectedJobStatus,
[QueueName.Sidecar]: expectedJobStatus,
[QueueName.Library]: expectedJobStatus,
[QueueName.Notification]: expectedJobStatus,
[QueueName.BackupDatabase]: expectedJobStatus,
[QueueName.Ocr]: expectedJobStatus,
});
});
});
describe('handleCommand', () => {
it('should handle a pause command', async () => {
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Pause, force: false });
expect(mocks.job.pause).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should handle a resume command', async () => {
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Resume, force: false });
expect(mocks.job.resume).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should handle an empty command', async () => {
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Empty, force: false });
expect(mocks.job.empty).toHaveBeenCalledWith(QueueName.MetadataExtraction);
});
it('should not start a job that is already running', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: true, isPaused: false });
await expect(
sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false }),
).rejects.toBeInstanceOf(BadRequestException);
expect(mocks.job.queue).not.toHaveBeenCalled();
expect(mocks.job.queueAll).not.toHaveBeenCalled();
});
it('should handle a start video conversion command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.VideoConversion, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetEncodeVideoQueueAll, data: { force: false } });
});
it('should handle a start storage template migration command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.StorageTemplateMigration, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.StorageTemplateMigration });
});
it('should handle a start smart search command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.SmartSearch, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SmartSearchQueueAll, data: { force: false } });
});
it('should handle a start metadata extraction command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.MetadataExtraction, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetExtractMetadataQueueAll,
data: { force: false },
});
});
it('should handle a start sidecar command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.Sidecar, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.SidecarQueueAll, data: { force: false } });
});
it('should handle a start thumbnail generation command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.ThumbnailGeneration, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetGenerateThumbnailsQueueAll,
data: { force: false },
});
});
it('should handle a start face detection command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.FaceDetection, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.AssetDetectFacesQueueAll, data: { force: false } });
});
it('should handle a start facial recognition command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.FacialRecognition, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.FacialRecognitionQueueAll, data: { force: false } });
});
it('should handle a start backup database command', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await sut.runCommand(QueueName.BackupDatabase, { command: QueueCommand.Start, force: false });
expect(mocks.job.queue).toHaveBeenCalledWith({ name: JobName.DatabaseBackup, data: { force: false } });
});
it('should throw a bad request when an invalid queue is used', async () => {
mocks.job.getQueueStatus.mockResolvedValue({ isActive: false, isPaused: false });
await expect(
sut.runCommand(QueueName.BackgroundTask, { command: QueueCommand.Start, force: false }),
).rejects.toBeInstanceOf(BadRequestException);
expect(mocks.job.queue).not.toHaveBeenCalled();
expect(mocks.job.queueAll).not.toHaveBeenCalled();
});
});
});

View File

@@ -0,0 +1,250 @@
import { BadRequestException, Injectable } from '@nestjs/common';
import { ClassConstructor } from 'class-transformer';
import { SystemConfig } from 'src/config';
import { OnEvent } from 'src/decorators';
import { QueueCommandDto, QueueResponseDto, QueuesResponseDto } from 'src/dtos/queue.dto';
import {
BootstrapEventPriority,
CronJob,
DatabaseLock,
ImmichWorker,
JobName,
QueueCleanType,
QueueCommand,
QueueName,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import { ConcurrentQueueName, JobItem } from 'src/types';
import { handlePromiseError } from 'src/utils/misc';
const asNightlyTasksCron = (config: SystemConfig) => {
const [hours, minutes] = config.nightlyTasks.startTime.split(':').map(Number);
return `${minutes} ${hours} * * *`;
};
@Injectable()
export class QueueService extends BaseService {
private services: ClassConstructor<unknown>[] = [];
private nightlyJobsLock = false;
@OnEvent({ name: 'ConfigInit' })
async onConfigInit({ newConfig: config }: ArgOf<'ConfigInit'>) {
if (this.worker === ImmichWorker.Microservices) {
this.updateConcurrency(config);
return;
}
this.nightlyJobsLock = await this.databaseRepository.tryLock(DatabaseLock.NightlyJobs);
if (this.nightlyJobsLock) {
const cronExpression = asNightlyTasksCron(config);
this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`);
this.cronRepository.create({
name: CronJob.NightlyJobs,
expression: cronExpression,
start: true,
onTick: () => handlePromiseError(this.handleNightlyJobs(), this.logger),
});
}
}
@OnEvent({ name: 'ConfigUpdate', server: true })
onConfigUpdate({ newConfig: config }: ArgOf<'ConfigUpdate'>) {
if (this.worker === ImmichWorker.Microservices) {
this.updateConcurrency(config);
return;
}
if (this.nightlyJobsLock) {
const cronExpression = asNightlyTasksCron(config);
this.logger.debug(`Scheduling nightly jobs for ${cronExpression}`);
this.cronRepository.update({ name: CronJob.NightlyJobs, expression: cronExpression, start: true });
}
}
@OnEvent({ name: 'AppBootstrap', priority: BootstrapEventPriority.JobService })
onBootstrap() {
this.jobRepository.setup(this.services);
if (this.worker === ImmichWorker.Microservices) {
this.jobRepository.startWorkers();
}
}
private updateConcurrency(config: SystemConfig) {
this.logger.debug(`Updating queue concurrency settings`);
for (const queueName of Object.values(QueueName)) {
let concurrency = 1;
if (this.isConcurrentQueue(queueName)) {
concurrency = config.job[queueName].concurrency;
}
this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
this.jobRepository.setConcurrency(queueName, concurrency);
}
}
setServices(services: ClassConstructor<unknown>[]) {
this.services = services;
}
async runCommand(name: QueueName, dto: QueueCommandDto): Promise<QueueResponseDto> {
this.logger.debug(`Handling command: queue=${name},command=${dto.command},force=${dto.force}`);
switch (dto.command) {
case QueueCommand.Start: {
await this.start(name, dto);
break;
}
case QueueCommand.Pause: {
await this.jobRepository.pause(name);
break;
}
case QueueCommand.Resume: {
await this.jobRepository.resume(name);
break;
}
case QueueCommand.Empty: {
await this.jobRepository.empty(name);
break;
}
case QueueCommand.ClearFailed: {
const failedJobs = await this.jobRepository.clear(name, QueueCleanType.Failed);
this.logger.debug(`Cleared failed jobs: ${failedJobs}`);
break;
}
}
return this.getByName(name);
}
async getAll(): Promise<QueuesResponseDto> {
const response = new QueuesResponseDto();
for (const name of Object.values(QueueName)) {
response[name] = await this.getByName(name);
}
return response;
}
async getByName(name: QueueName): Promise<QueueResponseDto> {
const [jobCounts, queueStatus] = await Promise.all([
this.jobRepository.getJobCounts(name),
this.jobRepository.getQueueStatus(name),
]);
return { jobCounts, queueStatus };
}
private async start(name: QueueName, { force }: QueueCommandDto): Promise<void> {
const { isActive } = await this.jobRepository.getQueueStatus(name);
if (isActive) {
throw new BadRequestException(`Job is already running`);
}
await this.eventRepository.emit('QueueStart', { name });
switch (name) {
case QueueName.VideoConversion: {
return this.jobRepository.queue({ name: JobName.AssetEncodeVideoQueueAll, data: { force } });
}
case QueueName.StorageTemplateMigration: {
return this.jobRepository.queue({ name: JobName.StorageTemplateMigration });
}
case QueueName.Migration: {
return this.jobRepository.queue({ name: JobName.FileMigrationQueueAll });
}
case QueueName.SmartSearch: {
return this.jobRepository.queue({ name: JobName.SmartSearchQueueAll, data: { force } });
}
case QueueName.DuplicateDetection: {
return this.jobRepository.queue({ name: JobName.AssetDetectDuplicatesQueueAll, data: { force } });
}
case QueueName.MetadataExtraction: {
return this.jobRepository.queue({ name: JobName.AssetExtractMetadataQueueAll, data: { force } });
}
case QueueName.Sidecar: {
return this.jobRepository.queue({ name: JobName.SidecarQueueAll, data: { force } });
}
case QueueName.ThumbnailGeneration: {
return this.jobRepository.queue({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force } });
}
case QueueName.FaceDetection: {
return this.jobRepository.queue({ name: JobName.AssetDetectFacesQueueAll, data: { force } });
}
case QueueName.FacialRecognition: {
return this.jobRepository.queue({ name: JobName.FacialRecognitionQueueAll, data: { force } });
}
case QueueName.Library: {
return this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: { force } });
}
case QueueName.BackupDatabase: {
return this.jobRepository.queue({ name: JobName.DatabaseBackup, data: { force } });
}
case QueueName.Ocr: {
return this.jobRepository.queue({ name: JobName.OcrQueueAll, data: { force } });
}
default: {
throw new BadRequestException(`Invalid job name: ${name}`);
}
}
}
private isConcurrentQueue(name: QueueName): name is ConcurrentQueueName {
return ![
QueueName.FacialRecognition,
QueueName.StorageTemplateMigration,
QueueName.DuplicateDetection,
QueueName.BackupDatabase,
].includes(name);
}
async handleNightlyJobs() {
const config = await this.getConfig({ withCache: false });
const jobs: JobItem[] = [];
if (config.nightlyTasks.databaseCleanup) {
jobs.push(
{ name: JobName.AssetDeleteCheck },
{ name: JobName.UserDeleteCheck },
{ name: JobName.PersonCleanup },
{ name: JobName.MemoryCleanup },
{ name: JobName.SessionCleanup },
{ name: JobName.AuditTableCleanup },
{ name: JobName.AuditLogCleanup },
);
}
if (config.nightlyTasks.generateMemories) {
jobs.push({ name: JobName.MemoryGenerate });
}
if (config.nightlyTasks.syncQuotaUsage) {
jobs.push({ name: JobName.UserSyncUsage });
}
if (config.nightlyTasks.missingThumbnails) {
jobs.push({ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } });
}
if (config.nightlyTasks.clusterNewFaces) {
jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } });
}
await this.jobRepository.queueAll(jobs);
}
}