mirror of
https://github.com/immich-app/immich.git
synced 2025-12-27 01:11:42 +03:00
feat: workflow foundation (#23621)
* feat: plugins * feat: table definition * feat: type and migration * feat: add repositories * feat: validate manifest with class-validator and load manifest info to database * feat: workflow/plugin controller/service layer * feat: implement workflow logic * feat: make trigger static * feat: dynamical instantiate plugin instances * fix: access control and helper script * feat: it works * chore: simplify * refactor: refactor and use queue for workflow execution * refactor: remove unsused property in plugin-schema * build wasm in prod * feat: plugin loader in transaction * fix: docker build arm64 * generated files * shell check * fix tests * fix: waiting for migration to finish before loading plugin * remove context reassignment * feat: use mise to manage extism tools (#23760) * pr feedback * refactor: create workflow now including create filters and actions * feat: workflow medium tests * fix: broken medium test * feat: medium tests * chore: unify workflow job * sign user id with jwt * chore: query plugin with filters and action * chore: read manifest in repository * chore: load manifest from server configs * merge main * feat: endpoint documentation * pr feedback * load plugin from absolute path * refactor:handle trigger * throw error and return early * pr feedback * unify plugin services * fix: plugins code * clean up * remove triggerConfig * clean up * displayName and methodName --------- Co-authored-by: Jason Rasmussen <jason@rasm.me> Co-authored-by: bo0tzz <git@bo0tzz.me>
This commit is contained in:
@@ -426,6 +426,9 @@ export class AssetMediaService extends BaseService {
|
||||
}
|
||||
await this.storageRepository.utimes(file.originalPath, new Date(), new Date(dto.fileModifiedAt));
|
||||
await this.assetRepository.upsertExif({ assetId: asset.id, fileSizeInByte: file.size });
|
||||
|
||||
await this.eventRepository.emit('AssetCreate', { asset });
|
||||
|
||||
await this.jobRepository.queue({ name: JobName.AssetExtractMetadata, data: { id: asset.id, source: 'upload' } });
|
||||
|
||||
return asset;
|
||||
|
||||
@@ -35,6 +35,7 @@ import { OAuthRepository } from 'src/repositories/oauth.repository';
|
||||
import { OcrRepository } from 'src/repositories/ocr.repository';
|
||||
import { PartnerRepository } from 'src/repositories/partner.repository';
|
||||
import { PersonRepository } from 'src/repositories/person.repository';
|
||||
import { PluginRepository } from 'src/repositories/plugin.repository';
|
||||
import { ProcessRepository } from 'src/repositories/process.repository';
|
||||
import { SearchRepository } from 'src/repositories/search.repository';
|
||||
import { ServerInfoRepository } from 'src/repositories/server-info.repository';
|
||||
@@ -53,6 +54,7 @@ import { UserRepository } from 'src/repositories/user.repository';
|
||||
import { VersionHistoryRepository } from 'src/repositories/version-history.repository';
|
||||
import { ViewRepository } from 'src/repositories/view-repository';
|
||||
import { WebsocketRepository } from 'src/repositories/websocket.repository';
|
||||
import { WorkflowRepository } from 'src/repositories/workflow.repository';
|
||||
import { UserTable } from 'src/schema/tables/user.table';
|
||||
import { AccessRequest, checkAccess, requireAccess } from 'src/utils/access';
|
||||
import { getConfig, updateConfig } from 'src/utils/config';
|
||||
@@ -88,6 +90,7 @@ export const BASE_SERVICE_DEPENDENCIES = [
|
||||
OcrRepository,
|
||||
PartnerRepository,
|
||||
PersonRepository,
|
||||
PluginRepository,
|
||||
ProcessRepository,
|
||||
SearchRepository,
|
||||
ServerInfoRepository,
|
||||
@@ -105,6 +108,8 @@ export const BASE_SERVICE_DEPENDENCIES = [
|
||||
UserRepository,
|
||||
VersionHistoryRepository,
|
||||
ViewRepository,
|
||||
WebsocketRepository,
|
||||
WorkflowRepository,
|
||||
];
|
||||
|
||||
@Injectable()
|
||||
@@ -142,6 +147,7 @@ export class BaseService {
|
||||
protected ocrRepository: OcrRepository,
|
||||
protected partnerRepository: PartnerRepository,
|
||||
protected personRepository: PersonRepository,
|
||||
protected pluginRepository: PluginRepository,
|
||||
protected processRepository: ProcessRepository,
|
||||
protected searchRepository: SearchRepository,
|
||||
protected serverInfoRepository: ServerInfoRepository,
|
||||
@@ -160,6 +166,7 @@ export class BaseService {
|
||||
protected versionRepository: VersionHistoryRepository,
|
||||
protected viewRepository: ViewRepository,
|
||||
protected websocketRepository: WebsocketRepository,
|
||||
protected workflowRepository: WorkflowRepository,
|
||||
) {
|
||||
this.logger.setContext(this.constructor.name);
|
||||
this.storageCore = StorageCore.create(
|
||||
|
||||
@@ -23,6 +23,7 @@ import { NotificationService } from 'src/services/notification.service';
|
||||
import { OcrService } from 'src/services/ocr.service';
|
||||
import { PartnerService } from 'src/services/partner.service';
|
||||
import { PersonService } from 'src/services/person.service';
|
||||
import { PluginService } from 'src/services/plugin.service';
|
||||
import { QueueService } from 'src/services/queue.service';
|
||||
import { SearchService } from 'src/services/search.service';
|
||||
import { ServerService } from 'src/services/server.service';
|
||||
@@ -43,6 +44,7 @@ import { UserAdminService } from 'src/services/user-admin.service';
|
||||
import { UserService } from 'src/services/user.service';
|
||||
import { VersionService } from 'src/services/version.service';
|
||||
import { ViewService } from 'src/services/view.service';
|
||||
import { WorkflowService } from 'src/services/workflow.service';
|
||||
|
||||
export const services = [
|
||||
ApiKeyService,
|
||||
@@ -70,6 +72,7 @@ export const services = [
|
||||
OcrService,
|
||||
PartnerService,
|
||||
PersonService,
|
||||
PluginService,
|
||||
QueueService,
|
||||
SearchService,
|
||||
ServerService,
|
||||
@@ -90,4 +93,5 @@ export const services = [
|
||||
UserService,
|
||||
VersionService,
|
||||
ViewService,
|
||||
WorkflowService,
|
||||
];
|
||||
|
||||
120
server/src/services/plugin-host.functions.ts
Normal file
120
server/src/services/plugin-host.functions.ts
Normal file
@@ -0,0 +1,120 @@
|
||||
import { CurrentPlugin } from '@extism/extism';
|
||||
import { UnauthorizedException } from '@nestjs/common';
|
||||
import { Updateable } from 'kysely';
|
||||
import { Permission } from 'src/enum';
|
||||
import { AccessRepository } from 'src/repositories/access.repository';
|
||||
import { AlbumRepository } from 'src/repositories/album.repository';
|
||||
import { AssetRepository } from 'src/repositories/asset.repository';
|
||||
import { CryptoRepository } from 'src/repositories/crypto.repository';
|
||||
import { LoggingRepository } from 'src/repositories/logging.repository';
|
||||
import { AssetTable } from 'src/schema/tables/asset.table';
|
||||
import { requireAccess } from 'src/utils/access';
|
||||
|
||||
/**
|
||||
* Plugin host functions that are exposed to WASM plugins via Extism.
|
||||
* These functions allow plugins to interact with the Immich system.
|
||||
*/
|
||||
export class PluginHostFunctions {
|
||||
constructor(
|
||||
private assetRepository: AssetRepository,
|
||||
private albumRepository: AlbumRepository,
|
||||
private accessRepository: AccessRepository,
|
||||
private cryptoRepository: CryptoRepository,
|
||||
private logger: LoggingRepository,
|
||||
private pluginJwtSecret: string,
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Creates Extism host function bindings for the plugin.
|
||||
* These are the functions that WASM plugins can call.
|
||||
*/
|
||||
getHostFunctions() {
|
||||
return {
|
||||
'extism:host/user': {
|
||||
updateAsset: (cp: CurrentPlugin, offs: bigint) => this.handleUpdateAsset(cp, offs),
|
||||
addAssetToAlbum: (cp: CurrentPlugin, offs: bigint) => this.handleAddAssetToAlbum(cp, offs),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Host function wrapper for updateAsset.
|
||||
* Reads the input from the plugin, parses it, and calls the actual update function.
|
||||
*/
|
||||
private async handleUpdateAsset(cp: CurrentPlugin, offs: bigint) {
|
||||
const input = JSON.parse(cp.read(offs)!.text());
|
||||
await this.updateAsset(input);
|
||||
}
|
||||
|
||||
/**
|
||||
* Host function wrapper for addAssetToAlbum.
|
||||
* Reads the input from the plugin, parses it, and calls the actual add function.
|
||||
*/
|
||||
private async handleAddAssetToAlbum(cp: CurrentPlugin, offs: bigint) {
|
||||
const input = JSON.parse(cp.read(offs)!.text());
|
||||
await this.addAssetToAlbum(input);
|
||||
}
|
||||
|
||||
/**
|
||||
* Validates the JWT token and returns the auth context.
|
||||
*/
|
||||
private validateToken(authToken: string): { userId: string } {
|
||||
try {
|
||||
const auth = this.cryptoRepository.verifyJwt<{ userId: string }>(authToken, this.pluginJwtSecret);
|
||||
if (!auth.userId) {
|
||||
throw new UnauthorizedException('Invalid token: missing userId');
|
||||
}
|
||||
return auth;
|
||||
} catch (error) {
|
||||
this.logger.error('Token validation failed:', error);
|
||||
throw new UnauthorizedException('Invalid token');
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Updates an asset with the given properties.
|
||||
*/
|
||||
async updateAsset(input: { authToken: string } & Updateable<AssetTable> & { id: string }) {
|
||||
const { authToken, id, ...assetData } = input;
|
||||
|
||||
// Validate token
|
||||
const auth = this.validateToken(authToken);
|
||||
|
||||
// Check access to the asset
|
||||
await requireAccess(this.accessRepository, {
|
||||
auth: { user: { id: auth.userId } } as any,
|
||||
permission: Permission.AssetUpdate,
|
||||
ids: [id],
|
||||
});
|
||||
|
||||
this.logger.log(`Updating asset ${id} -- ${JSON.stringify(assetData)}`);
|
||||
await this.assetRepository.update({ id, ...assetData });
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds an asset to an album.
|
||||
*/
|
||||
async addAssetToAlbum(input: { authToken: string; assetId: string; albumId: string }) {
|
||||
const { authToken, assetId, albumId } = input;
|
||||
|
||||
// Validate token
|
||||
const auth = this.validateToken(authToken);
|
||||
|
||||
// Check access to both the asset and the album
|
||||
await requireAccess(this.accessRepository, {
|
||||
auth: { user: { id: auth.userId } } as any,
|
||||
permission: Permission.AssetRead,
|
||||
ids: [assetId],
|
||||
});
|
||||
|
||||
await requireAccess(this.accessRepository, {
|
||||
auth: { user: { id: auth.userId } } as any,
|
||||
permission: Permission.AlbumUpdate,
|
||||
ids: [albumId],
|
||||
});
|
||||
|
||||
this.logger.log(`Adding asset ${assetId} to album ${albumId}`);
|
||||
await this.albumRepository.addAssetIds(albumId, [assetId]);
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
317
server/src/services/plugin.service.ts
Normal file
317
server/src/services/plugin.service.ts
Normal file
@@ -0,0 +1,317 @@
|
||||
import { Plugin as ExtismPlugin, newPlugin } from '@extism/extism';
|
||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { plainToInstance } from 'class-transformer';
|
||||
import { validateOrReject } from 'class-validator';
|
||||
import { join } from 'node:path';
|
||||
import { Asset, WorkflowAction, WorkflowFilter } from 'src/database';
|
||||
import { OnEvent, OnJob } from 'src/decorators';
|
||||
import { PluginManifestDto } from 'src/dtos/plugin-manifest.dto';
|
||||
import { mapPlugin, PluginResponseDto } from 'src/dtos/plugin.dto';
|
||||
import { JobName, JobStatus, PluginTriggerType, QueueName } from 'src/enum';
|
||||
import { ArgOf } from 'src/repositories/event.repository';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
import { PluginHostFunctions } from 'src/services/plugin-host.functions';
|
||||
import { IWorkflowJob, JobItem, JobOf, WorkflowData } from 'src/types';
|
||||
|
||||
interface WorkflowContext {
|
||||
authToken: string;
|
||||
asset: Asset;
|
||||
}
|
||||
|
||||
interface PluginInput<T = unknown> {
|
||||
authToken: string;
|
||||
config: T;
|
||||
data: {
|
||||
asset: Asset;
|
||||
};
|
||||
}
|
||||
|
||||
@Injectable()
|
||||
export class PluginService extends BaseService {
|
||||
private pluginJwtSecret!: string;
|
||||
private loadedPlugins: Map<string, ExtismPlugin> = new Map();
|
||||
private hostFunctions!: PluginHostFunctions;
|
||||
|
||||
@OnEvent({ name: 'AppBootstrap' })
|
||||
async onBootstrap() {
|
||||
this.pluginJwtSecret = this.cryptoRepository.randomBytesAsText(32);
|
||||
|
||||
await this.loadPluginsFromManifests();
|
||||
|
||||
this.hostFunctions = new PluginHostFunctions(
|
||||
this.assetRepository,
|
||||
this.albumRepository,
|
||||
this.accessRepository,
|
||||
this.cryptoRepository,
|
||||
this.logger,
|
||||
this.pluginJwtSecret,
|
||||
);
|
||||
|
||||
await this.loadPlugins();
|
||||
}
|
||||
|
||||
//
|
||||
// CRUD operations for plugins
|
||||
//
|
||||
async getAll(): Promise<PluginResponseDto[]> {
|
||||
const plugins = await this.pluginRepository.getAllPlugins();
|
||||
return plugins.map((plugin) => mapPlugin(plugin));
|
||||
}
|
||||
|
||||
async get(id: string): Promise<PluginResponseDto> {
|
||||
const plugin = await this.pluginRepository.getPlugin(id);
|
||||
if (!plugin) {
|
||||
throw new BadRequestException('Plugin not found');
|
||||
}
|
||||
return mapPlugin(plugin);
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// Plugin Loader
|
||||
//////////////////////////////////////////
|
||||
async loadPluginsFromManifests(): Promise<void> {
|
||||
// Load core plugin
|
||||
const { resourcePaths, plugins } = this.configRepository.getEnv();
|
||||
const coreManifestPath = `${resourcePaths.corePlugin}/manifest.json`;
|
||||
|
||||
const coreManifest = await this.readAndValidateManifest(coreManifestPath);
|
||||
await this.loadPluginToDatabase(coreManifest, resourcePaths.corePlugin);
|
||||
|
||||
this.logger.log(`Successfully processed core plugin: ${coreManifest.name} (version ${coreManifest.version})`);
|
||||
|
||||
// Load external plugins
|
||||
if (plugins.enabled && plugins.installFolder) {
|
||||
await this.loadExternalPlugins(plugins.installFolder);
|
||||
}
|
||||
}
|
||||
|
||||
private async loadExternalPlugins(installFolder: string): Promise<void> {
|
||||
try {
|
||||
const entries = await this.pluginRepository.readDirectory(installFolder);
|
||||
|
||||
for (const entry of entries) {
|
||||
if (!entry.isDirectory()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const pluginFolder = join(installFolder, entry.name);
|
||||
const manifestPath = join(pluginFolder, 'manifest.json');
|
||||
try {
|
||||
const manifest = await this.readAndValidateManifest(manifestPath);
|
||||
await this.loadPluginToDatabase(manifest, pluginFolder);
|
||||
|
||||
this.logger.log(`Successfully processed external plugin: ${manifest.name} (version ${manifest.version})`);
|
||||
} catch (error) {
|
||||
this.logger.warn(`Failed to load external plugin from ${manifestPath}:`, error);
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to scan external plugins folder ${installFolder}:`, error);
|
||||
}
|
||||
}
|
||||
|
||||
private async loadPluginToDatabase(manifest: PluginManifestDto, basePath: string): Promise<void> {
|
||||
const currentPlugin = await this.pluginRepository.getPluginByName(manifest.name);
|
||||
if (currentPlugin != null && currentPlugin.version === manifest.version) {
|
||||
this.logger.log(`Plugin ${manifest.name} is up to date (version ${manifest.version}). Skipping`);
|
||||
return;
|
||||
}
|
||||
|
||||
const { plugin, filters, actions } = await this.pluginRepository.loadPlugin(manifest, basePath);
|
||||
|
||||
this.logger.log(`Upserted plugin: ${plugin.name} (ID: ${plugin.id}, version: ${plugin.version})`);
|
||||
|
||||
for (const filter of filters) {
|
||||
this.logger.log(`Upserted plugin filter: ${filter.methodName} (ID: ${filter.id})`);
|
||||
}
|
||||
|
||||
for (const action of actions) {
|
||||
this.logger.log(`Upserted plugin action: ${action.methodName} (ID: ${action.id})`);
|
||||
}
|
||||
}
|
||||
|
||||
private async readAndValidateManifest(manifestPath: string): Promise<PluginManifestDto> {
|
||||
const content = await this.storageRepository.readTextFile(manifestPath);
|
||||
const manifestData = JSON.parse(content);
|
||||
const manifest = plainToInstance(PluginManifestDto, manifestData);
|
||||
|
||||
await validateOrReject(manifest, {
|
||||
whitelist: true,
|
||||
forbidNonWhitelisted: true,
|
||||
});
|
||||
|
||||
return manifest;
|
||||
}
|
||||
|
||||
///////////////////////////////////////////
|
||||
// Plugin Execution
|
||||
///////////////////////////////////////////
|
||||
private async loadPlugins() {
|
||||
const plugins = await this.pluginRepository.getAllPlugins();
|
||||
for (const plugin of plugins) {
|
||||
try {
|
||||
this.logger.debug(`Loading plugin: ${plugin.name} from ${plugin.wasmPath}`);
|
||||
|
||||
const extismPlugin = await newPlugin(plugin.wasmPath, {
|
||||
useWasi: true,
|
||||
functions: this.hostFunctions.getHostFunctions(),
|
||||
});
|
||||
|
||||
this.loadedPlugins.set(plugin.id, extismPlugin);
|
||||
this.logger.log(`Successfully loaded plugin: ${plugin.name}`);
|
||||
} catch (error) {
|
||||
this.logger.error(`Failed to load plugin ${plugin.name}:`, error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@OnEvent({ name: 'AssetCreate' })
|
||||
async handleAssetCreate({ asset }: ArgOf<'AssetCreate'>) {
|
||||
await this.handleTrigger(PluginTriggerType.AssetCreate, {
|
||||
ownerId: asset.ownerId,
|
||||
event: { userId: asset.ownerId, asset },
|
||||
});
|
||||
}
|
||||
|
||||
private async handleTrigger<T extends PluginTriggerType>(
|
||||
triggerType: T,
|
||||
params: { ownerId: string; event: WorkflowData[T] },
|
||||
): Promise<void> {
|
||||
const workflows = await this.workflowRepository.getWorkflowByOwnerAndTrigger(params.ownerId, triggerType);
|
||||
if (workflows.length === 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
const jobs: JobItem[] = workflows.map((workflow) => ({
|
||||
name: JobName.WorkflowRun,
|
||||
data: {
|
||||
id: workflow.id,
|
||||
type: triggerType,
|
||||
event: params.event,
|
||||
} as IWorkflowJob<T>,
|
||||
}));
|
||||
|
||||
await this.jobRepository.queueAll(jobs);
|
||||
this.logger.debug(`Queued ${jobs.length} workflow execution jobs for trigger ${triggerType}`);
|
||||
}
|
||||
|
||||
@OnJob({ name: JobName.WorkflowRun, queue: QueueName.Workflow })
|
||||
async handleWorkflowRun({ id: workflowId, type, event }: JobOf<JobName.WorkflowRun>): Promise<JobStatus> {
|
||||
try {
|
||||
const workflow = await this.workflowRepository.getWorkflow(workflowId);
|
||||
if (!workflow) {
|
||||
this.logger.error(`Workflow ${workflowId} not found`);
|
||||
return JobStatus.Failed;
|
||||
}
|
||||
|
||||
const workflowFilters = await this.workflowRepository.getFilters(workflowId);
|
||||
const workflowActions = await this.workflowRepository.getActions(workflowId);
|
||||
|
||||
switch (type) {
|
||||
case PluginTriggerType.AssetCreate: {
|
||||
const data = event as WorkflowData[PluginTriggerType.AssetCreate];
|
||||
const asset = data.asset;
|
||||
|
||||
const authToken = this.cryptoRepository.signJwt({ userId: data.userId }, this.pluginJwtSecret);
|
||||
|
||||
const context = {
|
||||
authToken,
|
||||
asset,
|
||||
};
|
||||
|
||||
const filtersPassed = await this.executeFilters(workflowFilters, context);
|
||||
if (!filtersPassed) {
|
||||
return JobStatus.Skipped;
|
||||
}
|
||||
|
||||
await this.executeActions(workflowActions, context);
|
||||
this.logger.debug(`Workflow ${workflowId} executed successfully`);
|
||||
return JobStatus.Success;
|
||||
}
|
||||
|
||||
case PluginTriggerType.PersonRecognized: {
|
||||
this.logger.error('unimplemented');
|
||||
return JobStatus.Skipped;
|
||||
}
|
||||
|
||||
default: {
|
||||
this.logger.error(`Unknown workflow trigger type: ${type}`);
|
||||
return JobStatus.Failed;
|
||||
}
|
||||
}
|
||||
} catch (error) {
|
||||
this.logger.error(`Error executing workflow ${workflowId}:`, error);
|
||||
return JobStatus.Failed;
|
||||
}
|
||||
}
|
||||
|
||||
private async executeFilters(workflowFilters: WorkflowFilter[], context: WorkflowContext): Promise<boolean> {
|
||||
for (const workflowFilter of workflowFilters) {
|
||||
const filter = await this.pluginRepository.getFilter(workflowFilter.filterId);
|
||||
if (!filter) {
|
||||
this.logger.error(`Filter ${workflowFilter.filterId} not found`);
|
||||
return false;
|
||||
}
|
||||
|
||||
const pluginInstance = this.loadedPlugins.get(filter.pluginId);
|
||||
if (!pluginInstance) {
|
||||
this.logger.error(`Plugin ${filter.pluginId} not loaded`);
|
||||
return false;
|
||||
}
|
||||
|
||||
const filterInput: PluginInput = {
|
||||
authToken: context.authToken,
|
||||
config: workflowFilter.filterConfig,
|
||||
data: {
|
||||
asset: context.asset,
|
||||
},
|
||||
};
|
||||
|
||||
this.logger.debug(`Calling filter ${filter.methodName} with input: ${JSON.stringify(filterInput)}`);
|
||||
|
||||
const filterResult = await pluginInstance.call(
|
||||
filter.methodName,
|
||||
new TextEncoder().encode(JSON.stringify(filterInput)),
|
||||
);
|
||||
|
||||
if (!filterResult) {
|
||||
this.logger.error(`Filter ${filter.methodName} returned null`);
|
||||
return false;
|
||||
}
|
||||
|
||||
const result = JSON.parse(filterResult.text());
|
||||
if (result.passed === false) {
|
||||
this.logger.debug(`Filter ${filter.methodName} returned false, stopping workflow execution`);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private async executeActions(workflowActions: WorkflowAction[], context: WorkflowContext): Promise<void> {
|
||||
for (const workflowAction of workflowActions) {
|
||||
const action = await this.pluginRepository.getAction(workflowAction.actionId);
|
||||
if (!action) {
|
||||
throw new Error(`Action ${workflowAction.actionId} not found`);
|
||||
}
|
||||
|
||||
const pluginInstance = this.loadedPlugins.get(action.pluginId);
|
||||
if (!pluginInstance) {
|
||||
throw new Error(`Plugin ${action.pluginId} not loaded`);
|
||||
}
|
||||
|
||||
const actionInput: PluginInput = {
|
||||
authToken: context.authToken,
|
||||
config: workflowAction.actionConfig,
|
||||
data: {
|
||||
asset: context.asset,
|
||||
},
|
||||
};
|
||||
|
||||
this.logger.debug(`Calling action ${action.methodName} with input: ${JSON.stringify(actionInput)}`);
|
||||
|
||||
await pluginInstance.call(action.methodName, JSON.stringify(actionInput));
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -22,7 +22,7 @@ describe(QueueService.name, () => {
|
||||
it('should update concurrency', () => {
|
||||
sut.onConfigUpdate({ newConfig: defaults, oldConfig: {} as SystemConfig });
|
||||
|
||||
expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(16);
|
||||
expect(mocks.job.setConcurrency).toHaveBeenCalledTimes(17);
|
||||
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(5, QueueName.FacialRecognition, 1);
|
||||
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(7, QueueName.DuplicateDetection, 1);
|
||||
expect(mocks.job.setConcurrency).toHaveBeenNthCalledWith(8, QueueName.BackgroundTask, 5);
|
||||
@@ -97,6 +97,7 @@ describe(QueueService.name, () => {
|
||||
[QueueName.Notification]: expectedJobStatus,
|
||||
[QueueName.BackupDatabase]: expectedJobStatus,
|
||||
[QueueName.Ocr]: expectedJobStatus,
|
||||
[QueueName.Workflow]: expectedJobStatus,
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -40,6 +40,7 @@ const updatedConfig = Object.freeze<SystemConfig>({
|
||||
[QueueName.VideoConversion]: { concurrency: 1 },
|
||||
[QueueName.Notification]: { concurrency: 5 },
|
||||
[QueueName.Ocr]: { concurrency: 1 },
|
||||
[QueueName.Workflow]: { concurrency: 5 },
|
||||
},
|
||||
backup: {
|
||||
database: {
|
||||
|
||||
159
server/src/services/workflow.service.ts
Normal file
159
server/src/services/workflow.service.ts
Normal file
@@ -0,0 +1,159 @@
|
||||
import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { Workflow } from 'src/database';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import {
|
||||
mapWorkflowAction,
|
||||
mapWorkflowFilter,
|
||||
WorkflowCreateDto,
|
||||
WorkflowResponseDto,
|
||||
WorkflowUpdateDto,
|
||||
} from 'src/dtos/workflow.dto';
|
||||
import { Permission, PluginContext, PluginTriggerType } from 'src/enum';
|
||||
import { pluginTriggers } from 'src/plugins';
|
||||
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
|
||||
@Injectable()
|
||||
export class WorkflowService extends BaseService {
|
||||
async create(auth: AuthDto, dto: WorkflowCreateDto): Promise<WorkflowResponseDto> {
|
||||
const trigger = this.getTriggerOrFail(dto.triggerType);
|
||||
|
||||
const filterInserts = await this.validateAndMapFilters(dto.filters, trigger.context);
|
||||
const actionInserts = await this.validateAndMapActions(dto.actions, trigger.context);
|
||||
|
||||
const workflow = await this.workflowRepository.createWorkflow(
|
||||
{
|
||||
ownerId: auth.user.id,
|
||||
triggerType: dto.triggerType,
|
||||
name: dto.name,
|
||||
description: dto.description || '',
|
||||
enabled: dto.enabled ?? true,
|
||||
},
|
||||
filterInserts,
|
||||
actionInserts,
|
||||
);
|
||||
|
||||
return this.mapWorkflow(workflow);
|
||||
}
|
||||
|
||||
async getAll(auth: AuthDto): Promise<WorkflowResponseDto[]> {
|
||||
const workflows = await this.workflowRepository.getWorkflowsByOwner(auth.user.id);
|
||||
|
||||
return Promise.all(workflows.map((workflow) => this.mapWorkflow(workflow)));
|
||||
}
|
||||
|
||||
async get(auth: AuthDto, id: string): Promise<WorkflowResponseDto> {
|
||||
await this.requireAccess({ auth, permission: Permission.WorkflowRead, ids: [id] });
|
||||
const workflow = await this.findOrFail(id);
|
||||
return this.mapWorkflow(workflow);
|
||||
}
|
||||
|
||||
async update(auth: AuthDto, id: string, dto: WorkflowUpdateDto): Promise<WorkflowResponseDto> {
|
||||
await this.requireAccess({ auth, permission: Permission.WorkflowUpdate, ids: [id] });
|
||||
|
||||
if (Object.values(dto).filter((prop) => prop !== undefined).length === 0) {
|
||||
throw new BadRequestException('No fields to update');
|
||||
}
|
||||
|
||||
const workflow = await this.findOrFail(id);
|
||||
const trigger = this.getTriggerOrFail(workflow.triggerType);
|
||||
|
||||
const { filters, actions, ...workflowUpdate } = dto;
|
||||
const filterInserts = filters && (await this.validateAndMapFilters(filters, trigger.context));
|
||||
const actionInserts = actions && (await this.validateAndMapActions(actions, trigger.context));
|
||||
|
||||
const updatedWorkflow = await this.workflowRepository.updateWorkflow(
|
||||
id,
|
||||
workflowUpdate,
|
||||
filterInserts,
|
||||
actionInserts,
|
||||
);
|
||||
|
||||
return this.mapWorkflow(updatedWorkflow);
|
||||
}
|
||||
|
||||
async delete(auth: AuthDto, id: string): Promise<void> {
|
||||
await this.requireAccess({ auth, permission: Permission.WorkflowDelete, ids: [id] });
|
||||
await this.workflowRepository.deleteWorkflow(id);
|
||||
}
|
||||
|
||||
private async validateAndMapFilters(
|
||||
filters: Array<{ filterId: string; filterConfig?: any }>,
|
||||
requiredContext: PluginContext,
|
||||
) {
|
||||
for (const dto of filters) {
|
||||
const filter = await this.pluginRepository.getFilter(dto.filterId);
|
||||
if (!filter) {
|
||||
throw new BadRequestException(`Invalid filter ID: ${dto.filterId}`);
|
||||
}
|
||||
|
||||
if (!filter.supportedContexts.includes(requiredContext)) {
|
||||
throw new BadRequestException(
|
||||
`Filter "${filter.title}" does not support ${requiredContext} context. Supported contexts: ${filter.supportedContexts.join(', ')}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return filters.map((dto, index) => ({
|
||||
filterId: dto.filterId,
|
||||
filterConfig: dto.filterConfig || null,
|
||||
order: index,
|
||||
}));
|
||||
}
|
||||
|
||||
private async validateAndMapActions(
|
||||
actions: Array<{ actionId: string; actionConfig?: any }>,
|
||||
requiredContext: PluginContext,
|
||||
) {
|
||||
for (const dto of actions) {
|
||||
const action = await this.pluginRepository.getAction(dto.actionId);
|
||||
if (!action) {
|
||||
throw new BadRequestException(`Invalid action ID: ${dto.actionId}`);
|
||||
}
|
||||
if (!action.supportedContexts.includes(requiredContext)) {
|
||||
throw new BadRequestException(
|
||||
`Action "${action.title}" does not support ${requiredContext} context. Supported contexts: ${action.supportedContexts.join(', ')}`,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
return actions.map((dto, index) => ({
|
||||
actionId: dto.actionId,
|
||||
actionConfig: dto.actionConfig || null,
|
||||
order: index,
|
||||
}));
|
||||
}
|
||||
|
||||
private getTriggerOrFail(triggerType: PluginTriggerType) {
|
||||
const trigger = pluginTriggers.find((t) => t.type === triggerType);
|
||||
if (!trigger) {
|
||||
throw new BadRequestException(`Invalid trigger type: ${triggerType}`);
|
||||
}
|
||||
return trigger;
|
||||
}
|
||||
|
||||
private async findOrFail(id: string) {
|
||||
const workflow = await this.workflowRepository.getWorkflow(id);
|
||||
if (!workflow) {
|
||||
throw new BadRequestException('Workflow not found');
|
||||
}
|
||||
return workflow;
|
||||
}
|
||||
|
||||
private async mapWorkflow(workflow: Workflow): Promise<WorkflowResponseDto> {
|
||||
const filters = await this.workflowRepository.getFilters(workflow.id);
|
||||
const actions = await this.workflowRepository.getActions(workflow.id);
|
||||
|
||||
return {
|
||||
id: workflow.id,
|
||||
ownerId: workflow.ownerId,
|
||||
triggerType: workflow.triggerType,
|
||||
name: workflow.name,
|
||||
description: workflow.description,
|
||||
createdAt: workflow.createdAt.toISOString(),
|
||||
enabled: workflow.enabled,
|
||||
filters: filters.map((f) => mapWorkflowFilter(f)),
|
||||
actions: actions.map((a) => mapWorkflowAction(a)),
|
||||
};
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user