refactor: move all new queries into integrity repository

This commit is contained in:
izzy
2025-12-02 13:15:48 +00:00
parent 6cfd1994c4
commit 64cc64dd56
9 changed files with 276 additions and 274 deletions

View File

@@ -1,10 +1,10 @@
import { Injectable } from '@nestjs/common'; import { Injectable } from '@nestjs/common';
import { Kysely, sql } from 'kysely'; import { Kysely } from 'kysely';
import { jsonArrayFrom } from 'kysely/helpers/postgres'; import { jsonArrayFrom } from 'kysely/helpers/postgres';
import { InjectKysely } from 'nestjs-kysely'; import { InjectKysely } from 'nestjs-kysely';
import { Asset, columns } from 'src/database'; import { Asset, columns } from 'src/database';
import { DummyValue, GenerateSql } from 'src/decorators'; import { DummyValue, GenerateSql } from 'src/decorators';
import { AssetFileType, AssetType, AssetVisibility, IntegrityReportType } from 'src/enum'; import { AssetFileType, AssetType, AssetVisibility } from 'src/enum';
import { DB } from 'src/schema'; import { DB } from 'src/schema';
import { StorageAsset } from 'src/types'; import { StorageAsset } from 'src/types';
import { import {
@@ -254,118 +254,6 @@ export class AssetJobRepository {
.executeTakeFirst(); .executeTakeFirst();
} }
@GenerateSql({ params: [DummyValue.STRING] })
getAssetPathsByPaths(paths: string[]) {
return this.db
.selectFrom('asset')
.select(['originalPath', 'encodedVideoPath'])
.where((eb) => eb.or([eb('originalPath', 'in', paths), eb('encodedVideoPath', 'in', paths)]))
.execute();
}
@GenerateSql({ params: [DummyValue.STRING] })
getAssetFilePathsByPaths(paths: string[]) {
return this.db.selectFrom('asset_file').select(['path']).where('path', 'in', paths).execute();
}
@GenerateSql({ params: [] })
getAssetCount() {
return this.db
.selectFrom('asset')
.select((eb) => eb.fn.countAll<number>().as('count'))
.executeTakeFirstOrThrow();
}
@GenerateSql({ params: [], stream: true })
streamAssetPaths() {
return this.db
.selectFrom((eb) =>
eb
.selectFrom('asset')
.select(['asset.originalPath as path'])
.select((eb) => [
eb.ref('asset.id').$castTo<string | null>().as('assetId'),
sql<string | null>`null::uuid`.as('fileAssetId'),
])
.unionAll(
eb
.selectFrom('asset')
.select((eb) => [
eb.ref('asset.encodedVideoPath').$castTo<string>().as('path'),
eb.ref('asset.id').$castTo<string | null>().as('assetId'),
sql<string | null>`null::uuid`.as('fileAssetId'),
])
.where('asset.encodedVideoPath', 'is not', null)
.where('asset.encodedVideoPath', '!=', sql<string>`''`),
)
.unionAll(
eb
.selectFrom('asset_file')
.select(['path'])
.select((eb) => [
sql<string | null>`null::uuid`.as('assetId'),
eb.ref('asset_file.id').$castTo<string | null>().as('fileAssetId'),
]),
)
.as('allPaths'),
)
.leftJoin(
'integrity_report',
(join) =>
join
.on('integrity_report.type', '=', IntegrityReportType.OrphanFile)
.on((eb) =>
eb.or([
eb('integrity_report.assetId', '=', eb.ref('allPaths.assetId')),
eb('integrity_report.fileAssetId', '=', eb.ref('allPaths.fileAssetId')),
]),
),
// .onRef('integrity_report.path', '=', 'allPaths.path')
)
.select([
'allPaths.path as path',
'allPaths.assetId',
'allPaths.fileAssetId',
'integrity_report.path as reportId',
])
.stream();
}
@GenerateSql({ params: [DummyValue.DATE, DummyValue.DATE], stream: true })
streamAssetChecksums(startMarker?: Date, endMarker?: Date) {
return this.db
.selectFrom('asset')
.leftJoin('integrity_report', (join) =>
join
.onRef('integrity_report.assetId', '=', 'asset.id')
// .onRef('integrity_report.path', '=', 'asset.originalPath')
.on('integrity_report.type', '=', IntegrityReportType.ChecksumFail),
)
.select([
'asset.originalPath',
'asset.checksum',
'asset.createdAt',
'asset.id as assetId',
'integrity_report.id as reportId',
])
.$if(startMarker !== undefined, (qb) => qb.where('createdAt', '>=', startMarker!))
.$if(endMarker !== undefined, (qb) => qb.where('createdAt', '<=', endMarker!))
.orderBy('createdAt', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.STRING], stream: true })
streamIntegrityReports(type: IntegrityReportType) {
return this.db
.selectFrom('integrity_report')
.select(['integrity_report.id as reportId', 'integrity_report.path'])
.where('integrity_report.type', '=', type)
.$if(type === IntegrityReportType.ChecksumFail, (eb) =>
eb.leftJoin('asset', 'integrity_report.path', 'asset.originalPath').select('asset.checksum'),
)
.stream();
}
@GenerateSql({ params: [], stream: true }) @GenerateSql({ params: [], stream: true })
streamForVideoConversion(force?: boolean) { streamForVideoConversion(force?: boolean) {
return this.db return this.db

View File

@@ -382,14 +382,6 @@ export class AssetRepository {
return items.map((asset) => asset.deviceAssetId); return items.map((asset) => asset.deviceAssetId);
} }
getAllAssetPaths() {
return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream();
}
getAllAssetFilePaths() {
return this.db.selectFrom('asset_file').select(['path']).stream();
}
@GenerateSql({ params: [DummyValue.UUID] }) @GenerateSql({ params: [DummyValue.UUID] })
async getLivePhotoCount(motionId: string): Promise<number> { async getLivePhotoCount(motionId: string): Promise<number> {
const [{ count }] = await this.db const [{ count }] = await this.db

View File

@@ -15,7 +15,7 @@ import { DownloadRepository } from 'src/repositories/download.repository';
import { DuplicateRepository } from 'src/repositories/duplicate.repository'; import { DuplicateRepository } from 'src/repositories/duplicate.repository';
import { EmailRepository } from 'src/repositories/email.repository'; import { EmailRepository } from 'src/repositories/email.repository';
import { EventRepository } from 'src/repositories/event.repository'; import { EventRepository } from 'src/repositories/event.repository';
import { IntegrityReportRepository } from 'src/repositories/integrity-report.repository'; import { IntegrityRepository } from 'src/repositories/integrity.repository';
import { JobRepository } from 'src/repositories/job.repository'; import { JobRepository } from 'src/repositories/job.repository';
import { LibraryRepository } from 'src/repositories/library.repository'; import { LibraryRepository } from 'src/repositories/library.repository';
import { LoggingRepository } from 'src/repositories/logging.repository'; import { LoggingRepository } from 'src/repositories/logging.repository';
@@ -69,7 +69,7 @@ export const repositories = [
DuplicateRepository, DuplicateRepository,
EmailRepository, EmailRepository,
EventRepository, EventRepository,
IntegrityReportRepository, IntegrityRepository,
JobRepository, JobRepository,
LibraryRepository, LibraryRepository,
LoggingRepository, LoggingRepository,

View File

@@ -1,116 +0,0 @@
import { Injectable } from '@nestjs/common';
import { Insertable, Kysely } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { Readable } from 'node:stream';
import { DummyValue, GenerateSql } from 'src/decorators';
import {
MaintenanceGetIntegrityReportDto,
MaintenanceIntegrityReportResponseDto,
MaintenanceIntegrityReportSummaryResponseDto,
} from 'src/dtos/maintenance.dto';
import { IntegrityReportType } from 'src/enum';
import { DB } from 'src/schema';
import { IntegrityReportTable } from 'src/schema/tables/integrity-report.table';
@Injectable()
export class IntegrityReportRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {}
create(dto: Insertable<IntegrityReportTable> | Insertable<IntegrityReportTable>[]) {
return this.db
.insertInto('integrity_report')
.values(dto)
.onConflict((oc) =>
oc.columns(['path', 'type']).doUpdateSet({
assetId: (eb) => eb.ref('excluded.assetId'),
fileAssetId: (eb) => eb.ref('excluded.fileAssetId'),
}),
)
.returningAll()
.executeTakeFirst();
}
getById(id: string) {
return this.db
.selectFrom('integrity_report')
.selectAll('integrity_report')
.where('id', '=', id)
.executeTakeFirstOrThrow();
}
getIntegrityReportSummary(): Promise<MaintenanceIntegrityReportSummaryResponseDto> {
return this.db
.selectFrom('integrity_report')
.select((eb) =>
eb.fn
.countAll<number>()
.filterWhere('type', '=', IntegrityReportType.ChecksumFail)
.as(IntegrityReportType.ChecksumFail),
)
.select((eb) =>
eb.fn
.countAll<number>()
.filterWhere('type', '=', IntegrityReportType.MissingFile)
.as(IntegrityReportType.MissingFile),
)
.select((eb) =>
eb.fn
.countAll<number>()
.filterWhere('type', '=', IntegrityReportType.OrphanFile)
.as(IntegrityReportType.OrphanFile),
)
.executeTakeFirstOrThrow();
}
async getIntegrityReport(dto: MaintenanceGetIntegrityReportDto): Promise<MaintenanceIntegrityReportResponseDto> {
return {
items: await this.db
.selectFrom('integrity_report')
.select(['id', 'type', 'path', 'assetId', 'fileAssetId'])
.where('type', '=', dto.type)
.orderBy('createdAt', 'desc')
.execute(),
};
}
getIntegrityReportCsv(type: IntegrityReportType): Readable {
const items = this.db
.selectFrom('integrity_report')
.select(['id', 'type', 'path', 'assetId', 'fileAssetId'])
.where('type', '=', type)
.orderBy('createdAt', 'desc')
.stream();
// very rudimentary csv serialiser
async function* generator() {
yield 'id,type,assetId,fileAssetId,path\n';
for await (const item of items) {
// no expectation of particularly bad filenames
// but they could potentially have a newline or quote character
yield `${item.id},${item.type},${item.assetId},${item.fileAssetId},"${item.path.replace(/"/g, '\\"')}"\n`;
}
}
return Readable.from(generator());
}
deleteById(id: string) {
return this.db.deleteFrom('integrity_report').where('id', '=', id).execute();
}
deleteByIds(ids: string[]) {
return this.db.deleteFrom('integrity_report').where('id', 'in', ids).execute();
}
@GenerateSql({ params: [DummyValue.STRING], stream: true })
streamIntegrityReportsByProperty(property?: 'assetId' | 'fileAssetId', filterType?: IntegrityReportType) {
return this.db
.selectFrom('integrity_report')
.select(['id', 'path', 'assetId', 'fileAssetId'])
.$if(filterType !== undefined, (eb) => eb.where('type', '=', filterType!))
.$if(property === undefined, (eb) => eb.where('assetId', 'is', null).where('fileAssetId', 'is', null))
.$if(property !== undefined, (eb) => eb.where(property!, 'is not', null))
.stream();
}
}

View File

@@ -0,0 +1,238 @@
import { Injectable } from '@nestjs/common';
import { Insertable, Kysely, sql } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { Readable } from 'node:stream';
import { DummyValue, GenerateSql } from 'src/decorators';
import {
MaintenanceGetIntegrityReportDto,
MaintenanceIntegrityReportResponseDto,
MaintenanceIntegrityReportSummaryResponseDto,
} from 'src/dtos/maintenance.dto';
import { IntegrityReportType } from 'src/enum';
import { DB } from 'src/schema';
import { IntegrityReportTable } from 'src/schema/tables/integrity-report.table';
@Injectable()
export class IntegrityRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {}
create(dto: Insertable<IntegrityReportTable> | Insertable<IntegrityReportTable>[]) {
return this.db
.insertInto('integrity_report')
.values(dto)
.onConflict((oc) =>
oc.columns(['path', 'type']).doUpdateSet({
assetId: (eb) => eb.ref('excluded.assetId'),
fileAssetId: (eb) => eb.ref('excluded.fileAssetId'),
}),
)
.returningAll()
.executeTakeFirst();
}
getById(id: string) {
return this.db
.selectFrom('integrity_report')
.selectAll('integrity_report')
.where('id', '=', id)
.executeTakeFirstOrThrow();
}
getIntegrityReportSummary(): Promise<MaintenanceIntegrityReportSummaryResponseDto> {
return this.db
.selectFrom('integrity_report')
.select((eb) =>
eb.fn
.countAll<number>()
.filterWhere('type', '=', IntegrityReportType.ChecksumFail)
.as(IntegrityReportType.ChecksumFail),
)
.select((eb) =>
eb.fn
.countAll<number>()
.filterWhere('type', '=', IntegrityReportType.MissingFile)
.as(IntegrityReportType.MissingFile),
)
.select((eb) =>
eb.fn
.countAll<number>()
.filterWhere('type', '=', IntegrityReportType.OrphanFile)
.as(IntegrityReportType.OrphanFile),
)
.executeTakeFirstOrThrow();
}
async getIntegrityReport(dto: MaintenanceGetIntegrityReportDto): Promise<MaintenanceIntegrityReportResponseDto> {
return {
items: await this.db
.selectFrom('integrity_report')
.select(['id', 'type', 'path', 'assetId', 'fileAssetId'])
.where('type', '=', dto.type)
.orderBy('createdAt', 'desc')
.execute(),
};
}
getIntegrityReportCsv(type: IntegrityReportType): Readable {
const items = this.db
.selectFrom('integrity_report')
.select(['id', 'type', 'path', 'assetId', 'fileAssetId'])
.where('type', '=', type)
.orderBy('createdAt', 'desc')
.stream();
// very rudimentary csv serialiser
async function* generator() {
yield 'id,type,assetId,fileAssetId,path\n';
for await (const item of items) {
// no expectation of particularly bad filenames
// but they could potentially have a newline or quote character
yield `${item.id},${item.type},${item.assetId},${item.fileAssetId},"${item.path.replace(/"/g, '\\"')}"\n`;
}
}
return Readable.from(generator());
}
@GenerateSql({ params: [] })
getAllAssetPaths() {
return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream();
}
@GenerateSql({ params: [] })
getAllAssetFilePaths() {
return this.db.selectFrom('asset_file').select(['path']).stream();
}
@GenerateSql({ params: [DummyValue.STRING] })
getAssetPathsByPaths(paths: string[]) {
return this.db
.selectFrom('asset')
.select(['originalPath', 'encodedVideoPath'])
.where((eb) => eb.or([eb('originalPath', 'in', paths), eb('encodedVideoPath', 'in', paths)]))
.execute();
}
@GenerateSql({ params: [DummyValue.STRING] })
getAssetFilePathsByPaths(paths: string[]) {
return this.db.selectFrom('asset_file').select(['path']).where('path', 'in', paths).execute();
}
@GenerateSql({ params: [] })
getAssetCount() {
return this.db
.selectFrom('asset')
.select((eb) => eb.fn.countAll<number>().as('count'))
.executeTakeFirstOrThrow();
}
@GenerateSql({ params: [], stream: true })
streamAssetPaths() {
return this.db
.selectFrom((eb) =>
eb
.selectFrom('asset')
.select(['asset.originalPath as path'])
.select((eb) => [
eb.ref('asset.id').$castTo<string | null>().as('assetId'),
sql<string | null>`null::uuid`.as('fileAssetId'),
])
.unionAll(
eb
.selectFrom('asset')
.select((eb) => [
eb.ref('asset.encodedVideoPath').$castTo<string>().as('path'),
eb.ref('asset.id').$castTo<string | null>().as('assetId'),
sql<string | null>`null::uuid`.as('fileAssetId'),
])
.where('asset.encodedVideoPath', 'is not', null)
.where('asset.encodedVideoPath', '!=', sql<string>`''`),
)
.unionAll(
eb
.selectFrom('asset_file')
.select(['path'])
.select((eb) => [
sql<string | null>`null::uuid`.as('assetId'),
eb.ref('asset_file.id').$castTo<string | null>().as('fileAssetId'),
]),
)
.as('allPaths'),
)
.leftJoin(
'integrity_report',
(join) =>
join
.on('integrity_report.type', '=', IntegrityReportType.OrphanFile)
.on((eb) =>
eb.or([
eb('integrity_report.assetId', '=', eb.ref('allPaths.assetId')),
eb('integrity_report.fileAssetId', '=', eb.ref('allPaths.fileAssetId')),
]),
),
// .onRef('integrity_report.path', '=', 'allPaths.path')
)
.select([
'allPaths.path as path',
'allPaths.assetId',
'allPaths.fileAssetId',
'integrity_report.path as reportId',
])
.stream();
}
@GenerateSql({ params: [DummyValue.DATE, DummyValue.DATE], stream: true })
streamAssetChecksums(startMarker?: Date, endMarker?: Date) {
return this.db
.selectFrom('asset')
.leftJoin('integrity_report', (join) =>
join
.onRef('integrity_report.assetId', '=', 'asset.id')
// .onRef('integrity_report.path', '=', 'asset.originalPath')
.on('integrity_report.type', '=', IntegrityReportType.ChecksumFail),
)
.select([
'asset.originalPath',
'asset.checksum',
'asset.createdAt',
'asset.id as assetId',
'integrity_report.id as reportId',
])
.$if(startMarker !== undefined, (qb) => qb.where('createdAt', '>=', startMarker!))
.$if(endMarker !== undefined, (qb) => qb.where('createdAt', '<=', endMarker!))
.orderBy('createdAt', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.STRING], stream: true })
streamIntegrityReports(type: IntegrityReportType) {
return this.db
.selectFrom('integrity_report')
.select(['integrity_report.id as reportId', 'integrity_report.path'])
.where('integrity_report.type', '=', type)
.$if(type === IntegrityReportType.ChecksumFail, (eb) =>
eb.leftJoin('asset', 'integrity_report.path', 'asset.originalPath').select('asset.checksum'),
)
.stream();
}
@GenerateSql({ params: [DummyValue.STRING], stream: true })
streamIntegrityReportsByProperty(property?: 'assetId' | 'fileAssetId', filterType?: IntegrityReportType) {
return this.db
.selectFrom('integrity_report')
.select(['id', 'path', 'assetId', 'fileAssetId'])
.$if(filterType !== undefined, (eb) => eb.where('type', '=', filterType!))
.$if(property === undefined, (eb) => eb.where('assetId', 'is', null).where('fileAssetId', 'is', null))
.$if(property !== undefined, (eb) => eb.where(property!, 'is not', null))
.stream();
}
deleteById(id: string) {
return this.db.deleteFrom('integrity_report').where('id', '=', id).execute();
}
deleteByIds(ids: string[]) {
return this.db.deleteFrom('integrity_report').where('id', 'in', ids).execute();
}
}

View File

@@ -22,7 +22,7 @@ import { DownloadRepository } from 'src/repositories/download.repository';
import { DuplicateRepository } from 'src/repositories/duplicate.repository'; import { DuplicateRepository } from 'src/repositories/duplicate.repository';
import { EmailRepository } from 'src/repositories/email.repository'; import { EmailRepository } from 'src/repositories/email.repository';
import { EventRepository } from 'src/repositories/event.repository'; import { EventRepository } from 'src/repositories/event.repository';
import { IntegrityReportRepository } from 'src/repositories/integrity-report.repository'; import { IntegrityRepository } from 'src/repositories/integrity.repository';
import { JobRepository } from 'src/repositories/job.repository'; import { JobRepository } from 'src/repositories/job.repository';
import { LibraryRepository } from 'src/repositories/library.repository'; import { LibraryRepository } from 'src/repositories/library.repository';
import { LoggingRepository } from 'src/repositories/logging.repository'; import { LoggingRepository } from 'src/repositories/logging.repository';
@@ -80,7 +80,7 @@ export const BASE_SERVICE_DEPENDENCIES = [
DuplicateRepository, DuplicateRepository,
EmailRepository, EmailRepository,
EventRepository, EventRepository,
IntegrityReportRepository, IntegrityRepository,
JobRepository, JobRepository,
LibraryRepository, LibraryRepository,
MachineLearningRepository, MachineLearningRepository,
@@ -139,7 +139,7 @@ export class BaseService {
protected duplicateRepository: DuplicateRepository, protected duplicateRepository: DuplicateRepository,
protected emailRepository: EmailRepository, protected emailRepository: EmailRepository,
protected eventRepository: EventRepository, protected eventRepository: EventRepository,
protected integrityReportRepository: IntegrityReportRepository, protected integrityRepository: IntegrityRepository,
protected jobRepository: JobRepository, protected jobRepository: JobRepository,
protected libraryRepository: LibraryRepository, protected libraryRepository: LibraryRepository,
protected machineLearningRepository: MachineLearningRepository, protected machineLearningRepository: MachineLearningRepository,

View File

@@ -149,7 +149,7 @@ export class IntegrityService extends BaseService {
async handleOrphanedFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> { async handleOrphanedFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
this.logger.log(`Checking for out of date orphaned file reports...`); this.logger.log(`Checking for out of date orphaned file reports...`);
const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.OrphanFile); const reports = this.integrityRepository.streamIntegrityReports(IntegrityReportType.OrphanFile);
let total = 0; let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
@@ -220,7 +220,7 @@ export class IntegrityService extends BaseService {
const orphanedFiles = new Set<string>(paths); const orphanedFiles = new Set<string>(paths);
if (type === 'asset') { if (type === 'asset') {
const assets = await this.assetJobRepository.getAssetPathsByPaths(paths); const assets = await this.integrityRepository.getAssetPathsByPaths(paths);
for (const { originalPath, encodedVideoPath } of assets) { for (const { originalPath, encodedVideoPath } of assets) {
orphanedFiles.delete(originalPath); orphanedFiles.delete(originalPath);
@@ -229,14 +229,14 @@ export class IntegrityService extends BaseService {
} }
} }
} else { } else {
const assets = await this.assetJobRepository.getAssetFilePathsByPaths(paths); const assets = await this.integrityRepository.getAssetFilePathsByPaths(paths);
for (const { path } of assets) { for (const { path } of assets) {
orphanedFiles.delete(path); orphanedFiles.delete(path);
} }
} }
if (orphanedFiles.size > 0) { if (orphanedFiles.size > 0) {
await this.integrityReportRepository.create( await this.integrityRepository.create(
[...orphanedFiles].map((path) => ({ [...orphanedFiles].map((path) => ({
type: IntegrityReportType.OrphanFile, type: IntegrityReportType.OrphanFile,
path, path,
@@ -263,7 +263,7 @@ export class IntegrityService extends BaseService {
const reportIds = results.filter(Boolean) as string[]; const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) { if (reportIds.length > 0) {
await this.integrityReportRepository.deleteByIds(reportIds); await this.integrityRepository.deleteByIds(reportIds);
} }
this.logger.log(`Processed ${items.length} paths and found ${reportIds.length} report(s) out of date.`); this.logger.log(`Processed ${items.length} paths and found ${reportIds.length} report(s) out of date.`);
@@ -275,7 +275,7 @@ export class IntegrityService extends BaseService {
if (refreshOnly) { if (refreshOnly) {
this.logger.log(`Checking for out of date missing file reports...`); this.logger.log(`Checking for out of date missing file reports...`);
const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.MissingFile); const reports = this.integrityRepository.streamIntegrityReports(IntegrityReportType.MissingFile);
let total = 0; let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
@@ -296,7 +296,7 @@ export class IntegrityService extends BaseService {
this.logger.log(`Scanning for missing files...`); this.logger.log(`Scanning for missing files...`);
const assetPaths = this.assetJobRepository.streamAssetPaths(); const assetPaths = this.integrityRepository.streamAssetPaths();
let total = 0; let total = 0;
for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const batchPaths of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) {
@@ -331,12 +331,12 @@ export class IntegrityService extends BaseService {
.map(({ reportId }) => reportId!); .map(({ reportId }) => reportId!);
if (outdatedReports.length > 0) { if (outdatedReports.length > 0) {
await this.integrityReportRepository.deleteByIds(outdatedReports); await this.integrityRepository.deleteByIds(outdatedReports);
} }
const missingFiles = results.filter(({ exists }) => !exists); const missingFiles = results.filter(({ exists }) => !exists);
if (missingFiles.length > 0) { if (missingFiles.length > 0) {
await this.integrityReportRepository.create( await this.integrityRepository.create(
missingFiles.map(({ path, assetId, fileAssetId }) => ({ missingFiles.map(({ path, assetId, fileAssetId }) => ({
type: IntegrityReportType.MissingFile, type: IntegrityReportType.MissingFile,
path, path,
@@ -365,7 +365,7 @@ export class IntegrityService extends BaseService {
const reportIds = results.filter(Boolean) as string[]; const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) { if (reportIds.length > 0) {
await this.integrityReportRepository.deleteByIds(reportIds); await this.integrityRepository.deleteByIds(reportIds);
} }
this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`); this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
@@ -377,7 +377,7 @@ export class IntegrityService extends BaseService {
if (refreshOnly) { if (refreshOnly) {
this.logger.log(`Checking for out of date checksum file reports...`); this.logger.log(`Checking for out of date checksum file reports...`);
const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.ChecksumFail); const reports = this.integrityRepository.streamIntegrityReports(IntegrityReportType.ChecksumFail);
let total = 0; let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
@@ -414,7 +414,7 @@ export class IntegrityService extends BaseService {
let processed = 0; let processed = 0;
const startedAt = Date.now(); const startedAt = Date.now();
const { count } = await this.assetJobRepository.getAssetCount(); const { count } = await this.integrityRepository.getAssetCount();
const checkpoint = await this.systemMetadataRepository.get(SystemMetadataKey.IntegrityChecksumCheckpoint); const checkpoint = await this.systemMetadataRepository.get(SystemMetadataKey.IntegrityChecksumCheckpoint);
let startMarker: Date | undefined = checkpoint?.date ? new Date(checkpoint.date) : undefined; let startMarker: Date | undefined = checkpoint?.date ? new Date(checkpoint.date) : undefined;
@@ -436,7 +436,7 @@ export class IntegrityService extends BaseService {
`Processing assets in range [${startMarker?.toISOString() ?? 'beginning'}, ${endMarker?.toISOString() ?? 'end'}]`, `Processing assets in range [${startMarker?.toISOString() ?? 'beginning'}, ${endMarker?.toISOString() ?? 'end'}]`,
); );
const assets = this.assetJobRepository.streamAssetChecksums(startMarker, endMarker); const assets = this.integrityRepository.streamAssetChecksums(startMarker, endMarker);
endMarker = startMarker; endMarker = startMarker;
startMarker = undefined; startMarker = undefined;
@@ -458,7 +458,7 @@ export class IntegrityService extends BaseService {
if (checksum.equals(hash.digest())) { if (checksum.equals(hash.digest())) {
if (reportId) { if (reportId) {
await this.integrityReportRepository.deleteById(reportId); await this.integrityRepository.deleteById(reportId);
} }
} else { } else {
throw new Error('File failed checksum'); throw new Error('File failed checksum');
@@ -466,14 +466,14 @@ export class IntegrityService extends BaseService {
} catch (error) { } catch (error) {
if ((error as { code?: string }).code === 'ENOENT') { if ((error as { code?: string }).code === 'ENOENT') {
if (reportId) { if (reportId) {
await this.integrityReportRepository.deleteById(reportId); await this.integrityRepository.deleteById(reportId);
} }
// missing file; handled by the missing files job // missing file; handled by the missing files job
continue; continue;
} }
this.logger.warn('Failed to process a file: ' + error); this.logger.warn('Failed to process a file: ' + error);
await this.integrityReportRepository.create({ await this.integrityRepository.create({
path: originalPath, path: originalPath,
type: IntegrityReportType.ChecksumFail, type: IntegrityReportType.ChecksumFail,
assetId, assetId,
@@ -544,7 +544,7 @@ export class IntegrityService extends BaseService {
const reportIds = results.filter(Boolean) as string[]; const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) { if (reportIds.length > 0) {
await this.integrityReportRepository.deleteByIds(reportIds); await this.integrityRepository.deleteByIds(reportIds);
} }
this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`); this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
@@ -576,7 +576,7 @@ export class IntegrityService extends BaseService {
} }
for (const property of properties) { for (const property of properties) {
const reports = this.integrityReportRepository.streamIntegrityReportsByProperty(property, type); const reports = this.integrityRepository.streamIntegrityReportsByProperty(property, type);
for await (const report of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) { for await (const report of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
// todo: queue sub-job here instead? // todo: queue sub-job here instead?
@@ -593,7 +593,7 @@ export class IntegrityService extends BaseService {
userId: '', // ??? userId: '', // ???
}); });
await this.integrityReportRepository.deleteByIds(report.map(({ id }) => id)); await this.integrityRepository.deleteByIds(report.map(({ id }) => id));
break; break;
} }
case 'fileAssetId': { case 'fileAssetId': {
@@ -602,7 +602,7 @@ export class IntegrityService extends BaseService {
} }
default: { default: {
await Promise.all(report.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0))); await Promise.all(report.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0)));
await this.integrityReportRepository.deleteByIds(report.map(({ id }) => id)); await this.integrityRepository.deleteByIds(report.map(({ id }) => id));
break; break;
} }
} }

View File

@@ -61,19 +61,19 @@ export class MaintenanceService extends BaseService {
} }
getIntegrityReportSummary(): Promise<MaintenanceIntegrityReportSummaryResponseDto> { getIntegrityReportSummary(): Promise<MaintenanceIntegrityReportSummaryResponseDto> {
return this.integrityReportRepository.getIntegrityReportSummary(); return this.integrityRepository.getIntegrityReportSummary();
} }
getIntegrityReport(dto: MaintenanceGetIntegrityReportDto): Promise<MaintenanceIntegrityReportResponseDto> { getIntegrityReport(dto: MaintenanceGetIntegrityReportDto): Promise<MaintenanceIntegrityReportResponseDto> {
return this.integrityReportRepository.getIntegrityReport(dto); return this.integrityRepository.getIntegrityReport(dto);
} }
getIntegrityReportCsv(type: IntegrityReportType): Readable { getIntegrityReportCsv(type: IntegrityReportType): Readable {
return this.integrityReportRepository.getIntegrityReportCsv(type); return this.integrityRepository.getIntegrityReportCsv(type);
} }
async getIntegrityReportFile(id: string): Promise<ImmichFileResponse> { async getIntegrityReportFile(id: string): Promise<ImmichFileResponse> {
const { path } = await this.integrityReportRepository.getById(id); const { path } = await this.integrityRepository.getById(id);
return new ImmichFileResponse({ return new ImmichFileResponse({
path, path,
@@ -84,7 +84,7 @@ export class MaintenanceService extends BaseService {
} }
async deleteIntegrityReport(auth: AuthDto, id: string): Promise<void> { async deleteIntegrityReport(auth: AuthDto, id: string): Promise<void> {
const { path, assetId, fileAssetId } = await this.integrityReportRepository.getById(id); const { path, assetId, fileAssetId } = await this.integrityRepository.getById(id);
if (assetId) { if (assetId) {
await this.assetRepository.updateAll([assetId], { await this.assetRepository.updateAll([assetId], {
@@ -97,12 +97,12 @@ export class MaintenanceService extends BaseService {
userId: auth.user.id, userId: auth.user.id,
}); });
await this.integrityReportRepository.deleteById(id); await this.integrityRepository.deleteById(id);
} else if (fileAssetId) { } else if (fileAssetId) {
await this.assetRepository.deleteFiles([{ id: fileAssetId }]); await this.assetRepository.deleteFiles([{ id: fileAssetId }]);
} else { } else {
await this.storageRepository.unlink(path); await this.storageRepository.unlink(path);
await this.integrityReportRepository.deleteById(id); await this.integrityRepository.deleteById(id);
} }
} }
} }

View File

@@ -31,7 +31,7 @@ import { DownloadRepository } from 'src/repositories/download.repository';
import { DuplicateRepository } from 'src/repositories/duplicate.repository'; import { DuplicateRepository } from 'src/repositories/duplicate.repository';
import { EmailRepository } from 'src/repositories/email.repository'; import { EmailRepository } from 'src/repositories/email.repository';
import { EventRepository } from 'src/repositories/event.repository'; import { EventRepository } from 'src/repositories/event.repository';
import { IntegrityReportRepository } from 'src/repositories/integrity-report.repository'; import { IntegrityRepository } from 'src/repositories/integrity.repository';
import { JobRepository } from 'src/repositories/job.repository'; import { JobRepository } from 'src/repositories/job.repository';
import { LibraryRepository } from 'src/repositories/library.repository'; import { LibraryRepository } from 'src/repositories/library.repository';
import { LoggingRepository } from 'src/repositories/logging.repository'; import { LoggingRepository } from 'src/repositories/logging.repository';
@@ -226,7 +226,7 @@ export type ServiceOverrides = {
duplicateRepository: DuplicateRepository; duplicateRepository: DuplicateRepository;
email: EmailRepository; email: EmailRepository;
event: EventRepository; event: EventRepository;
integrityReport: IntegrityReportRepository; integrityReport: IntegrityRepository;
job: JobRepository; job: JobRepository;
library: LibraryRepository; library: LibraryRepository;
logger: LoggingRepository; logger: LoggingRepository;
@@ -300,7 +300,7 @@ export const getMocks = () => {
email: automock(EmailRepository, { args: [loggerMock] }), email: automock(EmailRepository, { args: [loggerMock] }),
// eslint-disable-next-line no-sparse-arrays // eslint-disable-next-line no-sparse-arrays
event: automock(EventRepository, { args: [, , loggerMock], strict: false }), event: automock(EventRepository, { args: [, , loggerMock], strict: false }),
integrityReport: automock(IntegrityReportRepository, { strict: false }), integrityReport: automock(IntegrityRepository, { strict: false }),
job: newJobRepositoryMock(), job: newJobRepositoryMock(),
apiKey: automock(ApiKeyRepository), apiKey: automock(ApiKeyRepository),
library: automock(LibraryRepository, { strict: false }), library: automock(LibraryRepository, { strict: false }),
@@ -369,7 +369,7 @@ export const newTestService = <T extends BaseService>(
overrides.duplicateRepository || (mocks.duplicateRepository as As<DuplicateRepository>), overrides.duplicateRepository || (mocks.duplicateRepository as As<DuplicateRepository>),
overrides.email || (mocks.email as As<EmailRepository>), overrides.email || (mocks.email as As<EmailRepository>),
overrides.event || (mocks.event as As<EventRepository>), overrides.event || (mocks.event as As<EventRepository>),
overrides.integrityReport || (mocks.integrityReport as As<IntegrityReportRepository>), overrides.integrityReport || (mocks.integrityReport as As<IntegrityRepository>),
overrides.job || (mocks.job as As<JobRepository>), overrides.job || (mocks.job as As<JobRepository>),
overrides.library || (mocks.library as As<LibraryRepository>), overrides.library || (mocks.library as As<LibraryRepository>),
overrides.machineLearning || (mocks.machineLearning as As<MachineLearningRepository>), overrides.machineLearning || (mocks.machineLearning as As<MachineLearningRepository>),