refactor: batched integrity checks

This commit is contained in:
izzy
2025-11-26 17:36:28 +00:00
parent f77f43a83d
commit 4a7120cdeb
5 changed files with 184 additions and 55 deletions

View File

@@ -16661,7 +16661,10 @@
"OcrQueueAll", "OcrQueueAll",
"Ocr", "Ocr",
"WorkflowRun", "WorkflowRun",
"IntegrityOrphanedAndMissingFiles", "IntegrityOrphanedFilesQueueAll",
"IntegrityOrphanedFiles",
"IntegrityMissingFilesQueueAll",
"IntegrityMissingFiles",
"IntegrityChecksumFiles" "IntegrityChecksumFiles"
], ],
"type": "string" "type": "string"

View File

@@ -639,7 +639,10 @@ export enum JobName {
WorkflowRun = 'WorkflowRun', WorkflowRun = 'WorkflowRun',
// Integrity // Integrity
IntegrityOrphanedAndMissingFiles = 'IntegrityOrphanedAndMissingFiles', IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll',
IntegrityOrphanedFiles = 'IntegrityOrphanedFiles',
IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll',
IntegrityMissingFiles = 'IntegrityMissingFiles',
IntegrityChecksumFiles = 'IntegrityChecksumFiles', IntegrityChecksumFiles = 'IntegrityChecksumFiles',
} }

View File

@@ -254,6 +254,30 @@ export class AssetJobRepository {
.executeTakeFirst(); .executeTakeFirst();
} }
/**
 * Looks up `asset` rows whose `originalPath` or `encodedVideoPath` is one of the given paths.
 *
 * NOTE(review): the decorator passes a single `DummyValue.STRING` although `paths` is an array;
 * confirm whether `[[DummyValue.STRING]]` was intended.
 *
 * @param paths File paths to match against both path columns.
 * @returns The `originalPath` and `encodedVideoPath` of every matching row, or an empty array
 *   when `paths` is empty.
 */
@GenerateSql({ params: [DummyValue.STRING] })
async getAssetPathsByPaths(paths: string[]) {
  // Kysely renders an empty list as `in ()`, which is a syntax error on PostgreSQL.
  // Nothing can match an empty list anyway, so skip the round trip.
  if (paths.length === 0) {
    return [];
  }

  return this.db
    .selectFrom('asset')
    .select(['originalPath', 'encodedVideoPath'])
    .where((eb) => eb.or([eb('originalPath', 'in', paths), eb('encodedVideoPath', 'in', paths)]))
    .execute();
}
/**
 * Looks up `asset_file` rows whose `path` is one of the given paths.
 *
 * NOTE(review): the decorator passes a single `DummyValue.STRING` although `paths` is an array;
 * confirm whether `[[DummyValue.STRING]]` was intended.
 *
 * @param paths File paths to match against the `path` column.
 * @returns The `path` of every matching row, or an empty array when `paths` is empty.
 */
@GenerateSql({ params: [DummyValue.STRING] })
async getAssetFilePathsByPaths(paths: string[]) {
  // Kysely renders an empty list as `in ()`, which is a syntax error on PostgreSQL.
  // Nothing can match an empty list anyway, so skip the round trip.
  if (paths.length === 0) {
    return [];
  }

  return this.db.selectFrom('asset_file').select(['path']).where('path', 'in', paths).execute();
}
/**
 * Streams the `originalPath` and `encodedVideoPath` columns of every row in `asset`.
 *
 * @returns An async iterable of `{ originalPath, encodedVideoPath }` rows.
 */
@GenerateSql({ params: [], stream: true })
streamAssetPaths() {
  const allAssetPaths = this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']);
  return allAssetPaths.stream();
}
/**
 * Streams the `path` column of every row in `asset_file`.
 *
 * @returns An async iterable of `{ path }` rows.
 */
