Compare commits

...

1 Commits

Author SHA1 Message Date
shenlong-tanwen
f0fcc3a0e3 base job implementation 2025-10-22 21:53:51 +05:30
15 changed files with 1493 additions and 0 deletions

View File

@@ -0,0 +1,223 @@
import 'dart:async';
import 'dart:isolate';
import 'dart:ui';
import 'package:flutter/services.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/jobs/job_context.dart';
import 'package:immich_mobile/domain/jobs/job_messages.dart';
import 'package:immich_mobile/domain/jobs/job_status.dart';
import 'package:immich_mobile/domain/jobs/job_types.dart';
import 'package:immich_mobile/domain/services/log.service.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/extensions/completer_extensions.dart';
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
import 'package:immich_mobile/providers/db.provider.dart';
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
import 'package:immich_mobile/utils/bootstrap.dart';
import 'package:immich_mobile/utils/debug_print.dart';
import 'package:isar/isar.dart';
import 'package:logging/logging.dart';
part 'job_entrypoint.dart';
final _jobLogger = Logger('Job');
abstract class Job<Input, Output> {
Isolate? _isolate;
SendPort? _jobPort;
ReceivePort? _receivePort;
final Map<String, StreamController<JobProgress<Output>>> _jobProgressControllers = {};
bool get isRunning => _isolate != null;
// ═══════════════════════════════════════════════════════════
// ABSTRACT METHODS / FIELDS - Concrete implementations override these
// ═══════════════════════════════════════════════════════════
JobType get type;
Duration get idleInterval;
Future<void> onIdle();
Duration get shutdownTimeout => const Duration(seconds: 5);
// ═══════════════════════════════════════════════════════════
// CONCRETE METHODS
// ═══════════════════════════════════════════════════════════
Timer? _idleTimer;
void _resetIdleTimer() {
_idleTimer?.cancel();
_idleTimer = Timer.periodic(idleInterval, (_) async {
await onIdle();
});
}
void _cancelIdleTimer() {
_idleTimer?.cancel();
_idleTimer = null;
}
Future<void> start() async {
if (_isolate != null) return;
final rootToken = RootIsolateToken.instance;
if (rootToken == null) {
throw StateError('RootIsolateToken is not available. Ensure that the job is started from the main isolate.');
}
_receivePort = ReceivePort();
_receivePort!.listen(_handleMessage);
_isolate = await Isolate.spawn(_isolateEntryPoint, (
type: type,
token: rootToken,
mainPort: _receivePort!.sendPort,
), debugName: type.id);
_resetIdleTimer();
}
String trigger(Input input) {
if (_jobPort == null) {
throw StateError('Job not started. Call start() before triggering jobs.');
}
final jobId = _generateJobId();
_jobPort!.send(JobTriggerRequest<Input>(jobId: jobId, input: input));
_jobProgressControllers[jobId] = StreamController<JobProgress<Output>>.broadcast();
_resetIdleTimer();
return jobId;
}
Future<Output?> run(Input input) async {
if (_jobPort == null) {
throw StateError('Job not started. Call start() before running jobs.');
}
final jobId = _generateJobId();
_jobPort!.send(JobTriggerRequest<Input>(jobId: jobId, input: input));
_jobProgressControllers[jobId] = StreamController<JobProgress<Output>>.broadcast();
_resetIdleTimer();
final progress = await waitFor(jobId);
return progress.result;
}
int _jobCounter = 0;
String _generateJobId() => '${type.id}_${DateTime.now().millisecondsSinceEpoch}_${_jobCounter++}';
void cancel({required String jobId}) {
if (_jobPort == null) {
throw StateError('Job not started. Call start() before cancelling jobs.');
}
_jobPort!.send(JobCancelRequest(jobId: jobId));
}
void cancelAll() {
if (_jobPort == null) {
throw StateError('Job not started. Call start() before cancelling jobs.');
}
_cancelIdleTimer();
_jobPort!.send(const IsolateCancelAllRequest());
}
Stream<JobProgress> watch(String jobId) {
if (!_jobProgressControllers.containsKey(jobId)) {
throw StateError('No job found with ID: $jobId. Trigger the job before watching its progress.');
}
return _jobProgressControllers[jobId]!.stream;
}
Future<JobProgress> waitFor(String jobId) {
return watch(jobId).firstWhere((p) => p.isComplete || p.isError || p.isCancelled);
}
Future<void> stop() async {
if (_isolate == null) return;
_cancelIdleTimer();
if (_jobPort == null) {
_jobLogger.warning('Job isolate is running but job port is null for type: $this.id.');
}
final jobPort = _jobPort;
final isolate = _isolate;
_jobPort = null;
_isolate = null;
jobPort?.send(const IsolateShutdownRequest());
// Wait for all jobs to complete before killing the isolate
if (_jobProgressControllers.isNotEmpty) {
await Future.wait(_jobProgressControllers.keys.map((id) => waitFor(id))).timeout(
shutdownTimeout,
onTimeout: () {
_jobLogger.warning(
'Timeout waiting for jobs to complete during shutdown of job type: $this.id. Force killing isolate.',
);
return <JobProgress>[];
},
);
}
isolate?.kill(priority: Isolate.immediate);
_receivePort?.close();
_receivePort = null;
for (final controller in _jobProgressControllers.values) {
await controller.close();
}
_jobProgressControllers.clear();
}
void _handleMessage(dynamic message) {
switch (message) {
case IsolateReadyResponse msg:
_jobPort = msg.requestPort;
case JobProgressResponse msg:
final progress = JobProgress<Output>.running(progress: msg.progress, current: msg.current, total: msg.total);
_addJobProgress(msg.jobId, progress, close: false);
case JobCompleteResponse msg:
final progress = JobProgress<Output>.completed(result: msg.result);
_addJobProgress(msg.jobId, progress);
case JobErrorResponse msg:
final progress = JobProgress<Output>.error(msg.error);
_addJobProgress(msg.jobId, progress);
case JobCancelledResponse msg:
final progress = JobProgress<Output>.cancelled();
_addJobProgress(msg.jobId, progress);
case JobSkippedResponse msg:
final progress = JobProgress<Output>.skipped();
_addJobProgress(msg.jobId, progress);
case IsolateErrorResponse msg:
_jobLogger.severe('Isolate error: ${msg.error}\nStackTrace: ${msg.stackTrace}');
}
}
void _addJobProgress(String jobId, JobProgress<Output> progress, {bool close = true}) {
final controller = _jobProgressControllers[jobId];
if (controller == null) {
_jobLogger.warning('Received progress update for unknown job: $jobId');
return;
}
controller.add(progress);
if (close) {
controller.close();
_jobProgressControllers.remove(jobId);
}
}
}

