more e2e tests

consistent e2e sections

decrement quota on cancel
This commit is contained in:
mertalev
2025-10-01 23:57:02 -04:00
parent e193cb3a5b
commit 2d6a2dc77b
5 changed files with 783 additions and 359 deletions

File diff suppressed because it is too large Load Diff

View File

@@ -1,4 +1,18 @@
import { BadRequestException, Controller, Delete, Head, Options, Param, Patch, Post, Req, Res } from '@nestjs/common'; import {
BadRequestException,
Controller,
Delete,
Head,
Header,
HttpCode,
HttpStatus,
Options,
Param,
Patch,
Post,
Req,
Res,
} from '@nestjs/common';
import { ApiHeader, ApiTags } from '@nestjs/swagger'; import { ApiHeader, ApiTags } from '@nestjs/swagger';
import { plainToInstance } from 'class-transformer'; import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator'; import { validateSync } from 'class-validator';
@@ -91,16 +105,15 @@ export class AssetUploadController {
} }
@Options() @Options()
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload }) @HttpCode(HttpStatus.NO_CONTENT)
getUploadOptions(@Res() res: Response) { @Header('Upload-Limit', 'min-size=0')
return this.service.getUploadOptions(res); getUploadOptions() {}
}
private getDto<T extends object>(cls: new () => T, headers: IncomingHttpHeaders): T { private getDto<T extends object>(cls: new () => T, headers: IncomingHttpHeaders): T {
const dto = plainToInstance(cls, headers, { excludeExtraneousValues: true }); const dto = plainToInstance(cls, headers, { excludeExtraneousValues: true });
const errors = validateSync(dto); const errors = validateSync(dto);
if (errors.length > 0) { if (errors.length > 0) {
const constraints = errors.map((e) => (e.constraints ? Object.values(e.constraints).join(', ') : '')).join('; '); const constraints = errors.flatMap((e) => (e.constraints ? Object.values(e.constraints) : []));
console.warn('Upload DTO validation failed:', JSON.stringify(errors, null, 2)); console.warn('Upload DTO validation failed:', JSON.stringify(errors, null, 2));
throw new BadRequestException(constraints); throw new BadRequestException(constraints);
} }

View File

@@ -84,22 +84,15 @@ export class BaseUploadHeadersDto extends BaseRufhHeadersDto {
contentLength!: number; contentLength!: number;
@Expose({ name: UploadHeader.UploadComplete }) @Expose({ name: UploadHeader.UploadComplete })
@ValidateIf((o) => o.requestInterop !== null && o.requestInterop! <= 3) @ValidateIf((o) => o.version === null || o.version! > 3)
@IsEnum(StructuredBoolean) @IsEnum(StructuredBoolean)
uploadComplete!: StructuredBoolean; uploadComplete!: StructuredBoolean;
@Expose({ name: UploadHeader.UploadIncomplete }) @Expose({ name: UploadHeader.UploadIncomplete })
@ValidateIf((o) => o.requestInterop === null || o.requestInterop! > 3) @ValidateIf((o) => o.version !== null && o.version! <= 3)
@IsEnum(StructuredBoolean) @IsEnum(StructuredBoolean)
uploadIncomplete!: StructuredBoolean; uploadIncomplete!: StructuredBoolean;
@Expose({ name: UploadHeader.UploadLength })
@Min(0)
@IsInt()
@Type(() => Number)
@Optional()
uploadLength?: number;
get isComplete(): boolean { get isComplete(): boolean {
if (this.version <= 3) { if (this.version <= 3) {
return this.uploadIncomplete === StructuredBoolean.False; return this.uploadIncomplete === StructuredBoolean.False;
@@ -134,7 +127,7 @@ export class StartUploadDto extends BaseUploadHeadersDto {
} }
const checksum = parseDictionary(value).get('sha')?.[0]; const checksum = parseDictionary(value).get('sha')?.[0];
if (checksum instanceof ArrayBuffer) { if (checksum instanceof ArrayBuffer && checksum.byteLength === 20) {
return Buffer.from(checksum); return Buffer.from(checksum);
} }
throw new BadRequestException(`Invalid ${UploadHeader.ReprDigest} header`); throw new BadRequestException(`Invalid ${UploadHeader.ReprDigest} header`);
@@ -145,14 +138,21 @@ export class StartUploadDto extends BaseUploadHeadersDto {
@Min(0) @Min(0)
@IsInt() @IsInt()
@Type(() => Number) @Type(() => Number)
declare uploadLength: number; uploadLength!: number;
} }
export class ResumeUploadDto extends BaseUploadHeadersDto { export class ResumeUploadDto extends BaseUploadHeadersDto {
@Expose({ name: 'content-type' }) @Expose({ name: 'content-type' })
@ValidateIf((o) => o.requestInterop !== null && o.requestInterop >= 6) @ValidateIf((o) => o.version && o.version >= 6)
@Equals('application/partial-upload') @Equals('application/partial-upload')
contentType!: number | null; contentType!: string;
@Expose({ name: UploadHeader.UploadLength })
@Min(0)
@IsInt()
@Type(() => Number)
@Optional()
uploadLength?: number;
@Expose({ name: UploadHeader.UploadOffset }) @Expose({ name: UploadHeader.UploadOffset })
@Min(0) @Min(0)

View File

@@ -256,27 +256,26 @@ export class AssetRepository {
} }
createWithMetadata(asset: Insertable<AssetTable> & { id: string }, size: number, metadata?: AssetMetadataItem[]) { createWithMetadata(asset: Insertable<AssetTable> & { id: string }, size: number, metadata?: AssetMetadataItem[]) {
if (!metadata || metadata.length === 0) { let query = this.db
return this.db.insertInto('asset').values(asset).execute(); .with('asset', (qb) => qb.insertInto('asset').values(asset).returning(['id', 'ownerId']))
}
return this.db
.with('asset', (qb) => qb.insertInto('asset').values(asset).returning('id'))
.with('exif', (qb) => .with('exif', (qb) =>
qb qb
.insertInto('asset_exif') .insertInto('asset_exif')
.columns(['assetId', 'fileSizeInByte']) .columns(['assetId', 'fileSizeInByte'])
.expression((eb) => eb.selectFrom('asset').select(['asset.id', eb.val(size).as('fileSizeInByte')])), .expression((eb) => eb.selectFrom('asset').select(['asset.id', eb.val(size).as('fileSizeInByte')])),
) );
.with('user', (qb) =>
qb if (metadata && metadata.length > 0) {
.updateTable('user') (query as any) = query.with('metadata', (qb) =>
.from('asset') qb.insertInto('asset_metadata').values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value }))),
.set({ quotaUsageInBytes: sql`"quotaUsageInBytes" + ${size}` }) );
.whereRef('user.id', '=', 'asset.ownerId'), }
)
.insertInto('asset_metadata') return query
.values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value }))) .updateTable('user')
.from('asset')
.set({ quotaUsageInBytes: sql`"quotaUsageInBytes" + ${size}` })
.whereRef('user.id', '=', 'asset.ownerId')
.execute(); .execute();
} }
@@ -290,12 +289,23 @@ export class AssetRepository {
.executeTakeFirst(); .executeTakeFirst();
} }
setCompleteWithSize(assetId: string) { setComplete(assetId: string) {
return this.db return this.db
.updateTable('asset') .updateTable('asset')
.set({ status: AssetStatus.Active }) .set({ status: AssetStatus.Active })
.where('asset.id', '=', assetId) .where('id', '=', assetId)
.where('asset.status', '=', sql.lit(AssetStatus.Partial)) .where('status', '=', sql.lit(AssetStatus.Partial))
.execute();
}
async removeAndDecrementQuota(id: string): Promise<void> {
await this.db
.with('asset_exif', (qb) => qb.selectFrom('asset_exif').where('assetId', '=', id).select('fileSizeInByte'))
.with('asset', (qb) => qb.deleteFrom('asset').where('id', '=', id).returning('ownerId'))
.updateTable('user')
.from(['asset_exif', 'asset'])
.set({ quotaUsageInBytes: sql`"quotaUsageInBytes" - "fileSizeInByte"` })
.whereRef('user.id', '=', 'asset.ownerId')
.execute(); .execute();
} }

