// immich/server/src/services/integrity.service.ts
import { Injectable } from '@nestjs/common';
import { createHash } from 'node:crypto';
import { createReadStream } from 'node:fs';
import { stat } from 'node:fs/promises';
import { basename } from 'node:path';
import { Readable, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { OnEvent, OnJob } from 'src/decorators';
import { AuthDto } from 'src/dtos/auth.dto';
import {
IntegrityGetReportDto,
IntegrityReportResponseDto,
IntegrityReportSummaryResponseDto,
} from 'src/dtos/integrity.dto';
import {
AssetStatus,
CacheControl,
DatabaseLock,
ImmichWorker,
IntegrityReportType,
JobName,
JobStatus,
QueueName,
StorageFolder,
SystemMetadataKey,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import {
IIntegrityDeleteReportJob,
IIntegrityJob,
IIntegrityMissingFilesJob,
IIntegrityOrphanedFilesJob,
IIntegrityPathWithChecksumJob,
IIntegrityPathWithReportJob,
} from 'src/types';
import { ImmichFileResponse } from 'src/utils/file';
import { handlePromiseError } from 'src/utils/misc';
/**
* Orphan Files:
* Files are detected in /data/encoded-video, /data/library, /data/upload
* Checked against the asset table
* Files are detected in /data/thumbs
* Checked against the asset_file table
*
* * Can perform download or delete of files
*
* Missing Files:
* Paths are queried from asset(originalPath, encodedVideoPath), asset_file(path)
* Check whether files exist on disk
*
* * Reports must include origin (asset or asset_file) & ID for further action
* * Can perform trash (asset) or delete (asset_file)
*
* Checksum Mismatch:
* Paths & checksums are queried from asset(originalPath, checksum)
* Check whether files match checksum, missing files ignored
*
* * Reports must include origin (as above) for further action
* * Can perform download or trash (asset)
*/
@Injectable()
export class IntegrityService extends BaseService {
private integrityLock = false;
@OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] })
async onConfigInit({
  newConfig: {
    integrityChecks: { orphanedFiles, missingFiles, checksumFiles },
  },
}: ArgOf<'ConfigInit'>) {
  // Only one microservices instance should drive the integrity checks; the
  // database lock elects that instance, and only the winner registers the
  // cron jobs (onConfigUpdate also checks this flag before updating them).
  this.integrityLock = await this.databaseRepository.tryLock(DatabaseLock.IntegrityCheck);
  if (!this.integrityLock) {
    return;
  }
  this.cronRepository.create({
    name: 'integrityOrphanedFiles',
    expression: orphanedFiles.cronExpression,
    onTick: () =>
      handlePromiseError(
        this.jobRepository.queue({ name: JobName.IntegrityOrphanedFilesQueueAll, data: {} }),
        this.logger,
      ),
    start: orphanedFiles.enabled,
  });
  this.cronRepository.create({
    name: 'integrityMissingFiles',
    expression: missingFiles.cronExpression,
    onTick: () =>
      handlePromiseError(
        this.jobRepository.queue({ name: JobName.IntegrityMissingFilesQueueAll, data: {} }),
        this.logger,
      ),
    start: missingFiles.enabled,
  });
  this.cronRepository.create({
    name: 'integrityChecksumFiles',
    expression: checksumFiles.cronExpression,
    onTick: () =>
      handlePromiseError(this.jobRepository.queue({ name: JobName.IntegrityChecksumFiles, data: {} }), this.logger),
    start: checksumFiles.enabled,
  });
  // Fix: removed the leftover "debug: run on boot" setTimeout that queued all
  // three integrity jobs on every startup, unconditionally — it ran even on
  // instances that did not win the lock and bypassed the enabled/cron
  // configuration entirely. Scheduling is now cron-only.
}
@OnEvent({ name: 'ConfigUpdate', server: true })
onConfigUpdate({
  newConfig: {
    integrityChecks: { orphanedFiles, missingFiles, checksumFiles },
  },
}: ArgOf<'ConfigUpdate'>) {
  // Only the instance that owns the integrity lock manages these cron jobs.
  if (!this.integrityLock) {
    return;
  }
  // Push the latest schedule/enabled state for each integrity cron job.
  const schedules = [
    { name: 'integrityOrphanedFiles', settings: orphanedFiles },
    { name: 'integrityMissingFiles', settings: missingFiles },
    { name: 'integrityChecksumFiles', settings: checksumFiles },
  ];
  for (const { name, settings } of schedules) {
    this.cronRepository.update({
      name,
      expression: settings.cronExpression,
      start: settings.enabled,
    });
  }
}
/** Returns aggregated counts of outstanding integrity reports, grouped by report type. */
getIntegrityReportSummary(): Promise<IntegrityReportSummaryResponseDto> {
return this.integrityRepository.getIntegrityReportSummary();
}
/**
 * Returns a page of integrity reports, optionally filtered by report type.
 *
 * @param dto pagination (page/size) and optional type filter
 */