View File

@@ -0,0 +1,38 @@
import 'dart:isolate';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/domain/jobs/job_messages.dart';
class JobContext {
final String id;
final ProviderContainer ref;
final SendPort _mainPort;
const JobContext({required this.id, required this.ref, required SendPort mainPort}) : _mainPort = mainPort;
ProviderResult read<ProviderResult>(ProviderListenable<ProviderResult> provider) => provider.read(ref);
void reportJobProgress({required String jobId, double? progress, int? current, int? total}) {
_mainPort.send(JobProgressResponse(jobId: jobId, progress: progress, current: current, total: total));
}
void reportJobComplete<Result>({required String jobId, Result? result}) {
_mainPort.send(JobCompleteResponse(jobId: jobId, result: result));
}
void reportJobError({required String jobId, required String error, String? stackTrace}) {
_mainPort.send(JobErrorResponse(jobId: jobId, error: error, stackTrace: stackTrace));
}
void reportIsolateError({required String error, String? stackTrace}) {
_mainPort.send(IsolateErrorResponse(error: error, stackTrace: stackTrace));
}
void reportJobCancelled({required String jobId}) {
_mainPort.send(JobCancelledResponse(jobId: jobId));
}
void reportJobSkipped({required String jobId}) {
_mainPort.send(JobSkippedResponse(jobId: jobId));
}
}

View File

@@ -0,0 +1,76 @@
part of 'job.dart';
typedef _IsolateStartMessage = ({RootIsolateToken token, JobType type, SendPort mainPort});
Future<void> _isolateEntryPoint(_IsolateStartMessage message) async {
final _IsolateStartMessage(:token, :mainPort, :type) = message;
final logger = Logger(type.id);
final isolateCompleter = Completer<void>();
try {
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
DartPluginRegistrant.ensureInitialized();
final (isar, drift, logDb) = await Bootstrap.initDB();
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false, listenStoreUpdates: false);
final container = ProviderContainer(
overrides: [
dbProvider.overrideWithValue(isar),
isarProvider.overrideWithValue(isar),
driftProvider.overrideWith(driftOverride(drift)),
],
);
final context = JobContext(id: type.id, ref: container, mainPort: mainPort);
final receivePort = ReceivePort();
mainPort.send(IsolateReadyResponse(receivePort.sendPort));
final handler = getJobHandler(type, context);
receivePort.listen((message) async {
try {
switch (message) {
case JobTriggerRequest msg:
handler.queue(msg.jobId, msg.input);
case JobCancelRequest msg:
await handler.cancel(msg.jobId);
case IsolateCancelAllRequest _:
await handler.cancelAll();
case IsolateShutdownRequest _:
receivePort.close();
await handler.shutdown();
await _cleanup(container, isar, drift, logDb);
isolateCompleter.completeOnce();
}
} catch (e, stack) {
logger.severe('Error handling command', e, stack);
context.reportIsolateError(error: 'Command error: $e', stackTrace: stack.toString());
}
});
} catch (e, stack) {
logger.severe('Job fatal error', e, stack);
mainPort.send(IsolateErrorResponse(error: 'Job crashed: $e', stackTrace: stack.toString()));
isolateCompleter.completeOnce();
}
await isolateCompleter.future;
}
Future<void> _cleanup(ProviderContainer container, Isar isar, Drift drift, DriftLogger logDb) async {
try {
container.dispose();
await Store.dispose();
await LogService.I.dispose();
await logDb.close();
await drift.close();
if (isar.isOpen) {
await isar.close();
}
} catch (error, stack) {
dPrint(() => 'Job Cleanup error: $error, stack: $stack');
}
}

View File

@@ -0,0 +1,161 @@
import 'dart:async';
import 'dart:collection';
import 'package:cancellation_token_http/http.dart';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/domain/jobs/job_context.dart';
import 'package:immich_mobile/extensions/completer_extensions.dart';
import 'package:logging/logging.dart';
abstract class JobHandler<Input> {
late final Logger logger;
final JobContext context;
final Queue<String> _jobQueue = Queue();
final Map<String, _JobState<Input>> _jobs = {};
int get runningCount => _jobs.values.where((s) => s.isRunning).length;
bool _isProcessing = false;
JobHandler({required this.context}) {
logger = Logger(context.id);
}
// ═══════════════════════════════════════════════════════════
// ABSTRACT METHODS / FIELDS - Concrete implementations override these
// ═══════════════════════════════════════════════════════════
Future<void> processJob(String jobId, Input input, CancellationToken token);
int get maxConcurrentJobs => 1;
Duration get cancellationTimeout => const Duration(seconds: 1);
bool shouldSkipInput(Input newInput, Input existingInput) => false;
// ═══════════════════════════════════════════════════════════
// CONCRETE METHODS
// ═══════════════════════════════════════════════════════════
@mustCallSuper
void queue(String jobId, Input input) async {
if (_shouldSkipInput(input)) {
context.reportJobSkipped(jobId: jobId);
return;
}
final token = CancellationToken();
_jobs[jobId] = _JobState(jobId: jobId, input: input, token: token, completer: Completer<void>(), isRunning: false);
_jobQueue.add(jobId);
_processQueue();
}
bool _shouldSkipInput(Input input) {
for (final job in _jobs.values) {
if (shouldSkipInput(input, job.input)) {
return true;
}
}
return false;
}
Future<void> _processQueue() async {
if (_isProcessing) {
return;
}
_isProcessing = true;
while (_jobQueue.isNotEmpty && runningCount < maxConcurrentJobs) {
final jobId = _jobQueue.removeFirst();
final jobState = _jobs[jobId];
if (jobState == null) {
logger.warning('Job state not found for jobId: $jobId');
continue;
}
if (jobState.token.isCancelled) {
context.reportJobCancelled(jobId: jobId);
jobState.completer.completeOnce();
_jobs.remove(jobId);
continue;
}
jobState.isRunning = true;
_runJob(jobState).then((_) {
_jobs[jobId]?.completer.completeOnce();
_jobs.remove(jobId);
_processQueue();
});
}
_isProcessing = false;
}
Future<void> _runJob(_JobState<Input> job) async {
try {
await processJob(job.jobId, job.input, job.token);
if (!job.token.isCancelled) {
context.reportJobComplete(jobId: job.jobId);
}
} on CancelledException {
context.reportJobCancelled(jobId: job.jobId);
} catch (e, stack) {
context.reportJobError(jobId: job.jobId, error: '$e', stackTrace: stack.toString());
}
}
@mustCallSuper
Future<void> cancel(String jobId) async {
final jobState = _jobs[jobId];
if (jobState == null) return;
jobState.token.cancel();
if (!jobState.isRunning) {
_jobQueue.removeWhere((id) => id == jobId);
}
await jobState.completer.future.timeout(cancellationTimeout, onTimeout: () {});
}
@mustCallSuper
Future<void> cancelAll() async {
for (final state in _jobs.values) {
state.token.cancel();
}
_jobQueue.clear();
if (_jobs.isNotEmpty) {
await Future.wait(_jobs.values.map((s) => s.completer.future)).timeout(
cancellationTimeout * 2,
onTimeout: () {
return [];
},
);
}
}
@mustCallSuper
Future<void> shutdown() async {
await cancelAll();
}
}
class _JobState<Input> {
final String jobId;
final Input input;
final CancellationToken token;
final Completer<void> completer;
bool isRunning;
_JobState({
required this.jobId,
required this.input,
required this.token,
required this.completer,
this.isRunning = false,
});
}

