From 341421045095c5244f1b87da09d17b5df6ebc59a Mon Sep 17 00:00:00 2001
From: izzy
Date: Thu, 27 Nov 2025 12:00:35 +0000
Subject: [PATCH] feat: checksum job

---
Review notes (text between "---" and the diffstat is not part of the commit):

What the patch does
  * Adds the IntegrityChecksumFiles job. It streams assets ordered by
    createdAt, hashes each originalPath with SHA-1 and compares the digest
    with the stored checksum.
  * The job stops when the time limit or the percentage limit is reached and
    stores the createdAt of the last processed asset under
    SystemMetadataKey.IntegrityChecksumCheckpoint. The next run starts from
    that checkpoint and, once it reaches the end, wraps around to cover
    [beginning, checkpoint].
  * Adds AssetJobRepository.getAssetCount() and streamAssetChecksums().

Changes made during review (all line-count neutral, hunk headers unchanged)
  * The checkpoint write is now awaited, so the job cannot report success
    before the checkpoint is stored and a failed write is not left unhandled.
  * The asset count is normalised with Number() and the progress statistics
    guard against division by zero (empty library / nothing processed yet).
  * The warning for a failed file now includes the file path.
  * Generic type arguments that were missing from the received text were
    restored (Promise<JobStatus>, the types.ts hunk context).

Caveats
  * Line breaks and indentation were reconstructed; verify the whitespace of
    context lines against the tree before applying.
  * The "index" blob ids refer to the originally submitted content.

 server/src/enum.ts                            |   1 +
 .../src/repositories/asset-job.repository.ts  |  19 +++
 server/src/services/integrity.service.ts      | 110 ++++++++++++++++--
 server/src/types.ts                           |   1 +
 4 files changed, 122 insertions(+), 9 deletions(-)

