From ef7d8e94fa00eb26e9f5cc4992a616b024274981 Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 27 Nov 2025 15:40:14 +0000 Subject: [PATCH] feat: check orphaned file reports are not out of date --- open-api/immich-openapi-specs.json | 1 + server/src/enum.ts | 1 + .../src/repositories/asset-job.repository.ts | 5 ++ server/src/services/integrity.service.ts | 79 ++++++++++++++----- server/src/types.ts | 7 +- 5 files changed, 70 insertions(+), 23 deletions(-) diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index cc311e9335..9fe4664352 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -16663,6 +16663,7 @@ "WorkflowRun", "IntegrityOrphanedFilesQueueAll", "IntegrityOrphanedFiles", + "IntegrityOrphanedCheckReports", "IntegrityMissingFilesQueueAll", "IntegrityMissingFiles", "IntegrityChecksumFiles" diff --git a/server/src/enum.ts b/server/src/enum.ts index 145067b5e3..40f7f45495 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -648,6 +648,7 @@ export enum JobName { // Integrity IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll', IntegrityOrphanedFiles = 'IntegrityOrphanedFiles', + IntegrityOrphanedCheckReports = 'IntegrityOrphanedCheckReports', IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll', IntegrityMissingFiles = 'IntegrityMissingFiles', IntegrityChecksumFiles = 'IntegrityChecksumFiles', diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 79727520b1..8d5292ca0f 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -319,6 +319,11 @@ export class AssetJobRepository { .stream(); } + @GenerateSql({ params: [DummyValue.STRING], stream: true }) + streamIntegrityReports(type: IntegrityReportType) { + return this.db.selectFrom('integrity_report').select(['id as reportId', 'path']).where('type', '=', type).stream(); + } + @GenerateSql({ params: [], stream: true }) streamForVideoConversion(force?: boolean) { return this.db diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts index 5a093f0828..924957f52a 100644 --- a/server/src/services/integrity.service.ts +++ b/server/src/services/integrity.service.ts @@ -18,7 +18,23 @@ import { } from 'src/enum'; import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; -import { IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob } from 'src/types'; +import { IIntegrityOrphanedFilesJob, IIntegrityPathWithReportJob } from 'src/types'; + +async function* chunk(generator: AsyncIterableIterator, n: number) { + let chunk: T[] = []; + for await (const item of generator) { + chunk.push(item); + + if (chunk.length === n) { + yield chunk; + chunk = []; + } + } + + if (chunk.length) { + yield chunk; + } +} @Injectable() export class IntegrityService extends BaseService { @@ -72,6 +88,23 @@ export class IntegrityService extends BaseService { @OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.BackgroundTask }) async handleOrphanedFilesQueueAll(): Promise { + this.logger.log(`Checking for out of date orphaned file reports...`); + + const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.OrphanFile); + + let total = 0; + for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { + await this.jobRepository.queue({ + name: JobName.IntegrityOrphanedCheckReports, + data: { + items: batchReports, + }, + }); + + total += batchReports.length; + this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`); + } + this.logger.log(`Scanning for orphaned files...`); const assetPaths = this.storageRepository.walk({ @@ -98,7 +131,7 @@ export class IntegrityService extends BaseService { } } - let total = 0; + total = 0; for await (const [batchType, batchPaths] of paths()) { await this.jobRepository.queue({ name: JobName.IntegrityOrphanedFiles, @@ -149,34 +182,40 @@ export class IntegrityService extends BaseService { return JobStatus.Success; } + @OnJob({ name: JobName.IntegrityOrphanedCheckReports, queue: QueueName.BackgroundTask }) + async handleOrphanedCheckReports({ items: paths }: IIntegrityPathWithReportJob): Promise { + this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`); + + const results = await Promise.all( + paths.map(({ reportId, path }) => + stat(path) + .then(() => reportId) + .catch(() => void 0), + ), + ); + + const reportIds = results.filter((reportId) => reportId) as string[]; + + if (reportIds.length) { + await this.integrityReportRepository.deleteByIds(reportIds); + } + + this.logger.log(`Processed ${paths.length} and found ${reportIds.length} orphaned file(s).`); + return JobStatus.Success; + } + @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask }) async handleMissingFilesQueueAll(): Promise { this.logger.log(`Scanning for missing files...`); const assetPaths = this.assetJobRepository.streamAssetPaths(); - async function* chunk(generator: AsyncIterableIterator, n: number) { - let chunk: T[] = []; - for await (const item of generator) { - chunk.push(item); - - if (chunk.length === n) { - yield chunk; - chunk = []; - } - } - - if (chunk.length) { - yield chunk; - } - } - let total = 0; for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) { await this.jobRepository.queue({ name: JobName.IntegrityMissingFiles, data: { - paths: batchPaths, + items: batchPaths, }, }); @@ -188,7 +227,7 @@ export class IntegrityService extends BaseService { } @OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask }) - async handleMissingFiles({ paths }: IIntegrityMissingFilesJob): Promise { + async handleMissingFiles({ items: paths }: IIntegrityPathWithReportJob): Promise { this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`); const results = await Promise.all( diff --git a/server/src/types.ts b/server/src/types.ts index 41c8feb4e9..b02626ce26 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -287,8 +287,8 @@ export interface IIntegrityOrphanedFilesJob { paths: string[]; } -export interface IIntegrityMissingFilesJob { - paths: { path: string; reportId: string | null }[]; +export interface IIntegrityPathWithReportJob { + items: { path: string; reportId: string | null }[]; } export interface JobCounts { @@ -405,8 +405,9 @@ export type JobItem = // Integrity | { name: JobName.IntegrityOrphanedFilesQueueAll; data: IBaseJob } | { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob } + | { name: JobName.IntegrityOrphanedCheckReports; data: IIntegrityPathWithReportJob } | { name: JobName.IntegrityMissingFilesQueueAll; data: IBaseJob } - | { name: JobName.IntegrityMissingFiles; data: IIntegrityMissingFilesJob } + | { name: JobName.IntegrityMissingFiles; data: IIntegrityPathWithReportJob } | { name: JobName.IntegrityChecksumFiles; data: IBaseJob }; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];