Compare commits

...

43 Commits

Author SHA1 Message Date
mertalev
7ba458668b remove upload-length from conventional upload e2e 2025-11-06 12:15:54 -05:00
mertalev
ea034f21bc linting 2025-11-06 12:15:54 -05:00
mertalev
a68513247d redundant check 2025-11-06 12:15:54 -05:00
mertalev
59f7f3c23e update api 2025-11-06 12:15:54 -05:00
mertalev
c88bde3cab lint 2025-11-06 12:15:54 -05:00
mertalev
818bd51036 support conventional uploads 2025-11-06 12:15:54 -05:00
mertalev
3c72409712 require header for incomplete uploads 2025-11-06 12:15:54 -05:00
mertalev
8d1a8b9465 reject empty file 2025-11-06 12:15:54 -05:00
mertalev
d880e7baed infer upload length when possible 2025-11-06 12:15:54 -05:00
mertalev
42801ace35 update api 2025-11-06 12:15:54 -05:00
mertalev
838b8e9126 set max-age limit 2025-11-06 12:15:19 -05:00
mertalev
9da5a48bdd add live photo e2e 2025-11-06 12:15:19 -05:00
mertalev
27f126bd58 better abort check 2025-11-06 12:15:19 -05:00
mertalev
a238c6a70d unnecessary change 2025-11-06 12:15:19 -05:00
mertalev
7222d7af30 configurable cleanup 2025-11-06 12:15:19 -05:00
mertalev
d660ab2218 handle live photos 2025-11-06 12:13:33 -05:00
mertalev
69ffbcd5cf tweak types 2025-11-06 12:13:33 -05:00
mertalev
bc84486668 MUST NOT validation 2025-11-06 12:13:33 -05:00
mertalev
2666ee2b4f remove log 2025-11-06 12:12:27 -05:00
mertalev
72ea7799c0 lint 2025-11-06 12:12:27 -05:00
mertalev
98c8c28b62 test interruption + abort 2025-11-06 12:12:27 -05:00
mertalev
6b1d26d3a2 more content length test inputs 2025-11-06 12:12:27 -05:00
mertalev
5e07976288 fix abortion return 2025-11-06 12:12:27 -05:00
mertalev
3f1133f9b7 typo 2025-11-06 12:12:27 -05:00
mertalev
3a087ed2cd proactive abortion 2025-11-06 12:12:27 -05:00
mertalev
c723a9ac78 better content length handling 2025-11-06 12:09:27 -05:00
mertalev
550460891d add timeout 2025-11-06 12:09:27 -05:00
mertalev
e3e8da168f tidying 2025-11-06 12:09:27 -05:00
mertalev
de117ebe7a listen to upload event in e2e
test resume with real image
2025-11-06 12:09:27 -05:00
mertalev
3d507015e0 add service tests 2025-11-06 12:09:26 -05:00
mertalev
fe71662d24 add controller tests, move validation testing from e2e
revert unnecessary change

update mocks

add structured-headers to e2e deps
2025-11-06 12:09:26 -05:00
mertalev
81a66350f6 add note about RFC 9651
authdto

remove excess logs

use structured dictionary
2025-11-06 12:07:34 -05:00
mertalev
c33e65362a clean up stale uploads
stale upload cleanup

try/catch file check
2025-11-06 12:07:34 -05:00
mertalev
bb5519036a unnecessary quota check 2025-11-06 12:07:34 -05:00
mertalev
177c997d96 interim+500
interim+500

interim+500
2025-11-06 12:07:34 -05:00
mertalev
2d6a2dc77b more e2e tests
consistent e2e sections

decrement quota on cancel
2025-11-06 12:07:34 -05:00
mertalev
e193cb3a5b tweaks
shared pipe method

shared pipe method

require size upfront

make length optional for patch requests
2025-11-06 12:07:34 -05:00
mertalev
4b63d3d055 ensure stream is closed before releasing lock 2025-11-06 12:07:34 -05:00
mertalev
4ed92f5df5 dto refactor
add logging

handle metadata
2025-11-06 12:07:34 -05:00
mertalev
6f61bf04e4 backward compatibility 2025-11-06 12:07:34 -05:00
mertalev
b21d0a1c53 working e2e 2025-11-06 12:07:34 -05:00
mertalev
f80326872e interop v8 compliance 2025-11-06 12:07:34 -05:00
mertalev
7561c5e1c4 chunked upload controller 2025-11-06 12:05:56 -05:00
46 changed files with 4271 additions and 35 deletions

View File

@@ -53,5 +53,8 @@
},
"volta": {
"node": "24.11.0"
},
"dependencies": {
"structured-headers": "^2.0.2"
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -561,6 +561,16 @@ export const utils = {
await utils.waitForQueueFinish(accessToken, 'sidecar');
await utils.waitForQueueFinish(accessToken, 'metadataExtraction');
},
downloadAsset: async (accessToken: string, id: string) => {
const downloadedRes = await fetch(`${baseUrl}/api/assets/${id}/original`, {
headers: asBearerAuth(accessToken),
});
if (!downloadedRes.ok) {
throw new Error(`Failed to download asset ${id}: ${downloadedRes.status} ${await downloadedRes.text()}`);
}
return await downloadedRes.blob();
},
};
utils.initSdk();

View File

