Files
immich/server/src/services/library.service.ts
Tijs 3e3ca4c104 feat(server): exclude syncthing folders from external libraries (#24240)
* Add SyncThing folders to External library exclusion

SyncThing is a popular library for syncing files (like pictures) between systems. It can really mess up your library if an external library, which is also used by SyncThing, is added and these folders are not excluded.

* Plural

* fix formatting

---------

Co-authored-by: Jonathan Jogenfors <jonathan@jogenfors.se>
2025-11-28 11:40:33 +00:00

782 lines
25 KiB
TypeScript

import { BadRequestException, Injectable } from '@nestjs/common';
import { Insertable } from 'kysely';
import { R_OK } from 'node:constants';
import { Stats } from 'node:fs';
import path, { basename, isAbsolute, parse } from 'node:path';
import picomatch from 'picomatch';
import { JOBS_LIBRARY_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { OnEvent, OnJob } from 'src/decorators';
import {
CreateLibraryDto,
LibraryResponseDto,
LibraryStatsResponseDto,
mapLibrary,
UpdateLibraryDto,
ValidateLibraryDto,
ValidateLibraryImportPathResponseDto,
ValidateLibraryResponseDto,
} from 'src/dtos/library.dto';
import { AssetStatus, AssetType, CronJob, DatabaseLock, ImmichWorker, JobName, JobStatus, QueueName } from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { AssetSyncResult } from 'src/repositories/library.repository';
import { AssetTable } from 'src/schema/tables/asset.table';
import { BaseService } from 'src/services/base.service';
import { JobOf } from 'src/types';
import { mimeTypes } from 'src/utils/mime-types';
import { handlePromiseError } from 'src/utils/misc';
@Injectable()
export class LibraryService extends BaseService {
private watchLibraries = false;
private lock = false;
private watchers: Record<string, () => Promise<void>> = {};
@OnEvent({ name: 'ConfigInit', workers: [ImmichWorker.Microservices] })
async onConfigInit({
newConfig: {
library: { watch, scan },
},
}: ArgOf<'ConfigInit'>) {
// This ensures that library watching only occurs in one microservice
this.lock = await this.databaseRepository.tryLock(DatabaseLock.Library);
this.watchLibraries = this.lock && watch.enabled;
if (this.lock) {
this.cronRepository.create({
name: CronJob.LibraryScan,
expression: scan.cronExpression,
onTick: () => handlePromiseError(this.jobRepository.queue({ name: JobName.LibraryScanQueueAll }), this.logger),
start: scan.enabled,
});
}
if (this.watchLibraries) {
await this.watchAll();
}
}
@OnEvent({ name: 'ConfigUpdate', server: true })
async onConfigUpdate({ newConfig: { library } }: ArgOf<'ConfigUpdate'>) {
if (!this.lock) {
return;
}
this.cronRepository.update({
name: CronJob.LibraryScan,
expression: library.scan.cronExpression,
start: library.scan.enabled,
});
if (library.watch.enabled !== this.watchLibraries) {
// Watch configuration changed, update accordingly
this.watchLibraries = library.watch.enabled;
await (this.watchLibraries ? this.watchAll() : this.unwatchAll());
}
}
private async watch(id: string): Promise<boolean> {
if (!this.watchLibraries) {
return false;
}
const library = await this.findOrFail(id);
if (library.importPaths.length === 0) {
return false;
}
await this.unwatch(id);
this.logger.log(`Starting to watch library ${library.id} with import path(s) ${library.importPaths}`);
const matcher = picomatch(`**/*{${mimeTypes.getSupportedFileExtensions().join(',')}}`, {
nocase: true,
ignore: library.exclusionPatterns,
});
let _resolve: () => void;
const ready$ = new Promise<void>((resolve) => (_resolve = resolve));
const handler = async (event: string, path: string) => {
if (matcher(path)) {
this.logger.debug(`File ${event} event received for ${path} in library ${library.id}}`);
await this.jobRepository.queue({
name: JobName.LibrarySyncFiles,
data: { libraryId: library.id, paths: [path] },
});
} else {
this.logger.verbose(`Ignoring file ${event} event for ${path} in library ${library.id}`);
}
};
const deletionHandler = async (path: string) => {
this.logger.debug(`File unlink event received for ${path} in library ${library.id}}`);
await this.jobRepository.queue({
name: JobName.LibraryRemoveAsset,
data: { libraryId: library.id, paths: [path] },
});
};
this.watchers[id] = this.storageRepository.watch(
library.importPaths,
{
usePolling: false,
ignoreInitial: true,
awaitWriteFinish: {
stabilityThreshold: 5000,
pollInterval: 1000,
},
},
{
onReady: () => _resolve(),
onAdd: (path) => {
return handlePromiseError(handler('add', path), this.logger);
},
onChange: (path) => {
return handlePromiseError(handler('change', path), this.logger);
},
onUnlink: (path) => {
return handlePromiseError(deletionHandler(path), this.logger);
},
onError: (error) => {
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
},
},
);
// Wait for the watcher to initialize before returning
await ready$;
return true;
}
async unwatch(id: string) {
if (this.watchers[id]) {
await this.watchers[id]();
delete this.watchers[id];
}
}
@OnEvent({ name: 'AppShutdown' })
async onShutdown() {
await this.unwatchAll();
}
private async unwatchAll() {
if (!this.lock) {
return false;
}
for (const id in this.watchers) {
await this.unwatch(id);
}
}
async watchAll() {
if (!this.lock) {
return false;
}
const libraries = await this.libraryRepository.getAll(false);
for (const library of libraries) {
await this.watch(library.id);
}
}
async getStatistics(id: string): Promise<LibraryStatsResponseDto> {
const statistics = await this.libraryRepository.getStatistics(id);
if (!statistics) {
throw new BadRequestException(`Library ${id} not found`);
}
return statistics;
}
async get(id: string): Promise<LibraryResponseDto> {
const library = await this.findOrFail(id);
return mapLibrary(library);
}
async getAll(): Promise<LibraryResponseDto[]> {
const libraries = await this.libraryRepository.getAll(false);
return libraries.map((library) => mapLibrary(library));
}
@OnJob({ name: JobName.LibraryDeleteCheck, queue: QueueName.Library })
async handleQueueCleanup(): Promise<JobStatus> {
this.logger.log('Checking for any libraries pending deletion...');
const pendingDeletions = await this.libraryRepository.getAllDeleted();
if (pendingDeletions.length > 0) {
const libraryString = pendingDeletions.length === 1 ? 'library' : 'libraries';
this.logger.log(`Found ${pendingDeletions.length} ${libraryString} pending deletion, cleaning up...`);
await this.jobRepository.queueAll(
pendingDeletions.map((libraryToDelete) => ({ name: JobName.LibraryDelete, data: { id: libraryToDelete.id } })),
);
}
return JobStatus.Success;
}
async create(dto: CreateLibraryDto): Promise<LibraryResponseDto> {
const library = await this.libraryRepository.create({
ownerId: dto.ownerId,
name: dto.name ?? 'New External Library',
importPaths: dto.importPaths ?? [],
exclusionPatterns: dto.exclusionPatterns ?? [
'**/@eaDir/**',
'**/._*',
'**/#recycle/**',
'**/#snapshot/**',
'**/.stversions/**',
'**/.stfolder/**',
],
});
return mapLibrary(library);
}
@OnJob({ name: JobName.LibrarySyncFiles, queue: QueueName.Library })
async handleSyncFiles(job: JobOf<JobName.LibrarySyncFiles>): Promise<JobStatus> {
const library = await this.libraryRepository.get(job.libraryId);
// We need to check if the library still exists as it could have been deleted after the scan was queued
if (!library) {
this.logger.debug(`Library ${job.libraryId} not found, skipping file import`);
return JobStatus.Failed;
} else if (library.deletedAt) {
this.logger.debug(`Library ${job.libraryId} is deleted, won't import assets into it`);
return JobStatus.Failed;
}
const assetImports: Insertable<AssetTable>[] = [];
await Promise.all(
job.paths.map((path) =>
this.processEntity(path, library.ownerId, job.libraryId)
.then((asset) => assetImports.push(asset))
.catch((error: any) => this.logger.error(`Error processing ${path} for library ${job.libraryId}: ${error}`)),
),
);
const assetIds: string[] = [];
for (let i = 0; i < assetImports.length; i += 5000) {
// Chunk the imports to avoid the postgres limit of max parameters at once
const chunk = assetImports.slice(i, i + 5000);
await this.assetRepository.createAll(chunk).then((assets) => assetIds.push(...assets.map((asset) => asset.id)));
}
const progressMessage =
job.progressCounter && job.totalAssets
? `(${job.progressCounter} of ${job.totalAssets})`
: `(${job.progressCounter} done so far)`;
this.logger.log(`Imported ${assetIds.length} ${progressMessage} file(s) into library ${job.libraryId}`);
await this.queuePostSyncJobs(assetIds);
return JobStatus.Success;
}
private async validateImportPath(importPath: string): Promise<ValidateLibraryImportPathResponseDto> {
const validation = new ValidateLibraryImportPathResponseDto();
validation.importPath = importPath;
if (StorageCore.isImmichPath(importPath)) {
validation.message = 'Cannot use media upload folder for external libraries';
return validation;
}
if (!isAbsolute(importPath)) {
validation.message = `Import path must be absolute, try ${path.resolve(importPath)}`;
return validation;
}
try {
const stat = await this.storageRepository.stat(importPath);
if (!stat.isDirectory()) {
validation.message = 'Not a directory';
return validation;
}
} catch (error: any) {
if (error.code === 'ENOENT') {
validation.message = 'Path does not exist (ENOENT)';
return validation;
}
validation.message = String(error);
return validation;
}
const access = await this.storageRepository.checkFileExists(importPath, R_OK);
if (!access) {
validation.message = 'Lacking read permission for folder';
return validation;
}
validation.isValid = true;
return validation;
}
async validate(id: string, dto: ValidateLibraryDto): Promise<ValidateLibraryResponseDto> {
const importPaths = await Promise.all(
(dto.importPaths || []).map((importPath) => this.validateImportPath(importPath)),
);
return { importPaths };
}
async update(id: string, dto: UpdateLibraryDto): Promise<LibraryResponseDto> {
await this.findOrFail(id);
if (dto.importPaths) {
const validation = await this.validate(id, { importPaths: dto.importPaths });
if (validation.importPaths) {
for (const path of validation.importPaths) {
if (!path.isValid) {
throw new BadRequestException(`Invalid import path: ${path.message}`);
}
}
}
}
const library = await this.libraryRepository.update(id, dto);
return mapLibrary(library);
}
async delete(id: string) {
await this.findOrFail(id);
if (this.watchLibraries) {
await this.unwatch(id);
}
await this.libraryRepository.softDelete(id);
await this.jobRepository.queue({ name: JobName.LibraryDelete, data: { id } });
}
@OnJob({ name: JobName.LibraryDelete, queue: QueueName.Library })
async handleDeleteLibrary(job: JobOf<JobName.LibraryDelete>): Promise<JobStatus> {
const libraryId = job.id;
await this.assetRepository.updateByLibraryId(libraryId, { deletedAt: new Date() });
let assetsFound = false;
let chunk: string[] = [];
const queueChunk = async () => {
if (chunk.length > 0) {
assetsFound = true;
this.logger.debug(`Queueing deletion of ${chunk.length} asset(s) in library ${libraryId}`);
await this.jobRepository.queueAll(
chunk.map((id) => ({ name: JobName.AssetDelete, data: { id, deleteOnDisk: false } })),
);
chunk = [];
}
};
this.logger.debug(`Will delete all assets in library ${libraryId}`);
const assets = this.libraryRepository.streamAssetIds(libraryId);
for await (const asset of assets) {
chunk.push(asset.id);
if (chunk.length >= JOBS_LIBRARY_PAGINATION_SIZE) {
await queueChunk();
}
}
await queueChunk();
if (!assetsFound) {
this.logger.log(`Deleting library ${libraryId}`);
await this.libraryRepository.delete(libraryId);
}
return JobStatus.Success;
}
private async processEntity(filePath: string, ownerId: string, libraryId: string) {
const assetPath = path.normalize(filePath);
const stat = await this.storageRepository.stat(assetPath);
return {
ownerId,
libraryId,
checksum: this.cryptoRepository.hashSha1(`path:${assetPath}`),
originalPath: assetPath,
fileCreatedAt: stat.mtime,
fileModifiedAt: stat.mtime,
localDateTime: stat.mtime,
// TODO: device asset id is deprecated, remove it
deviceAssetId: `${basename(assetPath)}`.replaceAll(/\s+/g, ''),
deviceId: 'Library Import',
type: mimeTypes.isVideo(assetPath) ? AssetType.Video : AssetType.Image,
originalFileName: parse(assetPath).base,
isExternal: true,
livePhotoVideoId: null,
};
}
async queuePostSyncJobs(assetIds: string[]) {
this.logger.debug(`Queuing sidecar discovery for ${assetIds.length} asset(s)`);
// We queue a sidecar discovery which, in turn, queues metadata extraction
await this.jobRepository.queueAll(
assetIds.map((assetId) => ({
name: JobName.SidecarCheck,
data: { id: assetId, source: 'upload' },
})),
);
}
async queueScan(id: string) {
await this.findOrFail(id);
this.logger.log(`Starting to scan library ${id}`);
await this.jobRepository.queue({
name: JobName.LibrarySyncFilesQueueAll,
data: {
id,
},
});
await this.jobRepository.queue({ name: JobName.LibrarySyncAssetsQueueAll, data: { id } });
}
async queueScanAll() {
await this.jobRepository.queue({ name: JobName.LibraryScanQueueAll, data: {} });
}
@OnJob({ name: JobName.LibraryScanQueueAll, queue: QueueName.Library })
async handleQueueScanAll(): Promise<JobStatus> {
this.logger.log(`Initiating scan of all external libraries...`);
await this.jobRepository.queue({ name: JobName.LibraryDeleteCheck, data: {} });
const libraries = await this.libraryRepository.getAll(true);
await this.jobRepository.queueAll(
libraries.map((library) => ({
name: JobName.LibrarySyncFilesQueueAll,
data: {
id: library.id,
},
})),
);
await this.jobRepository.queueAll(
libraries.map((library) => ({
name: JobName.LibrarySyncAssetsQueueAll,
data: {
id: library.id,
},
})),
);
return JobStatus.Success;
}
@OnJob({ name: JobName.LibrarySyncAssets, queue: QueueName.Library })
async handleSyncAssets(job: JobOf<JobName.LibrarySyncAssets>): Promise<JobStatus> {
const assets = await this.assetJobRepository.getForSyncAssets(job.assetIds);
const assetIdsToOffline: string[] = [];
const trashedAssetIdsToOffline: string[] = [];
const assetIdsToOnline: string[] = [];
const trashedAssetIdsToOnline: string[] = [];
const assetIdsToUpdate: string[] = [];
this.logger.debug(`Checking batch of ${assets.length} existing asset(s) in library ${job.libraryId}`);
const stats = await Promise.all(
assets.map((asset) => this.storageRepository.stat(asset.originalPath).catch(() => null)),
);
for (let i = 0; i < assets.length; i++) {
const asset = assets[i];
const stat = stats[i];
const action = this.checkExistingAsset(asset, stat);
switch (action) {
case AssetSyncResult.OFFLINE: {
if (asset.status === AssetStatus.Trashed) {
trashedAssetIdsToOffline.push(asset.id);
} else {
assetIdsToOffline.push(asset.id);
}
break;
}
case AssetSyncResult.UPDATE: {
assetIdsToUpdate.push(asset.id);
break;
}
case AssetSyncResult.CHECK_OFFLINE: {
const isInImportPath = job.importPaths.find((path) => asset.originalPath.startsWith(path));
if (!isInImportPath) {
this.logger.verbose(
`Offline asset ${asset.originalPath} is still not in any import path, keeping offline in library ${job.libraryId}`,
);
break;
}
const isExcluded = job.exclusionPatterns.some((pattern) => picomatch.isMatch(asset.originalPath, pattern));
if (!isExcluded) {
this.logger.debug(`Offline asset ${asset.originalPath} is now online in library ${job.libraryId}`);
if (asset.status === AssetStatus.Trashed) {
trashedAssetIdsToOnline.push(asset.id);
} else {
assetIdsToOnline.push(asset.id);
}
break;
}
this.logger.verbose(
`Offline asset ${asset.originalPath} is in an import path but still covered by exclusion pattern, keeping offline in library ${job.libraryId}`,
);
break;
}
}
}
const promises = [];
if (assetIdsToOffline.length > 0) {
promises.push(this.assetRepository.updateAll(assetIdsToOffline, { isOffline: true, deletedAt: new Date() }));
}
if (trashedAssetIdsToOffline.length > 0) {
promises.push(this.assetRepository.updateAll(trashedAssetIdsToOffline, { isOffline: true }));
}
if (assetIdsToOnline.length > 0) {
promises.push(this.assetRepository.updateAll(assetIdsToOnline, { isOffline: false, deletedAt: null }));
}
if (trashedAssetIdsToOnline.length > 0) {
promises.push(this.assetRepository.updateAll(trashedAssetIdsToOnline, { isOffline: false }));
}
if (assetIdsToUpdate.length > 0) {
promises.push(this.queuePostSyncJobs(assetIdsToUpdate));
}
await Promise.all(promises);
const remainingCount = assets.length - assetIdsToOffline.length - assetIdsToUpdate.length - assetIdsToOnline.length;
const cumulativePercentage = ((100 * job.progressCounter) / job.totalAssets).toFixed(1);
this.logger.log(
`Checked existing asset(s): ${assetIdsToOffline.length + trashedAssetIdsToOffline.length} offlined, ${assetIdsToOnline.length + trashedAssetIdsToOnline.length} onlined, ${assetIdsToUpdate.length} updated, ${remainingCount} unchanged of current batch of ${assets.length} (Total progress: ${job.progressCounter} of ${job.totalAssets}, ${cumulativePercentage} %) in library ${job.libraryId}.`,
);
return JobStatus.Success;
}
private checkExistingAsset(
asset: {
isOffline: boolean;
libraryId: string | null;
originalPath: string;
status: AssetStatus;
fileModifiedAt: Date;
},
stat: Stats | null,
): AssetSyncResult {
if (!stat) {
// File not found on disk or permission error
if (asset.isOffline) {
this.logger.verbose(
`Asset ${asset.originalPath} is still not accessible, keeping offline in library ${asset.libraryId}`,
);
return AssetSyncResult.DO_NOTHING;
}
this.logger.debug(
`Asset ${asset.originalPath} is no longer on disk or is inaccessible because of permissions, marking offline in library ${asset.libraryId}`,
);
return AssetSyncResult.OFFLINE;
}
if (asset.isOffline && asset.status !== AssetStatus.Deleted) {
// Only perform the expensive check if the asset is offline
return AssetSyncResult.CHECK_OFFLINE;
}
if (stat.mtime.valueOf() !== asset.fileModifiedAt.valueOf()) {
this.logger.verbose(`Asset ${asset.originalPath} needs metadata extraction in library ${asset.libraryId}`);
return AssetSyncResult.UPDATE;
}
return AssetSyncResult.DO_NOTHING;
}
@OnJob({ name: JobName.LibrarySyncFilesQueueAll, queue: QueueName.Library })
async handleQueueSyncFiles(job: JobOf<JobName.LibrarySyncFilesQueueAll>): Promise<JobStatus> {
const library = await this.libraryRepository.get(job.id);
if (!library) {
this.logger.debug(`Library ${job.id} not found, skipping refresh`);
return JobStatus.Skipped;
}
this.logger.debug(`Validating import paths for library ${library.id}...`);
const validImportPaths: string[] = [];
for (const importPath of library.importPaths) {
const validation = await this.validateImportPath(importPath);
if (validation.isValid) {
validImportPaths.push(path.normalize(importPath));
} else {
this.logger.warn(`Skipping invalid import path: ${importPath}. Reason: ${validation.message}`);
}
}
if (validImportPaths.length === 0) {
this.logger.warn(`No valid import paths found for library ${library.id}`);
return JobStatus.Skipped;
}
const pathsOnDisk = this.storageRepository.walk({
pathsToCrawl: validImportPaths,
includeHidden: false,
exclusionPatterns: library.exclusionPatterns,
take: JOBS_LIBRARY_PAGINATION_SIZE,
});
let importCount = 0;
let crawlCount = 0;
this.logger.log(`Starting disk crawl of ${validImportPaths.length} import path(s) for library ${library.id}...`);
for await (const pathBatch of pathsOnDisk) {
crawlCount += pathBatch.length;
const paths = await this.assetRepository.filterNewExternalAssetPaths(library.id, pathBatch);
if (paths.length > 0) {
importCount += paths.length;
await this.jobRepository.queue({
name: JobName.LibrarySyncFiles,
data: {
libraryId: library.id,
paths,
progressCounter: crawlCount,
},
});
}
this.logger.log(
`Crawled ${crawlCount} file(s) so far: ${paths.length} of current batch of ${pathBatch.length} will be imported to library ${library.id}...`,
);
}
this.logger.log(
`Finished disk crawl, ${crawlCount} file(s) found on disk and queued ${importCount} file(s) for import into ${library.id}`,
);
await this.libraryRepository.update(job.id, { refreshedAt: new Date() });
return JobStatus.Success;
}
@OnJob({ name: JobName.LibraryRemoveAsset, queue: QueueName.Library })
async handleAssetRemoval(job: JobOf<JobName.LibraryRemoveAsset>): Promise<JobStatus> {
// This is only for handling file unlink events via the file watcher
this.logger.verbose(`Deleting asset(s) ${job.paths} from library ${job.libraryId}`);
for (const assetPath of job.paths) {
const asset = await this.assetRepository.getByLibraryIdAndOriginalPath(job.libraryId, assetPath);
if (asset) {
await this.assetRepository.remove(asset);
}
}
return JobStatus.Success;
}
@OnJob({ name: JobName.LibrarySyncAssetsQueueAll, queue: QueueName.Library })
async handleQueueSyncAssets(job: JobOf<JobName.LibrarySyncAssetsQueueAll>): Promise<JobStatus> {
const library = await this.libraryRepository.get(job.id);
if (!library) {
return JobStatus.Skipped;
}
const assetCount = await this.assetRepository.getLibraryAssetCount(job.id);
if (!assetCount) {
this.logger.log(`Library ${library.id} is empty, no need to check assets`);
return JobStatus.Success;
}
this.logger.log(
`Checking ${assetCount} asset(s) against import paths and exclusion patterns in library ${library.id}...`,
);
const offlineResult = await this.assetRepository.detectOfflineExternalAssets(
library.id,
library.importPaths,
library.exclusionPatterns,
);
const affectedAssetCount = Number(offlineResult.numUpdatedRows);
this.logger.log(
`${affectedAssetCount} asset(s) out of ${assetCount} were offlined due to import paths and/or exclusion pattern(s) in library ${library.id}`,
);
if (affectedAssetCount === assetCount) {
return JobStatus.Success;
}
let chunk: string[] = [];
let count = 0;
const queueChunk = async () => {
if (chunk.length > 0) {
count += chunk.length;
await this.jobRepository.queue({
name: JobName.LibrarySyncAssets,
data: {
libraryId: library.id,
importPaths: library.importPaths,
exclusionPatterns: library.exclusionPatterns,
assetIds: chunk.map((id) => id),
progressCounter: count,
totalAssets: assetCount,
},
});
chunk = [];
const completePercentage = ((100 * count) / assetCount).toFixed(1);
this.logger.log(
`Queued check of ${count} of ${assetCount} (${completePercentage} %) existing asset(s) so far in library ${library.id}`,
);
}
};
this.logger.log(`Scanning library ${library.id} for assets missing from disk...`);
const existingAssets = this.libraryRepository.streamAssetIds(library.id);
for await (const asset of existingAssets) {
chunk.push(asset.id);
if (chunk.length === JOBS_LIBRARY_PAGINATION_SIZE) {
await queueChunk();
}
}
await queueChunk();
this.logger.log(`Finished queuing ${count} asset check(s) for library ${library.id}`);
return JobStatus.Success;
}
private async findOrFail(id: string) {
const library = await this.libraryRepository.get(id);
if (!library) {
throw new BadRequestException('Library not found');
}
return library;
}
}