View File

@@ -0,0 +1,66 @@
import 'package:immich_mobile/domain/jobs/job.dart';
import 'package:immich_mobile/domain/jobs/job_types.dart';
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.job.dart';
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.job.dart';
class _JobRegistry {
final Map<String, Job> _jobs = {};
void register<I, O>(Job<I, O> job) {
_jobs[job.type.id] = job;
}
Job<I, O>? get<I, O>(String id) {
return _jobs[id] as Job<I, O>?;
}
List<Job> get all => _jobs.values.toList();
Future<void> startAll() async {
await Future.wait(_jobs.values.map((j) => j.start()));
}
Future<void> stopAll() async {
await Future.wait(_jobs.values.map((j) => j.stop()));
}
void cancelAll() {
for (final job in _jobs.values) {
job.cancelAll();
}
}
}
class JobManager {
final _JobRegistry _registry = _JobRegistry();
bool _isInitialized = false;
static final JobManager I = JobManager._();
JobManager._();
LocalSyncJob? get localSyncJob => _registry.get<LocalSyncInput, void>(JobType.localSync.id) as LocalSyncJob?;
RemoteSyncJob? get remoteSyncJob => _registry.get<void, void>(JobType.remoteSync.id) as RemoteSyncJob?;
void init() {
if (_isInitialized) {
return;
}
_registry.register(LocalSyncJob());
_registry.register(RemoteSyncJob());
_isInitialized = true;
}
Future<void> start() => _registry.startAll();
Future<void> stop() => _registry.stopAll();
void triggerLocalSync({bool fullSync = false}) {
localSyncJob?.trigger(LocalSyncInput(fullSync: fullSync));
}
void triggerRemoteSync({bool syncReset = false}) {
remoteSyncJob?.trigger(RemoteSyncInput(syncReset: syncReset));
}
}

View File

@@ -0,0 +1,79 @@
import 'dart:isolate';
sealed class IsolateRequest {
const IsolateRequest();
}
class JobTriggerRequest<Input> extends IsolateRequest {
final String jobId;
final Input? input;
const JobTriggerRequest({required this.jobId, this.input});
}
class JobCancelRequest extends IsolateRequest {
final String jobId;
const JobCancelRequest({required this.jobId});
}
class IsolateCancelAllRequest extends IsolateRequest {
const IsolateCancelAllRequest();
}
class IsolateShutdownRequest extends IsolateRequest {
const IsolateShutdownRequest();
}
sealed class IsolateResponse {
const IsolateResponse();
}
class IsolateReadyResponse extends IsolateResponse {
final SendPort requestPort;
const IsolateReadyResponse(this.requestPort);
}
class JobProgressResponse extends IsolateResponse {
final String jobId;
final double? progress;
final int? current;
final int? total;
const JobProgressResponse({required this.jobId, this.progress, this.current, this.total});
}
class JobCompleteResponse<Output> extends IsolateResponse {
final String jobId;
final Output? result;
const JobCompleteResponse({required this.jobId, this.result});
}
class JobErrorResponse extends IsolateResponse {
final String jobId;
final String error;
final String? stackTrace;
const JobErrorResponse({required this.jobId, required this.error, this.stackTrace});
}
class JobCancelledResponse extends IsolateResponse {
final String jobId;
const JobCancelledResponse({required this.jobId});
}
class JobSkippedResponse extends IsolateResponse {
final String jobId;
const JobSkippedResponse({required this.jobId});
}
class IsolateErrorResponse extends IsolateResponse {
final String error;
final String? stackTrace;
const IsolateErrorResponse({required this.error, this.stackTrace});
}

View File