async getIntegrityReport(dto: IntegrityGetReportDto): Promise<IntegrityReportResponseDto> {
  // Use nullish coalescing so only absent values fall back to the defaults;
  // `||` would also silently rewrite an explicit 0, which should instead be
  // rejected by DTO validation.
  return this.integrityRepository.getIntegrityReports({ page: dto.page ?? 1, size: dto.size ?? 100 }, dto.type);
}
/**
 * Streams all integrity reports of the given type as CSV.
 *
 * Serialisation is intentionally minimal: only the path column is quoted,
 * since filenames are the only field that may contain quote or newline
 * characters (quotes are escaped per RFC 4180 by doubling them).
 */
getIntegrityReportCsv(type: IntegrityReportType): Readable {
  const reports = this.integrityRepository.streamIntegrityReports(type);
  async function* rows() {
    yield 'id,type,assetId,fileAssetId,path\n';
    for await (const { id, type: reportType, assetId, fileAssetId, path } of reports) {
      const escapedPath = path.replaceAll('"', '""');
      yield `${id},${reportType},${assetId},${fileAssetId},"${escapedPath}"\n`;
    }
  }
  return Readable.from(rows());
}
/**
 * Builds a download response for the file referenced by an integrity report,
 * served as a generic binary attachment without caching.
 */
async getIntegrityReportFile(id: string): Promise<ImmichFileResponse> {
  const report = await this.integrityRepository.getById(id);
  return new ImmichFileResponse({
    path: report.path,
    fileName: basename(report.path),
    contentType: 'application/octet-stream',
    cacheControl: CacheControl.PrivateWithoutCache,
  });
}
/**
 * Resolves a single integrity report, based on what the report is linked to:
 * - an asset: soft-delete (trash) the asset, then remove the report
 * - an asset file: delete the asset_file record
 * - neither (orphaned file): remove the file from disk, then the report
 *
 * @param auth the acting user; used as the actor for the trash event
 * @param id the integrity report id
 */
