fix: partner and album backfill acks (#19371)

fix: partner sync being entirely broken
This commit is contained in:
Zack Pollard
2025-06-20 17:14:36 +01:00
committed by GitHub
parent a04c6ed80d
commit 0b44d4b6f2
4 changed files with 103 additions and 29 deletions

View File

@@ -130,7 +130,7 @@ from
where
"ownerId" = $1
and "updatedAt" < now() - interval '1 millisecond'
and "updateId" < $2
and "updateId" <= $2
and "updateId" >= $3
order by
"updateId" asc
@@ -274,7 +274,7 @@ from
where
"assets"."ownerId" = $1
and "exif"."updatedAt" < now() - interval '1 millisecond'
and "exif"."updateId" < $2
and "exif"."updateId" <= $2
and "exif"."updateId" >= $3
order by
"exif"."updateId" asc
@@ -418,7 +418,7 @@ from
where
"albumsId" = $1
and "updatedAt" < now() - interval '1 millisecond'
and "updateId" < $2
and "updateId" <= $2
and "updateId" >= $3
order by
"updateId" asc

View File

@@ -111,7 +111,7 @@ export class SyncRepository {
.select(columns.syncAsset)
.where('ownerId', '=', partnerId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', beforeUpdateId)
.where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc')
.stream();
@@ -169,7 +169,7 @@ export class SyncRepository {
.innerJoin('assets', 'assets.id', 'exif.assetId')
.where('assets.ownerId', '=', partnerId)
.where('exif.updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('exif.updateId', '<', beforeUpdateId)
.where('exif.updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('exif.updateId', '>=', afterUpdateId!))
.orderBy('exif.updateId', 'asc')
.stream();
@@ -273,7 +273,7 @@ export class SyncRepository {
.select(columns.syncAlbumUser)
.where('albumsId', '=', albumId)
.where('updatedAt', '<', sql.raw<Date>("now() - interval '1 millisecond'"))
.where('updateId', '<', beforeUpdateId)
.where('updateId', '<=', beforeUpdateId)
.$if(!!afterUpdateId, (eb) => eb.where('updateId', '>=', afterUpdateId!))
.orderBy('updateId', 'asc')
.stream();

View File

@@ -38,11 +38,11 @@ const mapSyncAssetV1 = ({ checksum, thumbhash, ...data }: AssetLike): SyncAssetV
thumbhash: thumbhash ? hexOrBufferToBase64(thumbhash) : null,
});
const isEntityBackfillComplete = (entity: { createId: string }, checkpoint: SyncAck | undefined): boolean =>
entity.createId === checkpoint?.updateId && checkpoint.extraId === COMPLETE_ID;
const isEntityBackfillComplete = (createId: string, checkpoint: SyncAck | undefined): boolean =>
createId === checkpoint?.updateId && checkpoint.extraId === COMPLETE_ID;
const getStartId = (entity: { createId: string }, checkpoint: SyncAck | undefined): string | undefined =>
checkpoint?.updateId === entity.createId ? checkpoint?.extraId : undefined;
const getStartId = (createId: string, checkpoint: SyncAck | undefined): string | undefined =>
createId === checkpoint?.updateId ? checkpoint?.extraId : undefined;
const send = <T extends keyof SyncItem, D extends SyncItem[T]>(response: Writable, item: SerializeOptions<T, D>) => {
response.write(serialize(item));
@@ -235,22 +235,23 @@ export class SyncService extends BaseService {
const endId = upsertCheckpoint.updateId;
for (const partner of partners) {
if (isEntityBackfillComplete(partner, backfillCheckpoint)) {
const createId = partner.createId;
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
continue;
}
const startId = getStartId(partner, backfillCheckpoint);
const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.getPartnerAssetsBackfill(partner.sharedById, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, {
type: backfillType,
ids: [updateId],
ids: [createId, updateId],
data: mapSyncAssetV1(data),
});
}
sendEntityBackfillCompleteAck(response, backfillType, partner.sharedById);
sendEntityBackfillCompleteAck(response, backfillType, createId);
}
} else if (partners.length > 0) {
await this.upsertBackfillCheckpoint({
@@ -291,18 +292,19 @@ export class SyncService extends BaseService {
const endId = upsertCheckpoint.updateId;
for (const partner of partners) {
if (isEntityBackfillComplete(partner, backfillCheckpoint)) {
const createId = partner.createId;
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
continue;
}
const startId = getStartId(partner, backfillCheckpoint);
const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.getPartnerAssetExifsBackfill(partner.sharedById, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [updateId], data });
send(response, { type: backfillType, ids: [partner.createId, updateId], data });
}
sendEntityBackfillCompleteAck(response, backfillType, partner.sharedById);
sendEntityBackfillCompleteAck(response, backfillType, partner.createId);
}
} else if (partners.length > 0) {
await this.upsertBackfillCheckpoint({
@@ -350,18 +352,19 @@ export class SyncService extends BaseService {
const endId = upsertCheckpoint.updateId;
for (const album of albums) {
if (isEntityBackfillComplete(album, backfillCheckpoint)) {
const createId = album.createId;
if (isEntityBackfillComplete(createId, backfillCheckpoint)) {
continue;
}
const startId = getStartId(album, backfillCheckpoint);
const startId = getStartId(createId, backfillCheckpoint);
const backfill = this.syncRepository.getAlbumUsersBackfill(album.id, startId, endId);
for await (const { updateId, ...data } of backfill) {
send(response, { type: backfillType, ids: [updateId], data });
send(response, { type: backfillType, ids: [createId, updateId], data });
}
sendEntityBackfillCompleteAck(response, backfillType, album.id);
sendEntityBackfillCompleteAck(response, backfillType, createId);
}
} else if (albums.length > 0) {
await this.upsertBackfillCheckpoint({