@@ -0,0 +1,62 @@
enum JobStatus { running, completed, error, cancelled, skipped }
class JobProgress<T> {
final JobStatus status;
// JobStatus.completed
final T? result;
// JobStatus.running
final double? progress;
final int? current;
final int? total;
// JobStatus.error
final String? error;
const JobProgress({required this.status, this.result, this.progress, this.current, this.total, this.error});
bool get isRunning => status == JobStatus.running;
bool get isComplete => status == JobStatus.completed;
bool get isError => status == JobStatus.error;
bool get isCancelled => status == JobStatus.cancelled;
factory JobProgress.running({double? progress, int? current, int? total}) {
assert(progress == null || (progress >= 0 && progress <= 1), 'Progress must be 0-1');
assert(current == null || total == null || current <= total, 'Current must be <= total');
return JobProgress(status: JobStatus.running, progress: progress, current: current, total: total);
}
factory JobProgress.completed({T? result}) => JobProgress(status: JobStatus.completed, result: result);
factory JobProgress.error(String error) => JobProgress(status: JobStatus.error, error: error);
factory JobProgress.cancelled() => const JobProgress(status: JobStatus.cancelled);
factory JobProgress.skipped() => const JobProgress(status: JobStatus.skipped);
@override
String toString() => switch (status) {
JobStatus.running => 'JobProgress(running: progress=$progress, current=$current, total=$total)',
JobStatus.completed => 'JobProgress(completed: result=$result)',
JobStatus.error => 'JobProgress(error: error=$error)',
JobStatus.cancelled => 'JobProgress(cancelled)',
JobStatus.skipped => 'JobProgress(skipped)',
};
@override
bool operator ==(Object other) =>
identical(this, other) ||
other is JobProgress<T> &&
runtimeType == other.runtimeType &&
status == other.status &&
result == other.result &&
progress == other.progress &&
current == other.current &&
total == other.total &&
error == other.error;
@override
int get hashCode =>
status.hashCode ^ result.hashCode ^ progress.hashCode ^ current.hashCode ^ total.hashCode ^ error.hashCode;
}

View File

@@ -0,0 +1,17 @@
import 'package:immich_mobile/domain/jobs/job_context.dart';
import 'package:immich_mobile/domain/jobs/job_handler.dart';
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.job.dart';
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.job.dart';
enum JobType {
localSync._('local_sync'),
remoteSync._('remote_sync');
final String id;
const JobType._(this.id);
}
JobHandler getJobHandler(JobType type, JobContext context) => switch (type) {
JobType.localSync => LocalSyncJobHandler(context: context),
JobType.remoteSync => RemoteSyncJobHandler(context: context),
};

View File

@@ -0,0 +1,52 @@
import 'package:cancellation_token_http/http.dart';
import 'package:immich_mobile/domain/jobs/job.dart';
import 'package:immich_mobile/domain/jobs/job_handler.dart';
import 'package:immich_mobile/domain/jobs/job_types.dart';
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.service.dart';
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
class LocalSyncInput {
final bool fullSync;
const LocalSyncInput({this.fullSync = false});
}
class LocalSyncJob extends Job<LocalSyncInput, void> {
@override
JobType get type => JobType.localSync;
@override
Duration get idleInterval => const Duration(minutes: 3);
@override
Future<void> onIdle() async {
// Trigger a partial sync on idle
trigger(const LocalSyncInput(fullSync: false));
}
}
class LocalSyncJobHandler extends JobHandler<LocalSyncInput> {
late final LocalSyncService _localSyncService;
LocalSyncJobHandler({required super.context}) {
LocalSyncService(
localAlbumRepository: context.read(localAlbumRepository),
nativeSyncApi: context.read(nativeSyncApiProvider),
);
}
@override
bool shouldSkipInput(LocalSyncInput newInput, LocalSyncInput existingInput) {
// Skip if a full sync is already scheduled/running
if (existingInput.fullSync) {
return true;
}
return false;
}
@override
Future<void> processJob(String jobId, LocalSyncInput input, CancellationToken token) =>
_localSyncService.sync(full: input.fullSync, token: token);
}

View File

