dto refactor

add logging

handle metadata
This commit is contained in:
mertalev
2025-09-29 18:09:06 -04:00
parent 6f61bf04e4
commit 4ed92f5df5
13 changed files with 795 additions and 370 deletions

View File

@@ -1,12 +1,35 @@
import { Controller, Delete, Head, Options, Param, Patch, Post, Req, Res } from '@nestjs/common';
import { ApiTags } from '@nestjs/swagger';
import { Request, Response } from 'express';
import { BadRequestException, Controller, Delete, Head, Options, Param, Patch, Post, Req, Res } from '@nestjs/common';
import { ApiHeader, ApiTags } from '@nestjs/swagger';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';
import { Response } from 'express';
import { IncomingHttpHeaders } from 'node:http';
import { AuthDto } from 'src/dtos/auth.dto';
import { Permission } from 'src/enum';
import { Auth, Authenticated } from 'src/middleware/auth.guard';
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto, UploadHeader } from 'src/dtos/upload.dto';
import { ImmichHeader, Permission } from 'src/enum';
import { Auth, Authenticated, AuthenticatedRequest } from 'src/middleware/auth.guard';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { UUIDParamDto } from 'src/validation';
const apiInteropVersion = {
name: UploadHeader.InteropVersion,
description: `Indicates the version of the RUFH protocol supported by the client.`,
required: true,
};
const apiUploadComplete = {
name: UploadHeader.UploadComplete,
description:
'Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.',
required: true,
};
const apiContentLength = {
name: UploadHeader.ContentLength,
description: 'Non-negative size of the request body in bytes.',
required: true,
};
@ApiTags('Upload')
@Controller('upload')
export class AssetUploadController {
@@ -14,36 +37,73 @@ export class AssetUploadController {
@Post()
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
startUpload(@Auth() auth: AuthDto, @Req() request: Request, @Res() response: Response): Promise<void> {
return this.service.startUpload(auth, request, response);
@ApiHeader({
name: ImmichHeader.AssetData,
description:
'Base64-encoded JSON of asset metadata. The expected content is the same as AssetMediaCreateDto, except that `filename` is required and `sidecarData` is ignored.',
required: true,
})
@ApiHeader({
name: UploadHeader.ReprDigest,
description:
'Structured dictionary containing an SHA-1 checksum used to detect duplicate files and validate data integrity.',
required: true,
})
@ApiHeader(apiInteropVersion)
@ApiHeader(apiUploadComplete)
@ApiHeader(apiContentLength)
startUpload(@Req() req: AuthenticatedRequest, @Res() res: Response): Promise<void> {
const dto = this.getDto(StartUploadDto, req.headers);
console.log('Starting upload with dto:', JSON.stringify(dto));
return this.service.startUpload(req, res, dto);
}
@Patch(':id')
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
resumeUpload(
@Auth() auth: AuthDto,
@Param() { id }: UUIDParamDto,
@Req() request: Request,
@Res() response: Response,
): Promise<void> {
return this.service.resumeUpload(auth, id, request, response);
@ApiHeader({
name: UploadHeader.UploadOffset,
description:
'Non-negative byte offset indicating the starting position of the data in the request body within the entire file.',
required: true,
})
@ApiHeader(apiInteropVersion)
@ApiHeader(apiUploadComplete)
@ApiHeader(apiContentLength)
resumeUpload(@Req() req: AuthenticatedRequest, @Res() res: Response, @Param() { id }: UUIDParamDto) {
const dto = this.getDto(ResumeUploadDto, req.headers);
console.log('Resuming upload with dto:', JSON.stringify(dto));
return this.service.resumeUpload(req, res, id, dto);
}
@Delete(':id')
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
cancelUpload(@Auth() auth: AuthDto, @Param() { id }: UUIDParamDto, @Res() response: Response): Promise<void> {
return this.service.cancelUpload(auth, id, response);
cancelUpload(@Auth() auth: AuthDto, @Res() res: Response, @Param() { id }: UUIDParamDto) {
return this.service.cancelUpload(auth, id, res);
}
@Head(':id')
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
getUploadStatus(@Auth() auth: AuthDto, @Param() { id }: UUIDParamDto, @Res() response: Response): Promise<void> {
return this.service.getUploadStatus(auth, id, response);
@ApiHeader(apiInteropVersion)
getUploadStatus(@Req() req: AuthenticatedRequest, @Res() res: Response, @Param() { id }: UUIDParamDto) {
const dto = this.getDto(GetUploadStatusDto, req.headers);
console.log('Getting upload status with dto:', JSON.stringify(dto));
return this.service.getUploadStatus(req.auth, res, id, dto);
}
@Options()
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
getUploadOptions(@Res() response: Response): Promise<void> {
return this.service.getUploadOptions(response);
getUploadOptions(@Res() res: Response) {
return this.service.getUploadOptions(res);
}
private getDto<T extends object>(cls: new () => T, headers: IncomingHttpHeaders): T {
const dto = plainToInstance(cls, headers, { excludeExtraneousValues: true });
const errors = validateSync(dto);
if (errors.length > 0) {
const constraints = errors.map((e) => (e.constraints ? Object.values(e.constraints).join(', ') : '')).join('; ');
console.warn('Upload DTO validation failed:', JSON.stringify(errors, null, 2));
throw new BadRequestException(constraints);
}
return dto;
}
}

View File

@@ -1,133 +1,159 @@
import { Type } from 'class-transformer';
import { IsEnum, IsInt, IsNotEmpty, IsObject, IsString, IsUUID, ValidateNested } from 'class-validator';
import { AssetMediaCreateDto } from 'src/dtos/asset-media.dto';
import { BadRequestException } from '@nestjs/common';
import { Expose, plainToInstance, Transform, Type } from 'class-transformer';
import { Equals, IsArray, IsEnum, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator';
import { AssetMetadataUpsertItemDto } from 'src/dtos/asset.dto';
import { AssetVisibility, ImmichHeader } from 'src/enum';
import { Optional, ValidateBoolean, ValidateDate, ValidateEnum, ValidateUUID } from 'src/validation';
import { parseDictionary } from 'structured-headers';
export enum TusdHookRequestType {
PreCreate = 'pre-create',
PreFinish = 'pre-finish',
}
export class UploadAssetDataDto {
@IsNotEmpty()
@IsString()
deviceAssetId!: string;
export enum TusdHookStorageType {
FileStore = 'filestore',
}
@IsNotEmpty()
@IsString()
deviceId!: string;
export class TusdStorageDto {
@IsEnum(TusdHookStorageType)
Type!: string;
@ValidateDate()
fileCreatedAt!: Date;
@ValidateDate()
fileModifiedAt!: Date;
@Optional()
@IsString()
duration?: string;
@IsString()
@IsNotEmpty()
Path!: string;
filename!: string;
@IsString()
@IsNotEmpty()
InfoPath!: string;
@ValidateBoolean({ optional: true })
isFavorite?: boolean;
@ValidateEnum({ enum: AssetVisibility, name: 'AssetVisibility', optional: true })
visibility?: AssetVisibility;
@ValidateUUID({ optional: true })
livePhotoVideoId?: string;
@Transform(({ value }) => {
try {
const json = JSON.parse(value);
const items = Array.isArray(json) ? json : [json];
return items.map((item) => plainToInstance(AssetMetadataUpsertItemDto, item));
} catch {
throw new BadRequestException(['metadata must be valid JSON']);
}
})
@Optional()
@ValidateNested({ each: true })
@IsArray()
metadata!: AssetMetadataUpsertItemDto[];
}
export class UploadAssetDataDto extends AssetMediaCreateDto {
@IsString()
@IsNotEmpty()
declare filename: string;
export enum StructuredBoolean {
False = '?0',
True = '?1',
}
export class TusdMetaDataDto {
@IsString()
@IsNotEmpty()
declare AssetData: string; // base64-encoded JSON string of UploadAssetDataDto
export enum UploadHeader {
UploadOffset = 'upload-offset',
ContentLength = 'content-length',
UploadLength = 'upload-length',
UploadComplete = 'upload-complete',
UploadIncomplete = 'upload-incomplete',
InteropVersion = 'upload-draft-interop-version',
ReprDigest = 'repr-digest',
}
export class TusdPreCreateUploadDto {
class BaseRufhHeadersDto {
@Expose({ name: UploadHeader.InteropVersion })
@Min(3)
@IsInt()
Size!: number;
@Type(() => Number)
version!: number;
}
export class TusdPreFinishUploadDto {
@IsUUID()
ID!: string;
export class BaseUploadHeadersDto extends BaseRufhHeadersDto {
@Expose({ name: UploadHeader.ContentLength })
@Min(0)
@IsInt()
Size!: number;
@Type(() => Number)
contentLength!: number;
@Type(() => TusdMetaDataDto)
@ValidateNested()
@IsObject()
MetaData!: TusdMetaDataDto;
@Expose({ name: UploadHeader.UploadComplete })
@ValidateIf((o) => o.requestInterop !== null && o.requestInterop! <= 3)
@IsEnum(StructuredBoolean)
uploadComplete!: StructuredBoolean;
@Type(() => TusdStorageDto)
@ValidateNested()
@IsObject()
Storage!: TusdStorageDto;
@Expose({ name: UploadHeader.UploadIncomplete })
@ValidateIf((o) => o.requestInterop === null || o.requestInterop! > 3)
@IsEnum(StructuredBoolean)
uploadIncomplete!: StructuredBoolean;
@Expose({ name: UploadHeader.UploadLength })
@Min(0)
@IsInt()
@Type(() => Number)
@Optional()
uploadLength?: number;
get isComplete(): boolean {
if (this.version <= 3) {
return this.uploadIncomplete === StructuredBoolean.False;
}
return this.uploadComplete === StructuredBoolean.True;
}
}
export class TusdHttpRequestDto {
@IsString()
@IsNotEmpty()
Method!: string;
export class StartUploadDto extends BaseUploadHeadersDto {
@Expose({ name: ImmichHeader.AssetData })
// @ValidateNested()
// @IsObject()
@Type(() => UploadAssetDataDto)
@Transform(({ value }) => {
if (!value) {
return null;
}
@IsString()
@IsNotEmpty()
URI!: string;
const json = Buffer.from(value, 'base64').toString('utf-8');
try {
return JSON.parse(json);
} catch {
throw new BadRequestException(`${ImmichHeader.AssetData} must be valid base64-encoded JSON`);
}
})
assetData!: UploadAssetDataDto;
@IsObject()
Header!: Record<string, string[]>;
@Expose({ name: UploadHeader.ReprDigest })
@Transform(({ value }) => {
if (!value) {
return null;
}
const checksum = parseDictionary(value).get('sha')?.[0];
if (checksum instanceof ArrayBuffer) {
return Buffer.from(checksum);
}
throw new BadRequestException(`Invalid ${UploadHeader.ReprDigest} header`);
})
checksum!: Buffer;
}
export class TusdPreCreateEventDto {
@Type(() => TusdPreCreateUploadDto)
@ValidateNested()
@IsObject()
Upload!: TusdPreCreateUploadDto;
export class ResumeUploadDto extends BaseUploadHeadersDto {
@Expose({ name: 'content-type' })
@ValidateIf((o) => o.requestInterop !== null && o.requestInterop >= 6)
@Equals('application/partial-upload')
contentType!: number | null;
@Type(() => TusdHttpRequestDto)
@ValidateNested()
@IsObject()
HTTPRequest!: TusdHttpRequestDto;
@Expose({ name: UploadHeader.UploadOffset })
@Min(0)
@IsInt()
@Type(() => Number)
@Optional()
uploadOffset!: number | null;
}
export class TusdPreFinishEventDto {
@Type(() => TusdPreFinishUploadDto)
@ValidateNested()
@IsObject()
Upload!: TusdPreFinishUploadDto;
@Type(() => TusdHttpRequestDto)
@ValidateNested()
@IsObject()
HTTPRequest!: TusdHttpRequestDto;
}
export class TusdHookRequestDto {
@IsEnum(TusdHookRequestType)
Type!: TusdHookRequestType;
@IsObject()
Event!: TusdPreCreateEventDto | TusdPreFinishEventDto;
}
export class TusdHttpResponseDto {
StatusCode!: number;
Body?: string;
Header?: Record<string, string>;
}
export class TusdChangeFileInfoStorageDto {
Path?: string;
}
export class TusdChangeFileInfoDto {
ID?: string;
MetaData?: TusdMetaDataDto;
Storage?: TusdChangeFileInfoStorageDto;
}
export class TusdHookResponseDto {
HTTPResponse?: TusdHttpResponseDto;
RejectUpload?: boolean;
ChangeFileInfo?: TusdChangeFileInfoDto;
}
export class GetUploadStatusDto extends BaseRufhHeadersDto {}

View File

@@ -16,7 +16,7 @@ export class AssetUploadInterceptor implements NestInterceptor {
const res = context.switchToHttp().getResponse<Response<AssetMediaResponseDto>>();
const checksum = fromMaybeArray(req.headers[ImmichHeader.Checksum]);
const response = await this.service.getUploadAssetIdByChecksum(req.user, checksum);
const response = await this.service.getUploadAssetIdByChecksum(req.auth, checksum);
if (response) {
res.status(200);
return of({ status: AssetMediaStatus.DUPLICATE, id: response.id });

View File

@@ -46,7 +46,7 @@ export const Authenticated = (options: AuthenticatedOptions = {}): MethodDecorat
};
export const Auth = createParamDecorator((data, context: ExecutionContext): AuthDto => {
return context.switchToHttp().getRequest<AuthenticatedRequest>().user;
return context.switchToHttp().getRequest<AuthenticatedRequest>().auth;
});
export const FileResponse = () =>
@@ -68,11 +68,11 @@ export const GetLoginDetails = createParamDecorator((data, context: ExecutionCon
});
export interface AuthRequest extends Request {
user?: AuthDto;
auth?: AuthDto;
}
export interface AuthenticatedRequest extends Request {
user: AuthDto;
auth: AuthDto;
}
@Injectable()
@@ -99,7 +99,7 @@ export class AuthGuard implements CanActivate {
} = { sharedLink: false, admin: false, ...options };
const request = context.switchToHttp().getRequest<AuthRequest>();
request.user = await this.authService.authenticate({
request.auth = await this.authService.authenticate({
headers: request.headers,
queryParams: request.query as Record<string, string>,
metadata: { adminRoute, sharedLinkRoute, permission, uri: request.path },

View File

@@ -255,6 +255,18 @@ export class AssetRepository {
return this.db.insertInto('asset').values(asset).returningAll().executeTakeFirstOrThrow();
}
createWithMetadata(asset: Insertable<AssetTable> & { id: string }, metadata?: AssetMetadataItem[]) {
if (!metadata || metadata.length === 0) {
return this.db.insertInto('asset').values(asset).execute();
}
return this.db
.with('asset', (qb) => qb.insertInto('asset').values(asset).returning('id'))
.insertInto('asset_metadata')
.values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value })))
.execute();
}
getCompletionMetadata(assetId: string, ownerId: string) {
return this.db
.selectFrom('asset')

View File

@@ -62,15 +62,11 @@ export class StorageRepository {
}
createWriteStream(filepath: string): Writable {
return createWriteStream(filepath, { flags: 'w' });
}
overwriteWriteStream(filepath: string, offset = 0): Writable {
return createWriteStream(filepath, { flags: 'r+', start: offset });
return createWriteStream(filepath, { flags: 'w', highWaterMark: 1024 * 1024 });
}
createOrAppendWriteStream(filepath: string): Writable {
return createWriteStream(filepath, { flags: 'a' });
return createWriteStream(filepath, { flags: 'a', highWaterMark: 1024 * 1024 });
}
createOrOverwriteFile(filepath: string, buffer: Buffer) {

View File

@@ -1,70 +1,65 @@
import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';
import { Request, Response } from 'express';
import { Response } from 'express';
import { createHash } from 'node:crypto';
import { extname, join } from 'node:path';
import { setTimeout } from 'node:timers/promises';
import { StorageCore } from 'src/cores/storage.core';
import { AuthDto } from 'src/dtos/auth.dto';
import { UploadAssetDataDto } from 'src/dtos/upload.dto';
import { AssetStatus, AssetType, AssetVisibility, ImmichHeader, JobName, StorageFolder } from 'src/enum';
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/upload.dto';
import { AssetStatus, AssetType, AssetVisibility, JobName, StorageFolder } from 'src/enum';
import { AuthenticatedRequest } from 'src/middleware/auth.guard';
import { BaseService } from 'src/services/base.service';
import { isAssetChecksumConstraint } from 'src/utils/database';
import { mimeTypes } from 'src/utils/mime-types';
import { parseDictionary } from 'structured-headers';
const MAX_INTEROP_VERSION = 8;
export const MAX_RUFH_INTEROP_VERSION = 8;
@Injectable()
export class AssetUploadService extends BaseService {
async startUpload(auth: AuthDto, request: Request, response: Response): Promise<void> {
const headers = request.headers;
const requestInterop = this.getNumberHeader(headers, 'upload-draft-interop-version');
const contentLength = this.requireContentLength(headers);
const isComplete = this.requireUploadComplete(headers, requestInterop);
const metadata = this.requireAssetData(headers);
const checksumHeader = this.requireChecksum(headers);
const uploadLength = this.getNumberHeader(headers, 'upload-length');
if (isComplete && uploadLength !== null && uploadLength !== contentLength) {
async startUpload(req: AuthenticatedRequest, response: Response, dto: StartUploadDto): Promise<void> {
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
const { isComplete, assetData, uploadLength, contentLength, version } = dto;
if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
return this.sendInconsistentLengthProblem(response);
}
const assetId = this.cryptoRepository.randomUUID();
const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId);
const extension = extname(metadata.filename);
const folder = StorageCore.getNestedFolder(StorageFolder.Upload, req.auth.user.id, assetId);
const extension = extname(assetData.filename);
const path = join(folder, `${assetId}${extension}`);
const type = mimeTypes.assetType(path);
if (type === AssetType.Other) {
throw new BadRequestException(`${metadata.filename} is an unsupported file type`);
throw new BadRequestException(`${assetData.filename} is an unsupported file type`);
}
this.validateQuota(auth, uploadLength ?? contentLength);
this.validateQuota(req.auth, uploadLength ?? contentLength);
try {
await this.assetRepository.create({
id: assetId,
ownerId: auth.user.id,
libraryId: null,
checksum: checksumHeader,
originalPath: path,
deviceAssetId: metadata.deviceAssetId,
deviceId: metadata.deviceId,
fileCreatedAt: metadata.fileCreatedAt,
fileModifiedAt: metadata.fileModifiedAt,
localDateTime: metadata.fileCreatedAt,
type: mimeTypes.assetType(path),
isFavorite: metadata.isFavorite,
duration: metadata.duration || null,
visibility: metadata.visibility || AssetVisibility.Timeline,
originalFileName: metadata.filename,
status: AssetStatus.Partial,
});
await this.assetRepository.createWithMetadata(
{
id: assetId,
ownerId: req.auth.user.id,
libraryId: null,
checksum: dto.checksum,
originalPath: path,
deviceAssetId: assetData.deviceAssetId,
deviceId: assetData.deviceId,
fileCreatedAt: assetData.fileCreatedAt,
fileModifiedAt: assetData.fileModifiedAt,
localDateTime: assetData.fileCreatedAt,
type: mimeTypes.assetType(path),
isFavorite: assetData.isFavorite,
duration: assetData.duration || null,
visibility: assetData.visibility || AssetVisibility.Timeline,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
},
assetData.metadata,
);
} catch (error: any) {
if (isAssetChecksumConstraint(error)) {
const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, checksumHeader);
const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(req.auth.user.id, dto.checksum);
if (!duplicate) {
throw new InternalServerErrorException('Error locating duplicate for checksum constraint');
}
@@ -82,8 +77,8 @@ export class AssetUploadService extends BaseService {
}
const location = `/api/upload/${assetId}`;
if (requestInterop !== null && requestInterop >= 3 && requestInterop <= MAX_INTEROP_VERSION) {
this.sendInterimResponse(response, location, requestInterop);
if (version <= MAX_RUFH_INTEROP_VERSION) {
this.sendInterimResponse(response, location, version);
}
await this.storageRepository.mkdir(folder);
@@ -92,7 +87,7 @@ export class AssetUploadService extends BaseService {
if (isComplete) {
const hash = createHash('sha1');
request.on('data', (chunk: Buffer) => hash.update(chunk));
req.on('data', (chunk: Buffer) => hash.update(chunk));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
@@ -103,22 +98,24 @@ export class AssetUploadService extends BaseService {
}
});
writeStream.on('finish', () => {
writeStream.on('finish', async () => {
if (!isComplete) {
return response.status(201).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
}
this.logger.log(`Finished upload to ${path}`);
if (checksumHeader.compare(checksumBuffer!) !== 0) {
if (dto.checksum.compare(checksumBuffer!) !== 0) {
return this.sendChecksumMismatchResponse(response, assetId, path);
}
this.setCompleteHeader(response, requestInterop, true);
response.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
return this.onComplete({ assetId, path, size: contentLength, fileModifiedAt: metadata.fileModifiedAt });
try {
await this.onComplete({ assetId, path, size: contentLength, fileModifiedAt: assetData.fileModifiedAt });
} finally {
this.setCompleteHeader(response, dto.version, true);
response.status(200).setHeader('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
}
});
request.on('error', (error) => {
req.on('error', (error) => {
this.logger.error(`Failed to read request body: ${error.message}`);
writeStream.end();
if (!response.headersSent) {
@@ -127,45 +124,44 @@ export class AssetUploadService extends BaseService {
});
let receivedLength = 0;
request.on('data', (chunk: Buffer) => {
req.on('data', (chunk: Buffer) => {
if (receivedLength + chunk.length > contentLength) {
writeStream.destroy();
request.destroy();
req.destroy();
response.status(400).send('Received more data than specified in content-length');
return this.removeAsset(assetId, path);
return this.onCancel(assetId, path);
}
receivedLength += chunk.length;
if (!writeStream.write(chunk)) {
request.pause();
writeStream.once('drain', () => request.resume());
req.pause();
writeStream.once('drain', () => req.resume());
}
});
request.on('end', () => {
req.on('end', () => {
if (receivedLength === contentLength) {
return writeStream.end();
}
this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${assetId}`);
writeStream.destroy();
this.removeAsset(assetId, path);
this.onCancel(assetId, path);
});
}
async resumeUpload(auth: AuthDto, assetId: string, request: Request, response: Response): Promise<void> {
const headers = request.headers;
const requestInterop = this.getNumberHeader(headers, 'upload-draft-interop-version');
const isComplete = this.requireUploadComplete(headers, requestInterop);
const contentLength = this.requireContentLength(headers);
const providedOffset = this.getNumberHeader(headers, 'upload-offset');
const uploadLength = this.getNumberHeader(headers, 'upload-length');
resumeUpload(req: AuthenticatedRequest, response: Response, id: string, dto: ResumeUploadDto): Promise<void> {
this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
const { isComplete, uploadLength, uploadOffset, contentLength, version } = dto;
if (isComplete && uploadLength !== undefined && uploadLength !== contentLength) {
this.sendInconsistentLengthProblem(response);
return Promise.resolve();
}
const contentType = headers['content-type'];
if (requestInterop && requestInterop >= 6 && contentType !== 'application/partial-upload') {
if (version && version >= 6 && req.headers['content-type'] !== 'application/partial-upload') {
throw new BadRequestException('Content-Type must be application/partial-upload for PATCH requests');
}
await this.databaseRepository.withUuidLock(assetId, async () => {
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetRepository.getCompletionMetadata(id, req.auth.user.id);
if (!asset) {
response.status(404).send('Asset not found');
return;
@@ -174,30 +170,30 @@ export class AssetUploadService extends BaseService {
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(response);
}
if (providedOffset === null) {
if (uploadOffset === null) {
throw new BadRequestException('Missing Upload-Offset header');
}
const { path } = asset;
const expectedOffset = await this.getCurrentOffset(path);
if (expectedOffset !== providedOffset) {
this.setCompleteHeader(response, requestInterop, false);
return this.sendOffsetMismatchProblem(response, expectedOffset, providedOffset);
if (expectedOffset !== uploadOffset) {
this.setCompleteHeader(response, version, false);
return this.sendOffsetMismatchProblem(response, expectedOffset, uploadOffset);
}
const newLength = providedOffset + contentLength;
const newLength = uploadOffset + contentLength;
// If upload length is provided, validate we're not exceeding it
if (uploadLength !== null && newLength > uploadLength) {
if (uploadLength !== undefined && newLength > uploadLength) {
response.status(400).send('Upload would exceed declared length');
return;
}
this.validateQuota(auth, newLength);
this.validateQuota(req.auth, newLength);
// Empty PATCH without Upload-Complete
if (contentLength === 0 && !isComplete) {
this.setCompleteHeader(response, requestInterop, false);
this.setCompleteHeader(response, version, false);
response.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
return;
}
@@ -215,45 +211,47 @@ export class AssetUploadService extends BaseService {
writeStream.on('finish', async () => {
const currentOffset = await this.getCurrentOffset(path);
if (!isComplete) {
this.setCompleteHeader(response, requestInterop, false);
this.setCompleteHeader(response, version, false);
return response.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
}
this.logger.log(`Finished upload to ${path}`);
const checksum = await this.cryptoRepository.hashFile(path);
if (asset.checksum.compare(checksum) !== 0) {
return this.sendChecksumMismatchResponse(response, assetId, path);
return this.sendChecksumMismatchResponse(response, id, path);
}
this.setCompleteHeader(response, requestInterop, true);
response.status(200).setHeader('Upload-Offset', currentOffset.toString()).send();
await this.onComplete({ assetId, path, size: currentOffset, fileModifiedAt: asset.fileModifiedAt });
try {
await this.onComplete({ assetId: id, path, size: currentOffset, fileModifiedAt: asset.fileModifiedAt });
} finally {
this.setCompleteHeader(response, version, true);
response.status(200).setHeader('Upload-Offset', currentOffset.toString()).send();
}
});
request.on('data', (chunk: Buffer) => {
req.on('data', (chunk: Buffer) => {
if (receivedLength + chunk.length > contentLength) {
this.logger.error(`Received more data than specified in content-length for upload to ${path}`);
writeStream.destroy();
request.destroy();
req.destroy();
response.status(400).send('Received more data than specified in content-length');
return this.removeAsset(assetId, path);
return this.onCancel(id, path);
}
receivedLength += chunk.length;
if (!writeStream.write(chunk)) {
request.pause();
writeStream.once('drain', () => request.resume());
req.pause();
writeStream.once('drain', () => req.resume());
}
});
request.on('end', () => {
req.on('end', () => {
if (receivedLength === contentLength) {
return writeStream.end();
}
this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${assetId}`);
this.logger.error(`Received ${receivedLength} bytes when expecting ${contentLength} for ${id}`);
writeStream.destroy();
return this.removeAsset(assetId, path);
return this.onCancel(id, path);
});
});
}
@@ -267,13 +265,13 @@ export class AssetUploadService extends BaseService {
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompletedProblem(response);
}
await this.removeAsset(assetId, asset.path);
await this.onCancel(assetId, asset.path);
response.status(204).send();
}
async getUploadStatus(auth: AuthDto, assetId: string, response: Response) {
return this.databaseRepository.withUuidLock(assetId, async () => {
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
async getUploadStatus(auth: AuthDto, response: Response, id: string, { version }: GetUploadStatusDto) {
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
if (!asset) {
response.status(404).send('Asset not found');
return;
@@ -282,8 +280,7 @@ export class AssetUploadService extends BaseService {
const offset = await this.getCurrentOffset(asset.path);
const isComplete = asset.status !== AssetStatus.Partial;
const requestInterop = this.getNumberHeader(response.req.headers, 'upload-draft-interop-version');
this.setCompleteHeader(response, requestInterop, isComplete);
this.setCompleteHeader(response, version, isComplete);
response
.status(204)
.setHeader('Upload-Offset', offset.toString())
@@ -299,13 +296,19 @@ export class AssetUploadService extends BaseService {
private async onComplete(data: { assetId: string; path: string; size: number; fileModifiedAt: Date }): Promise<void> {
const { assetId, path, size, fileModifiedAt } = data;
this.logger.debug('Completing upload for asset', assetId);
const jobData = { name: JobName.AssetExtractMetadata, data: { id: assetId, source: 'upload' } } as const;
await this.withRetry(() => this.assetRepository.setCompleteWithSize(assetId, size));
try {
await this.withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
} catch (error: any) {
this.logger.error(`Failed to update times for ${path}: ${error.message}`);
}
await this.withRetry(() => this.jobRepository.queue(jobData));
await this.withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
}
private async removeAsset(assetId: string, path: string): Promise<void> {
private async onCancel(assetId: string, path: string): Promise<void> {
this.logger.debug('Cancelling upload for asset', assetId);
await this.withRetry(() => this.storageRepository.unlink(path));
await this.withRetry(() => this.assetRepository.remove({ id: assetId }));
}
@@ -324,14 +327,14 @@ export class AssetUploadService extends BaseService {
private sendInconsistentLengthProblem(response: Response): void {
response.status(400).contentType('application/problem+json').send({
type: `https://iana.org/assignments/http-problem-types#inconsistent-upload-length`,
type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length',
title: 'inconsistent length values for upload',
});
}
private sendAlreadyCompletedProblem(response: Response): void {
response.status(400).contentType('application/problem+json').send({
type: `https://iana.org/assignments/http-problem-types#completed-upload`,
type: 'https://iana.org/assignments/http-problem-types#completed-upload',
title: 'upload is already completed',
});
}
@@ -348,47 +351,7 @@ export class AssetUploadService extends BaseService {
private sendChecksumMismatchResponse(response: Response, assetId: string, path: string): Promise<void> {
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
response.status(460).send('Checksum mismatch');
return this.removeAsset(assetId, path);
}
private requireUploadComplete(headers: Request['headers'], interopVersion: number | null): boolean {
if (interopVersion !== null && interopVersion <= 3) {
const value = headers['upload-incomplete'] as string | undefined;
if (value === undefined) {
throw new BadRequestException('Missing Upload-Incomplete header');
}
return value === '?0';
}
const value = headers['upload-complete'] as string | undefined;
if (value === undefined) {
throw new BadRequestException('Missing Upload-Complete header');
}
return value === '?1';
}
private getNumberHeader(headers: Request['headers'], name: string): number | null {
const value = headers[name] as string | undefined;
if (value === undefined) {
return null;
}
const number = parseInt(value, 10);
if (!isFinite(number) || number < 0) {
throw new BadRequestException(`Invalid ${name} header`);
}
return number;
}
private requireContentLength(headers: Request['headers']): number {
const value = headers['content-length'] as string | undefined;
if (value === undefined) {
throw new BadRequestException('Missing Content-Length header');
}
const length = parseInt(value, 10);
if (!isFinite(length) || length < 0) {
throw new BadRequestException('Invalid Content-Length header');
}
return length;
return this.onCancel(assetId, path);
}
private async withRetry<T>(operation: () => Promise<T>, retries: number = 2, delay: number = 100): Promise<T> {
@@ -428,48 +391,6 @@ export class AssetUploadService extends BaseService {
}
}
private requireChecksum(headers: Request['headers']): Buffer {
const value = headers['repr-digest'] as string | undefined;
if (value === undefined) {
throw new BadRequestException(`Missing 'repr-digest' header`);
}
const sha1Item = parseDictionary(value).get('sha');
if (!sha1Item) {
throw new BadRequestException(`Missing 'sha' in 'repr-digest' header`);
}
const checksum = sha1Item[0];
if (!(checksum instanceof ArrayBuffer)) {
throw new BadRequestException(`Invalid 'sha' in 'repr-digest' header`);
}
return Buffer.from(checksum);
}
private requireAssetData(headers: Request['headers']): UploadAssetDataDto {
const value = headers[ImmichHeader.AssetData] as string | undefined;
if (value === undefined) {
throw new BadRequestException(`Missing ${ImmichHeader.AssetData} header`);
}
let assetData: any;
try {
assetData = JSON.parse(Buffer.from(value, 'base64').toString('utf8'));
} catch {
throw new BadRequestException(`${ImmichHeader.AssetData} header is not valid base64-encoded JSON`);
}
const dto = plainToInstance(UploadAssetDataDto, assetData);
const errors = validateSync(dto, { whitelist: true });
if (errors.length > 0) {
const formatted = errors.map((e) => (e.constraints ? Object.values(e.constraints).join(', ') : ''));
throw new BadRequestException(`Invalid ${ImmichHeader.AssetData} header: ${formatted.join('; ')}`);
}
return dto;
}
private setCompleteHeader(response: Response, interopVersion: number | null, isComplete: boolean): void {
if (!interopVersion) {
return;

View File

@@ -192,7 +192,7 @@ export function mapToUploadFile(file: ImmichFile): UploadFile {
export const asUploadRequest = (request: AuthRequest, file: Express.Multer.File): UploadRequest => {
return {
auth: request.user || null,
auth: request.auth || null,
body: request.body,
fieldName: file.fieldname as UploadFieldName,
file: mapToUploadFile(file as ImmichFile),