async deleteIntegrityReport(auth: AuthDto, id: string): Promise<void> {
const { path, assetId, fileAssetId } = await this.integrityRepository.getById(id);
if (assetId) {
// Trash the owning asset and emit the same event as the normal trash flow
// so downstream listeners stay in sync.
await this.assetRepository.updateAll([assetId], {
deletedAt: new Date(),
status: AssetStatus.Trashed,
});
await this.eventRepository.emit('AssetTrashAll', {
assetIds: [assetId],
userId: auth.user.id,
});
await this.integrityRepository.deleteById(id);
} else if (fileAssetId) {
// NOTE(review): unlike the other branches, this one never deletes the
// report row itself — presumably removing the asset_file record cascades
// to the report; confirm against the schema.
await this.assetRepository.deleteFiles([{ id: fileAssetId }]);
} else {
// Orphaned file: no database row references it, so remove it from disk.
await this.storageRepository.unlink(path);
await this.integrityRepository.deleteById(id);
}
}
@OnJob({ name: JobName.IntegrityOrphanedFilesQueueAll, queue: QueueName.IntegrityCheck })
async handleOrphanedFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
  // Phase 1: re-verify existing orphan reports so stale ones get cleaned up.
  this.logger.log(`Checking for out of date orphaned file reports...`);
  const existingReports = this.integrityRepository.streamIntegrityReportsWithAssetChecksum(
    IntegrityReportType.OrphanFile,
  );
  let queued = 0;
  for await (const batch of chunk(existingReports, JOBS_LIBRARY_PAGINATION_SIZE)) {
    await this.jobRepository.queue({
      name: JobName.IntegrityOrphanedFilesRefresh,
      data: {
        items: batch,
      },
    });
    queued += batch.length;
    this.logger.log(`Queued report check of ${batch.length} report(s) (${queued} so far)`);
  }
  if (refreshOnly) {
    this.logger.log('Refresh complete.');
    return JobStatus.Success;
  }
  // Phase 2: crawl the storage folders and queue orphan checks for every file
  // found. Originals/encoded videos are checked against the asset table,
  // thumbnails against the asset_file table.
  this.logger.log(`Scanning for orphaned files...`);
  const walkOptions = { includeHidden: false, take: JOBS_LIBRARY_PAGINATION_SIZE };
  const assetPaths = this.storageRepository.walk({
    ...walkOptions,
    pathsToCrawl: [StorageFolder.EncodedVideo, StorageFolder.Library, StorageFolder.Upload].map((folder) =>
      StorageCore.getBaseFolder(folder),
    ),
  });
  const assetFilePaths = this.storageRepository.walk({
    ...walkOptions,
    pathsToCrawl: [StorageCore.getBaseFolder(StorageFolder.Thumbnails)],
  });
  async function* taggedBatches() {
    for await (const batch of assetPaths) {
      yield ['asset', batch] as const;
    }
    for await (const batch of assetFilePaths) {
      yield ['asset_file', batch] as const;
    }
  }
  queued = 0;
  for await (const [entity, batchPaths] of taggedBatches()) {
    await this.jobRepository.queue({
      name: JobName.IntegrityOrphanedFiles,
      data: {
        type: entity,
        paths: batchPaths,
      },
    });
    queued += batchPaths.length;
    this.logger.log(`Queued orphan check of ${batchPaths.length} file(s) (${queued} so far)`);
  }
  return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityOrphanedFiles, queue: QueueName.IntegrityCheck })
async handleOrphanedFiles({ type, paths }: IIntegrityOrphanedFilesJob): Promise<JobStatus> {
  this.logger.log(`Processing batch of ${paths.length} files to check if they are orphaned.`);
  // Start with every path as a candidate and remove each one the database
  // knows about; whatever remains has no owning row and is orphaned.
  const unclaimed = new Set<string>(paths);
  if (type === 'asset') {
    const assets = await this.integrityRepository.getAssetPathsByPaths(paths);
    for (const asset of assets) {
      unclaimed.delete(asset.originalPath);
      if (asset.encodedVideoPath) {
        unclaimed.delete(asset.encodedVideoPath);
      }
    }
  } else {
    const assetFiles = await this.integrityRepository.getAssetFilePathsByPaths(paths);
    for (const assetFile of assetFiles) {
      unclaimed.delete(assetFile.path);
    }
  }
  if (unclaimed.size > 0) {
    await this.integrityRepository.create(
      [...unclaimed].map((path) => ({
        type: IntegrityReportType.OrphanFile,
        path,
      })),
    );
  }
  this.logger.log(`Processed ${paths.length} and found ${unclaimed.size} orphaned file(s).`);
  return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityOrphanedFilesRefresh, queue: QueueName.IntegrityCheck })