@@ -0,0 +1,308 @@
import 'dart:async';
import 'package:cancellation_token_http/http.dart';
import 'package:collection/collection.dart';
import 'package:flutter/foundation.dart';
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
import 'package:immich_mobile/extensions/platform_extensions.dart';
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
import 'package:immich_mobile/platform/native_sync_api.g.dart';
import 'package:immich_mobile/utils/datetime_helpers.dart';
import 'package:immich_mobile/utils/diff.dart';
import 'package:logging/logging.dart';
class LocalSyncService {
final DriftLocalAlbumRepository _localAlbumRepository;
final NativeSyncApi _nativeSyncApi;
final Logger _log = Logger("DeviceSyncService");
LocalSyncService({required DriftLocalAlbumRepository localAlbumRepository, required NativeSyncApi nativeSyncApi})
: _localAlbumRepository = localAlbumRepository,
_nativeSyncApi = nativeSyncApi;
Future<void> sync({bool full = false, CancellationToken? token}) async {
final Stopwatch stopwatch = Stopwatch()..start();
if (full || await _nativeSyncApi.shouldFullSync()) {
_log.fine("Full sync request from ${full ? "user" : "native"}");
token?.throwIfCancelled();
return await fullSync(token: token);
}
token?.throwIfCancelled();
final delta = await _nativeSyncApi.getMediaChanges();
if (!delta.hasChanges) {
_log.fine("No media changes detected. Skipping sync");
return;
}
_log.fine("Delta updated: ${delta.updates.length}");
_log.fine("Delta deleted: ${delta.deletes.length}");
final deviceAlbums = await _nativeSyncApi.getAlbums();
await _localAlbumRepository.updateAll(deviceAlbums.toLocalAlbums());
token?.throwIfCancelled();
await _localAlbumRepository.processDelta(
updates: delta.updates.toLocalAssets(),
deletes: delta.deletes,
assetAlbums: delta.assetAlbums,
);
final dbAlbums = await _localAlbumRepository.getAll();
// On Android, we need to sync all albums since it is not possible to
// detect album deletions from the native side
if (CurrentPlatform.isAndroid) {
for (final album in dbAlbums) {
token?.throwIfCancelled();
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
}
}
if (CurrentPlatform.isIOS) {
// On iOS, we need to full sync albums that are marked as cloud as the delta sync
// does not include changes for cloud albums. If ignoreIcloudAssets is enabled,
// remove the albums from the local database from the previous sync
final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums();
for (final album in cloudAlbums) {
token?.throwIfCancelled();
final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id);
if (dbAlbum == null) {
_log.warning("Cloud album ${album.name} not found in local database. Skipping sync.");
continue;
}
await updateAlbum(dbAlbum, album, token: token);
}
}
await _nativeSyncApi.checkpointSync();
stopwatch.stop();
_log.info("Device sync took - ${stopwatch.elapsedMilliseconds}ms");
}
Future<void> fullSync({CancellationToken? token}) async {
final Stopwatch stopwatch = Stopwatch()..start();
final deviceAlbums = await _nativeSyncApi.getAlbums();
final dbAlbums = await _localAlbumRepository.getAll(sortBy: {SortLocalAlbumsBy.id});
await diffSortedLists(
dbAlbums,
deviceAlbums.toLocalAlbums(),
compare: (a, b) => a.id.compareTo(b.id),
both: (LocalAlbum oldAlbum, LocalAlbum newAlbum) {
token?.throwIfCancelled();
return updateAlbum(oldAlbum, newAlbum, token: token);
},
onlyFirst: (album) {
token?.throwIfCancelled();
return removeAlbum(album);
},
onlySecond: (album) {
token?.throwIfCancelled();
return addAlbum(album, token: token);
},
);
await _nativeSyncApi.checkpointSync();
stopwatch.stop();
_log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms");
}
Future<void> addAlbum(LocalAlbum album, {CancellationToken? token}) async {
_log.fine("Adding device album ${album.name}");
final assets = album.assetCount > 0 ? await _nativeSyncApi.getAssetsForAlbum(album.id) : <PlatformAsset>[];
token?.throwIfCancelled();
await _localAlbumRepository.upsert(album, toUpsert: assets.toLocalAssets());
_log.fine("Successfully added device album ${album.name}");
}
Future<void> removeAlbum(LocalAlbum a) async {
_log.fine("Removing device album ${a.name}");
// Asset deletion is handled in the repository
await _localAlbumRepository.delete(a.id);
}
// The deviceAlbum is ignored since we are going to refresh it anyways
FutureOr<bool> updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
_log.fine("Syncing device album ${dbAlbum.name}");
if (_albumsEqual(deviceAlbum, dbAlbum)) {
_log.fine("Device album ${dbAlbum.name} has not changed. Skipping sync.");
return false;
}
_log.fine("Device album ${dbAlbum.name} has changed. Syncing...");
// Faster path - only new assets added
token?.throwIfCancelled();
if (await checkAddition(dbAlbum, deviceAlbum, token: token)) {
_log.fine("Fast synced device album ${dbAlbum.name}");
return true;
}
// Slower path - full sync
token?.throwIfCancelled();
return await fullDiff(dbAlbum, deviceAlbum, token: token);
}
@visibleForTesting
// The [deviceAlbum] is expected to be refreshed before calling this method
// with modified time and asset count
Future<bool> checkAddition(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
_log.fine("Fast syncing device album ${dbAlbum.name}");
// Assets has been modified
if (deviceAlbum.assetCount <= dbAlbum.assetCount) {
_log.fine("Local album has modifications. Proceeding to full sync");
return false;
}
final updatedTime = (dbAlbum.updatedAt.millisecondsSinceEpoch ~/ 1000) + 1;
token?.throwIfCancelled();
final newAssetsCount = await _nativeSyncApi.getAssetsCountSince(deviceAlbum.id, updatedTime);
// Early return if no new assets were found
if (newAssetsCount == 0) {
_log.fine("No new assets found despite album having changes. Proceeding to full sync for ${dbAlbum.name}");
return false;
}
// Check whether there is only addition or if there has been deletions
if (deviceAlbum.assetCount != dbAlbum.assetCount + newAssetsCount) {
_log.fine("Local album has modifications. Proceeding to full sync");
return false;
}
token?.throwIfCancelled();
final newAssets = await _nativeSyncApi.getAssetsForAlbum(deviceAlbum.id, updatedTimeCond: updatedTime);
token?.throwIfCancelled();
await _localAlbumRepository.upsert(
deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection),
toUpsert: newAssets.toLocalAssets(),
);
return true;
}
@visibleForTesting
// The [deviceAlbum] is expected to be refreshed before calling this method
// with modified time and asset count
Future<bool> fullDiff(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
token?.throwIfCancelled();
final assetsInDevice = deviceAlbum.assetCount > 0
? await _nativeSyncApi.getAssetsForAlbum(deviceAlbum.id).then((a) => a.toLocalAssets())
: <LocalAsset>[];
final assetsInDb = dbAlbum.assetCount > 0 ? await _localAlbumRepository.getAssets(dbAlbum.id) : <LocalAsset>[];
if (deviceAlbum.assetCount == 0) {
_log.fine("Device album ${deviceAlbum.name} is empty. Removing assets from DB.");
token?.throwIfCancelled();
await _localAlbumRepository.upsert(
deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection),
toDelete: assetsInDb.map((a) => a.id),
);
return true;
}
final updatedDeviceAlbum = deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection);
if (dbAlbum.assetCount == 0) {
_log.fine("Device album ${deviceAlbum.name} is empty. Adding assets to DB.");
token?.throwIfCancelled();
await _localAlbumRepository.upsert(updatedDeviceAlbum, toUpsert: assetsInDevice);
return true;
}
assert(assetsInDb.isSortedBy((a) => a.id));
assetsInDevice.sort((a, b) => a.id.compareTo(b.id));
final assetsToUpsert = <LocalAsset>[];
final assetsToDelete = <String>[];
token?.throwIfCancelled();
diffSortedListsSync(
assetsInDb,
assetsInDevice,
compare: (a, b) => a.id.compareTo(b.id),
both: (dbAsset, deviceAsset) {
token?.throwIfCancelled();
// Custom comparison to check if the asset has been modified without
// comparing the checksum
if (!_assetsEqual(dbAsset, deviceAsset)) {
assetsToUpsert.add(deviceAsset);
return true;
}
return false;
},
onlyFirst: (dbAsset) => assetsToDelete.add(dbAsset.id),
onlySecond: (deviceAsset) => assetsToUpsert.add(deviceAsset),
);
_log.fine(
"Syncing ${deviceAlbum.name}. ${assetsToUpsert.length} assets to add/update and ${assetsToDelete.length} assets to delete",
);
token?.throwIfCancelled();
if (assetsToUpsert.isEmpty && assetsToDelete.isEmpty) {
_log.fine("No asset changes detected in album ${deviceAlbum.name}. Updating metadata.");
_localAlbumRepository.upsert(updatedDeviceAlbum);
return true;
}
token?.throwIfCancelled();
await _localAlbumRepository.upsert(updatedDeviceAlbum, toUpsert: assetsToUpsert, toDelete: assetsToDelete);
return true;
}
bool _assetsEqual(LocalAsset a, LocalAsset b) {
return a.updatedAt.isAtSameMomentAs(b.updatedAt) &&
a.createdAt.isAtSameMomentAs(b.createdAt) &&
a.width == b.width &&
a.height == b.height &&
a.durationInSeconds == b.durationInSeconds;
}
bool _albumsEqual(LocalAlbum a, LocalAlbum b) {
return a.name == b.name && a.assetCount == b.assetCount && a.updatedAt.isAtSameMomentAs(b.updatedAt);
}
}
extension on Iterable<PlatformAlbum> {
List<LocalAlbum> toLocalAlbums() {
return map(
(e) => LocalAlbum(
id: e.id,
name: e.name,
updatedAt: tryFromSecondsSinceEpoch(e.updatedAt, isUtc: true) ?? DateTime.timestamp(),
assetCount: e.assetCount,
),
).toList();
}
}
extension on Iterable<PlatformAsset> {
List<LocalAsset> toLocalAssets() {
return map(
(e) => LocalAsset(
id: e.id,
name: e.name,
checksum: null,
type: AssetType.values.elementAtOrNull(e.type) ?? AssetType.other,
createdAt: tryFromSecondsSinceEpoch(e.createdAt, isUtc: true) ?? DateTime.timestamp(),
updatedAt: tryFromSecondsSinceEpoch(e.updatedAt, isUtc: true) ?? DateTime.timestamp(),
width: e.width,
height: e.height,
durationInSeconds: e.durationInSeconds,
orientation: e.orientation,
isFavorite: e.isFavorite,
),
).toList();
}
}