@GenerateSql({ params: [], stream: true })
streamAssetFilePaths() {
  const allAssetFilePaths = this.db.selectFrom('asset_file').select(['path']);
  return allAssetFilePaths.stream();
}
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [], stream: true })
streamForVideoConversion(force?: boolean) { streamForVideoConversion(force?: boolean) {
return this.db return this.db

View File

@@ -1,10 +1,12 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { stat } from 'node:fs/promises';
import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants'; import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core'; import { StorageCore } from 'src/cores/storage.core';
import { OnEvent, OnJob } from 'src/decorators'; import { OnEvent, OnJob } from 'src/decorators';
import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum'; import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository'; import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob } from 'src/types';
@Injectable() @Injectable()
export class IntegrityService extends BaseService { export class IntegrityService extends BaseService {
@@ -27,7 +29,12 @@ export class IntegrityService extends BaseService {
// } // }
setTimeout(() => { setTimeout(() => {
this.jobRepository.queue({ this.jobRepository.queue({
name: JobName.IntegrityOrphanedAndMissingFiles, name: JobName.IntegrityOrphanedFilesQueueAll,
data: {},
});
this.jobRepository.queue({
name: JobName.IntegrityMissingFilesQueueAll,
data: {}, data: {},
}); });
}, 1000); }, 1000);
@@ -45,70 +52,150 @@ export class IntegrityService extends BaseService {
// }); // });
} }
@OnJob({ name: JobName.IntegrityOrphanedAndMissingFiles, queue: QueueName.BackgroundTask }) @OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.BackgroundTask })
async handleOrphanedAndMissingFiles(): Promise<JobStatus> { async handleOrphanedFilesQueueAll(): Promise<JobStatus> {
// (1) Asset files this.logger.log(`Scanning for orphaned files...`);
const pathsLocal = new Set<string>();
const pathsDb = new Set<string>();
await Promise.all([ const assetPaths = this.storageRepository.walk({
// scan all local paths pathsToCrawl: [StorageFolder.EncodedVideo, StorageFolder.Library, StorageFolder.Upload].map((folder) =>
(async () => { StorageCore.getBaseFolder(folder),
const pathsOnDisk = this.storageRepository.walk({ ),
pathsToCrawl: [
StorageFolder.EncodedVideo,
StorageFolder.Library,
StorageFolder.Upload,
StorageFolder.Thumbnails,
].map((folder) => StorageCore.getBaseFolder(folder)),
includeHidden: false, includeHidden: false,
take: JOBS_LIBRARY_PAGINATION_SIZE, take: JOBS_LIBRARY_PAGINATION_SIZE,
}); });
for await (const pathBatch of pathsOnDisk) { const assetFilePaths = this.storageRepository.walk({
for (const path of pathBatch) { pathsToCrawl: [StorageCore.getBaseFolder(StorageFolder.Thumbnails)],
if (!pathsDb.delete(path)) { includeHidden: false,
pathsLocal.add(path); take: JOBS_LIBRARY_PAGINATION_SIZE,
});
async function* paths() {
for await (const batch of assetPaths) {
yield ['asset', batch] as const;
} }
console.info(pathsLocal.size, pathsDb.size); for await (const batch of assetFilePaths) {
yield ['asset_file', batch] as const;
} }
} }
})(),
// scan "asset"
(async () => {
const pathsInDb = await this.assetRepository.getAllAssetPaths();
for await (const { originalPath, encodedVideoPath } of pathsInDb) { let total = 0;
if (!pathsLocal.delete(originalPath)) { for await (const [batchType, batchPaths] of paths()) {
pathsDb.add(originalPath); await this.jobRepository.queue({
name: JobName.IntegrityOrphanedFiles,
data: {
type: batchType,
paths: batchPaths,
},
});
const count = batchPaths.length;
total += count;
this.logger.log(`Queued orphan check of ${count} file(s) (${total} so far)`);
} }
if (encodedVideoPath && !pathsLocal.delete(encodedVideoPath)) { return JobStatus.Success;
pathsDb.add(encodedVideoPath);
} }
console.info(pathsLocal.size, pathsDb.size); @OnJob({ name: JobName.IntegrityOrphanedFiles, queue: QueueName.BackgroundTask })
} async handleOrphanedFiles({ type, paths }: IIntegrityOrphanedFilesJob): Promise<JobStatus> {
})(), this.logger.log(`Processing batch of ${paths.length} files to check if they are orphaned.`);
// scan "asset_files"
(async () => {
const pathsInDb = await this.assetRepository.getAllAssetFilePaths();
for await (const { path } of pathsInDb) { const orphanedFiles = new Set<string>(paths);
if (!pathsLocal.delete(path)) { if (type === 'asset') {
pathsDb.add(path); const assets = await this.assetJobRepository.getAssetPathsByPaths(paths);
for (const { originalPath, encodedVideoPath } of assets) {
orphanedFiles.delete(originalPath);
if (encodedVideoPath) {
orphanedFiles.delete(encodedVideoPath);
}
}
} else {
const assets = await this.assetJobRepository.getAssetFilePathsByPaths(paths);
for (const { path } of assets) {
orphanedFiles.delete(path);
}
} }
console.info(pathsLocal.size, pathsDb.size); // do something with orphanedFiles
console.info(orphanedFiles);
this.logger.log(`Processed ${paths.length} and found ${orphanedFiles.size} orphaned file(s).`);
return JobStatus.Success;
} }
})(),
]);
console.info('Orphaned files:', pathsLocal); @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask })
console.info('Missing files:', pathsDb); async handleMissingFilesQueueAll(): Promise<JobStatus> {
const assetPaths = this.assetJobRepository.streamAssetPaths();
const assetFilePaths = this.assetJobRepository.streamAssetFilePaths();
// profile: skipped async function* paths() {
for await (const { originalPath, encodedVideoPath } of assetPaths) {
yield originalPath;
if (encodedVideoPath) {
yield encodedVideoPath;
}
}
for await (const { path } of assetFilePaths) {
yield path;
}
}
/**
 * Regroups the items of an async generator into arrays of `n` items each.
 *
 * Every yielded array holds exactly `n` items, except possibly the last one, which carries
 * whatever is left over. Nothing is yielded for an empty source.
 *
 * @param generator Source of individual items.
 * @param n Number of items per yielded array.
 */