diff --git a/server/src/enum.ts b/server/src/enum.ts
index 58fe0fb3a7..9c8c49c1e8 100644
--- a/server/src/enum.ts
+++ b/server/src/enum.ts
@@ -301,6 +301,7 @@ export enum SystemMetadataKey {
   SystemFlags = 'system-flags',
   VersionCheckState = 'version-check-state',
   License = 'license',
+  IntegrityChecksumCheckpoint = 'integrity-checksum-checkpoint',
 }
 
 export enum UserMetadataKey {
diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts
index d7e5c1fdb8..4bf68b708a 100644
--- a/server/src/repositories/asset-job.repository.ts
+++ b/server/src/repositories/asset-job.repository.ts
@@ -268,6 +268,14 @@ export class AssetJobRepository {
     return this.db.selectFrom('asset_file').select(['path']).where('path', 'in', paths).execute();
   }
 
+  @GenerateSql({ params: [] })
+  getAssetCount() {
+    return this.db
+      .selectFrom('asset')
+      .select((eb) => eb.fn.countAll().as('count'))
+      .executeTakeFirstOrThrow();
+  }
+
   @GenerateSql({ params: [], stream: true })
   streamAssetPaths() {
     return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream();
@@ -278,6 +286,17 @@ export class AssetJobRepository {
     return this.db.selectFrom('asset_file').select(['path']).stream();
   }
 
+  @GenerateSql({ params: [DummyValue.DATE, DummyValue.DATE], stream: true })
+  streamAssetChecksums(startMarker?: Date, endMarker?: Date) {
+    return this.db
+      .selectFrom('asset')
+      .select(['originalPath', 'checksum', 'createdAt'])
+      .$if(startMarker !== undefined, (qb) => qb.where('createdAt', '>=', startMarker!))
+      .$if(endMarker !== undefined, (qb) => qb.where('createdAt', '<=', endMarker!))
+      .orderBy('createdAt', 'asc')
+      .stream();
+  }
+
   @GenerateSql({ params: [], stream: true })
   streamForVideoConversion(force?: boolean) {
     return this.db
diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts
index 49900cd1f2..0347a06edf 100644
--- a/server/src/services/integrity.service.ts
+++ b/server/src/services/integrity.service.ts
@@ -1,9 +1,13 @@
 import { Injectable } from '@nestjs/common';
+import { createHash } from 'node:crypto';
+import { createReadStream } from 'node:fs';
 import { stat } from 'node:fs/promises';
+import { Writable } from 'node:stream';
+import { pipeline } from 'node:stream/promises';
 import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
 import { StorageCore } from 'src/cores/storage.core';
 import { OnEvent, OnJob } from 'src/decorators';
-import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum';
+import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder, SystemMetadataKey } from 'src/enum';
 import { ArgOf } from 'src/repositories/event.repository';
 import { BaseService } from 'src/services/base.service';
 import { IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob } from 'src/types';
@@ -28,13 +32,18 @@ export class IntegrityService extends BaseService {
     //   });
     // }
     setTimeout(() => {
-      this.jobRepository.queue({
-        name: JobName.IntegrityOrphanedFilesQueueAll,
-        data: {},
-      });
+      // this.jobRepository.queue({
+      //   name: JobName.IntegrityOrphanedFilesQueueAll,
+      //   data: {},
+      // });
+
+      // this.jobRepository.queue({
+      //   name: JobName.IntegrityMissingFilesQueueAll,
+      //   data: {},
+      // });
 
       this.jobRepository.queue({
-        name: JobName.IntegrityMissingFilesQueueAll,
+        name: JobName.IntegrityChecksumFiles,
         data: {},
       });
     }, 1000);
@@ -120,7 +129,7 @@ export class IntegrityService extends BaseService {
       }
     }
 
-    // do something with orphanedFiles
+    // todo: do something with orphanedFiles
     console.info(orphanedFiles);
 
     this.logger.log(`Processed ${paths.length} and found ${orphanedFiles.size} orphaned file(s).`);
@@ -129,6 +138,8 @@ export class IntegrityService extends BaseService {
 
   @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask })
   async handleMissingFilesQueueAll(): Promise<JobStatus> {
+    this.logger.log(`Scanning for missing files...`);
+
     const assetPaths = this.assetJobRepository.streamAssetPaths();
     const assetFilePaths = this.assetJobRepository.streamAssetFilePaths();
 
@@ -192,7 +203,7 @@ export class IntegrityService extends BaseService {
 
     const missingFiles = result.filter((path) => path);
 
-    // do something with missingFiles
+    // todo: do something with missingFiles
     console.info(missingFiles);
 
     this.logger.log(`Processed ${paths.length} and found ${missingFiles.length} missing file(s).`);
@@ -201,7 +212,88 @@ export class IntegrityService extends BaseService {
 
   @OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.BackgroundTask })
   async handleChecksumFiles(): Promise<JobStatus> {
-    // todo
+    const timeLimit = 60 * 60 * 1000; // 1000;
+    const percentageLimit = 1.0; // 0.25;
+
+    this.logger.log(
+      `Checking file checksums... (will run for up to ${(timeLimit / (60 * 60 * 1000)).toFixed(2)} hours or until ${(percentageLimit * 100).toFixed(2)}% of assets are processed)`,
+    );
+
+    let processed = 0;
+    const startedAt = Date.now();
+    const count = Number((await this.assetJobRepository.getAssetCount()).count); // countAll() may not yield a JS number
+    const checkpoint = await this.systemMetadataRepository.get(SystemMetadataKey.IntegrityChecksumCheckpoint);
+
+    let startMarker: Date | undefined = checkpoint?.date ? new Date(checkpoint.date) : undefined;
+    let endMarker: Date | undefined; // todo
+
+    const printStats = () => {
+      const averageTime = ((Date.now() - startedAt) / Math.max(processed, 1)).toFixed(2); // avoid NaN when nothing ran
+      const completionProgress = ((processed / Math.max(count, 1)) * 100).toFixed(2); // avoid NaN on an empty library
+
+      this.logger.log(
+        `Processed ${processed} files so far... (avg. ${averageTime} ms/asset, ${completionProgress}% of all assets)`,
+      );
+    };
+
+    let lastCreatedAt: Date | undefined;
+
+    finishEarly: do {
+      this.logger.log(
+        `Processing assets in range [${startMarker?.toISOString() ?? 'beginning'}, ${endMarker?.toISOString() ?? 'end'}]`,
+      );
+
+      const assets = this.assetJobRepository.streamAssetChecksums(startMarker, endMarker);
+      endMarker = startMarker; // a second pass (only after a checkpoint) wraps around to [beginning, checkpoint]
+      startMarker = undefined;
+
+      for await (const { originalPath, checksum, createdAt } of assets) {
+        try {
+          const hash = createHash('sha1');
+
+          await pipeline([
+            createReadStream(originalPath),
+            new Writable({
+              write(chunk, _encoding, callback) {
+                hash.update(chunk);
+                callback();
+              },
+            }),
+          ]);
+
+          if (!checksum.equals(hash.digest())) {
+            throw new Error('File failed checksum');
+          }
+        } catch (error) {
+          this.logger.warn(`Failed to process ${originalPath}: ${error}`);
+          // todo: do something with originalPath
+        }
+
+        processed++;
+        if (processed % 100 === 0) {
+          printStats();
+        }
+
+        if (Date.now() > startedAt + timeLimit || processed > count * percentageLimit) {
+          this.logger.log('Reached stop criteria.');
+          lastCreatedAt = createdAt;
+          break finishEarly;
+        }
+      }
+    } while (endMarker);
+
+    await this.systemMetadataRepository.set(SystemMetadataKey.IntegrityChecksumCheckpoint, {
+      date: lastCreatedAt?.toISOString(),
+    });
+
+    printStats();
+
+    if (lastCreatedAt) {
+      this.logger.log(`Finished checksum job, will continue from ${lastCreatedAt.toISOString()}.`);
+    } else {
+      this.logger.log(`Finished checksum job, covered all assets.`);
+    }
+
     return JobStatus.Success;
   }
 }
diff --git a/server/src/types.ts b/server/src/types.ts
index ebc05946a3..183bdfc750 100644
--- a/server/src/types.ts
+++ b/server/src/types.ts
@@ -522,6 +522,7 @@ export interface SystemMetadata extends Record<SystemMetadataKey, Record<string,
   [SystemMetadataKey.SystemFlags]: DeepPartial<SystemFlags>;
   [SystemMetadataKey.VersionCheckState]: VersionCheckMetadata;
   [SystemMetadataKey.MemoriesState]: MemoriesState;
+  [SystemMetadataKey.IntegrityChecksumCheckpoint]: { date?: string };
 }
 
 export interface UserPreferences {