View File

@@ -20,9 +20,6 @@ export class AssetUploadService extends BaseService {
async startUpload(req: AuthenticatedRequest, res: Response, dto: StartUploadDto): Promise<void> { async startUpload(req: AuthenticatedRequest, res: Response, dto: StartUploadDto): Promise<void> {
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`); this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
const { isComplete, assetData, uploadLength, contentLength, version } = dto; const { isComplete, assetData, uploadLength, contentLength, version } = dto;
if (isComplete && uploadLength && uploadLength !== contentLength) {
return this.sendInconsistentLengthProblem(res);
}
const assetId = this.cryptoRepository.randomUUID(); const assetId = this.cryptoRepository.randomUUID();
const folder = StorageCore.getNestedFolder(StorageFolder.Upload, req.auth.user.id, assetId); const folder = StorageCore.getNestedFolder(StorageFolder.Upload, req.auth.user.id, assetId);
@@ -71,7 +68,7 @@ export class AssetUploadService extends BaseService {
return this.sendAlreadyCompletedProblem(res); return this.sendAlreadyCompletedProblem(res);
} }
const location = `/api/upload/${duplicate.id}`; const location = `/api/upload/${duplicate.id}`;
res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); res.status(400).setHeader('Location', location).send('Incomplete asset already exists');
return; return;
} }
this.logger.error(`Error creating upload asset record: ${error.message}`); this.logger.error(`Error creating upload asset record: ${error.message}`);
@@ -79,6 +76,10 @@ export class AssetUploadService extends BaseService {
return; return;
} }
if (isComplete && uploadLength && uploadLength !== contentLength) {
return this.sendInconsistentLengthProblem(res);
}
const location = `/api/upload/${assetId}`; const location = `/api/upload/${assetId}`;
if (version <= MAX_RUFH_INTEROP_VERSION) { if (version <= MAX_RUFH_INTEROP_VERSION) {
this.sendInterimResponse(res, location, version); this.sendInterimResponse(res, location, version);
@@ -98,7 +99,7 @@ export class AssetUploadService extends BaseService {
writeStream.on('finish', () => { writeStream.on('finish', () => {
this.setCompleteHeader(res, dto.version, isComplete); this.setCompleteHeader(res, dto.version, isComplete);
if (!isComplete) { if (!isComplete) {
return res.status(201).send(); return res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
} }
this.logger.log(`Finished upload to ${path}`); this.logger.log(`Finished upload to ${path}`);
if (dto.checksum.compare(checksumBuffer!) !== 0) { if (dto.checksum.compare(checksumBuffer!) !== 0) {
@@ -183,6 +184,40 @@ export class AssetUploadService extends BaseService {
}); });
} }
cancelUpload(auth: AuthDto, assetId: string, response: Response): Promise<void> {
return this.databaseRepository.withUuidLock(assetId, async () => {
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
if (!asset) {
response.status(404).send('Asset not found');
return;
}
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(response);
}
await this.onCancel(assetId, asset.path);
response.status(204).send();
});
}
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
if (!asset) {
res.status(404).send('Asset not found');
return;
}
const offset = await this.getCurrentOffset(asset.path);
this.setCompleteHeader(res, version, asset.status !== AssetStatus.Partial);
res
.status(204)
.setHeader('Upload-Offset', offset.toString())
.setHeader('Cache-Control', 'no-store')
.setHeader('Upload-Limit', 'min-size=0')
.send();
});
}
private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) { private pipe(req: Readable, res: Response, { id, path, size }: { id: string; path: string; size: number }) {
const writeStream = this.storageRepository.createOrAppendWriteStream(path); const writeStream = this.storageRepository.createOrAppendWriteStream(path);
writeStream.on('error', (error) => { writeStream.on('error', (error) => {
@@ -228,48 +263,10 @@ export class AssetUploadService extends BaseService {
return writeStream; return writeStream;
} }
cancelUpload(auth: AuthDto, assetId: string, response: Response): Promise<void> {
return this.databaseRepository.withUuidLock(assetId, async () => {
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
if (!asset) {
response.status(404).send('Asset not found');
return;
}
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(response);
}
await this.onCancel(assetId, asset.path);
response.status(204).send();
});
}
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
if (!asset) {
res.status(404).send('Asset not found');
return;
}
const offset = await this.getCurrentOffset(asset.path);
this.setCompleteHeader(res, version, asset.status !== AssetStatus.Partial);
res
.status(204)
.setHeader('Upload-Offset', offset.toString())
.setHeader('Cache-Control', 'no-store')
.setHeader('Upload-Limit', 'min-size=0')
.send();
});
}
async getUploadOptions(response: Response): Promise<void> {
response.status(204).setHeader('Upload-Limit', 'min-size=0').setHeader('Allow', 'POST, OPTIONS').send();
}
private async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) { private async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
this.logger.debug('Completing upload for asset', id); this.logger.debug('Completing upload for asset', id);
const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const; const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
await withRetry(() => this.assetRepository.setCompleteWithSize(id)); await withRetry(() => this.assetRepository.setComplete(id));
try { try {
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt)); await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
} catch (error: any) { } catch (error: any) {
@@ -281,7 +278,7 @@ export class AssetUploadService extends BaseService {
private async onCancel(assetId: string, path: string): Promise<void> { private async onCancel(assetId: string, path: string): Promise<void> {
this.logger.debug('Cancelling upload for asset', assetId); this.logger.debug('Cancelling upload for asset', assetId);
await withRetry(() => this.storageRepository.unlink(path)); await withRetry(() => this.storageRepository.unlink(path));
await withRetry(() => this.assetRepository.remove({ id: assetId })); await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
} }
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void { private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
@@ -321,7 +318,7 @@ export class AssetUploadService extends BaseService {
private sendChecksumMismatchResponse(res: Response, assetId: string, path: string): Promise<void> { private sendChecksumMismatchResponse(res: Response, assetId: string, path: string): Promise<void> {
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`); this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
res.status(460).send('Checksum mismatch'); res.status(460).send('File on server does not match provided checksum');
return this.onCancel(assetId, path); return this.onCancel(assetId, path);
} }