@@ -265,6 +265,11 @@ Class | Method | HTTP request | Description
*TrashApi* | [**emptyTrash**](doc//TrashApi.md#emptytrash) | **POST** /trash/empty |
*TrashApi* | [**restoreAssets**](doc//TrashApi.md#restoreassets) | **POST** /trash/restore/assets |
*TrashApi* | [**restoreTrash**](doc//TrashApi.md#restoretrash) | **POST** /trash/restore |
*UploadApi* | [**cancelUpload**](doc//UploadApi.md#cancelupload) | **DELETE** /upload/{id} |
*UploadApi* | [**getUploadOptions**](doc//UploadApi.md#getuploadoptions) | **OPTIONS** /upload |
*UploadApi* | [**getUploadStatus**](doc//UploadApi.md#getuploadstatus) | **HEAD** /upload/{id} |
*UploadApi* | [**resumeUpload**](doc//UploadApi.md#resumeupload) | **PATCH** /upload/{id} |
*UploadApi* | [**startUpload**](doc//UploadApi.md#startupload) | **POST** /upload |
*UsersApi* | [**createProfileImage**](doc//UsersApi.md#createprofileimage) | **POST** /users/profile-image |
*UsersApi* | [**deleteProfileImage**](doc//UsersApi.md#deleteprofileimage) | **DELETE** /users/profile-image |
*UsersApi* | [**deleteUserLicense**](doc//UsersApi.md#deleteuserlicense) | **DELETE** /users/me/license |
@@ -579,6 +584,8 @@ Class | Method | HTTP request | Description
- [UpdateAlbumUserDto](doc//UpdateAlbumUserDto.md)
- [UpdateAssetDto](doc//UpdateAssetDto.md)
- [UpdateLibraryDto](doc//UpdateLibraryDto.md)
- [UploadBackupConfig](doc//UploadBackupConfig.md)
- [UploadOkDto](doc//UploadOkDto.md)
- [UsageByUserDto](doc//UsageByUserDto.md)
- [UserAdminCreateDto](doc//UserAdminCreateDto.md)
- [UserAdminDeleteDto](doc//UserAdminDeleteDto.md)

View File

@@ -60,6 +60,7 @@ part 'api/system_metadata_api.dart';
part 'api/tags_api.dart';
part 'api/timeline_api.dart';
part 'api/trash_api.dart';
part 'api/upload_api.dart';
part 'api/users_api.dart';
part 'api/users_admin_api.dart';
part 'api/view_api.dart';
@@ -347,6 +348,8 @@ part 'model/update_album_dto.dart';
part 'model/update_album_user_dto.dart';
part 'model/update_asset_dto.dart';
part 'model/update_library_dto.dart';
part 'model/upload_backup_config.dart';
part 'model/upload_ok_dto.dart';
part 'model/usage_by_user_dto.dart';
part 'model/user_admin_create_dto.dart';
part 'model/user_admin_delete_dto.dart';

379
mobile/openapi/lib/api/upload_api.dart generated Normal file
View File

@@ -0,0 +1,379 @@
//
// AUTO-GENERATED FILE, DO NOT MODIFY!
//
// @dart=2.18
// ignore_for_file: unused_element, unused_import
// ignore_for_file: always_put_required_named_parameters_first
// ignore_for_file: constant_identifier_names
// ignore_for_file: lines_longer_than_80_chars
part of openapi.api;
class UploadApi {
UploadApi([ApiClient? apiClient]) : apiClient = apiClient ?? defaultApiClient;
final ApiClient apiClient;
/// This endpoint requires the `asset.upload` permission.
///
/// Note: This method returns the HTTP [Response].
///
/// Parameters:
///
/// * [String] id (required):
///
/// * [String] key:
///
/// * [String] slug:
Future<Response> cancelUploadWithHttpInfo(String id, { String? key, String? slug, }) async {
// ignore: prefer_const_declarations
final apiPath = r'/upload/{id}'
.replaceAll('{id}', id);
// ignore: prefer_final_locals
Object? postBody;
final queryParams = <QueryParam>[];
final headerParams = <String, String>{};
final formParams = <String, String>{};
if (key != null) {
queryParams.addAll(_queryParams('', 'key', key));
}
if (slug != null) {
queryParams.addAll(_queryParams('', 'slug', slug));
}
const contentTypes = <String>[];
return apiClient.invokeAPI(
apiPath,
'DELETE',
queryParams,
postBody,
headerParams,
formParams,
contentTypes.isEmpty ? null : contentTypes.first,
);
}
/// This endpoint requires the `asset.upload` permission.
///
/// Parameters:
///
/// * [String] id (required):
///
/// * [String] key:
///
/// * [String] slug:
Future<void> cancelUpload(String id, { String? key, String? slug, }) async {
final response = await cancelUploadWithHttpInfo(id, key: key, slug: slug, );
if (response.statusCode >= HttpStatus.badRequest) {
throw ApiException(response.statusCode, await _decodeBodyBytes(response));
}
}
/// Performs an HTTP 'OPTIONS /upload' operation and returns the [Response].
Future<Response> getUploadOptionsWithHttpInfo() async {
// ignore: prefer_const_declarations
final apiPath = r'/upload';
// ignore: prefer_final_locals
Object? postBody;
final queryParams = <QueryParam>[];
final headerParams = <String, String>{};
final formParams = <String, String>{};
const contentTypes = <String>[];
return apiClient.invokeAPI(
apiPath,
'OPTIONS',
queryParams,
postBody,
headerParams,
formParams,
contentTypes.isEmpty ? null : contentTypes.first,
);
}
Future<void> getUploadOptions() async {
final response = await getUploadOptionsWithHttpInfo();
if (response.statusCode >= HttpStatus.badRequest) {
throw ApiException(response.statusCode, await _decodeBodyBytes(response));
}
}
/// This endpoint requires the `asset.upload` permission.
///
/// Note: This method returns the HTTP [Response].
///
/// Parameters:
///
/// * [String] id (required):
///
/// * [String] uploadDraftInteropVersion (required):
/// Indicates the version of the RUFH protocol supported by the client.
///
/// * [String] key:
///
/// * [String] slug:
Future<Response> getUploadStatusWithHttpInfo(String id, String uploadDraftInteropVersion, { String? key, String? slug, }) async {
// ignore: prefer_const_declarations
final apiPath = r'/upload/{id}'
.replaceAll('{id}', id);
// ignore: prefer_final_locals
Object? postBody;
final queryParams = <QueryParam>[];
final headerParams = <String, String>{};
final formParams = <String, String>{};
if (key != null) {
queryParams.addAll(_queryParams('', 'key', key));
}
if (slug != null) {
queryParams.addAll(_queryParams('', 'slug', slug));
}
headerParams[r'upload-draft-interop-version'] = parameterToString(uploadDraftInteropVersion);
const contentTypes = <String>[];
return apiClient.invokeAPI(
apiPath,
'HEAD',
queryParams,
postBody,
headerParams,
formParams,
contentTypes.isEmpty ? null : contentTypes.first,
);
}
/// This endpoint requires the `asset.upload` permission.
///
/// Parameters:
///
/// * [String] id (required):
///
/// * [String] uploadDraftInteropVersion (required):
/// Indicates the version of the RUFH protocol supported by the client.
///
/// * [String] key:
///
/// * [String] slug:
Future<void> getUploadStatus(String id, String uploadDraftInteropVersion, { String? key, String? slug, }) async {
final response = await getUploadStatusWithHttpInfo(id, uploadDraftInteropVersion, key: key, slug: slug, );
if (response.statusCode >= HttpStatus.badRequest) {
throw ApiException(response.statusCode, await _decodeBodyBytes(response));
}
}
/// This endpoint requires the `asset.upload` permission.
///
/// Note: This method returns the HTTP [Response].
///
/// Parameters:
///
/// * [String] contentLength (required):
/// Non-negative size of the request body in bytes.
///
/// * [String] id (required):
///
/// * [String] uploadComplete (required):
/// Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.
///
/// * [String] uploadDraftInteropVersion (required):
/// Indicates the version of the RUFH protocol supported by the client.
///
/// * [String] uploadOffset (required):
/// Non-negative byte offset indicating the starting position of the data in the request body within the entire file.
///
/// * [String] key:
///
/// * [String] slug:
Future<Response> resumeUploadWithHttpInfo(String contentLength, String id, String uploadComplete, String uploadDraftInteropVersion, String uploadOffset, { String? key, String? slug, }) async {
// ignore: prefer_const_declarations
final apiPath = r'/upload/{id}'
.replaceAll('{id}', id);
// ignore: prefer_final_locals
Object? postBody;
final queryParams = <QueryParam>[];
final headerParams = <String, String>{};
final formParams = <String, String>{};
if (key != null) {
queryParams.addAll(_queryParams('', 'key', key));
}
if (slug != null) {
queryParams.addAll(_queryParams('', 'slug', slug));
}
headerParams[r'content-length'] = parameterToString(contentLength);
headerParams[r'upload-complete'] = parameterToString(uploadComplete);
headerParams[r'upload-draft-interop-version'] = parameterToString(uploadDraftInteropVersion);
headerParams[r'upload-offset'] = parameterToString(uploadOffset);
const contentTypes = <String>[];
return apiClient.invokeAPI(
apiPath,
'PATCH',
queryParams,
postBody,
headerParams,
formParams,
contentTypes.isEmpty ? null : contentTypes.first,
);
}
/// This endpoint requires the `asset.upload` permission.
///
/// Parameters:
///
/// * [String] contentLength (required):
/// Non-negative size of the request body in bytes.
///
/// * [String] id (required):
///
/// * [String] uploadComplete (required):
/// Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.
///
/// * [String] uploadDraftInteropVersion (required):
/// Indicates the version of the RUFH protocol supported by the client.
///
/// * [String] uploadOffset (required):
/// Non-negative byte offset indicating the starting position of the data in the request body within the entire file.
///
/// * [String] key:
///
/// * [String] slug:
Future<UploadOkDto?> resumeUpload(String contentLength, String id, String uploadComplete, String uploadDraftInteropVersion, String uploadOffset, { String? key, String? slug, }) async {
final response = await resumeUploadWithHttpInfo(contentLength, id, uploadComplete, uploadDraftInteropVersion, uploadOffset, key: key, slug: slug, );
if (response.statusCode >= HttpStatus.badRequest) {
throw ApiException(response.statusCode, await _decodeBodyBytes(response));
}
// When a remote server returns no body with a status of 204, we shall not decode it.
// At the time of writing this, `dart:convert` will throw an "Unexpected end of input"
// FormatException when trying to decode an empty string.
if (response.body.isNotEmpty && response.statusCode != HttpStatus.noContent) {
return await apiClient.deserializeAsync(await _decodeBodyBytes(response), 'UploadOkDto',) as UploadOkDto;
}
return null;
}
/// This endpoint requires the `asset.upload` permission.
///
/// Note: This method returns the HTTP [Response].
///
/// Parameters:
///
/// * [String] contentLength (required):
/// Non-negative size of the request body in bytes.
///
/// * [String] reprDigest (required):
/// RFC 9651 structured dictionary containing an `sha` (bytesequence) checksum used to detect duplicate files and validate data integrity.
///
/// * [String] xImmichAssetData (required):
/// RFC 9651 structured dictionary containing asset metadata with the following keys: - device-asset-id (string, required): Unique device asset identifier - device-id (string, required): Device identifier - file-created-at (string/date, required): ISO 8601 date string or Unix timestamp - file-modified-at (string/date, required): ISO 8601 date string or Unix timestamp - filename (string, required): Original filename - is-favorite (boolean, optional): Favorite status - live-photo-video-id (string, optional): Live photo ID for assets from iOS devices - icloud-id (string, optional): iCloud identifier for assets from iOS devices
///
/// * [String] key:
///
/// * [String] slug:
///
/// * [String] uploadComplete:
/// Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.
///
/// * [String] uploadDraftInteropVersion:
/// Indicates the version of the RUFH protocol supported by the client.
Future<Response> startUploadWithHttpInfo(String contentLength, String reprDigest, String xImmichAssetData, { String? key, String? slug, String? uploadComplete, String? uploadDraftInteropVersion, }) async {
// ignore: prefer_const_declarations
final apiPath = r'/upload';
// ignore: prefer_final_locals
Object? postBody;
final queryParams = <QueryParam>[];
final headerParams = <String, String>{};
final formParams = <String, String>{};
if (key != null) {
queryParams.addAll(_queryParams('', 'key', key));
}
if (slug != null) {
queryParams.addAll(_queryParams('', 'slug', slug));
}
headerParams[r'content-length'] = parameterToString(contentLength);
headerParams[r'repr-digest'] = parameterToString(reprDigest);
if (uploadComplete != null) {
headerParams[r'upload-complete'] = parameterToString(uploadComplete);
}
if (uploadDraftInteropVersion != null) {
headerParams[r'upload-draft-interop-version'] = parameterToString(uploadDraftInteropVersion);
}
headerParams[r'x-immich-asset-data'] = parameterToString(xImmichAssetData);
const contentTypes = <String>[];
return apiClient.invokeAPI(
apiPath,
'POST',
queryParams,
postBody,
headerParams,
formParams,
contentTypes.isEmpty ? null : contentTypes.first,
);
}
/// This endpoint requires the `asset.upload` permission.
///
/// Parameters:
///
/// * [String] contentLength (required):
/// Non-negative size of the request body in bytes.
///
/// * [String] reprDigest (required):
/// RFC 9651 structured dictionary containing an `sha` (bytesequence) checksum used to detect duplicate files and validate data integrity.
///
/// * [String] xImmichAssetData (required):
/// RFC 9651 structured dictionary containing asset metadata with the following keys: - device-asset-id (string, required): Unique device asset identifier - device-id (string, required): Device identifier - file-created-at (string/date, required): ISO 8601 date string or Unix timestamp - file-modified-at (string/date, required): ISO 8601 date string or Unix timestamp - filename (string, required): Original filename - is-favorite (boolean, optional): Favorite status - live-photo-video-id (string, optional): Live photo ID for assets from iOS devices - icloud-id (string, optional): iCloud identifier for assets from iOS devices
///
/// * [String] key:
///
/// * [String] slug:
///
/// * [String] uploadComplete:
/// Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.
///
/// * [String] uploadDraftInteropVersion:
/// Indicates the version of the RUFH protocol supported by the client.
Future<UploadOkDto?> startUpload(String contentLength, String reprDigest, String xImmichAssetData, { String? key, String? slug, String? uploadComplete, String? uploadDraftInteropVersion, }) async {
final response = await startUploadWithHttpInfo(contentLength, reprDigest, xImmichAssetData, key: key, slug: slug, uploadComplete: uploadComplete, uploadDraftInteropVersion: uploadDraftInteropVersion, );
if (response.statusCode >= HttpStatus.badRequest) {
throw ApiException(response.statusCode, await _decodeBodyBytes(response));
}
// When a remote server returns no body with a status of 204, we shall not decode it.
// At the time of writing this, `dart:convert` will throw an "Unexpected end of input"
// FormatException when trying to decode an empty string.
if (response.body.isNotEmpty && response.statusCode != HttpStatus.noContent) {
return await apiClient.deserializeAsync(await _decodeBodyBytes(response), 'UploadOkDto',) as UploadOkDto;
}
return null;
}
}

View File

@@ -748,6 +748,10 @@ class ApiClient {
return UpdateAssetDto.fromJson(value);
case 'UpdateLibraryDto':
return UpdateLibraryDto.fromJson(value);
case 'UploadBackupConfig':
return UploadBackupConfig.fromJson(value);
case 'UploadOkDto':
return UploadOkDto.fromJson(value);
case 'UsageByUserDto':
return UsageByUserDto.fromJson(value);
case 'UserAdminCreateDto':

View File

@@ -14,25 +14,31 @@ class SystemConfigBackupsDto {
/// Returns a new [SystemConfigBackupsDto] instance.
SystemConfigBackupsDto({
required this.database,
required this.upload,
});
DatabaseBackupConfig database;
UploadBackupConfig upload;
@override
bool operator ==(Object other) => identical(this, other) || other is SystemConfigBackupsDto &&
other.database == database;
other.database == database &&
other.upload == upload;
@override
int get hashCode =>
// ignore: unnecessary_parenthesis
(database.hashCode);
(database.hashCode) +
(upload.hashCode);
@override
String toString() => 'SystemConfigBackupsDto[database=$database]';
String toString() => 'SystemConfigBackupsDto[database=$database, upload=$upload]';
Map<String, dynamic> toJson() {
final json = <String, dynamic>{};
json[r'database'] = this.database;
json[r'upload'] = this.upload;
return json;
}
@@ -46,6 +52,7 @@ class SystemConfigBackupsDto {
return SystemConfigBackupsDto(
database: DatabaseBackupConfig.fromJson(json[r'database'])!,
upload: UploadBackupConfig.fromJson(json[r'upload'])!,
);
}
return null;
@@ -94,6 +101,7 @@ class SystemConfigBackupsDto {
/// The list of required keys that must be present in a JSON.
static const requiredKeys = <String>{
'database',
'upload',
};
}

View File

@@ -17,6 +17,7 @@ class SystemConfigNightlyTasksDto {
required this.databaseCleanup,
required this.generateMemories,
required this.missingThumbnails,
required this.removeStaleUploads,
required this.startTime,
required this.syncQuotaUsage,
});
@@ -29,6 +30,8 @@ class SystemConfigNightlyTasksDto {
bool missingThumbnails;
bool removeStaleUploads;
String startTime;
bool syncQuotaUsage;
@@ -39,6 +42,7 @@ class SystemConfigNightlyTasksDto {
other.databaseCleanup == databaseCleanup &&
other.generateMemories == generateMemories &&
other.missingThumbnails == missingThumbnails &&
other.removeStaleUploads == removeStaleUploads &&
other.startTime == startTime &&
other.syncQuotaUsage == syncQuotaUsage;
@@ -49,11 +53,12 @@ class SystemConfigNightlyTasksDto {
(databaseCleanup.hashCode) +
(generateMemories.hashCode) +
(missingThumbnails.hashCode) +
(removeStaleUploads.hashCode) +
(startTime.hashCode) +
(syncQuotaUsage.hashCode);
@override
String toString() => 'SystemConfigNightlyTasksDto[clusterNewFaces=$clusterNewFaces, databaseCleanup=$databaseCleanup, generateMemories=$generateMemories, missingThumbnails=$missingThumbnails, startTime=$startTime, syncQuotaUsage=$syncQuotaUsage]';
String toString() => 'SystemConfigNightlyTasksDto[clusterNewFaces=$clusterNewFaces, databaseCleanup=$databaseCleanup, generateMemories=$generateMemories, missingThumbnails=$missingThumbnails, removeStaleUploads=$removeStaleUploads, startTime=$startTime, syncQuotaUsage=$syncQuotaUsage]';
Map<String, dynamic> toJson() {
final json = <String, dynamic>{};
@@ -61,6 +66,7 @@ class SystemConfigNightlyTasksDto {
json[r'databaseCleanup'] = this.databaseCleanup;
json[r'generateMemories'] = this.generateMemories;
json[r'missingThumbnails'] = this.missingThumbnails;
json[r'removeStaleUploads'] = this.removeStaleUploads;
json[r'startTime'] = this.startTime;
json[r'syncQuotaUsage'] = this.syncQuotaUsage;
return json;
@@ -79,6 +85,7 @@ class SystemConfigNightlyTasksDto {
databaseCleanup: mapValueOfType<bool>(json, r'databaseCleanup')!,
generateMemories: mapValueOfType<bool>(json, r'generateMemories')!,
missingThumbnails: mapValueOfType<bool>(json, r'missingThumbnails')!,
removeStaleUploads: mapValueOfType<bool>(json, r'removeStaleUploads')!,
startTime: mapValueOfType<String>(json, r'startTime')!,
syncQuotaUsage: mapValueOfType<bool>(json, r'syncQuotaUsage')!,
);
@@ -132,6 +139,7 @@ class SystemConfigNightlyTasksDto {
'databaseCleanup',
'generateMemories',
'missingThumbnails',
'removeStaleUploads',
'startTime',
'syncQuotaUsage',
};

View File

@@ -0,0 +1,100 @@
//
// AUTO-GENERATED FILE, DO NOT MODIFY!
//
// @dart=2.18
// ignore_for_file: unused_element, unused_import
// ignore_for_file: always_put_required_named_parameters_first
// ignore_for_file: constant_identifier_names
// ignore_for_file: lines_longer_than_80_chars
part of openapi.api;
class UploadBackupConfig {
/// Returns a new [UploadBackupConfig] instance.
UploadBackupConfig({
required this.maxAgeHours,
});
/// Minimum value: 1
num maxAgeHours;
@override
bool operator ==(Object other) => identical(this, other) || other is UploadBackupConfig &&
other.maxAgeHours == maxAgeHours;
@override
int get hashCode =>
// ignore: unnecessary_parenthesis
(maxAgeHours.hashCode);
@override
String toString() => 'UploadBackupConfig[maxAgeHours=$maxAgeHours]';
Map<String, dynamic> toJson() {
final json = <String, dynamic>{};
json[r'maxAgeHours'] = this.maxAgeHours;
return json;
}
/// Returns a new [UploadBackupConfig] instance and imports its values from
/// [value] if it's a [Map], null otherwise.
// ignore: prefer_constructors_over_static_methods
static UploadBackupConfig? fromJson(dynamic value) {
upgradeDto(value, "UploadBackupConfig");
if (value is Map) {
final json = value.cast<String, dynamic>();
return UploadBackupConfig(
maxAgeHours: num.parse('${json[r'maxAgeHours']}'),
);
}
return null;
}
static List<UploadBackupConfig> listFromJson(dynamic json, {bool growable = false,}) {
final result = <UploadBackupConfig>[];
if (json is List && json.isNotEmpty) {
for (final row in json) {
final value = UploadBackupConfig.fromJson(row);
if (value != null) {
result.add(value);
}
}
}
return result.toList(growable: growable);
}
static Map<String, UploadBackupConfig> mapFromJson(dynamic json) {
final map = <String, UploadBackupConfig>{};
if (json is Map && json.isNotEmpty) {
json = json.cast<String, dynamic>(); // ignore: parameter_assignments
for (final entry in json.entries) {
final value = UploadBackupConfig.fromJson(entry.value);
if (value != null) {
map[entry.key] = value;
}
}
}
return map;
}
// maps a json object with a list of UploadBackupConfig-objects as value to a dart map
static Map<String, List<UploadBackupConfig>> mapListFromJson(dynamic json, {bool growable = false,}) {
final map = <String, List<UploadBackupConfig>>{};
if (json is Map && json.isNotEmpty) {
// ignore: parameter_assignments
json = json.cast<String, dynamic>();
for (final entry in json.entries) {
map[entry.key] = UploadBackupConfig.listFromJson(entry.value, growable: growable,);
}
}
return map;
}
/// The list of required keys that must be present in a JSON.
static const requiredKeys = <String>{
'maxAgeHours',
};
}

View File

@@ -0,0 +1,99 @@
//
// AUTO-GENERATED FILE, DO NOT MODIFY!
//
// @dart=2.18
// ignore_for_file: unused_element, unused_import
// ignore_for_file: always_put_required_named_parameters_first
// ignore_for_file: constant_identifier_names
// ignore_for_file: lines_longer_than_80_chars
part of openapi.api;
class UploadOkDto {
/// Returns a new [UploadOkDto] instance.
UploadOkDto({
required this.id,
});
String id;
@override
bool operator ==(Object other) => identical(this, other) || other is UploadOkDto &&
other.id == id;
@override
int get hashCode =>
// ignore: unnecessary_parenthesis
(id.hashCode);
@override
String toString() => 'UploadOkDto[id=$id]';
Map<String, dynamic> toJson() {
final json = <String, dynamic>{};
json[r'id'] = this.id;
return json;
}
/// Returns a new [UploadOkDto] instance and imports its values from
/// [value] if it's a [Map], null otherwise.
// ignore: prefer_constructors_over_static_methods
static UploadOkDto? fromJson(dynamic value) {
upgradeDto(value, "UploadOkDto");
if (value is Map) {
final json = value.cast<String, dynamic>();
return UploadOkDto(
id: mapValueOfType<String>(json, r'id')!,
);
}
return null;
}
static List<UploadOkDto> listFromJson(dynamic json, {bool growable = false,}) {
final result = <UploadOkDto>[];
if (json is List && json.isNotEmpty) {
for (final row in json) {
final value = UploadOkDto.fromJson(row);
if (value != null) {
result.add(value);
}
}
}
return result.toList(growable: growable);
}
static Map<String, UploadOkDto> mapFromJson(dynamic json) {
final map = <String, UploadOkDto>{};
if (json is Map && json.isNotEmpty) {
json = json.cast<String, dynamic>(); // ignore: parameter_assignments
for (final entry in json.entries) {
final value = UploadOkDto.fromJson(entry.value);
if (value != null) {
map[entry.key] = value;
}
}
}
return map;
}
// maps a json object with a list of UploadOkDto-objects as value to a dart map
static Map<String, List<UploadOkDto>> mapListFromJson(dynamic json, {bool growable = false,}) {
final map = <String, List<UploadOkDto>>{};
if (json is Map && json.isNotEmpty) {
// ignore: parameter_assignments
json = json.cast<String, dynamic>();
for (final entry in json.entries) {
map[entry.key] = UploadOkDto.listFromJson(entry.value, growable: growable,);
}
}
return map;
}
/// The list of required keys that must be present in a JSON.
static const requiredKeys = <String>{
'id',
};
}

View File

@@ -9373,6 +9373,324 @@
"description": "This endpoint requires the `asset.delete` permission."
}
},
"/upload": {
"options": {
"operationId": "getUploadOptions",
"parameters": [],
"responses": {
"204": {
"description": ""
}
},
"tags": [
"Upload"
]
},
"post": {
"operationId": "startUpload",
"parameters": [
{
"name": "content-length",
"in": "header",
"description": "Non-negative size of the request body in bytes.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "key",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "repr-digest",
"in": "header",
"description": "RFC 9651 structured dictionary containing an `sha` (bytesequence) checksum used to detect duplicate files and validate data integrity.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "slug",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "upload-complete",
"in": "header",
"description": "Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.",
"required": false,
"schema": {
"type": "string"
}
},
{
"name": "upload-draft-interop-version",
"in": "header",
"description": "Indicates the version of the RUFH protocol supported by the client.",
"required": false,
"schema": {
"type": "string"
}
},
{
"name": "x-immich-asset-data",
"in": "header",
"description": "RFC 9651 structured dictionary containing asset metadata with the following keys:\n- device-asset-id (string, required): Unique device asset identifier\n- device-id (string, required): Device identifier\n- file-created-at (string/date, required): ISO 8601 date string or Unix timestamp\n- file-modified-at (string/date, required): ISO 8601 date string or Unix timestamp\n- filename (string, required): Original filename\n- is-favorite (boolean, optional): Favorite status\n- live-photo-video-id (string, optional): Live photo ID for assets from iOS devices\n- icloud-id (string, optional): iCloud identifier for assets from iOS devices",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UploadOkDto"
}
}
},
"description": ""
},
"201": {
"description": ""
}
},
"security": [
{
"bearer": []
},
{
"cookie": []
},
{
"api_key": []
}
],
"tags": [
"Upload"
],
"x-immich-permission": "asset.upload",
"description": "This endpoint requires the `asset.upload` permission."
}
},
"/upload/{id}": {
"delete": {
"operationId": "cancelUpload",
"parameters": [
{
"name": "id",
"required": true,
"in": "path",
"schema": {
"format": "uuid",
"type": "string"
}
},
{
"name": "key",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "slug",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"description": ""
}
},
"security": [
{
"bearer": []
},
{
"cookie": []
},
{
"api_key": []
}
],
"tags": [
"Upload"
],
"x-immich-permission": "asset.upload",
"description": "This endpoint requires the `asset.upload` permission."
},
"head": {
"operationId": "getUploadStatus",
"parameters": [
{
"name": "id",
"required": true,
"in": "path",
"schema": {
"format": "uuid",
"type": "string"
}
},
{
"name": "key",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "slug",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "upload-draft-interop-version",
"in": "header",
"description": "Indicates the version of the RUFH protocol supported by the client.",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"description": ""
}
},
"security": [
{
"bearer": []
},
{
"cookie": []
},
{
"api_key": []
}
],
"tags": [
"Upload"
],
"x-immich-permission": "asset.upload",
"description": "This endpoint requires the `asset.upload` permission."
},
"patch": {
"operationId": "resumeUpload",
"parameters": [
{
"name": "content-length",
"in": "header",
"description": "Non-negative size of the request body in bytes.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "id",
"required": true,
"in": "path",
"schema": {
"format": "uuid",
"type": "string"
}
},
{
"name": "key",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "slug",
"required": false,
"in": "query",
"schema": {
"type": "string"
}
},
{
"name": "upload-complete",
"in": "header",
"description": "Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "upload-draft-interop-version",
"in": "header",
"description": "Indicates the version of the RUFH protocol supported by the client.",
"required": true,
"schema": {
"type": "string"
}
},
{
"name": "upload-offset",
"in": "header",
"description": "Non-negative byte offset indicating the starting position of the data in the request body within the entire file.",
"required": true,
"schema": {
"type": "string"
}
}
],
"responses": {
"200": {
"content": {
"application/json": {
"schema": {
"$ref": "#/components/schemas/UploadOkDto"
}
}
},
"description": ""
}
},
"security": [
{
"bearer": []
},
{
"cookie": []
},
{
"api_key": []
}
],
"tags": [
"Upload"
],
"x-immich-permission": "asset.upload",
"description": "This endpoint requires the `asset.upload` permission."
}
},
"/users": {
"get": {
"operationId": "searchUsers",
@@ -16340,10 +16658,14 @@
"properties": {
"database": {
"$ref": "#/components/schemas/DatabaseBackupConfig"
},
"upload": {
"$ref": "#/components/schemas/UploadBackupConfig"
}
},
"required": [
"database"
"database",
"upload"
],
"type": "object"
},
@@ -16876,6 +17198,9 @@
"missingThumbnails": {
"type": "boolean"
},
"removeStaleUploads": {
"type": "boolean"
},
"startTime": {
"type": "string"
},
@@ -16888,6 +17213,7 @@
"databaseCleanup",
"generateMemories",
"missingThumbnails",
"removeStaleUploads",
"startTime",
"syncQuotaUsage"
],
@@ -17740,6 +18066,29 @@
},
"type": "object"
},
"UploadBackupConfig": {
"properties": {
"maxAgeHours": {
"minimum": 1,
"type": "number"
}
},
"required": [
"maxAgeHours"
],
"type": "object"
},
"UploadOkDto": {
"properties": {
"id": {
"type": "string"
}
},
"required": [
"id"
],
"type": "object"
},
"UsageByUserDto": {
"properties": {
"photos": {

View File

@@ -1359,8 +1359,12 @@ export type DatabaseBackupConfig = {
enabled: boolean;
keepLastAmount: number;
};
export type UploadBackupConfig = {
maxAgeHours: number;
};
export type SystemConfigBackupsDto = {
database: DatabaseBackupConfig;
upload: UploadBackupConfig;
};
export type SystemConfigFFmpegDto = {
accel: TranscodeHWAccel;
@@ -1489,6 +1493,7 @@ export type SystemConfigNightlyTasksDto = {
databaseCleanup: boolean;
generateMemories: boolean;
missingThumbnails: boolean;
removeStaleUploads: boolean;
startTime: string;
syncQuotaUsage: boolean;
};
@@ -1654,6 +1659,9 @@ export type TimeBucketsResponseDto = {
export type TrashResponseDto = {
count: number;
};
export type UploadOkDto = {
id: string;
};
export type UserUpdateMeDto = {
avatarColor?: (UserAvatarColor) | null;
email?: string;
@@ -4518,6 +4526,109 @@ export function restoreAssets({ bulkIdsDto }: {
body: bulkIdsDto
})));
}
export function getUploadOptions(opts?: Oazapfts.RequestOpts) {
return oazapfts.ok(oazapfts.fetchText("/upload", {
...opts,
method: "OPTIONS"
}));
}
/**
* This endpoint requires the `asset.upload` permission.
*/
export function startUpload({ contentLength, key, reprDigest, slug, uploadComplete, uploadDraftInteropVersion, xImmichAssetData }: {
contentLength: string;
key?: string;
reprDigest: string;
slug?: string;
uploadComplete?: string;
uploadDraftInteropVersion?: string;
xImmichAssetData: string;
}, opts?: Oazapfts.RequestOpts) {
return oazapfts.ok(oazapfts.fetchJson<{
status: 200;
data: UploadOkDto;
} | {
status: 201;
}>(`/upload${QS.query(QS.explode({
key,
slug
}))}`, {
...opts,
method: "POST",
headers: oazapfts.mergeHeaders(opts?.headers, {
"content-length": contentLength,
"repr-digest": reprDigest,
"upload-complete": uploadComplete,
"upload-draft-interop-version": uploadDraftInteropVersion,
"x-immich-asset-data": xImmichAssetData
})
}));
}
/**
* This endpoint requires the `asset.upload` permission.
*/
export function cancelUpload({ id, key, slug }: {
id: string;
key?: string;
slug?: string;
}, opts?: Oazapfts.RequestOpts) {
return oazapfts.ok(oazapfts.fetchText(`/upload/${encodeURIComponent(id)}${QS.query(QS.explode({
key,
slug
}))}`, {
...opts,
method: "DELETE"
}));
}
/**
* This endpoint requires the `asset.upload` permission.
*/
export function getUploadStatus({ id, key, slug, uploadDraftInteropVersion }: {
id: string;
key?: string;
slug?: string;
uploadDraftInteropVersion: string;
}, opts?: Oazapfts.RequestOpts) {
return oazapfts.ok(oazapfts.fetchText(`/upload/${encodeURIComponent(id)}${QS.query(QS.explode({
key,
slug
}))}`, {
...opts,
method: "HEAD",
headers: oazapfts.mergeHeaders(opts?.headers, {
"upload-draft-interop-version": uploadDraftInteropVersion
})
}));
}
/**
* This endpoint requires the `asset.upload` permission.
*/
export function resumeUpload({ contentLength, id, key, slug, uploadComplete, uploadDraftInteropVersion, uploadOffset }: {
contentLength: string;
id: string;
key?: string;
slug?: string;
uploadComplete: string;
uploadDraftInteropVersion: string;
uploadOffset: string;
}, opts?: Oazapfts.RequestOpts) {
return oazapfts.ok(oazapfts.fetchJson<{
status: 200;
data: UploadOkDto;
}>(`/upload/${encodeURIComponent(id)}${QS.query(QS.explode({
key,
slug
}))}`, {
...opts,
method: "PATCH",
headers: oazapfts.mergeHeaders(opts?.headers, {
"content-length": contentLength,
"upload-complete": uploadComplete,
"upload-draft-interop-version": uploadDraftInteropVersion,
"upload-offset": uploadOffset
})
}));
}
/**
* This endpoint requires the `user.read` permission.
*/

13
pnpm-lock.yaml generated
View File

@@ -191,6 +191,10 @@ importers:
version: 5.9.3
e2e:
dependencies:
structured-headers:
specifier: ^2.0.2
version: 2.0.2
devDependencies:
'@eslint/js':
specifier: ^9.8.0
@@ -511,6 +515,9 @@ importers:
socket.io:
specifier: ^4.8.1
version: 4.8.1
structured-headers:
specifier: ^2.0.2
version: 2.0.2
tailwindcss-preset-email:
specifier: ^1.4.0
version: 1.4.1(tailwindcss@3.4.18(yaml@2.8.1))
@@ -10407,6 +10414,10 @@ packages:
resolution: {integrity: sha512-KIy5nylvC5le1OdaaoCJ07L+8iQzJHGH6pWDuzS+d07Cu7n1MZ2x26P8ZKIWfbK02+XIL8Mp4RkWeqdUCrDMfg==}
engines: {node: '>=18'}
structured-headers@2.0.2:
resolution: {integrity: sha512-IUul56vVHuMg2UxWhwDj9zVJE6ztYEQQkynr1FQ/NydPhivtk5+Qb2N1RS36owEFk2fNUriTguJ2R7htRObcdA==}
engines: {node: '>=18', npm: '>=6'}
style-to-js@1.1.18:
resolution: {integrity: sha512-JFPn62D4kJaPTnhFUI244MThx+FEGbi+9dw1b9yBBQ+1CZpV7QAT8kUtJ7b7EUNdHajjF/0x8fT+16oLJoojLg==}
@@ -23397,6 +23408,8 @@ snapshots:
dependencies:
'@tokenizer/token': 0.3.0
structured-headers@2.0.2: {}
style-to-js@1.1.18:
dependencies:
style-to-object: 1.0.11

View File

@@ -104,6 +104,7 @@
"sharp": "^0.34.4",
"sirv": "^3.0.0",
"socket.io": "^4.8.1",
"structured-headers": "^2.0.2",
"tailwindcss-preset-email": "^1.4.0",
"thumbhash": "^0.1.1",
"ua-parser-js": "^2.0.0",

View File

@@ -22,6 +22,9 @@ export interface SystemConfig {
cronExpression: string;
keepLastAmount: number;
};
upload: {
maxAgeHours: number;
};
};
ffmpeg: {
crf: number;
@@ -140,6 +143,7 @@ export interface SystemConfig {
clusterNewFaces: boolean;
generateMemories: boolean;
syncQuotaUsage: boolean;
removeStaleUploads: boolean;
};
trash: {
enabled: boolean;
@@ -198,6 +202,9 @@ export const defaults = Object.freeze<SystemConfig>({
cronExpression: CronExpression.EVERY_DAY_AT_2AM,
keepLastAmount: 14,
},
upload: {
maxAgeHours: 72,
},
},
ffmpeg: {
crf: 23,
@@ -341,6 +348,7 @@ export const defaults = Object.freeze<SystemConfig>({
syncQuotaUsage: true,
missingThumbnails: true,
clusterNewFaces: true,
removeStaleUploads: true,
},
trash: {
enabled: true,

View File

@@ -0,0 +1,445 @@
import { createHash, randomUUID } from 'node:crypto';
import { AssetUploadController } from 'src/controllers/asset-upload.controller';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { serializeDictionary } from 'structured-headers';
import request from 'supertest';
import { factory } from 'test/small.factory';
import { ControllerContext, controllerSetup, mockBaseService } from 'test/utils';
const makeAssetData = (overrides?: Partial<any>): string => {
return serializeDictionary({
filename: 'test-image.jpg',
'device-asset-id': 'test-asset-id',
'device-id': 'test-device',
'file-created-at': new Date('2025-01-02T00:00:00Z').toISOString(),
'file-modified-at': new Date('2025-01-01T00:00:00Z').toISOString(),
'is-favorite': false,
...overrides,
});
};
describe(AssetUploadController.name, () => {
let ctx: ControllerContext;
let buffer: Buffer;
let checksum: string;
const service = mockBaseService(AssetUploadService);
beforeAll(async () => {
ctx = await controllerSetup(AssetUploadController, [{ provide: AssetUploadService, useValue: service }]);
return () => ctx.close();
});
beforeEach(() => {
service.resetAllMocks();
service.startUpload.mockImplementation((_, __, res, ___) => {
res.send();
return Promise.resolve();
});
service.resumeUpload.mockImplementation((_, __, res, ___, ____) => {
res.send();
return Promise.resolve();
});
service.cancelUpload.mockImplementation((_, __, res) => {
res.send();
return Promise.resolve();
});
service.getUploadStatus.mockImplementation((_, res, __, ___) => {
res.send();
return Promise.resolve();
});
ctx.reset();
buffer = Buffer.from(randomUUID());
checksum = `sha=:${createHash('sha1').update(buffer).digest('base64')}:`;
});
describe('POST /upload', () => {
it('should be an authenticated route', async () => {
await request(ctx.getHttpServer()).post('/upload');
expect(ctx.authenticate).toHaveBeenCalled();
});
it('should require at least version 3 of Upload-Draft-Interop-Version header if provided', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Upload-Draft-Interop-Version', '2')
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining(['version must not be less than 3']),
}),
);
});
it('should require X-Immich-Asset-Data header', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'x-immich-asset-data header is required' }));
});
it('should require Repr-Digest header', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'Missing repr-digest header' }));
});
it('should allow conventional upload without Upload-Complete header', async () => {
const { status } = await request(ctx.getHttpServer())
.post('/upload')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(201);
});
it('should require Upload-Length header for incomplete upload', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?0')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'Missing upload-length header' }));
});
it('should infer upload length from content length if complete upload', async () => {
const { status } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.send(buffer);
expect(status).toBe(201);
});
it('should reject invalid Repr-Digest format', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', checksum)
.set('Repr-Digest', 'invalid-format')
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'Invalid repr-digest header' }));
});
it('should validate device-asset-id is required in asset data', async () => {
const assetData = serializeDictionary({
filename: 'test.jpg',
'device-id': 'test-device',
'file-created-at': new Date().toISOString(),
'file-modified-at': new Date().toISOString(),
});
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining([expect.stringContaining('deviceAssetId')]),
}),
);
});
it('should validate device-id is required in asset data', async () => {
const assetData = serializeDictionary({
filename: 'test.jpg',
'device-asset-id': 'test-asset',
'file-created-at': new Date().toISOString(),
'file-modified-at': new Date().toISOString(),
});
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining([expect.stringContaining('deviceId')]),
}),
);
});
it('should validate filename is required in asset data', async () => {
const assetData = serializeDictionary({
'device-asset-id': 'test-asset',
'device-id': 'test-device',
'file-created-at': new Date().toISOString(),
'file-modified-at': new Date().toISOString(),
});
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', assetData)
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining([expect.stringContaining('filename')]),
}),
);
});
it('should accept Upload-Incomplete header for version 3', async () => {
const { body, status } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '3')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Incomplete', '?0')
.set('Upload-Complete', '?1')
.set('Upload-Length', '1024')
.send(buffer);
expect(body).toEqual({});
expect(status).not.toBe(400);
});
it('should validate Upload-Complete is a boolean structured field', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Complete', 'true')
.set('Upload-Length', '1024')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: 'upload-complete must be a structured boolean value' }));
});
it('should validate Upload-Length is a positive integer', async () => {
const { status, body } = await request(ctx.getHttpServer())
.post('/upload')
.set('Upload-Draft-Interop-Version', '8')
.set('X-Immich-Asset-Data', makeAssetData())
.set('Repr-Digest', checksum)
.set('Upload-Complete', '?1')
.set('Upload-Length', '-100')
.send(buffer);
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining(['uploadLength must not be less than 1']),
}),
);
});
});
describe('PATCH /upload/:id', () => {
const uploadId = factory.uuid();
it('should be an authenticated route', async () => {
await request(ctx.getHttpServer()).patch(`/upload/${uploadId}`);
expect(ctx.authenticate).toHaveBeenCalled();
});
it('should require Upload-Draft-Interop-Version header', async () => {
const { status, body } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Offset', '0')
.set('Upload-Complete', '?1')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining(['version must be an integer number', 'version must not be less than 3']),
}),
);
});
it('should require Upload-Offset header', async () => {
const { status, body } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '8')
.set('Upload-Complete', '?1')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining([
'uploadOffset must be an integer number',
'uploadOffset must not be less than 0',
]),
}),
);
});
it('should require Upload-Complete header', async () => {
const { status, body } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '8')
.set('Upload-Offset', '0')
.set('Content-Type', 'application/partial-upload')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: ['uploadComplete must be a boolean value'] }));
});
it('should validate UUID parameter', async () => {
const { status, body } = await request(ctx.getHttpServer())
.patch('/upload/invalid-uuid')
.set('Upload-Draft-Interop-Version', '8')
.set('Upload-Offset', '0')
.set('Upload-Complete', '?0')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: ['id must be a UUID'] }));
});
it('should validate Upload-Offset is a non-negative integer', async () => {
const { status, body } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '8')
.set('Upload-Offset', '-50')
.set('Upload-Complete', '?0')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: expect.arrayContaining(['uploadOffset must not be less than 0']),
}),
);
});
it('should require Content-Type: application/partial-upload for version >= 6', async () => {
const { status, body } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '6')
.set('Upload-Offset', '0')
.set('Upload-Complete', '?0')
.set('Content-Type', 'application/octet-stream')
.send(Buffer.from('test'));
expect(status).toBe(400);
expect(body).toEqual(
expect.objectContaining({
message: ['contentType must be equal to application/partial-upload'],
}),
);
});
it('should allow other Content-Type for version < 6', async () => {
const { body } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '3')
.set('Upload-Offset', '0')
.set('Upload-Incomplete', '?1')
.set('Content-Type', 'application/octet-stream')
.send();
// Will fail for other reasons, but not content-type validation
expect(body).not.toEqual(
expect.objectContaining({
message: expect.arrayContaining([expect.stringContaining('contentType')]),
}),
);
});
it('should accept Upload-Incomplete header for version 3', async () => {
const { status } = await request(ctx.getHttpServer())
.patch(`/upload/${uploadId}`)
.set('Upload-Draft-Interop-Version', '3')
.set('Upload-Offset', '0')
.set('Upload-Incomplete', '?1')
.send();
// Should not fail validation
expect(status).not.toBe(400);
});
});
describe('DELETE /upload/:id', () => {
const uploadId = factory.uuid();
it('should be an authenticated route', async () => {
await request(ctx.getHttpServer()).delete(`/upload/${uploadId}`);
expect(ctx.authenticate).toHaveBeenCalled();
});
it('should validate UUID parameter', async () => {
const { status, body } = await request(ctx.getHttpServer()).delete('/upload/invalid-uuid');
expect(status).toBe(400);
expect(body).toEqual(expect.objectContaining({ message: ['id must be a UUID'] }));
});
});
describe('HEAD /upload/:id', () => {
const uploadId = factory.uuid();
it('should be an authenticated route', async () => {
await request(ctx.getHttpServer()).head(`/upload/${uploadId}`);
expect(ctx.authenticate).toHaveBeenCalled();
});
it('should require Upload-Draft-Interop-Version header', async () => {
const { status } = await request(ctx.getHttpServer()).head(`/upload/${uploadId}`);
expect(status).toBe(400);
});
it('should validate UUID parameter', async () => {
const { status } = await request(ctx.getHttpServer())
.head('/upload/invalid-uuid')
.set('Upload-Draft-Interop-Version', '8');
expect(status).toBe(400);
});
});
});

