diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 80e590e63e..cc311e9335 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -16661,7 +16661,10 @@ "OcrQueueAll", "Ocr", "WorkflowRun", - "IntegrityOrphanedAndMissingFiles", + "IntegrityOrphanedFilesQueueAll", + "IntegrityOrphanedFiles", + "IntegrityMissingFilesQueueAll", + "IntegrityMissingFiles", "IntegrityChecksumFiles" ], "type": "string" diff --git a/server/src/enum.ts b/server/src/enum.ts index f1425cf0f8..58fe0fb3a7 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -639,7 +639,10 @@ export enum JobName { WorkflowRun = 'WorkflowRun', // Integrity - IntegrityOrphanedAndMissingFiles = 'IntegrityOrphanedAndMissingFiles', + IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll', + IntegrityOrphanedFiles = 'IntegrityOrphanedFiles', + IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll', + IntegrityMissingFiles = 'IntegrityMissingFiles', IntegrityChecksumFiles = 'IntegrityChecksumFiles', } diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 8d54e93c87..d7e5c1fdb8 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -254,6 +254,30 @@ export class AssetJobRepository { .executeTakeFirst(); } + @GenerateSql({ params: [DummyValue.STRING] }) + getAssetPathsByPaths(paths: string[]) { + return this.db + .selectFrom('asset') + .select(['originalPath', 'encodedVideoPath']) + .where((eb) => eb.or([eb('originalPath', 'in', paths), eb('encodedVideoPath', 'in', paths)])) + .execute(); + } + + @GenerateSql({ params: [DummyValue.STRING] }) + getAssetFilePathsByPaths(paths: string[]) { + return this.db.selectFrom('asset_file').select(['path']).where('path', 'in', paths).execute(); + } + + @GenerateSql({ params: [], stream: true }) + streamAssetPaths() { + return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream(); + } + + @GenerateSql({ params: [], stream: true }) + streamAssetFilePaths() { + return this.db.selectFrom('asset_file').select(['path']).stream(); + } + @GenerateSql({ params: [], stream: true }) streamForVideoConversion(force?: boolean) { return this.db diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts index 70c525dfc7..49900cd1f2 100644 --- a/server/src/services/integrity.service.ts +++ b/server/src/services/integrity.service.ts @@ -1,10 +1,12 @@ import { Injectable } from '@nestjs/common'; +import { stat } from 'node:fs/promises'; import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants'; import { StorageCore } from 'src/cores/storage.core'; import { OnEvent, OnJob } from 'src/decorators'; import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum'; import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; +import { IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob } from 'src/types'; @Injectable() export class IntegrityService extends BaseService { @@ -27,7 +29,12 @@ export class IntegrityService extends BaseService { // } setTimeout(() => { this.jobRepository.queue({ - name: JobName.IntegrityOrphanedAndMissingFiles, + name: JobName.IntegrityOrphanedFilesQueueAll, + data: {}, + }); + + this.jobRepository.queue({ + name: JobName.IntegrityMissingFilesQueueAll, data: {}, }); }, 1000); @@ -45,70 +52,150 @@ export class IntegrityService extends BaseService { // }); } - @OnJob({ name: JobName.IntegrityOrphanedAndMissingFiles, queue: QueueName.BackgroundTask }) - async handleOrphanedAndMissingFiles(): Promise { - // (1) Asset files - const pathsLocal = new Set(); - const pathsDb = new Set(); + @OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.BackgroundTask }) + async handleOrphanedFilesQueueAll(): Promise { + this.logger.log(`Scanning for orphaned files...`); - await Promise.all([ - // scan all local paths - (async () => { - const pathsOnDisk = this.storageRepository.walk({ - pathsToCrawl: [ - StorageFolder.EncodedVideo, - StorageFolder.Library, - StorageFolder.Upload, - StorageFolder.Thumbnails, - ].map((folder) => StorageCore.getBaseFolder(folder)), - includeHidden: false, - take: JOBS_LIBRARY_PAGINATION_SIZE, - }); + const assetPaths = this.storageRepository.walk({ + pathsToCrawl: [StorageFolder.EncodedVideo, StorageFolder.Library, StorageFolder.Upload].map((folder) => + StorageCore.getBaseFolder(folder), + ), + includeHidden: false, + take: JOBS_LIBRARY_PAGINATION_SIZE, + }); - for await (const pathBatch of pathsOnDisk) { - for (const path of pathBatch) { - if (!pathsDb.delete(path)) { - pathsLocal.add(path); - } + const assetFilePaths = this.storageRepository.walk({ + pathsToCrawl: [StorageCore.getBaseFolder(StorageFolder.Thumbnails)], + includeHidden: false, + take: JOBS_LIBRARY_PAGINATION_SIZE, + }); - console.info(pathsLocal.size, pathsDb.size); - } + async function* paths() { + for await (const batch of assetPaths) { + yield ['asset', batch] as const; + } + + for await (const batch of assetFilePaths) { + yield ['asset_file', batch] as const; + } + } + + let total = 0; + for await (const [batchType, batchPaths] of paths()) { + await this.jobRepository.queue({ + name: JobName.IntegrityOrphanedFiles, + data: { + type: batchType, + paths: batchPaths, + }, + }); + + const count = batchPaths.length; + total += count; + + this.logger.log(`Queued orphan check of ${count} file(s) (${total} so far)`); + } + + return JobStatus.Success; + } + + @OnJob({ name: JobName.IntegrityOrphanedFiles, queue: QueueName.BackgroundTask }) + async handleOrphanedFiles({ type, paths }: IIntegrityOrphanedFilesJob): Promise { + this.logger.log(`Processing batch of ${paths.length} files to check if they are orphaned.`); + + const orphanedFiles = new Set(paths); + if (type === 'asset') { + const assets = await this.assetJobRepository.getAssetPathsByPaths(paths); + for (const { originalPath, encodedVideoPath } of assets) { + orphanedFiles.delete(originalPath); + + if (encodedVideoPath) { + orphanedFiles.delete(encodedVideoPath); } - })(), - // scan "asset" - (async () => { - const pathsInDb = await this.assetRepository.getAllAssetPaths(); + } + } else { + const assets = await this.assetJobRepository.getAssetFilePathsByPaths(paths); + for (const { path } of assets) { + orphanedFiles.delete(path); + } + } - for await (const { originalPath, encodedVideoPath } of pathsInDb) { - if (!pathsLocal.delete(originalPath)) { - pathsDb.add(originalPath); - } + // do something with orphanedFiles + console.info(orphanedFiles); - if (encodedVideoPath && !pathsLocal.delete(encodedVideoPath)) { - pathsDb.add(encodedVideoPath); - } + this.logger.log(`Processed ${paths.length} and found ${orphanedFiles.size} orphaned file(s).`); + return JobStatus.Success; + } - console.info(pathsLocal.size, pathsDb.size); + @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask }) + async handleMissingFilesQueueAll(): Promise { + const assetPaths = this.assetJobRepository.streamAssetPaths(); + const assetFilePaths = this.assetJobRepository.streamAssetFilePaths(); + + async function* paths() { + for await (const { originalPath, encodedVideoPath } of assetPaths) { + yield originalPath; + + if (encodedVideoPath) { + yield encodedVideoPath; } - })(), - // scan "asset_files" - (async () => { - const pathsInDb = await this.assetRepository.getAllAssetFilePaths(); + } - for await (const { path } of pathsInDb) { - if (!pathsLocal.delete(path)) { - pathsDb.add(path); - } + for await (const { path } of assetFilePaths) { + yield path; + } + } - console.info(pathsLocal.size, pathsDb.size); + async function* chunk(generator: AsyncGenerator, n: number) { + let chunk: T[] = []; + for await (const item of generator) { + chunk.push(item); + + if (chunk.length === n) { + yield chunk; + chunk = []; } - })(), - ]); + } - console.info('Orphaned files:', pathsLocal); - console.info('Missing files:', pathsDb); + if (chunk.length) { + yield chunk; + } + } - // profile: skipped + let total = 0; + for await (const batchPaths of chunk(paths(), JOBS_LIBRARY_PAGINATION_SIZE)) { + await this.jobRepository.queue({ + name: JobName.IntegrityMissingFiles, + data: { + paths: batchPaths, + }, + }); + + total += batchPaths.length; + this.logger.log(`Queued missing check of ${batchPaths.length} file(s) (${total} so far)`); + } + + return JobStatus.Success; + } + + @OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask }) + async handleMissingFiles({ paths }: IIntegrityMissingFilesJob): Promise { + this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`); + + const result = await Promise.all( + paths.map((path) => + stat(path) + .then(() => void 0) + .catch(() => path), + ), + ); + + const missingFiles = result.filter((path) => path); + + // do something with missingFiles + console.info(missingFiles); + + this.logger.log(`Processed ${paths.length} and found ${missingFiles.length} missing file(s).`); return JobStatus.Success; } diff --git a/server/src/types.ts b/server/src/types.ts index 889051f580..ebc05946a3 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -282,6 +282,15 @@ export interface IWorkflowJob { event: WorkflowData[T]; } +export interface IIntegrityOrphanedFilesJob { + type: 'asset' | 'asset_file'; + paths: string[]; +} + +export interface IIntegrityMissingFilesJob { + paths: string[]; +} + export interface JobCounts { active: number; completed: number; @@ -394,7 +403,10 @@ export type JobItem = | { name: JobName.WorkflowRun; data: IWorkflowJob } // Integrity - | { name: JobName.IntegrityOrphanedAndMissingFiles; data: IBaseJob } + | { name: JobName.IntegrityOrphanedFilesQueueAll; data: IBaseJob } + | { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob } + | { name: JobName.IntegrityMissingFilesQueueAll; data: IBaseJob } + | { name: JobName.IntegrityMissingFiles; data: IIntegrityMissingFilesJob } | { name: JobName.IntegrityChecksumFiles; data: IBaseJob }; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];