chore(server): Move library watcher to microservices (#7533)

* move watcher init to micro

* document watcher recovery

* chore: fix lint

* add try lock

* use global library watch lock

* fix: ensure lock stays on

* fix: mocks

* unit test for library watch lock

* move statement to correct test

* fix: correct return type of try lock

* fix: tests

* add library teardown

* add chokidar error handler

* make event strings an enum

* wait for event refactor

* refactor event type mocks

* expect correct error

* don't release lock in teardown

* chore: lint

* use enum

* fix mock

* fix lint

* fix watcher await

* remove await

* simplify typing

* remove async

* Revert "remove async"

This reverts commit 84ab5abac4.

* can now change watch settings at runtime

* fix lint

* only watch libraries if enabled

---------

Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
Co-authored-by: Alex Tran <alex.tran1502@gmail.com>
This commit is contained in:
Jonathan Jogenfors
2024-03-07 18:36:53 +01:00
committed by GitHub
parent 3278dcbcbe
commit 4cb0f37918
14 changed files with 149 additions and 81 deletions

View File

@@ -9,11 +9,11 @@ import {
newAccessRepositoryMock,
newAssetRepositoryMock,
newCryptoRepositoryMock,
newDatabaseRepositoryMock,
newJobRepositoryMock,
newLibraryRepositoryMock,
newStorageRepositoryMock,
newSystemConfigRepositoryMock,
newUserRepositoryMock,
systemConfigStub,
userStub,
} from '@test';
@@ -23,11 +23,12 @@ import { ILibraryFileJob, ILibraryRefreshJob, JobName } from '../job';
import {
IAssetRepository,
ICryptoRepository,
IDatabaseRepository,
IJobRepository,
ILibraryRepository,
IStorageRepository,
ISystemConfigRepository,
IUserRepository,
StorageEventType,
} from '../repositories';
import { SystemConfigCore } from '../system-config/system-config.core';
import { mapLibrary } from './library.dto';
@@ -40,20 +41,20 @@ describe(LibraryService.name, () => {
let assetMock: jest.Mocked<IAssetRepository>;
let configMock: jest.Mocked<ISystemConfigRepository>;
let cryptoMock: jest.Mocked<ICryptoRepository>;
let userMock: jest.Mocked<IUserRepository>;
let jobMock: jest.Mocked<IJobRepository>;
let libraryMock: jest.Mocked<ILibraryRepository>;
let storageMock: jest.Mocked<IStorageRepository>;
let databaseMock: jest.Mocked<IDatabaseRepository>;
beforeEach(() => {
accessMock = newAccessRepositoryMock();
configMock = newSystemConfigRepositoryMock();
libraryMock = newLibraryRepositoryMock();
userMock = newUserRepositoryMock();
assetMock = newAssetRepositoryMock();
jobMock = newJobRepositoryMock();
cryptoMock = newCryptoRepositoryMock();
storageMock = newStorageRepositoryMock();
databaseMock = newDatabaseRepositoryMock();
// Always validate owner access for library.
accessMock.library.checkOwnerAccess.mockImplementation((_, libraryIds) => Promise.resolve(libraryIds));
@@ -66,8 +67,10 @@ describe(LibraryService.name, () => {
jobMock,
libraryMock,
storageMock,
userMock,
databaseMock,
);
databaseMock.tryLock.mockResolvedValue(true);
});
it('should work', () => {
@@ -125,13 +128,22 @@ describe(LibraryService.name, () => {
);
});
it('should not initialize when watching is disabled', async () => {
it('should not initialize watcher when watching is disabled', async () => {
configMock.load.mockResolvedValue(systemConfigStub.libraryWatchDisabled);
await sut.init();
expect(storageMock.watch).not.toHaveBeenCalled();
});
it('should not initialize watcher when lock is taken', async () => {
configMock.load.mockResolvedValue(systemConfigStub.libraryWatchEnabled);
databaseMock.tryLock.mockResolvedValue(false);
await sut.init();
expect(storageMock.watch).not.toHaveBeenCalled();
});
});
describe('handleQueueAssetRefresh', () => {
@@ -146,7 +158,6 @@ describe(LibraryService.name, () => {
storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);
assetMock.getPathsNotInLibrary.mockResolvedValue(['/data/user1/photo.jpg']);
assetMock.getByLibraryId.mockResolvedValue([]);
userMock.get.mockResolvedValue(userStub.admin);
await sut.handleQueueAssetRefresh(mockLibraryJob);
@@ -173,7 +184,6 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibrary1);
storageMock.crawl.mockResolvedValue(['/data/user1/photo.jpg']);
assetMock.getByLibraryId.mockResolvedValue([]);
userMock.get.mockResolvedValue(userStub.admin);
await sut.handleQueueAssetRefresh(mockLibraryJob);
@@ -224,7 +234,6 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
storageMock.crawl.mockResolvedValue([]);
assetMock.getByLibraryId.mockResolvedValue([]);
userMock.get.mockResolvedValue(userStub.externalPathRoot);
await sut.handleQueueAssetRefresh(mockLibraryJob);
@@ -240,7 +249,6 @@ describe(LibraryService.name, () => {
beforeEach(() => {
mockUser = userStub.admin;
userMock.get.mockResolvedValue(mockUser);
storageMock.stat.mockResolvedValue({
size: 100,
@@ -1167,7 +1175,9 @@ describe(LibraryService.name, () => {
it('should handle a new file event', async () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/foo/photo.jpg' }] }));
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/foo/photo.jpg' }] }),
);
await sut.watchAll();
@@ -1188,7 +1198,7 @@ describe(LibraryService.name, () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: 'change', value: '/foo/photo.jpg' }] }),
makeMockWatcher({ items: [{ event: StorageEventType.CHANGE, value: '/foo/photo.jpg' }] }),
);
await sut.watchAll();
@@ -1211,7 +1221,7 @@ describe(LibraryService.name, () => {
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
assetMock.getByLibraryIdAndOriginalPath.mockResolvedValue(assetStub.external);
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: 'unlink', value: '/foo/photo.jpg' }] }),
makeMockWatcher({ items: [{ event: StorageEventType.UNLINK, value: '/foo/photo.jpg' }] }),
);
await sut.watchAll();
@@ -1225,17 +1235,19 @@ describe(LibraryService.name, () => {
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(
makeMockWatcher({
items: [{ event: 'error', value: 'Error!' }],
items: [{ event: StorageEventType.ERROR, value: 'Error!' }],
}),
);
await sut.watchAll();
await expect(sut.watchAll()).rejects.toThrow('Error!');
});
it('should ignore unknown extensions', async () => {
libraryMock.get.mockResolvedValue(libraryStub.externalLibraryWithImportPaths1);
libraryMock.getAll.mockResolvedValue([libraryStub.externalLibraryWithImportPaths1]);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/foo/photo.jpg' }] }));
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/foo/photo.jpg' }] }),
);
await sut.watchAll();
@@ -1245,7 +1257,9 @@ describe(LibraryService.name, () => {
it('should ignore excluded paths', async () => {
libraryMock.get.mockResolvedValue(libraryStub.patternPath);
libraryMock.getAll.mockResolvedValue([libraryStub.patternPath]);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/dir1/photo.txt' }] }));
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/dir1/photo.txt' }] }),
);
await sut.watchAll();
@@ -1255,7 +1269,9 @@ describe(LibraryService.name, () => {
it('should ignore excluded paths without case sensitivity', async () => {
libraryMock.get.mockResolvedValue(libraryStub.patternPath);
libraryMock.getAll.mockResolvedValue([libraryStub.patternPath]);
storageMock.watch.mockImplementation(makeMockWatcher({ items: [{ event: 'add', value: '/DIR1/photo.txt' }] }));
storageMock.watch.mockImplementation(
makeMockWatcher({ items: [{ event: StorageEventType.ADD, value: '/DIR1/photo.txt' }] }),
);
await sut.watchAll();
@@ -1264,7 +1280,7 @@ describe(LibraryService.name, () => {
});
});
describe('tearDown', () => {
describe('teardown', () => {
it('should tear down all watchers', async () => {
libraryMock.getAll.mockResolvedValue([
libraryStub.externalLibraryWithImportPaths1,
@@ -1286,7 +1302,7 @@ describe(LibraryService.name, () => {
storageMock.watch.mockImplementation(makeMockWatcher({ close: mockClose }));
await sut.init();
await sut.unwatchAll();
await sut.teardown();
expect(mockClose).toHaveBeenCalledTimes(2);
});

View File

@@ -13,14 +13,16 @@ import { handlePromiseError, usePagination, validateCronExpression } from '../do
import { IBaseJob, IEntityJob, ILibraryFileJob, ILibraryRefreshJob, JOBS_ASSET_PAGINATION_SIZE, JobName } from '../job';
import {
DatabaseLock,
IAccessRepository,
IAssetRepository,
ICryptoRepository,
IDatabaseRepository,
IJobRepository,
ILibraryRepository,
IStorageRepository,
ISystemConfigRepository,
IUserRepository,
StorageEventType,
WithProperty,
} from '../repositories';
import { SystemConfigCore } from '../system-config';
@@ -43,6 +45,7 @@ export class LibraryService extends EventEmitter {
private access: AccessCore;
private configCore: SystemConfigCore;
private watchLibraries = false;
private watchLock = false;
private watchers: Record<string, () => Promise<void>> = {};
constructor(
@@ -53,7 +56,7 @@ export class LibraryService extends EventEmitter {
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ILibraryRepository) private repository: ILibraryRepository,
@Inject(IStorageRepository) private storageRepository: IStorageRepository,
@Inject(IUserRepository) private userRepository: IUserRepository,
@Inject(IDatabaseRepository) private databaseRepository: IDatabaseRepository,
) {
super();
this.access = AccessCore.create(accessRepository);
@@ -68,8 +71,15 @@ export class LibraryService extends EventEmitter {
async init() {
const config = await this.configCore.getConfig();
const { watch, scan } = config.library;
this.watchLibraries = watch.enabled;
// This ensures that library watching only occurs in one microservice
// TODO: we could make the lock be per-library instead of global
this.watchLock = await this.databaseRepository.tryLock(DatabaseLock.LibraryWatch);
this.watchLibraries = this.watchLock && watch.enabled;
this.jobRepository.addCronJob(
'libraryScan',
scan.cronExpression,
@@ -89,6 +99,7 @@ export class LibraryService extends EventEmitter {
this.jobRepository.updateCronJob('libraryScan', library.scan.cronExpression, library.scan.enabled);
if (library.watch.enabled !== this.watchLibraries) {
// Watch configuration changed, update accordingly
this.watchLibraries = library.watch.enabled;
handlePromiseError(this.watchLibraries ? this.watchAll() : this.unwatchAll(), this.logger);
}
@@ -134,7 +145,7 @@ export class LibraryService extends EventEmitter {
if (matcher(path)) {
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('add', path);
this.emit(StorageEventType.ADD, path);
};
return handlePromiseError(handler(), this.logger);
},
@@ -145,7 +156,7 @@ export class LibraryService extends EventEmitter {
// Note: if the changed file was not previously imported, it will be imported now.
await this.scanAssets(library.id, [path], library.ownerId, false);
}
this.emit('change', path);
this.emit(StorageEventType.CHANGE, path);
};
return handlePromiseError(handler(), this.logger);
},
@@ -156,13 +167,13 @@ export class LibraryService extends EventEmitter {
if (asset && matcher(path)) {
await this.assetRepository.save({ id: asset.id, isOffline: true });
}
this.emit('unlink', path);
this.emit(StorageEventType.UNLINK, path);
};
return handlePromiseError(handler(), this.logger);
},
onError: (error) => {
// TODO: should we log, or throw an exception?
this.logger.error(`Library watcher for library ${library.id} encountered error: ${error}`);
this.emit(StorageEventType.ERROR, error);
},
},
);
@@ -180,13 +191,25 @@ export class LibraryService extends EventEmitter {
}
}
async unwatchAll() {
async teardown() {
await this.unwatchAll();
}
private async unwatchAll() {
if (!this.watchLock) {
return false;
}
for (const id in this.watchers) {
await this.unwatch(id);
}
}
async watchAll() {
if (!this.watchLock) {
return false;
}
const libraries = await this.repository.getAll(false, LibraryType.EXTERNAL);
for (const library of libraries) {
@@ -267,7 +290,7 @@ export class LibraryService extends EventEmitter {
this.logger.log(`Creating ${dto.type} library for user ${auth.user.name}`);
if (dto.type === LibraryType.EXTERNAL && this.watchLibraries) {
if (dto.type === LibraryType.EXTERNAL) {
await this.watch(library.id);
}

View File

@@ -19,6 +19,7 @@ export enum DatabaseLock {
Migrations = 200,
StorageTemplateMigration = 420,
CLIPDimSize = 512,
LibraryWatch = 1337,
}
export const extName: Record<DatabaseExtension, string> = {
@@ -46,6 +47,7 @@ export interface IDatabaseRepository {
shouldReindex(name: VectorIndex): Promise<boolean>;
runMigrations(options?: { transaction?: 'all' | 'none' | 'each' }): Promise<void>;
withLock<R>(lock: DatabaseLock, callback: () => Promise<R>): Promise<R>;
tryLock(lock: DatabaseLock): Promise<boolean>;
isBusy(lock: DatabaseLock): boolean;
wait(lock: DatabaseLock): Promise<void>;
}

View File

@@ -31,6 +31,14 @@ export interface WatchEvents {
onError(error: Error): void;
}
export enum StorageEventType {
READY = 'ready',
ADD = 'add',
CHANGE = 'change',
UNLINK = 'unlink',
ERROR = 'error',
}
export interface IStorageRepository {
createZipStream(): ImmichZipStream;
createReadStream(filepath: string, mimeType?: string | null): Promise<ImmichReadStream>;

View File

@@ -2,7 +2,6 @@ import {
AuthService,
DatabaseService,
JobService,
LibraryService,
ONE_HOUR,
OpenGraphTags,
ServerInfoService,
@@ -45,7 +44,6 @@ export class AppService {
private authService: AuthService,
private configService: SystemConfigService,
private jobService: JobService,
private libraryService: LibraryService,
private serverService: ServerInfoService,
private sharedLinkService: SharedLinkService,
private storageService: StorageService,
@@ -66,15 +64,10 @@ export class AppService {
await this.databaseService.init();
await this.configService.init();
this.storageService.init();
await this.libraryService.init();
await this.serverService.init();
this.logger.log(`Feature Flags: ${JSON.stringify(await this.serverService.getFeatures(), null, 2)}`);
}
async teardown() {
await this.libraryService.unwatchAll();
}
ssr(excludePaths: string[]) {
let index = '';
try {

View File

@@ -210,6 +210,11 @@ export class DatabaseRepository implements IDatabaseRepository {
return res as R;
}
async tryLock(lock: DatabaseLock): Promise<boolean> {
const queryRunner = this.dataSource.createQueryRunner();
return await this.acquireTryLock(lock, queryRunner);
}
isBusy(lock: DatabaseLock): boolean {
return this.asyncLock.isBusy(DatabaseLock[lock]);
}
@@ -222,6 +227,11 @@ export class DatabaseRepository implements IDatabaseRepository {
return queryRunner.query('SELECT pg_advisory_lock($1)', [lock]);
}
private async acquireTryLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<boolean> {
const lockResult = await queryRunner.query('SELECT pg_try_advisory_lock($1)', [lock]);
return lockResult[0].pg_try_advisory_lock;
}
private async releaseLock(lock: DatabaseLock, queryRunner: QueryRunner): Promise<void> {
return queryRunner.query('SELECT pg_advisory_unlock($1)', [lock]);
}

View File

@@ -1,11 +1,12 @@
import {
CrawlOptionsDto,
DiskUsage,
IStorageRepository,
ImmichReadStream,
ImmichZipStream,
IStorageRepository,
mimeTypes,
StorageEventType,
WatchEvents,
mimeTypes,
} from '@app/domain';
import { ImmichLogger } from '@app/infra/logger';
import archiver from 'archiver';
@@ -141,10 +142,11 @@ export class FilesystemProvider implements IStorageRepository {
watch(paths: string[], options: WatchOptions, events: Partial<WatchEvents>) {
const watcher = chokidar.watch(paths, options);
watcher.on('ready', () => events.onReady?.());
watcher.on('add', (path) => events.onAdd?.(path));
watcher.on('change', (path) => events.onChange?.(path));
watcher.on('unlink', (path) => events.onUnlink?.(path));
watcher.on(StorageEventType.READY, () => events.onReady?.());
watcher.on(StorageEventType.ADD, (path) => events.onAdd?.(path));
watcher.on(StorageEventType.CHANGE, (path) => events.onChange?.(path));
watcher.on(StorageEventType.UNLINK, (path) => events.onUnlink?.(path));
watcher.on(StorageEventType.ERROR, (error) => events.onError?.(error));
return () => watcher.close();
}

View File

@@ -40,6 +40,7 @@ export class AppService {
async init() {
await this.databaseService.init();
await this.configService.init();
await this.libraryService.init();
await this.jobService.init({
[JobName.ASSET_DELETION]: (data) => this.assetService.handleAssetDeletion(data),
[JobName.ASSET_DELETION_CHECK]: () => this.assetService.handleAssetDeletionCheck(),
@@ -86,6 +87,7 @@ export class AppService {
}
async teardown() {
await this.libraryService.teardown();
await this.metadataService.teardown();
}
}

View File

@@ -1,4 +1,4 @@
import { IJobRepository, IMediaRepository, JobItem, JobItemHandler, QueueName } from '@app/domain';
import { IJobRepository, IMediaRepository, JobItem, JobItemHandler, QueueName, StorageEventType } from '@app/domain';
import { AppModule } from '@app/immich';
import { InfraModule, InfraTestModule, dataSource } from '@app/infra';
import { MediaRepository } from '@app/infra/repositories';
@@ -48,6 +48,9 @@ export const db = {
if (deleteUsers) {
await em.query(`DELETE FROM "users" CASCADE;`);
}
// Release all locks
await em.query('SELECT pg_advisory_unlock_all()');
});
},
disconnect: async () => {
@@ -124,34 +127,37 @@ export const testApp = {
},
reset: async (options?: ResetOptions) => {
await db.reset(options);
await app.get(AppService).init();
await app.get(MicroAppService).init();
},
get: (member: any) => app.get(member),
teardown: async () => {
if (app) {
await app.get(MicroAppService).teardown();
await app.get(AppService).teardown();
await app.close();
}
await db.disconnect();
},
};
export function waitForEvent<T>(emitter: EventEmitter, event: string): Promise<T> {
return new Promise((resolve, reject) => {
const success = (value: T) => {
emitter.off('error', fail);
resolve(value);
};
const fail = (error: Error) => {
emitter.off(event, success);
reject(error);
};
emitter.once(event, success);
emitter.once('error', fail);
});
export function waitForEvent(emitter: EventEmitter, event: string, times = 1): Promise<void[]> {
const promises: Promise<void>[] = [];
for (let i = 1; i <= times; i++) {
promises.push(
new Promise((resolve, reject) => {
const success = (value: any) => {
emitter.off(StorageEventType.ERROR, fail);
resolve(value);
};
const fail = (error: Error) => {
emitter.off(event, success);
reject(error);
};
emitter.once(event, success);
emitter.once(StorageEventType.ERROR, fail);
}),
);
}
return Promise.all(promises);
}
const directoryExists = async (dirPath: string) =>