mirror of
https://github.com/immich-app/immich.git
synced 2025-12-21 09:15:44 +03:00
feat: pending sync reset flag (#19861)
This commit is contained in:
@@ -1,7 +1,7 @@
|
||||
import { Body, Controller, Delete, Get, HttpCode, HttpStatus, Param, Post } from '@nestjs/common';
|
||||
import { Body, Controller, Delete, Get, HttpCode, HttpStatus, Param, Post, Put } from '@nestjs/common';
|
||||
import { ApiTags } from '@nestjs/swagger';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import { SessionCreateDto, SessionCreateResponseDto, SessionResponseDto } from 'src/dtos/session.dto';
|
||||
import { SessionCreateDto, SessionCreateResponseDto, SessionResponseDto, SessionUpdateDto } from 'src/dtos/session.dto';
|
||||
import { Permission } from 'src/enum';
|
||||
import { Auth, Authenticated } from 'src/middleware/auth.guard';
|
||||
import { SessionService } from 'src/services/session.service';
|
||||
@@ -31,6 +31,16 @@ export class SessionController {
|
||||
return this.service.deleteAll(auth);
|
||||
}
|
||||
|
||||
@Put(':id')
|
||||
@Authenticated({ permission: Permission.SESSION_UPDATE })
|
||||
updateSession(
|
||||
@Auth() auth: AuthDto,
|
||||
@Param() { id }: UUIDParamDto,
|
||||
@Body() dto: SessionUpdateDto,
|
||||
): Promise<SessionResponseDto> {
|
||||
return this.service.update(auth, id, dto);
|
||||
}
|
||||
|
||||
@Delete(':id')
|
||||
@Authenticated({ permission: Permission.SESSION_DELETE })
|
||||
@HttpCode(HttpStatus.NO_CONTENT)
|
||||
|
||||
@@ -201,6 +201,7 @@ export type Album = Selectable<AlbumTable> & {
|
||||
|
||||
export type AuthSession = {
|
||||
id: string;
|
||||
isPendingSyncReset: boolean;
|
||||
hasElevatedPermission: boolean;
|
||||
};
|
||||
|
||||
@@ -238,6 +239,7 @@ export type Session = {
|
||||
deviceOS: string;
|
||||
deviceType: string;
|
||||
pinExpiresAt: Date | null;
|
||||
isPendingSyncReset: boolean;
|
||||
};
|
||||
|
||||
export type Exif = Omit<Selectable<ExifTable>, 'updatedAt' | 'updateId'>;
|
||||
@@ -311,7 +313,7 @@ export const columns = {
|
||||
'users.quotaSizeInBytes',
|
||||
],
|
||||
authApiKey: ['api_keys.id', 'api_keys.permissions'],
|
||||
authSession: ['sessions.id', 'sessions.updatedAt', 'sessions.pinExpiresAt'],
|
||||
authSession: ['sessions.id', 'sessions.isPendingSyncReset', 'sessions.updatedAt', 'sessions.pinExpiresAt'],
|
||||
authSharedLink: [
|
||||
'shared_links.id',
|
||||
'shared_links.userId',
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import { IsInt, IsPositive, IsString } from 'class-validator';
|
||||
import { Session } from 'src/database';
|
||||
import { Optional } from 'src/validation';
|
||||
import { Optional, ValidateBoolean } from 'src/validation';
|
||||
|
||||
export class SessionCreateDto {
|
||||
/**
|
||||
@@ -20,6 +20,11 @@ export class SessionCreateDto {
|
||||
deviceOS?: string;
|
||||
}
|
||||
|
||||
export class SessionUpdateDto {
|
||||
@ValidateBoolean({ optional: true })
|
||||
isPendingSyncReset?: boolean;
|
||||
}
|
||||
|
||||
export class SessionResponseDto {
|
||||
id!: string;
|
||||
createdAt!: string;
|
||||
@@ -28,6 +33,7 @@ export class SessionResponseDto {
|
||||
current!: boolean;
|
||||
deviceType!: string;
|
||||
deviceOS!: string;
|
||||
isPendingSyncReset!: boolean;
|
||||
}
|
||||
|
||||
export class SessionCreateResponseDto extends SessionResponseDto {
|
||||
@@ -42,4 +48,5 @@ export const mapSession = (entity: Session, currentId?: string): SessionResponse
|
||||
current: currentId === entity.id,
|
||||
deviceOS: entity.deviceOS,
|
||||
deviceType: entity.deviceType,
|
||||
isPendingSyncReset: entity.isPendingSyncReset,
|
||||
});
|
||||
|
||||
@@ -11,7 +11,7 @@ import {
|
||||
SyncEntityType,
|
||||
SyncRequestType,
|
||||
} from 'src/enum';
|
||||
import { Optional, ValidateDate, ValidateUUID } from 'src/validation';
|
||||
import { Optional, ValidateBoolean, ValidateDate, ValidateUUID } from 'src/validation';
|
||||
|
||||
export class AssetFullSyncDto {
|
||||
@ValidateUUID({ optional: true })
|
||||
@@ -256,6 +256,9 @@ export class SyncPersonDeleteV1 {
|
||||
@ExtraModel()
|
||||
export class SyncAckV1 {}
|
||||
|
||||
@ExtraModel()
|
||||
export class SyncResetV1 {}
|
||||
|
||||
export type SyncItem = {
|
||||
[SyncEntityType.UserV1]: SyncUserV1;
|
||||
[SyncEntityType.UserDeleteV1]: SyncUserDeleteV1;
|
||||
@@ -293,12 +296,16 @@ export type SyncItem = {
|
||||
[SyncEntityType.PersonV1]: SyncPersonV1;
|
||||
[SyncEntityType.PersonDeleteV1]: SyncPersonDeleteV1;
|
||||
[SyncEntityType.SyncAckV1]: SyncAckV1;
|
||||
[SyncEntityType.SyncResetV1]: SyncResetV1;
|
||||
};
|
||||
|
||||
export class SyncStreamDto {
|
||||
@IsEnum(SyncRequestType, { each: true })
|
||||
@ApiProperty({ enumName: 'SyncRequestType', enum: SyncRequestType, isArray: true })
|
||||
types!: SyncRequestType[];
|
||||
|
||||
@ValidateBoolean({ optional: true })
|
||||
reset?: boolean;
|
||||
}
|
||||
|
||||
export class SyncAckDto {
|
||||
|
||||
@@ -640,6 +640,7 @@ export enum SyncEntityType {
|
||||
PersonDeleteV1 = 'PersonDeleteV1',
|
||||
|
||||
SyncAckV1 = 'SyncAckV1',
|
||||
SyncResetV1 = 'SyncResetV1',
|
||||
}
|
||||
|
||||
export enum NotificationLevel {
|
||||
|
||||
@@ -13,6 +13,7 @@ where
|
||||
-- SessionRepository.getByToken
|
||||
select
|
||||
"sessions"."id",
|
||||
"sessions"."isPendingSyncReset",
|
||||
"sessions"."updatedAt",
|
||||
"sessions"."pinExpiresAt",
|
||||
(
|
||||
@@ -71,3 +72,15 @@ set
|
||||
"pinExpiresAt" = $1
|
||||
where
|
||||
"userId" = $2
|
||||
|
||||
-- SessionRepository.resetSyncProgress
|
||||
begin
|
||||
update "sessions"
|
||||
set
|
||||
"isPendingSyncReset" = $1
|
||||
where
|
||||
"id" = $2
|
||||
delete from "session_sync_checkpoints"
|
||||
where
|
||||
"sessionId" = $1
|
||||
commit
|
||||
|
||||
@@ -95,4 +95,14 @@ export class SessionRepository {
|
||||
async lockAll(userId: string) {
|
||||
await this.db.updateTable('sessions').set({ pinExpiresAt: null }).where('userId', '=', userId).execute();
|
||||
}
|
||||
|
||||
@GenerateSql({ params: [DummyValue.UUID] })
|
||||
async resetSyncProgress(sessionId: string) {
|
||||
await this.db.transaction().execute((tx) => {
|
||||
return Promise.all([
|
||||
tx.updateTable('sessions').set({ isPendingSyncReset: false }).where('id', '=', sessionId).execute(),
|
||||
tx.deleteFrom('session_sync_checkpoints').where('sessionId', '=', sessionId).execute(),
|
||||
]);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,9 @@
|
||||
import { Kysely, sql } from 'kysely';
|
||||
|
||||
export async function up(db: Kysely<any>): Promise<void> {
|
||||
await sql`ALTER TABLE "sessions" ADD "isPendingSyncReset" boolean NOT NULL DEFAULT false;`.execute(db);
|
||||
}
|
||||
|
||||
export async function down(db: Kysely<any>): Promise<void> {
|
||||
await sql`ALTER TABLE "sessions" DROP COLUMN "isPendingSyncReset";`.execute(db);
|
||||
}
|
||||
@@ -45,6 +45,9 @@ export class SessionTable {
|
||||
@UpdateIdColumn({ indexName: 'IDX_sessions_update_id' })
|
||||
updateId!: Generated<string>;
|
||||
|
||||
@Column({ type: 'boolean', default: false })
|
||||
isPendingSyncReset!: Generated<boolean>;
|
||||
|
||||
@Column({ type: 'timestamp with time zone', nullable: true })
|
||||
pinExpiresAt!: Timestamp | null;
|
||||
}
|
||||
|
||||
@@ -241,6 +241,7 @@ describe(AuthService.name, () => {
|
||||
const sessionWithToken = {
|
||||
id: session.id,
|
||||
updatedAt: session.updatedAt,
|
||||
isPendingSyncReset: false,
|
||||
user: factory.authUser(),
|
||||
pinExpiresAt: null,
|
||||
};
|
||||
@@ -255,7 +256,11 @@ describe(AuthService.name, () => {
|
||||
}),
|
||||
).resolves.toEqual({
|
||||
user: sessionWithToken.user,
|
||||
session: { id: session.id, hasElevatedPermission: false },
|
||||
session: {
|
||||
id: session.id,
|
||||
hasElevatedPermission: false,
|
||||
isPendingSyncReset: session.isPendingSyncReset,
|
||||
},
|
||||
});
|
||||
});
|
||||
});
|
||||
@@ -366,6 +371,7 @@ describe(AuthService.name, () => {
|
||||
id: session.id,
|
||||
updatedAt: session.updatedAt,
|
||||
user: factory.authUser(),
|
||||
isPendingSyncReset: false,
|
||||
pinExpiresAt: null,
|
||||
};
|
||||
|
||||
@@ -379,7 +385,11 @@ describe(AuthService.name, () => {
|
||||
}),
|
||||
).resolves.toEqual({
|
||||
user: sessionWithToken.user,
|
||||
session: { id: session.id, hasElevatedPermission: false },
|
||||
session: {
|
||||
id: session.id,
|
||||
hasElevatedPermission: false,
|
||||
isPendingSyncReset: session.isPendingSyncReset,
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
@@ -389,6 +399,7 @@ describe(AuthService.name, () => {
|
||||
id: session.id,
|
||||
updatedAt: session.updatedAt,
|
||||
user: factory.authUser(),
|
||||
isPendingSyncReset: false,
|
||||
pinExpiresAt: null,
|
||||
};
|
||||
|
||||
@@ -409,6 +420,7 @@ describe(AuthService.name, () => {
|
||||
id: session.id,
|
||||
updatedAt: session.updatedAt,
|
||||
user: factory.authUser(),
|
||||
isPendingSyncReset: false,
|
||||
pinExpiresAt: null,
|
||||
};
|
||||
|
||||
|
||||
@@ -466,6 +466,7 @@ export class AuthService extends BaseService {
|
||||
user: session.user,
|
||||
session: {
|
||||
id: session.id,
|
||||
isPendingSyncReset: session.isPendingSyncReset,
|
||||
hasElevatedPermission,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -2,7 +2,13 @@ import { BadRequestException, Injectable } from '@nestjs/common';
|
||||
import { DateTime } from 'luxon';
|
||||
import { OnJob } from 'src/decorators';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import { SessionCreateDto, SessionCreateResponseDto, SessionResponseDto, mapSession } from 'src/dtos/session.dto';
|
||||
import {
|
||||
SessionCreateDto,
|
||||
SessionCreateResponseDto,
|
||||
SessionResponseDto,
|
||||
SessionUpdateDto,
|
||||
mapSession,
|
||||
} from 'src/dtos/session.dto';
|
||||
import { JobName, JobStatus, Permission, QueueName } from 'src/enum';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
|
||||
@@ -44,6 +50,20 @@ export class SessionService extends BaseService {
|
||||
return sessions.map((session) => mapSession(session, auth.session?.id));
|
||||
}
|
||||
|
||||
async update(auth: AuthDto, id: string, dto: SessionUpdateDto): Promise<SessionResponseDto> {
|
||||
await this.requireAccess({ auth, permission: Permission.SESSION_UPDATE, ids: [id] });
|
||||
|
||||
if (Object.values(dto).filter((prop) => prop !== undefined).length === 0) {
|
||||
throw new BadRequestException('No fields to update');
|
||||
}
|
||||
|
||||
const session = await this.sessionRepository.update(id, {
|
||||
isPendingSyncReset: dto.isPendingSyncReset,
|
||||
});
|
||||
|
||||
return mapSession(session);
|
||||
}
|
||||
|
||||
async delete(auth: AuthDto, id: string): Promise<void> {
|
||||
await this.requireAccess({ auth, permission: Permission.AUTH_DEVICE_DELETE, ids: [id] });
|
||||
await this.sessionRepository.delete(id);
|
||||
|
||||
@@ -22,7 +22,7 @@ import { SyncAck } from 'src/types';
|
||||
import { getMyPartnerIds } from 'src/utils/asset.util';
|
||||
import { hexOrBufferToBase64 } from 'src/utils/bytes';
|
||||
import { setIsEqual } from 'src/utils/set';
|
||||
import { fromAck, serialize, SerializeOptions, toAck } from 'src/utils/sync';
|
||||
import { fromAck, mapJsonLine, serialize, SerializeOptions, toAck } from 'src/utils/sync';
|
||||
|
||||
type CheckpointMap = Partial<Record<SyncEntityType, SyncAck>>;
|
||||
type AssetLike = Omit<SyncAssetV1, 'checksum' | 'thumbhash'> & {
|
||||
@@ -118,30 +118,42 @@ export class SyncService extends BaseService {
|
||||
}
|
||||
|
||||
async stream(auth: AuthDto, response: Writable, dto: SyncStreamDto) {
|
||||
const sessionId = auth.session?.id;
|
||||
if (!sessionId) {
|
||||
const session = auth.session;
|
||||
if (!session) {
|
||||
return throwSessionRequired();
|
||||
}
|
||||
|
||||
const checkpoints = await this.syncCheckpointRepository.getAll(sessionId);
|
||||
if (dto.reset) {
|
||||
await this.sessionRepository.resetSyncProgress(session.id);
|
||||
session.isPendingSyncReset = false;
|
||||
}
|
||||
|
||||
if (session.isPendingSyncReset) {
|
||||
response.write(mapJsonLine({ type: SyncEntityType.SyncResetV1, data: {} }));
|
||||
response.end();
|
||||
return;
|
||||
}
|
||||
|
||||
const checkpoints = await this.syncCheckpointRepository.getAll(session.id);
|
||||
const checkpointMap: CheckpointMap = Object.fromEntries(checkpoints.map(({ type, ack }) => [type, fromAck(ack)]));
|
||||
|
||||
const handlers: Record<SyncRequestType, () => Promise<void>> = {
|
||||
[SyncRequestType.UsersV1]: () => this.syncUsersV1(response, checkpointMap),
|
||||
[SyncRequestType.PartnersV1]: () => this.syncPartnersV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.AssetsV1]: () => this.syncAssetsV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.AssetExifsV1]: () => this.syncAssetExifsV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.PartnerAssetsV1]: () => this.syncPartnerAssetsV1(response, checkpointMap, auth, sessionId),
|
||||
[SyncRequestType.PartnerAssetsV1]: () => this.syncPartnerAssetsV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.PartnerAssetExifsV1]: () =>
|
||||
this.syncPartnerAssetExifsV1(response, checkpointMap, auth, sessionId),
|
||||
this.syncPartnerAssetExifsV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.AlbumsV1]: () => this.syncAlbumsV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.AlbumUsersV1]: () => this.syncAlbumUsersV1(response, checkpointMap, auth, sessionId),
|
||||
[SyncRequestType.AlbumAssetsV1]: () => this.syncAlbumAssetsV1(response, checkpointMap, auth, sessionId),
|
||||
[SyncRequestType.AlbumToAssetsV1]: () => this.syncAlbumToAssetsV1(response, checkpointMap, auth, sessionId),
|
||||
[SyncRequestType.AlbumAssetExifsV1]: () => this.syncAlbumAssetExifsV1(response, checkpointMap, auth, sessionId),
|
||||
[SyncRequestType.AlbumUsersV1]: () => this.syncAlbumUsersV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.AlbumAssetsV1]: () => this.syncAlbumAssetsV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.AlbumToAssetsV1]: () => this.syncAlbumToAssetsV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.AlbumAssetExifsV1]: () => this.syncAlbumAssetExifsV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.MemoriesV1]: () => this.syncMemoriesV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.MemoryToAssetsV1]: () => this.syncMemoryAssetsV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.StacksV1]: () => this.syncStackV1(response, checkpointMap, auth),
|
||||
[SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, sessionId),
|
||||
[SyncRequestType.PartnerStacksV1]: () => this.syncPartnerStackV1(response, checkpointMap, auth, session.id),
|
||||
[SyncRequestType.PeopleV1]: () => this.syncPeopleV1(response, checkpointMap, auth),
|
||||
};
|
||||
|
||||
|
||||
@@ -234,11 +234,11 @@ export class SyncTestContext extends MediumTestContext<SyncService> {
|
||||
});
|
||||
}
|
||||
|
||||
async syncStream(auth: AuthDto, types: SyncRequestType[]) {
|
||||
async syncStream(auth: AuthDto, types: SyncRequestType[], reset?: boolean) {
|
||||
const stream = mediumFactory.syncStream();
|
||||
// Wait for 2ms to ensure all updates are available and account for setTimeout inaccuracy
|
||||
await wait(2);
|
||||
await this.sut.stream(auth, stream, { types });
|
||||
await this.sut.stream(auth, stream, { types, reset });
|
||||
|
||||
return stream.getResponse();
|
||||
}
|
||||
@@ -481,6 +481,7 @@ const sessionInsert = ({
|
||||
const defaults: Insertable<SessionTable> = {
|
||||
id,
|
||||
userId,
|
||||
isPendingSyncReset: false,
|
||||
token: sha256(id),
|
||||
};
|
||||
|
||||
|
||||
63
server/test/medium/specs/sync/sync-reset.spec.ts
Normal file
63
server/test/medium/specs/sync/sync-reset.spec.ts
Normal file
@@ -0,0 +1,63 @@
|
||||
import { Kysely } from 'kysely';
|
||||
import { SyncEntityType, SyncRequestType } from 'src/enum';
|
||||
import { DB } from 'src/schema';
|
||||
import { SyncTestContext } from 'test/medium.factory';
|
||||
import { getKyselyDB } from 'test/utils';
|
||||
|
||||
let defaultDatabase: Kysely<DB>;
|
||||
|
||||
const setup = async (db?: Kysely<DB>) => {
|
||||
const ctx = new SyncTestContext(db || defaultDatabase);
|
||||
const { auth, user, session } = await ctx.newSyncAuthUser();
|
||||
return { auth, user, session, ctx };
|
||||
};
|
||||
|
||||
beforeAll(async () => {
|
||||
defaultDatabase = await getKyselyDB();
|
||||
});
|
||||
|
||||
describe(SyncEntityType.SyncResetV1, () => {
|
||||
it('should work', async () => {
|
||||
const { auth, ctx } = await setup();
|
||||
|
||||
const response = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]);
|
||||
expect(response).toEqual([]);
|
||||
});
|
||||
|
||||
it('should detect a pending sync reset', async () => {
|
||||
const { auth, ctx } = await setup();
|
||||
|
||||
auth.session!.isPendingSyncReset = true;
|
||||
|
||||
const response = await ctx.syncStream(auth, [SyncRequestType.AssetsV1]);
|
||||
expect(response).toEqual([{ type: SyncEntityType.SyncResetV1, data: {} }]);
|
||||
});
|
||||
|
||||
it('should not send other dtos when a reset is pending', async () => {
|
||||
const { auth, user, ctx } = await setup();
|
||||
|
||||
await ctx.newAsset({ ownerId: user.id });
|
||||
|
||||
await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toHaveLength(1);
|
||||
|
||||
auth.session!.isPendingSyncReset = true;
|
||||
|
||||
await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1])).resolves.toEqual([
|
||||
{ type: SyncEntityType.SyncResetV1, data: {} },
|
||||
]);
|
||||
});
|
||||
|
||||
it('should allow resetting a pending reset when requesting changes ', async () => {
|
||||
const { auth, user, ctx } = await setup();
|
||||
|
||||
await ctx.newAsset({ ownerId: user.id });
|
||||
|
||||
auth.session!.isPendingSyncReset = true;
|
||||
|
||||
await expect(ctx.syncStream(auth, [SyncRequestType.AssetsV1], true)).resolves.toEqual([
|
||||
expect.objectContaining({
|
||||
type: SyncEntityType.AssetV1,
|
||||
}),
|
||||
]);
|
||||
});
|
||||
});
|
||||
@@ -58,7 +58,11 @@ const authFactory = ({
|
||||
}
|
||||
|
||||
if (session) {
|
||||
auth.session = { id: session.id, hasElevatedPermission: false };
|
||||
auth.session = {
|
||||
id: session.id,
|
||||
isPendingSyncReset: false,
|
||||
hasElevatedPermission: false,
|
||||
};
|
||||
}
|
||||
|
||||
if (sharedLink) {
|
||||
@@ -131,6 +135,7 @@ const sessionFactory = (session: Partial<Session> = {}) => ({
|
||||
expiresAt: null,
|
||||
userId: newUuid(),
|
||||
pinExpiresAt: newDate(),
|
||||
isPendingSyncReset: false,
|
||||
...session,
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user