feat: assetId, fileAssetId columns on integrity reports

This commit is contained in:
izzy
2025-12-01 15:49:03 +00:00
parent 042af30bef
commit 806a2880ca
6 changed files with 166 additions and 48 deletions

View File

@@ -1,5 +1,5 @@
import { Injectable } from '@nestjs/common';
import { Kysely } from 'kysely';
import { Kysely, sql } from 'kysely';
import { jsonArrayFrom } from 'kysely/helpers/postgres';
import { InjectKysely } from 'nestjs-kysely';
import { Asset, columns } from 'src/database';
@@ -282,24 +282,52 @@ export class AssetJobRepository {
.selectFrom((eb) =>
eb
.selectFrom('asset')
.select(['originalPath as path'])
.select(['asset.originalPath as path'])
.select((eb) => [
eb.ref('asset.id').$castTo<string | null>().as('assetId'),
sql<string | null>`null::uuid`.as('fileAssetId'),
])
.unionAll(
eb
.selectFrom('asset')
.select(['encodedVideoPath as path'])
.where('encodedVideoPath', 'is not', null)
.where('encodedVideoPath', '!=', '')
.$castTo<{ path: string }>(),
.select((eb) => [
eb.ref('asset.encodedVideoPath').$castTo<string>().as('path'),
eb.ref('asset.id').$castTo<string | null>().as('assetId'),
sql<string | null>`null::uuid`.as('fileAssetId'),
])
.where('asset.encodedVideoPath', 'is not', null)
.where('asset.encodedVideoPath', '!=', sql<string>`''`),
)
.unionAll(
eb
.selectFrom('asset_file')
.select(['path'])
.select((eb) => [
sql<string | null>`null::uuid`.as('assetId'),
eb.ref('asset_file.id').$castTo<string | null>().as('fileAssetId'),
]),
)
.unionAll(eb.selectFrom('asset_file').select(['path']))
.as('allPaths'),
)
.leftJoin('integrity_report', (join) =>
join
.onRef('integrity_report.path', '=', 'allPaths.path')
.on('integrity_report.type', '=', IntegrityReportType.OrphanFile),
.leftJoin(
'integrity_report',
(join) =>
join
.on('integrity_report.type', '=', IntegrityReportType.OrphanFile)
.on((eb) =>
eb.or([
eb('integrity_report.assetId', '=', eb.ref('allPaths.assetId')),
eb('integrity_report.fileAssetId', '=', eb.ref('allPaths.fileAssetId')),
]),
),
// .onRef('integrity_report.path', '=', 'allPaths.path')
)
.select(['allPaths.path as path', 'integrity_report.path as reportId'])
.select([
'allPaths.path as path',
'allPaths.assetId',
'allPaths.fileAssetId',
'integrity_report.path as reportId',
])
.stream();
}
@@ -309,10 +337,17 @@ export class AssetJobRepository {
.selectFrom('asset')
.leftJoin('integrity_report', (join) =>
join
.onRef('integrity_report.path', '=', 'asset.originalPath')
.onRef('integrity_report.assetId', '=', 'asset.id')
// .onRef('integrity_report.path', '=', 'asset.originalPath')
.on('integrity_report.type', '=', IntegrityReportType.ChecksumFail),
)
.select(['asset.originalPath', 'asset.checksum', 'asset.createdAt', 'integrity_report.id as reportId'])
.select([
'asset.originalPath',
'asset.checksum',
'asset.createdAt',
'asset.id as assetId',
'integrity_report.id as reportId',
])
.$if(startMarker !== undefined, (qb) => qb.where('createdAt', '>=', startMarker!))
.$if(endMarker !== undefined, (qb) => qb.where('createdAt', '<=', endMarker!))
.orderBy('createdAt', 'asc')

View File

@@ -19,7 +19,12 @@ export class IntegrityReportRepository {
return this.db
.insertInto('integrity_report')
.values(dto)
.onConflict((oc) => oc.doNothing())
.onConflict((oc) =>
oc.columns(['path', 'type']).doUpdateSet({
assetId: (eb) => eb.ref('excluded.assetId'),
fileAssetId: (eb) => eb.ref('excluded.fileAssetId'),
}),
)
.returningAll()
.executeTakeFirst();
}
@@ -60,7 +65,7 @@ export class IntegrityReportRepository {
return {
items: await this.db
.selectFrom('integrity_report')
.select(['id', 'type', 'path'])
.select(['id', 'type', 'path', 'assetId', 'fileAssetId'])
.where('type', '=', dto.type)
.orderBy('createdAt', 'desc')
.execute(),
@@ -70,19 +75,19 @@ export class IntegrityReportRepository {
getIntegrityReportCsv(type: IntegrityReportType): Readable {
const items = this.db
.selectFrom('integrity_report')
.select(['id', 'type', 'path'])
.select(['id', 'type', 'path', 'assetId', 'fileAssetId'])
.where('type', '=', type)
.orderBy('createdAt', 'desc')
.stream();
// very rudimentary csv serialiser
async function* generator() {
yield 'id,type,path\n';
yield 'id,type,assetId,fileAssetId,path\n';
for await (const item of items) {
// no expectation of particularly bad filenames
// but they could potentially have a newline or quote character
yield `${item.id},${item.type},"${item.path.replace(/"/g, '\\"')}"\n`;
yield `${item.id},${item.type},${item.assetId},${item.fileAssetId},"${item.path.replace(/"/g, '\\"')}"\n`;
}
}

View File

@@ -6,9 +6,15 @@ export async function up(db: Kysely<any>): Promise<void> {
"type" character varying NOT NULL,
"path" character varying NOT NULL,
"createdAt" timestamp with time zone NOT NULL DEFAULT now(),
"assetId" uuid,
"fileAssetId" uuid,
CONSTRAINT "integrity_report_assetId_fkey" FOREIGN KEY ("assetId") REFERENCES "asset" ("id") ON UPDATE CASCADE ON DELETE CASCADE,
CONSTRAINT "integrity_report_fileAssetId_fkey" FOREIGN KEY ("fileAssetId") REFERENCES "asset_file" ("id") ON UPDATE CASCADE ON DELETE CASCADE,
CONSTRAINT "integrity_report_type_path_uq" UNIQUE ("type", "path"),
CONSTRAINT "integrity_report_pkey" PRIMARY KEY ("id")
);`.execute(db);
await sql`CREATE INDEX "integrity_report_assetId_idx" ON "integrity_report" ("assetId");`.execute(db);
await sql`CREATE INDEX "integrity_report_fileAssetId_idx" ON "integrity_report" ("fileAssetId");`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {

View File

@@ -1,5 +1,16 @@
import { IntegrityReportType } from 'src/enum';
import { Column, CreateDateColumn, Generated, PrimaryGeneratedColumn, Table, Timestamp, Unique } from 'src/sql-tools';
import { AssetFileTable } from 'src/schema/tables/asset-file.table';
import { AssetTable } from 'src/schema/tables/asset.table';
import {
Column,
CreateDateColumn,
ForeignKeyColumn,
Generated,
PrimaryGeneratedColumn,
Table,
Timestamp,
Unique,
} from 'src/sql-tools';
@Table('integrity_report')
@Unique({ columns: ['type', 'path'] })
@@ -15,4 +26,10 @@ export class IntegrityReportTable {
@CreateDateColumn()
createdAt!: Generated<Timestamp>;
@ForeignKeyColumn(() => AssetTable, { onDelete: 'CASCADE', onUpdate: 'CASCADE', nullable: true })
assetId!: string | null;
@ForeignKeyColumn(() => AssetFileTable, { onDelete: 'CASCADE', onUpdate: 'CASCADE', nullable: true })
fileAssetId!: string | null;
}

View File

@@ -21,27 +21,36 @@ import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import {
IIntegrityJob,
IIntegrityMissingFilesJob,
IIntegrityOrphanedFilesJob,
IIntegrityPathWithChecksumJob,
IIntegrityPathWithReportJob,
} from 'src/types';
import { handlePromiseError } from 'src/utils/misc';
async function* chunk<T>(generator: AsyncIterableIterator<T>, n: number) {
let chunk: T[] = [];
for await (const item of generator) {
chunk.push(item);
if (chunk.length === n) {
yield chunk;
chunk = [];
}
}
if (chunk.length > 0) {
yield chunk;
}
}
/**
* Orphan Files:
* Files are detected in /data/encoded-video, /data/library, /data/upload
* Checked against the asset table
* Files are detected in /data/thumbs
* Checked against the asset_file table
*
* * Can perform download or delete of files
*
* Missing Files:
* Paths are queried from asset(originalPath, encodedVideoPath), asset_file(path)
* Check whether files exist on disk
*
* * Reports must include origin (asset or asset_file) & ID for further action
* * Can perform trash (asset) or dereference (asset_file)
*
* Checksum Mismatch:
* Paths & checksums are queried from asset(originalPath, checksum)
* Check whether files match checksum, missing files ignored
*
* * Reports must include origin (as above) for further action
* * Can perform download or trash (asset)
*/
@Injectable()
export class IntegrityService extends BaseService {
@@ -85,6 +94,24 @@ export class IntegrityService extends BaseService {
start: checksumFiles.enabled,
});
}
// debug: run on boot
setTimeout(() => {
void this.jobRepository.queue({
name: JobName.IntegrityOrphanedFilesQueueAll,
data: {},
});
void this.jobRepository.queue({
name: JobName.IntegrityMissingFilesQueueAll,
data: {},
});
void this.jobRepository.queue({
name: JobName.IntegrityChecksumFiles,
data: {},
});
}, 1000);
}
@OnEvent({ name: 'ConfigUpdate', server: true })
@@ -220,11 +247,11 @@ export class IntegrityService extends BaseService {
}
@OnJob({ name: JobName.IntegrityOrphanedFilesRefresh, queue: QueueName.BackgroundTask })
async handleOrphanedRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
async handleOrphanedRefresh({ items }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${items.length} reports to check if they are out of date.`);
const results = await Promise.all(
paths.map(({ reportId, path }) =>
items.map(({ reportId, path }) =>
stat(path)
.then(() => void 0)
.catch(() => reportId),
@@ -237,7 +264,7 @@ export class IntegrityService extends BaseService {
await this.integrityReportRepository.deleteByIds(reportIds);
}
this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
this.logger.log(`Processed ${items.length} paths and found ${reportIds.length} report(s) out of date.`);
return JobStatus.Success;
}
@@ -286,14 +313,14 @@ export class IntegrityService extends BaseService {
}
@OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask })
async handleMissingFiles({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`);
async handleMissingFiles({ items }: IIntegrityMissingFilesJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${items.length} files to check if they are missing.`);
const results = await Promise.all(
paths.map((file) =>
stat(file.path)
.then(() => ({ ...file, exists: true }))
.catch(() => ({ ...file, exists: false })),
items.map((item) =>
stat(item.path)
.then(() => ({ ...item, exists: true }))
.catch(() => ({ ...item, exists: false })),
),
);
@@ -308,14 +335,16 @@ export class IntegrityService extends BaseService {
const missingFiles = results.filter(({ exists }) => !exists);
if (missingFiles.length > 0) {
await this.integrityReportRepository.create(
missingFiles.map(({ path }) => ({
missingFiles.map(({ path, assetId, fileAssetId }) => ({
type: IntegrityReportType.MissingFile,
path,
assetId,
fileAssetId,
})),
);
}
this.logger.log(`Processed ${paths.length} and found ${missingFiles.length} missing file(s).`);
this.logger.log(`Processed ${items.length} and found ${missingFiles.length} missing file(s).`);
return JobStatus.Success;
}
@@ -409,7 +438,7 @@ export class IntegrityService extends BaseService {
endMarker = startMarker;
startMarker = undefined;
for await (const { originalPath, checksum, createdAt, reportId } of assets) {
for await (const { originalPath, checksum, createdAt, assetId, reportId } of assets) {
processed++;
try {
@@ -445,6 +474,7 @@ export class IntegrityService extends BaseService {
await this.integrityReportRepository.create({
path: originalPath,
type: IntegrityReportType.ChecksumFail,
assetId,
});
}
@@ -519,3 +549,19 @@ export class IntegrityService extends BaseService {
return JobStatus.Success;
}
}
async function* chunk<T>(generator: AsyncIterableIterator<T>, n: number) {
let chunk: T[] = [];
for await (const item of generator) {
chunk.push(item);
if (chunk.length === n) {
yield chunk;
chunk = [];
}
}
if (chunk.length > 0) {
yield chunk;
}
}

View File

@@ -291,6 +291,15 @@ export interface IIntegrityOrphanedFilesJob {
paths: string[];
}
export interface IIntegrityMissingFilesJob {
items: {
path: string;
reportId: string | null;
assetId: string | null;
fileAssetId: string | null;
}[];
}
export interface IIntegrityPathWithReportJob {
items: { path: string; reportId: string | null }[];
}