refactor: queue separate jobs for deleting integrity reports

This commit is contained in:
izzy
2025-12-18 17:08:45 +00:00
parent 8b1e29998e
commit 2962c54ee2
5 changed files with 65 additions and 39 deletions

View File

@@ -17033,7 +17033,8 @@
"IntegrityMissingFilesRefresh", "IntegrityMissingFilesRefresh",
"IntegrityChecksumFiles", "IntegrityChecksumFiles",
"IntegrityChecksumFilesRefresh", "IntegrityChecksumFilesRefresh",
"IntegrityReportDelete" "IntegrityDeleteReportType",
"IntegrityDeleteReports"
], ],
"type": "string" "type": "string"
}, },

View File

@@ -665,7 +665,8 @@ export enum JobName {
IntegrityMissingFilesRefresh = 'IntegrityMissingFilesRefresh', IntegrityMissingFilesRefresh = 'IntegrityMissingFilesRefresh',
IntegrityChecksumFiles = 'IntegrityChecksumFiles', IntegrityChecksumFiles = 'IntegrityChecksumFiles',
IntegrityChecksumFilesRefresh = 'IntegrityChecksumFilesRefresh', IntegrityChecksumFilesRefresh = 'IntegrityChecksumFilesRefresh',
IntegrityReportDelete = 'IntegrityReportDelete', IntegrityDeleteReportType = 'IntegrityDeleteReportType',
IntegrityDeleteReports = 'IntegrityDeleteReports',
} }
export enum QueueCommand { export enum QueueCommand {

View File

@@ -27,7 +27,8 @@ import {
import { ArgOf } from 'src/repositories/event.repository'; import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { import {
IIntegrityDeleteReportJob, IIntegrityDeleteReportsJob,
IIntegrityDeleteReportTypeJob,
IIntegrityJob, IIntegrityJob,
IIntegrityMissingFilesJob, IIntegrityMissingFilesJob,
IIntegrityOrphanedFilesJob, IIntegrityOrphanedFilesJob,
@@ -622,8 +623,8 @@ export class IntegrityService extends BaseService {
return JobStatus.Success; return JobStatus.Success;
} }
@OnJob({ name: JobName.IntegrityReportDelete, queue: QueueName.IntegrityCheck }) @OnJob({ name: JobName.IntegrityDeleteReportType, queue: QueueName.IntegrityCheck })
async handleDeleteIntegrityReport({ type }: IIntegrityDeleteReportJob): Promise<JobStatus> { async handleDeleteAllIntegrityReports({ type }: IIntegrityDeleteReportTypeJob): Promise<JobStatus> {
this.logger.log(`Deleting all entries for ${type ?? 'all types of'} integrity report`); this.logger.log(`Deleting all entries for ${type ?? 'all types of'} integrity report`);
let properties; let properties;
@@ -648,39 +649,52 @@ export class IntegrityService extends BaseService {
for (const property of properties) { for (const property of properties) {
const reports = this.integrityRepository.streamIntegrityReportsByProperty(property, type); const reports = this.integrityRepository.streamIntegrityReportsByProperty(property, type);
for await (const report of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const batch of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
// todo: queue sub-job here instead? await this.jobRepository.queue({
name: JobName.IntegrityDeleteReports,
data: {
reports: batch,
},
});
switch (property) { this.logger.log(`Queued ${batch.length} reports to delete.`);
case 'assetId': {
const ids = report.map(({ assetId }) => assetId!);
await this.assetRepository.updateAll(ids, {
deletedAt: new Date(),
status: AssetStatus.Trashed,
});
await this.eventRepository.emit('AssetTrashAll', {
assetIds: ids,
userId: '', // ???
});
await this.integrityRepository.deleteByIds(report.map(({ id }) => id));
break;
}
case 'fileAssetId': {
await this.assetRepository.deleteFiles(report.map(({ fileAssetId }) => ({ id: fileAssetId! })));
break;
}
default: {
await Promise.all(report.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0)));
await this.integrityRepository.deleteByIds(report.map(({ id }) => id));
break;
}
}
} }
} }
this.logger.log('Finished deleting integrity report.'); return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityDeleteReports, queue: QueueName.IntegrityCheck })
async handleDeleteIntegrityReports({ reports }: IIntegrityDeleteReportsJob): Promise<JobStatus> {
const byAsset = reports.filter((report) => report.assetId);
const byFileAsset = reports.filter((report) => report.fileAssetId);
const byPath = reports.filter((report) => !report.assetId && !report.fileAssetId);
if (byAsset.length > 0) {
const ids = byAsset.map(({ assetId }) => assetId!);
await this.assetRepository.updateAll(ids, {
deletedAt: new Date(),
status: AssetStatus.Trashed,
});
await this.eventRepository.emit('AssetTrashAll', {
assetIds: ids,
userId: '', // we don't notify any users currently
});
await this.integrityRepository.deleteByIds(byAsset.map(({ id }) => id));
}
if (byFileAsset.length > 0) {
await this.assetRepository.deleteFiles(byFileAsset.map(({ fileAssetId }) => ({ id: fileAssetId! })));
}
if (byPath.length > 0) {
await Promise.all(byPath.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0)));
await this.integrityRepository.deleteByIds(byPath.map(({ id }) => id));
}
this.logger.log(`Deleted ${reports.length} reports.`);
return JobStatus.Success; return JobStatus.Success;
} }
} }

