From 4b63d3d0559801aa4dc65ce11aea13c8e4d4cbe5 Mon Sep 17 00:00:00 2001 From: mertalev <101130780+mertalev@users.noreply.github.com> Date: Wed, 1 Oct 2025 11:57:29 -0400 Subject: [PATCH] ensure stream is closed before releasing lock --- server/src/services/asset-upload.service.ts | 128 +++++++++----------- server/src/utils/misc.ts | 16 +++ 2 files changed, 73 insertions(+), 71 deletions(-) diff --git a/server/src/services/asset-upload.service.ts b/server/src/services/asset-upload.service.ts index 9a552cd7c8..640b738933 100644 --- a/server/src/services/asset-upload.service.ts +++ b/server/src/services/asset-upload.service.ts @@ -2,7 +2,6 @@ import { BadRequestException, Injectable, InternalServerErrorException } from '@ import { Response } from 'express'; import { createHash } from 'node:crypto'; import { extname, join } from 'node:path'; -import { setTimeout } from 'node:timers/promises'; import { StorageCore } from 'src/cores/storage.core'; import { AuthDto } from 'src/dtos/auth.dto'; import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto'; @@ -11,16 +10,17 @@ import { AuthenticatedRequest } from 'src/middleware/auth.guard'; import { BaseService } from 'src/services/base.service'; import { isAssetChecksumConstraint } from 'src/utils/database'; import { mimeTypes } from 'src/utils/mime-types'; +import { withRetry } from 'src/utils/misc'; export const MAX_RUFH_INTEROP_VERSION = 8; @Injectable() export class AssetUploadService extends BaseService { - async startUpload(req: AuthenticatedRequest, response: Response, dto: StartUploadDto): Promise { + async startUpload(req: AuthenticatedRequest, res: Response, dto: StartUploadDto): Promise { this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`); const { isComplete, assetData, uploadLength, contentLength, version } = dto; if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) { - return this.sendInconsistentLengthProblem(response); + return this.sendInconsistentLengthProblem(res); } const assetId = this.cryptoRepository.randomUUID(); @@ -65,20 +65,20 @@ export class AssetUploadService extends BaseService { } if (duplicate.status !== AssetStatus.Partial) { - return this.sendAlreadyCompletedProblem(response); + return this.sendAlreadyCompletedProblem(res); } const location = `/api/upload/${duplicate.id}`; - response.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); + res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); return; } this.logger.error(`Error creating upload asset record: ${error.message}`); - response.status(500).send('Error creating upload asset record'); + res.status(500).send('Error creating upload asset record'); return; } const location = `/api/upload/${assetId}`; if (version <= MAX_RUFH_INTEROP_VERSION) { - this.sendInterimResponse(response, location, version); + this.sendInterimResponse(res, location, version); } await this.storageRepository.mkdir(folder); @@ -93,33 +93,33 @@ export class AssetUploadService extends BaseService { writeStream.on('error', (error) => { this.logger.error(`Failed to write chunk to ${path}: ${error.message}`); - if (!response.headersSent) { - response.status(500).setHeader('Location', location).send(); + if (!res.headersSent) { + res.status(500).setHeader('Location', location).send(); } }); writeStream.on('finish', async () => { if (!isComplete) { - return response.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); + return res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); } this.logger.log(`Finished upload to ${path}`); if (dto.checksum.compare(checksumBuffer!) !== 0) { - return this.sendChecksumMismatchResponse(response, assetId, path); + return this.sendChecksumMismatchResponse(res, assetId, path); } try { await this.onComplete({ assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt }); } finally { - this.setCompleteHeader(response, dto.version, true); - response.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); + this.setCompleteHeader(res, dto.version, true); + res.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send(); } }); req.on('error', (error) => { this.logger.error(`Failed to read request body: ${error.message}`); writeStream.end(); - if (!response.headersSent) { - response.status(500).setHeader('Location', location).send(); + if (!res.headersSent) { + res.status(500).setHeader('Location', location).send(); } }); @@ -128,7 +128,7 @@ export class AssetUploadService extends BaseService { if (receivedLength + chunk.length > contentLength) { writeStream.destroy(); req.destroy(); - response.status(400).send('Received more data than specified in content-length'); + res.status(400).send('Received more data than specified in content-length'); return this.onCancel(assetId, path); } receivedLength += chunk.length; @@ -146,13 +146,14 @@ export class AssetUploadService extends BaseService { writeStream.destroy(); this.onCancel(assetId, path); }); + await new Promise((resolve) => writeStream.on('close', resolve)); } - resumeUpload(req: AuthenticatedRequest, response: Response, id: string, dto: ResumeUploadDto): Promise { + resumeUpload(req: AuthenticatedRequest, res: Response, id: string, dto: ResumeUploadDto): Promise { this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`); const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto; if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) { - this.sendInconsistentLengthProblem(response); + this.sendInconsistentLengthProblem(res); return Promise.resolve(); } @@ -163,12 +164,12 @@ export class AssetUploadService extends BaseService { return this.databaseRepository.withUuidLock(id, async () => { const asset = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id); if (!asset) { - response.status(404).send('Asset not found'); + res.status(404).send('Asset not found'); return; } if (asset.status !== AssetStatus.Partial) { - return this.sendAlreadyCompletedProblem(response); + return this.sendAlreadyCompletedProblem(res); } if (uploadOffset === null) { throw new BadRequestException('Missing Upload-Offset header'); @@ -177,15 +178,15 @@ export class AssetUploadService extends BaseService { const { path } = asset; const expectedOffset = await this.getCurrentOffset(path); if (expectedOffset !== uploadOffset) { - this.setCompleteHeader(response, version, false); - return this.sendOffsetMismatchProblem(response, expectedOffset, uploadOffset); + this.setCompleteHeader(res, version, false); + return this.sendOffsetMismatchProblem(res, expectedOffset, uploadOffset); } const newLength = uploadOffset + contentLength; // If upload length is provided, validate we're not exceeding it if (uploadLength !== undefined && newLength > uploadLength) { - response.status(400).send('Upload would exceed declared length'); + res.status(400).send('Upload would exceed declared length'); return; } @@ -193,8 +194,8 @@ export class AssetUploadService extends BaseService { // Empty PATCH without Upload-Complete if (contentLength === 0 && !isComplete) { - this.setCompleteHeader(response, version, false); - response.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send(); + this.setCompleteHeader(res, version, false); + res.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send(); return; } @@ -203,29 +204,29 @@ export class AssetUploadService extends BaseService { writeStream.on('error', (error) => { this.logger.error(`Failed to write chunk to ${path}: ${error.message}`); - if (!response.headersSent) { - response.status(500).send('Failed to write chunk'); + if (!res.headersSent) { + res.status(500).send('Failed to write chunk'); } }); writeStream.on('finish', async () => { const currentOffset = await this.getCurrentOffset(path); if (!isComplete) { - this.setCompleteHeader(response, version, false); - return response.status(204).setHeader('Upload-Offset', currentOffset.toString()).send(); + this.setCompleteHeader(res, version, false); + return res.status(204).setHeader('Upload-Offset', currentOffset.toString()).send(); } this.logger.log(`Finished upload to ${path}`); const checksum = await this.cryptoRepository.hashFile(path); if (asset.checksum.compare(checksum) !== 0) { - return this.sendChecksumMismatchResponse(response, id, path); + return this.sendChecksumMismatchResponse(res, id, path); } try { await this.onComplete({ assetId: id, path, size: currentOffset, fileModifiedAt: asset.fileModifiedAt }); } finally { - this.setCompleteHeader(response, version, true); - response.status(200).setHeader('Upload-Offset', currentOffset.toString()).send(); + this.setCompleteHeader(res, version, true); + res.status(200).setHeader('Upload-Offset', currentOffset.toString()).send(); } }); @@ -234,7 +235,7 @@ export class AssetUploadService extends BaseService { this.logger.error(`Received more data than specified in content-length for upload to ${path}`); writeStream.destroy(); req.destroy(); - response.status(400).send('Received more data than specified in content-length'); + res.status(400).send('Received more data than specified in content-length'); return this.onCancel(id, path); } @@ -253,6 +254,7 @@ export class AssetUploadService extends BaseService { writeStream.destroy(); return this.onCancel(id, path); }); + await new Promise((resolve) => writeStream.on('close', resolve)); }); } @@ -269,19 +271,19 @@ export class AssetUploadService extends BaseService { response.status(204).send(); } - async getUploadStatus(auth: AuthDto, response: Response, id: string, { version }: GetUploadStatusDto) { + async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise { return this.databaseRepository.withUuidLock(id, async () => { const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id); if (!asset) { - response.status(404).send('Asset not found'); + res.status(404).send('Asset not found'); return; } const offset = await this.getCurrentOffset(asset.path); const isComplete = asset.status !== AssetStatus.Partial; - this.setCompleteHeader(response, version, isComplete); - response + this.setCompleteHeader(res, version, isComplete); + res .status(204) .setHeader('Upload-Offset', offset.toString()) .setHeader('Cache-Control', 'no-store') @@ -298,23 +300,22 @@ export class AssetUploadService extends BaseService { const { assetId, path, size, fileModifiedAt } = data; this.logger.debug('Completing upload for asset', assetId); const jobData = { name: JobName.AssetExtractMetadata, data: { id: assetId, source: 'upload' } } as const; - await this.withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size)); + await withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size)); try { - await this.withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt)); + await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt)); } catch (error: any) { this.logger.error(`Failed to update times for ${path}: ${error.message}`); } - await this.withRetry(() => this.jobRepository.queue(jobData)); + await withRetry(() => this.jobRepository.queue(jobData)); } private async onCancel(assetId: string, path: string): Promise { this.logger.debug('Cancelling upload for asset', assetId); - await this.withRetry(() => this.storageRepository.unlink(path)); - await this.withRetry(() => this.assetRepository.remove({ id: assetId })); + await withRetry(() => this.storageRepository.unlink(path)); + await withRetry(() => this.assetRepository.remove({ id: assetId })); } - private sendInterimResponse(response: Response, location: string, interopVersion: number): void { - const socket = response.socket; + private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void { if (socket && !socket.destroyed) { // Express doesn't understand interim responses, so write directly to socket socket.write( @@ -325,22 +326,22 @@ export class AssetUploadService extends BaseService { } } - private sendInconsistentLengthProblem(response: Response): void { - response.status(400).contentType('application/problem+json').send({ + private sendInconsistentLengthProblem(res: Response): void { + res.status(400).contentType('application/problem+json').send({ type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length', title: 'inconsistent length values for upload', }); } - private sendAlreadyCompletedProblem(response: Response): void { - response.status(400).contentType('application/problem+json').send({ + private sendAlreadyCompletedProblem(res: Response): void { + res.status(400).contentType('application/problem+json').send({ type: 'https://iana.org/assignments/http-problem-types#completed-upload', title: 'upload is already completed', }); } - private sendOffsetMismatchProblem(response: Response, expected: number, actual: number): void { - response.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({ + private sendOffsetMismatchProblem(res: Response, expected: number, actual: number): void { + res.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({ type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset', title: 'offset from request does not match offset of resource', 'expected-offset': expected, @@ -348,28 +349,13 @@ export class AssetUploadService extends BaseService { }); } - private sendChecksumMismatchResponse(response: Response, assetId: string, path: string): Promise { + private sendChecksumMismatchResponse(res: Response, assetId: string, path: string): Promise { this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`); - response.status(460).send('Checksum mismatch'); + res.status(460).send('Checksum mismatch'); return this.onCancel(assetId, path); } - private async withRetry(operation: () => Promise, retries: number = 2, delay: number = 100): Promise { - let lastError: any; - for (let attempt = 0; attempt <= retries; attempt++) { - try { - return await operation(); - } catch (error: any) { - lastError = error; - } - if (attempt < retries) { - await setTimeout(delay); - } - } - throw lastError; - } - - private validateQuota(auth: AuthDto, size: number) { + private validateQuota(auth: AuthDto, size: number): void { if (auth.user.quotaSizeInBytes === null) { return; } @@ -391,15 +377,15 @@ export class AssetUploadService extends BaseService { } } - private setCompleteHeader(response: Response, interopVersion: number | null, isComplete: boolean): void { + private setCompleteHeader(res: Response, interopVersion: number | null, isComplete: boolean): void { if (!interopVersion) { return; } if (interopVersion > 3) { - response.setHeader('Upload-Complete', isComplete ? '?1' : '?0'); + res.setHeader('Upload-Complete', isComplete ? '?1' : '?0'); } else { - response.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1'); + res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1'); } } } diff --git a/server/src/utils/misc.ts b/server/src/utils/misc.ts index b9741c3b44..e06c466438 100644 --- a/server/src/utils/misc.ts +++ b/server/src/utils/misc.ts @@ -14,6 +14,7 @@ import { import _ from 'lodash'; import { writeFileSync } from 'node:fs'; import path from 'node:path'; +import { setTimeout } from 'node:timers/promises'; import picomatch from 'picomatch'; import parse from 'picomatch/lib/parse'; import { SystemConfig } from 'src/config'; @@ -326,3 +327,18 @@ export const globToSqlPattern = (glob: string) => { export function clamp(value: number, min: number, max: number) { return Math.max(min, Math.min(max, value)); } + +export async function withRetry(operation: () => Promise, retries: number = 2, delay: number = 100): Promise { + let lastError: any; + for (let attempt = 0; attempt <= retries; attempt++) { + try { + return await operation(); + } catch (error: any) { + lastError = error; + } + if (attempt < retries) { + await setTimeout(delay); + } + } + throw lastError; +}