chore(server): cleanup library watching (#8835)

chore: clean up library watching
This commit is contained in:
Jason Rasmussen
2024-04-15 23:05:08 -04:00
committed by GitHub
parent 1c1e461936
commit dba365634a
23 changed files with 56 additions and 1088 deletions

View File

@@ -31,14 +31,6 @@ export interface WatchEvents {
onError(error: Error): void;
}
export enum StorageEventType {
READY = 'ready',
ADD = 'add',
CHANGE = 'change',
UNLINK = 'unlink',
ERROR = 'error',
}
export interface IStorageRepository {
createZipStream(): ImmichZipStream;
createReadStream(filepath: string, mimeType?: string | null): Promise<ImmichReadStream>;

View File

@@ -10,7 +10,6 @@ import {
IStorageRepository,
ImmichReadStream,
ImmichZipStream,
StorageEventType,
WatchEvents,
} from 'src/interfaces/storage.interface';
import { Instrumentation } from 'src/utils/instrumentation';
@@ -173,11 +172,11 @@ export class StorageRepository implements IStorageRepository {
watch(paths: string[], options: WatchOptions, events: Partial<WatchEvents>) {
const watcher = chokidar.watch(paths, options);
watcher.on(StorageEventType.READY, () => events.onReady?.());
watcher.on(StorageEventType.ADD, (path) => events.onAdd?.(path));
watcher.on(StorageEventType.CHANGE, (path) => events.onChange?.(path));
watcher.on(StorageEventType.UNLINK, (path) => events.onUnlink?.(path));
watcher.on(StorageEventType.ERROR, (error) => events.onError?.(error));
watcher.on('ready', () => events.onReady?.());
watcher.on('add', (path) => events.onAdd?.(path));
watcher.on('change', (path) => events.onChange?.(path));
watcher.on('unlink', (path) => events.onUnlink?.(path));
watcher.on('error', (error) => events.onError?.(error));
return () => watcher.close();
}

View File

@@ -13,7 +13,7 @@ import { ICryptoRepository } from 'src/interfaces/crypto.interface';
import { IDatabaseRepository } from 'src/interfaces/database.interface';
import { IJobRepository, ILibraryFileJob, ILibraryRefreshJob, JobName, JobStatus } from 'src/interfaces/job.interface';
import { ILibraryRepository } from 'src/interfaces/library.interface';
import { IStorageRepository, StorageEventType } from 'src/interfaces/storage.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
import { LibraryService } from 'src/services/library.service';
import { assetStub } from 'test/fixtures/asset.stub';
@@ -933,12 +933,6 @@ describe(LibraryService.name, () => {
type: LibraryType.EXTERNAL,
importPaths: libraryStub.externalLibraryWithImportPaths1.importPaths,
});
expect(storageMock.watch).toHaveBeenCalledWith(
libraryStub.externalLibraryWithImportPaths1.importPaths,
expect.anything(),
expect.anything(),
);
});
it('should create with exclusion patterns', async () => {
@@ -1087,45 +1081,6 @@ describe(LibraryService.name, () => {
await expect(sut.update('library-id', {})).resolves.toEqual(mapLibrary(libraryStub.uploadLibrary1));
expect(libraryMock.update).toHaveBeenCalledWith(expect.objectContaining({ id: 'library-id' }));
});
it('should re-watch library when updating import paths', async () => {
libraryMock.update.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
storageMock.stat.mockResolvedValue({
isDirectory: () => true,
} as Stats);
storageMock.checkFileExists.mockResolvedValue(true);
await expect(sut.update('library-id', { importPaths: ['/data/user1/foo'] })).resolves.toEqual(
mapLibrary(libraryStub.externalLibraryWithImportPaths1),
);
expect(libraryMock.update).toHaveBeenCalledWith(expect.objectContaining({ id: 'library-id' }));
expect(storageMock.watch).toHaveBeenCalledWith(
libraryStub.externalLibraryWithImportPaths1.importPaths,
expect.anything(),
expect.anything(),
);
});
it('should re-watch library when updating exclusion patterns', async () => {
libraryMock.update.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
configMock.load.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
await expect(sut.update('library-id', { exclusionPatterns: ['bar'] })).resolves.toEqual(
mapLibrary(libraryStub.externalLibraryWithImportPaths1),
);
expect(libraryMock.update).toHaveBeenCalledWith(expect.objectContaining({ id: 'library-id' }));
expect(storageMock.watch).toHaveBeenCalledWith(
expect.arrayContaining([expect.any(String)]),
expect.anything(),
expect.anything(),
);
});
});
describe('watchAll', () => {
@@ -1198,9 +1153,7 @@ describe(LibraryService.name, () => {
it('should handle a new file event', async () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/foo/photo.jpg' }] }),
);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/foo/photo.jpg' }] }));
await sut.watchAll();
@@ -1221,7 +1174,7 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.CHANGE, value: '/foo/photo.jpg' }] }),
makeMockWatcher({ items: [{ event: 'change', value: '/foo/photo.jpg' }] }),
);
await sut.watchAll();
@@ -1244,7 +1197,7 @@ describe(LibraryService.name, () => {
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
assetMock.getByLibraryIdAndOriginalPath.mockResolvedValue(assetStub.external);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.UNLINK, value: '/foo/photo.jpg' }] }),
makeMockWatcher({ items: [{ event: 'unlink', value: '/foo/photo.jpg' }] }),
);
await sut.watchAll();
@@ -1258,19 +1211,17 @@ describe(LibraryService.name, () => {
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(
makeMockWatcher({
items: [{ event: StorageEventType.ERROR, value: 'Error!' }],
items: [{ event: 'error', value: 'Error!' }],
}),
);
await expect(sut.watchAll()).rejects.toThrow('Error!');
await expect(sut.watchAll()).resolves.toBeUndefined();
});
it('should ignore unknown extensions', async () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/foo/photo.jpg' }] }),
);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/foo/photo.jpg' }] }));
await sut.watchAll();
@@ -1280,9 +1231,7 @@ describe(LibraryService.name, () => {
it('should ignore excluded paths', async () => {
libraryMock.get.mockResolvedValue(libraryStub.patternPath);
libraryMock.getAll.mockResolvedValue([libraryStub.patternPath]);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/dir1/photo.txt' }] }),
);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/dir1/photo.txt' }] }));
await sut.watchAll();
@@ -1292,9 +1241,7 @@ describe(LibraryService.name, () => {
it('should ignore excluded paths without case sensitivity', async () => {
libraryMock.get.mockResolvedValue(libraryStub.patternPath);
libraryMock.getAll.mockResolvedValue([libraryStub.patternPath]);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/DIR1/photo.txt' }] }),
);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/DIR1/photo.txt' }] }));
await sut.watchAll();

View File

@@ -1,7 +1,6 @@
import { BadRequestException, Inject, Injectable } from '@nestjs/common';
import { Trie } from 'mnemonist';
import { R_OK } from 'node:constants';
import { EventEmitter } from 'node:events';
import { Stats } from 'node:fs';
import path, { basename, parse } from 'node:path';
import picomatch from 'picomatch';
@@ -37,7 +36,7 @@ import {
JobStatus,
} from 'src/interfaces/job.interface';
import { ILibraryRepository } from 'src/interfaces/library.interface';
import { IStorageRepository, StorageEventType } from 'src/interfaces/storage.interface';
import { IStorageRepository } from 'src/interfaces/storage.interface';
import { ISystemConfigRepository } from 'src/interfaces/system-config.interface';
import { ImmichLogger } from 'src/utils/logger';
import { mimeTypes } from 'src/utils/mime-types';
@@ -48,7 +47,7 @@ import { validateCronExpression } from 'src/validation';
const LIBRARY_SCAN_BATCH_SIZE = 5000;
@Injectable()
export class LibraryService extends EventEmitter {
export class LibraryService {
readonly logger = new ImmichLogger(LibraryService.name);
private configCore: SystemConfigCore;
private watchLibraries = false;
@@ -64,7 +63,6 @@ export class LibraryService extends EventEmitter {
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
) {
super();
this.configCore = SystemConfigCore.create(configRepository);
}
@@ -152,7 +150,6 @@ export class LibraryService extends EventEmitter {
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit(StorageEventType.ADD, path);
};
return handlePromiseError(handler(), this.logger);
},
@@ -163,7 +160,6 @@ export class LibraryService extends EventEmitter {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit(StorageEventType.CHANGE, path);
};
return handlePromiseError(handler(), this.logger);
},
@@ -174,13 +170,11 @@ export class LibraryService extends EventEmitter {
if (asset && matcher(path)) {
await this.assetRepository.update({ id: asset.id, isOffline: true });
}
this.emit(StorageEventType.UNLINK, path);
};
return handlePromiseError(handler(), this.logger);
},
onError: (error) => {
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
this.emit(StorageEventType.ERROR, error);
},
},
);
@@ -281,10 +275,6 @@ export class LibraryService extends EventEmitter {
this.logger.log(`Creating ${dto.type} library for ${dto.ownerId}}`);
if (dto.type === LibraryType.EXTERNAL) {
await this.watch(library.id);
}
return mapLibrary(library);
}
@@ -368,11 +358,6 @@ export class LibraryService extends EventEmitter {
}
}
if (dto.importPaths || dto.exclusionPatterns) {
// Re-watch library to use new paths and/or exclusion patterns
await this.watch(id);
}
return mapLibrary(library);
}