View File

@@ -0,0 +1,108 @@
import { Controller, Delete, Head, HttpCode, HttpStatus, Options, Param, Patch, Post, Req, Res } from '@nestjs/common';
import { ApiHeader, ApiOkResponse, ApiTags } from '@nestjs/swagger';
import { Request, Response } from 'express';
import { GetUploadStatusDto, Header, ResumeUploadDto, StartUploadDto, UploadOkDto } from 'src/dtos/asset-upload.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import { ImmichHeader, Permission } from 'src/enum';
import { Auth, Authenticated } from 'src/middleware/auth.guard';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { validateSyncOrReject } from 'src/utils/request';
import { UUIDParamDto } from 'src/validation';
const apiInteropVersion = {
name: Header.InteropVersion,
description: `Indicates the version of the RUFH protocol supported by the client.`,
required: true,
};
const apiUploadComplete = {
name: Header.UploadComplete,
description:
'Structured boolean indicating whether this request completes the file. Use Upload-Incomplete instead for version <= 3.',
required: true,
};
const apiContentLength = {
name: Header.ContentLength,
description: 'Non-negative size of the request body in bytes.',
required: true,
};
// This is important to let go of the asset lock for an inactive request
const SOCKET_TIMEOUT_MS = 30_000;
@ApiTags('Upload')
@Controller('upload')
export class AssetUploadController {
constructor(private service: AssetUploadService) {}
@Post()
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
@ApiHeader({
name: ImmichHeader.AssetData,
description: `RFC 9651 structured dictionary containing asset metadata with the following keys:
- device-asset-id (string, required): Unique device asset identifier
- device-id (string, required): Device identifier
- file-created-at (string/date, required): ISO 8601 date string or Unix timestamp
- file-modified-at (string/date, required): ISO 8601 date string or Unix timestamp
- filename (string, required): Original filename
- is-favorite (boolean, optional): Favorite status
- live-photo-video-id (string, optional): Live photo ID for assets from iOS devices
- icloud-id (string, optional): iCloud identifier for assets from iOS devices`,
required: true,
example:
'device-asset-id="abc123", device-id="phone1", filename="photo.jpg", file-created-at="2024-01-01T00:00:00Z", file-modified-at="2024-01-01T00:00:00Z"',
})
@ApiHeader({
name: Header.ReprDigest,
description:
'RFC 9651 structured dictionary containing an `sha` (bytesequence) checksum used to detect duplicate files and validate data integrity.',
required: true,
})
@ApiHeader({ ...apiInteropVersion, required: false })
@ApiHeader({ ...apiUploadComplete, required: false })
@ApiHeader(apiContentLength)
@ApiOkResponse({ type: UploadOkDto })
startUpload(@Auth() auth: AuthDto, @Req() req: Request, @Res() res: Response): Promise<void> {
res.setTimeout(SOCKET_TIMEOUT_MS);
return this.service.startUpload(auth, req, res, validateSyncOrReject(StartUploadDto, req.headers));
}
@Patch(':id')
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
@ApiHeader({
name: Header.UploadOffset,
description:
'Non-negative byte offset indicating the starting position of the data in the request body within the entire file.',
required: true,
})
@ApiHeader(apiInteropVersion)
@ApiHeader(apiUploadComplete)
@ApiHeader(apiContentLength)
@ApiOkResponse({ type: UploadOkDto })
resumeUpload(@Auth() auth: AuthDto, @Req() req: Request, @Res() res: Response, @Param() { id }: UUIDParamDto) {
res.setTimeout(SOCKET_TIMEOUT_MS);
return this.service.resumeUpload(auth, req, res, id, validateSyncOrReject(ResumeUploadDto, req.headers));
}
@Delete(':id')
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
cancelUpload(@Auth() auth: AuthDto, @Res() res: Response, @Param() { id }: UUIDParamDto) {
res.setTimeout(SOCKET_TIMEOUT_MS);
return this.service.cancelUpload(auth, id, res);
}
@Head(':id')
@Authenticated({ sharedLink: true, permission: Permission.AssetUpload })
@ApiHeader(apiInteropVersion)
getUploadStatus(@Auth() auth: AuthDto, @Req() req: Request, @Res() res: Response, @Param() { id }: UUIDParamDto) {
res.setTimeout(SOCKET_TIMEOUT_MS);
return this.service.getUploadStatus(auth, res, id, validateSyncOrReject(GetUploadStatusDto, req.headers));
}
@Options()
@HttpCode(HttpStatus.NO_CONTENT)
getUploadOptions(@Res() res: Response) {
return this.service.getUploadOptions(res);
}
}

