mertalev
2025-10-06 23:32:20 -04:00
parent de117ebe7a
commit e3e8da168f
15 changed files with 317 additions and 468 deletions

View File

@@ -1,4 +1,4 @@
-import { createHash, randomUUID } from 'crypto';
+import { createHash, randomUUID } from 'node:crypto';
import { AssetUploadController } from 'src/controllers/asset-upload.controller';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { serializeDictionary } from 'structured-headers';
@@ -31,10 +31,22 @@ describe(AssetUploadController.name, () => {
beforeEach(() => {
service.resetAllMocks();
-service.startUpload.mockImplementation(async (auth, req, res, dto) => void res.send());
-service.resumeUpload.mockImplementation(async (auth, req, res, id, dto) => void res.send());
-service.cancelUpload.mockImplementation(async (auth, id, res) => void res.send());
-service.getUploadStatus.mockImplementation(async (auth, res, id, dto) => void res.send());
+service.startUpload.mockImplementation((_, __, res, ___) => {
+  res.send();
+  return Promise.resolve();
+});
+service.resumeUpload.mockImplementation((_, __, res, ___, ____) => {
+  res.send();
+  return Promise.resolve();
+});
+service.cancelUpload.mockImplementation((_, __, res) => {
+  res.send();
+  return Promise.resolve();
+});
+service.getUploadStatus.mockImplementation((_, res, __, ___) => {
+  res.send();
+  return Promise.resolve();
+});
ctx.reset();
buffer = Buffer.from(randomUUID());
@@ -217,7 +229,7 @@ describe(AssetUploadController.name, () => {
});
it('should accept Upload-Incomplete header for version 3', async () => {
-const { status, body } = await request(ctx.getHttpServer())
+const { status } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '3')
.set('X-Immich-Asset-Data', makeAssetData())
@@ -428,7 +440,7 @@ describe(AssetUploadController.name, () => {
});
it('should validate UUID parameter', async () => {
-const { status, body } = await request(ctx.getHttpServer())
+const { status } = await request(ctx.getHttpServer())
.head('/upload/invalid-uuid')
.set('Upload-Draft-Interop-Version', '8');
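For reference, the start-upload tests above combine the interop-version, completeness, and asset-data headers on a single request. A hedged sketch of a complete happy-path call (assuming draft interop version 8 uses Upload-Complete and Upload-Length; ctx, buffer, and makeAssetData come from the surrounding test file, and the exact header set is an assumption):

// Hypothetical complete upload in one request; header names follow the
// resumable-upload draft, and this particular combination is illustrative.
const { status, headers } = await request(ctx.getHttpServer())
  .post('/upload')
  .set('Upload-Draft-Interop-Version', '8')
  .set('Upload-Complete', '?1') // structured-field boolean: the body is the whole file
  .set('Upload-Length', buffer.length.toString())
  .set('X-Immich-Asset-Data', makeAssetData())
  .send(buffer);
// A complete upload should answer 200 with { id }; an incomplete one
// (Upload-Complete: ?0) should answer 201 with a Location header to resume at.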

View File

@@ -56,7 +56,6 @@ export class AssetUploadController {
- file-created-at (string/date, required): ISO 8601 date string or Unix timestamp
- file-modified-at (string/date, required): ISO 8601 date string or Unix timestamp
- filename (string, required): Original filename
-- duration (string, optional): Duration for video assets
- is-favorite (boolean, optional): Favorite status
- icloud-id (string, optional): iCloud identifier for assets from iOS devices`,
required: true,
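Since X-Immich-Asset-Data is a structured-field dictionary, a client can build it with the same structured-headers package the controller tests import. A minimal sketch (field values are illustrative; each entry is a [value, parameters] tuple, mirroring what parseDictionary returns):

import { serializeDictionary } from 'structured-headers';

// Empty Maps mean "no parameters" for each dictionary member.
const assetData = serializeDictionary(
  new Map([
    ['file-created-at', ['2025-01-01T00:00:00Z', new Map()]],
    ['file-modified-at', ['2025-01-01T12:00:00Z', new Map()]],
    ['filename', ['photo.jpg', new Map()]],
    ['is-favorite', [false, new Map()]],
  ]),
);
// => file-created-at="2025-01-01T00:00:00Z", file-modified-at="2025-01-01T12:00:00Z",
//    filename="photo.jpg", is-favorite=?0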

View File

@@ -20,10 +20,6 @@ export class UploadAssetDataDto {
@ValidateDate()
fileModifiedAt!: Date;
-@Optional()
-@IsString()
-duration?: string;
@IsString()
@IsNotEmpty()
filename!: string;
@@ -105,7 +101,7 @@ export class StartUploadDto extends BaseUploadHeadersDto {
isFavorite: dict.get('is-favorite')?.[0],
iCloudId: dict.get('icloud-id')?.[0],
});
-} catch (error: any) {
+} catch {
throw new BadRequestException(`${ImmichHeader.AssetData} must be a valid structured dictionary`);
}
})
@@ -156,4 +152,4 @@ export class GetUploadStatusDto extends BaseRufhHeadersDto {}
export class UploadOkDto {
id!: string;
}
}

View File

@@ -14,6 +14,7 @@ from
left join "smart_search" on "asset"."id" = "smart_search"."assetId"
where
"asset"."id" = $1::uuid
and "asset"."status" != 'partial'
limit
$2
@@ -40,6 +41,7 @@ from
"asset"
where
"asset"."id" = $1::uuid
and "asset"."status" != 'partial'
limit
$2
@@ -52,6 +54,7 @@ from
"asset"
where
"asset"."id" = $1::uuid
and "asset"."status" != 'partial'
limit
$2
@@ -78,7 +81,8 @@ from
"asset"
inner join "asset_job_status" on "asset_job_status"."assetId" = "asset"."id"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "asset"."visibility" != $1
and (
"asset_job_status"."previewAt" is null
@@ -110,6 +114,7 @@ from
"asset"
where
"asset"."id" = $1
and "asset"."status" != 'partial'
-- AssetJobRepository.getForGenerateThumbnailJob
select
@@ -141,6 +146,7 @@ from
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."id" = $1
and "asset"."status" != 'partial'
-- AssetJobRepository.getForMetadataExtraction
select
@@ -178,6 +184,7 @@ from
"asset"
where
"asset"."id" = $1
and "asset"."status" != 'partial'
-- AssetJobRepository.getAlbumThumbnailFiles
select
@@ -198,7 +205,8 @@ from
inner join "smart_search" on "asset"."id" = "smart_search"."assetId"
inner join "asset_job_status" as "job_status" on "job_status"."assetId" = "asset"."id"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "asset"."visibility" in ('archive', 'timeline')
and "job_status"."duplicatesDetectedAt" is null
@@ -210,6 +218,7 @@ from
inner join "asset_job_status" as "job_status" on "assetId" = "asset"."id"
where
"asset"."visibility" != $1
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "job_status"."previewAt" is not null
and not exists (
@@ -244,6 +253,7 @@ from
"asset"
where
"asset"."id" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.getForDetectFacesJob
select
@@ -284,6 +294,7 @@ from
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."id" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.getForOcr
select
@@ -385,6 +396,7 @@ from
) as "stacked_assets" on "stack"."id" is not null
where
"asset"."id" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForVideoConversion
select
@@ -398,6 +410,7 @@ where
or "asset"."encodedVideoPath" = $2
)
and "asset"."visibility" != $3
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.getForVideoConversion
@@ -411,6 +424,7 @@ from
where
"asset"."id" = $1
and "asset"."type" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForMetadataExtraction
select
@@ -423,6 +437,7 @@ where
"asset_job_status"."metadataExtractedAt" is null
or "asset_job_status"."assetId" is null
)
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.getForStorageTemplateJob
@@ -443,7 +458,8 @@ from
"asset"
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "asset"."id" = $1
-- AssetJobRepository.streamForStorageTemplateJob
@@ -464,7 +480,8 @@ from
"asset"
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.streamForDeletedJob
select
@@ -474,6 +491,7 @@ from
"asset"
where
"asset"."deletedAt" <= $1
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForSidecar
select
@@ -486,6 +504,7 @@ where
or "asset"."sidecarPath" is null
)
and "asset"."visibility" != $2
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForDetectFacesJob
select
@@ -495,8 +514,10 @@ from
inner join "asset_job_status" as "job_status" on "assetId" = "asset"."id"
where
"asset"."visibility" != $1
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "job_status"."previewAt" is not null
and "asset"."status" != 'partial'
order by
"asset"."fileCreatedAt" desc
@@ -517,4 +538,14 @@ select
from
"asset"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.streamForPartialAssetCleanupJob
select
"id"
from
"asset"
where
"asset"."status" = 'partial'
and "asset"."createdAt" < $1

View File

@@ -46,6 +46,56 @@ where
"assetId" = $1
and "key" = $2
-- AssetRepository.getCompletionMetadata
select
"originalPath" as "path",
"status",
"fileModifiedAt",
"createdAt",
"checksum",
"fileSizeInByte" as "size"
from
"asset"
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"id" = $1
and "ownerId" = $2
-- AssetRepository.setComplete
update "asset"
set
"status" = $1,
"visibility" = $2
where
"id" = $3
and "status" = 'partial'
-- AssetRepository.removeAndDecrementQuota
with
"asset_exif" as (
select
"fileSizeInByte"
from
"asset_exif"
where
"assetId" = $1
),
"asset" as (
delete from "asset"
where
"id" = $2
returning
"ownerId"
)
update "user"
set
"quotaUsageInBytes" = "quotaUsageInBytes" - "fileSizeInByte"
from
"asset_exif",
"asset"
where
"user"."id" = "asset"."ownerId"
-- AssetRepository.getByDayOfYear
with
"res" as (
@@ -258,7 +308,9 @@ where
-- AssetRepository.getUploadAssetIdByChecksum
select
"id"
"id",
"status",
"createdAt"
from
"asset"
where

View File

@@ -279,6 +279,7 @@ export class AssetRepository {
.execute();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] })
getCompletionMetadata(assetId: string, ownerId: string) {
return this.db
.selectFrom('asset')
@@ -289,6 +290,7 @@ export class AssetRepository {
.executeTakeFirst();
}
@GenerateSql({ params: [DummyValue.UUID] })
async setComplete(assetId: string) {
await this.db
.updateTable('asset')
@@ -298,6 +300,7 @@ export class AssetRepository {
.execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
async removeAndDecrementQuota(id: string): Promise<void> {
await this.db
.with('asset_exif', (qb) => qb.selectFrom('asset_exif').where('assetId', '=', id).select('fileSizeInByte'))
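The hunk above ends at the first CTE. Read together with the generated removeAndDecrementQuota SQL earlier, the full chain plausibly continues along these lines (a reconstruction, not the verbatim source; sql is Kysely's raw-SQL tag, and Kysely's UPDATE ... FROM support via .from() is assumed):

await this.db
  .with('asset_exif', (qb) => qb.selectFrom('asset_exif').where('assetId', '=', id).select('fileSizeInByte'))
  .with('asset', (qb) => qb.deleteFrom('asset').where('id', '=', id).returning('ownerId'))
  .updateTable('user')
  .from(['asset_exif', 'asset'])
  .set({ quotaUsageInBytes: sql`"quotaUsageInBytes" - "fileSizeInByte"` })
  .whereRef('user.id', '=', 'asset.ownerId')
  .execute();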

View File

@@ -160,7 +160,7 @@ export class StorageRepository {
}
}
-mkdir(filepath: string): Promise<String | undefined> {
+mkdir(filepath: string): Promise<string | undefined> {
return fs.mkdir(filepath, { recursive: true });
}

View File

@@ -4,6 +4,6 @@ export async function up(db: Kysely<any>): Promise<void> {
await sql`ALTER TYPE "assets_status_enum" ADD VALUE IF NOT EXISTS 'partial'`.execute(db);
}
-export async function down(db: Kysely<any>): Promise<void> {
+export async function down(): Promise<void> {
// Cannot remove enum values in PostgreSQL
}

View File

@@ -24,7 +24,7 @@ describe(AssetUploadService.name, () => {
fileCreatedAt: new Date('2025-01-01T00:00:00Z'),
fileModifiedAt: new Date('2025-01-01T12:00:00Z'),
isFavorite: false,
-iCloudId: ''
+iCloudId: '',
},
checksum: Buffer.from('checksum'),
uploadLength: 1024,
@@ -167,6 +167,7 @@ describe(AssetUploadService.name, () => {
(checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT;
mocks.asset.createWithMetadata.mockRejectedValue(checksumError);
// eslint-disable-next-line unicorn/no-useless-undefined
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue(undefined);
await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException);
@@ -197,29 +198,6 @@ describe(AssetUploadService.name, () => {
]);
});
-it('should include duration for video assets', async () => {
-  const videoDto = {
-    ...mockDto,
-    assetData: {
-      ...mockDto.assetData,
-      filename: 'video.mp4',
-      duration: '00:05:30',
-    },
-  };
-  mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
-  await sut.onStart(authStub.user1, videoDto);
-  expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
-    expect.objectContaining({
-      duration: '00:05:30',
-    }),
-    expect.anything(),
-    undefined,
-  );
-});
it('should set isFavorite when true', async () => {
const favoriteDto = {
...mockDto,
@@ -327,6 +305,7 @@ describe(AssetUploadService.name, () => {
const staleAssets = [{ id: factory.uuid() }, { id: factory.uuid() }, { id: factory.uuid() }];
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue(
// eslint-disable-next-line @typescript-eslint/require-await
(async function* () {
for (const asset of staleAssets) {
yield asset;
@@ -339,16 +318,17 @@ describe(AssetUploadService.name, () => {
expect(mocks.assetJob.streamForPartialAssetCleanupJob).toHaveBeenCalledWith(expect.any(Date));
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.PartialAssetCleanup, data: staleAssets[0] },
{ name: JobName.PartialAssetCleanup, data: staleAssets[1] },
{ name: JobName.PartialAssetCleanup, data: staleAssets[2] },
]);
});
it('should batch cleanup jobs', async () => {
const assets = Array.from({ length: 1500 }, () => ({ id: factory.uuid() }));
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue(
// eslint-disable-next-line @typescript-eslint/require-await
(async function* () {
for (const asset of assets) {
yield asset;
@@ -376,6 +356,7 @@ describe(AssetUploadService.name, () => {
const path = `/upload/${assetId}/file.jpg`;
it('should skip if asset not found', async () => {
// eslint-disable-next-line unicorn/no-useless-undefined
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue(undefined);
const result = await sut.removeStaleUpload({ id: assetId });
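The batching test above (1500 streamed ids) implies the handler drains the stream in fixed-size chunks instead of buffering every id. A hypothetical sketch of that loop (the 1000-item batch size is inferred from the test, not confirmed by this diff):

const batch: { id: string }[] = [];
const queueBatch = () =>
  this.jobRepository.queueAll(batch.map((data) => ({ name: JobName.PartialAssetCleanup, data })));
for await (const asset of this.assetJobRepository.streamForPartialAssetCleanupJob(cutoff)) {
  batch.push(asset);
  if (batch.length >= 1000) {
    await queueBatch();
    batch.length = 0; // reuse the array for the next chunk
  }
}
if (batch.length > 0) {
  await queueBatch();
}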

View File

@@ -36,7 +36,7 @@ export class AssetUploadService extends BaseService {
const asset = await this.onStart(auth, dto);
if (asset.isDuplicate) {
if (asset.status !== AssetStatus.Partial) {
-return this.sendAlreadyCompletedProblem(res);
+return this.sendAlreadyCompleted(res);
}
const location = `/api/upload/${asset.id}`;
@@ -49,7 +49,7 @@ export class AssetUploadService extends BaseService {
}
if (isComplete && uploadLength !== contentLength) {
-return this.sendInconsistentLengthProblem(res);
+return this.sendInconsistentLength(res);
}
const location = `/api/upload/${asset.id}`;
@@ -66,25 +66,19 @@ export class AssetUploadService extends BaseService {
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
+await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
+this.setCompleteHeader(res, dto.version, isComplete);
+if (!isComplete) {
+  res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
+  return;
+}
+this.logger.log(`Finished upload to ${asset.path}`);
+if (dto.checksum.compare(checksumBuffer!) !== 0) {
+  return await this.sendChecksumMismatch(res, asset.id, asset.path);
+}
-writeStream.on('finish', () => {
-  this.setCompleteHeader(res, dto.version, isComplete);
-  if (!isComplete) {
-    return res.status(201).set('Location', location).setHeader('Upload-Limit', 'min-size=0').send();
-  }
-  this.logger.log(`Finished upload to ${asset.path}`);
-  if (dto.checksum.compare(checksumBuffer!) !== 0) {
-    return this.sendChecksumMismatchResponse(res, asset.id, asset.path);
-  }
-  this.onComplete(metadata)
-    .then(() => res.status(200).send({ id: asset.id }))
-    .catch((error) => {
-      this.logger.error(`Failed to complete upload for ${asset.id}: ${error.message}`);
-      res.status(500).send();
-    });
-});
-await new Promise((resolve) => writeStream.on('close', resolve));
+await this.onComplete(metadata);
+res.status(200).send({ id: asset.id });
}
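The rewrite above trades the nested 'finish' callback for a single awaited promise. The pattern in isolation (a standalone sketch; note that 'close' also fires after a successful 'finish', by which point the promise is already settled, so the late reject is a no-op):

import { createWriteStream } from 'node:fs';

const stream = createWriteStream('/tmp/upload.part');
stream.end('last chunk');
await new Promise((resolve, reject) => stream.on('finish', resolve).on('close', reject));
// Reaching here means the data was flushed; a 'close' without a prior
// 'finish' (e.g. the stream was destroyed mid-write) rejects instead.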
resumeUpload(auth: AuthDto, req: Readable, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
@@ -100,16 +94,16 @@ export class AssetUploadService extends BaseService {
const { fileModifiedAt, path, status, checksum: providedChecksum, size } = completionData;
if (status !== AssetStatus.Partial) {
-return this.sendAlreadyCompletedProblem(res);
+return this.sendAlreadyCompleted(res);
}
if (uploadLength && size && size !== uploadLength) {
-return this.sendInconsistentLengthProblem(res);
+return this.sendInconsistentLength(res);
}
const expectedOffset = await this.getCurrentOffset(path);
if (expectedOffset !== uploadOffset) {
-return this.sendOffsetMismatchProblem(res, expectedOffset, uploadOffset);
+return this.sendOffsetMismatch(res, expectedOffset, uploadOffset);
}
const newLength = uploadOffset + contentLength;
@@ -123,28 +117,29 @@ export class AssetUploadService extends BaseService {
return;
}
-const metadata = { id, path, size: contentLength, fileModifiedAt: fileModifiedAt };
+const metadata = { id, path, size: contentLength, fileModifiedAt };
const writeStream = this.pipe(req, res, metadata);
-writeStream.on('finish', async () => {
-  this.setCompleteHeader(res, version, isComplete);
-  const currentOffset = await this.getCurrentOffset(path);
-  if (!isComplete) {
-    return res.status(204).setHeader('Upload-Offset', currentOffset.toString()).send();
-  }
-  this.logger.log(`Finished upload to ${path}`);
-  const checksum = await this.cryptoRepository.hashFile(path);
-  if (providedChecksum.compare(checksum) !== 0) {
-    return this.sendChecksumMismatchResponse(res, id, path);
-  }
+await new Promise((resolve, reject) => writeStream.on('finish', resolve).on('close', reject));
+this.setCompleteHeader(res, version, isComplete);
+if (!isComplete) {
  try {
-    await this.onComplete(metadata);
-  } finally {
-    res.status(200).send({ id });
+    const offset = await this.getCurrentOffset(path);
+    res.status(204).setHeader('Upload-Offset', offset.toString()).send();
+  } catch {
+    this.logger.error(`Failed to get current offset for ${path} after write`);
+    res.status(500).send();
  }
-});
-await new Promise((resolve) => writeStream.on('close', resolve));
+  return;
+}
+this.logger.log(`Finished upload to ${path}`);
+const checksum = await this.cryptoRepository.hashFile(path);
+if (providedChecksum.compare(checksum) !== 0) {
+  return await this.sendChecksumMismatch(res, id, path);
+}
+await this.onComplete(metadata);
+res.status(200).send({ id });
-});
}
@@ -156,7 +151,7 @@ export class AssetUploadService extends BaseService {
return;
}
if (asset.status !== AssetStatus.Partial) {
-return this.sendAlreadyCompletedProblem(res);
+return this.sendAlreadyCompleted(res);
}
await this.onCancel(assetId, asset.path);
res.status(204).send();
@@ -250,9 +245,8 @@ export class AssetUploadService extends BaseService {
fileCreatedAt: assetData.fileCreatedAt,
fileModifiedAt: assetData.fileModifiedAt,
localDateTime: assetData.fileCreatedAt,
-type: type,
+type,
isFavorite: assetData.isFavorite,
-duration: assetData.duration || null,
visibility: AssetVisibility.Hidden,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
@@ -280,7 +274,7 @@ export class AssetUploadService extends BaseService {
async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
this.logger.debug('Completing upload for asset', id);
-const jobData = { name: JobName.AssetExtractMetadata, data: { id: id, source: 'upload' } } as const;
+const jobData = { name: JobName.AssetExtractMetadata, data: { id, source: 'upload' } } as const;
await withRetry(() => this.assetRepository.setComplete(id));
try {
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
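withRetry is an internal helper that is not part of this diff; a plausible minimal shape (entirely hypothetical, including the attempt count and backoff) is:

async function withRetry<T>(fn: () => Promise<T>, attempts = 3, backoffMs = 100): Promise<T> {
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      // linear backoff between attempts; tune as needed
      if (attempt < attempts) {
        await new Promise((resolve) => setTimeout(resolve, backoffMs * attempt));
      }
    }
  }
  throw lastError;
}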
@@ -317,9 +311,10 @@ export class AssetUploadService extends BaseService {
if (receivedLength + data.length > size) {
writeStream.destroy();
req.destroy();
-return this.onCancel(id, path).finally(() =>
-  res.status(400).send('Received more data than specified in content-length'),
-);
+void this.onCancel(id, path)
+  .catch((error: any) => this.logger.error(`Failed to remove ${id} after too much data: ${error.message}`))
+  .finally(() => res.status(400).send('Received more data than specified in content-length'));
+return;
}
receivedLength += data.length;
if (!writeStream.write(data)) {
@@ -333,9 +328,9 @@ export class AssetUploadService extends BaseService {
return writeStream.end();
}
writeStream.destroy();
-this.onCancel(id, path).finally(() =>
-  res.status(400).send(`Received ${receivedLength} bytes when expecting ${size}`),
-);
+void this.onCancel(id, path)
+  .catch((error: any) => this.logger.error(`Failed to remove ${id} after unexpected length: ${error.message}`))
+  .finally(() => res.status(400).send(`Received ${receivedLength} bytes when expecting ${size}`));
});
return writeStream;
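The pipe helper above also honors backpressure via the writeStream.write return value. The standard pause/resume pattern it builds on, sketched standalone (req as a Readable and writeStream as a Writable are assumptions matching the surrounding code):

req.on('data', (chunk: Buffer) => {
  if (!writeStream.write(chunk)) {
    req.pause(); // the write buffer is full; stop reading
    writeStream.once('drain', () => req.resume()); // resume once it flushes
  }
});
req.on('end', () => writeStream.end());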
@@ -353,21 +348,21 @@ export class AssetUploadService extends BaseService {
}
}
-private sendInconsistentLengthProblem(res: Response): void {
+private sendInconsistentLength(res: Response): void {
res.status(400).contentType('application/problem+json').send({
type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length',
title: 'inconsistent length values for upload',
});
}
-private sendAlreadyCompletedProblem(res: Response): void {
+private sendAlreadyCompleted(res: Response): void {
res.status(400).contentType('application/problem+json').send({
type: 'https://iana.org/assignments/http-problem-types#completed-upload',
title: 'upload is already completed',
});
}
-private sendOffsetMismatchProblem(res: Response, expected: number, actual: number): void {
+private sendOffsetMismatch(res: Response, expected: number, actual: number): void {
res.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({
type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset',
title: 'offset from request does not match offset of resource',
@@ -376,7 +371,7 @@ export class AssetUploadService extends BaseService {
});
}
-private sendChecksumMismatchResponse(res: Response, assetId: string, path: string): Promise<void> {
+private sendChecksumMismatch(res: Response, assetId: string, path: string) {
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
res.status(460).send('File on server does not match provided checksum');
return this.onCancel(assetId, path);
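For completeness: the 409 offset-mismatch problem above is the recoverable case in the resumable-upload draft, since the response carries the server's Upload-Offset. A hypothetical client-side recovery sketch (the endpoint shape, method, and header set are assumptions, not confirmed by this diff):

const response = await fetch(`/api/upload/${assetId}`, {
  method: 'PATCH',
  headers: {
    'Upload-Draft-Interop-Version': '8',
    'Upload-Offset': String(localOffset),
    'Upload-Complete': '?1',
  },
  body: remainingBytes,
});
if (response.status === 409) {
  // The server's offset wins: re-slice the file from its Upload-Offset and retry.
  const serverOffset = Number(response.headers.get('Upload-Offset'));
}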