Files
immich/server/src/utils/database.ts
Daniel Dietzler c15998e805 fix: asset update race condition (#24384)
* fix: asset update race condition

* fix: asset update race condition

* single statement

* update sql

* missed one

* fix `none` handling

* fix: tests

* chore: simplify update all assets

* fix: updating lockable properties

---------

Co-authored-by: mertalev <101130780+mertalev@users.noreply.github.com>
2025-12-17 09:23:13 -06:00

498 lines
18 KiB
TypeScript

import {
DeduplicateJoinsPlugin,
Expression,
ExpressionBuilder,
ExpressionWrapper,
Kysely,
KyselyConfig,
Nullable,
Selectable,
SelectQueryBuilder,
Simplify,
sql,
} from 'kysely';
import { PostgresJSDialect } from 'kysely-postgres-js';
import { jsonArrayFrom, jsonObjectFrom } from 'kysely/helpers/postgres';
import { parse } from 'pg-connection-string';
import postgres, { Notice, PostgresError } from 'postgres';
import { columns, Exif, lockableProperties, LockableProperty, Person } from 'src/database';
import { AssetFileType, AssetVisibility, DatabaseExtension, DatabaseSslMode } from 'src/enum';
import { AssetSearchBuilderOptions } from 'src/repositories/search.repository';
import { DB } from 'src/schema';
import { DatabaseConnectionParams, VectorExtension } from 'src/types';
type Ssl = 'require' | 'allow' | 'prefer' | 'verify-full' | boolean | object;
const isValidSsl = (ssl?: string | boolean | object): ssl is Ssl =>
typeof ssl !== 'string' || ssl === 'require' || ssl === 'allow' || ssl === 'prefer' || ssl === 'verify-full';
export const asPostgresConnectionConfig = (params: DatabaseConnectionParams) => {
if (params.connectionType === 'parts') {
return {
host: params.host,
port: params.port,
username: params.username,
password: params.password,
database: params.database,
ssl: params.ssl === DatabaseSslMode.Disable ? false : params.ssl,
};
}
const { host, port, user, password, database, ...rest } = parse(params.url);
let ssl: Ssl | undefined;
if (rest.ssl) {
if (!isValidSsl(rest.ssl)) {
throw new Error(`Invalid ssl option: ${rest.ssl}`);
}
ssl = rest.ssl;
}
return {
host: host ?? undefined,
port: port ? Number(port) : undefined,
username: user,
password,
database: database ?? undefined,
ssl,
};
};
export const getKyselyConfig = (
params: DatabaseConnectionParams,
options: Partial<postgres.Options<Record<string, postgres.PostgresType>>> = {},
): KyselyConfig => {
const config = asPostgresConnectionConfig(params);
return {
dialect: new PostgresJSDialect({
postgres: postgres({
onnotice: (notice: Notice) => {
if (notice['severity'] !== 'NOTICE') {
console.warn('Postgres notice:', notice);
}
},
max: 10,
types: {
date: {
to: 1184,
from: [1082, 1114, 1184],
serialize: (x: Date | string) => (x instanceof Date ? x.toISOString() : x),
parse: (x: string) => new Date(x),
},
bigint: {
to: 20,
from: [20, 1700],
parse: (value: string) => Number.parseInt(value),
serialize: (value: number) => value.toString(),
},
},
connection: {
TimeZone: 'UTC',
},
host: config.host,
port: config.port,
username: config.username,
password: config.password,
database: config.database,
ssl: config.ssl,
...options,
}),
}),
log(event) {
if (event.level === 'error') {
console.error('Query failed :', {
durationMs: event.queryDurationMillis,
error: event.error,
sql: event.query.sql,
params: event.query.parameters,
});
}
},
};
};
export const asUuid = (id: string | Expression<string>) => sql<string>`${id}::uuid`;
export const anyUuid = (ids: string[]) => sql<string>`any(${`{${ids}}`}::uuid[])`;
export const asVector = (embedding: number[]) => sql<string>`${`[${embedding}]`}::vector`;
export const unnest = (array: string[]) => sql<Record<string, string>>`unnest(array[${sql.join(array)}]::text[])`;
export const removeUndefinedKeys = <T extends object>(update: T, template: unknown) => {
for (const key in update) {
if ((template as T)[key] === undefined) {
delete update[key];
}
}
return update;
};
/** Modifies toJson return type to not set all properties as nullable */
export function toJson<DB, TB extends keyof DB & string, T extends TB | Expression<unknown>>(
eb: ExpressionBuilder<DB, TB>,
table: T,
) {
return eb.fn.toJson<T>(table) as ExpressionWrapper<
DB,
TB,
Simplify<
T extends TB
? Selectable<DB[T]> extends Nullable<infer N>
? N | null
: Selectable<DB[T]>
: T extends Expression<infer O>
? O extends Nullable<infer N>
? N | null
: O
: never
>
>;
}
export const ASSET_CHECKSUM_CONSTRAINT = 'UQ_assets_owner_checksum';
export const isAssetChecksumConstraint = (error: unknown) => {
return (error as PostgresError)?.constraint_name === 'UQ_assets_owner_checksum';
};
export function withDefaultVisibility<O>(qb: SelectQueryBuilder<DB, 'asset', O>) {
return qb.where('asset.visibility', 'in', [sql.lit(AssetVisibility.Archive), sql.lit(AssetVisibility.Timeline)]);
}
// TODO come up with a better query that only selects the fields we need
export function withExif<O>(qb: SelectQueryBuilder<DB, 'asset', O>) {
return qb
.leftJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.select((eb) => eb.fn.toJson(eb.table('asset_exif')).$castTo<Exif | null>().as('exifInfo'));
}
export function withExifInner<O>(qb: SelectQueryBuilder<DB, 'asset', O>) {
return qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.select((eb) => eb.fn.toJson(eb.table('asset_exif')).$castTo<Exif>().as('exifInfo'));
}
export function withSmartSearch<O>(qb: SelectQueryBuilder<DB, 'asset', O>) {
return qb
.leftJoin('smart_search', 'asset.id', 'smart_search.assetId')
.select((eb) => toJson(eb, 'smart_search').as('smartSearch'));
}
export function withFaces(eb: ExpressionBuilder<DB, 'asset'>, withDeletedFace?: boolean) {
return jsonArrayFrom(
eb
.selectFrom('asset_face')
.selectAll('asset_face')
.whereRef('asset_face.assetId', '=', 'asset.id')
.$if(!withDeletedFace, (qb) => qb.where('asset_face.deletedAt', 'is', null)),
).as('faces');
}
export function withFiles(eb: ExpressionBuilder<DB, 'asset'>, type?: AssetFileType) {
return jsonArrayFrom(
eb
.selectFrom('asset_file')
.select(columns.assetFiles)
.whereRef('asset_file.assetId', '=', 'asset.id')
.$if(!!type, (qb) => qb.where('asset_file.type', '=', type!)),
).as('files');
}
export function withFilePath(eb: ExpressionBuilder<DB, 'asset'>, type: AssetFileType) {
return eb
.selectFrom('asset_file')
.select('asset_file.path')
.whereRef('asset_file.assetId', '=', 'asset.id')
.where('asset_file.type', '=', type);
}
export function withFacesAndPeople(eb: ExpressionBuilder<DB, 'asset'>, withDeletedFace?: boolean) {
return jsonArrayFrom(
eb
.selectFrom('asset_face')
.leftJoinLateral(
(eb) =>
eb.selectFrom('person').selectAll('person').whereRef('asset_face.personId', '=', 'person.id').as('person'),
(join) => join.onTrue(),
)
.selectAll('asset_face')
.select((eb) => eb.table('person').$castTo<Person>().as('person'))
.whereRef('asset_face.assetId', '=', 'asset.id')
.$if(!withDeletedFace, (qb) => qb.where('asset_face.deletedAt', 'is', null)),
).as('faces');
}
export function hasPeople<O>(qb: SelectQueryBuilder<DB, 'asset', O>, personIds: string[]) {
return qb.innerJoin(
(eb) =>
eb
.selectFrom('asset_face')
.select('assetId')
.where('personId', '=', anyUuid(personIds!))
.where('deletedAt', 'is', null)
.groupBy('assetId')
.having((eb) => eb.fn.count('personId').distinct(), '=', personIds.length)
.as('has_people'),
(join) => join.onRef('has_people.assetId', '=', 'asset.id'),
);
}
export function inAlbums<O>(qb: SelectQueryBuilder<DB, 'asset', O>, albumIds: string[]) {
return qb.innerJoin(
(eb) =>
eb
.selectFrom('album_asset')
.select('assetId')
.where('albumId', '=', anyUuid(albumIds!))
.groupBy('assetId')
.having((eb) => eb.fn.count('albumId').distinct(), '=', albumIds.length)
.as('has_album'),
(join) => join.onRef('has_album.assetId', '=', 'asset.id'),
);
}
export function hasTags<O>(qb: SelectQueryBuilder<DB, 'asset', O>, tagIds: string[]) {
return qb.innerJoin(
(eb) =>
eb
.selectFrom('tag_asset')
.select('assetId')
.innerJoin('tag_closure', 'tag_asset.tagId', 'tag_closure.id_descendant')
.where('tag_closure.id_ancestor', '=', anyUuid(tagIds))
.groupBy('assetId')
.having((eb) => eb.fn.count('tag_closure.id_ancestor').distinct(), '>=', tagIds.length)
.as('has_tags'),
(join) => join.onRef('has_tags.assetId', '=', 'asset.id'),
);
}
export function withOwner(eb: ExpressionBuilder<DB, 'asset'>) {
return jsonObjectFrom(eb.selectFrom('user').select(columns.user).whereRef('user.id', '=', 'asset.ownerId')).as(
'owner',
);
}
export function withLibrary(eb: ExpressionBuilder<DB, 'asset'>) {
return jsonObjectFrom(
eb.selectFrom('library').selectAll('library').whereRef('library.id', '=', 'asset.libraryId'),
).as('library');
}
export function withTags(eb: ExpressionBuilder<DB, 'asset'>) {
return jsonArrayFrom(
eb
.selectFrom('tag')
.select(columns.tag)
.innerJoin('tag_asset', 'tag.id', 'tag_asset.tagId')
.whereRef('asset.id', '=', 'tag_asset.assetId'),
).as('tags');
}
export function truncatedDate<O>() {
return sql<O>`date_trunc(${sql.lit('MONTH')}, "localDateTime" AT TIME ZONE 'UTC') AT TIME ZONE 'UTC'`;
}
export function withTagId<O>(qb: SelectQueryBuilder<DB, 'asset', O>, tagId: string) {
return qb.where((eb) =>
eb.exists(
eb
.selectFrom('tag_closure')
.innerJoin('tag_asset', 'tag_asset.tagId', 'tag_closure.id_descendant')
.whereRef('tag_asset.assetId', '=', 'asset.id')
.where('tag_closure.id_ancestor', '=', tagId),
),
);
}
const isCJK = (c: number): boolean =>
(c >= 0x4e_00 && c <= 0x9f_ff) ||
(c >= 0xac_00 && c <= 0xd7_af) ||
(c >= 0x30_40 && c <= 0x30_9f) ||
(c >= 0x30_a0 && c <= 0x30_ff) ||
(c >= 0x34_00 && c <= 0x4d_bf);
export const tokenizeForSearch = (text: string): string[] => {
/* eslint-disable unicorn/prefer-code-point */
const tokens: string[] = [];
let i = 0;
while (i < text.length) {
const c = text.charCodeAt(i);
if (c <= 32) {
i++;
continue;
}
const start = i;
if (isCJK(c)) {
while (i < text.length && isCJK(text.charCodeAt(i))) {
i++;
}
if (i - start === 1) {
tokens.push(text[start]);
} else {
for (let k = start; k < i - 1; k++) {
tokens.push(text[k] + text[k + 1]);
}
}
} else {
while (i < text.length && text.charCodeAt(i) > 32 && !isCJK(text.charCodeAt(i))) {
i++;
}
tokens.push(text.slice(start, i));
}
}
return tokens;
};
const joinDeduplicationPlugin = new DeduplicateJoinsPlugin();
/** TODO: This should only be used for search-related queries, not as a general purpose query builder */
export function searchAssetBuilder(kysely: Kysely<DB>, options: AssetSearchBuilderOptions) {
options.withDeleted ||= !!(options.trashedAfter || options.trashedBefore || options.isOffline);
const visibility = options.visibility == null ? AssetVisibility.Timeline : options.visibility;
return kysely
.withPlugin(joinDeduplicationPlugin)
.selectFrom('asset')
.where('asset.visibility', '=', visibility)
.$if(!!options.albumIds && options.albumIds.length > 0, (qb) => inAlbums(qb, options.albumIds!))
.$if(!!options.tagIds && options.tagIds.length > 0, (qb) => hasTags(qb, options.tagIds!))
.$if(options.tagIds === null, (qb) =>
qb.where((eb) => eb.not(eb.exists((eb) => eb.selectFrom('tag_asset').whereRef('assetId', '=', 'asset.id')))),
)
.$if(!!options.personIds && options.personIds.length > 0, (qb) => hasPeople(qb, options.personIds!))
.$if(!!options.createdBefore, (qb) => qb.where('asset.createdAt', '<=', options.createdBefore!))
.$if(!!options.createdAfter, (qb) => qb.where('asset.createdAt', '>=', options.createdAfter!))
.$if(!!options.updatedBefore, (qb) => qb.where('asset.updatedAt', '<=', options.updatedBefore!))
.$if(!!options.updatedAfter, (qb) => qb.where('asset.updatedAt', '>=', options.updatedAfter!))
.$if(!!options.trashedBefore, (qb) => qb.where('asset.deletedAt', '<=', options.trashedBefore!))
.$if(!!options.trashedAfter, (qb) => qb.where('asset.deletedAt', '>=', options.trashedAfter!))
.$if(!!options.takenBefore, (qb) => qb.where('asset.fileCreatedAt', '<=', options.takenBefore!))
.$if(!!options.takenAfter, (qb) => qb.where('asset.fileCreatedAt', '>=', options.takenAfter!))
.$if(options.city !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.city', options.city === null ? 'is' : '=', options.city!),
)
.$if(options.state !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.state', options.state === null ? 'is' : '=', options.state!),
)
.$if(options.country !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.country', options.country === null ? 'is' : '=', options.country!),
)
.$if(options.make !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.make', options.make === null ? 'is' : '=', options.make!),
)
.$if(options.model !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.model', options.model === null ? 'is' : '=', options.model!),
)
.$if(options.lensModel !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.lensModel', options.lensModel === null ? 'is' : '=', options.lensModel!),
)
.$if(options.rating !== undefined, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where('asset_exif.rating', options.rating === null ? 'is' : '=', options.rating!),
)
.$if(!!options.checksum, (qb) => qb.where('asset.checksum', '=', options.checksum!))
.$if(!!options.deviceAssetId, (qb) => qb.where('asset.deviceAssetId', '=', options.deviceAssetId!))
.$if(!!options.deviceId, (qb) => qb.where('asset.deviceId', '=', options.deviceId!))
.$if(!!options.id, (qb) => qb.where('asset.id', '=', asUuid(options.id!)))
.$if(!!options.libraryId, (qb) => qb.where('asset.libraryId', '=', asUuid(options.libraryId!)))
.$if(!!options.userIds, (qb) => qb.where('asset.ownerId', '=', anyUuid(options.userIds!)))
.$if(!!options.encodedVideoPath, (qb) => qb.where('asset.encodedVideoPath', '=', options.encodedVideoPath!))
.$if(!!options.originalPath, (qb) =>
qb.where(sql`f_unaccent(asset."originalPath")`, 'ilike', sql`'%' || f_unaccent(${options.originalPath}) || '%'`),
)
.$if(!!options.originalFileName, (qb) =>
qb.where(
sql`f_unaccent(asset."originalFileName")`,
'ilike',
sql`'%' || f_unaccent(${options.originalFileName}) || '%'`,
),
)
.$if(!!options.description, (qb) =>
qb
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.where(sql`f_unaccent(asset_exif.description)`, 'ilike', sql`'%' || f_unaccent(${options.description}) || '%'`),
)
.$if(!!options.ocr, (qb) =>
qb
.innerJoin('ocr_search', 'asset.id', 'ocr_search.assetId')
.where(() => sql`f_unaccent(ocr_search.text) %>> f_unaccent(${tokenizeForSearch(options.ocr!).join(' ')})`),
)
.$if(!!options.type, (qb) => qb.where('asset.type', '=', options.type!))
.$if(options.isFavorite !== undefined, (qb) => qb.where('asset.isFavorite', '=', options.isFavorite!))
.$if(options.isOffline !== undefined, (qb) => qb.where('asset.isOffline', '=', options.isOffline!))
.$if(options.isEncoded !== undefined, (qb) =>
qb.where('asset.encodedVideoPath', options.isEncoded ? 'is not' : 'is', null),
)
.$if(options.isMotion !== undefined, (qb) =>
qb.where('asset.livePhotoVideoId', options.isMotion ? 'is not' : 'is', null),
)
.$if(!!options.isNotInAlbum && (!options.albumIds || options.albumIds.length === 0), (qb) =>
qb.where((eb) => eb.not(eb.exists((eb) => eb.selectFrom('album_asset').whereRef('assetId', '=', 'asset.id')))),
)
.$if(!!options.withExif, withExifInner)
.$if(!!(options.withFaces || options.withPeople || options.personIds), (qb) => qb.select(withFacesAndPeople))
.$if(!options.withDeleted, (qb) => qb.where('asset.deletedAt', 'is', null));
}
export type ReindexVectorIndexOptions = { indexName: string; lists?: number };
type VectorIndexQueryOptions = { table: string; vectorExtension: VectorExtension } & ReindexVectorIndexOptions;
export function vectorIndexQuery({ vectorExtension, table, indexName, lists }: VectorIndexQueryOptions): string {
switch (vectorExtension) {
case DatabaseExtension.VectorChord: {
return `
CREATE INDEX IF NOT EXISTS ${indexName} ON ${table} USING vchordrq (embedding vector_cosine_ops) WITH (options = $$
residual_quantization = false
[build.internal]
lists = [${lists ?? 1}]
spherical_centroids = true
build_threads = 4
sampling_factor = 1024
$$)`;
}
case DatabaseExtension.Vectors: {
return `
CREATE INDEX IF NOT EXISTS ${indexName} ON ${table}
USING vectors (embedding vector_cos_ops) WITH (options = $$
optimizing.optimizing_threads = 4
[indexing.hnsw]
m = 16
ef_construction = 300
$$)`;
}
case DatabaseExtension.Vector: {
return `
CREATE INDEX IF NOT EXISTS ${indexName} ON ${table}
USING hnsw (embedding vector_cosine_ops)
WITH (ef_construction = 300, m = 16)`;
}
default: {
throw new Error(`Unsupported vector extension: '${vectorExtension}'`);
}
}
}
export const updateLockedColumns = <T extends Record<string, unknown> & { lockedProperties?: LockableProperty[] }>(
exif: T,
) => {
exif.lockedProperties = lockableProperties.filter((property) => property in exif);
return exif;
};