Files
immich/server/src/repositories/workflow.repository.ts
Alex 4dcc049465 feat: workflow foundation (#23621)
* feat: plugins

* feat: table definition

* feat: type and migration

* feat: add repositories

* feat: validate manifest with class-validator and load manifest info to database

* feat: workflow/plugin controller/service layer

* feat: implement workflow logic

* feat: make trigger static

* feat: dynamical instantiate plugin instances

* fix: access control and helper script

* feat: it works

* chore: simplify

* refactor: refactor and use queue for workflow execution

* refactor: remove unsused property in plugin-schema

* build wasm in prod

* feat: plugin loader in transaction

* fix: docker build arm64

* generated files

* shell check

* fix tests

* fix: waiting for migration to finish before loading plugin

* remove context reassignment

* feat: use mise to manage extism tools (#23760)

* pr feedback

* refactor: create workflow now including create filters and actions

* feat: workflow medium tests

* fix: broken medium test

* feat: medium tests

* chore: unify workflow job

* sign user id with jwt

* chore: query plugin with filters and action

* chore: read manifest in repository

* chore: load manifest from server configs

* merge main

* feat: endpoint documentation

* pr feedback

* load plugin from absolute path

* refactor:handle trigger

* throw error and return early

* pr feedback

* unify plugin services

* fix: plugins code

* clean up

* remove triggerConfig

* clean up

* displayName and methodName

---------

Co-authored-by: Jason Rasmussen <jason@rasm.me>
Co-authored-by: bo0tzz <git@bo0tzz.me>
2025-11-14 20:05:05 +00:00

140 lines
4.5 KiB
TypeScript

import { Injectable } from '@nestjs/common';
import { Insertable, Kysely, Updateable } from 'kysely';
import { InjectKysely } from 'nestjs-kysely';
import { DummyValue, GenerateSql } from 'src/decorators';
import { PluginTriggerType } from 'src/enum';
import { DB } from 'src/schema';
import { WorkflowActionTable, WorkflowFilterTable, WorkflowTable } from 'src/schema/tables/workflow.table';
@Injectable()
export class WorkflowRepository {
constructor(@InjectKysely() private db: Kysely<DB>) {}
@GenerateSql({ params: [DummyValue.UUID] })
getWorkflow(id: string) {
return this.db.selectFrom('workflow').selectAll().where('id', '=', id).executeTakeFirst();
}
@GenerateSql({ params: [DummyValue.UUID] })
getWorkflowsByOwner(ownerId: string) {
return this.db.selectFrom('workflow').selectAll().where('ownerId', '=', ownerId).orderBy('name').execute();
}
@GenerateSql({ params: [PluginTriggerType.AssetCreate] })
getWorkflowsByTrigger(type: PluginTriggerType) {
return this.db
.selectFrom('workflow')
.selectAll()
.where('triggerType', '=', type)
.where('enabled', '=', true)
.execute();
}
@GenerateSql({ params: [DummyValue.UUID, PluginTriggerType.AssetCreate] })
getWorkflowByOwnerAndTrigger(ownerId: string, type: PluginTriggerType) {
return this.db
.selectFrom('workflow')
.selectAll()
.where('ownerId', '=', ownerId)
.where('triggerType', '=', type)
.where('enabled', '=', true)
.execute();
}
async createWorkflow(
workflow: Insertable<WorkflowTable>,
filters: Insertable<WorkflowFilterTable>[],
actions: Insertable<WorkflowActionTable>[],
) {
return await this.db.transaction().execute(async (tx) => {
const createdWorkflow = await tx.insertInto('workflow').values(workflow).returningAll().executeTakeFirstOrThrow();
if (filters.length > 0) {
const newFilters = filters.map((filter) => ({
...filter,
workflowId: createdWorkflow.id,
}));
await tx.insertInto('workflow_filter').values(newFilters).execute();
}
if (actions.length > 0) {
const newActions = actions.map((action) => ({
...action,
workflowId: createdWorkflow.id,
}));
await tx.insertInto('workflow_action').values(newActions).execute();
}
return createdWorkflow;
});
}
async updateWorkflow(
id: string,
workflow: Updateable<WorkflowTable>,
filters: Insertable<WorkflowFilterTable>[] | undefined,
actions: Insertable<WorkflowActionTable>[] | undefined,
) {
return await this.db.transaction().execute(async (trx) => {
if (Object.keys(workflow).length > 0) {
await trx.updateTable('workflow').set(workflow).where('id', '=', id).execute();
}
if (filters !== undefined) {
await trx.deleteFrom('workflow_filter').where('workflowId', '=', id).execute();
if (filters.length > 0) {
const filtersWithWorkflowId = filters.map((filter) => ({
...filter,
workflowId: id,
}));
await trx.insertInto('workflow_filter').values(filtersWithWorkflowId).execute();
}
}
if (actions !== undefined) {
await trx.deleteFrom('workflow_action').where('workflowId', '=', id).execute();
if (actions.length > 0) {
const actionsWithWorkflowId = actions.map((action) => ({
...action,
workflowId: id,
}));
await trx.insertInto('workflow_action').values(actionsWithWorkflowId).execute();
}
}
return await trx.selectFrom('workflow').selectAll().where('id', '=', id).executeTakeFirstOrThrow();
});
}
@GenerateSql({ params: [DummyValue.UUID] })
async deleteWorkflow(id: string) {
await this.db.deleteFrom('workflow').where('id', '=', id).execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
getFilters(workflowId: string) {
return this.db
.selectFrom('workflow_filter')
.selectAll()
.where('workflowId', '=', workflowId)
.orderBy('order', 'asc')
.execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
async deleteFiltersByWorkflow(workflowId: string) {
await this.db.deleteFrom('workflow_filter').where('workflowId', '=', workflowId).execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
getActions(workflowId: string) {
return this.db
.selectFrom('workflow_action')
.selectAll()
.where('workflowId', '=', workflowId)
.orderBy('order', 'asc')
.execute();
}
}