View File

@@ -3,6 +3,7 @@ import { AlbumController } from 'src/controllers/album.controller';
import { ApiKeyController } from 'src/controllers/api-key.controller';
import { AppController } from 'src/controllers/app.controller';
import { AssetMediaController } from 'src/controllers/asset-media.controller';
import { AssetUploadController } from 'src/controllers/asset-upload.controller';
import { AssetController } from 'src/controllers/asset.controller';
import { AuthAdminController } from 'src/controllers/auth-admin.controller';
import { AuthController } from 'src/controllers/auth.controller';
@@ -40,6 +41,7 @@ export const controllers = [
AppController,
AssetController,
AssetMediaController,
AssetUploadController,
AuthController,
AuthAdminController,
DownloadController,

View File

@@ -0,0 +1,196 @@
import { BadRequestException } from '@nestjs/common';
import { ApiProperty } from '@nestjs/swagger';
import { Expose, plainToInstance, Transform, Type } from 'class-transformer';
import { Equals, IsBoolean, IsInt, IsNotEmpty, IsString, Min, ValidateIf, ValidateNested } from 'class-validator';
import { ImmichHeader } from 'src/enum';
import { Optional, ValidateBoolean, ValidateDate } from 'src/validation';
import { parseDictionary } from 'structured-headers';
export enum Header {
ContentLength = 'content-length',
ContentType = 'content-type',
InteropVersion = 'upload-draft-interop-version',
ReprDigest = 'repr-digest',
UploadComplete = 'upload-complete',
UploadIncomplete = 'upload-incomplete',
UploadLength = 'upload-length',
UploadOffset = 'upload-offset',
}
export class UploadAssetDataDto {
@IsNotEmpty()
@IsString()
deviceAssetId!: string;
@IsNotEmpty()
@IsString()
deviceId!: string;
@ValidateDate()
fileCreatedAt!: Date;
@ValidateDate()
fileModifiedAt!: Date;
@IsString()
@IsNotEmpty()
filename!: string;
@ValidateBoolean({ optional: true })
isFavorite?: boolean;
@Optional()
@IsString()
@IsNotEmpty()
livePhotoVideoId?: string;
@Optional()
@IsString()
@IsNotEmpty()
iCloudId!: string;
}
export class BaseUploadHeadersDto {
@Expose({ name: Header.ContentLength })
@Min(0)
@IsInt()
@Type(() => Number)
contentLength!: number;
}
export class StartUploadDto extends BaseUploadHeadersDto {
@Expose({ name: Header.InteropVersion })
@Optional()
@Min(3)
@IsInt()
@Type(() => Number)
version?: number;
@Expose({ name: ImmichHeader.AssetData })
@ValidateNested()
@Transform(({ value }) => {
if (!value) {
throw new BadRequestException(`${ImmichHeader.AssetData} header is required`);
}
try {
const dict = parseDictionary(value);
return plainToInstance(UploadAssetDataDto, {
deviceAssetId: dict.get('device-asset-id')?.[0],
deviceId: dict.get('device-id')?.[0],
filename: dict.get('filename')?.[0],
duration: dict.get('duration')?.[0],
fileCreatedAt: dict.get('file-created-at')?.[0],
fileModifiedAt: dict.get('file-modified-at')?.[0],
isFavorite: dict.get('is-favorite')?.[0],
livePhotoVideoId: dict.get('live-photo-video-id')?.[0],
iCloudId: dict.get('icloud-id')?.[0],
});
} catch {
throw new BadRequestException(`${ImmichHeader.AssetData} must be a valid structured dictionary`);
}
})
assetData!: UploadAssetDataDto;
@Expose({ name: Header.ReprDigest })
@Transform(({ value }) => {
if (!value) {
throw new BadRequestException(`Missing ${Header.ReprDigest} header`);
}
const checksum = parseDictionary(value).get('sha')?.[0];
if (checksum instanceof ArrayBuffer && checksum.byteLength === 20) {
return Buffer.from(checksum);
}
throw new BadRequestException(`Invalid ${Header.ReprDigest} header`);
})
checksum!: Buffer;
@Expose()
@Min(1)
@IsInt()
@Transform(({ obj }) => {
const uploadLength = obj[Header.UploadLength];
if (uploadLength != undefined) {
return Number(uploadLength);
}
const contentLength = obj[Header.ContentLength];
if (contentLength && isUploadComplete(obj) !== false) {
return Number(contentLength);
}
throw new BadRequestException(`Missing ${Header.UploadLength} header`);
})
uploadLength!: number;
@Expose()
@Transform(({ obj }) => isUploadComplete(obj))
uploadComplete?: boolean;
}
export class ResumeUploadDto extends BaseUploadHeadersDto {
@Expose({ name: Header.InteropVersion })
@Min(3)
@IsInt()
@Type(() => Number)
version!: number;
@Expose({ name: Header.ContentType })
@ValidateIf((o) => o.version && o.version >= 6)
@Equals('application/partial-upload')
contentType!: string;
@Expose({ name: Header.UploadLength })
@Min(1)
@IsInt()
@Type(() => Number)
@Optional()
uploadLength?: number;
@Expose({ name: Header.UploadOffset })
@Min(0)
@IsInt()
@Type(() => Number)
uploadOffset!: number;
@Expose()
@IsBoolean()
@Transform(({ obj }) => isUploadComplete(obj))
uploadComplete!: boolean;
}
export class GetUploadStatusDto {
@Expose({ name: Header.InteropVersion })
@Min(3)
@IsInt()
@Type(() => Number)
version!: number;
}
export class UploadOkDto {
@ApiProperty()
id!: string;
}
const STRUCTURED_TRUE = '?1';
const STRUCTURED_FALSE = '?0';
function isUploadComplete(obj: any) {
const uploadComplete = obj[Header.UploadComplete];
if (uploadComplete === STRUCTURED_TRUE) {
return true;
} else if (uploadComplete === STRUCTURED_FALSE) {
return false;
} else if (uploadComplete !== undefined) {
throw new BadRequestException('upload-complete must be a structured boolean value');
}
const uploadIncomplete = obj[Header.UploadIncomplete];
if (uploadIncomplete === STRUCTURED_TRUE) {
return false;
} else if (uploadIncomplete === STRUCTURED_FALSE) {
return true;
} else if (uploadComplete !== undefined) {
throw new BadRequestException('upload-incomplete must be a structured boolean value');
}
}

View File

@@ -55,11 +55,23 @@ export class DatabaseBackupConfig {
keepLastAmount!: number;
}
export class UploadBackupConfig {
@IsInt()
@IsPositive()
@IsNotEmpty()
maxAgeHours!: number;
}
export class SystemConfigBackupsDto {
@Type(() => DatabaseBackupConfig)
@ValidateNested()
@IsObject()
database!: DatabaseBackupConfig;
@Type(() => UploadBackupConfig)
@ValidateNested()
@IsObject()
upload!: UploadBackupConfig;
}
export class SystemConfigFFmpegDto {
@@ -355,6 +367,9 @@ class SystemConfigNightlyTasksDto {
@ValidateBoolean()
syncQuotaUsage!: boolean;
@ValidateBoolean()
removeStaleUploads!: boolean;
}
class SystemConfigOAuthDto {

View File

@@ -20,6 +20,7 @@ export enum ImmichHeader {
SharedLinkSlug = 'x-immich-share-slug',
Checksum = 'x-immich-checksum',
Cid = 'x-immich-cid',
AssetData = 'x-immich-asset-data',
}
export enum ImmichQuery {
@@ -306,6 +307,7 @@ export enum AssetStatus {
Active = 'active',
Trashed = 'trashed',
Deleted = 'deleted',
Partial = 'partial',
}
export enum SourceType {
@@ -496,6 +498,7 @@ export enum BootstrapEventPriority {
JobService = -190,
// Initialise config after other bootstrap services, stop other services from using config on bootstrap
SystemConfig = 100,
UploadService = 200,
}
export enum QueueName {
@@ -532,6 +535,8 @@ export enum JobName {
AssetFileMigration = 'AssetFileMigration',
AssetGenerateThumbnailsQueueAll = 'AssetGenerateThumbnailsQueueAll',
AssetGenerateThumbnails = 'AssetGenerateThumbnails',
PartialAssetCleanup = 'PartialAssetCleanup',
PartialAssetCleanupQueueAll = 'PartialAssetCleanupQueueAll',
AuditLogCleanup = 'AuditLogCleanup',
AuditTableCleanup = 'AuditTableCleanup',

View File

@@ -14,6 +14,7 @@ from
left join "smart_search" on "asset"."id" = "smart_search"."assetId"
where
"asset"."id" = $1::uuid
and "asset"."status" != 'partial'
limit
$2
@@ -40,6 +41,7 @@ from
"asset"
where
"asset"."id" = $1::uuid
and "asset"."status" != 'partial'
limit
$2
@@ -52,6 +54,7 @@ from
"asset"
where
"asset"."id" = $1::uuid
and "asset"."status" != 'partial'
limit
$2
@@ -78,7 +81,8 @@ from
"asset"
inner join "asset_job_status" on "asset_job_status"."assetId" = "asset"."id"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "asset"."visibility" != $1
and (
"asset_job_status"."previewAt" is null
@@ -110,6 +114,7 @@ from
"asset"
where
"asset"."id" = $1
and "asset"."status" != 'partial'
-- AssetJobRepository.getForGenerateThumbnailJob
select
@@ -141,6 +146,7 @@ from
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."id" = $1
and "asset"."status" != 'partial'
-- AssetJobRepository.getForMetadataExtraction
select
@@ -178,6 +184,7 @@ from
"asset"
where
"asset"."id" = $1
and "asset"."status" != 'partial'
-- AssetJobRepository.getAlbumThumbnailFiles
select
@@ -198,7 +205,8 @@ from
inner join "smart_search" on "asset"."id" = "smart_search"."assetId"
inner join "asset_job_status" as "job_status" on "job_status"."assetId" = "asset"."id"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "asset"."visibility" in ('archive', 'timeline')
and "job_status"."duplicatesDetectedAt" is null
@@ -210,6 +218,7 @@ from
inner join "asset_job_status" as "job_status" on "assetId" = "asset"."id"
where
"asset"."visibility" != $1
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "job_status"."previewAt" is not null
and not exists (
@@ -244,6 +253,7 @@ from
"asset"
where
"asset"."id" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.getForDetectFacesJob
select
@@ -284,6 +294,7 @@ from
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."id" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.getForOcr
select
@@ -385,6 +396,7 @@ from
) as "stacked_assets" on "stack"."id" is not null
where
"asset"."id" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForVideoConversion
select
@@ -398,6 +410,7 @@ where
or "asset"."encodedVideoPath" = $2
)
and "asset"."visibility" != $3
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.getForVideoConversion
@@ -411,6 +424,7 @@ from
where
"asset"."id" = $1
and "asset"."type" = $2
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForMetadataExtraction
select
@@ -423,6 +437,7 @@ where
"asset_job_status"."metadataExtractedAt" is null
or "asset_job_status"."assetId" is null
)
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.getForStorageTemplateJob
@@ -443,7 +458,8 @@ from
"asset"
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "asset"."id" = $1
-- AssetJobRepository.streamForStorageTemplateJob
@@ -464,7 +480,8 @@ from
"asset"
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.streamForDeletedJob
select
@@ -474,6 +491,7 @@ from
"asset"
where
"asset"."deletedAt" <= $1
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForSidecar
select
@@ -486,6 +504,7 @@ where
or "asset"."sidecarPath" is null
)
and "asset"."visibility" != $2
and "asset"."status" != 'partial'
-- AssetJobRepository.streamForDetectFacesJob
select
@@ -495,8 +514,10 @@ from
inner join "asset_job_status" as "job_status" on "assetId" = "asset"."id"
where
"asset"."visibility" != $1
and "asset"."status" != 'partial'
and "asset"."deletedAt" is null
and "job_status"."previewAt" is not null
and "asset"."status" != 'partial'
order by
"asset"."fileCreatedAt" desc
@@ -517,4 +538,14 @@ select
from
"asset"
where
"asset"."deletedAt" is null
"asset"."status" != 'partial'
and "asset"."deletedAt" is null
-- AssetJobRepository.streamForPartialAssetCleanupJob
select
"id"
from
"asset"
where
"asset"."status" = 'partial'
and "asset"."createdAt" < $1

View File

@@ -46,6 +46,68 @@ where
"assetId" = $1
and "key" = $2
-- AssetRepository.getCompletionMetadata
select
"originalPath" as "path",
"status",
"fileModifiedAt",
"createdAt",
"checksum",
"fileSizeInByte" as "size"
from
"asset"
inner join "asset_exif" on "asset"."id" = "asset_exif"."assetId"
where
"id" = $1
and "ownerId" = $2
-- AssetRepository.setComplete
update "asset" as "complete_asset"
set
"status" = 'active',
"visibility" = case
when (
"complete_asset"."type" = 'VIDEO'
and exists (
select
from
"asset"
where
"complete_asset"."id" = "asset"."livePhotoVideoId"
)
) then 'hidden'::asset_visibility_enum
else 'timeline'::asset_visibility_enum
end
where
"id" = $1
and "status" = 'partial'
-- AssetRepository.removeAndDecrementQuota
with
"asset_exif" as (
select
"fileSizeInByte"
from
"asset_exif"
where
"assetId" = $1
),
"asset" as (
delete from "asset"
where
"id" = $2
returning
"ownerId"
)
update "user"
set
"quotaUsageInBytes" = "quotaUsageInBytes" - "fileSizeInByte"
from
"asset_exif",
"asset"
where
"user"."id" = "asset"."ownerId"
-- AssetRepository.getByDayOfYear
with
"res" as (
@@ -258,7 +320,9 @@ where
-- AssetRepository.getUploadAssetIdByChecksum
select
"id"
"id",
"status",
"createdAt"
from
"asset"
where

View File

@@ -1,10 +1,10 @@
import { Injectable } from '@nestjs/common';
import { Kysely } from 'kysely';
import { Kysely, sql } from 'kysely';
import { jsonArrayFrom } from 'kysely/helpers/postgres';
import { InjectKysely } from 'nestjs-kysely';
import { Asset, columns } from 'src/database';
import { DummyValue, GenerateSql } from 'src/decorators';
import { AssetFileType, AssetType, AssetVisibility } from 'src/enum';
import { AssetFileType, AssetStatus, AssetType, AssetVisibility } from 'src/enum';
import { DB } from 'src/schema';
import { StorageAsset } from 'src/types';
import {
@@ -29,6 +29,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.id', '=', asUuid(id))
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.leftJoin('smart_search', 'asset.id', 'smart_search.assetId')
.select(['id', 'type', 'ownerId', 'duplicateId', 'stackId', 'visibility', 'smart_search.embedding'])
.limit(1)
@@ -40,6 +41,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.id', '=', asUuid(id))
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.select(['id', 'sidecarPath', 'originalPath'])
.select((eb) =>
jsonArrayFrom(
@@ -59,6 +61,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.id', '=', asUuid(id))
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.select(['id', 'sidecarPath', 'originalPath'])
.limit(1)
.executeTakeFirst();
@@ -70,6 +73,7 @@ export class AssetJobRepository {
.selectFrom('asset')
.select(['asset.id', 'asset.thumbhash'])
.select(withFiles)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.where('asset.visibility', '!=', AssetVisibility.Hidden)
.$if(!force, (qb) =>
@@ -94,6 +98,7 @@ export class AssetJobRepository {
.select(['asset.id', 'asset.ownerId', 'asset.encodedVideoPath'])
.select(withFiles)
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -113,6 +118,7 @@ export class AssetJobRepository {
.select(withFiles)
.$call(withExifInner)
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -123,6 +129,7 @@ export class AssetJobRepository {
.select(columns.asset)
.select(withFaces)
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -140,6 +147,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.where('asset.visibility', '!=', AssetVisibility.Hidden)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.innerJoin('asset_job_status as job_status', 'assetId', 'asset.id')
.where('job_status.previewAt', 'is not', null);
@@ -150,6 +158,7 @@ export class AssetJobRepository {
return this.db
.selectFrom('asset')
.select(['asset.id'])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.innerJoin('smart_search', 'asset.id', 'smart_search.assetId')
.$call(withDefaultVisibility)
@@ -178,6 +187,7 @@ export class AssetJobRepository {
.select(['asset.id', 'asset.visibility'])
.select((eb) => withFiles(eb, AssetFileType.Preview))
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -190,6 +200,7 @@ export class AssetJobRepository {
.select((eb) => withFaces(eb, true))
.select((eb) => withFiles(eb, AssetFileType.Preview))
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -251,6 +262,7 @@ export class AssetJobRepository {
)
.select((eb) => toJson(eb, 'stacked_assets').as('stack'))
.where('asset.id', '=', id)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -265,6 +277,7 @@ export class AssetJobRepository {
.where((eb) => eb.or([eb('asset.encodedVideoPath', 'is', null), eb('asset.encodedVideoPath', '=', '')]))
.where('asset.visibility', '!=', AssetVisibility.Hidden),
)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.stream();
}
@@ -276,6 +289,7 @@ export class AssetJobRepository {
.select(['asset.id', 'asset.ownerId', 'asset.originalPath', 'asset.encodedVideoPath'])
.where('asset.id', '=', id)
.where('asset.type', '=', AssetType.Video)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@@ -291,6 +305,7 @@ export class AssetJobRepository {
eb.or([eb('asset_job_status.metadataExtractedAt', 'is', null), eb('asset_job_status.assetId', 'is', null)]),
),
)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.stream();
}
@@ -313,6 +328,7 @@ export class AssetJobRepository {
'asset_exif.timeZone',
'asset_exif.fileSizeInByte',
])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null);
}
@@ -334,6 +350,7 @@ export class AssetJobRepository {
.selectFrom('asset')
.select(['id', 'isOffline'])
.where('asset.deletedAt', '<=', trashedBefore)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.stream();
}
@@ -346,6 +363,7 @@ export class AssetJobRepository {
qb.where((eb) => eb.or([eb('asset.sidecarPath', '=', ''), eb('asset.sidecarPath', 'is', null)])),
)
.where('asset.visibility', '!=', AssetVisibility.Hidden)
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.stream();
}
@@ -354,6 +372,7 @@ export class AssetJobRepository {
return this.assetsWithPreviews()
.$if(force === false, (qb) => qb.where('job_status.facesRecognizedAt', 'is', null))
.select(['asset.id'])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.orderBy('asset.fileCreatedAt', 'desc')
.stream();
}
@@ -375,6 +394,31 @@ export class AssetJobRepository {
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForMigrationJob() {
return this.db.selectFrom('asset').select(['id']).where('asset.deletedAt', 'is', null).stream();
return this.db
.selectFrom('asset')
.select(['id'])
.where('asset.status', '!=', sql.lit(AssetStatus.Partial))
.where('asset.deletedAt', 'is', null)
.stream();
}
getForPartialAssetCleanupJob(assetId: string) {
return this.db
.selectFrom('asset')
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.select(['originalPath as path', 'fileSizeInByte as size', 'checksum', 'fileModifiedAt'])
.where('id', '=', assetId)
.where('status', '=', sql.lit(AssetStatus.Partial))
.executeTakeFirst();
}
@GenerateSql({ params: [DummyValue.DATE], stream: true })
streamForPartialAssetCleanupJob(createdBefore: Date) {
return this.db
.selectFrom('asset')
.select(['id'])
.where('asset.status', '=', sql.lit(AssetStatus.Partial))
.where('asset.createdAt', '<', createdBefore)
.stream();
}
}

View File

@@ -255,6 +255,96 @@ export class AssetRepository {
return this.db.insertInto('asset').values(asset).returningAll().executeTakeFirstOrThrow();
}
createWithMetadata(asset: Insertable<AssetTable> & { id: string }, size: number, metadata?: AssetMetadataItem[]) {
let query = this.db;
if (asset.livePhotoVideoId) {
(query as any) = query.with('motion_asset', (qb) =>
qb
.updateTable('asset')
.set({ visibility: AssetVisibility.Hidden })
.where('id', '=', asset.livePhotoVideoId!)
.where('type', '=', sql.lit(AssetType.Video))
.where('ownerId', '=', asset.ownerId)
.returning('id'),
);
}
(query as any) = query
.with('asset', (qb) =>
qb
.insertInto('asset')
.values(
asset.livePhotoVideoId ? { ...asset, livePhotoVideoId: sql<string>`(select id from motion_asset)` } : asset,
)
.returning(['id', 'ownerId']),
)
.with('exif', (qb) =>
qb
.insertInto('asset_exif')
.columns(['assetId', 'fileSizeInByte'])
.expression((eb) => eb.selectFrom('asset').select(['asset.id', eb.val(size).as('fileSizeInByte')])),
);
if (metadata && metadata.length > 0) {
(query as any) = query.with('metadata', (qb) =>
qb.insertInto('asset_metadata').values(metadata.map(({ key, value }) => ({ assetId: asset.id, key, value }))),
);
}
return query
.updateTable('user')
.from('asset')
.set({ quotaUsageInBytes: sql`"quotaUsageInBytes" + ${size}` })
.whereRef('user.id', '=', 'asset.ownerId')
.execute();
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.UUID] })
getCompletionMetadata(assetId: string, ownerId: string) {
return this.db
.selectFrom('asset')
.innerJoin('asset_exif', 'asset.id', 'asset_exif.assetId')
.select(['originalPath as path', 'status', 'fileModifiedAt', 'createdAt', 'checksum', 'fileSizeInByte as size'])
.where('id', '=', assetId)
.where('ownerId', '=', ownerId)
.executeTakeFirst();
}
@GenerateSql({ params: [DummyValue.UUID] })
async setComplete(assetId: string) {
await this.db
.updateTable('asset as complete_asset')
.set((eb) => ({
status: sql.lit(AssetStatus.Active),
visibility: eb
.case()
.when(
eb.and([
eb('complete_asset.type', '=', sql.lit(AssetType.Video)),
eb.exists(eb.selectFrom('asset').whereRef('complete_asset.id', '=', 'asset.livePhotoVideoId')),
]),
)
.then(sql<AssetVisibility>`'hidden'::asset_visibility_enum`)
.else(sql<AssetVisibility>`'timeline'::asset_visibility_enum`)
.end(),
}))
.where('id', '=', assetId)
.where('status', '=', sql.lit(AssetStatus.Partial))
.execute();
}
@GenerateSql({ params: [DummyValue.UUID] })
async removeAndDecrementQuota(id: string): Promise<void> {
await this.db
.with('asset_exif', (qb) => qb.selectFrom('asset_exif').where('assetId', '=', id).select('fileSizeInByte'))
.with('asset', (qb) => qb.deleteFrom('asset').where('id', '=', id).returning('ownerId'))
.updateTable('user')
.from(['asset_exif', 'asset'])
.set({ quotaUsageInBytes: sql`"quotaUsageInBytes" - "fileSizeInByte"` })
.whereRef('user.id', '=', 'asset.ownerId')
.execute();
}
createAll(assets: Insertable<AssetTable>[]) {
return this.db.insertInto('asset').values(assets).returningAll().execute();
}
@@ -494,17 +584,15 @@ export class AssetRepository {
}
@GenerateSql({ params: [DummyValue.UUID, DummyValue.BUFFER] })
async getUploadAssetIdByChecksum(ownerId: string, checksum: Buffer): Promise<string | undefined> {
const asset = await this.db
getUploadAssetIdByChecksum(ownerId: string, checksum: Buffer) {
return this.db
.selectFrom('asset')
.select('id')
.select(['id', 'status', 'createdAt'])
.where('ownerId', '=', asUuid(ownerId))
.where('checksum', '=', checksum)
.where('libraryId', 'is', null)
.limit(1)
.executeTakeFirst();
return asset?.id;
}
findLivePhotoMatch(options: LivePhotoSearchOptions) {

View File

@@ -451,6 +451,20 @@ export class DatabaseRepository {
return res as R;
}
async withUuidLock<R>(uuid: string, callback: () => Promise<R>): Promise<R> {
let res;
await this.db.connection().execute(async (connection) => {
try {
await this.acquireUuidLock(uuid, connection);
res = await callback();
} finally {
await this.releaseUuidLock(uuid, connection);
}
});
return res as R;
}
tryLock(lock: DatabaseLock): Promise<boolean> {
return this.db.connection().execute(async (connection) => this.acquireTryLock(lock, connection));
}
@@ -467,6 +481,10 @@ export class DatabaseRepository {
await sql`SELECT pg_advisory_lock(${lock})`.execute(connection);
}
private async acquireUuidLock(uuid: string, connection: Kysely<DB>): Promise<void> {
await sql`SELECT pg_advisory_lock(uuid_hash_extended(${uuid}, 0))`.execute(connection);
}
private async acquireTryLock(lock: DatabaseLock, connection: Kysely<DB>): Promise<boolean> {
const { rows } = await sql<{
pg_try_advisory_lock: boolean;
@@ -477,4 +495,8 @@ export class DatabaseRepository {
private async releaseLock(lock: DatabaseLock, connection: Kysely<DB>): Promise<void> {
await sql`SELECT pg_advisory_unlock(${lock})`.execute(connection);
}
private async releaseUuidLock(uuid: string, connection: Kysely<DB>): Promise<void> {
await sql`SELECT pg_advisory_unlock(uuid_hash_extended(${uuid}, 0))`.execute(connection);
}
}

View File

@@ -79,6 +79,9 @@ type EventMap = {
// stack bulk events
StackDeleteAll: [{ stackIds: string[]; userId: string }];
// upload events
UploadAbort: [{ assetId: string; abortTime: Date }];
// user events
UserSignup: [{ notify: boolean; id: string; password?: string }];
UserCreate: [UserEvent];

View File

@@ -62,7 +62,11 @@ export class StorageRepository {
}
createWriteStream(filepath: string): Writable {
return createWriteStream(filepath, { flags: 'w' });
return createWriteStream(filepath, { flags: 'w', highWaterMark: 1024 * 1024 });
}
createOrAppendWriteStream(filepath: string): Writable {
return createWriteStream(filepath, { flags: 'a', highWaterMark: 1024 * 1024 });
}
createOrOverwriteFile(filepath: string, buffer: Buffer) {
@@ -156,10 +160,13 @@ export class StorageRepository {
}
}
mkdir(filepath: string): Promise<string | undefined> {
return fs.mkdir(filepath, { recursive: true });
}
mkdirSync(filepath: string): void {
if (!existsSync(filepath)) {
mkdirSync(filepath, { recursive: true });
}
// does not throw an error if the folder already exists
mkdirSync(filepath, { recursive: true });
}
existsSync(filepath: string) {

View File

@@ -16,7 +16,7 @@ import { ArgsOf, EventRepository } from 'src/repositories/event.repository';
import { LoggingRepository } from 'src/repositories/logging.repository';
import { handlePromiseError } from 'src/utils/misc';
export const serverEvents = ['ConfigUpdate'] as const;
export const serverEvents = ['ConfigUpdate', 'UploadAbort'] as const;
export type ServerEvents = (typeof serverEvents)[number];
export interface ClientEventMap {

View File

@@ -0,0 +1,9 @@
import { Kysely, sql } from 'kysely';
export async function up(db: Kysely<any>): Promise<void> {
await sql`ALTER TYPE "assets_status_enum" ADD VALUE IF NOT EXISTS 'partial'`.execute(db);
}
export async function down(): Promise<void> {
// Cannot remove enum values in PostgreSQL
}

View File

@@ -215,7 +215,11 @@ describe(AssetMediaService.name, () => {
});
it('should find an existing asset', async () => {
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue('asset-id');
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({
id: 'asset-id',
createdAt: new Date(),
status: AssetStatus.Active,
});
await expect(sut.getUploadAssetIdByChecksum(authStub.admin, file1.toString('hex'))).resolves.toEqual({
id: 'asset-id',
status: AssetMediaStatus.DUPLICATE,
@@ -224,7 +228,11 @@ describe(AssetMediaService.name, () => {
});
it('should find an existing asset by base64', async () => {
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue('asset-id');
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({
id: 'asset-id',
createdAt: new Date(),
status: AssetStatus.Active,
});
await expect(sut.getUploadAssetIdByChecksum(authStub.admin, file1.toString('base64'))).resolves.toEqual({
id: 'asset-id',
status: AssetMediaStatus.DUPLICATE,
@@ -378,7 +386,11 @@ describe(AssetMediaService.name, () => {
(error as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT;
mocks.asset.create.mockRejectedValue(error);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue(assetEntity.id);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({
id: assetEntity.id,
createdAt: new Date(),
status: AssetStatus.Active,
});
await expect(sut.uploadAsset(authStub.user1, createDto, file)).resolves.toEqual({
id: 'id_1',
@@ -803,7 +815,11 @@ describe(AssetMediaService.name, () => {
mocks.asset.update.mockRejectedValue(error);
mocks.asset.getById.mockResolvedValueOnce(sidecarAsset);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue(sidecarAsset.id);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({
id: sidecarAsset.id,
createdAt: new Date(),
status: AssetStatus.Active,
});
mocks.access.asset.checkOwnerAccess.mockResolvedValue(new Set([sidecarAsset.id]));
// this is the original file size
mocks.storage.stat.mockResolvedValue({ size: 0 } as Stats);

View File

@@ -43,12 +43,12 @@ export class AssetMediaService extends BaseService {
return;
}
const assetId = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, fromChecksum(checksum));
if (!assetId) {
const asset = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, fromChecksum(checksum));
if (!asset) {
return;
}
return { id: assetId, status: AssetMediaStatus.DUPLICATE };
return { id: asset.id, status: AssetMediaStatus.DUPLICATE };
}
canUploadFile({ auth, fieldName, file }: UploadRequest): true {
@@ -165,6 +165,10 @@ export class AssetMediaService extends BaseService {
throw new Error('Asset not found');
}
if (asset.status === AssetStatus.Partial) {
throw new BadRequestException('Cannot replace a partial asset');
}
this.requireQuota(auth, file.size);
await this.replaceFileData(asset.id, dto, file, sidecarFile?.originalPath);
@@ -313,12 +317,12 @@ export class AssetMediaService extends BaseService {
// handle duplicates with a success response
if (isAssetChecksumConstraint(error)) {
const duplicateId = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, file.checksum);
if (!duplicateId) {
const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, file.checksum);
if (!duplicate) {
this.logger.error(`Error locating duplicate for checksum constraint`);
throw new InternalServerErrorException();
}
return { status: AssetMediaStatus.DUPLICATE, id: duplicateId };
return { status: AssetMediaStatus.DUPLICATE, id: duplicate.id };
}
this.logger.error(`Error uploading file ${error}`, error?.stack);

View File

@@ -0,0 +1,456 @@
import { BadRequestException, InternalServerErrorException } from '@nestjs/common';
import { AssetMetadataKey, AssetStatus, AssetType, AssetVisibility, JobName, JobStatus } from 'src/enum';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { ASSET_CHECKSUM_CONSTRAINT } from 'src/utils/database';
import { authStub } from 'test/fixtures/auth.stub';
import { factory } from 'test/small.factory';
import { newTestService, ServiceMocks } from 'test/utils';
describe(AssetUploadService.name, () => {
let sut: AssetUploadService;
let mocks: ServiceMocks;
beforeEach(() => {
({ sut, mocks } = newTestService(AssetUploadService));
});
describe('onStart', () => {
const mockDto = {
assetData: {
filename: 'test.jpg',
deviceAssetId: 'device-asset-1',
deviceId: 'device-1',
fileCreatedAt: new Date('2025-01-01T00:00:00Z'),
fileModifiedAt: new Date('2025-01-01T12:00:00Z'),
isFavorite: false,
iCloudId: '',
},
checksum: Buffer.from('checksum'),
uploadLength: 1024,
uploadComplete: true,
contentLength: 1024,
isComplete: true,
version: 8,
};
it('should create a new asset and return upload metadata', async () => {
const assetId = factory.uuid();
mocks.crypto.randomUUID.mockReturnValue(assetId);
const result = await sut.onStart(authStub.user1, mockDto);
expect(result).toEqual({
id: assetId,
path: expect.stringContaining(assetId),
status: AssetStatus.Partial,
isDuplicate: false,
});
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
id: assetId,
ownerId: authStub.user1.user.id,
checksum: mockDto.checksum,
deviceAssetId: mockDto.assetData.deviceAssetId,
deviceId: mockDto.assetData.deviceId,
fileCreatedAt: mockDto.assetData.fileCreatedAt,
fileModifiedAt: mockDto.assetData.fileModifiedAt,
type: AssetType.Image,
isFavorite: false,
status: AssetStatus.Partial,
visibility: AssetVisibility.Hidden,
originalFileName: 'test.jpg',
}),
1024,
undefined,
);
});
it('should determine asset type from filename extension', async () => {
const videoDto = { ...mockDto, assetData: { ...mockDto.assetData, filename: 'video.mp4' } };
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, videoDto);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
type: AssetType.Video,
}),
expect.anything(),
undefined,
);
});
it('should throw BadRequestException for unsupported file types', async () => {
const unsupportedDto = { ...mockDto, assetData: { ...mockDto.assetData, filename: 'document.xyz' } };
await expect(sut.onStart(authStub.user1, unsupportedDto)).rejects.toThrow(BadRequestException);
await expect(sut.onStart(authStub.user1, unsupportedDto)).rejects.toThrow('unsupported file type');
});
it('should validate quota before creating asset', async () => {
const authWithQuota = {
...authStub.user1,
user: {
...authStub.user1.user,
quotaSizeInBytes: 2000,
quotaUsageInBytes: 1500,
},
};
await expect(sut.onStart(authWithQuota, mockDto)).rejects.toThrow(BadRequestException);
await expect(sut.onStart(authWithQuota, mockDto)).rejects.toThrow('Quota has been exceeded');
});
it('should allow upload when quota is null (unlimited)', async () => {
const authWithUnlimitedQuota = {
...authStub.user1,
user: {
...authStub.user1.user,
quotaSizeInBytes: null,
quotaUsageInBytes: 1000,
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await expect(sut.onStart(authWithUnlimitedQuota, mockDto)).resolves.toBeDefined();
});
it('should allow upload when within quota', async () => {
const authWithQuota = {
...authStub.user1,
user: {
...authStub.user1.user,
quotaSizeInBytes: 5000,
quotaUsageInBytes: 1000,
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
const result = await sut.onStart(authWithQuota, mockDto);
expect(result.isDuplicate).toBe(false);
});
it('should handle duplicate detection via checksum constraint', async () => {
const existingAssetId = factory.uuid();
const checksumError = new Error('duplicate key value violates unique constraint');
(checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT;
mocks.asset.createWithMetadata.mockRejectedValue(checksumError);
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue({
id: existingAssetId,
status: AssetStatus.Partial,
createdAt: new Date(),
});
const result = await sut.onStart(authStub.user1, mockDto);
expect(result).toEqual({
id: existingAssetId,
path: expect.any(String),
status: AssetStatus.Partial,
isDuplicate: true,
});
expect(mocks.asset.getUploadAssetIdByChecksum).toHaveBeenCalledWith(authStub.user1.user.id, mockDto.checksum);
});
it('should throw InternalServerErrorException if duplicate lookup fails', async () => {
const checksumError = new Error('duplicate key value violates unique constraint');
(checksumError as any).constraint_name = ASSET_CHECKSUM_CONSTRAINT;
mocks.asset.createWithMetadata.mockRejectedValue(checksumError);
// eslint-disable-next-line unicorn/no-useless-undefined
mocks.asset.getUploadAssetIdByChecksum.mockResolvedValue(undefined);
await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException);
});
it('should throw InternalServerErrorException for non-checksum errors', async () => {
const genericError = new Error('database connection failed');
mocks.asset.createWithMetadata.mockRejectedValue(genericError);
await expect(sut.onStart(authStub.user1, mockDto)).rejects.toThrow(InternalServerErrorException);
});
it('should include iCloud metadata when provided', async () => {
const dtoWithICloud = {
...mockDto,
assetData: {
...mockDto.assetData,
iCloudId: 'icloud-123',
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, dtoWithICloud);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(expect.anything(), expect.anything(), [
{ key: AssetMetadataKey.MobileApp, value: { iCloudId: 'icloud-123' } },
]);
});
it('should set isFavorite when true', async () => {
const favoriteDto = {
...mockDto,
assetData: {
...mockDto.assetData,
isFavorite: true,
},
};
mocks.crypto.randomUUID.mockReturnValue(factory.uuid());
await sut.onStart(authStub.user1, favoriteDto);
expect(mocks.asset.createWithMetadata).toHaveBeenCalledWith(
expect.objectContaining({
isFavorite: true,
}),
expect.anything(),
undefined,
);
});
});
describe('onComplete', () => {
const assetId = factory.uuid();
const path = `/upload/${assetId}/file.jpg`;
const fileModifiedAt = new Date('2025-01-01T12:00:00Z');
it('should mark asset as complete and queue metadata extraction job', async () => {
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.asset.setComplete).toHaveBeenCalledWith(assetId);
expect(mocks.job.queue).toHaveBeenCalledWith({
name: JobName.AssetExtractMetadata,
data: { id: assetId, source: 'upload' },
});
});
it('should update file modification time', async () => {
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.storage.utimes).toHaveBeenCalledWith(path, expect.any(Date), fileModifiedAt);
});
it('should handle utimes failure gracefully', async () => {
mocks.storage.utimes.mockRejectedValue(new Error('Permission denied'));
await expect(sut.onComplete({ id: assetId, path, fileModifiedAt })).resolves.toBeUndefined();
// Should still complete asset and queue job
expect(mocks.asset.setComplete).toHaveBeenCalled();
expect(mocks.job.queue).toHaveBeenCalled();
});
it('should retry setComplete on transient failures', async () => {
mocks.asset.setComplete
.mockRejectedValueOnce(new Error('Transient error'))
.mockRejectedValueOnce(new Error('Transient error'))
.mockResolvedValue();
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.asset.setComplete).toHaveBeenCalledTimes(3);
});
it('should retry job queueing on transient failures', async () => {
mocks.job.queue.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue();
await sut.onComplete({ id: assetId, path, fileModifiedAt });
expect(mocks.job.queue).toHaveBeenCalledTimes(2);
});
});
describe('onCancel', () => {
const assetId = factory.uuid();
const path = `/upload/${assetId}/file.jpg`;
it('should delete file and remove asset record', async () => {
await sut.onCancel(assetId, path);
expect(mocks.storage.unlink).toHaveBeenCalledWith(path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should retry unlink on transient failures', async () => {
mocks.storage.unlink.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue();
await sut.onCancel(assetId, path);
expect(mocks.storage.unlink).toHaveBeenCalledTimes(2);
});
it('should retry removeAndDecrementQuota on transient failures', async () => {
mocks.asset.removeAndDecrementQuota.mockRejectedValueOnce(new Error('Transient error')).mockResolvedValue();
await sut.onCancel(assetId, path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledTimes(2);
});
});
describe('removeStaleUploads', () => {
it('should queue cleanup jobs for stale partial assets', async () => {
const staleAssets = [{ id: factory.uuid() }, { id: factory.uuid() }, { id: factory.uuid() }];
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue(
// eslint-disable-next-line @typescript-eslint/require-await
(async function* () {
for (const asset of staleAssets) {
yield asset;
}
})(),
);
await sut.removeStaleUploads();
expect(mocks.assetJob.streamForPartialAssetCleanupJob).toHaveBeenCalledWith(expect.any(Date));
expect(mocks.job.queueAll).toHaveBeenCalledWith([
{ name: JobName.PartialAssetCleanup, data: staleAssets[0] },
{ name: JobName.PartialAssetCleanup, data: staleAssets[1] },
{ name: JobName.PartialAssetCleanup, data: staleAssets[2] },
]);
});
it('should batch cleanup jobs', async () => {
const assets = Array.from({ length: 1500 }, () => ({ id: factory.uuid() }));
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue(
// eslint-disable-next-line @typescript-eslint/require-await
(async function* () {
for (const asset of assets) {
yield asset;
}
})(),
);
await sut.removeStaleUploads();
// Should be called twice: once for 1000, once for 500
expect(mocks.job.queueAll).toHaveBeenCalledTimes(2);
});
it('should handle empty stream', async () => {
mocks.assetJob.streamForPartialAssetCleanupJob.mockReturnValue((async function* () {})());
await sut.removeStaleUploads();
expect(mocks.job.queueAll).toHaveBeenCalledWith([]);
});
});
describe('removeStaleUpload', () => {
const assetId = factory.uuid();
const path = `/upload/${assetId}/file.jpg`;
it('should skip if asset not found', async () => {
// eslint-disable-next-line unicorn/no-useless-undefined
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue(undefined);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Skipped);
expect(mocks.storage.stat).not.toHaveBeenCalled();
});
it('should complete asset if file matches expected state', async () => {
const checksum = Buffer.from('checksum');
const fileModifiedAt = new Date();
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum,
fileModifiedAt,
size: 1024,
});
mocks.storage.stat.mockResolvedValue({ size: 1024 } as any);
mocks.crypto.hashFile.mockResolvedValue(checksum);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.asset.setComplete).toHaveBeenCalledWith(assetId);
expect(mocks.storage.unlink).not.toHaveBeenCalled();
});
it('should cancel asset if file size does not match', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
mocks.storage.stat.mockResolvedValue({ size: 512 } as any);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.storage.unlink).toHaveBeenCalledWith(path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should cancel asset if checksum does not match', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('expected-checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
mocks.storage.stat.mockResolvedValue({ size: 1024 } as any);
mocks.crypto.hashFile.mockResolvedValue(Buffer.from('actual-checksum'));
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.storage.unlink).toHaveBeenCalledWith(path);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should cancel asset if file does not exist', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
const error = new Error('File not found') as NodeJS.ErrnoException;
error.code = 'ENOENT';
mocks.storage.stat.mockRejectedValue(error);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
it('should cancel asset if stat fails with permission error', async () => {
mocks.assetJob.getForPartialAssetCleanupJob.mockResolvedValue({
path,
checksum: Buffer.from('checksum'),
fileModifiedAt: new Date(),
size: 1024,
});
const error = new Error('Permission denied') as NodeJS.ErrnoException;
error.code = 'EACCES';
mocks.storage.stat.mockRejectedValue(error);
const result = await sut.removeStaleUpload({ id: assetId });
expect(result).toBe(JobStatus.Success);
expect(mocks.asset.removeAndDecrementQuota).toHaveBeenCalledWith(assetId);
});
});
});

View File

@@ -0,0 +1,454 @@
import { BadRequestException, Injectable, InternalServerErrorException } from '@nestjs/common';
import { Response } from 'express';
import { DateTime } from 'luxon';
import { createHash } from 'node:crypto';
import { dirname, extname, join } from 'node:path';
import { Readable, Writable } from 'node:stream';
import { SystemConfig } from 'src/config';
import { JOBS_ASSET_PAGINATION_SIZE } from 'src/constants';
import { StorageCore } from 'src/cores/storage.core';
import { OnEvent, OnJob } from 'src/decorators';
import { GetUploadStatusDto, ResumeUploadDto, StartUploadDto } from 'src/dtos/asset-upload.dto';
import { AuthDto } from 'src/dtos/auth.dto';
import {
AssetMetadataKey,
AssetStatus,
AssetType,
AssetVisibility,
ImmichWorker,
JobName,
JobStatus,
QueueName,
StorageFolder,
} from 'src/enum';
import { ArgOf } from 'src/repositories/event.repository';
import { BaseService } from 'src/services/base.service';
import { JobItem, JobOf } from 'src/types';
import { isAssetChecksumConstraint } from 'src/utils/database';
import { mimeTypes } from 'src/utils/mime-types';
import { withRetry } from 'src/utils/misc';
export const MAX_RUFH_INTEROP_VERSION = 8;
@Injectable()
export class AssetUploadService extends BaseService {
// This is used to proactively abort previous requests for the same asset
// when a new one arrives. The previous request still holds the asset lock
// and will prevent the new request from proceeding until the previous one
// times out. As normal client behavior will not have concurrent requests,
// we can assume the previous request has already failed on the client end.
private activeRequests = new Map<string, { req: Readable; startTime: Date }>();
@OnEvent({ name: 'UploadAbort', workers: [ImmichWorker.Api], server: true })
onUploadAbort({ assetId, abortTime }: ArgOf<'UploadAbort'>) {
const entry = this.activeRequests.get(assetId);
if (!entry) {
return false;
}
if (abortTime > entry.startTime) {
entry.req.destroy();
this.activeRequests.delete(assetId);
}
return true;
}
async startUpload(auth: AuthDto, req: Readable, res: Response, dto: StartUploadDto): Promise<void> {
this.logger.verboseFn(() => `Starting upload: ${JSON.stringify(dto)}`);
const { uploadComplete, assetData, uploadLength, contentLength, version } = dto;
const isComplete = uploadComplete !== false;
const isResumable = version && uploadComplete !== undefined;
const { backup } = await this.getConfig({ withCache: true });
const asset = await this.onStart(auth, dto);
if (asset.isDuplicate) {
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompleted(res);
}
const location = `/api/upload/${asset.id}`;
if (isResumable) {
this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
// this is a 5xx to indicate the client should do offset retrieval and resume
res.status(500).send('Incomplete asset already exists');
return;
}
}
if (isComplete && uploadLength !== contentLength) {
return this.sendInconsistentLength(res);
}
const location = `/api/upload/${asset.id}`;
if (isResumable) {
this.sendInterimResponse(res, location, version, this.getUploadLimits(backup));
}
this.addRequest(asset.id, req);
await this.databaseRepository.withUuidLock(asset.id, async () => {
// conventional upload, check status again with lock acquired before overwriting
if (asset.isDuplicate) {
const existingAsset = await this.assetRepository.getCompletionMetadata(asset.id, auth.user.id);
if (existingAsset?.status !== AssetStatus.Partial) {
return this.sendAlreadyCompleted(res);
}
}
await this.storageRepository.mkdir(dirname(asset.path));
let checksumBuffer: Buffer | undefined;
const writeStream = asset.isDuplicate
? this.storageRepository.createWriteStream(asset.path)
: this.storageRepository.createOrAppendWriteStream(asset.path);
this.pipe(req, writeStream, contentLength);
if (isComplete) {
const hash = createHash('sha1');
req.on('data', (data: Buffer) => hash.update(data));
writeStream.on('finish', () => (checksumBuffer = hash.digest()));
}
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
if (isResumable) {
this.setCompleteHeader(res, version, uploadComplete);
}
if (!isComplete) {
res.status(201).set('Location', location).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
return;
}
if (dto.checksum.compare(checksumBuffer!) !== 0) {
return await this.sendChecksumMismatch(res, asset.id, asset.path);
}
await this.onComplete({ id: asset.id, path: asset.path, fileModifiedAt: assetData.fileModifiedAt });
res.status(200).send({ id: asset.id });
});
}
resumeUpload(auth: AuthDto, req: Readable, res: Response, id: string, dto: ResumeUploadDto): Promise<void> {
this.logger.verboseFn(() => `Resuming upload for ${id}: ${JSON.stringify(dto)}`);
const { uploadComplete, uploadLength, uploadOffset, contentLength, version } = dto;
this.setCompleteHeader(res, version, false);
this.addRequest(id, req);
return this.databaseRepository.withUuidLock(id, async () => {
const completionData = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
if (!completionData) {
res.status(404).send('Asset not found');
return;
}
const { fileModifiedAt, path, status, checksum: providedChecksum, size } = completionData;
if (status !== AssetStatus.Partial) {
return this.sendAlreadyCompleted(res);
}
if (uploadLength && size && size !== uploadLength) {
return this.sendInconsistentLength(res);
}
const expectedOffset = await this.getCurrentOffset(path);
if (expectedOffset !== uploadOffset) {
return this.sendOffsetMismatch(res, expectedOffset, uploadOffset);
}
const newLength = uploadOffset + contentLength;
if (uploadLength !== undefined && newLength > uploadLength) {
res.status(400).send('Upload would exceed declared length');
return;
}
if (contentLength === 0 && !uploadComplete) {
res.status(204).setHeader('Upload-Offset', expectedOffset.toString()).send();
return;
}
const writeStream = this.storageRepository.createOrAppendWriteStream(path);
this.pipe(req, writeStream, contentLength);
await new Promise((resolve, reject) => writeStream.on('close', resolve).on('error', reject));
this.setCompleteHeader(res, version, uploadComplete);
if (!uploadComplete) {
try {
const offset = await this.getCurrentOffset(path);
res.status(204).setHeader('Upload-Offset', offset.toString()).send();
} catch {
this.logger.error(`Failed to get current offset for ${path} after write`);
res.status(500).send();
}
return;
}
const checksum = await this.cryptoRepository.hashFile(path);
if (providedChecksum.compare(checksum) !== 0) {
return await this.sendChecksumMismatch(res, id, path);
}
await this.onComplete({ id, path, fileModifiedAt });
res.status(200).send({ id });
});
}
cancelUpload(auth: AuthDto, assetId: string, res: Response): Promise<void> {
this.abortExistingRequest(assetId);
return this.databaseRepository.withUuidLock(assetId, async () => {
const asset = await this.assetRepository.getCompletionMetadata(assetId, auth.user.id);
if (!asset) {
res.status(404).send('Asset not found');
return;
}
if (asset.status !== AssetStatus.Partial) {
return this.sendAlreadyCompleted(res);
}
await this.onCancel(assetId, asset.path);
res.status(204).send();
});
}
async getUploadStatus(auth: AuthDto, res: Response, id: string, { version }: GetUploadStatusDto): Promise<void> {
this.logger.verboseFn(() => `Getting upload status for ${id} with version ${version}`);
const { backup } = await this.getConfig({ withCache: true });
this.abortExistingRequest(id);
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetRepository.getCompletionMetadata(id, auth.user.id);
if (!asset) {
res.status(404).send('Asset not found');
return;
}
const offset = await this.getCurrentOffset(asset.path);
this.setCompleteHeader(res, version, asset.status !== AssetStatus.Partial);
res
.status(204)
.setHeader('Upload-Offset', offset.toString())
.setHeader('Cache-Control', 'no-store')
.setHeader('Upload-Limit', this.getUploadLimits(backup))
.send();
});
}
async getUploadOptions(res: Response): Promise<void> {
const { backup } = await this.getConfig({ withCache: true });
res.status(204).setHeader('Upload-Limit', this.getUploadLimits(backup)).send();
}
@OnJob({ name: JobName.PartialAssetCleanupQueueAll, queue: QueueName.BackgroundTask })
async removeStaleUploads(): Promise<void> {
const config = await this.getConfig({ withCache: false });
const createdBefore = DateTime.now().minus({ hours: config.backup.upload.maxAgeHours }).toJSDate();
let jobs: JobItem[] = [];
const assets = this.assetJobRepository.streamForPartialAssetCleanupJob(createdBefore);
for await (const asset of assets) {
jobs.push({ name: JobName.PartialAssetCleanup, data: asset });
if (jobs.length >= JOBS_ASSET_PAGINATION_SIZE) {
await this.jobRepository.queueAll(jobs);
jobs = [];
}
}
await this.jobRepository.queueAll(jobs);
}
@OnJob({ name: JobName.PartialAssetCleanup, queue: QueueName.BackgroundTask })
removeStaleUpload({ id }: JobOf<JobName.PartialAssetCleanup>): Promise<JobStatus> {
return this.databaseRepository.withUuidLock(id, async () => {
const asset = await this.assetJobRepository.getForPartialAssetCleanupJob(id);
if (!asset) {
return JobStatus.Skipped;
}
const { checksum, fileModifiedAt, path, size } = asset;
try {
const stat = await this.storageRepository.stat(path);
if (size === stat.size && checksum === (await this.cryptoRepository.hashFile(path))) {
await this.onComplete({ id, path, fileModifiedAt });
return JobStatus.Success;
}
} catch (error: any) {
this.logger.debugFn(() => `Failed to check upload file ${path}: ${error.message}`);
}
await this.onCancel(id, path);
return JobStatus.Success;
});
}
async onStart(
auth: AuthDto,
{ assetData, checksum, uploadLength }: StartUploadDto,
): Promise<{ id: string; path: string; status: AssetStatus; isDuplicate: boolean }> {
const assetId = this.cryptoRepository.randomUUID();
const folder = StorageCore.getNestedFolder(StorageFolder.Upload, auth.user.id, assetId);
const extension = extname(assetData.filename);
const path = join(folder, `${assetId}${extension}`);
const type = mimeTypes.assetType(path);
if (type === AssetType.Other) {
throw new BadRequestException(`${assetData.filename} is an unsupported file type`);
}
this.validateQuota(auth, uploadLength);
try {
await this.assetRepository.createWithMetadata(
{
id: assetId,
ownerId: auth.user.id,
libraryId: null,
checksum,
originalPath: path,
deviceAssetId: assetData.deviceAssetId,
deviceId: assetData.deviceId,
fileCreatedAt: assetData.fileCreatedAt,
fileModifiedAt: assetData.fileModifiedAt,
localDateTime: assetData.fileCreatedAt,
type,
isFavorite: assetData.isFavorite,
livePhotoVideoId: assetData.livePhotoVideoId,
visibility: AssetVisibility.Hidden,
originalFileName: assetData.filename,
status: AssetStatus.Partial,
},
uploadLength,
assetData.iCloudId ? [{ key: AssetMetadataKey.MobileApp, value: { iCloudId: assetData.iCloudId } }] : undefined,
);
} catch (error: any) {
if (!isAssetChecksumConstraint(error)) {
this.logger.error(`Error creating upload asset record: ${error.message}`);
throw new InternalServerErrorException('Error creating asset');
}
const duplicate = await this.assetRepository.getUploadAssetIdByChecksum(auth.user.id, checksum);
if (!duplicate) {
throw new InternalServerErrorException('Error locating duplicate for checksum constraint');
}
return { id: duplicate.id, path, status: duplicate.status, isDuplicate: true };
}
return { id: assetId, path, status: AssetStatus.Partial, isDuplicate: false };
}
async onComplete({ id, path, fileModifiedAt }: { id: string; path: string; fileModifiedAt: Date }) {
this.logger.log('Completing upload for asset', id);
const jobData = { name: JobName.AssetExtractMetadata, data: { id, source: 'upload' } } as const;
await withRetry(() => this.assetRepository.setComplete(id));
try {
await withRetry(() => this.storageRepository.utimes(path, new Date(), fileModifiedAt));
} catch (error: any) {
this.logger.error(`Failed to update times for ${path}: ${error.message}`);
}
await withRetry(() => this.jobRepository.queue(jobData));
}
async onCancel(assetId: string, path: string): Promise<void> {
this.logger.log('Cancelling upload for asset', assetId);
await withRetry(() => this.storageRepository.unlink(path));
await withRetry(() => this.assetRepository.removeAndDecrementQuota(assetId));
}
private addRequest(assetId: string, req: Readable) {
const addTime = new Date();
const activeRequest = { req, startTime: addTime };
this.abortExistingRequest(assetId, addTime);
this.activeRequests.set(assetId, activeRequest);
req.on('close', () => {
if (this.activeRequests.get(assetId)?.req === req) {
this.activeRequests.delete(assetId);
}
});
}
private abortExistingRequest(assetId: string, abortTime = new Date()) {
const abortEvent = { assetId, abortTime };
// only emit if we didn't just abort it ourselves
if (!this.onUploadAbort(abortEvent)) {
this.websocketRepository.serverSend('UploadAbort', abortEvent);
}
}
private pipe(req: Readable, writeStream: Writable, size: number) {
let receivedLength = 0;
req.on('data', (data: Buffer) => {
receivedLength += data.length;
if (!writeStream.write(data)) {
req.pause();
writeStream.once('drain', () => req.resume());
}
});
req.on('close', () => {
if (receivedLength < size) {
writeStream.emit('error', new Error('Request closed before all data received'));
}
writeStream.end();
});
}
private sendInterimResponse({ socket }: Response, location: string, interopVersion: number, limits: string): void {
if (socket && !socket.destroyed) {
// Express doesn't understand interim responses, so write directly to socket
socket.write(
'HTTP/1.1 104 Upload Resumption Supported\r\n' +
`Location: ${location}\r\n` +
`Upload-Limit: ${limits}\r\n` +
`Upload-Draft-Interop-Version: ${interopVersion}\r\n\r\n`,
);
}
}
private sendInconsistentLength(res: Response): void {
res.status(400).contentType('application/problem+json').send({
type: 'https://iana.org/assignments/http-problem-types#inconsistent-upload-length',
title: 'inconsistent length values for upload',
});
}
private sendAlreadyCompleted(res: Response): void {
res.status(400).contentType('application/problem+json').send({
type: 'https://iana.org/assignments/http-problem-types#completed-upload',
title: 'upload is already completed',
});
}
private sendOffsetMismatch(res: Response, expected: number, actual: number): void {
res.status(409).contentType('application/problem+json').setHeader('Upload-Offset', expected.toString()).send({
type: 'https://iana.org/assignments/http-problem-types#mismatching-upload-offset',
title: 'offset from request does not match offset of resource',
'expected-offset': expected,
'provided-offset': actual,
});
}
private sendChecksumMismatch(res: Response, assetId: string, path: string) {
this.logger.warn(`Removing upload asset ${assetId} due to checksum mismatch`);
res.status(460).send('File on server does not match provided checksum');
return this.onCancel(assetId, path);
}
private validateQuota(auth: AuthDto, size: number): void {
const { quotaSizeInBytes: quotaLimit, quotaUsageInBytes: currentUsage } = auth.user;
if (quotaLimit === null) {
return;
}
if (quotaLimit < currentUsage + size) {
throw new BadRequestException('Quota has been exceeded!');
}
}
private async getCurrentOffset(path: string): Promise<number> {
try {
const stat = await this.storageRepository.stat(path);
return stat.size;
} catch (error: any) {
if ((error as NodeJS.ErrnoException)?.code === 'ENOENT') {
return 0;
}
throw error;
}
}
private setCompleteHeader(res: Response, interopVersion: number | undefined, isComplete: boolean): void {
if (interopVersion === undefined || interopVersion > 3) {
res.setHeader('Upload-Complete', isComplete ? '?1' : '?0');
} else {
res.setHeader('Upload-Incomplete', isComplete ? '?0' : '?1');
}
}
private getUploadLimits({ upload }: SystemConfig['backup']) {
return `min-size=1, max-age=${upload.maxAgeHours * 3600}`;
}
}

View File

@@ -3,6 +3,7 @@ import { AlbumService } from 'src/services/album.service';
import { ApiKeyService } from 'src/services/api-key.service';
import { ApiService } from 'src/services/api.service';
import { AssetMediaService } from 'src/services/asset-media.service';
import { AssetUploadService } from 'src/services/asset-upload.service';
import { AssetService } from 'src/services/asset.service';
import { AuditService } from 'src/services/audit.service';
import { AuthAdminService } from 'src/services/auth-admin.service';
@@ -49,6 +50,7 @@ export const services = [
AlbumService,
ApiService,
AssetMediaService,
AssetUploadService,
AssetService,
AuditService,
AuthService,

View File

@@ -48,6 +48,7 @@ describe(JobService.name, () => {
{ name: JobName.UserSyncUsage },
{ name: JobName.AssetGenerateThumbnailsQueueAll, data: { force: false } },
{ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } },
{ name: JobName.PartialAssetCleanupQueueAll },
]);
});
});

View File

@@ -303,6 +303,10 @@ export class JobService extends BaseService {
jobs.push({ name: JobName.FacialRecognitionQueueAll, data: { force: false, nightly: true } });
}
if (config.nightlyTasks.removeStaleUploads) {
jobs.push({ name: JobName.PartialAssetCleanupQueueAll });
}
await this.jobRepository.queueAll(jobs);
}

View File

@@ -47,6 +47,9 @@ const updatedConfig = Object.freeze<SystemConfig>({
cronExpression: '0 02 * * *',
keepLastAmount: 14,
},
upload: {
maxAgeHours: 72,
},
},
ffmpeg: {
crf: 30,
@@ -123,6 +126,7 @@ const updatedConfig = Object.freeze<SystemConfig>({
missingThumbnails: true,
generateMemories: true,
syncQuotaUsage: true,
removeStaleUploads: true,
},
reverseGeocoding: {
enabled: true,

View File

@@ -352,6 +352,8 @@ export type JobItem =
| { name: JobName.PersonCleanup; data?: IBaseJob }
| { name: JobName.AssetDelete; data: IAssetDeleteJob }
| { name: JobName.AssetDeleteCheck; data?: IBaseJob }
| { name: JobName.PartialAssetCleanup; data: IEntityJob }
| { name: JobName.PartialAssetCleanupQueueAll; data?: IBaseJob }
// Library Management
| { name: JobName.LibrarySyncFiles; data: ILibraryFileJob }

View File

@@ -99,7 +99,7 @@ export const getKyselyConfig = (
}),
}),
log(event) {
if (event.level === 'error') {
if (event.level === 'error' && (event.error as PostgresError).constraint_name !== ASSET_CHECKSUM_CONSTRAINT) {
console.error('Query failed :', {
durationMs: event.queryDurationMillis,
error: event.error,

View File

@@ -14,6 +14,7 @@ import {
import _ from 'lodash';
import { writeFileSync } from 'node:fs';
import path from 'node:path';
import { setTimeout } from 'node:timers/promises';
import picomatch from 'picomatch';
import parse from 'picomatch/lib/parse';
import { SystemConfig } from 'src/config';
@@ -326,3 +327,18 @@ export const globToSqlPattern = (glob: string) => {
export function clamp(value: number, min: number, max: number) {
return Math.max(min, Math.min(max, value));
}
export async function withRetry<T>(operation: () => Promise<T>, retries: number = 2, delay: number = 100): Promise<T> {
let lastError: any;
for (let attempt = 0; attempt <= retries; attempt++) {
try {
return await operation();
} catch (error: any) {
lastError = error;
}
if (attempt < retries) {
await setTimeout(delay);
}
}
throw lastError;
}

View File

@@ -1,3 +1,7 @@
import { BadRequestException } from '@nestjs/common';
import { plainToInstance } from 'class-transformer';
import { validateSync } from 'class-validator';
import { IncomingHttpHeaders } from 'node:http';
import { UAParser } from 'ua-parser-js';
@@ -20,3 +24,29 @@ export const getUserAgentDetails = (headers: IncomingHttpHeaders) => {
appVersion,
};
};
export function validateSyncOrReject<T extends object>(cls: new () => T, obj: any): T {
const dto = plainToInstance(cls, obj, { excludeExtraneousValues: true });
const errors = validateSync(dto);
if (errors.length === 0) {
return dto;
}
const constraints = [];
for (const error of errors) {
if (error.constraints) {
constraints.push(...Object.values(error.constraints));
}
if (!error.children) {
continue;
}
for (const child of error.children) {
if (child.constraints) {
constraints.push(...Object.values(child.constraints));
}
}
}
throw new BadRequestException(constraints);
}

View File

@@ -45,5 +45,9 @@ export const newAssetRepositoryMock = (): Mocked<RepositoryInterface<AssetReposi
upsertMetadata: vitest.fn(),
getMetadataByKey: vitest.fn(),
deleteMetadataByKey: vitest.fn(),
getCompletionMetadata: vitest.fn(),
createWithMetadata: vitest.fn(),
removeAndDecrementQuota: vitest.fn(),
setComplete: vitest.fn(),
};
};

View File

@@ -20,6 +20,7 @@ export const newDatabaseRepositoryMock = (): Mocked<RepositoryInterface<Database
prewarm: vitest.fn(),
runMigrations: vitest.fn(),
withLock: vitest.fn().mockImplementation((_, function_: <R>() => Promise<R>) => function_()),
withUuidLock: vitest.fn().mockImplementation((_, function_: <R>() => Promise<R>) => function_()),
tryLock: vitest.fn(),
isBusy: vitest.fn(),
wait: vitest.fn(),

View File

@@ -51,6 +51,7 @@ export const newStorageRepositoryMock = (): Mocked<RepositoryInterface<StorageRe
readFile: vitest.fn(),
createFile: vitest.fn(),
createWriteStream: vitest.fn(),
createOrAppendWriteStream: vitest.fn(),
createOrOverwriteFile: vitest.fn(),
existsSync: vitest.fn(),
overwriteFile: vitest.fn(),
@@ -58,6 +59,7 @@ export const newStorageRepositoryMock = (): Mocked<RepositoryInterface<StorageRe
unlinkDir: vitest.fn().mockResolvedValue(true),
removeEmptyDirs: vitest.fn(),
checkFileExists: vitest.fn(),
mkdir: vitest.fn(),
mkdirSync: vitest.fn(),
checkDiskUsage: vitest.fn(),
readdir: vitest.fn(),