// immich/server/src/domain/job/job.service.ts

import { AssetType } from '@app/infra/entities';
import { BadRequestException, Inject, Injectable, Logger } from '@nestjs/common';
import { IAssetRepository, mapAsset } from '../asset';
import { CommunicationEvent, ICommunicationRepository } from '../communication';
import { assertMachineLearningEnabled } from '../domain.constant';
import { ISystemConfigRepository } from '../system-config';
import { SystemConfigCore } from '../system-config/system-config.core';
import { JobCommand, JobName, QueueName } from './job.constants';
import { AllJobStatusResponseDto, JobCommandDto, JobStatusDto } from './job.dto';
import { IJobRepository, JobHandler, JobItem } from './job.repository';
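
/**
 * Coordinates background job queues: handles queue commands, reports queue status,
 * registers per-queue handlers with configured concurrency, and chains follow-up
 * jobs as work completes.
 */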
@Injectable()
export class JobService {
  private logger = new Logger(JobService.name);
  private configCore: SystemConfigCore;

  constructor(
    @Inject(IAssetRepository) private assetRepository: IAssetRepository,
    @Inject(ICommunicationRepository) private communicationRepository: ICommunicationRepository,
    @Inject(IJobRepository) private jobRepository: IJobRepository,
    @Inject(ISystemConfigRepository) configRepository: ISystemConfigRepository,
  ) {
    this.configCore = new SystemConfigCore(configRepository);
  }
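
  /** Dispatch a queue command (start, pause, resume, empty) and return the resulting queue status. */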
  async handleCommand(queueName: QueueName, dto: JobCommandDto): Promise<JobStatusDto> {
    this.logger.debug(`Handling command: queue=${queueName},force=${dto.force}`);

    switch (dto.command) {
      case JobCommand.START:
        await this.start(queueName, dto);
        break;

      case JobCommand.PAUSE:
        await this.jobRepository.pause(queueName);
        break;

      case JobCommand.RESUME:
        await this.jobRepository.resume(queueName);
        break;

      case JobCommand.EMPTY:
        await this.jobRepository.empty(queueName);
        break;
    }

    return this.getJobStatus(queueName);
  }
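
  /** Fetch the job counts and queue status for a single queue. */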
  async getJobStatus(queueName: QueueName): Promise<JobStatusDto> {
    const [jobCounts, queueStatus] = await Promise.all([
      this.jobRepository.getJobCounts(queueName),
      this.jobRepository.getQueueStatus(queueName),
    ]);
    return { jobCounts, queueStatus };
  }
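
  /** Collect the status of every queue into a single response. */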
  async getAllJobsStatus(): Promise<AllJobStatusResponseDto> {
    const response = new AllJobStatusResponseDto();
    for (const queueName of Object.values(QueueName)) {
      response[queueName] = await this.getJobStatus(queueName);
    }
    return response;
  }
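
  /** Queue the top-level job for the given queue, rejecting the request if the queue is already active. */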
  private async start(name: QueueName, { force }: JobCommandDto): Promise<void> {
    const { isActive } = await this.jobRepository.getQueueStatus(name);
    if (isActive) {
      throw new BadRequestException(`Job is already running`);
    }

    switch (name) {
      case QueueName.VIDEO_CONVERSION:
        return this.jobRepository.queue({ name: JobName.QUEUE_VIDEO_CONVERSION, data: { force } });

      case QueueName.STORAGE_TEMPLATE_MIGRATION:
        return this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION });

      case QueueName.OBJECT_TAGGING:
        assertMachineLearningEnabled();
        return this.jobRepository.queue({ name: JobName.QUEUE_OBJECT_TAGGING, data: { force } });

      case QueueName.CLIP_ENCODING:
        assertMachineLearningEnabled();
        return this.jobRepository.queue({ name: JobName.QUEUE_ENCODE_CLIP, data: { force } });

      case QueueName.METADATA_EXTRACTION:
        return this.jobRepository.queue({ name: JobName.QUEUE_METADATA_EXTRACTION, data: { force } });
      case QueueName.SIDECAR:
        return this.jobRepository.queue({ name: JobName.QUEUE_SIDECAR, data: { force } });

      case QueueName.THUMBNAIL_GENERATION:
        return this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force } });
      case QueueName.RECOGNIZE_FACES:
        return this.jobRepository.queue({ name: JobName.QUEUE_RECOGNIZE_FACES, data: { force } });

      default:
        throw new BadRequestException(`Invalid job name: ${name}`);
    }
  }
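
  /**
   * Register a handler for every queue with its configured concurrency, queue follow-up work when a
   * handler succeeds, and keep queue concurrency in sync with system configuration changes.
   */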
  async registerHandlers(jobHandlers: Record<JobName, JobHandler>) {
    const config = await this.configCore.getConfig();
    for (const queueName of Object.values(QueueName)) {
      const concurrency = config.job[queueName].concurrency;
      this.logger.debug(`Registering ${queueName} with a concurrency of ${concurrency}`);
      this.jobRepository.addHandler(queueName, concurrency, async (item: JobItem): Promise<void> => {
        const { name, data } = item;
        try {
          const handler = jobHandlers[name];
          const success = await handler(data);
          if (success) {
            await this.onDone(item);
          }
        } catch (error: Error | any) {
          this.logger.error(`Unable to run job handler: ${error}`, error?.stack, data);
        }
      });
    }

    this.configCore.config$.subscribe((config) => {
      this.logger.log(`Updating queue concurrency settings`);
      for (const queueName of Object.values(QueueName)) {
        const concurrency = config.job[queueName].concurrency;
        this.logger.debug(`Setting ${queueName} concurrency to ${concurrency}`);
        this.jobRepository.setConcurrency(queueName, concurrency);
      }
    });
  }
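
  /** Queue the recurring nightly maintenance jobs. */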
  async handleNightlyJobs() {
    await this.jobRepository.queue({ name: JobName.USER_DELETE_CHECK });
    await this.jobRepository.queue({ name: JobName.PERSON_CLEANUP });
    await this.jobRepository.queue({ name: JobName.QUEUE_GENERATE_THUMBNAILS, data: { force: false } });
    await this.jobRepository.queue({ name: JobName.CLEAN_OLD_AUDIT_LOGS });
  }

  /**
   * Queue follow-up jobs
   */
  private async onDone(item: JobItem) {
    switch (item.name) {
      case JobName.SIDECAR_SYNC:
      case JobName.SIDECAR_DISCOVERY:
        await this.jobRepository.queue({ name: JobName.METADATA_EXTRACTION, data: item.data });
        break;

      case JobName.METADATA_EXTRACTION:
        await this.jobRepository.queue({ name: JobName.LINK_LIVE_PHOTOS, data: item.data });
        break;

      case JobName.LINK_LIVE_PHOTOS:
        await this.jobRepository.queue({ name: JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE, data: item.data });
        break;

      case JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE:
        if (item.data.source === 'upload') {
          await this.jobRepository.queue({ name: JobName.GENERATE_JPEG_THUMBNAIL, data: item.data });
        }
        break;

      case JobName.GENERATE_JPEG_THUMBNAIL: {
        await this.jobRepository.queue({ name: JobName.GENERATE_WEBP_THUMBNAIL, data: item.data });
        await this.jobRepository.queue({ name: JobName.GENERATE_THUMBHASH_THUMBNAIL, data: item.data });
        await this.jobRepository.queue({ name: JobName.CLASSIFY_IMAGE, data: item.data });
        await this.jobRepository.queue({ name: JobName.ENCODE_CLIP, data: item.data });
        await this.jobRepository.queue({ name: JobName.RECOGNIZE_FACES, data: item.data });

        if (item.data.source !== 'upload') {
          break;
        }

        const [asset] = await this.assetRepository.getByIds([item.data.id]);
        if (asset) {
          if (asset.type === AssetType.VIDEO) {
            await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: item.data });
          } else if (asset.livePhotoVideoId) {
            await this.jobRepository.queue({ name: JobName.VIDEO_CONVERSION, data: { id: asset.livePhotoVideoId } });
          }
          this.communicationRepository.send(CommunicationEvent.UPLOAD_SUCCESS, asset.ownerId, mapAsset(asset));
        }
        break;
      }
    }

    // In addition to the above jobs, all of these should queue `SEARCH_INDEX_ASSET`
    switch (item.name) {
      case JobName.CLASSIFY_IMAGE:
      case JobName.ENCODE_CLIP:
      case JobName.RECOGNIZE_FACES:
      case JobName.LINK_LIVE_PHOTOS:
        await this.jobRepository.queue({ name: JobName.SEARCH_INDEX_ASSET, data: { ids: [item.data.id] } });
        break;
    }
  }
}
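
/*
 * A minimal usage sketch, not part of the original file: a hypothetical NestJS controller
 * exposing JobService over HTTP. The controller name, route, and decorator wiring are
 * assumptions for illustration; only JobService, QueueName, JobCommandDto, and JobStatusDto
 * come from the code above.
 *
 *   @Controller('jobs')
 *   export class JobController {
 *     constructor(private readonly jobService: JobService) {}
 *
 *     @Put(':queueName')
 *     sendCommand(@Param('queueName') queueName: QueueName, @Body() dto: JobCommandDto): Promise<JobStatusDto> {
 *       return this.jobService.handleCommand(queueName, dto);
 *     }
 *   }
 */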