View File

@@ -0,0 +1,46 @@
import 'package:cancellation_token_http/http.dart';
import 'package:immich_mobile/domain/jobs/job.dart';
import 'package:immich_mobile/domain/jobs/job_handler.dart';
import 'package:immich_mobile/domain/jobs/job_types.dart';
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.service.dart';
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync_api.repository.dart';
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
class RemoteSyncInput {
final bool syncReset;
const RemoteSyncInput({this.syncReset = false});
}
class RemoteSyncJob extends Job<RemoteSyncInput, RemoteSyncStatus> {
@override
JobType get type => JobType.remoteSync;
@override
Duration get idleInterval => const Duration(seconds: 30);
@override
Future<void> onIdle() async {
trigger(const RemoteSyncInput(syncReset: false));
}
}
class RemoteSyncJobHandler extends JobHandler<RemoteSyncInput> {
late final RemoteSyncService _remoteSyncService;
RemoteSyncJobHandler({required super.context}) {
_remoteSyncService = RemoteSyncService(
syncApiRepository: context.read(remoteSyncApiRepositoryProvider),
syncStreamRepository: context.read(syncStreamRepositoryProvider),
);
}
@override
Future<RemoteSyncStatus> processJob(String jobId, RemoteSyncInput input, CancellationToken token) async {
final status = await _remoteSyncService.sync(syncReset: input.syncReset, token: token);
if (status != RemoteSyncStatus.reset) {
return status;
}
return _remoteSyncService.sync(syncReset: false, token: token);
}
}

View File

