feat: check orphaned file reports are not out of date

This commit is contained in:
izzy
2025-11-27 15:40:14 +00:00
parent cc31b9c7f1
commit ef7d8e94fa
5 changed files with 70 additions and 23 deletions

View File

@@ -16663,6 +16663,7 @@
"WorkflowRun", "WorkflowRun",
"IntegrityOrphanedFilesQueueAll", "IntegrityOrphanedFilesQueueAll",
"IntegrityOrphanedFiles", "IntegrityOrphanedFiles",
"IntegrityOrphanedCheckReports",
"IntegrityMissingFilesQueueAll", "IntegrityMissingFilesQueueAll",
"IntegrityMissingFiles", "IntegrityMissingFiles",
"IntegrityChecksumFiles" "IntegrityChecksumFiles"

View File

@@ -648,6 +648,7 @@ export enum JobName {
// Integrity // Integrity
IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll', IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll',
IntegrityOrphanedFiles = 'IntegrityOrphanedFiles', IntegrityOrphanedFiles = 'IntegrityOrphanedFiles',
IntegrityOrphanedCheckReports = 'IntegrityOrphanedCheckReports',
IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll', IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll',
IntegrityMissingFiles = 'IntegrityMissingFiles', IntegrityMissingFiles = 'IntegrityMissingFiles',
IntegrityChecksumFiles = 'IntegrityChecksumFiles', IntegrityChecksumFiles = 'IntegrityChecksumFiles',

View File

@@ -319,6 +319,11 @@ export class AssetJobRepository {
.stream(); .stream();
} }
@GenerateSql({ params: [DummyValue.STRING], stream: true })
streamIntegrityReports(type: IntegrityReportType) {
return this.db.selectFrom('integrity_report').select(['id as reportId', 'path']).where('type', '=', type).stream();
}
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [], stream: true })
streamForVideoConversion(force?: boolean) { streamForVideoConversion(force?: boolean) {
return this.db return this.db

View File

@@ -18,7 +18,23 @@ import {
} from 'src/enum'; } from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository'; import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service'; import { BaseService } from 'src/services/base.service';
import { IIntegrityMissingFilesJob, IIntegrityOrphanedFilesJob } from 'src/types'; import { IIntegrityOrphanedFilesJob, IIntegrityPathWithReportJob } from 'src/types';
async function* chunk<T>(generator: AsyncIterableIterator<T>, n: number) {
let chunk: T[] = [];
for await (const item of generator) {
chunk.push(item);
if (chunk.length === n) {
yield chunk;
chunk = [];
}
}
if (chunk.length) {
yield chunk;
}
}
@Injectable() @Injectable()
export class IntegrityService extends BaseService { export class IntegrityService extends BaseService {
@@ -72,6 +88,23 @@ export class IntegrityService extends BaseService {
@OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.BackgroundTask }) @OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.BackgroundTask })
async handleOrphanedFilesQueueAll(): Promise<JobStatus> { async handleOrphanedFilesQueueAll(): Promise<JobStatus> {
this.logger.log(`Checking for out of date orphaned file reports...`);
const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.OrphanFile);
let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({
name: JobName.IntegrityOrphanedCheckReports,
data: {
items: batchReports,
},
});
total += batchReports.length;
this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
}
this.logger.log(`Scanning for orphaned files...`); this.logger.log(`Scanning for orphaned files...`);
const assetPaths = this.storageRepository.walk({ const assetPaths = this.storageRepository.walk({
@@ -98,7 +131,7 @@ export class IntegrityService extends BaseService {
} }
} }
let total = 0; total = 0;
for await (const [batchType, batchPaths] of paths()) { for await (const [batchType, batchPaths] of paths()) {
await this.jobRepository.queue({ await this.jobRepository.queue({
name: JobName.IntegrityOrphanedFiles, name: JobName.IntegrityOrphanedFiles,
@@ -149,34 +182,40 @@ export class IntegrityService extends BaseService {
return JobStatus.Success; return JobStatus.Success;
} }
@OnJob({ name: JobName.IntegrityOrphanedCheckReports, queue: QueueName.BackgroundTask })
async handleOrphanedCheckReports({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
const results = await Promise.all(
paths.map(({ reportId, path }) =>
stat(path)
.then(() => reportId)
.catch(() => void 0),
),
);
const reportIds = results.filter((reportId) => reportId) as string[];
if (reportIds.length) {
await this.integrityReportRepository.deleteByIds(reportIds);
}
this.logger.log(`Processed ${paths.length} and found ${reportIds.length} orphaned file(s).`);
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask }) @OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask })
async handleMissingFilesQueueAll(): Promise<JobStatus> { async handleMissingFilesQueueAll(): Promise<JobStatus> {
this.logger.log(`Scanning for missing files...`); this.logger.log(`Scanning for missing files...`);
const assetPaths = this.assetJobRepository.streamAssetPaths(); const assetPaths = this.assetJobRepository.streamAssetPaths();
async function* chunk<T>(generator: AsyncIterableIterator<T>, n: number) {
let chunk: T[] = [];
for await (const item of generator) {
chunk.push(item);
if (chunk.length === n) {
yield chunk;
chunk = [];
}
}
if (chunk.length) {
yield chunk;
}
}
let total = 0; let total = 0;
for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({ await this.jobRepository.queue({
name: JobName.IntegrityMissingFiles, name: JobName.IntegrityMissingFiles,
data: { data: {
paths: batchPaths, items: batchPaths,
}, },
}); });
@@ -188,7 +227,7 @@ export class IntegrityService extends BaseService {
} }
@OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask }) @OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.BackgroundTask })
async handleMissingFiles({ paths }: IIntegrityMissingFilesJob): Promise<JobStatus> { async handleMissingFiles({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`); this.logger.log(`Processing batch of ${paths.length} files to check if they are missing.`);
const results = await Promise.all( const results = await Promise.all(

View File

@@ -287,8 +287,8 @@ export interface IIntegrityOrphanedFilesJob {
paths: string[]; paths: string[];
} }
export interface IIntegrityMissingFilesJob { export interface IIntegrityPathWithReportJob {
paths: { path: string; reportId: string | null }[]; items: { path: string; reportId: string | null }[];
} }
export interface JobCounts { export interface JobCounts {
@@ -405,8 +405,9 @@ export type JobItem =
// Integrity // Integrity
| { name: JobName.IntegrityOrphanedFilesQueueAll; data: IBaseJob } | { name: JobName.IntegrityOrphanedFilesQueueAll; data: IBaseJob }
| { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob } | { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob }
| { name: JobName.IntegrityOrphanedCheckReports; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityMissingFilesQueueAll; data: IBaseJob } | { name: JobName.IntegrityMissingFilesQueueAll; data: IBaseJob }
| { name: JobName.IntegrityMissingFiles; data: IIntegrityMissingFilesJob } | { name: JobName.IntegrityMissingFiles; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityChecksumFiles; data: IBaseJob }; | { name: JobName.IntegrityChecksumFiles; data: IBaseJob };
export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number]; export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];