async handleOrphanedRefresh({ items }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${items.length} reports to check if they are out of date.`);
const results = await Promise.all(
items.map(({ reportId, path }) =>
stat(path)
.then(() => void 0)
.catch(() => reportId),
),
);
const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) {
await this.integrityRepository.deleteByIds(reportIds);
}
this.logger.log(`Processed ${items.length} paths and found ${reportIds.length} report(s) out of date.`);
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.IntegrityCheck })
async handleMissingFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
  if (refreshOnly) {
    // Refresh mode: only re-verify existing reports, no full scan.
    this.logger.log(`Checking for out of date missing file reports...`);
    const reports = this.integrityRepository.streamIntegrityReportsWithAssetChecksum(IntegrityReportType.MissingFile);
    let queued = 0;
    for await (const batch of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
      await this.jobRepository.queue({
        name: JobName.IntegrityMissingFilesRefresh,
        data: {
          items: batch,
        },
      });
      queued += batch.length;
      this.logger.log(`Queued report check of ${batch.length} report(s) (${queued} so far)`);
    }
    this.logger.log('Refresh complete.');
    return JobStatus.Success;
  }
  // Full scan: stream every path the database knows about and queue
  // existence checks in batches.
  this.logger.log(`Scanning for missing files...`);
  const assetPaths = this.integrityRepository.streamAssetPaths();
  let queued = 0;
  for await (const batch of chunk(assetPaths, JOBS_LIBRARY_PAGINATION_SIZE)) {
    await this.jobRepository.queue({
      name: JobName.IntegrityMissingFiles,
      data: {
        items: batch,
      },
    });
    queued += batch.length;
    this.logger.log(`Queued missing check of ${batch.length} file(s) (${queued} so far)`);
  }
  return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityMissingFiles, queue: QueueName.IntegrityCheck })
async handleMissingFiles({ items }: IIntegrityMissingFilesJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${items.length} files to check if they are missing.`);
2025-11-26 17:36:28 +00:00
const results = await Promise.all(
items.map((item) =>
stat(item.path)
.then(() => ({ ...item, exists: true }))
.catch(() => ({ ...item, exists: false })),
2025-11-26 17:36:28 +00:00
),
);
const outdatedReports = results
.filter(({ exists, reportId }) => exists && reportId)
.map(({ reportId }) => reportId!);
2025-11-26 15:45:58 +00:00
if (outdatedReports.length > 0) {
await this.integrityRepository.deleteByIds(outdatedReports);
}
const missingFiles = results.filter(({ exists }) => !exists);
if (missingFiles.length > 0) {
await this.integrityRepository.create(
missingFiles.map(({ path, assetId, fileAssetId }) => ({
type: IntegrityReportType.MissingFile,
path,
assetId,
fileAssetId,
})),
);
}
2025-11-26 15:45:58 +00:00
this.logger.log(`Processed ${items.length} and found ${missingFiles.length} missing file(s).`);
2025-11-26 15:45:58 +00:00
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityMissingFilesRefresh, queue: QueueName.IntegrityCheck })
2025-11-28 17:44:37 +00:00
async handleMissingRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
const results = await Promise.all(
paths.map(({ reportId, path }) =>
stat(path)
.then(() => reportId)
.catch(() => void 0),
),
);
const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) {
await this.integrityRepository.deleteByIds(reportIds);
2025-11-28 17:44:37 +00:00
}
this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.IntegrityCheck })
async handleChecksumFiles({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
if (refreshOnly) {
2025-11-28 17:44:37 +00:00
this.logger.log(`Checking for out of date checksum file reports...`);
2025-12-18 12:56:14 +00:00
const reports = this.integrityRepository.streamIntegrityReportsWithAssetChecksum(
IntegrityReportType.ChecksumFail,
);
2025-11-28 17:44:37 +00:00
let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({
name: JobName.IntegrityChecksumFilesRefresh,
data: {
2025-12-01 11:20:34 +00:00
items: batchReports.map(({ path, reportId, checksum }) => ({
path,
reportId,
checksum: checksum?.toString('hex'),
})),
2025-11-28 17:44:37 +00:00
},
});
total += batchReports.length;
this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
}
this.logger.log('Refresh complete.');
return JobStatus.Success;
}
2025-12-01 14:27:04 +00:00
const {
integrityChecks: {
checksumFiles: { timeLimit, percentageLimit },
},
} = await this.getConfig({
withCache: true,
});
2025-11-27 12:00:35 +00:00
this.logger.log(
`Checking file checksums... (will run for up to ${(timeLimit / (60 * 60 * 1000)).toFixed(2)} hours or until ${(percentageLimit * 100).toFixed(2)}% of assets are processed)`,
);
let processed = 0;
const startedAt = Date.now();
const { count } = await this.integrityRepository.getAssetCount();
2025-11-27 12:00:35 +00:00
const checkpoint = await this.systemMetadataRepository.get(SystemMetadataKey.IntegrityChecksumCheckpoint);
let startMarker: Date | undefined = checkpoint?.date ? new Date(checkpoint.date) : undefined;
2025-12-01 14:27:04 +00:00
let endMarker: Date | undefined;
2025-11-27 12:00:35 +00:00
const printStats = () => {
const averageTime = ((Date.now() - startedAt) / processed).toFixed(2);
const completionProgress = ((processed / count) * 100).toFixed(2);
this.logger.log(
`Processed ${processed} files so far... (avg. ${averageTime} ms/asset, ${completionProgress}% of all assets)`,
);
};
let lastCreatedAt: Date | undefined;
finishEarly: do {
this.logger.log(
`Processing assets in range [${startMarker?.toISOString() ?? 'beginning'}, ${endMarker?.toISOString() ?? 'end'}]`,
);
const assets = this.integrityRepository.streamAssetChecksums(startMarker, endMarker);
2025-11-27 12:00:35 +00:00
endMarker = startMarker;
startMarker = undefined;
for await (const { originalPath, checksum, createdAt, assetId, reportId } of assets) {
2025-11-28 17:44:37 +00:00
processed++;
2025-11-27 12:00:35 +00:00
try {
const hash = createHash('sha1');
await pipeline([
createReadStream(originalPath),
new Writable({
write(chunk, _encoding, callback) {
hash.update(chunk);
callback();
},
}),
]);
if (checksum.equals(hash.digest())) {
if (reportId) {
await this.integrityRepository.deleteById(reportId);
}
} else {
2025-11-27 12:00:35 +00:00
throw new Error('File failed checksum');
}
} catch (error) {
if ((error as { code?: string }).code === 'ENOENT') {
if (reportId) {
await this.integrityRepository.deleteById(reportId);
}
// missing file; handled by the missing files job
continue;
}
2025-11-27 12:00:35 +00:00
this.logger.warn('Failed to process a file: ' + error);
await this.integrityRepository.create({
path: originalPath,
type: IntegrityReportType.ChecksumFail,
assetId,
});
2025-11-27 12:00:35 +00:00
}
if (processed % 100 === 0) {
printStats();
}
if (Date.now() > startedAt + timeLimit || processed > count * percentageLimit) {
this.logger.log('Reached stop criteria.');
lastCreatedAt = createdAt;
break finishEarly;
}
}
} while (endMarker);
await this.systemMetadataRepository.set(SystemMetadataKey.IntegrityChecksumCheckpoint, {
2025-11-27 12:00:35 +00:00
date: lastCreatedAt?.toISOString(),
});
printStats();
if (lastCreatedAt) {
this.logger.log(`Finished checksum job, will continue from ${lastCreatedAt.toISOString()}.`);
} else {
this.logger.log(`Finished checksum job, covered all assets.`);
}
2025-11-26 15:45:58 +00:00
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityChecksumFilesRefresh, queue: QueueName.IntegrityCheck })
async handleChecksumRefresh({ items: paths }: IIntegrityPathWithChecksumJob): Promise<JobStatus> {
  this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
  // For each report, return its id when the report is stale, otherwise undefined.
  const results = await Promise.all(
    paths.map(async ({ reportId, path, checksum }) => {
      // Without a stored checksum there is nothing to re-verify; drop the report.
      if (!checksum) {
        return reportId;
      }
      try {
        const sha1 = createHash('sha1');
        await pipeline(
          createReadStream(path),
          new Writable({
            write(data, _encoding, done) {
              sha1.update(data);
              done();
            },
          }),
        );
        // The file verifies again, so the failure report is out of date.
        if (Buffer.from(checksum, 'hex').equals(sha1.digest())) {
          return reportId;
        }
      } catch (error) {
        // A vanished file is the missing-files job's concern; drop the report.
        if ((error as { code?: string }).code === 'ENOENT') {
          return reportId;
        }
      }
    }),
  );
  const staleIds = results.filter((id): id is string => Boolean(id));
  if (staleIds.length > 0) {
    await this.integrityRepository.deleteByIds(staleIds);
  }
  this.logger.log(`Processed ${paths.length} paths and found ${staleIds.length} report(s) out of date.`);
  return JobStatus.Success;
}
/**
 * Bulk-resolves integrity reports of the given type (or all types).
 *
 * Reports are grouped by which entity they reference ('assetId',
 * 'fileAssetId', or neither) and each group is resolved the same way the
 * single-report endpoint does: trash assets, delete asset_file records, or
 * unlink orphaned files from disk.
 */
