mirror of
https://github.com/immich-app/immich.git
synced 2025-12-22 17:24:56 +03:00
feat: sync partner stacks (#19635)
This commit is contained in:
@@ -356,6 +356,13 @@ export const columns = {
|
||||
'assets.duration',
|
||||
],
|
||||
syncAlbumUser: ['album_users.albumsId as albumId', 'album_users.usersId as userId', 'album_users.role'],
|
||||
syncStack: [
|
||||
'asset_stack.id',
|
||||
'asset_stack.createdAt',
|
||||
'asset_stack.updatedAt',
|
||||
'asset_stack.primaryAssetId',
|
||||
'asset_stack.ownerId',
|
||||
],
|
||||
stack: ['stack.id', 'stack.primaryAssetId', 'ownerId'],
|
||||
syncAssetExif: [
|
||||
'exif.assetId',
|
||||
|
||||
@@ -267,6 +267,9 @@ export type SyncItem = {
|
||||
[SyncEntityType.MemoryToAssetDeleteV1]: SyncMemoryAssetDeleteV1;
|
||||
[SyncEntityType.StackV1]: SyncStackV1;
|
||||
[SyncEntityType.StackDeleteV1]: SyncStackDeleteV1;
|
||||
[SyncEntityType.PartnerStackBackfillV1]: SyncStackV1;
|
||||
[SyncEntityType.PartnerStackDeleteV1]: SyncStackDeleteV1;
|
||||
[SyncEntityType.PartnerStackV1]: SyncStackV1;
|
||||
[SyncEntityType.SyncAckV1]: SyncAckV1;
|
||||
};
|
||||
|
||||
|
||||
@@ -573,20 +573,21 @@ export enum DatabaseLock {
|
||||
}
|
||||
|
||||
export enum SyncRequestType {
|
||||
UsersV1 = 'UsersV1',
|
||||
PartnersV1 = 'PartnersV1',
|
||||
AssetsV1 = 'AssetsV1',
|
||||
AssetExifsV1 = 'AssetExifsV1',
|
||||
PartnerAssetsV1 = 'PartnerAssetsV1',
|
||||
PartnerAssetExifsV1 = 'PartnerAssetExifsV1',
|
||||
AlbumsV1 = 'AlbumsV1',
|
||||
AlbumUsersV1 = 'AlbumUsersV1',
|
||||
AlbumToAssetsV1 = 'AlbumToAssetsV1',
|
||||
AlbumAssetsV1 = 'AlbumAssetsV1',
|
||||
AlbumAssetExifsV1 = 'AlbumAssetExifsV1',
|
||||
AssetsV1 = 'AssetsV1',
|
||||
AssetExifsV1 = 'AssetExifsV1',
|
||||
MemoriesV1 = 'MemoriesV1',
|
||||
MemoryToAssetsV1 = 'MemoryToAssetsV1',
|
||||
PartnersV1 = 'PartnersV1',
|
||||
PartnerAssetsV1 = 'PartnerAssetsV1',
|
||||
PartnerAssetExifsV1 = 'PartnerAssetExifsV1',
|
||||
PartnerStacksV1 = 'PartnerStacksV1',
|
||||
StacksV1 = 'StacksV1',
|
||||
UsersV1 = 'UsersV1',
|
||||
}
|
||||
|
||||
export enum SyncEntityType {
|
||||
@@ -605,6 +606,9 @@ export enum SyncEntityType {
|
||||
PartnerAssetDeleteV1 = 'PartnerAssetDeleteV1',
|
||||
PartnerAssetExifV1 = 'PartnerAssetExifV1',
|
||||
PartnerAssetExifBackfillV1 = 'PartnerAssetExifBackfillV1',
|
||||
PartnerStackBackfillV1 = 'PartnerStackBackfillV1',
|
||||
PartnerStackDeleteV1 = 'PartnerStackDeleteV1',
|
||||
PartnerStackV1 = 'PartnerStackV1',
|
||||
|
||||
AlbumV1 = 'AlbumV1',
|
||||
AlbumDeleteV1 = 'AlbumDeleteV1',
|
||||
|
||||
@@ -689,6 +689,66 @@ where
|
||||
order by
|
||||
"updateId" asc
|
||||
|
||||
-- SyncRepository.partnerStack.getDeletes
|
||||
select
|
||||
"id",
|
||||
"stackId"
|
||||
from
|
||||
"stacks_audit"
|
||||
where
|
||||
"userId" in (
|
||||
select
|
||||
"sharedById"
|
||||
from
|
||||
"partners"
|
||||
where
|
||||
"sharedWithId" = $1
|
||||
)
|
||||
and "deletedAt" < now() - interval '1 millisecond'
|
||||
order by
|
||||
"id" asc
|
||||
|
||||
-- SyncRepository.partnerStack.getBackfill
|
||||
select
|
||||
"asset_stack"."id",
|
||||
"asset_stack"."createdAt",
|
||||
"asset_stack"."updatedAt",
|
||||
"asset_stack"."primaryAssetId",
|
||||
"asset_stack"."ownerId",
|
||||
"updateId"
|
||||
from
|
||||
"asset_stack"
|
||||
where
|
||||
"ownerId" = $1
|
||||
and "updatedAt" < now() - interval '1 millisecond'
|
||||
and "updateId" <= $2
|
||||
and "updateId" >= $3
|
||||
order by
|
||||
"updateId" asc
|
||||
|
||||
-- SyncRepository.partnerStack.getUpserts
|
||||
select
|
||||
"asset_stack"."id",
|
||||
"asset_stack"."createdAt",
|
||||
"asset_stack"."updatedAt",
|
||||
"asset_stack"."primaryAssetId",
|
||||
"asset_stack"."ownerId",
|
||||
"updateId"
|
||||
from
|
||||
"asset_stack"
|
||||
where
|
||||
"ownerId" in (
|
||||
select
|
||||
"sharedById"
|
||||
from
|
||||
"partners"
|
||||
where
|
||||
"sharedWithId" = $1
|
||||
)
|
||||
and "updatedAt" < now() - interval '1 millisecond'
|
||||
order by
|
||||
"updateId" asc
|
||||
|
||||
-- SyncRepository.stack.getDeletes
|
||||
select
|
||||
"id",
|
||||
@@ -703,11 +763,11 @@ order by
|
||||
|
||||
-- SyncRepository.stack.getUpserts
|
||||
select
|
||||
"id",
|
||||
"createdAt",
|
||||
"updatedAt",
|
||||
"primaryAssetId",
|
||||
"ownerId",
|
||||
"asset_stack"."id",
|
||||
"asset_stack"."createdAt",
|
||||
"asset_stack"."updatedAt",
|
||||
"asset_stack"."primaryAssetId",
|
||||
"asset_stack"."ownerId",
|
||||
"updateId"
|
||||
from
|
||||
"asset_stack"
|
||||
|
||||
@@ -41,6 +41,7 @@ export class SyncRepository {
|
||||
partner: PartnerSync;
|
||||
partnerAsset: PartnerAssetsSync;
|
||||
partnerAssetExif: PartnerAssetExifsSync;
|
||||
partnerStack: PartnerStackSync;
|
||||
stack: StackSync;
|
||||
user: UserSync;
|
||||
|
||||
@@ -57,6 +58,7 @@ export class SyncRepository {
|
||||
this.partner = new PartnerSync(this.db);
|
||||
this.partnerAsset = new PartnerAssetsSync(this.db);
|
||||
this.partnerAssetExif = new PartnerAssetExifsSync(this.db);
|
||||
this.partnerStack = new PartnerStackSync(this.db);
|
||||
this.stack = new StackSync(this.db);
|
||||
this.user = new UserSync(this.db);
|
||||
}
|
||||
@@ -552,13 +554,54 @@ class StackSync extends BaseSync {
|
||||
getUpserts(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('asset_stack')
|
||||
.select(['id', 'createdAt', 'updatedAt', 'primaryAssetId', 'ownerId', 'updateId'])
|
||||
.select(columns.syncStack)
|
||||
.select('updateId')
|
||||
.where('ownerId', '=', userId)
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
}
|
||||
|
||||
class PartnerStackSync extends BaseSync {
|
||||
@GenerateSql({ params: [DummyValue.UUID], stream: true })
|
||||
getDeletes(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('stacks_audit')
|
||||
.select(['id', 'stackId'])
|
||||
.where('userId', 'in', (eb) =>
|
||||
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
|
||||
)
|
||||
.$call((qb) => this.auditTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID, DummyValue.UUID], stream: true })
|
||||
getBackfill(partnerId: string, afterUpdateId: string | undefined, beforeUpdateId: string) {
|
||||
return this.db
|
||||
.selectFrom('asset_stack')
|
||||
.select(columns.syncStack)
|
||||
.select('updateId')
|
||||
.where('ownerId', '=', partnerId)
|
||||
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
|
||||
.where('updateId', '<=', beforeUpdateId)
|
||||
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
|
||||
.orderBy('updateId', 'asc')
|
||||
.stream();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID], stream: true })
|
||||
getUpserts(userId: string, ack?: SyncAck) {
|
||||
return this.db
|
||||
.selectFrom('asset_stack')
|
||||
.select(columns.syncStack)
|
||||
.select('updateId')
|
||||
.where('ownerId', 'in', (eb) =>
|
||||
eb.selectFrom('partners').select(['sharedById']).where('sharedWithId', '=', userId),
|
||||
)
|
||||
.$call((qb) => this.upsertTableFilters(qb, ack))
|
||||
.stream();
|
||||
}
|
||||
}
|
||||
class UserSync extends BaseSync {
|
||||
@GenerateSql({ params: [], stream: true })
|
||||
getDeletes(ack?: SyncAck) {
|
||||
|
||||
@@ -59,6 +59,7 @@ export const SYNC_TYPES_ORDER = [
|
||||
SyncRequestType.AssetsV1,
|
||||
SyncRequestType.StacksV1,
|
||||
SyncRequestType.PartnerAssetsV1,
|
||||
SyncRequestType.PartnerStacksV1,
|
||||
SyncRequestType.AlbumAssetsV1,
|
||||
SyncRequestType.AlbumsV1,
|
||||
SyncRequestType.AlbumUsersV1,
|
||||
@@ -139,6 +140,7 @@ export class SyncService extends BaseService {
|
||||
[SyncRequestType.MemoriesV1]: () => this.syncMemoriesV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.MemoryToAssetsV1]: () => this.syncMemoryAssetsV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, sessionId),
|
||||
};
|
||||
|
||||
for (const type of SYNC_TYPES_ORDER.filter((type) => dto.types.includes(type))) {
|
||||
@@ -526,6 +528,54 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
}
|
||||
|
||||
private async syncPartnerStackV1(response: Writable, checkpointMap: CheckpointMap, auth: AuthDto, sessionId: string) {
|
||||
const deleteType = SyncEntityType.PartnerStackDeleteV1;
|
||||
const deletes = this.syncRepository.partnerStack.getDeletes(auth.user.id, checkpointMap[deleteType]);
|
||||
for await (const { id, ...data } of deletes) {
|
||||
send(response, { type: deleteType, ids: [id], data });
|
||||
}
|
||||
|
||||
const backfillType = SyncEntityType.PartnerStackBackfillV1;
|
||||
const backfillCheckpoint = checkpointMap[backfillType];
|
||||
const partners = await this.syncRepository.partner.getCreatedAfter(auth.user.id, backfillCheckpoint?.updateId);
|
||||
const upsertType = SyncEntityType.PartnerStackV1;
|
||||
const upsertCheckpoint = checkpointMap[upsertType];
|
||||
if (upsertCheckpoint) {
|
||||
const endId = upsertCheckpoint.updateId;
|
||||
|
||||
for (const partner of partners) {
|
||||
const createId = partner.createId;
|
||||
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
const startId = getStartId(createId, backfillCheckpoint);
|
||||
const backfill = this.syncRepository.partnerStack.getBackfill(partner.sharedById, startId, endId);
|
||||
|
||||
for await (const { updateId, ...data } of backfill) {
|
||||
send(response, {
|
||||
type: backfillType,
|
||||
ids: [createId, updateId],
|
||||
data,
|
||||
});
|
||||
}
|
||||
|
||||
sendEntityBackfillCompleteAck(response, backfillType, createId);
|
||||
}
|
||||
} else if (partners.length > 0) {
|
||||
await this.upsertBackfillCheckpoint({
|
||||
type: backfillType,
|
||||
sessionId,
|
||||
createId: partners.at(-1)!.createId,
|
||||
});
|
||||
}
|
||||
|
||||
const upserts = this.syncRepository.partnerStack.getUpserts(auth.user.id, checkpointMap[upsertType]);
|
||||
for await (const { updateId, ...data } of upserts) {
|
||||
send(response, { type: upsertType, ids: [updateId], data });
|
||||
}
|
||||
}
|
||||
|
||||
private async upsertBackfillCheckpoint(item: { type: SyncEntityType; sessionId: string; createId: string }) {
|
||||
const { type, sessionId, createId } = item;
|
||||
await this.syncCheckpointRepository.upsertAll([
|
||||
|
||||
Reference in New Issue
Block a user