From 2962c54ee26bd4b5b97aebc3844ca566e71ff838 Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 18 Dec 2025 17:08:45 +0000 Subject: [PATCH] refactor: queue separate jobs for deleting integrity reports --- open-api/immich-openapi-specs.json | 3 +- server/src/enum.ts | 3 +- server/src/services/integrity.service.ts | 78 ++++++++++++++---------- server/src/services/job.service.ts | 6 +- server/src/types.ts | 14 ++++- 5 files changed, 65 insertions(+), 39 deletions(-) diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index a479c29370..168ef1fdef 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -17033,7 +17033,8 @@ "IntegrityMissingFilesRefresh", "IntegrityChecksumFiles", "IntegrityChecksumFilesRefresh", - "IntegrityReportDelete" + "IntegrityDeleteReportType", + "IntegrityDeleteReports" ], "type": "string" }, diff --git a/server/src/enum.ts b/server/src/enum.ts index d70427fe52..9862bce520 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -665,7 +665,8 @@ export enum JobName { IntegrityMissingFilesRefresh = 'IntegrityMissingFilesRefresh', IntegrityChecksumFiles = 'IntegrityChecksumFiles', IntegrityChecksumFilesRefresh = 'IntegrityChecksumFilesRefresh', - IntegrityReportDelete = 'IntegrityReportDelete', + IntegrityDeleteReportType = 'IntegrityDeleteReportType', + IntegrityDeleteReports = 'IntegrityDeleteReports', } export enum QueueCommand { diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts index cc1684da52..da20d9a205 100644 --- a/server/src/services/integrity.service.ts +++ b/server/src/services/integrity.service.ts @@ -27,7 +27,8 @@ import { import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; import { - IIntegrityDeleteReportJob, + IIntegrityDeleteReportsJob, + IIntegrityDeleteReportTypeJob, IIntegrityJob, IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob, @@ -622,8 +623,8 @@ export class IntegrityService extends BaseService { return JobStatus.Success; } - @OnJob({ name: JobName.IntegrityReportDelete, queue: QueueName.IntegrityCheck }) - async handleDeleteIntegrityReport({ type }: IIntegrityDeleteReportJob): Promise { + @OnJob({ name: JobName.IntegrityDeleteReportType, queue: QueueName.IntegrityCheck }) + async handleDeleteAllIntegrityReports({ type }: IIntegrityDeleteReportTypeJob): Promise { this.logger.log(`Deleting all entries for ${type ?? 'all types of'} integrity report`); let properties; @@ -648,39 +649,52 @@ export class IntegrityService extends BaseService { for (const property of properties) { const reports = this.integrityRepository.streamIntegrityReportsByProperty(property, type); - for await (const report of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { - // todo: queue sub-job here instead? + for await (const batch of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { + await this.jobRepository.queue({ + name: JobName.IntegrityDeleteReports, + data: { + reports: batch, + }, + }); - switch (property) { - case 'assetId': { - const ids = report.map(({ assetId }) => assetId!); - await this.assetRepository.updateAll(ids, { - deletedAt: new Date(), - status: AssetStatus.Trashed, - }); - - await this.eventRepository.emit('AssetTrashAll', { - assetIds: ids, - userId: '', // ??? - }); - - await this.integrityRepository.deleteByIds(report.map(({ id }) => id)); - break; - } - case 'fileAssetId': { - await this.assetRepository.deleteFiles(report.map(({ fileAssetId }) => ({ id: fileAssetId! }))); - break; - } - default: { - await Promise.all(report.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0))); - await this.integrityRepository.deleteByIds(report.map(({ id }) => id)); - break; - } - } + this.logger.log(`Queued ${batch.length} reports to delete.`); } } - this.logger.log('Finished deleting integrity report.'); + return JobStatus.Success; + } + + @OnJob({ name: JobName.IntegrityDeleteReports, queue: QueueName.IntegrityCheck }) + async handleDeleteIntegrityReports({ reports }: IIntegrityDeleteReportsJob): Promise { + const byAsset = reports.filter((report) => report.assetId); + const byFileAsset = reports.filter((report) => report.fileAssetId); + const byPath = reports.filter((report) => !report.assetId && !report.fileAssetId); + + if (byAsset.length > 0) { + const ids = byAsset.map(({ assetId }) => assetId!); + await this.assetRepository.updateAll(ids, { + deletedAt: new Date(), + status: AssetStatus.Trashed, + }); + + await this.eventRepository.emit('AssetTrashAll', { + assetIds: ids, + userId: '', // we don't notify any users currently + }); + + await this.integrityRepository.deleteByIds(byAsset.map(({ id }) => id)); + } + + if (byFileAsset.length > 0) { + await this.assetRepository.deleteFiles(byFileAsset.map(({ fileAssetId }) => ({ id: fileAssetId! }))); + } + + if (byPath.length > 0) { + await Promise.all(byPath.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0))); + await this.integrityRepository.deleteByIds(byPath.map(({ id }) => id)); + } + + this.logger.log(`Deleted ${reports.length} reports.`); return JobStatus.Success; } } diff --git a/server/src/services/job.service.ts b/server/src/services/job.service.ts index d0b41572b6..1cd9ea2e1f 100644 --- a/server/src/services/job.service.ts +++ b/server/src/services/job.service.ts @@ -59,15 +59,15 @@ const asJobItem = (dto: JobCreateDto): JobItem => { } case ManualJobName.IntegrityMissingFilesDeleteAll: { - return { name: JobName.IntegrityReportDelete, data: { type: IntegrityReportType.MissingFile } }; + return { name: JobName.IntegrityDeleteReportType, data: { type: IntegrityReportType.MissingFile } }; } case ManualJobName.IntegrityOrphanFilesDeleteAll: { - return { name: JobName.IntegrityReportDelete, data: { type: IntegrityReportType.OrphanFile } }; + return { name: JobName.IntegrityDeleteReportType, data: { type: IntegrityReportType.OrphanFile } }; } case ManualJobName.IntegrityChecksumFilesDeleteAll: { - return { name: JobName.IntegrityReportDelete, data: { type: IntegrityReportType.ChecksumFail } }; + return { name: JobName.IntegrityDeleteReportType, data: { type: IntegrityReportType.ChecksumFail } }; } default: { diff --git a/server/src/types.ts b/server/src/types.ts index 6ff66f4a3a..4c61f33bab 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -286,10 +286,19 @@ export interface IIntegrityJob { refreshOnly?: boolean; } -export interface IIntegrityDeleteReportJob { +export interface IIntegrityDeleteReportTypeJob { type?: IntegrityReportType; } +export interface IIntegrityDeleteReportsJob { + reports: { + id: string; + assetId: string | null; + fileAssetId: string | null; + path: string; + }[]; +} + export interface IIntegrityOrphanedFilesJob { type: 'asset' | 'asset_file'; paths: string[]; @@ -432,7 +441,8 @@ export type JobItem = | { name: JobName.IntegrityMissingFilesRefresh; data: IIntegrityPathWithReportJob } | { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob } | { name: JobName.IntegrityChecksumFilesRefresh; data?: IIntegrityPathWithChecksumJob } - | { name: JobName.IntegrityReportDelete; data: IIntegrityDeleteReportJob }; + | { name: JobName.IntegrityDeleteReportType; data: IIntegrityDeleteReportTypeJob } + | { name: JobName.IntegrityDeleteReports; data: IIntegrityDeleteReportsJob }; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];