mirror of
https://github.com/immich-app/immich.git
synced 2025-12-27 17:24:58 +03:00
ensure stream is closed before releasing lock
This commit is contained in:
@@ -2,7 +2,6 @@ import { BadRequestException, Injectable, InternalServerErrorException } from '@
|
||||
import { Response } from 'express';
|
||||
import { createHash } from 'node:crypto';
|
||||
import { extname, join } from 'node:path';
|
||||
import { setTimeout } from 'node:timers/promises';
|
||||
import { StorageCore } from 'src/cores/storage.core';
|
||||
import { AuthDto } from 'src/dtos/auth.dto';
|
||||
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto';
|
||||
@@ -11,16 +10,17 @@ import { AuthenticatedRequest } from 'src/middleware/auth.guard';
|
||||
import { BaseService } from 'src/services/base.service';
|
||||
import { isAssetChecksumConstraint } from 'src/utils/database';
|
||||
import { mimeTypes } from 'src/utils/mime-types';
|
||||
import { withRetry } from 'src/utils/misc';
|
||||
|
||||
export const MAX_RUFH_INTEROP_VERSION = 8;
|
||||
|
||||
@Injectable()
|
||||
export class AssetUploadService extends BaseService {
|
||||
async startUpload(req: AuthenticatedRequest, response: Response, dto: StartUploadDto): Promise<void> {
|
||||
async startUpload(req: AuthenticatedRequest, res: Response, dto: StartUploadDto): Promise<void> {
|
||||
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
|
||||
const { isComplete, assetData, uploadLength, contentLength, version } = dto;
|
||||
if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
|
||||
return this.sendInconsistentLengthProblem(response);
|
||||
return this.sendInconsistentLengthProblem(res);
|
||||
}
|
||||
|
||||
const assetId = this.cryptoRepository.randomUUID();
|
||||
@@ -65,20 +65,20 @@ export class AssetUploadService extends BaseService {
|
||||
}
|
||||
|
||||
if (duplicate.status !== AssetStatus.Partial) {
|
||||
return this.sendAlreadyCompletedProblem(response);
|
||||
return this.sendAlreadyCompletedProblem(res);
|
||||
}
|
||||
const location = `/api/upload/${duplicate.id}`;
|
||||
response.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||
res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||
return;
|
||||
}
|
||||
this.logger.error(`Error creating upload asset record: ${error.message}`);
|
||||
response.status(500).send('Error creating upload asset record');
|
||||
res.status(500).send('Error creating upload asset record');
|
||||
return;
|
||||
}
|
||||
|
||||
const location = `/api/upload/${assetId}`;
|
||||
if (version <= MAX_RUFH_INTEROP_VERSION) {
|
||||
this.sendInterimResponse(response, location, version);
|
||||
this.sendInterimResponse(res, location, version);
|
||||
}
|
||||
|
||||
await this.storageRepository.mkdir(folder);
|
||||
@@ -93,33 +93,33 @@ export class AssetUploadService extends BaseService {
|
||||
|
||||
writeStream.on('error', (error) => {
|
||||
this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
|
||||
if (!response.headersSent) {
|
||||
response.status(500).setHeader('Location', location).send();
|
||||
if (!res.headersSent) {
|
||||
res.status(500).setHeader('Location', location).send();
|
||||
}
|
||||
});
|
||||
|
||||
writeStream.on('finish', async () => {
|
||||
if (!isComplete) {
|
||||
return response.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||
return res.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||
}
|
||||
this.logger.log(`Finished upload to ${path}`);
|
||||
if (dto.checksum.compare(checksumBuffer!) !== 0) {
|
||||
return this.sendChecksumMismatchResponse(response, assetId, path);
|
||||
return this.sendChecksumMismatchResponse(res, assetId, path);
|
||||
}
|
||||
|
||||
try {
|
||||
await this.onComplete({ assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt });
|
||||
} finally {
|
||||
this.setCompleteHeader(response, dto.version, true);
|
||||
response.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||
this.setCompleteHeader(res, dto.version, true);
|
||||
res.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
|
||||
}
|
||||
});
|
||||
|
||||
req.on('error', (error) => {
|
||||
this.logger.error(`Failed to read request body: ${error.message}`);
|
||||
writeStream.end();
|
||||
if (!response.headersSent) {
|
||||
response.status(500).setHeader('Location', location).send();
|
||||
if (!res.headersSent) {
|
||||
res.status(500).setHeader('Location', location).send();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -128,7 +128,7 @@ export class AssetUploadService extends BaseService {
|
||||
if (receivedLength + chunk.length > contentLength) {
|
||||
writeStream.destroy();
|
||||
req.destroy();
|
||||
response.status(400).send('Received more data than specified in content-length');
|
||||
res.status(400).send('Received more data than specified in content-length');
|
||||
return this.onCancel(assetId, path);
|
||||
}
|
||||
receivedLength += chunk.length;
|
||||
@@ -146,13 +146,14 @@ export class AssetUploadService extends BaseService {
|
||||
writeStream.destroy();
|
||||
this.onCancel(assetId, path);
|
||||
});
|
||||
await new Promise((resolve) => writeStream.on('close', resolve));
|
||||
}
|
||||
|
||||
resumeUpload(req: AuthenticatedRequest, response: Response, id: string, dto: ResumeUploadDto): Promise<void> {
|
||||
resumeUpload(req: AuthenticatedRequest, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
|
||||
this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
|
||||
const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto;
|
||||
if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
|
||||
this.sendInconsistentLengthProblem(response);
|
||||
this.sendInconsistentLengthProblem(res);
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
@@ -163,12 +164,12 @@ export class AssetUploadService extends BaseService {
|
||||
return this.databaseRepository.withUuidLock(id, async () => {
|
||||
const asset = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id);
|
||||
if (!asset) {
|
||||
response.status(404).send('Asset not found');
|
||||
res.status(404).send('Asset not found');
|
||||
return;
|
||||
}
|
||||
|
||||
if (asset.status !== AssetStatus.Partial) {
|
||||
return this.sendAlreadyCompletedProblem(response);
|
||||
return this.sendAlreadyCompletedProblem(res);
|
||||
}
|
||||
if (uploadOffset === null) {
|
||||
throw new BadRequestException('Missing Upload-Offset header');
|
||||
@@ -177,15 +178,15 @@ export class AssetUploadService extends BaseService {
|
||||
const { path } = asset;
|
||||
const expectedOffset = await this.getCurrentOffset(path);
|
||||
if (expectedOffset !== uploadOffset) {
|
||||
this.setCompleteHeader(response, version, false);
|
||||
return this.sendOffsetMismatchProblem(response, expectedOffset, uploadOffset);
|
||||
this.setCompleteHeader(res, version, false);
|
||||
return this.sendOffsetMismatchProblem(res, expectedOffset, uploadOffset);
|
||||
}
|
||||
|
||||
const newLength = uploadOffset + contentLength;
|
||||
|
||||
// If upload length is provided, validate we're not exceeding it
|
||||
if (uploadLength !== undefined && newLength > uploadLength) {
|
||||
response.status(400).send('Upload would exceed declared length');
|
||||
res.status(400).send('Upload would exceed declared length');
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -193,8 +194,8 @@ export class AssetUploadService extends BaseService {
|
||||
|
||||
// Empty PATCH without Upload-Complete
|
||||
if (contentLength === 0 && !isComplete) {
|
||||
this.setCompleteHeader(response, version, false);
|
||||
response.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
|
||||
this.setCompleteHeader(res, version, false);
|
||||
res.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
|
||||
return;
|
||||
}
|
||||
|
||||
@@ -203,29 +204,29 @@ export class AssetUploadService extends BaseService {
|
||||
|
||||
writeStream.on('error', (error) => {
|
||||
this.logger.error(`Failed to write chunk to ${path}: ${error.message}`);
|
||||
if (!response.headersSent) {
|
||||
response.status(500).send('Failed to write chunk');
|
||||
if (!res.headersSent) {
|
||||
res.status(500).send('Failed to write chunk');
|
||||
}
|
||||
});
|
||||
|
||||
writeStream.on('finish', async () => {
|
||||
const currentOffset = await this.getCurrentOffset(path);
|
||||
if (!isComplete) {
|
||||
this.setCompleteHeader(response, version, false);
|
||||
return response.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
|
||||
this.setCompleteHeader(res, version, false);
|
||||
return res.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
|
||||
}
|
||||
|
||||
this.logger.log(`Finished upload to ${path}`);
|
||||
const checksum = await this.cryptoRepository.hashFile(path);
|
||||
if (asset.checksum.compare(checksum) !== 0) {
|
||||
return this.sendChecksumMismatchResponse(response, id, path);
|
||||
return this.sendChecksumMismatchResponse(res, id, path);
|
||||
}
|
||||
|
||||
try {
|
||||
await this.onComplete({ assetId: id, path, size: currentOffset, fileModifiedAt: asset.fileModifiedAt });
|
||||
} finally {
|
||||
this.setCompleteHeader(response, version, true);
|
||||
response.status(200).setHeader('Upload-Offset', currentOffset.toString()).send();
|
||||
this.setCompleteHeader(res, version, true);
|
||||
res.status(200).setHeader('Upload-Offset', currentOffset.toString()).send();
|
||||
}
|
||||
});
|
||||
|
||||
@@ -234,7 +235,7 @@ export class AssetUploadService extends BaseService {
|
||||
this.logger.error(`Received more data than specified in content-length for upload to ${path}`);
|
||||
writeStream.destroy();
|
||||
req.destroy();
|
||||
response.status(400).send('Received more data than specified in content-length');
|
||||
res.status(400).send('Received more data than specified in content-length');
|
||||
return this.onCancel(id, path);
|
||||
}
|
||||
|
||||
@@ -253,6 +254,7 @@ export class AssetUploadService extends BaseService {
|
||||
writeStream.destroy();
|
||||
return this.onCancel(id, path);
|
||||
});
|
||||
await new Promise((resolve) => writeStream.on('close', resolve));
|
||||
});
|
||||
}
|
||||
|
||||
@@ -269,19 +271,19 @@ export class AssetUploadService extends BaseService {
|
||||
response.status(204).send();
|
||||
}
|
||||
|
||||
async getUploadStatus(auth: AuthDto, response: Response, id: string, { version }: GetUploadStatusDto) {
|
||||
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
|
||||
return this.databaseRepository.withUuidLock(id, async () => {
|
||||
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
|
||||
if (!asset) {
|
||||
response.status(404).send('Asset not found');
|
||||
res.status(404).send('Asset not found');
|
||||
return;
|
||||
}
|
||||
|
||||
const offset = await this.getCurrentOffset(asset.path);
|
||||
const isComplete = asset.status !== AssetStatus.Partial;
|
||||
|
||||
this.setCompleteHeader(response, version, isComplete);
|
||||
response
|
||||
this.setCompleteHeader(res, version, isComplete);
|
||||
res
|
||||
.status(204)
|
||||
.setHeader('Upload-Offset', offset.toString())
|
||||
.setHeader('Cache-Control', 'no-store')
|
||||
@@ -298,23 +300,22 @@ export class AssetUploadService extends BaseService {
|
||||
const { assetId, path, size, fileModifiedAt } = data;
|
||||
this.logger.debug('Completing upload for asset', assetId);
|
||||
const jobData = { name: JobName.AssetExtractMetadata, data: { id: assetId, source: 'upload' } } as const;
|
||||
await this.withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size));
|
||||
await withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size));
|
||||
try {
|
||||
await this.withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
|
||||
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
|
||||
} catch (error: any) {
|
||||
this.logger.error(`Failed to update times for ${path}: ${error.message}`);
|
||||
}
|
||||
await this.withRetry(() => this.jobRepository.queue(jobData));
|
||||
await withRetry(() => this.jobRepository.queue(jobData));
|
||||
}
|
||||
|
||||
private async onCancel(assetId: string, path: string): Promise<void> {
|
||||
this.logger.debug('Cancelling upload for asset', assetId);
|
||||
await this.withRetry(() => this.storageRepository.unlink(path));
|
||||
await this.withRetry(() => this.assetRepository.remove({ id: assetId }));
|
||||
await withRetry(() => this.storageRepository.unlink(path));
|
||||
await withRetry(() => this.assetRepository.remove({ id: assetId }));
|
||||
}
|
||||
|
||||
private sendInterimResponse(response: Response, location: string, interopVersion: number): void {
|
||||
const socket = response.socket;
|
||||
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number): void {
|
||||
if (socket && !socket.destroyed) {
|
||||
// Express doesn't understand interim responses, so write directly to socket
|
||||
socket.write(
|
||||
@@ -325,22 +326,22 @@ export class AssetUploadService extends BaseService {
|
||||
}
|
||||
}
|
||||
|
||||
private sendInconsistentLengthProblem(response: Response): void {
|
||||
response.status(400).contentType('application/problem+json').send({
|
||||
private sendInconsistentLengthProblem(res: Response): void {
|
||||
res.status(400).contentType('application/problem+json').send({
|
||||
type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length',
|
||||
title: 'inconsistent length values for upload',
|
||||
});
|
||||
}
|
||||
|
||||
private sendAlreadyCompletedProblem(response: Response): void {
|
||||
response.status(400).contentType('application/problem+json').send({
|
||||
private sendAlreadyCompletedProblem(res: Response): void {
|
||||
res.status(400).contentType('application/problem+json').send({
|
||||
type: 'https://iana.org/assignments/http-problem-types#completed-upload',
|
||||
title: 'upload is already completed',
|
||||
});
|
||||
}
|
||||
|
||||
private sendOffsetMismatchProblem(response: Response, expected: number, actual: number): void {
|
||||
response.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({
|
||||
private sendOffsetMismatchProblem(res: Response, expected: number, actual: number): void {
|
||||
res.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({
|
||||
type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset',
|
||||
title: 'offset from request does not match offset of resource',
|
||||
'expected-offset': expected,
|
||||
@@ -348,28 +349,13 @@ export class AssetUploadService extends BaseService {
|
||||
});
|
||||
}
|
||||
|
||||
private sendChecksumMismatchResponse(response: Response, assetId: string, path: string): Promise<void> {
|
||||
private sendChecksumMismatchResponse(res: Response, assetId: string, path: string): Promise<void> {
|
||||
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
|
||||
response.status(460).send('Checksum mismatch');
|
||||
res.status(460).send('Checksum mismatch');
|
||||
return this.onCancel(assetId, path);
|
||||
}
|
||||
|
||||
private async withRetry<T>(operation: () => Promise<T>, retries: number = 2, delay: number = 100): Promise<T> {
|
||||
let lastError: any;
|
||||
for (let attempt = 0; attempt <= retries; attempt++) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error: any) {
|
||||
lastError = error;
|
||||
}
|
||||
if (attempt < retries) {
|
||||
await setTimeout(delay);
|
||||
}
|
||||
}
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
private validateQuota(auth: AuthDto, size: number) {
|
||||
private validateQuota(auth: AuthDto, size: number): void {
|
||||
if (auth.user.quotaSizeInBytes === null) {
|
||||
return;
|
||||
}
|
||||
@@ -391,15 +377,15 @@ export class AssetUploadService extends BaseService {
|
||||
}
|
||||
}
|
||||
|
||||
private setCompleteHeader(response: Response, interopVersion: number | null, isComplete: boolean): void {
|
||||
private setCompleteHeader(res: Response, interopVersion: number | null, isComplete: boolean): void {
|
||||
if (!interopVersion) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (interopVersion > 3) {
|
||||
response.setHeader('Upload-Complete', isComplete ? '?1' : '?0');
|
||||
res.setHeader('Upload-Complete', isComplete ? '?1' : '?0');
|
||||
} else {
|
||||
response.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');
|
||||
res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -14,6 +14,7 @@ import {
|
||||
import _ from 'lodash';
|
||||
import { writeFileSync } from 'node:fs';
|
||||
import path from 'node:path';
|
||||
import { setTimeout } from 'node:timers/promises';
|
||||
import picomatch from 'picomatch';
|
||||
import parse from 'picomatch/lib/parse';
|
||||
import { SystemConfig } from 'src/config';
|
||||
@@ -326,3 +327,18 @@ export const globToSqlPattern = (glob: string) => {
|
||||
export function clamp(value: number, min: number, max: number) {
|
||||
return Math.max(min, Math.min(max, value));
|
||||
}
|
||||
|
||||
export async function withRetry<T>(operation: () => Promise<T>, retries: number = 2, delay: number = 100): Promise<T> {
|
||||
let lastError: any;
|
||||
for (let attempt = 0; attempt <= retries; attempt++) {
|
||||
try {
|
||||
return await operation();
|
||||
} catch (error: any) {
|
||||
lastError = error;
|
||||
}
|
||||
if (attempt < retries) {
|
||||
await setTimeout(delay);
|
||||
}
|
||||
}
|
||||
throw lastError;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user