mirror of
https://github.com/immich-app/immich.git
synced 2025-12-20 17:25:35 +03:00
stash: integrity checks
This commit is contained in:
@@ -16660,7 +16660,9 @@
|
||||
"VersionCheck",
|
||||
"OcrQueueAll",
|
||||
"Ocr",
|
||||
"WorkflowRun"
|
||||
"WorkflowRun",
|
||||
"IntegrityOrphanedAndMissingFiles",
|
||||
"IntegrityChecksumFiles"
|
||||
],
|
||||
"type": "string"
|
||||
},
|
||||
|
||||
@@ -637,6 +637,10 @@ export enum JobName {
|
||||
|
||||
// Workflow
|
||||
WorkflowRun = 'WorkflowRun',
|
||||
|
||||
// Integrity
|
||||
IntegrityOrphanedAndMissingFiles = 'IntegrityOrphanedAndMissingFiles',
|
||||
IntegrityChecksumFiles = 'IntegrityChecksumFiles',
|
||||
}
|
||||
|
||||
export enum QueueCommand {
|
||||
|
||||
@@ -382,6 +382,14 @@ export class AssetRepository {
|
||||
return items.map((asset) => asset.deviceAssetId);
|
||||
}
|
||||
|
||||
async getAllAssetPaths() {
|
||||
return this.db.selectFrom('asset').select(['originalPath', 'encodedVideoPath']).stream();
|
||||
}
|
||||
|
||||
async getAllAssetFilePaths() {
|
||||
return this.db.selectFrom('asset_file').select(['path']).stream();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
async getLivePhotoCount(motionId: string): Promise<number> {
|
||||
const [{ count }] = await this.db
|
||||
|
||||
@@ -12,6 +12,7 @@ import { CliService } from 'src/services/cli.service';
|
||||
import { DatabaseService } from 'src/services/database.service';
|
||||
import { DownloadService } from 'src/services/download.service';
|
||||
import { DuplicateService } from 'src/services/duplicate.service';
|
||||
import { IntegrityService } from 'src/services/integrity.service';
|
||||
import { JobService } from 'src/services/job.service';
|
||||
import { LibraryService } from 'src/services/library.service';
|
||||
import { MaintenanceService } from 'src/services/maintenance.service';
|
||||
@@ -62,6 +63,7 @@ export const services = [
|
||||
DatabaseService,
|
||||
DownloadService,
|
||||
DuplicateService,
|
||||
IntegrityService,
|
||||
JobService,
|
||||
LibraryService,
|
||||
MaintenanceService,
|
||||
|
||||
120
server/src/services/integrity.service.ts
Normal file
120
server/src/services/integrity.service.ts
Normal file
@@ -0,0 +1,120 @@
|
||||
import { Injectable } from '@nestjs/common';
|
||||
import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
|
||||
import { StorageCore } from 'src/cores/storage.core';
|
||||
import { OnEvent, OnJob } from 'src/decorators';
|
||||
import { ImmichWorker, JobName, JobStatus, QueueName, StorageFolder } from 'src/enum';
|
||||
import { ArgOf } from 'src/repositories/event.repository';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
|
||||
@Injectable()
|
||||
export class IntegrityService extends BaseService {
|
||||
// private backupLock = false;
|
||||
|
||||
@OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] })
|
||||
async onConfigInit({
|
||||
newConfig: {
|
||||
backup: { database },
|
||||
},
|
||||
}: ArgOf<'ConfigInit'>) {
|
||||
// this.backupLock = await this.databaseRepository.tryLock(DatabaseLock.BackupDatabase);
|
||||
// if (this.backupLock) {
|
||||
// this.cronRepository.create({
|
||||
// name: 'backupDatabase',
|
||||
// expression: database.cronExpression,
|
||||
// onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.DatabaseBackup }), this.logger),
|
||||
// start: database.enabled,
|
||||
// });
|
||||
// }
|
||||
setTimeout(() => {
|
||||
this.jobRepository.queue({
|
||||
name: JobName.IntegrityOrphanedAndMissingFiles,
|
||||
data: {},
|
||||
});
|
||||
}, 1000);
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'ConfigUpdate', server: true })
|
||||
async onConfigUpdate({ newConfig: { backup } }: ArgOf<'ConfigUpdate'>) {
|
||||
// if (!this.backupLock) {
|
||||
// return;
|
||||
// }
|
||||
// this.cronRepository.update({
|
||||
// name: 'backupDatabase',
|
||||
// expression: backup.database.cronExpression,
|
||||
// start: backup.database.enabled,
|
||||
// });
|
||||
}
|
||||
|
||||
@OnJob({ name: JobName.IntegrityOrphanedAndMissingFiles, queue: QueueName.BackgroundTask })
|
||||
async handleOrphanedAndMissingFiles(): Promise<JobStatus> {
|
||||
// (1) Asset files
|
||||
const pathsLocal = new Set<string>();
|
||||
const pathsDb = new Set<string>();
|
||||
|
||||
await Promise.all([
|
||||
// scan all local paths
|
||||
(async () => {
|
||||
const pathsOnDisk = this.storageRepository.walk({
|
||||
pathsToCrawl: [
|
||||
StorageFolder.EncodedVideo,
|
||||
StorageFolder.Library,
|
||||
StorageFolder.Upload,
|
||||
StorageFolder.Thumbnails,
|
||||
].map((folder) => StorageCore.getBaseFolder(folder)),
|
||||
includeHidden: false,
|
||||
take: JOBS_LIBRARY_PAGINATION_SIZE,
|
||||
});
|
||||
|
||||
for await (const pathBatch of pathsOnDisk) {
|
||||
for (const path of pathBatch) {
|
||||
if (!pathsDb.delete(path)) {
|
||||
pathsLocal.add(path);
|
||||
}
|
||||
|
||||
console.info(pathsLocal.size, pathsDb.size);
|
||||
}
|
||||
}
|
||||
})(),
|
||||
// scan "asset"
|
||||
(async () => {
|
||||
const pathsInDb = await this.assetRepository.getAllAssetPaths();
|
||||
|
||||
for await (const { originalPath, encodedVideoPath } of pathsInDb) {
|
||||
if (!pathsLocal.delete(originalPath)) {
|
||||
pathsDb.add(originalPath);
|
||||
}
|
||||
|
||||
if (encodedVideoPath && !pathsLocal.delete(encodedVideoPath)) {
|
||||
pathsDb.add(encodedVideoPath);
|
||||
}
|
||||
|
||||
console.info(pathsLocal.size, pathsDb.size);
|
||||
}
|
||||
})(),
|
||||
// scan "asset_files"
|
||||
(async () => {
|
||||
const pathsInDb = await this.assetRepository.getAllAssetFilePaths();
|
||||
|
||||
for await (const { path } of pathsInDb) {
|
||||
if (!pathsLocal.delete(path)) {
|
||||
pathsDb.add(path);
|
||||
}
|
||||
|
||||
console.info(pathsLocal.size, pathsDb.size);
|
||||
}
|
||||
})(),
|
||||
]);
|
||||
|
||||
console.info('Orphaned files:', pathsLocal);
|
||||
console.info('Missing files:', pathsDb);
|
||||
|
||||
// profile: skipped
|
||||
return JobStatus.Success;
|
||||
}
|
||||
|
||||
@OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.BackgroundTask })
|
||||
async handleChecksumFiles(): Promise<JobStatus> {
|
||||
// todo
|
||||
return JobStatus.Success;
|
||||
}
|
||||
}
|
||||
@@ -391,7 +391,11 @@ export type JobItem =
|
||||
| { name: JobName.Ocr; data: IEntityJob }
|
||||
|
||||
// Workflow
|
||||
| { name: JobName.WorkflowRun; data: IWorkflowJob };
|
||||
| { name: JobName.WorkflowRun; data: IWorkflowJob }
|
||||
|
||||
// Integrity
|
||||
| { name: JobName.IntegrityOrphanedAndMissingFiles; data: IBaseJob }
|
||||
| { name: JobName.IntegrityChecksumFiles; data: IBaseJob };
|
||||
|
||||
export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];
|
||||
|
||||
|
||||
Reference in New Issue
Block a user