View File

@@ -59,15 +59,15 @@ const asJobItem = (dto: JobCreateDto): JobItem => {
} }
case ManualJobName.IntegrityMissingFilesDeleteAll: { case ManualJobName.IntegrityMissingFilesDeleteAll: {
return { name: JobName.IntegrityReportDelete, data: { type: IntegrityReportType.MissingFile } }; return { name: JobName.IntegrityDeleteReportType, data: { type: IntegrityReportType.MissingFile } };
} }
case ManualJobName.IntegrityOrphanFilesDeleteAll: { case ManualJobName.IntegrityOrphanFilesDeleteAll: {
return { name: JobName.IntegrityReportDelete, data: { type: IntegrityReportType.OrphanFile } }; return { name: JobName.IntegrityDeleteReportType, data: { type: IntegrityReportType.OrphanFile } };
} }
case ManualJobName.IntegrityChecksumFilesDeleteAll: { case ManualJobName.IntegrityChecksumFilesDeleteAll: {
return { name: JobName.IntegrityReportDelete, data: { type: IntegrityReportType.ChecksumFail } }; return { name: JobName.IntegrityDeleteReportType, data: { type: IntegrityReportType.ChecksumFail } };
} }
default: { default: {

View File

@@ -286,10 +286,19 @@ export interface IIntegrityJob {
refreshOnly?: boolean; refreshOnly?: boolean;
} }
export interface IIntegrityDeleteReportJob { export interface IIntegrityDeleteReportTypeJob {
type?: IntegrityReportType; type?: IntegrityReportType;
} }
export interface IIntegrityDeleteReportsJob {
reports: {
id: string;
assetId: string | null;
fileAssetId: string | null;
path: string;
}[];
}
export interface IIntegrityOrphanedFilesJob { export interface IIntegrityOrphanedFilesJob {
type: 'asset' | 'asset_file'; type: 'asset' | 'asset_file';
paths: string[]; paths: string[];
@@ -432,7 +441,8 @@ export type JobItem =
| { name: JobName.IntegrityMissingFilesRefresh; data: IIntegrityPathWithReportJob } | { name: JobName.IntegrityMissingFilesRefresh; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob } | { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob }
| { name: JobName.IntegrityChecksumFilesRefresh; data?: IIntegrityPathWithChecksumJob } | { name: JobName.IntegrityChecksumFilesRefresh; data?: IIntegrityPathWithChecksumJob }
| { name: JobName.IntegrityReportDelete; data: IIntegrityDeleteReportJob }; | { name: JobName.IntegrityDeleteReportType; data: IIntegrityDeleteReportTypeJob }
| { name: JobName.IntegrityDeleteReports; data: IIntegrityDeleteReportsJob };
export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number]; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];