From cc31b9c7f172eb2cd2d32efe3eac8025f6dc9ba0 Mon Sep 17 00:00:00 2001 From: izzy Date: Thu, 27 Nov 2025 15:13:19 +0000 Subject: [PATCH] feat: clean up old reports of checksum or missing files refactor: combine the stream query --- .../src/repositories/asset-job.repository.ts | 38 ++++++++--- .../integrity-report.repository.ts | 8 +++ server/src/services/integrity.service.ts | 68 ++++++++++--------- server/src/types.ts | 2 +- 4 files changed, 76 insertions(+), 40 deletions(-) diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 4bf68b708a..79727520b1 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -4,7 +4,7 @@ import { jsonArrayFrom } from 'kysely/helpers/postgres'; import { InjectKysely } from 'nestjs-kysely'; import { Asset, columns } from 'src/database'; import { DummyValue, GenerateSql } from 'src/decorators'; -import
{ AssetFileType, AssetType, AssetVisibility } from 'src/enum'; +import { AssetFileType, AssetType, AssetVisibility, IntegrityReportType } from 'src/enum'; import { DB } from 'src/schema'; import { StorageAsset } from 'src/types'; import { @@ -278,19 +278,41 @@ export class AssetJobRepository { @GenerateSql({ params: [], stream: true }) streamAssetPaths() { - return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream(); - } - - @GenerateSql({ params: [], stream: true }) - streamAssetFilePaths() { - return this.db.selectFrom('asset_file').select(['path']).stream(); + return this.db + .selectFrom((eb) => + eb + .selectFrom('asset') + .select(['originalPath as path']) + .unionAll( + eb + .selectFrom('asset') + .select(['encodedVideoPath as path']) + .where('encodedVideoPath', 'is not', null) + .where('encodedVideoPath', '!=', '') + .$castTo<{ path: string }>(), + ) + .unionAll(eb.selectFrom('asset_file').select(['path'])) + .as('allPaths'), + ) + .leftJoin('integrity_report', (join) => + join + .onRef('integrity_report.path', '=', 'allPaths.path') + .on('integrity_report.type', '=', IntegrityReportType.MissingFile), + ) + .select(['allPaths.path as path', 'integrity_report.id as reportId']) + .stream(); } @GenerateSql({ params: [DummyValue.DATE, DummyValue.DATE], stream: true }) streamAssetChecksums(startMarker?: Date, endMarker?: Date) { return this.db .selectFrom('asset') - .select(['originalPath', 'checksum', 'createdAt']) + .leftJoin('integrity_report', (join) => + join + .onRef('integrity_report.path', '=', 'asset.originalPath') + .on('integrity_report.type', '=', IntegrityReportType.ChecksumFail), + ) + .select(['asset.originalPath', 'asset.checksum', 'asset.createdAt', 'integrity_report.id as reportId']) .$if(startMarker !== undefined, (qb) => qb.where('createdAt', '>=', startMarker!)) .$if(endMarker !== undefined, (qb) => qb.where('createdAt', '<=', endMarker!)) .orderBy('createdAt', 'asc') diff --git
a/server/src/repositories/integrity-report.repository.ts b/server/src/repositories/integrity-report.repository.ts index 17e85e78a3..25194731d6 100644 --- a/server/src/repositories/integrity-report.repository.ts +++ b/server/src/repositories/integrity-report.repository.ts @@ -16,4 +16,12 @@ export class IntegrityReportRepository { .returningAll() .executeTakeFirst(); } + + deleteById(id: string) { + return this.db.deleteFrom('integrity_report').where('id', '=', id).execute(); + } + + deleteByIds(ids: string[]) { + return this.db.deleteFrom('integrity_report').where('id', 'in', ids).execute(); + } } diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts index c4db5d49e7..5a093f0828 100644 --- a/server/src/services/integrity.service.ts +++ b/server/src/services/integrity.service.ts @@ -154,23 +154,8 @@ export class IntegrityService extends BaseService { this.logger.log(`Scanning for missing files...`); const assetPaths = this.assetJobRepository.streamAssetPaths(); - const assetFilePaths = this.assetJobRepository.streamAssetFilePaths(); - async function* paths() { - for await (const { originalPath, encodedVideoPath } of assetPaths) { - yield originalPath; - - if (encodedVideoPath) { - yield encodedVideoPath; - } - } - - for await (const { path } of assetFilePaths) { - yield path; - } - } - - async function* chunk(generator: AsyncGenerator, n: number) { + async function* chunk(generator: AsyncIterableIterator, n: number) { let chunk: T[] = []; for await (const item of generator) { chunk.push(item); @@ -187,7 +172,7 @@ export class IntegrityService extends BaseService { } let total = 0; - for await (const batchPaths of chunk(paths(), JOBS_LIBRARY_PAGINATION_SIZE)) { + for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) { await this.jobRepository.queue({ name: JobName.IntegrityMissingFiles, data: { @@ -206,22 +191,31 @@ export class IntegrityService extends BaseService { async handleMissingFiles({ paths 
}: IIntegrityMissingFilesJob): Promise { this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`); - const result = await Promise.all( - paths.map((path) => - stat(path) - .then(() => void 0) - .catch(() => path), + const results = await Promise.all( + paths.map((file) => + stat(file.path) + .then(() => ({ ...file, exists: true })) + .catch(() => ({ ...file, exists: false })), ), ); - const missingFiles = result.filter((path) => path) as string[]; + const outdatedReports = results + .filter(({ exists, reportId }) => exists && reportId) + .map(({ reportId }) => reportId!); - await this.integrityReportRepository.create( - missingFiles.map((path) => ({ - type: IntegrityReportType.MissingFile, - path, - })), - ); + if (outdatedReports.length) { + await this.integrityReportRepository.deleteByIds(outdatedReports); + } + + const missingFiles = results.filter(({ exists }) => !exists); + if (missingFiles.length) { + await this.integrityReportRepository.create( + missingFiles.map(({ path }) => ({ + type: IntegrityReportType.MissingFile, + path, + })), + ); + } this.logger.log(`Processed ${paths.length} and found ${missingFiles.length} missing file(s).`); return JobStatus.Success; @@ -264,7 +258,7 @@ export class IntegrityService extends BaseService { endMarker = startMarker; startMarker = undefined; - for await (const { originalPath, checksum, createdAt } of assets) { + for await (const { originalPath, checksum, createdAt, reportId } of assets) { try { const hash = createHash('sha1'); @@ -278,10 +272,22 @@ export class IntegrityService extends BaseService { }), ]); - if (!checksum.equals(hash.digest())) { + if (checksum.equals(hash.digest())) { + if (reportId) { + await this.integrityReportRepository.deleteById(reportId); + } + } else { throw new Error('File failed checksum'); } } catch (error) { + if ((error as { code?: string }).code === 'ENOENT') { + if (reportId) { + await this.integrityReportRepository.deleteById(reportId); + } + // 
missing file; handled by the missing files job + continue; + } + this.logger.warn('Failed to process a file: ' + error); await this.integrityReportRepository.create({ path: originalPath, diff --git a/server/src/types.ts b/server/src/types.ts index 183bdfc750..41c8feb4e9 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -288,7 +288,7 @@ export interface IIntegrityOrphanedFilesJob { } export interface IIntegrityMissingFilesJob { - paths: string[]; + paths: { path: string; reportId: string | null }[]; } export interface JobCounts {