Files
immich/server/src/services/plugin.service.ts

318 lines
11 KiB
TypeScript
Raw Normal View History

feat: workflow foundation (#23621) * feat: plugins * feat: table definition * feat: type and migration * feat: add repositories * feat: validate manifest with class-validator and load manifest info to database * feat: workflow/plugin controller/service layer * feat: implement workflow logic * feat: make trigger static * feat: dynamically instantiate plugin instances * fix: access control and helper script * feat: it works * chore: simplify * refactor: refactor and use queue for workflow execution * refactor: remove unused property in plugin-schema * build wasm in prod * feat: plugin loader in transaction * fix: docker build arm64 * generated files * shell check * fix tests * fix: waiting for migration to finish before loading plugin * remove context reassignment * feat: use mise to manage extism tools (#23760) * pr feedback * refactor: create workflow now including create filters and actions * feat: workflow medium tests * fix: broken medium test * feat: medium tests * chore: unify workflow job * sign user id with jwt * chore: query plugin with filters and action * chore: read manifest in repository * chore: load manifest from server configs * merge main * feat: endpoint documentation * pr feedback * load plugin from absolute path * refactor: handle trigger * throw error and return early * pr feedback * unify plugin services * fix: plugins code * clean up * remove triggerConfig * clean up * displayName and methodName --------- Co-authored-by: Jason Rasmussen <jason@rasm.me> Co-authored-by: bo0tzz <git@bo0tzz.me>
2025-11-14 14:05:05 -06:00
import { Plugin as ExtismPlugin, newPlugin } from '@extism/extism';
import { BadRequestException, Injectable } from '@nestjs/common';
import { plainToInstance } from 'class-transformer';
import { validateOrReject } from 'class-validator';
import { join } from 'node:path';
import { Asset, WorkflowAction, WorkflowFilter } from 'src/database';
import { OnEvent, OnJob } from 'src/decorators';
import { PluginManifestDto } from 'src/dtos/plugin-manifest.dto';
import { mapPlugin, PluginResponseDto } from 'src/dtos/plugin.dto';
import { JobName, JobStatus, PluginTriggerType, QueueName } from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import { PluginHostFunctions } from 'src/services/plugin-host.functions';
import { IWorkflowJob, JobItem, JobOf, WorkflowData } from 'src/types';
/**
 * Values shared by every filter and action invocation within a single
 * workflow run.
 */
interface WorkflowContext {
  /** Token forwarded to each plugin call as `authToken`. */
  authToken: string;

  /** Asset forwarded to each plugin call as `data.asset`. */
  asset: Asset;
}
/**
 * Payload serialized to JSON and passed to a plugin filter or action method.
 *
 * The type parameter describes the shape of the per-step configuration and
 * defaults to `unknown` when the caller does not know it.
 */
interface PluginInput<TConfig = unknown> {
  /** Token the plugin receives alongside the payload. */
  authToken: string;

  /** Configuration stored on the workflow filter/action being executed. */
  config: TConfig;

  /** Data the plugin operates on. */
  data: {
    asset: Asset;
  };
}
/**
 * Registers plugin manifests in the database, instantiates the plugins'
 * WASM modules through Extism, and executes workflows (filters first, then
 * actions) when a trigger event occurs.
 */
@Injectable()
export class PluginService extends BaseService {
  /** Secret used to sign the JWT handed to plugins; regenerated on every bootstrap. */
  private pluginJwtSecret!: string;

  /** Instantiated Extism plugins, keyed by plugin ID. */
  private loadedPlugins: Map<string, ExtismPlugin> = new Map();

  /** Host function provider passed to every Extism plugin; created during bootstrap. */
  private hostFunctions!: PluginHostFunctions;

  /**
   * Bootstrap hook: generates the JWT secret, syncs manifests into the
   * database, builds the host functions, and instantiates all plugins.
   */
  @OnEvent({ name: 'AppBootstrap' })
  async onBootstrap() {
    this.pluginJwtSecret = this.cryptoRepository.randomBytesAsText(32);

    await this.loadPluginsFromManifests();

    this.hostFunctions = new PluginHostFunctions(
      this.assetRepository,
      this.albumRepository,
      this.accessRepository,
      this.cryptoRepository,
      this.logger,
      this.pluginJwtSecret,
    );

    // Host functions must exist before plugins are instantiated, since
    // loadPlugins() passes them to Extism.
    await this.loadPlugins();
  }

  //
  // CRUD operations for plugins
  //

  /** Returns every plugin known to the database, mapped to response DTOs. */
  async getAll(): Promise<PluginResponseDto[]> {
    const plugins = await this.pluginRepository.getAllPlugins();
    return plugins.map((plugin) => mapPlugin(plugin));
  }

  /**
   * Returns a single plugin by ID.
   *
   * @throws BadRequestException when no plugin with that ID exists.
   */
  async get(id: string): Promise<PluginResponseDto> {
    const plugin = await this.pluginRepository.getPlugin(id);
    if (!plugin) {
      throw new BadRequestException('Plugin not found');
    }
    return mapPlugin(plugin);
  }

  ///////////////////////////////////////////
  // Plugin Loader
  //////////////////////////////////////////

  /**
   * Reads the core plugin manifest (always) and the external plugin
   * manifests (when enabled and an install folder is configured) and
   * stores them in the database.
   *
   * A failure while reading or validating the core manifest propagates to
   * the caller; external plugin failures are logged and skipped.
   */
  async loadPluginsFromManifests(): Promise<void> {
    // Load core plugin
    const { resourcePaths, plugins } = this.configRepository.getEnv();
    const coreManifestPath = `${resourcePaths.corePlugin}/manifest.json`;

    const coreManifest = await this.readAndValidateManifest(coreManifestPath);
    await this.loadPluginToDatabase(coreManifest, resourcePaths.corePlugin);

    this.logger.log(`Successfully processed core plugin: ${coreManifest.name} (version ${coreManifest.version})`);

    // Load external plugins
    if (plugins.enabled && plugins.installFolder) {
      await this.loadExternalPlugins(plugins.installFolder);
    }
  }

  /**
   * Scans each sub-directory of `installFolder` for a `manifest.json` and
   * registers it. Errors are deliberately best-effort: one broken plugin
   * (or an unreadable folder) is logged and does not abort the others.
   */
  private async loadExternalPlugins(installFolder: string): Promise<void> {
    try {
      const entries = await this.pluginRepository.readDirectory(installFolder);

      for (const entry of entries) {
        if (!entry.isDirectory()) {
          continue;
        }

        const pluginFolder = join(installFolder, entry.name);
        const manifestPath = join(pluginFolder, 'manifest.json');

        try {
          const manifest = await this.readAndValidateManifest(manifestPath);
          await this.loadPluginToDatabase(manifest, pluginFolder);

          this.logger.log(`Successfully processed external plugin: ${manifest.name} (version ${manifest.version})`);
        } catch (error) {
          this.logger.warn(`Failed to load external plugin from ${manifestPath}:`, error);
        }
      }
    } catch (error) {
      this.logger.error(`Failed to scan external plugins folder ${installFolder}:`, error);
    }
  }

  /**
   * Upserts a plugin (with its filters and actions) into the database,
   * unless a plugin with the same name and version is already stored.
   */
  private async loadPluginToDatabase(manifest: PluginManifestDto, basePath: string): Promise<void> {
    const currentPlugin = await this.pluginRepository.getPluginByName(manifest.name);
    if (currentPlugin != null && currentPlugin.version === manifest.version) {
      this.logger.log(`Plugin ${manifest.name} is up to date (version ${manifest.version}). Skipping`);
      return;
    }

    const { plugin, filters, actions } = await this.pluginRepository.loadPlugin(manifest, basePath);

    this.logger.log(`Upserted plugin: ${plugin.name} (ID: ${plugin.id}, version: ${plugin.version})`);

    for (const filter of filters) {
      this.logger.log(`Upserted plugin filter: ${filter.methodName} (ID: ${filter.id})`);
    }

    for (const action of actions) {
      this.logger.log(`Upserted plugin action: ${action.methodName} (ID: ${action.id})`);
    }
  }

  /**
   * Reads a manifest file, parses it as JSON, and validates it against
   * `PluginManifestDto`. Rejects (throws) on unreadable files, invalid JSON,
   * validation errors, and properties not declared on the DTO.
   */
  private async readAndValidateManifest(manifestPath: string): Promise<PluginManifestDto> {
    const content = await this.storageRepository.readTextFile(manifestPath);
    const manifestData = JSON.parse(content);
    const manifest = plainToInstance(PluginManifestDto, manifestData);

    await validateOrReject(manifest, {
      whitelist: true,
      forbidNonWhitelisted: true,
    });

    return manifest;
  }

  ///////////////////////////////////////////
  // Plugin Execution
  ///////////////////////////////////////////

  /**
   * Instantiates an Extism plugin for every plugin stored in the database
   * and caches it by plugin ID. A plugin that fails to instantiate is logged
   * and skipped so the remaining plugins still load.
   */
  private async loadPlugins() {
    const plugins = await this.pluginRepository.getAllPlugins();
    for (const plugin of plugins) {
      try {
        this.logger.debug(`Loading plugin: ${plugin.name} from ${plugin.wasmPath}`);

        const extismPlugin = await newPlugin(plugin.wasmPath, {
          useWasi: true,
          functions: this.hostFunctions.getHostFunctions(),
        });

        this.loadedPlugins.set(plugin.id, extismPlugin);
        this.logger.log(`Successfully loaded plugin: ${plugin.name}`);
      } catch (error) {
        this.logger.error(`Failed to load plugin ${plugin.name}:`, error);
      }
    }
  }

  /** Event hook: queues the asset owner's `AssetCreate` workflows for the new asset. */
  @OnEvent({ name: 'AssetCreate' })
  async handleAssetCreate({ asset }: ArgOf<'AssetCreate'>) {
    await this.handleTrigger(PluginTriggerType.AssetCreate, {
      ownerId: asset.ownerId,
      event: { userId: asset.ownerId, asset },
    });
  }

  /**
   * Looks up the owner's workflows for the given trigger and queues one
   * `WorkflowRun` job per workflow. Does nothing when there are none.
   */
  private async handleTrigger<T extends PluginTriggerType>(
    triggerType: T,
    params: { ownerId: string; event: WorkflowData[T] },
  ): Promise<void> {
    const workflows = await this.workflowRepository.getWorkflowByOwnerAndTrigger(params.ownerId, triggerType);
    if (workflows.length === 0) {
      return;
    }

    const jobs: JobItem[] = workflows.map((workflow) => ({
      name: JobName.WorkflowRun,
      data: {
        id: workflow.id,
        type: triggerType,
        event: params.event,
      } as IWorkflowJob<T>,
    }));

    await this.jobRepository.queueAll(jobs);
    this.logger.debug(`Queued ${jobs.length} workflow execution jobs for trigger ${triggerType}`);
  }

  /**
   * Job handler that executes one workflow for one trigger event.
   *
   * @returns `Success` when all filters passed and all actions ran,
   * `Skipped` when a filter rejected the event (or the trigger type is not
   * implemented yet), and `Failed` when the workflow is missing, the trigger
   * type is unknown, or anything throws during execution.
   */
  @OnJob({ name: JobName.WorkflowRun, queue: QueueName.Workflow })
  async handleWorkflowRun({ id: workflowId, type, event }: JobOf<JobName.WorkflowRun>): Promise<JobStatus> {
    try {
      const workflow = await this.workflowRepository.getWorkflow(workflowId);
      if (!workflow) {
        this.logger.error(`Workflow ${workflowId} not found`);
        return JobStatus.Failed;
      }

      // The two lookups are independent reads, so fetch them concurrently.
      const [workflowFilters, workflowActions] = await Promise.all([
        this.workflowRepository.getFilters(workflowId),
        this.workflowRepository.getActions(workflowId),
      ]);

      switch (type) {
        case PluginTriggerType.AssetCreate: {
          const data = event as WorkflowData[PluginTriggerType.AssetCreate];
          const context: WorkflowContext = {
            authToken: this.cryptoRepository.signJwt({ userId: data.userId }, this.pluginJwtSecret),
            asset: data.asset,
          };

          const filtersPassed = await this.executeFilters(workflowFilters, context);
          if (!filtersPassed) {
            return JobStatus.Skipped;
          }

          await this.executeActions(workflowActions, context);
          this.logger.debug(`Workflow ${workflowId} executed successfully`);
          return JobStatus.Success;
        }

        case PluginTriggerType.PersonRecognized: {
          this.logger.error('unimplemented');
          return JobStatus.Skipped;
        }

        default: {
          this.logger.error(`Unknown workflow trigger type: ${type}`);
          return JobStatus.Failed;
        }
      }
    } catch (error) {
      this.logger.error(`Error executing workflow ${workflowId}:`, error);
      return JobStatus.Failed;
    }
  }

  /**
   * Runs the workflow's filters in order and stops at the first one that
   * does not let the event through.
   *
   * @returns `false` when a filter definition or its plugin is missing, a
   * filter call returns nothing, or a filter reports `passed: false`;
   * `true` otherwise (including when there are no filters).
   */
  private async executeFilters(workflowFilters: WorkflowFilter[], context: WorkflowContext): Promise<boolean> {
    for (const workflowFilter of workflowFilters) {
      const filter = await this.pluginRepository.getFilter(workflowFilter.filterId);
      if (!filter) {
        this.logger.error(`Filter ${workflowFilter.filterId} not found`);
        return false;
      }

      const pluginInstance = this.loadedPlugins.get(filter.pluginId);
      if (!pluginInstance) {
        this.logger.error(`Plugin ${filter.pluginId} not loaded`);
        return false;
      }

      const filterInput = this.buildPluginInput(context, workflowFilter.filterConfig);
      this.logger.debug(`Calling filter ${filter.methodName} with input: ${this.describePluginInput(filterInput)}`);

      const filterResult = await pluginInstance.call(filter.methodName, this.encodePluginInput(filterInput));
      if (!filterResult) {
        this.logger.error(`Filter ${filter.methodName} returned null`);
        return false;
      }

      // Only an explicit `passed: false` rejects the event.
      const result: { passed?: unknown } = JSON.parse(filterResult.text());
      if (result.passed === false) {
        this.logger.debug(`Filter ${filter.methodName} returned false, stopping workflow execution`);
        return false;
      }
    }

    return true;
  }

  /**
   * Runs the workflow's actions in order.
   *
   * @throws Error when an action definition or its plugin is missing; errors
   * raised by the plugin call itself also propagate to the caller.
   */
  private async executeActions(workflowActions: WorkflowAction[], context: WorkflowContext): Promise<void> {
    for (const workflowAction of workflowActions) {
      const action = await this.pluginRepository.getAction(workflowAction.actionId);
      if (!action) {
        throw new Error(`Action ${workflowAction.actionId} not found`);
      }

      const pluginInstance = this.loadedPlugins.get(action.pluginId);
      if (!pluginInstance) {
        throw new Error(`Plugin ${action.pluginId} not loaded`);
      }

      const actionInput = this.buildPluginInput(context, workflowAction.actionConfig);
      this.logger.debug(`Calling action ${action.methodName} with input: ${this.describePluginInput(actionInput)}`);

      await pluginInstance.call(action.methodName, this.encodePluginInput(actionInput));
    }
  }

  /** Builds the payload sent to a plugin method from the run context and the step's config. */
  private buildPluginInput(context: WorkflowContext, config: unknown): PluginInput {
    return {
      authToken: context.authToken,
      config,
      data: {
        asset: context.asset,
      },
    };
  }

  /** Serializes a plugin payload to UTF-8 JSON bytes; used for both filters and actions. */
  private encodePluginInput(input: PluginInput): Uint8Array {
    return new TextEncoder().encode(JSON.stringify(input));
  }

  /** Serializes a plugin payload for logging with the auth token redacted, so the signed JWT never reaches the logs. */
  private describePluginInput(input: PluginInput): string {
    return JSON.stringify({ ...input, authToken: '[REDACTED]' });
  }
}