mirror of
https://github.com/immich-app/immich.git
synced 2025-12-29 17:25:00 +03:00
refactor(server): job discovery (#13838)
refactor(server): job discorvery
This commit is contained in:
@@ -1,124 +1,122 @@
|
||||
import { getQueueToken } from '@nestjs/bullmq';
|
||||
import { Inject, Injectable } from '@nestjs/common';
|
||||
import { ModuleRef } from '@nestjs/core';
|
||||
import { ModuleRef, Reflector } from '@nestjs/core';
|
||||
import { SchedulerRegistry } from '@nestjs/schedule';
|
||||
import { Job, JobsOptions, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
|
||||
import { JobsOptions, Queue, Worker } from 'bullmq';
|
||||
import { ClassConstructor } from 'class-transformer';
|
||||
import { CronJob, CronTime } from 'cron';
|
||||
import { setTimeout } from 'node:timers/promises';
|
||||
import { JobConfig } from 'src/decorators';
|
||||
import { MetadataKey } from 'src/enum';
|
||||
import { IConfigRepository } from 'src/interfaces/config.interface';
|
||||
import { IEventRepository } from 'src/interfaces/event.interface';
|
||||
import {
|
||||
IEntityJob,
|
||||
IJobRepository,
|
||||
JobCounts,
|
||||
JobItem,
|
||||
JobName,
|
||||
JobOf,
|
||||
JobStatus,
|
||||
QueueCleanType,
|
||||
QueueName,
|
||||
QueueStatus,
|
||||
} from 'src/interfaces/job.interface';
|
||||
import { ILoggerRepository } from 'src/interfaces/logger.interface';
|
||||
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';
|
||||
|
||||
export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
|
||||
// misc
|
||||
[JobName.ASSET_DELETION]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.ASSET_DELETION_CHECK]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.USER_DELETION]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.CLEAN_OLD_AUDIT_LOGS]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.CLEAN_OLD_SESSION_TOKENS]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK,
|
||||
[JobName.USER_SYNC_USAGE]: QueueName.BACKGROUND_TASK,
|
||||
|
||||
// backups
|
||||
[JobName.BACKUP_DATABASE]: QueueName.BACKUP_DATABASE,
|
||||
|
||||
// conversion
|
||||
[JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,
|
||||
[JobName.VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,
|
||||
|
||||
// thumbnails
|
||||
[JobName.QUEUE_GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
|
||||
[JobName.GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
|
||||
[JobName.GENERATE_PERSON_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION,
|
||||
|
||||
// tags
|
||||
[JobName.TAG_CLEANUP]: QueueName.BACKGROUND_TASK,
|
||||
|
||||
// metadata
|
||||
[JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
|
||||
[JobName.METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
|
||||
[JobName.LINK_LIVE_PHOTOS]: QueueName.METADATA_EXTRACTION,
|
||||
|
||||
// storage template
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION,
|
||||
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: QueueName.STORAGE_TEMPLATE_MIGRATION,
|
||||
|
||||
// migration
|
||||
[JobName.QUEUE_MIGRATION]: QueueName.MIGRATION,
|
||||
[JobName.MIGRATE_ASSET]: QueueName.MIGRATION,
|
||||
[JobName.MIGRATE_PERSON]: QueueName.MIGRATION,
|
||||
|
||||
// facial recognition
|
||||
[JobName.QUEUE_FACE_DETECTION]: QueueName.FACE_DETECTION,
|
||||
[JobName.FACE_DETECTION]: QueueName.FACE_DETECTION,
|
||||
[JobName.QUEUE_FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION,
|
||||
[JobName.FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION,
|
||||
|
||||
// smart search
|
||||
[JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH,
|
||||
[JobName.SMART_SEARCH]: QueueName.SMART_SEARCH,
|
||||
|
||||
// duplicate detection
|
||||
[JobName.QUEUE_DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
|
||||
[JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
|
||||
|
||||
// XMP sidecars
|
||||
[JobName.QUEUE_SIDECAR]: QueueName.SIDECAR,
|
||||
[JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR,
|
||||
[JobName.SIDECAR_SYNC]: QueueName.SIDECAR,
|
||||
[JobName.SIDECAR_WRITE]: QueueName.SIDECAR,
|
||||
|
||||
// Library management
|
||||
[JobName.LIBRARY_SYNC_FILE]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_QUEUE_SYNC_FILES]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_QUEUE_SYNC_ASSETS]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_DELETE]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_SYNC_ASSET]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_QUEUE_SYNC_ALL]: QueueName.LIBRARY,
|
||||
[JobName.LIBRARY_QUEUE_CLEANUP]: QueueName.LIBRARY,
|
||||
|
||||
// Notification
|
||||
[JobName.SEND_EMAIL]: QueueName.NOTIFICATION,
|
||||
[JobName.NOTIFY_ALBUM_INVITE]: QueueName.NOTIFICATION,
|
||||
[JobName.NOTIFY_ALBUM_UPDATE]: QueueName.NOTIFICATION,
|
||||
[JobName.NOTIFY_SIGNUP]: QueueName.NOTIFICATION,
|
||||
|
||||
// Version check
|
||||
[JobName.VERSION_CHECK]: QueueName.BACKGROUND_TASK,
|
||||
|
||||
// Trash
|
||||
[JobName.QUEUE_TRASH_EMPTY]: QueueName.BACKGROUND_TASK,
|
||||
type JobMapItem = {
|
||||
jobName: JobName;
|
||||
queueName: QueueName;
|
||||
handler: (job: JobOf<any>) => Promise<JobStatus>;
|
||||
label: string;
|
||||
};
|
||||
|
||||
@Injectable()
|
||||
export class JobRepository implements IJobRepository {
|
||||
private workers: Partial<Record<QueueName, Worker>> = {};
|
||||
private handlers: Partial<Record<JobName, JobMapItem>> = {};
|
||||
|
||||
constructor(
|
||||
private moduleReference: ModuleRef,
|
||||
private schedulerReqistry: SchedulerRegistry,
|
||||
private moduleRef: ModuleRef,
|
||||
private schedulerRegistry: SchedulerRegistry,
|
||||
@Inject(IConfigRepository) private configRepository: IConfigRepository,
|
||||
@Inject(IEventRepository) private eventRepository: IEventRepository,
|
||||
@Inject(ILoggerRepository) private logger: ILoggerRepository,
|
||||
) {
|
||||
this.logger.setContext(JobRepository.name);
|
||||
}
|
||||
|
||||
addHandler(queueName: QueueName, concurrency: number, handler: (item: JobItem) => Promise<void>) {
|
||||
setup({ services }: { services: ClassConstructor<unknown>[] }) {
|
||||
const reflector = this.moduleRef.get(Reflector, { strict: false });
|
||||
|
||||
// discovery
|
||||
for (const Service of services) {
|
||||
const instance = this.moduleRef.get<any>(Service);
|
||||
for (const methodName of getMethodNames(instance)) {
|
||||
const handler = instance[methodName];
|
||||
const config = reflector.get<JobConfig>(MetadataKey.JOB_CONFIG, handler);
|
||||
if (!config) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const { name: jobName, queue: queueName } = config;
|
||||
const label = `${Service.name}.${handler.name}`;
|
||||
|
||||
// one handler per job
|
||||
if (this.handlers[jobName]) {
|
||||
const jobKey = getKeyByValue(JobName, jobName);
|
||||
const errorMessage = `Failed to add job handler for ${label}`;
|
||||
this.logger.error(
|
||||
`${errorMessage}. JobName.${jobKey} is already handled by ${this.handlers[jobName].label}.`,
|
||||
);
|
||||
throw new ImmichStartupError(errorMessage);
|
||||
}
|
||||
|
||||
this.handlers[jobName] = {
|
||||
label,
|
||||
jobName,
|
||||
queueName,
|
||||
handler: handler.bind(instance),
|
||||
};
|
||||
|
||||
this.logger.verbose(`Added job handler: ${jobName} => ${label}`);
|
||||
}
|
||||
}
|
||||
|
||||
// no missing handlers
|
||||
for (const [jobKey, jobName] of Object.entries(JobName)) {
|
||||
const item = this.handlers[jobName];
|
||||
if (!item) {
|
||||
const errorMessage = `Failed to find job handler for Job.${jobKey} ("${jobName}")`;
|
||||
this.logger.error(
|
||||
`${errorMessage}. Make sure to add the @OnJob({ name: JobName.${jobKey}, queue: QueueName.XYZ }) decorator for the new job.`,
|
||||
);
|
||||
throw new ImmichStartupError(errorMessage);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
startWorkers() {
|
||||
const { bull } = this.configRepository.getEnv();
|
||||
const workerHandler: Processor = async (job: Job) => handler(job as JobItem);
|
||||
const workerOptions: WorkerOptions = { ...bull.config, concurrency };
|
||||
this.workers[queueName] = new Worker(queueName, workerHandler, workerOptions);
|
||||
for (const queueName of Object.values(QueueName)) {
|
||||
this.logger.debug(`Starting worker for queue: ${queueName}`);
|
||||
this.workers[queueName] = new Worker(
|
||||
queueName,
|
||||
(job) => this.eventRepository.emit('job.start', queueName, job as JobItem),
|
||||
{ ...bull.config, concurrency: 1 },
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async run({ name, data }: JobItem) {
|
||||
const item = this.handlers[name as JobName];
|
||||
if (!item) {
|
||||
this.logger.warn(`Skipping unknown job: "${name}"`);
|
||||
return JobStatus.SKIPPED;
|
||||
}
|
||||
|
||||
return item.handler(data);
|
||||
}
|
||||
|
||||
addCronJob(name: string, expression: string, onTick: () => void, start = true): void {
|
||||
@@ -141,11 +139,11 @@ export class JobRepository implements IJobRepository {
|
||||
true,
|
||||
);
|
||||
|
||||
this.schedulerReqistry.addCronJob(name, job);
|
||||
this.schedulerRegistry.addCronJob(name, job);
|
||||
}
|
||||
|
||||
updateCronJob(name: string, expression?: string, start?: boolean): void {
|
||||
const job = this.schedulerReqistry.getCronJob(name);
|
||||
const job = this.schedulerRegistry.getCronJob(name);
|
||||
if (expression) {
|
||||
job.setTime(new CronTime(expression));
|
||||
}
|
||||
@@ -204,6 +202,10 @@ export class JobRepository implements IJobRepository {
|
||||
) as unknown as Promise<JobCounts>;
|
||||
}
|
||||
|
||||
private getQueueName(name: JobName) {
|
||||
return (this.handlers[name] as JobMapItem).queueName;
|
||||
}
|
||||
|
||||
async queueAll(items: JobItem[]): Promise<void> {
|
||||
if (items.length === 0) {
|
||||
return;
|
||||
@@ -212,7 +214,7 @@ export class JobRepository implements IJobRepository {
|
||||
const promises = [];
|
||||
const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>;
|
||||
for (const item of items) {
|
||||
const queueName = JOBS_TO_QUEUE[item.name];
|
||||
const queueName = this.getQueueName(item.name);
|
||||
const job = {
|
||||
name: item.name,
|
||||
data: item.data || {},
|
||||
@@ -273,11 +275,11 @@ export class JobRepository implements IJobRepository {
|
||||
}
|
||||
|
||||
private getQueue(queue: QueueName): Queue {
|
||||
return this.moduleReference.get<Queue>(getQueueToken(queue), { strict: false });
|
||||
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
|
||||
}
|
||||
|
||||
public async removeJob(jobId: string, name: JobName): Promise<IEntityJob | undefined> {
|
||||
const existingJob = await this.getQueue(JOBS_TO_QUEUE[name]).getJob(jobId);
|
||||
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId);
|
||||
if (!existingJob) {
|
||||
return;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user