diff --git a/open-api/immich-openapi-specs.json b/open-api/immich-openapi-specs.json index 68af1438cd..80e590e63e 100644 --- a/open-api/immich-openapi-specs.json +++ b/open-api/immich-openapi-specs.json @@ -16660,7 +16660,9 @@ "VersionCheck", "OcrQueueAll", "Ocr", - "WorkflowRun" + "WorkflowRun", + "IntegrityOrphanedAndMissingFiles", + "IntegrityChecksumFiles" ], "type": "string" }, diff --git a/server/src/enum.ts b/server/src/enum.ts index 87ff282f31..f1425cf0f8 100644 --- a/server/src/enum.ts +++ b/server/src/enum.ts @@ -637,6 +637,10 @@ export enum JobName { // Workflow WorkflowRun = 'WorkflowRun', + + // Integrity + IntegrityOrphanedAndMissingFiles = 'IntegrityOrphanedAndMissingFiles', + IntegrityChecksumFiles = 'IntegrityChecksumFiles', } export enum QueueCommand { diff --git a/server/src/repositories/asset.repository.ts b/server/src/repositories/asset.repository.ts index d3d9ada80f..226d021745 100644 --- a/server/src/repositories/asset.repository.ts +++ b/server/src/repositories/asset.repository.ts @@ -382,6 +382,14 @@ export class AssetRepository { return items.map((asset) => asset.deviceAssetId); } + async getAllAssetPaths() { + return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream(); + } + + async getAllAssetFilePaths() { + return this.db.selectFrom('asset_file').select(['path']).stream(); + } + @GenerateSql({ params: [DummyValue.UUID] }) async getLivePhotoCount(motionId: string): Promise { const [{ count }] = await this.db diff --git a/server/src/services/index.ts b/server/src/services/index.ts index eeb8424048..42bf467fd0 100644 --- a/server/src/services/index.ts +++ b/server/src/services/index.ts @@ -12,6 +12,7 @@ import { CliService } from 'src/services/cli.service'; import { DatabaseService } from 'src/services/database.service'; import { DownloadService } from 'src/services/download.service'; import { DuplicateService } from 'src/services/duplicate.service'; +import { IntegrityService } from 'src/services/integrity.service'; import { JobService } from 'src/services/job.service'; import { LibraryService } from 'src/services/library.service'; import { MaintenanceService } from 'src/services/maintenance.service'; @@ -62,6 +63,7 @@ export const services = [ DatabaseService, DownloadService, DuplicateService, + IntegrityService, JobService, LibraryService, MaintenanceService, diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts new file mode 100644 index 0000000000..70c525dfc7 --- /dev/null +++ b/server/src/services/integrity.service.ts @@ -0,0 +1,120 @@ +import { Injectable } from '@nestjs/common'; +import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants'; +import { StorageCore } from 'src/cores/storage.core'; +import { OnEvent, OnJob } from 'src/decorators'; +import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum'; +import { ArgOf } from 'src/repositories/event.repository'; +import { BaseService } from 'src/services/base.service'; + +@Injectable() +export class IntegrityService extends BaseService { + // private backupLock = false; + + @OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] }) + async onConfigInit({ + newConfig: { + backup: { database }, + }, + }: ArgOf<'ConfigInit'>) { + // this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase); + // if (this.backupLock) { + // this.cronRepository.create({ + // name: 'backupDatabase', + // expression: database.cronExpression, + // onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger), + // start: database.enabled, + // }); + // } + setTimeout(() => { + this.jobRepository.queue({ + name: JobName.IntegrityOrphanedAndMissingFiles, + data: {}, + }); + }, 1000); + } + + @OnEvent({ name: 'ConfigUpdate', server: true }) + async onConfigUpdate({ newConfig: { backup } }: ArgOf<'ConfigUpdate'>) { + // if (!this.backupLock) { + // return; + // } + // this.cronRepository.update({ + // name: 'backupDatabase', + // expression: backup.database.cronExpression, + // start: backup.database.enabled, + // }); + } + + @OnJob({ name: JobName.IntegrityOrphanedAndMissingFiles, queue: QueueName.BackgroundTask }) + async handleOrphanedAndMissingFiles(): Promise { + // (1) Asset files + const pathsLocal = new Set(); + const pathsDb = new Set(); + + await Promise.all([ + // scan all local paths + (async () => { + const pathsOnDisk = this.storageRepository.walk({ + pathsToCrawl: [ + StorageFolder.EncodedVideo, + StorageFolder.Library, + StorageFolder.Upload, + StorageFolder.Thumbnails, + ].map((folder) => StorageCore.getBaseFolder(folder)), + includeHidden: false, + take: JOBS_LIBRARY_PAGINATION_SIZE, + }); + + for await (const pathBatch of pathsOnDisk) { + for (const path of pathBatch) { + if (!pathsDb.delete(path)) { + pathsLocal.add(path); + } + + console.info(pathsLocal.size, pathsDb.size); + } + } + })(), + // scan "asset" + (async () => { + const pathsInDb = await this.assetRepository.getAllAssetPaths(); + + for await (const { originalPath, encodedVideoPath } of pathsInDb) { + if (!pathsLocal.delete(originalPath)) { + pathsDb.add(originalPath); + } + + if (encodedVideoPath && !pathsLocal.delete(encodedVideoPath)) { + pathsDb.add(encodedVideoPath); + } + + console.info(pathsLocal.size, pathsDb.size); + } + })(), + // scan "asset_files" + (async () => { + const pathsInDb = await this.assetRepository.getAllAssetFilePaths(); + + for await (const { path } of pathsInDb) { + if (!pathsLocal.delete(path)) { + pathsDb.add(path); + } + + console.info(pathsLocal.size, pathsDb.size); + } + })(), + ]); + + console.info('Orphaned files:', pathsLocal); + console.info('Missing files:', pathsDb); + + // profile: skipped + return JobStatus.Success; + } + + @OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.BackgroundTask }) + async handleChecksumFiles(): Promise { + // todo + return JobStatus.Success; + } +} diff --git a/server/src/types.ts b/server/src/types.ts index 848d19177d..889051f580 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -391,7 +391,11 @@ export type JobItem = | { name: JobName.Ocr; data: IEntityJob } // Workflow - | { name: JobName.WorkflowRun; data: IWorkflowJob }; + | { name: JobName.WorkflowRun; data: IWorkflowJob } + + // Integrity + | { name: JobName.IntegrityOrphanedAndMissingFiles; data: IBaseJob } + | { name: JobName.IntegrityChecksumFiles; data: IBaseJob }; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];