@OnJob({ name: JobName.IntegrityReportDelete, queue: QueueName.IntegrityCheck })
async handleDeleteIntegrityReport({ type }: IIntegrityDeleteReportJob): Promise<JobStatus> {
this.logger.log(`Deleting all entries for ${type ?? 'all types of'} integrity report`);
// Which linkage kinds can occur for this report type; undefined means
// "linked to nothing" (orphaned files).
let properties;
switch (type) {
case IntegrityReportType.ChecksumFail: {
properties = ['assetId'] as const;
break;
}
case IntegrityReportType.MissingFile: {
properties = ['assetId', 'fileAssetId'] as const;
break;
}
case IntegrityReportType.OrphanFile: {
properties = [void 0] as const;
break;
}
default: {
// no type filter: cover every linkage kind
properties = [void 0, 'assetId', 'fileAssetId'] as const;
break;
}
}
for (const property of properties) {
const reports = this.integrityRepository.streamIntegrityReportsByProperty(property, type);
for await (const report of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
// todo: queue sub-job here instead?
switch (property) {
case 'assetId': {
// Trash the owning assets and emit the regular trash event.
const ids = report.map(({ assetId }) => assetId!);
await this.assetRepository.updateAll(ids, {
deletedAt: new Date(),
status: AssetStatus.Trashed,
});
await this.eventRepository.emit('AssetTrashAll', {
assetIds: ids,
userId: '', // NOTE(review): job context has no acting user — confirm listeners tolerate an empty userId
});
await this.integrityRepository.deleteByIds(report.map(({ id }) => id));
break;
}
case 'fileAssetId': {
// NOTE(review): reports are not deleted here — presumably removing the
// asset_file rows cascades to the reports; confirm against the schema.
await this.assetRepository.deleteFiles(report.map(({ fileAssetId }) => ({ id: fileAssetId! })));
break;
}
default: {
// Orphaned files: unlink from disk (best-effort), then drop the reports.
await Promise.all(report.map(({ path }) => this.storageRepository.unlink(path).catch(() => void 0)));
await this.integrityRepository.deleteByIds(report.map(({ id }) => id));
break;
}
}
}
}
this.logger.log('Finished deleting integrity report.');
return JobStatus.Success;
}
}
/**
 * Collects items from an async iterable into arrays of at most `n` items.
 *
 * The final chunk may be shorter than `n`; nothing is yielded for an empty
 * source. The parameter is generalised from `AsyncIterableIterator<T>` to
 * `AsyncIterable<T>` (backward compatible — every async iterator from the
 * repositories is also an async iterable), so plain async iterables work too.
 *
 * @param generator the source of items
 * @param n maximum chunk size; expected to be a positive integer
 */
async function* chunk<T>(generator: AsyncIterable<T>, n: number): AsyncGenerator<T[]> {
  let batch: T[] = [];
  for await (const item of generator) {
    batch.push(item);
    if (batch.length === n) {
      yield batch;
      batch = [];
    }
  }
  // flush the short remainder, if any
  if (batch.length > 0) {
    yield batch;
  }
}