import { Injectable } from '@nestjs/common';
import { createHash } from 'node:crypto';
import { basename } from 'node:path';
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { OnEvent, OnJob } from 'src/decorators';
import { AuthDto } from 'src/dtos/auth.dto';
import {
  IntegrityGetReportDto,
  IntegrityReportResponseDto,
  IntegrityReportSummaryResponseDto,
} from 'src/dtos/integrity.dto';
import {
  AssetStatus,
  CacheControl,
  DatabaseLock,
  ImmichWorker,
  IntegrityReportType,
  JobName,
  JobStatus,
  QueueName,
  StorageFolder,
  SystemMetadataKey,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import {
  IIntegrityDeleteReportsJob,
  IIntegrityDeleteReportTypeJob,
  IIntegrityJob,
  IIntegrityMissingFilesJob,
  IIntegrityOrphanedFilesJob,
  IIntegrityPathWithChecksumJob,
  IIntegrityPathWithReportJob,
} from 'src/types';
import { ImmichFileResponse } from 'src/utils/file';
import { handlePromiseError } from 'src/utils/misc';

/**
 * Runs and manages storage integrity checks.
 *
 * Orphan files:
 *   - Files found under /data/encoded-video, /data/library and /data/upload are checked against the asset table.
 *   - Files found under /data/thumbs are checked against the asset_file table.
 *   - Reports can be acted on by downloading or deleting the file.
 *
 * Missing files:
 *   - Paths are queried from asset (originalPath, encodedVideoPath) and asset_file (path),
 *     then checked for existence on disk.
 *   - Reports must include their origin (asset or asset_file) and its ID for further action.
 *   - Reports can be acted on by trashing the asset or deleting the asset_file.
 *
 * Checksum mismatch:
 *   - Paths and checksums are queried from asset (originalPath, checksum) and each file is re-hashed;
 *     missing files are ignored (they are the missing-files check's responsibility).
 *   - Reports must include their origin (as above) for further action.
 *   - Reports can be acted on by downloading the file or trashing the asset.
 */
@Injectable()
export class IntegrityService extends BaseService {
  /** True when this instance holds the integrity-check DB lock and therefore owns the cron jobs. */
  private integrityLock = false;

  /**
   * Tries to take the integrity-check DB lock and, if acquired, registers one cron job per check
   * using the configured expressions and enabled flags.
   */
  @OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] })
  async onConfigInit({
    newConfig: {
      integrityChecks: { orphanedFiles, missingFiles, checksumFiles },
    },
  }: ArgOf<'ConfigInit'>) {
    this.integrityLock = await this.databaseRepository.tryLock(DatabaseLock.IntegrityCheck);
    if (!this.integrityLock) {
      return;
    }

    this.cronRepository.create({
      name: 'integrityOrphanedFiles',
      expression: orphanedFiles.cronExpression,
      onTick: () =>
        handlePromiseError(
          this.jobRepository.queue({ name: JobName.IntegrityOrphanedFilesQueueAll, data: {} }),
          this.logger,
        ),
      start: orphanedFiles.enabled,
    });

    this.cronRepository.create({
      name: 'integrityMissingFiles',
      expression: missingFiles.cronExpression,
      onTick: () =>
        handlePromiseError(
          this.jobRepository.queue({ name: JobName.IntegrityMissingFilesQueueAll, data: {} }),
          this.logger,
        ),
      start: missingFiles.enabled,
    });

    this.cronRepository.create({
      name: 'integrityChecksumFiles',
      expression: checksumFiles.cronExpression,
      onTick: () =>
        handlePromiseError(
          this.jobRepository.queue({ name: JobName.IntegrityChecksumFiles, data: {} }),
          this.logger,
        ),
      start: checksumFiles.enabled,
    });
  }

  /** Re-applies cron expressions and enabled flags; only the lock holder owns the cron jobs. */
  @OnEvent({ name: 'ConfigUpdate', server: true })
  onConfigUpdate({
    newConfig: {
      integrityChecks: { orphanedFiles, missingFiles, checksumFiles },
    },
  }: ArgOf<'ConfigUpdate'>) {
    if (!this.integrityLock) {
      return;
    }

    this.cronRepository.update({
      name: 'integrityOrphanedFiles',
      expression: orphanedFiles.cronExpression,
      start: orphanedFiles.enabled,
    });

    this.cronRepository.update({
      name: 'integrityMissingFiles',
      expression: missingFiles.cronExpression,
      start: missingFiles.enabled,
    });

    this.cronRepository.update({
      name: 'integrityChecksumFiles',
      expression: checksumFiles.cronExpression,
      start: checksumFiles.enabled,
    });
  }

  getIntegrityReportSummary(): Promise<IntegrityReportSummaryResponseDto> {
    return this.integrityRepository.getIntegrityReportSummary();
  }

  /** Returns one page of reports, defaulting to page 1 with 100 items per page. */
  async getIntegrityReport(dto: IntegrityGetReportDto): Promise<IntegrityReportResponseDto> {
    return this.integrityRepository.getIntegrityReports({ page: dto.page || 1, size: dto.size || 100 }, dto.type);
  }

  /**
   * Streams all reports of `type` as CSV with columns id,type,assetId,fileAssetId,path.
   * Absent ids are written as empty fields; the path is always quoted with embedded quotes doubled,
   * so commas, quotes and newlines in file names stay within one field.
   */
  getIntegrityReportCsv(type: IntegrityReportType): Readable {
    const items = this.integrityRepository.streamIntegrityReports(type);

    async function* generator() {
      yield 'id,type,assetId,fileAssetId,path\n';

      for await (const item of items) {
        const quotedPath = `"${item.path.replaceAll('"', '""')}"`;
        // `?? ''` so null ids become empty fields instead of the literal text "null"
        yield `${item.id},${item.type},${item.assetId ?? ''},${item.fileAssetId ?? ''},${quotedPath}\n`;
      }
    }

    return Readable.from(generator());
  }

  /** Serves the file referenced by report `id` as an uncached binary download. */
  async getIntegrityReportFile(id: string): Promise<ImmichFileResponse> {
    const { path } = await this.integrityRepository.getById(id);

    return new ImmichFileResponse({
      path,
      fileName: basename(path),
      contentType: 'application/octet-stream',
      cacheControl: CacheControl.PrivateWithoutCache,
    });
  }

  /**
   * Acts on a single report:
   *   - asset reports trash the asset (emitting AssetTrashAll for `auth.user`) and delete the report;
   *   - asset_file reports delete the asset_file row;
   *   - path-only reports unlink the file (an already-missing file counts as success) and delete the report.
   */
  async deleteIntegrityReport(auth: AuthDto, id: string): Promise<void> {
    const { path, assetId, fileAssetId } = await this.integrityRepository.getById(id);

    if (assetId) {
      await this.assetRepository.updateAll([assetId], {
        deletedAt: new Date(),
        status: AssetStatus.Trashed,
      });

      await this.eventRepository.emit('AssetTrashAll', {
        assetIds: [assetId],
        userId: auth.user.id,
      });

      await this.integrityRepository.deleteById(id);
    } else if (fileAssetId) {
      // NOTE(review): the report row is not deleted explicitly here; presumably it is removed via a
      // foreign-key cascade on asset_file — confirm against the schema.
      await this.assetRepository.deleteFiles([{ id: fileAssetId }]);
    } else {
      await this.storageRepository.unlink(path).catch((error: unknown) => {
        // the file being gone already is the desired end state; any other failure is real
        if (!isMissingFileError(error)) {
          throw error;
        }
      });

      await this.integrityRepository.deleteById(id);
    }
  }

  /**
   * Queues refresh jobs for existing orphan reports and, unless `refreshOnly`, walks the asset and
   * thumbnail folders and queues orphan checks in batches of JOBS_LIBRARY_PAGINATION_SIZE.
   */
  @OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.IntegrityCheck })
  async handleOrphanedFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
    this.logger.log(`Checking for out of date orphaned file reports...`);

    const reports = this.integrityRepository.streamIntegrityReportsWithAssetChecksum(IntegrityReportType.OrphanFile);

    let total = 0;
    for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
      await this.jobRepository.queue({
        name: JobName.IntegrityOrphanedFilesRefresh,
        data: {
          items: batchReports,
        },
      });

      total += batchReports.length;
      this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
    }

    if (refreshOnly) {
      this.logger.log('Refresh complete.');
      return JobStatus.Success;
    }

    this.logger.log(`Scanning for orphaned files...`);

    const assetPaths = this.storageRepository.walk({
      pathsToCrawl: [StorageFolder.EncodedVideo, StorageFolder.Library, StorageFolder.Upload].map((folder) =>
        StorageCore.getBaseFolder(folder),
      ),
      includeHidden: false,
      take: JOBS_LIBRARY_PAGINATION_SIZE,
    });

    const assetFilePaths = this.storageRepository.walk({
      pathsToCrawl: [StorageCore.getBaseFolder(StorageFolder.Thumbnails)],
      includeHidden: false,
      take: JOBS_LIBRARY_PAGINATION_SIZE,
    });

    // tag each walked batch with the table it must be checked against
    async function* paths() {
      for await (const batch of assetPaths) {
        yield ['asset', batch] as const;
      }

      for await (const batch of assetFilePaths) {
        yield ['asset_file', batch] as const;
      }
    }

    total = 0;
    for await (const [batchType, batchPaths] of paths()) {
      await this.jobRepository.queue({
        name: JobName.IntegrityOrphanedFiles,
        data: {
          type: batchType,
          paths: batchPaths,
        },
      });

      const count = batchPaths.length;
      total += count;

      this.logger.log(`Queued orphan check of ${count} file(s) (${total} so far)`);
    }

    return JobStatus.Success;
  }

  /**
   * Creates OrphanFile reports for every path in the batch that is not referenced by the asset table
   * (originalPath / encodedVideoPath) or, for `asset_file` batches, by the asset_file table.
   */
  @OnJob({ name: JobName.IntegrityOrphanedFiles, queue: QueueName.IntegrityCheck })
  async handleOrphanedFiles({ type, paths }: IIntegrityOrphanedFilesJob): Promise<JobStatus> {
    this.logger.log(`Processing batch of ${paths.length} files to check if they are orphaned.`);

    const orphanedFiles = new Set(paths);
    if (type === 'asset') {
      const assets = await this.integrityRepository.getAssetPathsByPaths(paths);
      for (const { originalPath, encodedVideoPath } of assets) {
        orphanedFiles.delete(originalPath);

        if (encodedVideoPath) {
          orphanedFiles.delete(encodedVideoPath);
        }
      }
    } else {
      const assets = await this.integrityRepository.getAssetFilePathsByPaths(paths);
      for (const { path } of assets) {
        orphanedFiles.delete(path);
      }
    }

    if (orphanedFiles.size > 0) {
      await this.integrityRepository.create(
        [...orphanedFiles].map((path) => ({
          type: IntegrityReportType.OrphanFile,
          path,
        })),
      );
    }

    this.logger.log(`Processed ${paths.length} and found ${orphanedFiles.size} orphaned file(s).`);
    return JobStatus.Success;
  }

  /** Deletes orphan reports whose file can no longer be stat'ed (any stat error counts). */
  @OnJob({ name: JobName.IntegrityOrphanedFilesRefresh, queue: QueueName.IntegrityCheck })
  async handleOrphanedRefresh({ items }: IIntegrityPathWithReportJob): Promise<JobStatus> {
    this.logger.log(`Processing batch of ${items.length} reports to check if they are out of date.`);

    const results = await Promise.all(
      items.map(({ reportId, path }) =>
        this.storageRepository
          .stat(path)
          .then(() => void 0)
          .catch(() => reportId),
      ),
    );

    const reportIds = results.filter((reportId): reportId is string => Boolean(reportId));

    if (reportIds.length > 0) {
      await this.integrityRepository.deleteByIds(reportIds);
    }

    this.logger.log(`Processed ${items.length} paths and found ${reportIds.length} report(s) out of date.`);
    return JobStatus.Success;
  }

  /**
   * With `refreshOnly`, queues refresh jobs for existing missing-file reports; otherwise streams all
   * asset paths and queues missing-file checks in batches of JOBS_LIBRARY_PAGINATION_SIZE.
   */
  @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.IntegrityCheck })
  async handleMissingFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
    if (refreshOnly) {
      this.logger.log(`Checking for out of date missing file reports...`);

      const reports = this.integrityRepository.streamIntegrityReportsWithAssetChecksum(IntegrityReportType.MissingFile);

      let total = 0;
      for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
        await this.jobRepository.queue({
          name: JobName.IntegrityMissingFilesRefresh,
          data: {
            items: batchReports,
          },
        });

        total += batchReports.length;
        this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
      }

      this.logger.log('Refresh complete.');
      return JobStatus.Success;
    }

    this.logger.log(`Scanning for missing files...`);

    const assetPaths = this.integrityRepository.streamAssetPaths();

    let total = 0;
    for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) {
      await this.jobRepository.queue({
        name: JobName.IntegrityMissingFiles,
        data: {
          items: batchPaths,
        },
      });

      total += batchPaths.length;
      this.logger.log(`Queued missing check of ${batchPaths.length} file(s) (${total} so far)`);
    }

    return JobStatus.Success;
  }

  /**
   * Stats every path in the batch: deletes the existing report of any file that now exists, and
   * creates MissingFile reports for files that cannot be stat'ed (any stat error counts as missing).
   */
  @OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.IntegrityCheck })
  async handleMissingFiles({ items }: IIntegrityMissingFilesJob): Promise<JobStatus> {
    this.logger.log(`Processing batch of ${items.length} files to check if they are missing.`);

    const results = await Promise.all(
      items.map((item) =>
        this.storageRepository
          .stat(item.path)
          .then(() => ({ ...item, exists: true }))
          .catch(() => ({ ...item, exists: false })),
      ),
    );

    const outdatedReports = results
      .filter(({ exists, reportId }) => exists && reportId)
      .map(({ reportId }) => reportId!);

    if (outdatedReports.length > 0) {
      await this.integrityRepository.deleteByIds(outdatedReports);
    }

    const missingFiles = results.filter(({ exists }) => !exists);
    if (missingFiles.length > 0) {
      await this.integrityRepository.create(
        missingFiles.map(({ path, assetId, fileAssetId }) => ({
          type: IntegrityReportType.MissingFile,
          path,
          assetId,
          fileAssetId,
        })),
      );
    }

    this.logger.log(`Processed ${items.length} and found ${missingFiles.length} missing file(s).`);
    return JobStatus.Success;
  }

  /** Deletes missing-file reports whose file can now be stat'ed again. */
  @OnJob({ name: JobName.IntegrityMissingFilesRefresh, queue: QueueName.IntegrityCheck })
  async handleMissingRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
    this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);

    const results = await Promise.all(
      paths.map(({ reportId, path }) =>
        this.storageRepository
          .stat(path)
          .then(() => reportId)
          .catch(() => void 0),
      ),
    );

    const reportIds = results.filter((reportId): reportId is string => Boolean(reportId));

    if (reportIds.length > 0) {
      await this.integrityRepository.deleteByIds(reportIds);
    }

    this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
    return JobStatus.Success;
  }

  /**
   * With `refreshOnly`, queues refresh jobs for existing checksum reports. Otherwise re-hashes asset
   * originals, resuming from the saved checkpoint, until the configured time limit or percentage of
   * assets is reached; the createdAt of the last processed asset is saved as the next checkpoint
   * (cleared when the whole range was covered).
   */
  @OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.IntegrityCheck })
  async handleChecksumFiles({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
    if (refreshOnly) {
      this.logger.log(`Checking for out of date checksum file reports...`);

      const reports = this.integrityRepository.streamIntegrityReportsWithAssetChecksum(
        IntegrityReportType.ChecksumFail,
      );

      let total = 0;
      for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
        await this.jobRepository.queue({
          name: JobName.IntegrityChecksumFilesRefresh,
          data: {
            items: batchReports.map(({ path, reportId, checksum }) => ({
              path,
              reportId,
              checksum: checksum?.toString('hex'),
            })),
          },
        });

        total += batchReports.length;
        this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
      }

      this.logger.log('Refresh complete.');
      return JobStatus.Success;
    }

    const {
      integrityChecks: {
        checksumFiles: { timeLimit, percentageLimit },
      },
    } = await this.getConfig({
      withCache: true,
    });

    this.logger.log(
      `Checking file checksums... (will run for up to ${(timeLimit / (60 * 60 * 1000)).toFixed(2)} hours or until ${(percentageLimit * 100).toFixed(2)}% of assets are processed)`,
    );

    let processed = 0;
    const startedAt = Date.now();
    const { count } = await this.integrityRepository.getAssetCount();
    const checkpoint = await this.systemMetadataRepository.get(SystemMetadataKey.IntegrityChecksumCheckpoint);

    // First pass: from the checkpoint to the end. If a checkpoint existed, a second pass then covers
    // the range from the beginning up to the checkpoint.
    let startMarker: Date | undefined = checkpoint?.date ? new Date(checkpoint.date) : undefined;
    let endMarker: Date | undefined;

    const printStats = () => {
      // guard the divisions so an empty library does not log NaN
      const averageTime = processed > 0 ? ((Date.now() - startedAt) / processed).toFixed(2) : '0.00';
      const completionProgress = count > 0 ? ((processed / count) * 100).toFixed(2) : '100.00';

      this.logger.log(
        `Processed ${processed} files so far... (avg. ${averageTime} ms/asset, ${completionProgress}% of all assets)`,
      );
    };

    let lastCreatedAt: Date | undefined;

    finishEarly: do {
      this.logger.log(
        `Processing assets in range [${startMarker?.toISOString() ?? 'beginning'}, ${endMarker?.toISOString() ?? 'end'}]`,
      );

      const assets = this.integrityRepository.streamAssetChecksums(startMarker, endMarker);
      endMarker = startMarker;
      startMarker = undefined;

      for await (const { originalPath, checksum, createdAt, assetId, reportId } of assets) {
        processed++;

        try {
          const digest = await this.hashFile(originalPath);
          if (!checksum.equals(digest)) {
            throw new Error('File failed checksum');
          }

          if (reportId) {
            await this.integrityRepository.deleteById(reportId);
          }
        } catch (error) {
          if (isMissingFileError(error)) {
            // missing file; handled by the missing files job, so drop any stale checksum report
            if (reportId) {
              await this.integrityRepository.deleteById(reportId);
            }
          } else {
            this.logger.warn('Failed to process a file: ' + error);
            await this.integrityRepository.create({
              path: originalPath,
              type: IntegrityReportType.ChecksumFail,
              assetId,
            });
          }
        }

        // evaluated for every asset, including missing ones, so the limits are always honoured
        if (processed % 100 === 0) {
          printStats();
        }

        if (Date.now() > startedAt + timeLimit || processed > count * percentageLimit) {
          this.logger.log('Reached stop criteria.');
          lastCreatedAt = createdAt;
          break finishEarly;
        }
      }
    } while (endMarker);

    await this.systemMetadataRepository.set(SystemMetadataKey.IntegrityChecksumCheckpoint, {
      date: lastCreatedAt?.toISOString(),
    });

    printStats();

    if (lastCreatedAt) {
      this.logger.log(`Finished checksum job, will continue from ${lastCreatedAt.toISOString()}.`);
    } else {
      this.logger.log(`Finished checksum job, covered all assets.`);
    }

    return JobStatus.Success;
  }

  /**
   * Deletes checksum reports that are out of date: those with no checksum to compare against,
   * those whose file now matches the hex-encoded checksum, and those whose file is missing.
   */
  @OnJob({ name: JobName.IntegrityChecksumFilesRefresh, queue: QueueName.IntegrityCheck })
  async handleChecksumRefresh({ items: paths }: IIntegrityPathWithChecksumJob): Promise<JobStatus> {
    this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);

    const results = await Promise.all(
      paths.map(async ({ reportId, path, checksum }) => {
        if (!checksum) {
          // NOTE(review): presumably the report's asset no longer exists — confirm in the repository query
          return reportId;
        }

        try {
          const digest = await this.hashFile(path);
          if (Buffer.from(checksum, 'hex').equals(digest)) {
            return reportId;
          }
        } catch (error) {
          if (isMissingFileError(error)) {
            return reportId;
          }

          // keep the report, but do not hide why it could not be re-checked
          this.logger.warn('Failed to re-check a file checksum: ' + error);
        }

        return undefined;
      }),
    );

    const reportIds = results.filter((reportId): reportId is string => Boolean(reportId));

    if (reportIds.length > 0) {
      await this.integrityRepository.deleteByIds(reportIds);
    }

    this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
    return JobStatus.Success;
  }

  /**
   * Streams all reports of `type` (or of every type when undefined), grouped by the reference they
   * carry, and queues deletion jobs in batches of JOBS_LIBRARY_PAGINATION_SIZE.
   */
  @OnJob({ name: JobName.IntegrityDeleteReportType, queue: QueueName.IntegrityCheck })
  async handleDeleteAllIntegrityReports({ type }: IIntegrityDeleteReportTypeJob): Promise<JobStatus> {
    this.logger.log(`Deleting all entries for ${type ?? 'all types of'} integrity report`);

    let properties;
    switch (type) {
      case IntegrityReportType.ChecksumFail: {
        properties = ['assetId'] as const;
        break;
      }

      case IntegrityReportType.MissingFile: {
        properties = ['assetId', 'fileAssetId'] as const;
        break;
      }

      case IntegrityReportType.OrphanFile: {
        properties = [void 0] as const;
        break;
      }

      default: {
        properties = [void 0, 'assetId', 'fileAssetId'] as const;
        break;
      }
    }

    for (const property of properties) {
      const reports = this.integrityRepository.streamIntegrityReportsByProperty(property, type);
      for await (const batch of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
        await this.jobRepository.queue({
          name: JobName.IntegrityDeleteReports,
          data: {
            reports: batch,
          },
        });

        this.logger.log(`Queued ${batch.length} reports to delete.`);
      }
    }

    return JobStatus.Success;
  }

  /**
   * Acts on a batch of reports: trashes referenced assets (and deletes their reports), deletes
   * referenced asset_file rows, and best-effort unlinks path-only files before deleting their reports.
   */
  @OnJob({ name: JobName.IntegrityDeleteReports, queue: QueueName.IntegrityCheck })
  async handleDeleteIntegrityReports({ reports }: IIntegrityDeleteReportsJob): Promise<JobStatus> {
    const byAsset = reports.filter((report) => report.assetId);
    const byFileAsset = reports.filter((report) => report.fileAssetId);
    const byPath = reports.filter((report) => !report.assetId && !report.fileAssetId);

    if (byAsset.length > 0) {
      const ids = byAsset.map(({ assetId }) => assetId!);
      await this.assetRepository.updateAll(ids, {
        deletedAt: new Date(),
        status: AssetStatus.Trashed,
      });

      await this.eventRepository.emit('AssetTrashAll', {
        assetIds: ids,
        userId: '', // we don't notify any users currently
      });

      await this.integrityRepository.deleteByIds(byAsset.map(({ id }) => id));
    }

    if (byFileAsset.length > 0) {
      // NOTE(review): report rows are not deleted explicitly; presumably removed via FK cascade — confirm
      await this.assetRepository.deleteFiles(byFileAsset.map(({ fileAssetId }) => ({ id: fileAssetId! })));
    }

    if (byPath.length > 0) {
      // unlink failures are deliberately ignored so the reports are still cleared
      await Promise.all(byPath.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0)));
      await this.integrityRepository.deleteByIds(byPath.map(({ id }) => id));
    }

    this.logger.log(`Deleted ${reports.length} reports.`);
    return JobStatus.Success;
  }

  /** Computes the SHA-1 digest of the file at `path`, streaming it through the storage repository. */
  private async hashFile(path: string): Promise<Buffer> {
    const hash = createHash('sha1');

    await pipeline([
      this.storageRepository.createPlainReadStream(path),
      new Writable({
        write(data, _encoding, callback) {
          hash.update(data);
          callback();
        },
      }),
    ]);

    return hash.digest();
  }
}

/** True when `error` is a Node.js filesystem error reporting that the path does not exist. */
function isMissingFileError(error: unknown): boolean {
  return typeof error === 'object' && error !== null && (error as { code?: unknown }).code === 'ENOENT';
}

/**
 * Groups the items of an async iterable into arrays of `n` items, in order; the final array holds
 * the remainder and is only yielded when non-empty.
 */
async function* chunk<T>(generator: AsyncIterable<T>, n: number): AsyncGenerator<T[]> {
  let batch: T[] = [];

  for await (const item of generator) {
    batch.push(item);

    if (batch.length === n) {
      yield batch;
      batch = [];
    }
  }

  if (batch.length > 0) {
    yield batch;
  }
}