@@ -0,0 +1,161 @@
import 'dart:async';
import 'package:cancellation_token_http/http.dart';
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync_api.repository.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
enum RemoteSyncStatus { success, failed, reset }
class RemoteSyncService {
final Logger _logger = Logger('RemoteSyncService');
final RemoteSyncApiRepository _syncApiRepository;
final SyncStreamRepository _syncStreamRepository;
RemoteSyncService({
required RemoteSyncApiRepository syncApiRepository,
required SyncStreamRepository syncStreamRepository,
}) : _syncApiRepository = syncApiRepository,
_syncStreamRepository = syncStreamRepository;
Future<RemoteSyncStatus> sync({syncReset = false, CancellationToken? token}) async {
_logger.info("Remote sync request for user");
return await _syncApiRepository.streamChanges(_handleEvents, token: token, syncReset: syncReset);
}
Future<RemoteSyncStatus> _handleEvents(List<SyncEvent> events, {CancellationToken? token}) async {
List<SyncEvent> items = [];
for (final event in events) {
token?.throwIfCancelled();
if (event.type != items.firstOrNull?.type) {
await _processBatch(items);
}
if (event.type == SyncEntityType.syncResetV1) {
return RemoteSyncStatus.reset;
}
items.add(event);
}
await _processBatch(items);
return RemoteSyncStatus.success;
}
Future<void> _processBatch(List<SyncEvent> batch) async {
if (batch.isEmpty) {
return;
}
final type = batch.first.type;
await _handleSyncData(type, batch.map((e) => e.data));
await _syncApiRepository.ack([batch.last.ack]);
batch.clear();
}
Future<void> _handleSyncData(SyncEntityType type, Iterable<Object> data) async {
_logger.fine("Processing sync data for $type of length ${data.length}");
switch (type) {
case SyncEntityType.authUserV1:
return _syncStreamRepository.updateAuthUsersV1(data.cast());
case SyncEntityType.userV1:
return _syncStreamRepository.updateUsersV1(data.cast());
case SyncEntityType.userDeleteV1:
return _syncStreamRepository.deleteUsersV1(data.cast());
case SyncEntityType.partnerV1:
return _syncStreamRepository.updatePartnerV1(data.cast());
case SyncEntityType.partnerDeleteV1:
return _syncStreamRepository.deletePartnerV1(data.cast());
case SyncEntityType.assetV1:
return _syncStreamRepository.updateAssetsV1(data.cast());
case SyncEntityType.assetDeleteV1:
return _syncStreamRepository.deleteAssetsV1(data.cast());
case SyncEntityType.assetExifV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast());
case SyncEntityType.partnerAssetV1:
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'partner');
case SyncEntityType.partnerAssetBackfillV1:
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'partner backfill');
case SyncEntityType.partnerAssetDeleteV1:
return _syncStreamRepository.deleteAssetsV1(data.cast(), debugLabel: "partner");
case SyncEntityType.partnerAssetExifV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'partner');
case SyncEntityType.partnerAssetExifBackfillV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'partner backfill');
case SyncEntityType.albumV1:
return _syncStreamRepository.updateAlbumsV1(data.cast());
case SyncEntityType.albumDeleteV1:
return _syncStreamRepository.deleteAlbumsV1(data.cast());
case SyncEntityType.albumUserV1:
return _syncStreamRepository.updateAlbumUsersV1(data.cast());
case SyncEntityType.albumUserBackfillV1:
return _syncStreamRepository.updateAlbumUsersV1(data.cast(), debugLabel: 'backfill');
case SyncEntityType.albumUserDeleteV1:
return _syncStreamRepository.deleteAlbumUsersV1(data.cast());
case SyncEntityType.albumAssetCreateV1:
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset create');
case SyncEntityType.albumAssetUpdateV1:
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset update');
case SyncEntityType.albumAssetBackfillV1:
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset backfill');
case SyncEntityType.albumAssetExifCreateV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif create');
case SyncEntityType.albumAssetExifUpdateV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif update');
case SyncEntityType.albumAssetExifBackfillV1:
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif backfill');
case SyncEntityType.albumToAssetV1:
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast());
case SyncEntityType.albumToAssetBackfillV1:
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast(), debugLabel: 'backfill');
case SyncEntityType.albumToAssetDeleteV1:
return _syncStreamRepository.deleteAlbumToAssetsV1(data.cast());
// No-op. SyncAckV1 entities are checkpoints in the sync stream
// to acknowledge that the client has processed all the backfill events
case SyncEntityType.syncAckV1:
return;
// No-op. SyncCompleteV1 is used to signal the completion of the sync process
case SyncEntityType.syncCompleteV1:
return;
// Request to reset the client state. Clear everything related to remote entities
case SyncEntityType.syncResetV1:
return _syncStreamRepository.reset();
case SyncEntityType.memoryV1:
return _syncStreamRepository.updateMemoriesV1(data.cast());
case SyncEntityType.memoryDeleteV1:
return _syncStreamRepository.deleteMemoriesV1(data.cast());
case SyncEntityType.memoryToAssetV1:
return _syncStreamRepository.updateMemoryAssetsV1(data.cast());
case SyncEntityType.memoryToAssetDeleteV1:
return _syncStreamRepository.deleteMemoryAssetsV1(data.cast());
case SyncEntityType.stackV1:
return _syncStreamRepository.updateStacksV1(data.cast());
case SyncEntityType.stackDeleteV1:
return _syncStreamRepository.deleteStacksV1(data.cast());
case SyncEntityType.partnerStackV1:
return _syncStreamRepository.updateStacksV1(data.cast(), debugLabel: 'partner');
case SyncEntityType.partnerStackBackfillV1:
return _syncStreamRepository.updateStacksV1(data.cast(), debugLabel: 'partner backfill');
case SyncEntityType.partnerStackDeleteV1:
return _syncStreamRepository.deleteStacksV1(data.cast(), debugLabel: 'partner');
case SyncEntityType.userMetadataV1:
return _syncStreamRepository.updateUserMetadatasV1(data.cast());
case SyncEntityType.userMetadataDeleteV1:
return _syncStreamRepository.deleteUserMetadatasV1(data.cast());
case SyncEntityType.personV1:
return _syncStreamRepository.updatePeopleV1(data.cast());
case SyncEntityType.personDeleteV1:
return _syncStreamRepository.deletePeopleV1(data.cast());
case SyncEntityType.assetFaceV1:
return _syncStreamRepository.updateAssetFacesV1(data.cast());
case SyncEntityType.assetFaceDeleteV1:
return _syncStreamRepository.deleteAssetFacesV1(data.cast());
default:
_logger.warning("Unknown sync data type: $type");
}
}
}

View File

