diff --git a/server/src/repositories/asset-job.repository.ts b/server/src/repositories/asset-job.repository.ts index 7c277e085d..643966e3c7 100644 --- a/server/src/repositories/asset-job.repository.ts +++ b/server/src/repositories/asset-job.repository.ts @@ -1,5 +1,5 @@ import { Injectable } from '@nestjs/common'; -import { Kysely } from 'kysely'; +import { Kysely, sql } from 'kysely'; import { jsonArrayFrom } from 'kysely/helpers/postgres'; import { InjectKysely } from 'nestjs-kysely'; import { Asset, columns } from 'src/database'; @@ -282,24 +282,52 @@ export class AssetJobRepository { .selectFrom((eb) => eb .selectFrom('asset') - .select(['originalPath as path']) + .select(['asset.originalPath as path']) + .select((eb) => [ + eb.ref('asset.id').$castTo().as('assetId'), + sql`null::uuid`.as('fileAssetId'), + ]) .unionAll( eb .selectFrom('asset') - .select(['encodedVideoPath as path']) - .where('encodedVideoPath', 'is not', null) - .where('encodedVideoPath', '!=', '') - .$castTo<{ path: string }>(), + .select((eb) => [ + eb.ref('asset.encodedVideoPath').$castTo().as('path'), + eb.ref('asset.id').$castTo().as('assetId'), + sql`null::uuid`.as('fileAssetId'), + ]) + .where('asset.encodedVideoPath', 'is not', null) + .where('asset.encodedVideoPath', '!=', sql`''`), + ) + .unionAll( + eb + .selectFrom('asset_file') + .select(['path']) + .select((eb) => [ + sql`null::uuid`.as('assetId'), + eb.ref('asset_file.id').$castTo().as('fileAssetId'), + ]), ) - .unionAll(eb.selectFrom('asset_file').select(['path'])) .as('allPaths'), ) - .leftJoin('integrity_report', (join) => - join - .onRef('integrity_report.path', '=', 'allPaths.path') - .on('integrity_report.type', '=', IntegrityReportType.OrphanFile), + .leftJoin( + 'integrity_report', + (join) => + join + .on('integrity_report.type', '=', IntegrityReportType.OrphanFile) + .on((eb) => + eb.or([ + eb('integrity_report.assetId', '=', eb.ref('allPaths.assetId')), + eb('integrity_report.fileAssetId', '=', 
eb.ref('allPaths.fileAssetId')), + ]), + ), + // .onRef('integrity_report.path', '=', 'allPaths.path') ) - .select(['allPaths.path as path', 'integrity_report.path as reportId']) + .select([ + 'allPaths.path as path', + 'allPaths.assetId', + 'allPaths.fileAssetId', + 'integrity_report.id as reportId', + ]) .stream(); } @@ -309,10 +337,17 @@ export class AssetJobRepository { .selectFrom('asset') .leftJoin('integrity_report', (join) => join - .onRef('integrity_report.path', '=', 'asset.originalPath') + .onRef('integrity_report.assetId', '=', 'asset.id') + // .onRef('integrity_report.path', '=', 'asset.originalPath') .on('integrity_report.type', '=', IntegrityReportType.ChecksumFail), ) - .select(['asset.originalPath', 'asset.checksum', 'asset.createdAt', 'integrity_report.id as reportId']) + .select([ + 'asset.originalPath', + 'asset.checksum', + 'asset.createdAt', + 'asset.id as assetId', + 'integrity_report.id as reportId', + ]) .$if(startMarker !== undefined, (qb) => qb.where('createdAt', '>=', startMarker!)) .$if(endMarker !== undefined, (qb) => qb.where('createdAt', '<=', endMarker!)) .orderBy('createdAt', 'asc') diff --git a/server/src/repositories/integrity-report.repository.ts b/server/src/repositories/integrity-report.repository.ts index 5e6cc0666c..fe8c99818f 100644 --- a/server/src/repositories/integrity-report.repository.ts +++ b/server/src/repositories/integrity-report.repository.ts @@ -19,7 +19,12 @@ export class IntegrityReportRepository { return this.db .insertInto('integrity_report') .values(dto) - .onConflict((oc) => oc.doNothing()) + .onConflict((oc) => + oc.columns(['path', 'type']).doUpdateSet({ + assetId: (eb) => eb.ref('excluded.assetId'), + fileAssetId: (eb) => eb.ref('excluded.fileAssetId'), + }), + ) .returningAll() .executeTakeFirst(); } @@ -60,7 +65,7 @@ export class IntegrityReportRepository { return { items: await this.db .selectFrom('integrity_report') - .select(['id', 'type', 'path']) + .select(['id', 'type', 'path', 'assetId', 
'fileAssetId']) .where('type', '=', dto.type) .orderBy('createdAt', 'desc') .execute(), @@ -70,19 +75,19 @@ export class IntegrityReportRepository { getIntegrityReportCsv(type: IntegrityReportType): Readable { const items = this.db .selectFrom('integrity_report') - .select(['id', 'type', 'path']) + .select(['id', 'type', 'path', 'assetId', 'fileAssetId']) .where('type', '=', type) .orderBy('createdAt', 'desc') .stream(); // very rudimentary csv serialiser async function* generator() { - yield 'id,type,path\n'; + yield 'id,type,assetId,fileAssetId,path\n'; for await (const item of items) { // no expectation of particularly bad filenames // but they could potentially have a newline or quote character - yield `${item.id},${item.type},"${item.path.replace(/"/g, '\\"')}"\n`; + yield `${item.id},${item.type},${item.assetId ?? ''},${item.fileAssetId ?? ''},"${item.path.replace(/"/g, '""')}"\n`; } } diff --git a/server/src/schema/migrations/1764255490085-CreateIntegrityReportTable.ts b/server/src/schema/migrations/1764255490085-CreateIntegrityReportTable.ts index a8e4c0f18f..bbff9184a1 100644 --- a/server/src/schema/migrations/1764255490085-CreateIntegrityReportTable.ts +++ b/server/src/schema/migrations/1764255490085-CreateIntegrityReportTable.ts @@ -6,9 +6,15 @@ export async function up(db: Kysely): Promise { "type" character varying NOT NULL, "path" character varying NOT NULL, "createdAt" timestamp with time zone NOT NULL DEFAULT now(), + "assetId" uuid, + "fileAssetId" uuid, + CONSTRAINT "integrity_report_assetId_fkey" FOREIGN KEY ("assetId") REFERENCES "asset" ("id") ON UPDATE CASCADE ON DELETE CASCADE, + CONSTRAINT "integrity_report_fileAssetId_fkey" FOREIGN KEY ("fileAssetId") REFERENCES "asset_file" ("id") ON UPDATE CASCADE ON DELETE CASCADE, CONSTRAINT "integrity_report_type_path_uq" UNIQUE ("type", "path"), CONSTRAINT "integrity_report_pkey" PRIMARY KEY ("id") );`.execute(db); + await sql`CREATE INDEX "integrity_report_assetId_idx" ON "integrity_report" 
("assetId");`.execute(db); + await sql`CREATE INDEX "integrity_report_fileAssetId_idx" ON "integrity_report" ("fileAssetId");`.execute(db); } export async function down(db: Kysely): Promise { diff --git a/server/src/schema/tables/integrity-report.table.ts b/server/src/schema/tables/integrity-report.table.ts index fe983f09d6..34ae50ab8e 100644 --- a/server/src/schema/tables/integrity-report.table.ts +++ b/server/src/schema/tables/integrity-report.table.ts @@ -1,5 +1,16 @@ import { IntegrityReportType } from 'src/enum'; -import { Column, CreateDateColumn, Generated, PrimaryGeneratedColumn, Table, Timestamp, Unique } from 'src/sql-tools'; +import { AssetFileTable } from 'src/schema/tables/asset-file.table'; +import { AssetTable } from 'src/schema/tables/asset.table'; +import { + Column, + CreateDateColumn, + ForeignKeyColumn, + Generated, + PrimaryGeneratedColumn, + Table, + Timestamp, + Unique, +} from 'src/sql-tools'; @Table('integrity_report') @Unique({ columns: ['type', 'path'] }) @@ -15,4 +26,10 @@ export class IntegrityReportTable { @CreateDateColumn() createdAt!: Generated; + + @ForeignKeyColumn(() => AssetTable, { onDelete: 'CASCADE', onUpdate: 'CASCADE', nullable: true }) + assetId!: string | null; + + @ForeignKeyColumn(() => AssetFileTable, { onDelete: 'CASCADE', onUpdate: 'CASCADE', nullable: true }) + fileAssetId!: string | null; } diff --git a/server/src/services/integrity.service.ts b/server/src/services/integrity.service.ts index e18da2b7df..e161b40662 100644 --- a/server/src/services/integrity.service.ts +++ b/server/src/services/integrity.service.ts @@ -21,27 +21,36 @@ import { ArgOf } from 'src/repositories/event.repository'; import { BaseService } from 'src/services/base.service'; import { IIntegrityJob, + IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob, IIntegrityPathWithChecksumJob, IIntegrityPathWithReportJob, } from 'src/types'; import { handlePromiseError } from 'src/utils/misc'; -async function* chunk(generator: AsyncIterableIterator, n: 
number) { - let chunk: T[] = []; - for await (const item of generator) { - chunk.push(item); - - if (chunk.length === n) { - yield chunk; - chunk = []; - } - } - - if (chunk.length > 0) { - yield chunk; - } -} +/** + * Orphan Files: + * Files are detected in /data/encoded-video, /data/library, /data/upload + * Checked against the asset table + * Files are detected in /data/thumbs + * Checked against the asset_file table + * + * * Can perform download or delete of files + * + * Missing Files: + * Paths are queried from asset(originalPath, encodedVideoPath), asset_file(path) + * Check whether files exist on disk + * + * * Reports must include origin (asset or asset_file) & ID for further action + * * Can perform trash (asset) or dereference (asset_file) + * + * Checksum Mismatch: + * Paths & checksums are queried from asset(originalPath, checksum) + * Check whether files match checksum, missing files ignored + * + * * Reports must include origin (as above) for further action + * * Can perform download or trash (asset) + */ @Injectable() export class IntegrityService extends BaseService { @@ -85,6 +94,24 @@ export class IntegrityService extends BaseService { start: checksumFiles.enabled, }); } + + // debug: run on boot + setTimeout(() => { + void this.jobRepository.queue({ + name: JobName.IntegrityOrphanedFilesQueueAll, + data: {}, + }); + + void this.jobRepository.queue({ + name: JobName.IntegrityMissingFilesQueueAll, + data: {}, + }); + + void this.jobRepository.queue({ + name: JobName.IntegrityChecksumFiles, + data: {}, + }); + }, 1000); } @OnEvent({ name: 'ConfigUpdate', server: true }) @@ -220,11 +247,11 @@ export class IntegrityService extends BaseService { } @OnJob({ name: JobName.IntegrityOrphanedFilesRefresh, queue: QueueName.BackgroundTask }) - async handleOrphanedRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise { - this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`); + async handleOrphanedRefresh({ 
items }: IIntegrityPathWithReportJob): Promise { + this.logger.log(`Processing batch of ${items.length} reports to check if they are out of date.`); const results = await Promise.all( - paths.map(({ reportId, path }) => + items.map(({ reportId, path }) => stat(path) .then(() => void 0) .catch(() => reportId), @@ -237,7 +264,7 @@ export class IntegrityService extends BaseService { await this.integrityReportRepository.deleteByIds(reportIds); } - this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`); + this.logger.log(`Processed ${items.length} paths and found ${reportIds.length} report(s) out of date.`); return JobStatus.Success; } @@ -286,14 +313,14 @@ export class IntegrityService extends BaseService { } @OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask }) - async handleMissingFiles({ items: paths }: IIntegrityPathWithReportJob): Promise { - this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`); + async handleMissingFiles({ items }: IIntegrityMissingFilesJob): Promise { + this.logger.log(`Processing batch of ${items.length} files to check if they are missing.`); const results = await Promise.all( - paths.map((file) => - stat(file.path) - .then(() => ({ ...file, exists: true })) - .catch(() => ({ ...file, exists: false })), + items.map((item) => + stat(item.path) + .then(() => ({ ...item, exists: true })) + .catch(() => ({ ...item, exists: false })), ), ); @@ -308,14 +335,16 @@ export class IntegrityService extends BaseService { const missingFiles = results.filter(({ exists }) => !exists); if (missingFiles.length > 0) { await this.integrityReportRepository.create( - missingFiles.map(({ path }) => ({ + missingFiles.map(({ path, assetId, fileAssetId }) => ({ type: IntegrityReportType.MissingFile, path, + assetId, + fileAssetId, })), ); } - this.logger.log(`Processed ${paths.length} and found ${missingFiles.length} missing file(s).`); + 
this.logger.log(`Processed ${items.length} and found ${missingFiles.length} missing file(s).`); return JobStatus.Success; } @@ -409,7 +438,7 @@ export class IntegrityService extends BaseService { endMarker = startMarker; startMarker = undefined; - for await (const { originalPath, checksum, createdAt, reportId } of assets) { + for await (const { originalPath, checksum, createdAt, assetId, reportId } of assets) { processed++; try { @@ -445,6 +474,7 @@ export class IntegrityService extends BaseService { await this.integrityReportRepository.create({ path: originalPath, type: IntegrityReportType.ChecksumFail, + assetId, }); } @@ -519,3 +549,19 @@ export class IntegrityService extends BaseService { return JobStatus.Success; } } + +async function* chunk(generator: AsyncIterableIterator, n: number) { + let chunk: T[] = []; + for await (const item of generator) { + chunk.push(item); + + if (chunk.length === n) { + yield chunk; + chunk = []; + } + } + + if (chunk.length > 0) { + yield chunk; + } +} diff --git a/server/src/types.ts b/server/src/types.ts index 03575d5836..e0ed847e8d 100644 --- a/server/src/types.ts +++ b/server/src/types.ts @@ -291,6 +291,15 @@ export interface IIntegrityOrphanedFilesJob { paths: string[]; } +export interface IIntegrityMissingFilesJob { + items: { + path: string; + reportId: string | null; + assetId: string | null; + fileAssetId: string | null; + }[]; +} + export interface IIntegrityPathWithReportJob { items: { path: string; reportId: string | null }[]; }