feat: backfill album users (#19234)

This commit is contained in:
Jason Rasmussen
2025-06-18 10:48:11 -04:00
committed by GitHub
parent 65e8d75e82
commit e0144b4ece
14 changed files with 232 additions and 24 deletions

View File

@@ -355,6 +355,12 @@ export const columns = {
'updateId',
'duration',
],
syncAlbumUser: [
'albums_shared_users_users.albumsId as albumId',
'albums_shared_users_users.usersId as userId',
'albums_shared_users_users.role',
'albums_shared_users_users.updateId',
],
stack: ['stack.id', 'stack.primaryAssetId', 'ownerId'],
syncAssetExif: [
'exif.assetId',

4
server/src/db.d.ts vendored
View File

@@ -98,8 +98,10 @@ export interface AlbumsSharedUsersUsers {
albumsId: string;
role: Generated<AlbumUserRole>;
usersId: string;
updatedAt: Generated<Timestamp>;
createId: Generated<string>;
createdAt: Generated<Timestamp>;
updateId: Generated<string>;
updatedAt: Generated<Timestamp>;
}
export interface ApiKeys {

View File

@@ -161,6 +161,7 @@ export type SyncItem = {
[SyncEntityType.AlbumV1]: SyncAlbumV1;
[SyncEntityType.AlbumDeleteV1]: SyncAlbumDeleteV1;
[SyncEntityType.AlbumUserV1]: SyncAlbumUserV1;
[SyncEntityType.AlbumUserBackfillV1]: SyncAlbumUserV1;
[SyncEntityType.AlbumUserDeleteV1]: SyncAlbumUserDeleteV1;
[SyncEntityType.SyncAckV1]: object;
};

View File

@@ -603,6 +603,7 @@ export enum SyncEntityType {
AlbumV1 = 'AlbumV1',
AlbumDeleteV1 = 'AlbumDeleteV1',
AlbumUserV1 = 'AlbumUserV1',
AlbumUserBackfillV1 = 'AlbumUserBackfillV1',
AlbumUserDeleteV1 = 'AlbumUserDeleteV1',
SyncAckV1 = 'SyncAckV1',

View File

@@ -394,6 +394,35 @@ where
order by
"id" asc
-- SyncRepository.getAlbumBackfill
select
"albumsId" as "id",
"createId"
from
"albums_shared_users_users"
where
"usersId" = $1
and "createId" >= $2
and "createdAt" < now() - interval '1 millisecond'
order by
"createId" asc
-- SyncRepository.getAlbumUsersBackfill
select
"albums_shared_users_users"."albumsId" as "albumId",
"albums_shared_users_users"."usersId" as "userId",
"albums_shared_users_users"."role",
"albums_shared_users_users"."updateId"
from
"albums_shared_users_users"
where
"albumsId" = $1
and "updatedAt" < now() - interval '1 millisecond'
and "updateId" < $2
and "updateId" >= $3
order by
"updateId" asc
-- SyncRepository.getAlbumUserUpserts
select
"albums_shared_users_users"."albumsId" as "albumId",

View File

@@ -254,16 +254,36 @@ export class SyncRepository {
.stream();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] })
getAlbumBackfill(userId: string, afterCreateId?: string) {
return this.db
.selectFrom('albums_shared_users_users')
.select(['albumsId as id', 'createId'])
.where('usersId', '=', userId)
.$if(!!afterCreateId, (qb) => qb.where('createId', '>=', afterCreateId!))
.where('createdAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.orderBy('createId', 'asc')
.execute();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
getAlbumUsersBackfill(albumId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
return this.db
.selectFrom('albums_shared_users_users')
.select(columns.syncAlbumUser)
.where('albumsId', '=', albumId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc')
.stream();
}
@GenerateSql({ params: [DummyValue.UUID], stream: true })
getAlbumUserUpserts(userId: string, ack?: SyncAck) {
return this.db
.selectFrom('albums_shared_users_users')
.select([
'albums_shared_users_users.albumsId as albumId',
'albums_shared_users_users.usersId as userId',
'albums_shared_users_users.role',
'albums_shared_users_users.updateId',
])
.select(columns.syncAlbumUser)
.where('albums_shared_users_users.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.$if(!!ack, (qb) => qb.where('albums_shared_users_users.updateId', '>', ack!.updateId))
.orderBy('albums_shared_users_users.updateId', 'asc')

View File

@@ -0,0 +1,15 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`ALTER TABLE "albums_shared_users_users" ADD "createId" uuid NOT NULL DEFAULT immich_uuid_v7();`.execute(db);
await sql`ALTER TABLE "albums_shared_users_users" ADD "createdAt" timestamp with time zone NOT NULL DEFAULT now();`.execute(db);
await sql`CREATE INDEX "IDX_album_users_create_id" ON "albums_shared_users_users" ("createId")`.execute(db);
await sql`CREATE INDEX "IDX_partners_create_id" ON "partners" ("createId")`.execute(db);
}
export async function down(db: Kysely<any>): Promise<void> {
await sql`DROP INDEX "IDX_partners_create_id";`.execute(db);
await sql`DROP INDEX "IDX_album_users_create_id";`.execute(db);
await sql`ALTER TABLE "albums_shared_users_users" DROP COLUMN "createId";`.execute(db);
await sql`ALTER TABLE "albums_shared_users_users" DROP COLUMN "createdAt";`.execute(db);
}

View File

@@ -1,4 +1,4 @@
import { UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators';
import { CreateIdColumn, UpdatedAtTrigger, UpdateIdColumn } from 'src/decorators';
import { AlbumUserRole } from 'src/enum';
import { album_user_after_insert, album_users_delete_audit } from 'src/schema/functions';
import { AlbumTable } from 'src/schema/tables/album.table';
@@ -7,6 +7,7 @@ import {
AfterDeleteTrigger,
AfterInsertTrigger,
Column,
CreateDateColumn,
ForeignKeyColumn,
Index,
Table,
@@ -51,6 +52,12 @@ export class AlbumUserTable {
@Column({ type: 'character varying', default: AlbumUserRole.EDITOR })
role!: AlbumUserRole;
@CreateIdColumn({ indexName: 'IDX_album_users_create_id' })
createId?: string;
@CreateDateColumn()
createdAt!: Date;
@UpdateIdColumn({ indexName: 'IDX_album_users_update_id' })
updateId?: string;

View File

@@ -27,7 +27,7 @@ export class PartnerTable {
@CreateDateColumn()
createdAt!: Date;
@CreateIdColumn()
@CreateIdColumn({ indexName: 'IDX_partners_create_id' })
createId!: string;
@UpdateDateColumn()

View File

@@ -138,14 +138,14 @@ export class SyncService extends BaseService {
break;
}
case SyncRequestType.PartnerAssetsV1: {
await this.syncPartnerAssetsV1(response, checkpointMap, auth, sessionId);
case SyncRequestType.AssetExifsV1: {
await this.syncAssetExifsV1(response, checkpointMap, auth);
break;
}
case SyncRequestType.AssetExifsV1: {
await this.syncAssetExifsV1(response, checkpointMap, auth);
case SyncRequestType.PartnerAssetsV1: {
await this.syncPartnerAssetsV1(response, checkpointMap, auth, sessionId);
break;
}
@@ -160,7 +160,7 @@ export class SyncService extends BaseService {
}
case SyncRequestType.AlbumUsersV1: {
await this.syncAlbumUsersV1(response, checkpointMap, auth);
await this.syncAlbumUsersV1(response, checkpointMap, auth, sessionId);
break;
}
@@ -330,18 +330,50 @@ export class SyncService extends BaseService {
}
}
private async syncAlbumUsersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto) {
const deletes = this.syncRepository.getAlbumUserDeletes(
auth.user.id,
checkpointMap[SyncEntityType.AlbumUserDeleteV1],
);
private async syncAlbumUsersV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) {
const backfillType = SyncEntityType.AlbumUserBackfillV1;
const upsertType = SyncEntityType.AlbumUserV1;
const deleteType = SyncEntityType.AlbumUserDeleteV1;
const backfillCheckpoint = checkpointMap[backfillType];
const upsertCheckpoint = checkpointMap[upsertType];
const deletes = this.syncRepository.getAlbumUserDeletes(auth.user.id, checkpointMap[deleteType]);
for await (const { id, ...data } of deletes) {
send(response, { type: SyncEntityType.AlbumUserDeleteV1, ids: [id], data });
send(response, { type: deleteType, ids: [id], data });
}
const upserts = this.syncRepository.getAlbumUserUpserts(auth.user.id, checkpointMap[SyncEntityType.AlbumUserV1]);
const albums = await this.syncRepository.getAlbumBackfill(auth.user.id, backfillCheckpoint?.updateId);
if (upsertCheckpoint) {
const endId = upsertCheckpoint.updateId;
for (const album of albums) {
if (isEntityBackfillComplete(album, backfillCheckpoint)) {
continue;
}
const startId = getStartId(album, backfillCheckpoint);
const backfill = this.syncRepository.getAlbumUsersBackfill(album.id, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [updateId], data });
}
sendEntityBackfillCompleteAck(response, backfillType, album.id);
}
} else if (albums.length > 0) {
await this.upsertBackfillCheckpoint({
type: backfillType,
sessionId,
createId: albums.at(-1)!.createId,
});
}
const upserts = this.syncRepository.getAlbumUserUpserts(auth.user.id, checkpointMap[upsertType]);
for await (const { updateId, ...data } of upserts) {
send(response, { type: SyncEntityType.AlbumUserV1, ids: [updateId], data });
send(response, { type: upsertType, ids: [updateId], data });
}
}