@@ -0,0 +1,192 @@
import 'dart:async';
import 'dart:convert';
import 'package:cancellation_token_http/http.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:http/http.dart' as http;
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.service.dart';
import 'package:immich_mobile/domain/models/store.model.dart';
import 'package:immich_mobile/domain/models/sync_event.model.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/providers/api.provider.dart';
import 'package:immich_mobile/services/api.service.dart';
import 'package:logging/logging.dart';
import 'package:openapi/api.dart';
final remoteSyncApiRepositoryProvider = Provider<RemoteSyncApiRepository>((ref) {
return RemoteSyncApiRepository(ref.watch(apiServiceProvider));
});
class RemoteSyncApiRepository {
final Logger _logger = Logger('RemoteSyncApiRepository');
final ApiService _api;
RemoteSyncApiRepository(this._api);
Future<void> ack(List<String> data) {
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
}
Future<RemoteSyncStatus> streamChanges(
Future<RemoteSyncStatus> Function(List<SyncEvent>, {CancellationToken? token}) onData, {
bool syncReset = false,
CancellationToken? token,
int batchSize = kSyncEventBatchSize,
http.Client? httpClient,
}) async {
final stopwatch = Stopwatch()..start();
final client = httpClient ?? http.Client();
final endpoint = "${_api.apiClient.basePath}/sync/stream";
final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'};
final headerParams = <String, String>{};
await _api.applyToParams([], headerParams);
headers.addAll(headerParams);
final request = http.Request('POST', Uri.parse(endpoint));
request.headers.addAll(headers);
request.body = jsonEncode(
SyncStreamDto(
types: [
SyncRequestType.authUsersV1,
SyncRequestType.usersV1,
SyncRequestType.assetsV1,
SyncRequestType.assetExifsV1,
SyncRequestType.partnersV1,
SyncRequestType.partnerAssetsV1,
SyncRequestType.partnerAssetExifsV1,
SyncRequestType.albumsV1,
SyncRequestType.albumUsersV1,
SyncRequestType.albumAssetsV1,
SyncRequestType.albumAssetExifsV1,
SyncRequestType.albumToAssetsV1,
SyncRequestType.memoriesV1,
SyncRequestType.memoryToAssetsV1,
SyncRequestType.stacksV1,
SyncRequestType.partnerStacksV1,
SyncRequestType.userMetadataV1,
SyncRequestType.peopleV1,
SyncRequestType.assetFacesV1,
],
reset: syncReset,
).toJson(),
);
String previousChunk = '';
List<String> lines = [];
RemoteSyncStatus status = RemoteSyncStatus.failed;
try {
final response = await client.send(request);
if (response.statusCode != 200) {
final errorBody = await response.stream.bytesToString();
throw ApiException(response.statusCode, 'Failed to get sync stream: $errorBody');
}
// Reset after successful stream start
await Store.put(StoreKey.shouldResetSync, false);
await for (final chunk in response.stream.transform(utf8.decoder)) {
token?.throwIfCancelled();
previousChunk += chunk;
final parts = previousChunk.toString().split('\n');
previousChunk = parts.removeLast();
lines.addAll(parts);
if (lines.length < batchSize) {
continue;
}
status = await onData(_parseLines(lines), token: token);
lines.clear();
}
token?.throwIfCancelled();
if (lines.isNotEmpty) {
status = await onData(_parseLines(lines), token: token);
}
} catch (error, stack) {
return Future.error(error, stack);
} finally {
client.close();
}
stopwatch.stop();
_logger.info("Remote Sync completed in ${stopwatch.elapsed.inMilliseconds}ms");
return status;
}
List<SyncEvent> _parseLines(List<String> lines) {
final List<SyncEvent> data = [];
for (final line in lines) {
final jsonData = jsonDecode(line);
final type = SyncEntityType.fromJson(jsonData['type'])!;
final dataJson = jsonData['data'];
final ack = jsonData['ack'];
final converter = _kResponseMap[type];
if (converter == null) {
_logger.warning("Unknown type $type");
continue;
}
data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack));
}
return data;
}
}
const _kResponseMap = <SyncEntityType, Function(Object)>{
SyncEntityType.authUserV1: SyncAuthUserV1.fromJson,
SyncEntityType.userV1: SyncUserV1.fromJson,
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
SyncEntityType.assetV1: SyncAssetV1.fromJson,
SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson,
SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson,
SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson,
SyncEntityType.partnerAssetBackfillV1: SyncAssetV1.fromJson,
SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson,
SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson,
SyncEntityType.partnerAssetExifBackfillV1: SyncAssetExifV1.fromJson,
SyncEntityType.albumV1: SyncAlbumV1.fromJson,
SyncEntityType.albumDeleteV1: SyncAlbumDeleteV1.fromJson,
SyncEntityType.albumUserV1: SyncAlbumUserV1.fromJson,
SyncEntityType.albumUserBackfillV1: SyncAlbumUserV1.fromJson,
SyncEntityType.albumUserDeleteV1: SyncAlbumUserDeleteV1.fromJson,
SyncEntityType.albumAssetCreateV1: SyncAssetV1.fromJson,
SyncEntityType.albumAssetUpdateV1: SyncAssetV1.fromJson,
SyncEntityType.albumAssetBackfillV1: SyncAssetV1.fromJson,
SyncEntityType.albumAssetExifCreateV1: SyncAssetExifV1.fromJson,
SyncEntityType.albumAssetExifUpdateV1: SyncAssetExifV1.fromJson,
SyncEntityType.albumAssetExifBackfillV1: SyncAssetExifV1.fromJson,
SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson,
SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson,
SyncEntityType.albumToAssetDeleteV1: SyncAlbumToAssetDeleteV1.fromJson,
SyncEntityType.syncAckV1: _SyncEmptyDto.fromJson,
SyncEntityType.syncResetV1: _SyncEmptyDto.fromJson,
SyncEntityType.memoryV1: SyncMemoryV1.fromJson,
SyncEntityType.memoryDeleteV1: SyncMemoryDeleteV1.fromJson,
SyncEntityType.memoryToAssetV1: SyncMemoryAssetV1.fromJson,
SyncEntityType.memoryToAssetDeleteV1: SyncMemoryAssetDeleteV1.fromJson,
SyncEntityType.stackV1: SyncStackV1.fromJson,
SyncEntityType.stackDeleteV1: SyncStackDeleteV1.fromJson,
SyncEntityType.partnerStackV1: SyncStackV1.fromJson,
SyncEntityType.partnerStackBackfillV1: SyncStackV1.fromJson,
SyncEntityType.partnerStackDeleteV1: SyncStackDeleteV1.fromJson,
SyncEntityType.userMetadataV1: SyncUserMetadataV1.fromJson,
SyncEntityType.userMetadataDeleteV1: SyncUserMetadataDeleteV1.fromJson,
SyncEntityType.personV1: SyncPersonV1.fromJson,
SyncEntityType.personDeleteV1: SyncPersonDeleteV1.fromJson,
SyncEntityType.assetFaceV1: SyncAssetFaceV1.fromJson,
SyncEntityType.assetFaceDeleteV1: SyncAssetFaceDeleteV1.fromJson,
SyncEntityType.syncCompleteV1: _SyncEmptyDto.fromJson,
};
class _SyncEmptyDto {
static _SyncEmptyDto? fromJson(dynamic _) => _SyncEmptyDto();
}

View File

@@ -0,0 +1,9 @@
import 'dart:async';
extension CompleterExtensions<T> on Completer<T> {
void completeOnce([FutureOr<T>? value]) {
if (!isCompleted) {
complete(value);
}
}
}

View File

@@ -12,6 +12,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart';
import 'package:hooks_riverpod/hooks_riverpod.dart';
import 'package:immich_mobile/constants/constants.dart';
import 'package:immich_mobile/constants/locales.dart';
import 'package:immich_mobile/domain/jobs/job_manager.dart';
import 'package:immich_mobile/domain/services/background_worker.service.dart';
import 'package:immich_mobile/entities/store.entity.dart';
import 'package:immich_mobile/extensions/build_context_extensions.dart';
@@ -53,6 +54,8 @@ void main() async {
await initApp();
// Warm-up isolate pool for worker manager
await workerManager.init(dynamicSpawning: true);
JobManager.I.init();
await JobManager.I.start();
await migrateDatabaseIfNeeded(isar, drift);
HttpSSLOptions.apply();