feat: refresh missing & checksum

This commit is contained in:
izzy
2025-11-28 17:44:37 +00:00
parent e447ba87c6
commit 4d7f7b80da
5 changed files with 185 additions and 13 deletions

View File

@@ -6,6 +6,7 @@ import {
CheckExistingAssetsDto,
CreateAlbumDto,
CreateLibraryDto,
JobCreateDto,
MaintenanceAction,
MetadataSearchDto,
Permission,
@@ -21,6 +22,7 @@ import {
checkExistingAssets,
createAlbum,
createApiKey,
createJob,
createLibrary,
createPartner,
createPerson,
@@ -52,9 +54,12 @@ import {
import { BrowserContext } from '@playwright/test';
import { exec, spawn } from 'node:child_process';
import { createHash } from 'node:crypto';
import { existsSync, mkdirSync, renameSync, rmSync, writeFileSync } from 'node:fs';
import { createWriteStream, existsSync, mkdirSync, renameSync, rmSync, writeFileSync } from 'node:fs';
import { mkdtemp } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { dirname, resolve } from 'node:path';
import { dirname, join, resolve } from 'node:path';
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { setTimeout as setAsyncTimeout } from 'node:timers/promises';
import { promisify } from 'node:util';
import pg from 'pg';
@@ -171,6 +176,7 @@ export const utils = {
'user',
'system_metadata',
'tag',
'integrity_report',
];
const sql: string[] = [];
@@ -481,6 +487,9 @@ export const utils = {
tagAssets: (accessToken: string, tagId: string, assetIds: string[]) =>
tagAssets({ id: tagId, bulkIdsDto: { ids: assetIds } }, { headers: asBearerAuth(accessToken) }),
createJob: async (accessToken: string, jobCreateDto: JobCreateDto) =>
createJob({ jobCreateDto }, { headers: asBearerAuth(accessToken) }),
queueCommand: async (accessToken: string, name: QueueName, queueCommandDto: QueueCommandDto) =>
runQueueCommandLegacy({ name, queueCommandDto }, { headers: asBearerAuth(accessToken) }),
@@ -559,6 +568,50 @@ export const utils = {
mkdirSync(`${testAssetDir}/temp`, { recursive: true });
},
putFile(source: string, dest: string) {
return executeCommand('docker', ['cp', source, `immich-e2e-server:${dest}`]).promise;
},
async putTextFile(contents: string, dest: string) {
const dir = await mkdtemp(join(tmpdir(), 'test-'));
const fn = join(dir, 'file');
await pipeline(Readable.from(contents), createWriteStream(fn));
return executeCommand('docker', ['cp', fn, `immich-e2e-server:${dest}`]).promise;
},
async move(source: string, dest: string) {
return executeCommand('docker', ['exec', 'immich-e2e-server', 'mv', source, dest]).promise;
},
async copyFolder(source: string, dest: string) {
return executeCommand('docker', ['exec', 'immich-e2e-server', 'cp', '-r', source, dest]).promise;
},
async deleteFile(path: string) {
return executeCommand('docker', ['exec', 'immich-e2e-server', 'rm', path]).promise;
},
async deleteFolder(path: string) {
return executeCommand('docker', ['exec', 'immich-e2e-server', 'rm', '-r', path]).promise;
},
async truncateFolder(path: string) {
return executeCommand('docker', [
'exec',
'immich-e2e-server',
'find',
path,
'-type',
'f',
'-exec',
'truncate',
'-s',
'1',
'{}',
'\;',
]).promise;
},
resetAdminConfig: async (accessToken: string) => {
const defaultConfig = await getConfigDefaults({ headers: asBearerAuth(accessToken) });
await updateConfig({ systemConfigDto: defaultConfig }, { headers: asBearerAuth(accessToken) });

View File

@@ -654,10 +654,12 @@ export enum JobName {
// Integrity
IntegrityOrphanedFilesQueueAll = 'IntegrityOrphanedFilesQueueAll',
IntegrityOrphanedFiles = 'IntegrityOrphanedFiles',
IntegrityOrphanedCheckReports = 'IntegrityOrphanedCheckReports',
IntegrityOrphanedFilesRefresh = 'IntegrityOrphanedRefresh',
IntegrityMissingFilesQueueAll = 'IntegrityMissingFilesQueueAll',
IntegrityMissingFiles = 'IntegrityMissingFiles',
IntegrityMissingFilesRefresh = 'IntegrityMissingFilesRefresh',
IntegrityChecksumFiles = 'IntegrityChecksumFiles',
IntegrityChecksumFilesRefresh = 'IntegrityChecksumFilesRefresh',
}
export enum QueueCommand {

View File

@@ -321,7 +321,14 @@ export class AssetJobRepository {
@GenerateSql({ params: [DummyValue.STRING], stream: true })
streamIntegrityReports(type: IntegrityReportType) {
return this.db.selectFrom('integrity_report').select(['id as reportId', 'path']).where('type', '=', type).stream();
return this.db
.selectFrom('integrity_report')
.select(['integrity_report.id as reportId', 'integrity_report.path'])
.where('integrity_report.type', '=', type)
.$if(type === IntegrityReportType.ChecksumFail, (eb) =>
eb.leftJoin('asset', 'integrity_report.path', 'asset.originalPath').select('asset.checksum'),
)
.stream();
}
@GenerateSql({ params: [], stream: true })

View File

@@ -19,7 +19,12 @@ import {
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import { IIntegrityJob, IIntegrityOrphanedFilesJob, IIntegrityPathWithReportJob } from 'src/types';
import {
IIntegrityJob,
IIntegrityOrphanedFilesJob,
IIntegrityPathWithChecksumJob,
IIntegrityPathWithReportJob,
} from 'src/types';
import { handlePromiseError } from 'src/utils/misc';
async function* chunk<T>(generator: AsyncIterableIterator<T>, n: number) {
@@ -138,7 +143,7 @@ export class IntegrityService extends BaseService {
let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({
name: JobName.IntegrityOrphanedCheckReports,
name: JobName.IntegrityOrphanedFilesRefresh,
data: {
items: batchReports,
},
@@ -230,8 +235,8 @@ export class IntegrityService extends BaseService {
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityOrphanedCheckReports, queue: QueueName.BackgroundTask })
async handleOrphanedCheckReports({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
@OnJob({ name: JobName.IntegrityOrphanedFilesRefresh, queue: QueueName.BackgroundTask })
async handleOrphanedRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
const results = await Promise.all(
@@ -255,7 +260,23 @@ export class IntegrityService extends BaseService {
@OnJob({ name: JobName.IntegrityMissingFilesQueueAll, queue: QueueName.BackgroundTask })
async handleMissingFilesQueueAll({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
if (refreshOnly) {
// TODO
this.logger.log(`Checking for out of date missing file reports...`);
const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.MissingFile);
let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({
name: JobName.IntegrityMissingFilesRefresh,
data: {
items: batchReports,
},
});
total += batchReports.length;
this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
}
this.logger.log('Refresh complete.');
return JobStatus.Success;
}
@@ -314,10 +335,48 @@ export class IntegrityService extends BaseService {
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityMissingFilesRefresh, queue: QueueName.BackgroundTask })
async handleMissingRefresh({ items: paths }: IIntegrityPathWithReportJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
const results = await Promise.all(
paths.map(({ reportId, path }) =>
stat(path)
.then(() => reportId)
.catch(() => void 0),
),
);
const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) {
await this.integrityReportRepository.deleteByIds(reportIds);
}
this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityChecksumFiles, queue: QueueName.BackgroundTask })
async handleChecksumFiles({ refreshOnly }: IIntegrityJob = {}): Promise<JobStatus> {
if (refreshOnly) {
// TODO
this.logger.log(`Checking for out of date checksum file reports...`);
const reports = this.assetJobRepository.streamIntegrityReports(IntegrityReportType.ChecksumFail);
let total = 0;
for await (const batchReports of chunk(reports, JOBS_LIBRARY_PAGINATION_SIZE)) {
await this.jobRepository.queue({
name: JobName.IntegrityChecksumFilesRefresh,
data: {
items: batchReports,
},
});
total += batchReports.length;
this.logger.log(`Queued report check of ${batchReports.length} report(s) (${total} so far)`);
}
this.logger.log('Refresh complete.');
return JobStatus.Success;
}
@@ -358,6 +417,8 @@ export class IntegrityService extends BaseService {
startMarker = undefined;
for await (const { originalPath, checksum, createdAt, reportId } of assets) {
processed++;
try {
const hash = createHash('sha1');
@@ -394,7 +455,6 @@ export class IntegrityService extends BaseService {
});
}
processed++;
if (processed % 100 === 0) {
printStats();
}
@@ -421,4 +481,48 @@ export class IntegrityService extends BaseService {
return JobStatus.Success;
}
@OnJob({ name: JobName.IntegrityChecksumFilesRefresh, queue: QueueName.BackgroundTask })
async handleChecksumRefresh({ items: paths }: IIntegrityPathWithChecksumJob): Promise<JobStatus> {
this.logger.log(`Processing batch of ${paths.length} reports to check if they are out of date.`);
const results = await Promise.all(
paths.map(async ({ reportId, path, checksum }) => {
console.info('chekc', reportId, path, checksum);
if (!checksum) return reportId;
try {
const hash = createHash('sha1');
await pipeline([
createReadStream(path),
new Writable({
write(chunk, _encoding, callback) {
hash.update(chunk);
callback();
},
}),
]);
console.info('compare', checksum, hash.digest());
if (checksum.equals(hash.digest())) {
return reportId;
}
} catch (error) {
if ((error as { code?: string }).code === 'ENOENT') {
return reportId;
}
}
}),
);
const reportIds = results.filter(Boolean) as string[];
if (reportIds.length > 0) {
await this.integrityReportRepository.deleteByIds(reportIds);
}
this.logger.log(`Processed ${paths.length} paths and found ${reportIds.length} report(s) out of date.`);
return JobStatus.Success;
}
}

View File

@@ -295,6 +295,10 @@ export interface IIntegrityPathWithReportJob {
items: { path: string; reportId: string | null }[];
}
export interface IIntegrityPathWithChecksumJob {
items: { path: string; reportId: string | null; checksum?: Buffer | null }[];
}
export interface JobCounts {
active: number;
completed: number;
@@ -409,10 +413,12 @@ export type JobItem =
// Integrity
| { name: JobName.IntegrityOrphanedFilesQueueAll; data?: IIntegrityJob }
| { name: JobName.IntegrityOrphanedFiles; data: IIntegrityOrphanedFilesJob }
| { name: JobName.IntegrityOrphanedCheckReports; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityOrphanedFilesRefresh; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityMissingFilesQueueAll; data?: IIntegrityJob }
| { name: JobName.IntegrityMissingFiles; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob };
| { name: JobName.IntegrityMissingFilesRefresh; data: IIntegrityPathWithReportJob }
| { name: JobName.IntegrityChecksumFiles; data?: IIntegrityJob }
| { name: JobName.IntegrityChecksumFilesRefresh; data?: IIntegrityPathWithChecksumJob };
export type VectorExtension = (typeof VECTOR_EXTENSIONS)[number];