async function* chunk<T>(generator: AsyncGenerator<T>, n: number) {
  let buffer: T[] = [];

  for await (const entry of generator) {
    buffer.push(entry);
    if (buffer.length !== n) {
      continue;
    }

    // Hand the full batch to the consumer and start a fresh array, so the yielded one is never mutated.
    yield buffer;
    buffer = [];
  }

  // Flush the trailing partial batch, if any.
  if (buffer.length > 0) {
    yield buffer;
  }
}
let total = 0;
for await (const batchPaths of chunk(paths(), JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({
name: JobName.IntegrityMissingFiles,
data: {
paths: batchPaths,
},
});
total += batchPaths.length;
this.logger.log(`Queued missing check of ${batchPaths.length} file(s) (${total} so far)`);
}
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask })
async handleMissingFiles({ paths }: IIntegrityMissingFilesJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`);
const result = await Promise.all(
paths.map((path) =>
stat(path)
.then(() => void 0)
.catch(() => path),
),
);
const missingFiles = result.filter((path) => path);
// do something with missingFiles
console.info(missingFiles);
this.logger.log(`Processed ${paths.length} and found ${missingFiles.length} missing file(s).`);
return JobStatus.Success; return JobStatus.Success;
} }

View File

@@ -282,6 +282,15 @@ export interface IWorkflowJob<T extends PluginTriggerType = PluginTriggerType> {
event: WorkflowData[T]; event: WorkflowData[T];
} }
/** Payload of a `JobName.IntegrityOrphanedFiles` job: one batch of on-disk paths to look up in the database. */
export interface IIntegrityOrphanedFilesJob {
// Selects the table the batch is checked against: `asset` or `asset_file`.
type: 'asset' | 'asset_file';
// Paths found on disk; those with no matching database row are the orphaned files.
paths: string[];
}
/** Payload of a `JobName.IntegrityMissingFiles` job: one batch of database paths to look for on disk. */
export interface IIntegrityMissingFilesJob {
// Paths recorded in the database; those that cannot be `stat`-ed are reported as missing.
paths: string[];
}
export interface JobCounts { export interface JobCounts {
active: number; active: number;
completed: number; completed: number;
@@ -394,7 +403,10 @@ export type JobItem =
| { name: JobName.WorkflowRun; data: IWorkflowJob } | { name: JobName.WorkflowRun; data: IWorkflowJob }
// Integrity // Integrity
| { name: JobName.IntegrityOrphanedAndMissingFiles; data: IBaseJob } | { name: JobName.IntegrityOrphanedFilesQueueAll; data: IBaseJob }
| { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob }
| { name: JobName.IntegrityMissingFilesQueueAll; data: IBaseJob }
| { name: JobName.IntegrityMissingFiles; data: IIntegrityMissingFilesJob }
| { name: JobName.IntegrityChecksumFiles; data: IBaseJob }; | { name: JobName.IntegrityChecksumFiles; data: IBaseJob };
export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number]; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];