mirror of
https://github.com/immich-app/immich.git
synced 2026-03-01 11:20:12 +03:00
Compare commits
1 Commits
fix/map-we
...
postgres-s
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19f276b543 |
@@ -10,6 +10,7 @@ services:
|
|||||||
immich-server:
|
immich-server:
|
||||||
container_name: immich-e2e-server
|
container_name: immich-e2e-server
|
||||||
image: immich-server:latest
|
image: immich-server:latest
|
||||||
|
shm_size: 128mb
|
||||||
build:
|
build:
|
||||||
context: ../
|
context: ../
|
||||||
dockerfile: server/Dockerfile
|
dockerfile: server/Dockerfile
|
||||||
|
|||||||
43
pnpm-lock.yaml
generated
43
pnpm-lock.yaml
generated
@@ -406,9 +406,12 @@ importers:
|
|||||||
'@react-email/render':
|
'@react-email/render':
|
||||||
specifier: ^1.1.2
|
specifier: ^1.1.2
|
||||||
version: 1.4.0(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
|
version: 1.4.0(react-dom@19.2.4(react@19.2.4))(react@19.2.4)
|
||||||
'@socket.io/redis-adapter':
|
'@socket.io/postgres-adapter':
|
||||||
specifier: ^8.3.0
|
specifier: ^0.5.0
|
||||||
version: 8.3.0(socket.io-adapter@2.5.6)
|
version: 0.5.0(socket.io-adapter@2.5.6)
|
||||||
|
'@types/pg':
|
||||||
|
specifier: ^8.16.0
|
||||||
|
version: 8.16.0
|
||||||
ajv:
|
ajv:
|
||||||
specifier: ^8.17.1
|
specifier: ^8.17.1
|
||||||
version: 8.17.1
|
version: 8.17.1
|
||||||
@@ -562,6 +565,9 @@ importers:
|
|||||||
socket.io:
|
socket.io:
|
||||||
specifier: ^4.8.1
|
specifier: ^4.8.1
|
||||||
version: 4.8.3
|
version: 4.8.3
|
||||||
|
socket.io-adapter:
|
||||||
|
specifier: ^2.5.6
|
||||||
|
version: 2.5.6
|
||||||
tailwindcss-preset-email:
|
tailwindcss-preset-email:
|
||||||
specifier: ^1.4.0
|
specifier: ^1.4.0
|
||||||
version: 1.4.1(tailwindcss@3.4.19(tsx@4.21.0)(yaml@2.8.2))
|
version: 1.4.1(tailwindcss@3.4.19(tsx@4.21.0)(yaml@2.8.2))
|
||||||
@@ -3402,6 +3408,10 @@ packages:
|
|||||||
'@microsoft/tsdoc@0.16.0':
|
'@microsoft/tsdoc@0.16.0':
|
||||||
resolution: {integrity: sha512-xgAyonlVVS+q7Vc7qLW0UrJU7rSFcETRWsqdXZtjzRU8dF+6CkozTK4V4y1LwOX7j8r/vHphjDeMeGI4tNGeGA==}
|
resolution: {integrity: sha512-xgAyonlVVS+q7Vc7qLW0UrJU7rSFcETRWsqdXZtjzRU8dF+6CkozTK4V4y1LwOX7j8r/vHphjDeMeGI4tNGeGA==}
|
||||||
|
|
||||||
|
'@msgpack/msgpack@2.8.0':
|
||||||
|
resolution: {integrity: sha512-h9u4u/jiIRKbq25PM+zymTyW6bhTzELvOoUd+AvYriWOAKpLGnIamaET3pnHYoI5iYphAHBI4ayx0MehR+VVPQ==}
|
||||||
|
engines: {node: '>= 10'}
|
||||||
|
|
||||||
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
|
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
|
||||||
resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
|
resolution: {integrity: sha512-QZHtlVgbAdy2zAqNA9Gu1UpIuI8Xvsd1v8ic6B2pZmeFnFcMWiPLfWXh7TVw4eGEZ/C9TH281KwhVoeQUKbyjw==}
|
||||||
cpu: [arm64]
|
cpu: [arm64]
|
||||||
@@ -4310,9 +4320,9 @@ packages:
|
|||||||
'@socket.io/component-emitter@3.1.2':
|
'@socket.io/component-emitter@3.1.2':
|
||||||
resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==}
|
resolution: {integrity: sha512-9BCxFwvbGg/RsZK9tjXd8s4UcwR0MWeFQ1XEKIQVVvAGJyINdrqKMcTRyLoK8Rse1GjzLV9cwjWV1olXRWEXVA==}
|
||||||
|
|
||||||
'@socket.io/redis-adapter@8.3.0':
|
'@socket.io/postgres-adapter@0.5.0':
|
||||||
resolution: {integrity: sha512-ly0cra+48hDmChxmIpnESKrc94LjRL80TEmZVscuQ/WWkRP81nNj8W8cCGMqbI4L6NCuAaPRSzZF1a9GlAxxnA==}
|
resolution: {integrity: sha512-s1vFsatB4lS429ZbeAi8ju+mZMgtgdSmi9UsZsdcEG++vVtX5z10yDEt4TV8saePscvvGjs6uXvJfMCxz8+M2Q==}
|
||||||
engines: {node: '>=10.0.0'}
|
engines: {node: '>=12.0.0'}
|
||||||
peerDependencies:
|
peerDependencies:
|
||||||
socket.io-adapter: ^2.5.4
|
socket.io-adapter: ^2.5.4
|
||||||
|
|
||||||
@@ -9285,9 +9295,6 @@ packages:
|
|||||||
not@0.1.0:
|
not@0.1.0:
|
||||||
resolution: {integrity: sha512-5PDmaAsVfnWUgTUbJ3ERwn7u79Z0dYxN9ErxCpVJJqe2RK0PJ3z+iFUxuqjwtlDDegXvtWoxD/3Fzxox7tFGWA==}
|
resolution: {integrity: sha512-5PDmaAsVfnWUgTUbJ3ERwn7u79Z0dYxN9ErxCpVJJqe2RK0PJ3z+iFUxuqjwtlDDegXvtWoxD/3Fzxox7tFGWA==}
|
||||||
|
|
||||||
notepack.io@3.0.1:
|
|
||||||
resolution: {integrity: sha512-TKC/8zH5pXIAMVQio2TvVDTtPRX+DJPHDqjRbxogtFiByHyzKmy96RA0JtCQJ+WouyyL4A10xomQzgbUT+1jCg==}
|
|
||||||
|
|
||||||
npm-run-path@4.0.1:
|
npm-run-path@4.0.1:
|
||||||
resolution: {integrity: sha512-S48WzZW777zhNIrn7gxOlISNAqi9ZC/uQFnRdbeIHhZhCA6UqpkOT8T1G7BvfdgP4Er8gF4sUbaS0i7QvIfCWw==}
|
resolution: {integrity: sha512-S48WzZW777zhNIrn7gxOlISNAqi9ZC/uQFnRdbeIHhZhCA6UqpkOT8T1G7BvfdgP4Er8gF4sUbaS0i7QvIfCWw==}
|
||||||
engines: {node: '>=8'}
|
engines: {node: '>=8'}
|
||||||
@@ -11619,10 +11626,6 @@ packages:
|
|||||||
engines: {node: '>=0.8.0'}
|
engines: {node: '>=0.8.0'}
|
||||||
hasBin: true
|
hasBin: true
|
||||||
|
|
||||||
uid2@1.0.0:
|
|
||||||
resolution: {integrity: sha512-+I6aJUv63YAcY9n4mQreLUt0d4lvwkkopDNmpomkAUz0fAkEMV9pRWxN0EjhW1YfRhcuyHg2v3mwddCDW1+LFQ==}
|
|
||||||
engines: {node: '>= 4.0.0'}
|
|
||||||
|
|
||||||
uid@2.0.2:
|
uid@2.0.2:
|
||||||
resolution: {integrity: sha512-u3xV3X7uzvi5b1MncmZo3i2Aw222Zk1keqLA1YkHldREkAhAqi65wuPfe7lHx8H/Wzy+8CE7S7uS3jekIM5s8g==}
|
resolution: {integrity: sha512-u3xV3X7uzvi5b1MncmZo3i2Aw222Zk1keqLA1YkHldREkAhAqi65wuPfe7lHx8H/Wzy+8CE7S7uS3jekIM5s8g==}
|
||||||
engines: {node: '>=8'}
|
engines: {node: '>=8'}
|
||||||
@@ -15426,6 +15429,8 @@ snapshots:
|
|||||||
|
|
||||||
'@microsoft/tsdoc@0.16.0': {}
|
'@microsoft/tsdoc@0.16.0': {}
|
||||||
|
|
||||||
|
'@msgpack/msgpack@2.8.0': {}
|
||||||
|
|
||||||
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
|
'@msgpackr-extract/msgpackr-extract-darwin-arm64@3.0.3':
|
||||||
optional: true
|
optional: true
|
||||||
|
|
||||||
@@ -16303,13 +16308,15 @@ snapshots:
|
|||||||
|
|
||||||
'@socket.io/component-emitter@3.1.2': {}
|
'@socket.io/component-emitter@3.1.2': {}
|
||||||
|
|
||||||
'@socket.io/redis-adapter@8.3.0(socket.io-adapter@2.5.6)':
|
'@socket.io/postgres-adapter@0.5.0(socket.io-adapter@2.5.6)':
|
||||||
dependencies:
|
dependencies:
|
||||||
|
'@msgpack/msgpack': 2.8.0
|
||||||
|
'@types/pg': 8.16.0
|
||||||
debug: 4.3.7
|
debug: 4.3.7
|
||||||
notepack.io: 3.0.1
|
pg: 8.18.0
|
||||||
socket.io-adapter: 2.5.6
|
socket.io-adapter: 2.5.6
|
||||||
uid2: 1.0.0
|
|
||||||
transitivePeerDependencies:
|
transitivePeerDependencies:
|
||||||
|
- pg-native
|
||||||
- supports-color
|
- supports-color
|
||||||
|
|
||||||
'@sphinxxxx/color-conversion@2.2.2': {}
|
'@sphinxxxx/color-conversion@2.2.2': {}
|
||||||
@@ -22280,8 +22287,6 @@ snapshots:
|
|||||||
|
|
||||||
not@0.1.0: {}
|
not@0.1.0: {}
|
||||||
|
|
||||||
notepack.io@3.0.1: {}
|
|
||||||
|
|
||||||
npm-run-path@4.0.1:
|
npm-run-path@4.0.1:
|
||||||
dependencies:
|
dependencies:
|
||||||
path-key: 3.1.1
|
path-key: 3.1.1
|
||||||
@@ -25030,8 +25035,6 @@ snapshots:
|
|||||||
uglify-js@3.19.3:
|
uglify-js@3.19.3:
|
||||||
optional: true
|
optional: true
|
||||||
|
|
||||||
uid2@1.0.0: {}
|
|
||||||
|
|
||||||
uid@2.0.2:
|
uid@2.0.2:
|
||||||
dependencies:
|
dependencies:
|
||||||
'@lukeed/csprng': 1.1.0
|
'@lukeed/csprng': 1.1.0
|
||||||
|
|||||||
@@ -56,7 +56,8 @@
|
|||||||
"@opentelemetry/semantic-conventions": "^1.34.0",
|
"@opentelemetry/semantic-conventions": "^1.34.0",
|
||||||
"@react-email/components": "^0.5.0",
|
"@react-email/components": "^0.5.0",
|
||||||
"@react-email/render": "^1.1.2",
|
"@react-email/render": "^1.1.2",
|
||||||
"@socket.io/redis-adapter": "^8.3.0",
|
"@socket.io/postgres-adapter": "^0.5.0",
|
||||||
|
"@types/pg": "^8.16.0",
|
||||||
"ajv": "^8.17.1",
|
"ajv": "^8.17.1",
|
||||||
"archiver": "^7.0.0",
|
"archiver": "^7.0.0",
|
||||||
"async-lock": "^1.4.0",
|
"async-lock": "^1.4.0",
|
||||||
@@ -108,6 +109,7 @@
|
|||||||
"sharp": "^0.34.5",
|
"sharp": "^0.34.5",
|
||||||
"sirv": "^3.0.0",
|
"sirv": "^3.0.0",
|
||||||
"socket.io": "^4.8.1",
|
"socket.io": "^4.8.1",
|
||||||
|
"socket.io-adapter": "^2.5.6",
|
||||||
"tailwindcss-preset-email": "^1.4.0",
|
"tailwindcss-preset-email": "^1.4.0",
|
||||||
"thumbhash": "^0.1.1",
|
"thumbhash": "^0.1.1",
|
||||||
"transformation-matrix": "^3.1.0",
|
"transformation-matrix": "^3.1.0",
|
||||||
|
|||||||
@@ -5,8 +5,9 @@ import cookieParser from 'cookie-parser';
|
|||||||
import { existsSync } from 'node:fs';
|
import { existsSync } from 'node:fs';
|
||||||
import sirv from 'sirv';
|
import sirv from 'sirv';
|
||||||
import { excludePaths, serverVersion } from 'src/constants';
|
import { excludePaths, serverVersion } from 'src/constants';
|
||||||
|
import { SocketIoAdapter } from 'src/enum';
|
||||||
import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service';
|
import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service';
|
||||||
import { WebSocketAdapter } from 'src/middleware/websocket.adapter';
|
import { createWebSocketAdapter } from 'src/middleware/websocket.adapter';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||||
import { bootstrapTelemetry } from 'src/repositories/telemetry.repository';
|
import { bootstrapTelemetry } from 'src/repositories/telemetry.repository';
|
||||||
@@ -25,6 +26,7 @@ export async function configureExpress(
|
|||||||
{
|
{
|
||||||
permitSwaggerWrite = true,
|
permitSwaggerWrite = true,
|
||||||
ssr,
|
ssr,
|
||||||
|
socketIoAdapter,
|
||||||
}: {
|
}: {
|
||||||
/**
|
/**
|
||||||
* Whether to allow swagger module to write to the specs.json
|
* Whether to allow swagger module to write to the specs.json
|
||||||
@@ -36,6 +38,10 @@ export async function configureExpress(
|
|||||||
* Service to use for server-side rendering
|
* Service to use for server-side rendering
|
||||||
*/
|
*/
|
||||||
ssr: typeof ApiService | typeof MaintenanceWorkerService;
|
ssr: typeof ApiService | typeof MaintenanceWorkerService;
|
||||||
|
/**
|
||||||
|
* Override the Socket.IO adapter. If not specified, uses the adapter from config.
|
||||||
|
*/
|
||||||
|
socketIoAdapter?: SocketIoAdapter;
|
||||||
},
|
},
|
||||||
) {
|
) {
|
||||||
const configRepository = app.get(ConfigRepository);
|
const configRepository = app.get(ConfigRepository);
|
||||||
@@ -55,7 +61,7 @@ export async function configureExpress(
|
|||||||
}
|
}
|
||||||
|
|
||||||
app.setGlobalPrefix('api', { exclude: excludePaths });
|
app.setGlobalPrefix('api', { exclude: excludePaths });
|
||||||
app.useWebSocketAdapter(new WebSocketAdapter(app));
|
app.useWebSocketAdapter(await createWebSocketAdapter(app, socketIoAdapter));
|
||||||
|
|
||||||
useSwagger(app, { write: configRepository.isDev() && permitSwaggerWrite });
|
useSwagger(app, { write: configRepository.isDev() && permitSwaggerWrite });
|
||||||
|
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import { DatabaseBackupController } from 'src/controllers/database-backup.contro
|
|||||||
import { DownloadController } from 'src/controllers/download.controller';
|
import { DownloadController } from 'src/controllers/download.controller';
|
||||||
import { DuplicateController } from 'src/controllers/duplicate.controller';
|
import { DuplicateController } from 'src/controllers/duplicate.controller';
|
||||||
import { FaceController } from 'src/controllers/face.controller';
|
import { FaceController } from 'src/controllers/face.controller';
|
||||||
|
import { InternalController } from 'src/controllers/internal.controller';
|
||||||
import { JobController } from 'src/controllers/job.controller';
|
import { JobController } from 'src/controllers/job.controller';
|
||||||
import { LibraryController } from 'src/controllers/library.controller';
|
import { LibraryController } from 'src/controllers/library.controller';
|
||||||
import { MaintenanceController } from 'src/controllers/maintenance.controller';
|
import { MaintenanceController } from 'src/controllers/maintenance.controller';
|
||||||
@@ -51,6 +52,7 @@ export const controllers = [
|
|||||||
DownloadController,
|
DownloadController,
|
||||||
DuplicateController,
|
DuplicateController,
|
||||||
FaceController,
|
FaceController,
|
||||||
|
InternalController,
|
||||||
JobController,
|
JobController,
|
||||||
LibraryController,
|
LibraryController,
|
||||||
MaintenanceController,
|
MaintenanceController,
|
||||||
|
|||||||
22
server/src/controllers/internal.controller.ts
Normal file
22
server/src/controllers/internal.controller.ts
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
import { Body, Controller, NotFoundException, Post, Req } from '@nestjs/common';
|
||||||
|
import { ApiExcludeController } from '@nestjs/swagger';
|
||||||
|
import { Request } from 'express';
|
||||||
|
import { AppRestartEvent, EventRepository } from 'src/repositories/event.repository';
|
||||||
|
|
||||||
|
const LOCALHOST_ADDRESSES = new Set(['127.0.0.1', '::1', '::ffff:127.0.0.1']);
|
||||||
|
|
||||||
|
@ApiExcludeController()
|
||||||
|
@Controller('internal')
|
||||||
|
export class InternalController {
|
||||||
|
constructor(private eventRepository: EventRepository) {}
|
||||||
|
|
||||||
|
@Post('restart')
|
||||||
|
async restart(@Req() req: Request, @Body() dto: AppRestartEvent): Promise<void> {
|
||||||
|
const remoteAddress = req.socket.remoteAddress;
|
||||||
|
if (!remoteAddress || !LOCALHOST_ADDRESSES.has(remoteAddress)) {
|
||||||
|
throw new NotFoundException();
|
||||||
|
}
|
||||||
|
|
||||||
|
await this.eventRepository.emit('AppRestart', dto);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,6 @@
|
|||||||
import { Transform, Type } from 'class-transformer';
|
import { Transform, Type } from 'class-transformer';
|
||||||
import { IsEnum, IsInt, IsString, Matches } from 'class-validator';
|
import { IsEnum, IsInt, IsString, Matches } from 'class-validator';
|
||||||
import { DatabaseSslMode, ImmichEnvironment, LogFormat, LogLevel } from 'src/enum';
|
import { DatabaseSslMode, ImmichEnvironment, LogFormat, LogLevel, SocketIoAdapter } from 'src/enum';
|
||||||
import { IsIPRange, Optional, ValidateBoolean } from 'src/validation';
|
import { IsIPRange, Optional, ValidateBoolean } from 'src/validation';
|
||||||
|
|
||||||
export class EnvDto {
|
export class EnvDto {
|
||||||
@@ -140,6 +140,11 @@ export class EnvDto {
|
|||||||
@Optional()
|
@Optional()
|
||||||
IMMICH_WORKERS_EXCLUDE?: string;
|
IMMICH_WORKERS_EXCLUDE?: string;
|
||||||
|
|
||||||
|
@IsEnum(SocketIoAdapter)
|
||||||
|
@Optional()
|
||||||
|
@Transform(({ value }) => (value ? String(value).toLowerCase().trim() : value))
|
||||||
|
IMMICH_SOCKETIO_ADAPTER?: SocketIoAdapter;
|
||||||
|
|
||||||
@IsString()
|
@IsString()
|
||||||
@Optional()
|
@Optional()
|
||||||
DB_DATABASE_NAME?: string;
|
DB_DATABASE_NAME?: string;
|
||||||
|
|||||||
@@ -518,6 +518,11 @@ export enum ImmichTelemetry {
|
|||||||
Job = 'job',
|
Job = 'job',
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export enum SocketIoAdapter {
|
||||||
|
BroadcastChannel = 'broadcastchannel',
|
||||||
|
Postgres = 'postgres',
|
||||||
|
}
|
||||||
|
|
||||||
export enum ExifOrientation {
|
export enum ExifOrientation {
|
||||||
Horizontal = 1,
|
Horizontal = 1,
|
||||||
MirrorHorizontal = 2,
|
MirrorHorizontal = 2,
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
import { Kysely, sql } from 'kysely';
|
import { Kysely, sql } from 'kysely';
|
||||||
import { CommandFactory } from 'nest-commander';
|
import { CommandFactory } from 'nest-commander';
|
||||||
import { ChildProcess, fork } from 'node:child_process';
|
|
||||||
import { dirname, join } from 'node:path';
|
import { dirname, join } from 'node:path';
|
||||||
import { Worker } from 'node:worker_threads';
|
import { Worker } from 'node:worker_threads';
|
||||||
import { PostgresError } from 'postgres';
|
import { PostgresError } from 'postgres';
|
||||||
@@ -18,7 +17,7 @@ class Workers {
|
|||||||
/**
|
/**
|
||||||
* Currently running workers
|
* Currently running workers
|
||||||
*/
|
*/
|
||||||
workers: Partial<Record<ImmichWorker, { kill: (signal: NodeJS.Signals) => Promise<void> | void }>> = {};
|
workers: Partial<Record<ImmichWorker, { kill: () => Promise<void> | void }>> = {};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Fail-safe in case anything dies during restart
|
* Fail-safe in case anything dies during restart
|
||||||
@@ -101,25 +100,23 @@ class Workers {
|
|||||||
const basePath = dirname(__filename);
|
const basePath = dirname(__filename);
|
||||||
const workerFile = join(basePath, 'workers', `${name}.js`);
|
const workerFile = join(basePath, 'workers', `${name}.js`);
|
||||||
|
|
||||||
let anyWorker: Worker | ChildProcess;
|
const inspectArg = process.execArgv.find((arg) => arg.startsWith('--inspect'));
|
||||||
let kill: (signal?: NodeJS.Signals) => Promise<void> | void;
|
const workerData: { inspectorPort?: number } = {};
|
||||||
|
|
||||||
if (name === ImmichWorker.Api) {
|
if (inspectArg) {
|
||||||
const worker = fork(workerFile, [], {
|
const inspectorPorts: Record<ImmichWorker, number> = {
|
||||||
execArgv: process.execArgv.map((arg) => (arg.startsWith('--inspect') ? '--inspect=0.0.0.0:9231' : arg)),
|
[ImmichWorker.Api]: 9230,
|
||||||
});
|
[ImmichWorker.Microservices]: 9231,
|
||||||
|
[ImmichWorker.Maintenance]: 9232,
|
||||||
kill = (signal) => void worker.kill(signal);
|
};
|
||||||
anyWorker = worker;
|
workerData.inspectorPort = inspectorPorts[name];
|
||||||
} else {
|
|
||||||
const worker = new Worker(workerFile);
|
|
||||||
|
|
||||||
kill = async () => void (await worker.terminate());
|
|
||||||
anyWorker = worker;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
anyWorker.on('error', (error) => this.onError(name, error));
|
const worker = new Worker(workerFile, { workerData });
|
||||||
anyWorker.on('exit', (exitCode) => this.onExit(name, exitCode));
|
const kill = async () => void (await worker.terminate());
|
||||||
|
|
||||||
|
worker.on('error', (error) => this.onError(name, error));
|
||||||
|
worker.on('exit', (exitCode) => this.onExit(name, exitCode));
|
||||||
|
|
||||||
this.workers[name] = { kill };
|
this.workers[name] = { kill };
|
||||||
}
|
}
|
||||||
@@ -152,8 +149,8 @@ class Workers {
|
|||||||
console.error(`${name} worker exited with code ${exitCode}`);
|
console.error(`${name} worker exited with code ${exitCode}`);
|
||||||
|
|
||||||
if (this.workers[ImmichWorker.Api] && name !== ImmichWorker.Api) {
|
if (this.workers[ImmichWorker.Api] && name !== ImmichWorker.Api) {
|
||||||
console.error('Killing api process');
|
console.error('Terminating api worker');
|
||||||
void this.workers[ImmichWorker.Api].kill('SIGTERM');
|
void this.workers[ImmichWorker.Api].kill();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import {
|
|||||||
Delete,
|
Delete,
|
||||||
Get,
|
Get,
|
||||||
Next,
|
Next,
|
||||||
|
NotFoundException,
|
||||||
Param,
|
Param,
|
||||||
Post,
|
Post,
|
||||||
Req,
|
Req,
|
||||||
@@ -25,12 +26,15 @@ import { ImmichCookie } from 'src/enum';
|
|||||||
import { MaintenanceRoute } from 'src/maintenance/maintenance-auth.guard';
|
import { MaintenanceRoute } from 'src/maintenance/maintenance-auth.guard';
|
||||||
import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service';
|
import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service';
|
||||||
import { GetLoginDetails } from 'src/middleware/auth.guard';
|
import { GetLoginDetails } from 'src/middleware/auth.guard';
|
||||||
|
import { AppRestartEvent } from 'src/repositories/event.repository';
|
||||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||||
import { LoginDetails } from 'src/services/auth.service';
|
import { LoginDetails } from 'src/services/auth.service';
|
||||||
import { sendFile } from 'src/utils/file';
|
import { sendFile } from 'src/utils/file';
|
||||||
import { respondWithCookie } from 'src/utils/response';
|
import { respondWithCookie } from 'src/utils/response';
|
||||||
import { FilenameParamDto } from 'src/validation';
|
import { FilenameParamDto } from 'src/validation';
|
||||||
|
|
||||||
|
const LOCALHOST_ADDRESSES = new Set(['127.0.0.1', '::1', '::ffff:127.0.0.1']);
|
||||||
|
|
||||||
import type { DatabaseBackupController as _DatabaseBackupController } from 'src/controllers/database-backup.controller';
|
import type { DatabaseBackupController as _DatabaseBackupController } from 'src/controllers/database-backup.controller';
|
||||||
import type { ServerController as _ServerController } from 'src/controllers/server.controller';
|
import type { ServerController as _ServerController } from 'src/controllers/server.controller';
|
||||||
import { DatabaseBackupDeleteDto, DatabaseBackupListResponseDto } from 'src/dtos/database-backup.dto';
|
import { DatabaseBackupDeleteDto, DatabaseBackupListResponseDto } from 'src/dtos/database-backup.dto';
|
||||||
@@ -131,4 +135,14 @@ export class MaintenanceWorkerController {
|
|||||||
setMaintenanceMode(@Body() dto: SetMaintenanceModeDto): void {
|
setMaintenanceMode(@Body() dto: SetMaintenanceModeDto): void {
|
||||||
void this.service.setAction(dto);
|
void this.service.setAction(dto);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Post('internal/restart')
|
||||||
|
internalRestart(@Req() req: Request, @Body() dto: AppRestartEvent): void {
|
||||||
|
const remoteAddress = req.socket.remoteAddress;
|
||||||
|
if (!remoteAddress || !LOCALHOST_ADDRESSES.has(remoteAddress)) {
|
||||||
|
throw new NotFoundException();
|
||||||
|
}
|
||||||
|
|
||||||
|
this.service.handleInternalRestart(dto);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ import { MaintenanceWebsocketRepository } from 'src/maintenance/maintenance-webs
|
|||||||
import { AppRepository } from 'src/repositories/app.repository';
|
import { AppRepository } from 'src/repositories/app.repository';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||||
import { DatabaseRepository } from 'src/repositories/database.repository';
|
import { DatabaseRepository } from 'src/repositories/database.repository';
|
||||||
|
import { AppRestartEvent } from 'src/repositories/event.repository';
|
||||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||||
import { ProcessRepository } from 'src/repositories/process.repository';
|
import { ProcessRepository } from 'src/repositories/process.repository';
|
||||||
import { StorageRepository } from 'src/repositories/storage.repository';
|
import { StorageRepository } from 'src/repositories/storage.repository';
|
||||||
@@ -290,6 +291,9 @@ export class MaintenanceWorkerService {
|
|||||||
|
|
||||||
const lock = await this.databaseRepository.tryLock(DatabaseLock.MaintenanceOperation);
|
const lock = await this.databaseRepository.tryLock(DatabaseLock.MaintenanceOperation);
|
||||||
if (!lock) {
|
if (!lock) {
|
||||||
|
// Another maintenance worker has the lock - poll until maintenance mode ends
|
||||||
|
this.logger.log('Another worker has the maintenance lock, polling for maintenance mode changes...');
|
||||||
|
await this.pollForMaintenanceEnd();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -351,4 +355,25 @@ export class MaintenanceWorkerService {
|
|||||||
this.maintenanceWebsocketRepository.serverSend('AppRestart', state);
|
this.maintenanceWebsocketRepository.serverSend('AppRestart', state);
|
||||||
this.appRepository.exitApp();
|
this.appRepository.exitApp();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
handleInternalRestart(state: AppRestartEvent): void {
|
||||||
|
this.maintenanceWebsocketRepository.clientBroadcast('AppRestartV1', state);
|
||||||
|
this.maintenanceWebsocketRepository.serverSend('AppRestart', state);
|
||||||
|
this.appRepository.exitApp();
|
||||||
|
}
|
||||||
|
|
||||||
|
private async pollForMaintenanceEnd(): Promise<void> {
|
||||||
|
const pollIntervalMs = 5000;
|
||||||
|
|
||||||
|
while (true) {
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, pollIntervalMs));
|
||||||
|
|
||||||
|
const state = await this.systemMetadataRepository.get(SystemMetadataKey.MaintenanceMode);
|
||||||
|
if (!state?.isMaintenanceMode) {
|
||||||
|
this.logger.log('Maintenance mode ended, restarting...');
|
||||||
|
this.appRepository.exitApp();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
80
server/src/middleware/broadcast-channel.adapter.ts
Normal file
80
server/src/middleware/broadcast-channel.adapter.ts
Normal file
@@ -0,0 +1,80 @@
|
|||||||
|
import {
|
||||||
|
ClusterAdapterWithHeartbeat,
|
||||||
|
type ClusterAdapterOptions,
|
||||||
|
type ClusterMessage,
|
||||||
|
type ClusterResponse,
|
||||||
|
type ServerId,
|
||||||
|
} from 'socket.io-adapter';
|
||||||
|
|
||||||
|
const BC_CHANNEL_NAME = 'immich:socketio';
|
||||||
|
|
||||||
|
interface BroadcastChannelPayload {
|
||||||
|
type: 'message' | 'response';
|
||||||
|
sourceUid: string;
|
||||||
|
targetUid?: string;
|
||||||
|
data: unknown;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Socket.IO adapter using Node.js BroadcastChannel
|
||||||
|
*
|
||||||
|
* Relays messages between worker_threads within a single OS process.
|
||||||
|
* Zero external dependencies. Does NOT work across containers — use
|
||||||
|
* the Postgres adapter for multi-replica deployments.
|
||||||
|
*/
|
||||||
|
class BroadcastChannelAdapter extends ClusterAdapterWithHeartbeat {
|
||||||
|
private readonly channel: BroadcastChannel;
|
||||||
|
|
||||||
|
constructor(nsp: any, opts?: Partial<ClusterAdapterOptions>) {
|
||||||
|
super(nsp, opts ?? {});
|
||||||
|
|
||||||
|
this.channel = new BroadcastChannel(BC_CHANNEL_NAME);
|
||||||
|
this.channel.addEventListener('message', (event: MessageEvent<BroadcastChannelPayload>) => {
|
||||||
|
const msg = event.data;
|
||||||
|
if (msg.sourceUid === this.uid) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (msg.type === 'message') {
|
||||||
|
this.onMessage(msg.data as ClusterMessage);
|
||||||
|
} else if (msg.type === 'response' && msg.targetUid === this.uid) {
|
||||||
|
this.onResponse(msg.data as ClusterResponse);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
this.init();
|
||||||
|
}
|
||||||
|
|
||||||
|
override doPublish(message: ClusterMessage): Promise<string> {
|
||||||
|
this.channel.postMessage({
|
||||||
|
type: 'message',
|
||||||
|
sourceUid: this.uid,
|
||||||
|
data: message,
|
||||||
|
});
|
||||||
|
return Promise.resolve('');
|
||||||
|
}
|
||||||
|
|
||||||
|
override doPublishResponse(requesterUid: ServerId, response: ClusterResponse): Promise<void> {
|
||||||
|
this.channel.postMessage({
|
||||||
|
type: 'response',
|
||||||
|
sourceUid: this.uid,
|
||||||
|
targetUid: requesterUid,
|
||||||
|
data: response,
|
||||||
|
});
|
||||||
|
return Promise.resolve();
|
||||||
|
}
|
||||||
|
|
||||||
|
override close(): void {
|
||||||
|
super.close();
|
||||||
|
this.channel.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export function createBroadcastChannelAdapter(opts?: Partial<ClusterAdapterOptions>) {
|
||||||
|
const options: Partial<ClusterAdapterOptions> = {
|
||||||
|
...opts,
|
||||||
|
};
|
||||||
|
|
||||||
|
return function (nsp: any) {
|
||||||
|
return new BroadcastChannelAdapter(nsp, options);
|
||||||
|
};
|
||||||
|
}
|
||||||
@@ -1,21 +1,103 @@
|
|||||||
import { INestApplicationContext } from '@nestjs/common';
|
import { INestApplication, Logger } from '@nestjs/common';
|
||||||
import { IoAdapter } from '@nestjs/platform-socket.io';
|
import { IoAdapter } from '@nestjs/platform-socket.io';
|
||||||
import { createAdapter } from '@socket.io/redis-adapter';
|
import { Pool, PoolConfig } from 'pg';
|
||||||
import { Redis } from 'ioredis';
|
import type { ServerOptions } from 'socket.io';
|
||||||
import { ServerOptions } from 'socket.io';
|
import { SocketIoAdapter } from 'src/enum';
|
||||||
|
import { createBroadcastChannelAdapter } from 'src/middleware/broadcast-channel.adapter';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||||
|
import { asPostgresConnectionConfig } from 'src/utils/database';
|
||||||
|
|
||||||
export class WebSocketAdapter extends IoAdapter {
|
export type Ssl = 'require' | 'allow' | 'prefer' | 'verify-full' | boolean | object;
|
||||||
constructor(private app: INestApplicationContext) {
|
|
||||||
|
export function asPgPoolSsl(ssl?: Ssl): PoolConfig['ssl'] {
|
||||||
|
if (ssl === undefined || ssl === false || ssl === 'allow') {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ssl === true || ssl === 'prefer' || ssl === 'require') {
|
||||||
|
return { rejectUnauthorized: false };
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ssl === 'verify-full') {
|
||||||
|
return { rejectUnauthorized: true };
|
||||||
|
}
|
||||||
|
|
||||||
|
return ssl;
|
||||||
|
}
|
||||||
|
|
||||||
|
class BroadcastChannelSocketAdapter extends IoAdapter {
|
||||||
|
private adapterConstructor: ReturnType<typeof createBroadcastChannelAdapter>;
|
||||||
|
|
||||||
|
constructor(app: INestApplication) {
|
||||||
super(app);
|
super(app);
|
||||||
|
this.adapterConstructor = createBroadcastChannelAdapter();
|
||||||
}
|
}
|
||||||
|
|
||||||
createIOServer(port: number, options?: ServerOptions): any {
|
createIOServer(port: number, options?: ServerOptions): any {
|
||||||
const { redis } = this.app.get(ConfigRepository).getEnv();
|
|
||||||
const server = super.createIOServer(port, options);
|
const server = super.createIOServer(port, options);
|
||||||
const pubClient = new Redis(redis);
|
server.adapter(this.adapterConstructor);
|
||||||
const subClient = pubClient.duplicate();
|
|
||||||
server.adapter(createAdapter(pubClient, subClient));
|
|
||||||
return server;
|
return server;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
class PostgresSocketAdapter extends IoAdapter {
|
||||||
|
private adapterConstructor: any;
|
||||||
|
|
||||||
|
constructor(app: INestApplication, adapterConstructor: any) {
|
||||||
|
super(app);
|
||||||
|
this.adapterConstructor = adapterConstructor;
|
||||||
|
}
|
||||||
|
|
||||||
|
createIOServer(port: number, options?: ServerOptions): any {
|
||||||
|
const server = super.createIOServer(port, options);
|
||||||
|
server.adapter(this.adapterConstructor);
|
||||||
|
return server;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
export async function createWebSocketAdapter(
|
||||||
|
app: INestApplication,
|
||||||
|
adapterOverride?: SocketIoAdapter,
|
||||||
|
): Promise<IoAdapter> {
|
||||||
|
const logger = new Logger('WebSocketAdapter');
|
||||||
|
const config = new ConfigRepository();
|
||||||
|
const { database, socketIo } = config.getEnv();
|
||||||
|
const adapter = adapterOverride ?? socketIo.adapter;
|
||||||
|
|
||||||
|
switch (adapter) {
|
||||||
|
case SocketIoAdapter.Postgres: {
|
||||||
|
logger.log('Using Postgres Socket.IO adapter');
|
||||||
|
const { createAdapter } = await import('@socket.io/postgres-adapter');
|
||||||
|
const config = asPostgresConnectionConfig(database.config);
|
||||||
|
const pool = new Pool({
|
||||||
|
host: config.host,
|
||||||
|
port: config.port,
|
||||||
|
user: config.username,
|
||||||
|
password: config.password,
|
||||||
|
database: config.database,
|
||||||
|
ssl: asPgPoolSsl(config.ssl),
|
||||||
|
max: 2,
|
||||||
|
});
|
||||||
|
|
||||||
|
await pool.query(`
|
||||||
|
CREATE TABLE IF NOT EXISTS socket_io_attachments (
|
||||||
|
id bigserial UNIQUE,
|
||||||
|
created_at timestamptz DEFAULT NOW(),
|
||||||
|
payload bytea
|
||||||
|
);
|
||||||
|
`);
|
||||||
|
|
||||||
|
pool.on('error', (error) => {
|
||||||
|
logger.error(' Postgres pool error', error);
|
||||||
|
});
|
||||||
|
|
||||||
|
const adapterConstructor = createAdapter(pool);
|
||||||
|
return new PostgresSocketAdapter(app, adapterConstructor);
|
||||||
|
}
|
||||||
|
|
||||||
|
case SocketIoAdapter.BroadcastChannel: {
|
||||||
|
logger.log('Using BroadcastChannel Socket.IO adapter');
|
||||||
|
return new BroadcastChannelSocketAdapter(app);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,7 +1,4 @@
|
|||||||
import { Injectable } from '@nestjs/common';
|
import { Injectable } from '@nestjs/common';
|
||||||
import { createAdapter } from '@socket.io/redis-adapter';
|
|
||||||
import Redis from 'ioredis';
|
|
||||||
import { Server as SocketIO } from 'socket.io';
|
|
||||||
import { ExitCode } from 'src/enum';
|
import { ExitCode } from 'src/enum';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||||
import { AppRestartEvent } from 'src/repositories/event.repository';
|
import { AppRestartEvent } from 'src/repositories/event.repository';
|
||||||
@@ -24,24 +21,17 @@ export class AppRepository {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async sendOneShotAppRestart(state: AppRestartEvent): Promise<void> {
|
async sendOneShotAppRestart(state: AppRestartEvent): Promise<void> {
|
||||||
const server = new SocketIO();
|
const { port } = new ConfigRepository().getEnv();
|
||||||
const { redis } = new ConfigRepository().getEnv();
|
const url = `http://127.0.0.1:${port}/api/internal/restart`;
|
||||||
const pubClient = new Redis({ ...redis, lazyConnect: true });
|
|
||||||
const subClient = pubClient.duplicate();
|
|
||||||
|
|
||||||
await Promise.all([pubClient.connect(), subClient.connect()]);
|
const response = await fetch(url, {
|
||||||
|
method: 'POST',
|
||||||
server.adapter(createAdapter(pubClient, subClient));
|
headers: { 'Content-Type': 'application/json' },
|
||||||
|
body: JSON.stringify(state),
|
||||||
// => corresponds to notification.service.ts#onAppRestart
|
|
||||||
server.emit('AppRestartV1', state, async () => {
|
|
||||||
const responses = await server.serverSideEmitWithAck('AppRestart', state);
|
|
||||||
if (responses.some((response) => response !== 'ok')) {
|
|
||||||
throw new Error("One or more node(s) returned a non-'ok' response to our restart request!");
|
|
||||||
}
|
|
||||||
|
|
||||||
pubClient.disconnect();
|
|
||||||
subClient.disconnect();
|
|
||||||
});
|
});
|
||||||
|
|
||||||
|
if (!response.ok) {
|
||||||
|
throw new Error(`Failed to trigger app restart: ${response.status} ${response.statusText}`);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ import {
|
|||||||
LogFormat,
|
LogFormat,
|
||||||
LogLevel,
|
LogLevel,
|
||||||
QueueName,
|
QueueName,
|
||||||
|
SocketIoAdapter,
|
||||||
} from 'src/enum';
|
} from 'src/enum';
|
||||||
import { DatabaseConnectionParams, VectorExtension } from 'src/types';
|
import { DatabaseConnectionParams, VectorExtension } from 'src/types';
|
||||||
import { setDifference } from 'src/utils/set';
|
import { setDifference } from 'src/utils/set';
|
||||||
@@ -116,6 +117,10 @@ export interface EnvData {
|
|||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
|
socketIo: {
|
||||||
|
adapter: SocketIoAdapter;
|
||||||
|
};
|
||||||
|
|
||||||
noColor: boolean;
|
noColor: boolean;
|
||||||
nodeVersion?: string;
|
nodeVersion?: string;
|
||||||
}
|
}
|
||||||
@@ -346,6 +351,10 @@ const getEnv = (): EnvData => {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
socketIo: {
|
||||||
|
adapter: dto.IMMICH_SOCKETIO_ADAPTER ?? SocketIoAdapter.Postgres,
|
||||||
|
},
|
||||||
|
|
||||||
noColor: !!dto.NO_COLOR,
|
noColor: !!dto.NO_COLOR,
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -1,60 +1,11 @@
|
|||||||
import { createAdapter } from '@socket.io/redis-adapter';
|
|
||||||
import Redis from 'ioredis';
|
|
||||||
import { SignJWT } from 'jose';
|
import { SignJWT } from 'jose';
|
||||||
import { randomBytes } from 'node:crypto';
|
import { randomBytes } from 'node:crypto';
|
||||||
import { join } from 'node:path';
|
import { join } from 'node:path';
|
||||||
import { Server as SocketIO } from 'socket.io';
|
|
||||||
import { StorageCore } from 'src/cores/storage.core';
|
import { StorageCore } from 'src/cores/storage.core';
|
||||||
import { MaintenanceAuthDto, MaintenanceDetectInstallResponseDto } from 'src/dtos/maintenance.dto';
|
import { MaintenanceAuthDto, MaintenanceDetectInstallResponseDto } from 'src/dtos/maintenance.dto';
|
||||||
import { StorageFolder } from 'src/enum';
|
import { StorageFolder } from 'src/enum';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
|
||||||
import { AppRestartEvent } from 'src/repositories/event.repository';
|
|
||||||
import { StorageRepository } from 'src/repositories/storage.repository';
|
import { StorageRepository } from 'src/repositories/storage.repository';
|
||||||
|
|
||||||
export function sendOneShotAppRestart(state: AppRestartEvent): void {
|
|
||||||
const server = new SocketIO();
|
|
||||||
const { redis } = new ConfigRepository().getEnv();
|
|
||||||
const pubClient = new Redis(redis);
|
|
||||||
const subClient = pubClient.duplicate();
|
|
||||||
server.adapter(createAdapter(pubClient, subClient));
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Keep trying until we manage to stop Immich
|
|
||||||
*
|
|
||||||
* Sometimes there appear to be communication
|
|
||||||
* issues between to the other servers.
|
|
||||||
*
|
|
||||||
* This issue only occurs with this method.
|
|
||||||
*/
|
|
||||||
async function tryTerminate() {
|
|
||||||
while (true) {
|
|
||||||
try {
|
|
||||||
const responses = await server.serverSideEmitWithAck('AppRestart', state);
|
|
||||||
if (responses.length > 0) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
} catch (error) {
|
|
||||||
console.error(error);
|
|
||||||
console.error('Encountered an error while telling Immich to stop.');
|
|
||||||
}
|
|
||||||
|
|
||||||
console.info(
|
|
||||||
"\nIt doesn't appear that Immich stopped, trying again in a moment.\nIf Immich is already not running, you can ignore this error.",
|
|
||||||
);
|
|
||||||
|
|
||||||
await new Promise((r) => setTimeout(r, 1e3));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// => corresponds to notification.service.ts#onAppRestart
|
|
||||||
server.emit('AppRestartV1', state, () => {
|
|
||||||
void tryTerminate().finally(() => {
|
|
||||||
pubClient.disconnect();
|
|
||||||
subClient.disconnect();
|
|
||||||
});
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
export async function createMaintenanceLoginUrl(
|
export async function createMaintenanceLoginUrl(
|
||||||
baseUrl: string,
|
baseUrl: string,
|
||||||
auth: MaintenanceAuthDto,
|
auth: MaintenanceAuthDto,
|
||||||
|
|||||||
@@ -1,14 +1,21 @@
|
|||||||
import { NestFactory } from '@nestjs/core';
|
import { NestFactory } from '@nestjs/core';
|
||||||
import { NestExpressApplication } from '@nestjs/platform-express';
|
import { NestExpressApplication } from '@nestjs/platform-express';
|
||||||
|
import inspector from 'node:inspector';
|
||||||
|
import { isMainThread, workerData } from 'node:worker_threads';
|
||||||
import { configureExpress, configureTelemetry } from 'src/app.common';
|
import { configureExpress, configureTelemetry } from 'src/app.common';
|
||||||
import { ApiModule } from 'src/app.module';
|
import { ApiModule } from 'src/app.module';
|
||||||
import { AppRepository } from 'src/repositories/app.repository';
|
import { AppRepository } from 'src/repositories/app.repository';
|
||||||
import { ApiService } from 'src/services/api.service';
|
import { ApiService } from 'src/services/api.service';
|
||||||
import { isStartUpError } from 'src/utils/misc';
|
import { isStartUpError } from 'src/utils/misc';
|
||||||
|
|
||||||
async function bootstrap() {
|
export async function bootstrap() {
|
||||||
process.title = 'immich-api';
|
process.title = 'immich-api';
|
||||||
|
|
||||||
|
const { inspectorPort } = workerData ?? {};
|
||||||
|
if (inspectorPort) {
|
||||||
|
inspector.open(inspectorPort, '0.0.0.0', false);
|
||||||
|
}
|
||||||
|
|
||||||
configureTelemetry();
|
configureTelemetry();
|
||||||
|
|
||||||
const app = await NestFactory.create<NestExpressApplication>(ApiModule, { bufferLogs: true });
|
const app = await NestFactory.create<NestExpressApplication>(ApiModule, { bufferLogs: true });
|
||||||
@@ -19,10 +26,12 @@ async function bootstrap() {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bootstrap().catch((error) => {
|
if (!isMainThread || process.send) {
|
||||||
if (!isStartUpError(error)) {
|
bootstrap().catch((error) => {
|
||||||
console.error(error);
|
if (!isStartUpError(error)) {
|
||||||
}
|
console.error(error);
|
||||||
// eslint-disable-next-line unicorn/no-process-exit
|
}
|
||||||
process.exit(1);
|
|
||||||
});
|
process.exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,13 +1,22 @@
|
|||||||
import { NestFactory } from '@nestjs/core';
|
import { NestFactory } from '@nestjs/core';
|
||||||
import { NestExpressApplication } from '@nestjs/platform-express';
|
import { NestExpressApplication } from '@nestjs/platform-express';
|
||||||
|
import inspector from 'node:inspector';
|
||||||
|
import { isMainThread, workerData } from 'node:worker_threads';
|
||||||
import { configureExpress, configureTelemetry } from 'src/app.common';
|
import { configureExpress, configureTelemetry } from 'src/app.common';
|
||||||
import { MaintenanceModule } from 'src/app.module';
|
import { MaintenanceModule } from 'src/app.module';
|
||||||
|
import { SocketIoAdapter } from 'src/enum';
|
||||||
import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service';
|
import { MaintenanceWorkerService } from 'src/maintenance/maintenance-worker.service';
|
||||||
import { AppRepository } from 'src/repositories/app.repository';
|
import { AppRepository } from 'src/repositories/app.repository';
|
||||||
import { isStartUpError } from 'src/utils/misc';
|
import { isStartUpError } from 'src/utils/misc';
|
||||||
|
|
||||||
async function bootstrap() {
|
export async function bootstrap() {
|
||||||
process.title = 'immich-maintenance';
|
process.title = 'immich-maintenance';
|
||||||
|
|
||||||
|
const { inspectorPort } = workerData ?? {};
|
||||||
|
if (inspectorPort) {
|
||||||
|
inspector.open(inspectorPort, '0.0.0.0', false);
|
||||||
|
}
|
||||||
|
|
||||||
configureTelemetry();
|
configureTelemetry();
|
||||||
|
|
||||||
const app = await NestFactory.create<NestExpressApplication>(MaintenanceModule, { bufferLogs: true });
|
const app = await NestFactory.create<NestExpressApplication>(MaintenanceModule, { bufferLogs: true });
|
||||||
@@ -16,13 +25,18 @@ async function bootstrap() {
|
|||||||
void configureExpress(app, {
|
void configureExpress(app, {
|
||||||
permitSwaggerWrite: false,
|
permitSwaggerWrite: false,
|
||||||
ssr: MaintenanceWorkerService,
|
ssr: MaintenanceWorkerService,
|
||||||
|
// Use BroadcastChannel instead of Postgres adapter to avoid crash when
|
||||||
|
// pg_terminate_backend() kills all database connections during restore
|
||||||
|
socketIoAdapter: SocketIoAdapter.BroadcastChannel,
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
bootstrap().catch((error) => {
|
if (!isMainThread) {
|
||||||
if (!isStartUpError(error)) {
|
bootstrap().catch((error) => {
|
||||||
console.error(error);
|
if (!isStartUpError(error)) {
|
||||||
}
|
console.error(error);
|
||||||
// eslint-disable-next-line unicorn/no-process-exit
|
}
|
||||||
process.exit(1);
|
|
||||||
});
|
process.exit(1);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,8 +1,9 @@
|
|||||||
import { NestFactory } from '@nestjs/core';
|
import { NestFactory } from '@nestjs/core';
|
||||||
import { isMainThread } from 'node:worker_threads';
|
import inspector from 'node:inspector';
|
||||||
|
import { isMainThread, workerData } from 'node:worker_threads';
|
||||||
import { MicroservicesModule } from 'src/app.module';
|
import { MicroservicesModule } from 'src/app.module';
|
||||||
import { serverVersion } from 'src/constants';
|
import { serverVersion } from 'src/constants';
|
||||||
import { WebSocketAdapter } from 'src/middleware/websocket.adapter';
|
import { createWebSocketAdapter } from 'src/middleware/websocket.adapter';
|
||||||
import { AppRepository } from 'src/repositories/app.repository';
|
import { AppRepository } from 'src/repositories/app.repository';
|
||||||
import { ConfigRepository } from 'src/repositories/config.repository';
|
import { ConfigRepository } from 'src/repositories/config.repository';
|
||||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||||
@@ -10,6 +11,11 @@ import { bootstrapTelemetry } from 'src/repositories/telemetry.repository';
|
|||||||
import { isStartUpError } from 'src/utils/misc';
|
import { isStartUpError } from 'src/utils/misc';
|
||||||
|
|
||||||
export async function bootstrap() {
|
export async function bootstrap() {
|
||||||
|
const { inspectorPort } = workerData ?? {};
|
||||||
|
if (inspectorPort) {
|
||||||
|
inspector.open(inspectorPort, '0.0.0.0', false);
|
||||||
|
}
|
||||||
|
|
||||||
const { telemetry } = new ConfigRepository().getEnv();
|
const { telemetry } = new ConfigRepository().getEnv();
|
||||||
if (telemetry.metrics.size > 0) {
|
if (telemetry.metrics.size > 0) {
|
||||||
bootstrapTelemetry(telemetry.microservicesPort);
|
bootstrapTelemetry(telemetry.microservicesPort);
|
||||||
@@ -24,7 +30,7 @@ export async function bootstrap() {
|
|||||||
|
|
||||||
logger.setContext('Bootstrap');
|
logger.setContext('Bootstrap');
|
||||||
app.useLogger(logger);
|
app.useLogger(logger);
|
||||||
app.useWebSocketAdapter(new WebSocketAdapter(app));
|
app.useWebSocketAdapter(await createWebSocketAdapter(app));
|
||||||
|
|
||||||
await (host ? app.listen(0, host) : app.listen(0));
|
await (host ? app.listen(0, host) : app.listen(0));
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,276 @@
|
|||||||
|
import { ClusterMessage, ClusterResponse } from 'socket.io-adapter';
|
||||||
|
import { createBroadcastChannelAdapter } from 'src/middleware/broadcast-channel.adapter';
|
||||||
|
import { vi } from 'vitest';
|
||||||
|
|
||||||
|
const createMockNamespace = () => ({
|
||||||
|
name: '/',
|
||||||
|
sockets: new Map(),
|
||||||
|
adapter: null,
|
||||||
|
server: {
|
||||||
|
encoder: {
|
||||||
|
encode: vi.fn().mockReturnValue([]),
|
||||||
|
},
|
||||||
|
_opts: {},
|
||||||
|
sockets: {
|
||||||
|
sockets: new Map(),
|
||||||
|
},
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('BroadcastChannelAdapter', () => {
|
||||||
|
describe('createBroadcastChannelAdapter', () => {
|
||||||
|
it('should return a factory function', () => {
|
||||||
|
const factory = createBroadcastChannelAdapter();
|
||||||
|
expect(typeof factory).toBe('function');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should create adapter instance when factory is called', () => {
|
||||||
|
const mockNamespace = createMockNamespace();
|
||||||
|
const factory = createBroadcastChannelAdapter();
|
||||||
|
const adapter = factory(mockNamespace);
|
||||||
|
|
||||||
|
expect(adapter).toBeDefined();
|
||||||
|
expect(adapter.doPublish).toBeDefined();
|
||||||
|
expect(adapter.doPublishResponse).toBeDefined();
|
||||||
|
|
||||||
|
adapter.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('BroadcastChannelAdapter message passing', () => {
|
||||||
|
it('should actually send and receive messages between two adapters', async () => {
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const namespace1 = createMockNamespace();
|
||||||
|
const namespace2 = createMockNamespace();
|
||||||
|
|
||||||
|
const adapter1 = factory1(namespace1);
|
||||||
|
const adapter2 = factory2(namespace2);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedMessages: ClusterMessage[] = [];
|
||||||
|
const messageReceived = new Promise<void>((resolve) => {
|
||||||
|
const originalOnMessage = adapter2.onMessage.bind(adapter2);
|
||||||
|
adapter2.onMessage = (message: ClusterMessage) => {
|
||||||
|
receivedMessages.push(message);
|
||||||
|
resolve();
|
||||||
|
return originalOnMessage(message);
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
const testMessage = {
|
||||||
|
type: 2,
|
||||||
|
data: {
|
||||||
|
opts: { rooms: new Set(['room1']) },
|
||||||
|
rooms: ['room1'],
|
||||||
|
},
|
||||||
|
nsp: '/',
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter1.doPublish(testMessage as any);
|
||||||
|
|
||||||
|
await Promise.race([messageReceived, new Promise((resolve) => setTimeout(resolve, 500))]);
|
||||||
|
|
||||||
|
expect(receivedMessages.length).toBeGreaterThan(0);
|
||||||
|
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send ConfigUpdate-style event and receive it on another adapter', async () => {
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const namespace1 = createMockNamespace();
|
||||||
|
const namespace2 = createMockNamespace();
|
||||||
|
|
||||||
|
const adapter1 = factory1(namespace1);
|
||||||
|
const adapter2 = factory2(namespace2);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedMessages: ClusterMessage[] = [];
|
||||||
|
const messageReceived = new Promise<void>((resolve) => {
|
||||||
|
const originalOnMessage = adapter2.onMessage.bind(adapter2);
|
||||||
|
adapter2.onMessage = (message: ClusterMessage) => {
|
||||||
|
receivedMessages.push(message);
|
||||||
|
if ((message as any)?.data?.event === 'ConfigUpdate') {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
return originalOnMessage(message);
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
const configUpdateMessage = {
|
||||||
|
type: 2,
|
||||||
|
data: {
|
||||||
|
event: 'ConfigUpdate',
|
||||||
|
payload: { newConfig: { ffmpeg: { crf: 23 } }, oldConfig: { ffmpeg: { crf: 20 } } },
|
||||||
|
opts: { rooms: new Set() },
|
||||||
|
rooms: [],
|
||||||
|
},
|
||||||
|
nsp: '/',
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter1.doPublish(configUpdateMessage as any);
|
||||||
|
|
||||||
|
await Promise.race([messageReceived, new Promise((resolve) => setTimeout(resolve, 500))]);
|
||||||
|
|
||||||
|
const configMessages = receivedMessages.filter((m) => (m as any)?.data?.event === 'ConfigUpdate');
|
||||||
|
expect(configMessages.length).toBeGreaterThan(0);
|
||||||
|
expect((configMessages[0] as any).data.payload.newConfig.ffmpeg.crf).toBe(23);
|
||||||
|
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send AppRestart-style event and receive it on another adapter', async () => {
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const namespace1 = createMockNamespace();
|
||||||
|
const namespace2 = createMockNamespace();
|
||||||
|
|
||||||
|
const adapter1 = factory1(namespace1);
|
||||||
|
const adapter2 = factory2(namespace2);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedMessages: ClusterMessage[] = [];
|
||||||
|
const messageReceived = new Promise<void>((resolve) => {
|
||||||
|
const originalOnMessage = adapter2.onMessage.bind(adapter2);
|
||||||
|
adapter2.onMessage = (message: ClusterMessage) => {
|
||||||
|
receivedMessages.push(message);
|
||||||
|
if ((message as any)?.data?.event === 'AppRestart') {
|
||||||
|
resolve();
|
||||||
|
}
|
||||||
|
return originalOnMessage(message);
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
const appRestartMessage = {
|
||||||
|
type: 2,
|
||||||
|
data: {
|
||||||
|
event: 'AppRestart',
|
||||||
|
payload: { isMaintenanceMode: true },
|
||||||
|
opts: { rooms: new Set() },
|
||||||
|
rooms: [],
|
||||||
|
},
|
||||||
|
nsp: '/',
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter1.doPublish(appRestartMessage as any);
|
||||||
|
|
||||||
|
await Promise.race([messageReceived, new Promise((resolve) => setTimeout(resolve, 500))]);
|
||||||
|
|
||||||
|
const restartMessages = receivedMessages.filter((m) => (m as any)?.data?.event === 'AppRestart');
|
||||||
|
expect(restartMessages.length).toBeGreaterThan(0);
|
||||||
|
expect((restartMessages[0] as any).data.payload.isMaintenanceMode).toBe(true);
|
||||||
|
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should not receive its own messages (echo prevention)', async () => {
|
||||||
|
const factory = createBroadcastChannelAdapter();
|
||||||
|
const namespace = createMockNamespace();
|
||||||
|
const adapter = factory(namespace);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedOwnMessages: ClusterMessage[] = [];
|
||||||
|
const uniqueMarker = `test-${Date.now()}-${Math.random()}`;
|
||||||
|
|
||||||
|
const originalOnMessage = adapter.onMessage.bind(adapter);
|
||||||
|
adapter.onMessage = (message: ClusterMessage) => {
|
||||||
|
if ((message as any)?.data?.marker === uniqueMarker) {
|
||||||
|
receivedOwnMessages.push(message);
|
||||||
|
}
|
||||||
|
return originalOnMessage(message);
|
||||||
|
};
|
||||||
|
|
||||||
|
const testMessage = {
|
||||||
|
type: 2,
|
||||||
|
data: {
|
||||||
|
marker: uniqueMarker,
|
||||||
|
opts: { rooms: new Set() },
|
||||||
|
rooms: [],
|
||||||
|
},
|
||||||
|
nsp: '/',
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter.doPublish(testMessage as any);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 200));
|
||||||
|
|
||||||
|
expect(receivedOwnMessages.length).toBe(0);
|
||||||
|
|
||||||
|
adapter.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should send and receive response messages between adapters', async () => {
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const namespace1 = createMockNamespace();
|
||||||
|
const namespace2 = createMockNamespace();
|
||||||
|
|
||||||
|
const adapter1 = factory1(namespace1);
|
||||||
|
const adapter2 = factory2(namespace2);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedResponses: ClusterResponse[] = [];
|
||||||
|
const responseReceived = new Promise<void>((resolve) => {
|
||||||
|
const originalOnResponse = adapter1.onResponse.bind(adapter1);
|
||||||
|
adapter1.onResponse = (response: ClusterResponse) => {
|
||||||
|
receivedResponses.push(response);
|
||||||
|
resolve();
|
||||||
|
return originalOnResponse(response);
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
const responseMessage = {
|
||||||
|
type: 3,
|
||||||
|
data: { result: 'success', count: 42 },
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter2.doPublishResponse((adapter1 as any).uid, responseMessage as any);
|
||||||
|
|
||||||
|
await Promise.race([responseReceived, new Promise((resolve) => setTimeout(resolve, 500))]);
|
||||||
|
|
||||||
|
expect(receivedResponses.length).toBeGreaterThan(0);
|
||||||
|
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('BroadcastChannelAdapter lifecycle', () => {
|
||||||
|
it('should close cleanly without errors', () => {
|
||||||
|
const factory = createBroadcastChannelAdapter();
|
||||||
|
const namespace = createMockNamespace();
|
||||||
|
const adapter = factory(namespace);
|
||||||
|
|
||||||
|
expect(() => adapter.close()).not.toThrow();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should handle multiple adapters closing in sequence', () => {
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
const factory3 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const adapter1 = factory1(createMockNamespace());
|
||||||
|
const adapter2 = factory2(createMockNamespace());
|
||||||
|
const adapter3 = factory3(createMockNamespace());
|
||||||
|
|
||||||
|
expect(() => {
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
adapter3.close();
|
||||||
|
}).not.toThrow();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,159 @@
|
|||||||
|
import { Server } from 'socket.io';
|
||||||
|
import { createBroadcastChannelAdapter } from 'src/middleware/broadcast-channel.adapter';
|
||||||
|
import { EventRepository } from 'src/repositories/event.repository';
|
||||||
|
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||||
|
import { WebsocketRepository } from 'src/repositories/websocket.repository';
|
||||||
|
import { automock } from 'test/utils';
|
||||||
|
import { vi } from 'vitest';
|
||||||
|
|
||||||
|
describe('WebSocket Integration - serverSend with adapters', () => {
|
||||||
|
describe('BroadcastChannel adapter', () => {
|
||||||
|
it('should broadcast ConfigUpdate event through BroadcastChannel adapter', async () => {
|
||||||
|
const createMockNamespace = () => ({
|
||||||
|
name: '/',
|
||||||
|
sockets: new Map(),
|
||||||
|
adapter: null,
|
||||||
|
server: {
|
||||||
|
encoder: { encode: vi.fn().mockReturnValue([]) },
|
||||||
|
_opts: {},
|
||||||
|
sockets: { sockets: new Map() },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const namespace1 = createMockNamespace();
|
||||||
|
const namespace2 = createMockNamespace();
|
||||||
|
|
||||||
|
const adapter1 = factory1(namespace1);
|
||||||
|
const adapter2 = factory2(namespace2);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedMessages: any[] = [];
|
||||||
|
vi.spyOn(adapter2, 'onMessage').mockImplementation((message: any) => {
|
||||||
|
receivedMessages.push(message);
|
||||||
|
});
|
||||||
|
|
||||||
|
const configUpdatePayload = {
|
||||||
|
type: 5,
|
||||||
|
data: {
|
||||||
|
event: 'ConfigUpdate',
|
||||||
|
args: [{ newConfig: { ffmpeg: { crf: 23 } }, oldConfig: { ffmpeg: { crf: 20 } } }],
|
||||||
|
},
|
||||||
|
nsp: '/',
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter1.doPublish(configUpdatePayload as any);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const configMessages = receivedMessages.filter((m) => m?.data?.event === 'ConfigUpdate');
|
||||||
|
expect(configMessages.length).toBeGreaterThan(0);
|
||||||
|
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should broadcast AppRestart event through BroadcastChannel adapter', async () => {
|
||||||
|
const createMockNamespace = () => ({
|
||||||
|
name: '/',
|
||||||
|
sockets: new Map(),
|
||||||
|
adapter: null,
|
||||||
|
server: {
|
||||||
|
encoder: { encode: vi.fn().mockReturnValue([]) },
|
||||||
|
_opts: {},
|
||||||
|
sockets: { sockets: new Map() },
|
||||||
|
},
|
||||||
|
});
|
||||||
|
|
||||||
|
const factory1 = createBroadcastChannelAdapter();
|
||||||
|
const factory2 = createBroadcastChannelAdapter();
|
||||||
|
|
||||||
|
const namespace1 = createMockNamespace();
|
||||||
|
const namespace2 = createMockNamespace();
|
||||||
|
|
||||||
|
const adapter1 = factory1(namespace1);
|
||||||
|
const adapter2 = factory2(namespace2);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const receivedMessages: any[] = [];
|
||||||
|
vi.spyOn(adapter2, 'onMessage').mockImplementation((message: any) => {
|
||||||
|
receivedMessages.push(message);
|
||||||
|
});
|
||||||
|
|
||||||
|
const appRestartPayload = {
|
||||||
|
type: 5,
|
||||||
|
data: {
|
||||||
|
event: 'AppRestart',
|
||||||
|
args: [{ isMaintenanceMode: true }],
|
||||||
|
},
|
||||||
|
nsp: '/',
|
||||||
|
};
|
||||||
|
|
||||||
|
void adapter1.doPublish(appRestartPayload as any);
|
||||||
|
|
||||||
|
await new Promise((resolve) => setTimeout(resolve, 100));
|
||||||
|
|
||||||
|
const restartMessages = receivedMessages.filter((m) => m?.data?.event === 'AppRestart');
|
||||||
|
expect(restartMessages.length).toBeGreaterThan(0);
|
||||||
|
|
||||||
|
adapter1.close();
|
||||||
|
adapter2.close();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('WebsocketRepository with adapter', () => {
|
||||||
|
it('should call serverSideEmit when serverSend is called', () => {
|
||||||
|
const mockServer = {
|
||||||
|
serverSideEmit: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
} as unknown as Server;
|
||||||
|
|
||||||
|
const eventRepository = automock(EventRepository, {
|
||||||
|
args: [undefined, undefined, { setContext: () => {} }],
|
||||||
|
});
|
||||||
|
const loggingRepository = automock(LoggingRepository, {
|
||||||
|
args: [undefined, { getEnv: () => ({ noColor: false }) }],
|
||||||
|
strict: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
const websocketRepository = new WebsocketRepository(eventRepository, loggingRepository);
|
||||||
|
(websocketRepository as any).server = mockServer;
|
||||||
|
|
||||||
|
websocketRepository.serverSend('ConfigUpdate', {
|
||||||
|
newConfig: { ffmpeg: { crf: 23 } } as any,
|
||||||
|
oldConfig: { ffmpeg: { crf: 20 } } as any,
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(mockServer.serverSideEmit).toHaveBeenCalledWith('ConfigUpdate', {
|
||||||
|
newConfig: { ffmpeg: { crf: 23 } },
|
||||||
|
oldConfig: { ffmpeg: { crf: 20 } },
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should call serverSideEmit for AppRestart event', () => {
|
||||||
|
const mockServer = {
|
||||||
|
serverSideEmit: vi.fn(),
|
||||||
|
on: vi.fn(),
|
||||||
|
} as unknown as Server;
|
||||||
|
|
||||||
|
const eventRepository = automock(EventRepository, {
|
||||||
|
args: [undefined, undefined, { setContext: () => {} }],
|
||||||
|
});
|
||||||
|
const loggingRepository = automock(LoggingRepository, {
|
||||||
|
args: [undefined, { getEnv: () => ({ noColor: false }) }],
|
||||||
|
strict: false,
|
||||||
|
});
|
||||||
|
|
||||||
|
const websocketRepository = new WebsocketRepository(eventRepository, loggingRepository);
|
||||||
|
(websocketRepository as any).server = mockServer;
|
||||||
|
|
||||||
|
websocketRepository.serverSend('AppRestart', { isMaintenanceMode: true });
|
||||||
|
|
||||||
|
expect(mockServer.serverSideEmit).toHaveBeenCalledWith('AppRestart', { isMaintenanceMode: true });
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -0,0 +1,70 @@
|
|||||||
|
import { INestApplication } from '@nestjs/common';
|
||||||
|
import { IoAdapter } from '@nestjs/platform-socket.io';
|
||||||
|
import { SocketIoAdapter } from 'src/enum';
|
||||||
|
import { asPgPoolSsl, createWebSocketAdapter } from 'src/middleware/websocket.adapter';
|
||||||
|
import { Mocked, vi } from 'vitest';
|
||||||
|
|
||||||
|
describe('asPgPoolSsl', () => {
|
||||||
|
it('should return false for undefined ssl', () => {
|
||||||
|
expect(asPgPoolSsl()).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return false for ssl = false', () => {
|
||||||
|
expect(asPgPoolSsl(false)).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return false for ssl = "allow"', () => {
|
||||||
|
expect(asPgPoolSsl('allow')).toBe(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return { rejectUnauthorized: false } for ssl = true', () => {
|
||||||
|
expect(asPgPoolSsl(true)).toEqual({ rejectUnauthorized: false });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return { rejectUnauthorized: false } for ssl = "prefer"', () => {
|
||||||
|
expect(asPgPoolSsl('prefer')).toEqual({ rejectUnauthorized: false });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return { rejectUnauthorized: false } for ssl = "require"', () => {
|
||||||
|
expect(asPgPoolSsl('require')).toEqual({ rejectUnauthorized: false });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should return { rejectUnauthorized: true } for ssl = "verify-full"', () => {
|
||||||
|
expect(asPgPoolSsl('verify-full')).toEqual({ rejectUnauthorized: true });
|
||||||
|
});
|
||||||
|
|
||||||
|
it('should pass through object ssl config unchanged', () => {
|
||||||
|
const sslConfig = { ca: 'certificate', rejectUnauthorized: true };
|
||||||
|
expect(asPgPoolSsl(sslConfig)).toBe(sslConfig);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('createWebSocketAdapter', () => {
|
||||||
|
let mockApp: Mocked<INestApplication>;
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
vi.clearAllMocks();
|
||||||
|
|
||||||
|
mockApp = {
|
||||||
|
getHttpServer: vi.fn().mockReturnValue({}),
|
||||||
|
} as unknown as Mocked<INestApplication>;
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('BroadcastChannel adapter', () => {
|
||||||
|
it('should create BroadcastChannel adapter when configured', async () => {
|
||||||
|
const adapter = await createWebSocketAdapter(mockApp, SocketIoAdapter.BroadcastChannel);
|
||||||
|
|
||||||
|
expect(adapter).toBeDefined();
|
||||||
|
expect(adapter).toBeInstanceOf(IoAdapter);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
describe('Postgres adapter', () => {
|
||||||
|
it('should create Postgres adapter when configured', async () => {
|
||||||
|
const adapter = await createWebSocketAdapter(mockApp, SocketIoAdapter.Postgres);
|
||||||
|
|
||||||
|
expect(adapter).toBeDefined();
|
||||||
|
expect(adapter).toBeInstanceOf(IoAdapter);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
import { DatabaseExtension, ImmichEnvironment, ImmichWorker, LogFormat } from 'src/enum';
|
import { DatabaseExtension, ImmichEnvironment, ImmichWorker, LogFormat, SocketIoAdapter } from 'src/enum';
|
||||||
import { ConfigRepository, EnvData } from 'src/repositories/config.repository';
|
import { ConfigRepository, EnvData } from 'src/repositories/config.repository';
|
||||||
import { RepositoryInterface } from 'src/types';
|
import { RepositoryInterface } from 'src/types';
|
||||||
import { Mocked, vitest } from 'vitest';
|
import { Mocked, vitest } from 'vitest';
|
||||||
@@ -99,6 +99,10 @@ const envData: EnvData = {
|
|||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
|
||||||
|
socketIo: {
|
||||||
|
adapter: SocketIoAdapter.Postgres,
|
||||||
|
},
|
||||||
|
|
||||||
noColor: false,
|
noColor: false,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user