mirror of
https://github.com/immich-app/immich.git
synced 2025-12-09 17:23:13 +03:00
Compare commits
1 Commits
push-nklmv
...
feat/mobil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0fcc3a0e3 |
223
mobile/lib/domain/jobs/job.dart
Normal file
223
mobile/lib/domain/jobs/job.dart
Normal file
@@ -0,0 +1,223 @@
|
||||
import 'dart:async';
|
||||
import 'dart:isolate';
|
||||
import 'dart:ui';
|
||||
|
||||
import 'package:flutter/services.dart';
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_context.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_messages.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_status.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||
import 'package:immich_mobile/domain/services/log.service.dart';
|
||||
import 'package:immich_mobile/entities/store.entity.dart';
|
||||
import 'package:immich_mobile/extensions/completer_extensions.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
|
||||
import 'package:immich_mobile/providers/db.provider.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||
import 'package:immich_mobile/utils/debug_print.dart';
|
||||
import 'package:isar/isar.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
|
||||
part 'job_entrypoint.dart';
|
||||
|
||||
final _jobLogger = Logger('Job');
|
||||
|
||||
abstract class Job<Input, Output> {
|
||||
Isolate? _isolate;
|
||||
SendPort? _jobPort;
|
||||
ReceivePort? _receivePort;
|
||||
final Map<String, StreamController<JobProgress<Output>>> _jobProgressControllers = {};
|
||||
bool get isRunning => _isolate != null;
|
||||
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
// ABSTRACT METHODS / FIELDS - Concrete implementations override these
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
|
||||
JobType get type;
|
||||
|
||||
Duration get idleInterval;
|
||||
Future<void> onIdle();
|
||||
|
||||
Duration get shutdownTimeout => const Duration(seconds: 5);
|
||||
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
// CONCRETE METHODS
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
|
||||
Timer? _idleTimer;
|
||||
|
||||
void _resetIdleTimer() {
|
||||
_idleTimer?.cancel();
|
||||
_idleTimer = Timer.periodic(idleInterval, (_) async {
|
||||
await onIdle();
|
||||
});
|
||||
}
|
||||
|
||||
void _cancelIdleTimer() {
|
||||
_idleTimer?.cancel();
|
||||
_idleTimer = null;
|
||||
}
|
||||
|
||||
Future<void> start() async {
|
||||
if (_isolate != null) return;
|
||||
|
||||
final rootToken = RootIsolateToken.instance;
|
||||
if (rootToken == null) {
|
||||
throw StateError('RootIsolateToken is not available. Ensure that the job is started from the main isolate.');
|
||||
}
|
||||
|
||||
_receivePort = ReceivePort();
|
||||
_receivePort!.listen(_handleMessage);
|
||||
|
||||
_isolate = await Isolate.spawn(_isolateEntryPoint, (
|
||||
type: type,
|
||||
token: rootToken,
|
||||
mainPort: _receivePort!.sendPort,
|
||||
), debugName: type.id);
|
||||
|
||||
_resetIdleTimer();
|
||||
}
|
||||
|
||||
String trigger(Input input) {
|
||||
if (_jobPort == null) {
|
||||
throw StateError('Job not started. Call start() before triggering jobs.');
|
||||
}
|
||||
|
||||
final jobId = _generateJobId();
|
||||
_jobPort!.send(JobTriggerRequest<Input>(jobId: jobId, input: input));
|
||||
_jobProgressControllers[jobId] = StreamController<JobProgress<Output>>.broadcast();
|
||||
_resetIdleTimer();
|
||||
return jobId;
|
||||
}
|
||||
|
||||
Future<Output?> run(Input input) async {
|
||||
if (_jobPort == null) {
|
||||
throw StateError('Job not started. Call start() before running jobs.');
|
||||
}
|
||||
|
||||
final jobId = _generateJobId();
|
||||
_jobPort!.send(JobTriggerRequest<Input>(jobId: jobId, input: input));
|
||||
_jobProgressControllers[jobId] = StreamController<JobProgress<Output>>.broadcast();
|
||||
_resetIdleTimer();
|
||||
final progress = await waitFor(jobId);
|
||||
return progress.result;
|
||||
}
|
||||
|
||||
int _jobCounter = 0;
|
||||
String _generateJobId() => '${type.id}_${DateTime.now().millisecondsSinceEpoch}_${_jobCounter++}';
|
||||
|
||||
void cancel({required String jobId}) {
|
||||
if (_jobPort == null) {
|
||||
throw StateError('Job not started. Call start() before cancelling jobs.');
|
||||
}
|
||||
|
||||
_jobPort!.send(JobCancelRequest(jobId: jobId));
|
||||
}
|
||||
|
||||
void cancelAll() {
|
||||
if (_jobPort == null) {
|
||||
throw StateError('Job not started. Call start() before cancelling jobs.');
|
||||
}
|
||||
|
||||
_cancelIdleTimer();
|
||||
_jobPort!.send(const IsolateCancelAllRequest());
|
||||
}
|
||||
|
||||
Stream<JobProgress> watch(String jobId) {
|
||||
if (!_jobProgressControllers.containsKey(jobId)) {
|
||||
throw StateError('No job found with ID: $jobId. Trigger the job before watching its progress.');
|
||||
}
|
||||
|
||||
return _jobProgressControllers[jobId]!.stream;
|
||||
}
|
||||
|
||||
Future<JobProgress> waitFor(String jobId) {
|
||||
return watch(jobId).firstWhere((p) => p.isComplete || p.isError || p.isCancelled);
|
||||
}
|
||||
|
||||
Future<void> stop() async {
|
||||
if (_isolate == null) return;
|
||||
|
||||
_cancelIdleTimer();
|
||||
|
||||
if (_jobPort == null) {
|
||||
_jobLogger.warning('Job isolate is running but job port is null for type: $this.id.');
|
||||
}
|
||||
|
||||
final jobPort = _jobPort;
|
||||
final isolate = _isolate;
|
||||
_jobPort = null;
|
||||
_isolate = null;
|
||||
|
||||
jobPort?.send(const IsolateShutdownRequest());
|
||||
|
||||
// Wait for all jobs to complete before killing the isolate
|
||||
if (_jobProgressControllers.isNotEmpty) {
|
||||
await Future.wait(_jobProgressControllers.keys.map((id) => waitFor(id))).timeout(
|
||||
shutdownTimeout,
|
||||
onTimeout: () {
|
||||
_jobLogger.warning(
|
||||
'Timeout waiting for jobs to complete during shutdown of job type: $this.id. Force killing isolate.',
|
||||
);
|
||||
return <JobProgress>[];
|
||||
},
|
||||
);
|
||||
}
|
||||
|
||||
isolate?.kill(priority: Isolate.immediate);
|
||||
_receivePort?.close();
|
||||
_receivePort = null;
|
||||
|
||||
for (final controller in _jobProgressControllers.values) {
|
||||
await controller.close();
|
||||
}
|
||||
_jobProgressControllers.clear();
|
||||
}
|
||||
|
||||
void _handleMessage(dynamic message) {
|
||||
switch (message) {
|
||||
case IsolateReadyResponse msg:
|
||||
_jobPort = msg.requestPort;
|
||||
|
||||
case JobProgressResponse msg:
|
||||
final progress = JobProgress<Output>.running(progress: msg.progress, current: msg.current, total: msg.total);
|
||||
_addJobProgress(msg.jobId, progress, close: false);
|
||||
|
||||
case JobCompleteResponse msg:
|
||||
final progress = JobProgress<Output>.completed(result: msg.result);
|
||||
_addJobProgress(msg.jobId, progress);
|
||||
|
||||
case JobErrorResponse msg:
|
||||
final progress = JobProgress<Output>.error(msg.error);
|
||||
_addJobProgress(msg.jobId, progress);
|
||||
|
||||
case JobCancelledResponse msg:
|
||||
final progress = JobProgress<Output>.cancelled();
|
||||
_addJobProgress(msg.jobId, progress);
|
||||
|
||||
case JobSkippedResponse msg:
|
||||
final progress = JobProgress<Output>.skipped();
|
||||
_addJobProgress(msg.jobId, progress);
|
||||
|
||||
case IsolateErrorResponse msg:
|
||||
_jobLogger.severe('Isolate error: ${msg.error}\nStackTrace: ${msg.stackTrace}');
|
||||
}
|
||||
}
|
||||
|
||||
void _addJobProgress(String jobId, JobProgress<Output> progress, {bool close = true}) {
|
||||
final controller = _jobProgressControllers[jobId];
|
||||
if (controller == null) {
|
||||
_jobLogger.warning('Received progress update for unknown job: $jobId');
|
||||
return;
|
||||
}
|
||||
|
||||
controller.add(progress);
|
||||
|
||||
if (close) {
|
||||
controller.close();
|
||||
_jobProgressControllers.remove(jobId);
|
||||
}
|
||||
}
|
||||
}
|
||||
38
mobile/lib/domain/jobs/job_context.dart
Normal file
38
mobile/lib/domain/jobs/job_context.dart
Normal file
@@ -0,0 +1,38 @@
|
||||
import 'dart:isolate';
|
||||
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_messages.dart';
|
||||
|
||||
class JobContext {
|
||||
final String id;
|
||||
final ProviderContainer ref;
|
||||
final SendPort _mainPort;
|
||||
|
||||
const JobContext({required this.id, required this.ref, required SendPort mainPort}) : _mainPort = mainPort;
|
||||
|
||||
ProviderResult read<ProviderResult>(ProviderListenable<ProviderResult> provider) => provider.read(ref);
|
||||
|
||||
void reportJobProgress({required String jobId, double? progress, int? current, int? total}) {
|
||||
_mainPort.send(JobProgressResponse(jobId: jobId, progress: progress, current: current, total: total));
|
||||
}
|
||||
|
||||
void reportJobComplete<Result>({required String jobId, Result? result}) {
|
||||
_mainPort.send(JobCompleteResponse(jobId: jobId, result: result));
|
||||
}
|
||||
|
||||
void reportJobError({required String jobId, required String error, String? stackTrace}) {
|
||||
_mainPort.send(JobErrorResponse(jobId: jobId, error: error, stackTrace: stackTrace));
|
||||
}
|
||||
|
||||
void reportIsolateError({required String error, String? stackTrace}) {
|
||||
_mainPort.send(IsolateErrorResponse(error: error, stackTrace: stackTrace));
|
||||
}
|
||||
|
||||
void reportJobCancelled({required String jobId}) {
|
||||
_mainPort.send(JobCancelledResponse(jobId: jobId));
|
||||
}
|
||||
|
||||
void reportJobSkipped({required String jobId}) {
|
||||
_mainPort.send(JobSkippedResponse(jobId: jobId));
|
||||
}
|
||||
}
|
||||
76
mobile/lib/domain/jobs/job_entrypoint.dart
Normal file
76
mobile/lib/domain/jobs/job_entrypoint.dart
Normal file
@@ -0,0 +1,76 @@
|
||||
part of 'job.dart';
|
||||
|
||||
typedef _IsolateStartMessage = ({RootIsolateToken token, JobType type, SendPort mainPort});
|
||||
|
||||
Future<void> _isolateEntryPoint(_IsolateStartMessage message) async {
|
||||
final _IsolateStartMessage(:token, :mainPort, :type) = message;
|
||||
|
||||
final logger = Logger(type.id);
|
||||
final isolateCompleter = Completer<void>();
|
||||
|
||||
try {
|
||||
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
||||
DartPluginRegistrant.ensureInitialized();
|
||||
|
||||
final (isar, drift, logDb) = await Bootstrap.initDB();
|
||||
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false, listenStoreUpdates: false);
|
||||
|
||||
final container = ProviderContainer(
|
||||
overrides: [
|
||||
dbProvider.overrideWithValue(isar),
|
||||
isarProvider.overrideWithValue(isar),
|
||||
driftProvider.overrideWith(driftOverride(drift)),
|
||||
],
|
||||
);
|
||||
|
||||
final context = JobContext(id: type.id, ref: container, mainPort: mainPort);
|
||||
final receivePort = ReceivePort();
|
||||
mainPort.send(IsolateReadyResponse(receivePort.sendPort));
|
||||
|
||||
final handler = getJobHandler(type, context);
|
||||
|
||||
receivePort.listen((message) async {
|
||||
try {
|
||||
switch (message) {
|
||||
case JobTriggerRequest msg:
|
||||
handler.queue(msg.jobId, msg.input);
|
||||
|
||||
case JobCancelRequest msg:
|
||||
await handler.cancel(msg.jobId);
|
||||
|
||||
case IsolateCancelAllRequest _:
|
||||
await handler.cancelAll();
|
||||
|
||||
case IsolateShutdownRequest _:
|
||||
receivePort.close();
|
||||
await handler.shutdown();
|
||||
await _cleanup(container, isar, drift, logDb);
|
||||
isolateCompleter.completeOnce();
|
||||
}
|
||||
} catch (e, stack) {
|
||||
logger.severe('Error handling command', e, stack);
|
||||
context.reportIsolateError(error: 'Command error: $e', stackTrace: stack.toString());
|
||||
}
|
||||
});
|
||||
} catch (e, stack) {
|
||||
logger.severe('Job fatal error', e, stack);
|
||||
mainPort.send(IsolateErrorResponse(error: 'Job crashed: $e', stackTrace: stack.toString()));
|
||||
isolateCompleter.completeOnce();
|
||||
}
|
||||
await isolateCompleter.future;
|
||||
}
|
||||
|
||||
Future<void> _cleanup(ProviderContainer container, Isar isar, Drift drift, DriftLogger logDb) async {
|
||||
try {
|
||||
container.dispose();
|
||||
await Store.dispose();
|
||||
await LogService.I.dispose();
|
||||
await logDb.close();
|
||||
await drift.close();
|
||||
if (isar.isOpen) {
|
||||
await isar.close();
|
||||
}
|
||||
} catch (error, stack) {
|
||||
dPrint(() => 'Job Cleanup error: $error, stack: $stack');
|
||||
}
|
||||
}
|
||||
161
mobile/lib/domain/jobs/job_handler.dart
Normal file
161
mobile/lib/domain/jobs/job_handler.dart
Normal file
@@ -0,0 +1,161 @@
|
||||
import 'dart:async';
|
||||
import 'dart:collection';
|
||||
|
||||
import 'package:cancellation_token_http/http.dart';
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_context.dart';
|
||||
import 'package:immich_mobile/extensions/completer_extensions.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
|
||||
abstract class JobHandler<Input> {
|
||||
late final Logger logger;
|
||||
final JobContext context;
|
||||
|
||||
final Queue<String> _jobQueue = Queue();
|
||||
final Map<String, _JobState<Input>> _jobs = {};
|
||||
int get runningCount => _jobs.values.where((s) => s.isRunning).length;
|
||||
bool _isProcessing = false;
|
||||
|
||||
JobHandler({required this.context}) {
|
||||
logger = Logger(context.id);
|
||||
}
|
||||
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
// ABSTRACT METHODS / FIELDS - Concrete implementations override these
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
|
||||
Future<void> processJob(String jobId, Input input, CancellationToken token);
|
||||
|
||||
int get maxConcurrentJobs => 1;
|
||||
|
||||
Duration get cancellationTimeout => const Duration(seconds: 1);
|
||||
|
||||
bool shouldSkipInput(Input newInput, Input existingInput) => false;
|
||||
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
// CONCRETE METHODS
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
|
||||
@mustCallSuper
|
||||
void queue(String jobId, Input input) async {
|
||||
if (_shouldSkipInput(input)) {
|
||||
context.reportJobSkipped(jobId: jobId);
|
||||
return;
|
||||
}
|
||||
|
||||
final token = CancellationToken();
|
||||
_jobs[jobId] = _JobState(jobId: jobId, input: input, token: token, completer: Completer<void>(), isRunning: false);
|
||||
_jobQueue.add(jobId);
|
||||
_processQueue();
|
||||
}
|
||||
|
||||
bool _shouldSkipInput(Input input) {
|
||||
for (final job in _jobs.values) {
|
||||
if (shouldSkipInput(input, job.input)) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
Future<void> _processQueue() async {
|
||||
if (_isProcessing) {
|
||||
return;
|
||||
}
|
||||
_isProcessing = true;
|
||||
|
||||
while (_jobQueue.isNotEmpty && runningCount < maxConcurrentJobs) {
|
||||
final jobId = _jobQueue.removeFirst();
|
||||
final jobState = _jobs[jobId];
|
||||
|
||||
if (jobState == null) {
|
||||
logger.warning('Job state not found for jobId: $jobId');
|
||||
continue;
|
||||
}
|
||||
|
||||
if (jobState.token.isCancelled) {
|
||||
context.reportJobCancelled(jobId: jobId);
|
||||
jobState.completer.completeOnce();
|
||||
_jobs.remove(jobId);
|
||||
continue;
|
||||
}
|
||||
|
||||
jobState.isRunning = true;
|
||||
|
||||
_runJob(jobState).then((_) {
|
||||
_jobs[jobId]?.completer.completeOnce();
|
||||
_jobs.remove(jobId);
|
||||
_processQueue();
|
||||
});
|
||||
}
|
||||
_isProcessing = false;
|
||||
}
|
||||
|
||||
Future<void> _runJob(_JobState<Input> job) async {
|
||||
try {
|
||||
await processJob(job.jobId, job.input, job.token);
|
||||
|
||||
if (!job.token.isCancelled) {
|
||||
context.reportJobComplete(jobId: job.jobId);
|
||||
}
|
||||
} on CancelledException {
|
||||
context.reportJobCancelled(jobId: job.jobId);
|
||||
} catch (e, stack) {
|
||||
context.reportJobError(jobId: job.jobId, error: '$e', stackTrace: stack.toString());
|
||||
}
|
||||
}
|
||||
|
||||
@mustCallSuper
|
||||
Future<void> cancel(String jobId) async {
|
||||
final jobState = _jobs[jobId];
|
||||
if (jobState == null) return;
|
||||
|
||||
jobState.token.cancel();
|
||||
|
||||
if (!jobState.isRunning) {
|
||||
_jobQueue.removeWhere((id) => id == jobId);
|
||||
}
|
||||
|
||||
await jobState.completer.future.timeout(cancellationTimeout, onTimeout: () {});
|
||||
}
|
||||
|
||||
@mustCallSuper
|
||||
Future<void> cancelAll() async {
|
||||
for (final state in _jobs.values) {
|
||||
state.token.cancel();
|
||||
}
|
||||
|
||||
_jobQueue.clear();
|
||||
|
||||
if (_jobs.isNotEmpty) {
|
||||
await Future.wait(_jobs.values.map((s) => s.completer.future)).timeout(
|
||||
cancellationTimeout * 2,
|
||||
onTimeout: () {
|
||||
return [];
|
||||
},
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@mustCallSuper
|
||||
Future<void> shutdown() async {
|
||||
await cancelAll();
|
||||
}
|
||||
}
|
||||
|
||||
class _JobState<Input> {
|
||||
final String jobId;
|
||||
final Input input;
|
||||
final CancellationToken token;
|
||||
final Completer<void> completer;
|
||||
bool isRunning;
|
||||
|
||||
_JobState({
|
||||
required this.jobId,
|
||||
required this.input,
|
||||
required this.token,
|
||||
required this.completer,
|
||||
this.isRunning = false,
|
||||
});
|
||||
}
|
||||
66
mobile/lib/domain/jobs/job_manager.dart
Normal file
66
mobile/lib/domain/jobs/job_manager.dart
Normal file
@@ -0,0 +1,66 @@
|
||||
import 'package:immich_mobile/domain/jobs/job.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.job.dart';
|
||||
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.job.dart';
|
||||
|
||||
class _JobRegistry {
|
||||
final Map<String, Job> _jobs = {};
|
||||
|
||||
void register<I, O>(Job<I, O> job) {
|
||||
_jobs[job.type.id] = job;
|
||||
}
|
||||
|
||||
Job<I, O>? get<I, O>(String id) {
|
||||
return _jobs[id] as Job<I, O>?;
|
||||
}
|
||||
|
||||
List<Job> get all => _jobs.values.toList();
|
||||
|
||||
Future<void> startAll() async {
|
||||
await Future.wait(_jobs.values.map((j) => j.start()));
|
||||
}
|
||||
|
||||
Future<void> stopAll() async {
|
||||
await Future.wait(_jobs.values.map((j) => j.stop()));
|
||||
}
|
||||
|
||||
void cancelAll() {
|
||||
for (final job in _jobs.values) {
|
||||
job.cancelAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JobManager {
|
||||
final _JobRegistry _registry = _JobRegistry();
|
||||
|
||||
bool _isInitialized = false;
|
||||
static final JobManager I = JobManager._();
|
||||
JobManager._();
|
||||
|
||||
LocalSyncJob? get localSyncJob => _registry.get<LocalSyncInput, void>(JobType.localSync.id) as LocalSyncJob?;
|
||||
|
||||
RemoteSyncJob? get remoteSyncJob => _registry.get<void, void>(JobType.remoteSync.id) as RemoteSyncJob?;
|
||||
|
||||
void init() {
|
||||
if (_isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
_registry.register(LocalSyncJob());
|
||||
_registry.register(RemoteSyncJob());
|
||||
_isInitialized = true;
|
||||
}
|
||||
|
||||
Future<void> start() => _registry.startAll();
|
||||
|
||||
Future<void> stop() => _registry.stopAll();
|
||||
|
||||
void triggerLocalSync({bool fullSync = false}) {
|
||||
localSyncJob?.trigger(LocalSyncInput(fullSync: fullSync));
|
||||
}
|
||||
|
||||
void triggerRemoteSync({bool syncReset = false}) {
|
||||
remoteSyncJob?.trigger(RemoteSyncInput(syncReset: syncReset));
|
||||
}
|
||||
}
|
||||
79
mobile/lib/domain/jobs/job_messages.dart
Normal file
79
mobile/lib/domain/jobs/job_messages.dart
Normal file
@@ -0,0 +1,79 @@
|
||||
import 'dart:isolate';
|
||||
|
||||
sealed class IsolateRequest {
|
||||
const IsolateRequest();
|
||||
}
|
||||
|
||||
class JobTriggerRequest<Input> extends IsolateRequest {
|
||||
final String jobId;
|
||||
final Input? input;
|
||||
|
||||
const JobTriggerRequest({required this.jobId, this.input});
|
||||
}
|
||||
|
||||
class JobCancelRequest extends IsolateRequest {
|
||||
final String jobId;
|
||||
|
||||
const JobCancelRequest({required this.jobId});
|
||||
}
|
||||
|
||||
class IsolateCancelAllRequest extends IsolateRequest {
|
||||
const IsolateCancelAllRequest();
|
||||
}
|
||||
|
||||
class IsolateShutdownRequest extends IsolateRequest {
|
||||
const IsolateShutdownRequest();
|
||||
}
|
||||
|
||||
sealed class IsolateResponse {
|
||||
const IsolateResponse();
|
||||
}
|
||||
|
||||
class IsolateReadyResponse extends IsolateResponse {
|
||||
final SendPort requestPort;
|
||||
|
||||
const IsolateReadyResponse(this.requestPort);
|
||||
}
|
||||
|
||||
class JobProgressResponse extends IsolateResponse {
|
||||
final String jobId;
|
||||
final double? progress;
|
||||
final int? current;
|
||||
final int? total;
|
||||
|
||||
const JobProgressResponse({required this.jobId, this.progress, this.current, this.total});
|
||||
}
|
||||
|
||||
class JobCompleteResponse<Output> extends IsolateResponse {
|
||||
final String jobId;
|
||||
final Output? result;
|
||||
|
||||
const JobCompleteResponse({required this.jobId, this.result});
|
||||
}
|
||||
|
||||
class JobErrorResponse extends IsolateResponse {
|
||||
final String jobId;
|
||||
final String error;
|
||||
final String? stackTrace;
|
||||
|
||||
const JobErrorResponse({required this.jobId, required this.error, this.stackTrace});
|
||||
}
|
||||
|
||||
class JobCancelledResponse extends IsolateResponse {
|
||||
final String jobId;
|
||||
|
||||
const JobCancelledResponse({required this.jobId});
|
||||
}
|
||||
|
||||
class JobSkippedResponse extends IsolateResponse {
|
||||
final String jobId;
|
||||
|
||||
const JobSkippedResponse({required this.jobId});
|
||||
}
|
||||
|
||||
class IsolateErrorResponse extends IsolateResponse {
|
||||
final String error;
|
||||
final String? stackTrace;
|
||||
|
||||
const IsolateErrorResponse({required this.error, this.stackTrace});
|
||||
}
|
||||
62
mobile/lib/domain/jobs/job_status.dart
Normal file
62
mobile/lib/domain/jobs/job_status.dart
Normal file
@@ -0,0 +1,62 @@
|
||||
enum JobStatus { running, completed, error, cancelled, skipped }
|
||||
|
||||
class JobProgress<T> {
|
||||
final JobStatus status;
|
||||
|
||||
// JobStatus.completed
|
||||
final T? result;
|
||||
|
||||
// JobStatus.running
|
||||
final double? progress;
|
||||
final int? current;
|
||||
final int? total;
|
||||
|
||||
// JobStatus.error
|
||||
final String? error;
|
||||
|
||||
const JobProgress({required this.status, this.result, this.progress, this.current, this.total, this.error});
|
||||
|
||||
bool get isRunning => status == JobStatus.running;
|
||||
bool get isComplete => status == JobStatus.completed;
|
||||
bool get isError => status == JobStatus.error;
|
||||
bool get isCancelled => status == JobStatus.cancelled;
|
||||
|
||||
factory JobProgress.running({double? progress, int? current, int? total}) {
|
||||
assert(progress == null || (progress >= 0 && progress <= 1), 'Progress must be 0-1');
|
||||
assert(current == null || total == null || current <= total, 'Current must be <= total');
|
||||
return JobProgress(status: JobStatus.running, progress: progress, current: current, total: total);
|
||||
}
|
||||
|
||||
factory JobProgress.completed({T? result}) => JobProgress(status: JobStatus.completed, result: result);
|
||||
|
||||
factory JobProgress.error(String error) => JobProgress(status: JobStatus.error, error: error);
|
||||
|
||||
factory JobProgress.cancelled() => const JobProgress(status: JobStatus.cancelled);
|
||||
|
||||
factory JobProgress.skipped() => const JobProgress(status: JobStatus.skipped);
|
||||
|
||||
@override
|
||||
String toString() => switch (status) {
|
||||
JobStatus.running => 'JobProgress(running: progress=$progress, current=$current, total=$total)',
|
||||
JobStatus.completed => 'JobProgress(completed: result=$result)',
|
||||
JobStatus.error => 'JobProgress(error: error=$error)',
|
||||
JobStatus.cancelled => 'JobProgress(cancelled)',
|
||||
JobStatus.skipped => 'JobProgress(skipped)',
|
||||
};
|
||||
|
||||
@override
|
||||
bool operator ==(Object other) =>
|
||||
identical(this, other) ||
|
||||
other is JobProgress<T> &&
|
||||
runtimeType == other.runtimeType &&
|
||||
status == other.status &&
|
||||
result == other.result &&
|
||||
progress == other.progress &&
|
||||
current == other.current &&
|
||||
total == other.total &&
|
||||
error == other.error;
|
||||
|
||||
@override
|
||||
int get hashCode =>
|
||||
status.hashCode ^ result.hashCode ^ progress.hashCode ^ current.hashCode ^ total.hashCode ^ error.hashCode;
|
||||
}
|
||||
17
mobile/lib/domain/jobs/job_types.dart
Normal file
17
mobile/lib/domain/jobs/job_types.dart
Normal file
@@ -0,0 +1,17 @@
|
||||
import 'package:immich_mobile/domain/jobs/job_context.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_handler.dart';
|
||||
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.job.dart';
|
||||
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.job.dart';
|
||||
|
||||
enum JobType {
|
||||
localSync._('local_sync'),
|
||||
remoteSync._('remote_sync');
|
||||
|
||||
final String id;
|
||||
const JobType._(this.id);
|
||||
}
|
||||
|
||||
JobHandler getJobHandler(JobType type, JobContext context) => switch (type) {
|
||||
JobType.localSync => LocalSyncJobHandler(context: context),
|
||||
JobType.remoteSync => RemoteSyncJobHandler(context: context),
|
||||
};
|
||||
52
mobile/lib/domain/jobs/local_sync/local_sync.job.dart
Normal file
52
mobile/lib/domain/jobs/local_sync/local_sync.job.dart
Normal file
@@ -0,0 +1,52 @@
|
||||
import 'package:cancellation_token_http/http.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_handler.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.service.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
|
||||
|
||||
class LocalSyncInput {
|
||||
final bool fullSync;
|
||||
|
||||
const LocalSyncInput({this.fullSync = false});
|
||||
}
|
||||
|
||||
class LocalSyncJob extends Job<LocalSyncInput, void> {
|
||||
@override
|
||||
JobType get type => JobType.localSync;
|
||||
|
||||
@override
|
||||
Duration get idleInterval => const Duration(minutes: 3);
|
||||
|
||||
@override
|
||||
Future<void> onIdle() async {
|
||||
// Trigger a partial sync on idle
|
||||
trigger(const LocalSyncInput(fullSync: false));
|
||||
}
|
||||
}
|
||||
|
||||
class LocalSyncJobHandler extends JobHandler<LocalSyncInput> {
|
||||
late final LocalSyncService _localSyncService;
|
||||
|
||||
LocalSyncJobHandler({required super.context}) {
|
||||
LocalSyncService(
|
||||
localAlbumRepository: context.read(localAlbumRepository),
|
||||
nativeSyncApi: context.read(nativeSyncApiProvider),
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
bool shouldSkipInput(LocalSyncInput newInput, LocalSyncInput existingInput) {
|
||||
// Skip if a full sync is already scheduled/running
|
||||
if (existingInput.fullSync) {
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
@override
|
||||
Future<void> processJob(String jobId, LocalSyncInput input, CancellationToken token) =>
|
||||
_localSyncService.sync(full: input.fullSync, token: token);
|
||||
}
|
||||
308
mobile/lib/domain/jobs/local_sync/local_sync.service.dart
Normal file
308
mobile/lib/domain/jobs/local_sync/local_sync.service.dart
Normal file
@@ -0,0 +1,308 @@
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:cancellation_token_http/http.dart';
|
||||
import 'package:collection/collection.dart';
|
||||
import 'package:flutter/foundation.dart';
|
||||
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
||||
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
||||
import 'package:immich_mobile/extensions/platform_extensions.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
||||
import 'package:immich_mobile/platform/native_sync_api.g.dart';
|
||||
import 'package:immich_mobile/utils/datetime_helpers.dart';
|
||||
import 'package:immich_mobile/utils/diff.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
|
||||
class LocalSyncService {
|
||||
final DriftLocalAlbumRepository _localAlbumRepository;
|
||||
final NativeSyncApi _nativeSyncApi;
|
||||
|
||||
final Logger _log = Logger("DeviceSyncService");
|
||||
|
||||
LocalSyncService({required DriftLocalAlbumRepository localAlbumRepository, required NativeSyncApi nativeSyncApi})
|
||||
: _localAlbumRepository = localAlbumRepository,
|
||||
_nativeSyncApi = nativeSyncApi;
|
||||
|
||||
Future<void> sync({bool full = false, CancellationToken? token}) async {
|
||||
final Stopwatch stopwatch = Stopwatch()..start();
|
||||
|
||||
if (full || await _nativeSyncApi.shouldFullSync()) {
|
||||
_log.fine("Full sync request from ${full ? "user" : "native"}");
|
||||
token?.throwIfCancelled();
|
||||
return await fullSync(token: token);
|
||||
}
|
||||
|
||||
token?.throwIfCancelled();
|
||||
final delta = await _nativeSyncApi.getMediaChanges();
|
||||
if (!delta.hasChanges) {
|
||||
_log.fine("No media changes detected. Skipping sync");
|
||||
return;
|
||||
}
|
||||
|
||||
_log.fine("Delta updated: ${delta.updates.length}");
|
||||
_log.fine("Delta deleted: ${delta.deletes.length}");
|
||||
|
||||
final deviceAlbums = await _nativeSyncApi.getAlbums();
|
||||
await _localAlbumRepository.updateAll(deviceAlbums.toLocalAlbums());
|
||||
|
||||
token?.throwIfCancelled();
|
||||
await _localAlbumRepository.processDelta(
|
||||
updates: delta.updates.toLocalAssets(),
|
||||
deletes: delta.deletes,
|
||||
assetAlbums: delta.assetAlbums,
|
||||
);
|
||||
|
||||
final dbAlbums = await _localAlbumRepository.getAll();
|
||||
// On Android, we need to sync all albums since it is not possible to
|
||||
// detect album deletions from the native side
|
||||
if (CurrentPlatform.isAndroid) {
|
||||
for (final album in dbAlbums) {
|
||||
token?.throwIfCancelled();
|
||||
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
|
||||
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
|
||||
}
|
||||
}
|
||||
|
||||
if (CurrentPlatform.isIOS) {
|
||||
// On iOS, we need to full sync albums that are marked as cloud as the delta sync
|
||||
// does not include changes for cloud albums. If ignoreIcloudAssets is enabled,
|
||||
// remove the albums from the local database from the previous sync
|
||||
final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums();
|
||||
for (final album in cloudAlbums) {
|
||||
token?.throwIfCancelled();
|
||||
final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id);
|
||||
if (dbAlbum == null) {
|
||||
_log.warning("Cloud album ${album.name} not found in local database. Skipping sync.");
|
||||
continue;
|
||||
}
|
||||
await updateAlbum(dbAlbum, album, token: token);
|
||||
}
|
||||
}
|
||||
|
||||
await _nativeSyncApi.checkpointSync();
|
||||
|
||||
stopwatch.stop();
|
||||
_log.info("Device sync took - ${stopwatch.elapsedMilliseconds}ms");
|
||||
}
|
||||
|
||||
Future<void> fullSync({CancellationToken? token}) async {
|
||||
final Stopwatch stopwatch = Stopwatch()..start();
|
||||
|
||||
final deviceAlbums = await _nativeSyncApi.getAlbums();
|
||||
final dbAlbums = await _localAlbumRepository.getAll(sortBy: {SortLocalAlbumsBy.id});
|
||||
|
||||
await diffSortedLists(
|
||||
dbAlbums,
|
||||
deviceAlbums.toLocalAlbums(),
|
||||
compare: (a, b) => a.id.compareTo(b.id),
|
||||
both: (LocalAlbum oldAlbum, LocalAlbum newAlbum) {
|
||||
token?.throwIfCancelled();
|
||||
return updateAlbum(oldAlbum, newAlbum, token: token);
|
||||
},
|
||||
onlyFirst: (album) {
|
||||
token?.throwIfCancelled();
|
||||
return removeAlbum(album);
|
||||
},
|
||||
onlySecond: (album) {
|
||||
token?.throwIfCancelled();
|
||||
return addAlbum(album, token: token);
|
||||
},
|
||||
);
|
||||
|
||||
await _nativeSyncApi.checkpointSync();
|
||||
stopwatch.stop();
|
||||
_log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms");
|
||||
}
|
||||
|
||||
Future<void> addAlbum(LocalAlbum album, {CancellationToken? token}) async {
|
||||
_log.fine("Adding device album ${album.name}");
|
||||
|
||||
final assets = album.assetCount > 0 ? await _nativeSyncApi.getAssetsForAlbum(album.id) : <PlatformAsset>[];
|
||||
token?.throwIfCancelled();
|
||||
await _localAlbumRepository.upsert(album, toUpsert: assets.toLocalAssets());
|
||||
_log.fine("Successfully added device album ${album.name}");
|
||||
}
|
||||
|
||||
Future<void> removeAlbum(LocalAlbum a) async {
|
||||
_log.fine("Removing device album ${a.name}");
|
||||
// Asset deletion is handled in the repository
|
||||
await _localAlbumRepository.delete(a.id);
|
||||
}
|
||||
|
||||
// The deviceAlbum is ignored since we are going to refresh it anyways
|
||||
FutureOr<bool> updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
|
||||
_log.fine("Syncing device album ${dbAlbum.name}");
|
||||
|
||||
if (_albumsEqual(deviceAlbum, dbAlbum)) {
|
||||
_log.fine("Device album ${dbAlbum.name} has not changed. Skipping sync.");
|
||||
return false;
|
||||
}
|
||||
|
||||
_log.fine("Device album ${dbAlbum.name} has changed. Syncing...");
|
||||
|
||||
// Faster path - only new assets added
|
||||
token?.throwIfCancelled();
|
||||
if (await checkAddition(dbAlbum, deviceAlbum, token: token)) {
|
||||
_log.fine("Fast synced device album ${dbAlbum.name}");
|
||||
return true;
|
||||
}
|
||||
|
||||
// Slower path - full sync
|
||||
token?.throwIfCancelled();
|
||||
return await fullDiff(dbAlbum, deviceAlbum, token: token);
|
||||
}
|
||||
|
||||
@visibleForTesting
|
||||
// The [deviceAlbum] is expected to be refreshed before calling this method
|
||||
// with modified time and asset count
|
||||
Future<bool> checkAddition(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
|
||||
_log.fine("Fast syncing device album ${dbAlbum.name}");
|
||||
// Assets has been modified
|
||||
if (deviceAlbum.assetCount <= dbAlbum.assetCount) {
|
||||
_log.fine("Local album has modifications. Proceeding to full sync");
|
||||
return false;
|
||||
}
|
||||
|
||||
final updatedTime = (dbAlbum.updatedAt.millisecondsSinceEpoch ~/ 1000) + 1;
|
||||
token?.throwIfCancelled();
|
||||
final newAssetsCount = await _nativeSyncApi.getAssetsCountSince(deviceAlbum.id, updatedTime);
|
||||
|
||||
// Early return if no new assets were found
|
||||
if (newAssetsCount == 0) {
|
||||
_log.fine("No new assets found despite album having changes. Proceeding to full sync for ${dbAlbum.name}");
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check whether there is only addition or if there has been deletions
|
||||
if (deviceAlbum.assetCount != dbAlbum.assetCount + newAssetsCount) {
|
||||
_log.fine("Local album has modifications. Proceeding to full sync");
|
||||
return false;
|
||||
}
|
||||
|
||||
token?.throwIfCancelled();
|
||||
final newAssets = await _nativeSyncApi.getAssetsForAlbum(deviceAlbum.id, updatedTimeCond: updatedTime);
|
||||
|
||||
token?.throwIfCancelled();
|
||||
await _localAlbumRepository.upsert(
|
||||
deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection),
|
||||
toUpsert: newAssets.toLocalAssets(),
|
||||
);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
@visibleForTesting
|
||||
// The [deviceAlbum] is expected to be refreshed before calling this method
|
||||
// with modified time and asset count
|
||||
Future<bool> fullDiff(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
|
||||
token?.throwIfCancelled();
|
||||
final assetsInDevice = deviceAlbum.assetCount > 0
|
||||
? await _nativeSyncApi.getAssetsForAlbum(deviceAlbum.id).then((a) => a.toLocalAssets())
|
||||
: <LocalAsset>[];
|
||||
final assetsInDb = dbAlbum.assetCount > 0 ? await _localAlbumRepository.getAssets(dbAlbum.id) : <LocalAsset>[];
|
||||
|
||||
if (deviceAlbum.assetCount == 0) {
|
||||
_log.fine("Device album ${deviceAlbum.name} is empty. Removing assets from DB.");
|
||||
token?.throwIfCancelled();
|
||||
await _localAlbumRepository.upsert(
|
||||
deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection),
|
||||
toDelete: assetsInDb.map((a) => a.id),
|
||||
);
|
||||
return true;
|
||||
}
|
||||
|
||||
final updatedDeviceAlbum = deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection);
|
||||
|
||||
if (dbAlbum.assetCount == 0) {
|
||||
_log.fine("Device album ${deviceAlbum.name} is empty. Adding assets to DB.");
|
||||
token?.throwIfCancelled();
|
||||
await _localAlbumRepository.upsert(updatedDeviceAlbum, toUpsert: assetsInDevice);
|
||||
return true;
|
||||
}
|
||||
|
||||
assert(assetsInDb.isSortedBy((a) => a.id));
|
||||
assetsInDevice.sort((a, b) => a.id.compareTo(b.id));
|
||||
|
||||
final assetsToUpsert = <LocalAsset>[];
|
||||
final assetsToDelete = <String>[];
|
||||
|
||||
token?.throwIfCancelled();
|
||||
diffSortedListsSync(
|
||||
assetsInDb,
|
||||
assetsInDevice,
|
||||
compare: (a, b) => a.id.compareTo(b.id),
|
||||
both: (dbAsset, deviceAsset) {
|
||||
token?.throwIfCancelled();
|
||||
// Custom comparison to check if the asset has been modified without
|
||||
// comparing the checksum
|
||||
if (!_assetsEqual(dbAsset, deviceAsset)) {
|
||||
assetsToUpsert.add(deviceAsset);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
},
|
||||
onlyFirst: (dbAsset) => assetsToDelete.add(dbAsset.id),
|
||||
onlySecond: (deviceAsset) => assetsToUpsert.add(deviceAsset),
|
||||
);
|
||||
|
||||
_log.fine(
|
||||
"Syncing ${deviceAlbum.name}. ${assetsToUpsert.length} assets to add/update and ${assetsToDelete.length} assets to delete",
|
||||
);
|
||||
|
||||
token?.throwIfCancelled();
|
||||
if (assetsToUpsert.isEmpty && assetsToDelete.isEmpty) {
|
||||
_log.fine("No asset changes detected in album ${deviceAlbum.name}. Updating metadata.");
|
||||
_localAlbumRepository.upsert(updatedDeviceAlbum);
|
||||
return true;
|
||||
}
|
||||
|
||||
token?.throwIfCancelled();
|
||||
await _localAlbumRepository.upsert(updatedDeviceAlbum, toUpsert: assetsToUpsert, toDelete: assetsToDelete);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
bool _assetsEqual(LocalAsset a, LocalAsset b) {
|
||||
return a.updatedAt.isAtSameMomentAs(b.updatedAt) &&
|
||||
a.createdAt.isAtSameMomentAs(b.createdAt) &&
|
||||
a.width == b.width &&
|
||||
a.height == b.height &&
|
||||
a.durationInSeconds == b.durationInSeconds;
|
||||
}
|
||||
|
||||
bool _albumsEqual(LocalAlbum a, LocalAlbum b) {
|
||||
return a.name == b.name && a.assetCount == b.assetCount && a.updatedAt.isAtSameMomentAs(b.updatedAt);
|
||||
}
|
||||
}
|
||||
|
||||
extension on Iterable<PlatformAlbum> {
|
||||
List<LocalAlbum> toLocalAlbums() {
|
||||
return map(
|
||||
(e) => LocalAlbum(
|
||||
id: e.id,
|
||||
name: e.name,
|
||||
updatedAt: tryFromSecondsSinceEpoch(e.updatedAt, isUtc: true) ?? DateTime.timestamp(),
|
||||
assetCount: e.assetCount,
|
||||
),
|
||||
).toList();
|
||||
}
|
||||
}
|
||||
|
||||
extension on Iterable<PlatformAsset> {
|
||||
List<LocalAsset> toLocalAssets() {
|
||||
return map(
|
||||
(e) => LocalAsset(
|
||||
id: e.id,
|
||||
name: e.name,
|
||||
checksum: null,
|
||||
type: AssetType.values.elementAtOrNull(e.type) ?? AssetType.other,
|
||||
createdAt: tryFromSecondsSinceEpoch(e.createdAt, isUtc: true) ?? DateTime.timestamp(),
|
||||
updatedAt: tryFromSecondsSinceEpoch(e.updatedAt, isUtc: true) ?? DateTime.timestamp(),
|
||||
width: e.width,
|
||||
height: e.height,
|
||||
durationInSeconds: e.durationInSeconds,
|
||||
orientation: e.orientation,
|
||||
isFavorite: e.isFavorite,
|
||||
),
|
||||
).toList();
|
||||
}
|
||||
}
|
||||
46
mobile/lib/domain/jobs/remote_sync/remote_sync.job.dart
Normal file
46
mobile/lib/domain/jobs/remote_sync/remote_sync.job.dart
Normal file
@@ -0,0 +1,46 @@
|
||||
import 'package:cancellation_token_http/http.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_handler.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.service.dart';
|
||||
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync_api.repository.dart';
|
||||
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
||||
|
||||
class RemoteSyncInput {
|
||||
final bool syncReset;
|
||||
|
||||
const RemoteSyncInput({this.syncReset = false});
|
||||
}
|
||||
|
||||
class RemoteSyncJob extends Job<RemoteSyncInput, RemoteSyncStatus> {
|
||||
@override
|
||||
JobType get type => JobType.remoteSync;
|
||||
|
||||
@override
|
||||
Duration get idleInterval => const Duration(seconds: 30);
|
||||
|
||||
@override
|
||||
Future<void> onIdle() async {
|
||||
trigger(const RemoteSyncInput(syncReset: false));
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteSyncJobHandler extends JobHandler<RemoteSyncInput> {
|
||||
late final RemoteSyncService _remoteSyncService;
|
||||
|
||||
RemoteSyncJobHandler({required super.context}) {
|
||||
_remoteSyncService = RemoteSyncService(
|
||||
syncApiRepository: context.read(remoteSyncApiRepositoryProvider),
|
||||
syncStreamRepository: context.read(syncStreamRepositoryProvider),
|
||||
);
|
||||
}
|
||||
|
||||
@override
|
||||
Future<RemoteSyncStatus> processJob(String jobId, RemoteSyncInput input, CancellationToken token) async {
|
||||
final status = await _remoteSyncService.sync(syncReset: input.syncReset, token: token);
|
||||
if (status != RemoteSyncStatus.reset) {
|
||||
return status;
|
||||
}
|
||||
return _remoteSyncService.sync(syncReset: false, token: token);
|
||||
}
|
||||
}
|
||||
161
mobile/lib/domain/jobs/remote_sync/remote_sync.service.dart
Normal file
161
mobile/lib/domain/jobs/remote_sync/remote_sync.service.dart
Normal file
@@ -0,0 +1,161 @@
|
||||
import 'dart:async';
|
||||
|
||||
import 'package:cancellation_token_http/http.dart';
|
||||
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync_api.repository.dart';
|
||||
import 'package:immich_mobile/domain/models/sync_event.model.dart';
|
||||
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'package:openapi/api.dart';
|
||||
|
||||
enum RemoteSyncStatus { success, failed, reset }
|
||||
|
||||
class RemoteSyncService {
|
||||
final Logger _logger = Logger('RemoteSyncService');
|
||||
|
||||
final RemoteSyncApiRepository _syncApiRepository;
|
||||
final SyncStreamRepository _syncStreamRepository;
|
||||
|
||||
RemoteSyncService({
|
||||
required RemoteSyncApiRepository syncApiRepository,
|
||||
required SyncStreamRepository syncStreamRepository,
|
||||
}) : _syncApiRepository = syncApiRepository,
|
||||
_syncStreamRepository = syncStreamRepository;
|
||||
|
||||
Future<RemoteSyncStatus> sync({syncReset = false, CancellationToken? token}) async {
|
||||
_logger.info("Remote sync request for user");
|
||||
return await _syncApiRepository.streamChanges(_handleEvents, token: token, syncReset: syncReset);
|
||||
}
|
||||
|
||||
Future<RemoteSyncStatus> _handleEvents(List<SyncEvent> events, {CancellationToken? token}) async {
|
||||
List<SyncEvent> items = [];
|
||||
for (final event in events) {
|
||||
token?.throwIfCancelled();
|
||||
|
||||
if (event.type != items.firstOrNull?.type) {
|
||||
await _processBatch(items);
|
||||
}
|
||||
|
||||
if (event.type == SyncEntityType.syncResetV1) {
|
||||
return RemoteSyncStatus.reset;
|
||||
}
|
||||
|
||||
items.add(event);
|
||||
}
|
||||
|
||||
await _processBatch(items);
|
||||
return RemoteSyncStatus.success;
|
||||
}
|
||||
|
||||
Future<void> _processBatch(List<SyncEvent> batch) async {
|
||||
if (batch.isEmpty) {
|
||||
return;
|
||||
}
|
||||
|
||||
final type = batch.first.type;
|
||||
await _handleSyncData(type, batch.map((e) => e.data));
|
||||
await _syncApiRepository.ack([batch.last.ack]);
|
||||
batch.clear();
|
||||
}
|
||||
|
||||
Future<void> _handleSyncData(SyncEntityType type, Iterable<Object> data) async {
|
||||
_logger.fine("Processing sync data for $type of length ${data.length}");
|
||||
switch (type) {
|
||||
case SyncEntityType.authUserV1:
|
||||
return _syncStreamRepository.updateAuthUsersV1(data.cast());
|
||||
case SyncEntityType.userV1:
|
||||
return _syncStreamRepository.updateUsersV1(data.cast());
|
||||
case SyncEntityType.userDeleteV1:
|
||||
return _syncStreamRepository.deleteUsersV1(data.cast());
|
||||
case SyncEntityType.partnerV1:
|
||||
return _syncStreamRepository.updatePartnerV1(data.cast());
|
||||
case SyncEntityType.partnerDeleteV1:
|
||||
return _syncStreamRepository.deletePartnerV1(data.cast());
|
||||
case SyncEntityType.assetV1:
|
||||
return _syncStreamRepository.updateAssetsV1(data.cast());
|
||||
case SyncEntityType.assetDeleteV1:
|
||||
return _syncStreamRepository.deleteAssetsV1(data.cast());
|
||||
case SyncEntityType.assetExifV1:
|
||||
return _syncStreamRepository.updateAssetsExifV1(data.cast());
|
||||
case SyncEntityType.partnerAssetV1:
|
||||
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'partner');
|
||||
case SyncEntityType.partnerAssetBackfillV1:
|
||||
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'partner backfill');
|
||||
case SyncEntityType.partnerAssetDeleteV1:
|
||||
return _syncStreamRepository.deleteAssetsV1(data.cast(), debugLabel: "partner");
|
||||
case SyncEntityType.partnerAssetExifV1:
|
||||
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'partner');
|
||||
case SyncEntityType.partnerAssetExifBackfillV1:
|
||||
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'partner backfill');
|
||||
case SyncEntityType.albumV1:
|
||||
return _syncStreamRepository.updateAlbumsV1(data.cast());
|
||||
case SyncEntityType.albumDeleteV1:
|
||||
return _syncStreamRepository.deleteAlbumsV1(data.cast());
|
||||
case SyncEntityType.albumUserV1:
|
||||
return _syncStreamRepository.updateAlbumUsersV1(data.cast());
|
||||
case SyncEntityType.albumUserBackfillV1:
|
||||
return _syncStreamRepository.updateAlbumUsersV1(data.cast(), debugLabel: 'backfill');
|
||||
case SyncEntityType.albumUserDeleteV1:
|
||||
return _syncStreamRepository.deleteAlbumUsersV1(data.cast());
|
||||
case SyncEntityType.albumAssetCreateV1:
|
||||
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset create');
|
||||
case SyncEntityType.albumAssetUpdateV1:
|
||||
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset update');
|
||||
case SyncEntityType.albumAssetBackfillV1:
|
||||
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset backfill');
|
||||
case SyncEntityType.albumAssetExifCreateV1:
|
||||
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif create');
|
||||
case SyncEntityType.albumAssetExifUpdateV1:
|
||||
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif update');
|
||||
case SyncEntityType.albumAssetExifBackfillV1:
|
||||
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif backfill');
|
||||
case SyncEntityType.albumToAssetV1:
|
||||
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast());
|
||||
case SyncEntityType.albumToAssetBackfillV1:
|
||||
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast(), debugLabel: 'backfill');
|
||||
case SyncEntityType.albumToAssetDeleteV1:
|
||||
return _syncStreamRepository.deleteAlbumToAssetsV1(data.cast());
|
||||
// No-op. SyncAckV1 entities are checkpoints in the sync stream
|
||||
// to acknowledge that the client has processed all the backfill events
|
||||
case SyncEntityType.syncAckV1:
|
||||
return;
|
||||
// No-op. SyncCompleteV1 is used to signal the completion of the sync process
|
||||
case SyncEntityType.syncCompleteV1:
|
||||
return;
|
||||
// Request to reset the client state. Clear everything related to remote entities
|
||||
case SyncEntityType.syncResetV1:
|
||||
return _syncStreamRepository.reset();
|
||||
case SyncEntityType.memoryV1:
|
||||
return _syncStreamRepository.updateMemoriesV1(data.cast());
|
||||
case SyncEntityType.memoryDeleteV1:
|
||||
return _syncStreamRepository.deleteMemoriesV1(data.cast());
|
||||
case SyncEntityType.memoryToAssetV1:
|
||||
return _syncStreamRepository.updateMemoryAssetsV1(data.cast());
|
||||
case SyncEntityType.memoryToAssetDeleteV1:
|
||||
return _syncStreamRepository.deleteMemoryAssetsV1(data.cast());
|
||||
case SyncEntityType.stackV1:
|
||||
return _syncStreamRepository.updateStacksV1(data.cast());
|
||||
case SyncEntityType.stackDeleteV1:
|
||||
return _syncStreamRepository.deleteStacksV1(data.cast());
|
||||
case SyncEntityType.partnerStackV1:
|
||||
return _syncStreamRepository.updateStacksV1(data.cast(), debugLabel: 'partner');
|
||||
case SyncEntityType.partnerStackBackfillV1:
|
||||
return _syncStreamRepository.updateStacksV1(data.cast(), debugLabel: 'partner backfill');
|
||||
case SyncEntityType.partnerStackDeleteV1:
|
||||
return _syncStreamRepository.deleteStacksV1(data.cast(), debugLabel: 'partner');
|
||||
case SyncEntityType.userMetadataV1:
|
||||
return _syncStreamRepository.updateUserMetadatasV1(data.cast());
|
||||
case SyncEntityType.userMetadataDeleteV1:
|
||||
return _syncStreamRepository.deleteUserMetadatasV1(data.cast());
|
||||
case SyncEntityType.personV1:
|
||||
return _syncStreamRepository.updatePeopleV1(data.cast());
|
||||
case SyncEntityType.personDeleteV1:
|
||||
return _syncStreamRepository.deletePeopleV1(data.cast());
|
||||
case SyncEntityType.assetFaceV1:
|
||||
return _syncStreamRepository.updateAssetFacesV1(data.cast());
|
||||
case SyncEntityType.assetFaceDeleteV1:
|
||||
return _syncStreamRepository.deleteAssetFacesV1(data.cast());
|
||||
default:
|
||||
_logger.warning("Unknown sync data type: $type");
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,192 @@
|
||||
import 'dart:async';
|
||||
import 'dart:convert';
|
||||
|
||||
import 'package:cancellation_token_http/http.dart';
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:http/http.dart' as http;
|
||||
import 'package:immich_mobile/constants/constants.dart';
|
||||
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.service.dart';
|
||||
import 'package:immich_mobile/domain/models/store.model.dart';
|
||||
import 'package:immich_mobile/domain/models/sync_event.model.dart';
|
||||
import 'package:immich_mobile/entities/store.entity.dart';
|
||||
import 'package:immich_mobile/providers/api.provider.dart';
|
||||
import 'package:immich_mobile/services/api.service.dart';
|
||||
import 'package:logging/logging.dart';
|
||||
import 'package:openapi/api.dart';
|
||||
|
||||
final remoteSyncApiRepositoryProvider = Provider<RemoteSyncApiRepository>((ref) {
|
||||
return RemoteSyncApiRepository(ref.watch(apiServiceProvider));
|
||||
});
|
||||
|
||||
class RemoteSyncApiRepository {
|
||||
final Logger _logger = Logger('RemoteSyncApiRepository');
|
||||
final ApiService _api;
|
||||
RemoteSyncApiRepository(this._api);
|
||||
|
||||
Future<void> ack(List<String> data) {
|
||||
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
|
||||
}
|
||||
|
||||
Future<RemoteSyncStatus> streamChanges(
|
||||
Future<RemoteSyncStatus> Function(List<SyncEvent>, {CancellationToken? token}) onData, {
|
||||
bool syncReset = false,
|
||||
CancellationToken? token,
|
||||
int batchSize = kSyncEventBatchSize,
|
||||
http.Client? httpClient,
|
||||
}) async {
|
||||
final stopwatch = Stopwatch()..start();
|
||||
final client = httpClient ?? http.Client();
|
||||
final endpoint = "${_api.apiClient.basePath}/sync/stream";
|
||||
|
||||
final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'};
|
||||
|
||||
final headerParams = <String, String>{};
|
||||
await _api.applyToParams([], headerParams);
|
||||
headers.addAll(headerParams);
|
||||
|
||||
final request = http.Request('POST', Uri.parse(endpoint));
|
||||
request.headers.addAll(headers);
|
||||
request.body = jsonEncode(
|
||||
SyncStreamDto(
|
||||
types: [
|
||||
SyncRequestType.authUsersV1,
|
||||
SyncRequestType.usersV1,
|
||||
SyncRequestType.assetsV1,
|
||||
SyncRequestType.assetExifsV1,
|
||||
SyncRequestType.partnersV1,
|
||||
SyncRequestType.partnerAssetsV1,
|
||||
SyncRequestType.partnerAssetExifsV1,
|
||||
SyncRequestType.albumsV1,
|
||||
SyncRequestType.albumUsersV1,
|
||||
SyncRequestType.albumAssetsV1,
|
||||
SyncRequestType.albumAssetExifsV1,
|
||||
SyncRequestType.albumToAssetsV1,
|
||||
SyncRequestType.memoriesV1,
|
||||
SyncRequestType.memoryToAssetsV1,
|
||||
SyncRequestType.stacksV1,
|
||||
SyncRequestType.partnerStacksV1,
|
||||
SyncRequestType.userMetadataV1,
|
||||
SyncRequestType.peopleV1,
|
||||
SyncRequestType.assetFacesV1,
|
||||
],
|
||||
reset: syncReset,
|
||||
).toJson(),
|
||||
);
|
||||
|
||||
String previousChunk = '';
|
||||
List<String> lines = [];
|
||||
|
||||
RemoteSyncStatus status = RemoteSyncStatus.failed;
|
||||
try {
|
||||
final response = await client.send(request);
|
||||
|
||||
if (response.statusCode != 200) {
|
||||
final errorBody = await response.stream.bytesToString();
|
||||
throw ApiException(response.statusCode, 'Failed to get sync stream: $errorBody');
|
||||
}
|
||||
|
||||
// Reset after successful stream start
|
||||
await Store.put(StoreKey.shouldResetSync, false);
|
||||
|
||||
await for (final chunk in response.stream.transform(utf8.decoder)) {
|
||||
token?.throwIfCancelled();
|
||||
|
||||
previousChunk += chunk;
|
||||
final parts = previousChunk.toString().split('\n');
|
||||
previousChunk = parts.removeLast();
|
||||
lines.addAll(parts);
|
||||
|
||||
if (lines.length < batchSize) {
|
||||
continue;
|
||||
}
|
||||
|
||||
status = await onData(_parseLines(lines), token: token);
|
||||
lines.clear();
|
||||
}
|
||||
|
||||
token?.throwIfCancelled();
|
||||
if (lines.isNotEmpty) {
|
||||
status = await onData(_parseLines(lines), token: token);
|
||||
}
|
||||
} catch (error, stack) {
|
||||
return Future.error(error, stack);
|
||||
} finally {
|
||||
client.close();
|
||||
}
|
||||
stopwatch.stop();
|
||||
_logger.info("Remote Sync completed in ${stopwatch.elapsed.inMilliseconds}ms");
|
||||
return status;
|
||||
}
|
||||
|
||||
List<SyncEvent> _parseLines(List<String> lines) {
|
||||
final List<SyncEvent> data = [];
|
||||
|
||||
for (final line in lines) {
|
||||
final jsonData = jsonDecode(line);
|
||||
final type = SyncEntityType.fromJson(jsonData['type'])!;
|
||||
final dataJson = jsonData['data'];
|
||||
final ack = jsonData['ack'];
|
||||
final converter = _kResponseMap[type];
|
||||
if (converter == null) {
|
||||
_logger.warning("Unknown type $type");
|
||||
continue;
|
||||
}
|
||||
|
||||
data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack));
|
||||
}
|
||||
|
||||
return data;
|
||||
}
|
||||
}
|
||||
|
||||
const _kResponseMap = <SyncEntityType, Function(Object)>{
|
||||
SyncEntityType.authUserV1: SyncAuthUserV1.fromJson,
|
||||
SyncEntityType.userV1: SyncUserV1.fromJson,
|
||||
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
|
||||
SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
|
||||
SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
|
||||
SyncEntityType.assetV1: SyncAssetV1.fromJson,
|
||||
SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson,
|
||||
SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson,
|
||||
SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson,
|
||||
SyncEntityType.partnerAssetBackfillV1: SyncAssetV1.fromJson,
|
||||
SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson,
|
||||
SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson,
|
||||
SyncEntityType.partnerAssetExifBackfillV1: SyncAssetExifV1.fromJson,
|
||||
SyncEntityType.albumV1: SyncAlbumV1.fromJson,
|
||||
SyncEntityType.albumDeleteV1: SyncAlbumDeleteV1.fromJson,
|
||||
SyncEntityType.albumUserV1: SyncAlbumUserV1.fromJson,
|
||||
SyncEntityType.albumUserBackfillV1: SyncAlbumUserV1.fromJson,
|
||||
SyncEntityType.albumUserDeleteV1: SyncAlbumUserDeleteV1.fromJson,
|
||||
SyncEntityType.albumAssetCreateV1: SyncAssetV1.fromJson,
|
||||
SyncEntityType.albumAssetUpdateV1: SyncAssetV1.fromJson,
|
||||
SyncEntityType.albumAssetBackfillV1: SyncAssetV1.fromJson,
|
||||
SyncEntityType.albumAssetExifCreateV1: SyncAssetExifV1.fromJson,
|
||||
SyncEntityType.albumAssetExifUpdateV1: SyncAssetExifV1.fromJson,
|
||||
SyncEntityType.albumAssetExifBackfillV1: SyncAssetExifV1.fromJson,
|
||||
SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson,
|
||||
SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson,
|
||||
SyncEntityType.albumToAssetDeleteV1: SyncAlbumToAssetDeleteV1.fromJson,
|
||||
SyncEntityType.syncAckV1: _SyncEmptyDto.fromJson,
|
||||
SyncEntityType.syncResetV1: _SyncEmptyDto.fromJson,
|
||||
SyncEntityType.memoryV1: SyncMemoryV1.fromJson,
|
||||
SyncEntityType.memoryDeleteV1: SyncMemoryDeleteV1.fromJson,
|
||||
SyncEntityType.memoryToAssetV1: SyncMemoryAssetV1.fromJson,
|
||||
SyncEntityType.memoryToAssetDeleteV1: SyncMemoryAssetDeleteV1.fromJson,
|
||||
SyncEntityType.stackV1: SyncStackV1.fromJson,
|
||||
SyncEntityType.stackDeleteV1: SyncStackDeleteV1.fromJson,
|
||||
SyncEntityType.partnerStackV1: SyncStackV1.fromJson,
|
||||
SyncEntityType.partnerStackBackfillV1: SyncStackV1.fromJson,
|
||||
SyncEntityType.partnerStackDeleteV1: SyncStackDeleteV1.fromJson,
|
||||
SyncEntityType.userMetadataV1: SyncUserMetadataV1.fromJson,
|
||||
SyncEntityType.userMetadataDeleteV1: SyncUserMetadataDeleteV1.fromJson,
|
||||
SyncEntityType.personV1: SyncPersonV1.fromJson,
|
||||
SyncEntityType.personDeleteV1: SyncPersonDeleteV1.fromJson,
|
||||
SyncEntityType.assetFaceV1: SyncAssetFaceV1.fromJson,
|
||||
SyncEntityType.assetFaceDeleteV1: SyncAssetFaceDeleteV1.fromJson,
|
||||
SyncEntityType.syncCompleteV1: _SyncEmptyDto.fromJson,
|
||||
};
|
||||
|
||||
class _SyncEmptyDto {
|
||||
static _SyncEmptyDto? fromJson(dynamic _) => _SyncEmptyDto();
|
||||
}
|
||||
9
mobile/lib/extensions/completer_extensions.dart
Normal file
9
mobile/lib/extensions/completer_extensions.dart
Normal file
@@ -0,0 +1,9 @@
|
||||
import 'dart:async';
|
||||
|
||||
extension CompleterExtensions<T> on Completer<T> {
|
||||
void completeOnce([FutureOr<T>? value]) {
|
||||
if (!isCompleted) {
|
||||
complete(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -12,6 +12,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart';
|
||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||
import 'package:immich_mobile/constants/constants.dart';
|
||||
import 'package:immich_mobile/constants/locales.dart';
|
||||
import 'package:immich_mobile/domain/jobs/job_manager.dart';
|
||||
import 'package:immich_mobile/domain/services/background_worker.service.dart';
|
||||
import 'package:immich_mobile/entities/store.entity.dart';
|
||||
import 'package:immich_mobile/extensions/build_context_extensions.dart';
|
||||
@@ -53,6 +54,8 @@ void main() async {
|
||||
await initApp();
|
||||
// Warm-up isolate pool for worker manager
|
||||
await workerManager.init(dynamicSpawning: true);
|
||||
JobManager.I.init();
|
||||
await JobManager.I.start();
|
||||
await migrateDatabaseIfNeeded(isar, drift);
|
||||
HttpSSLOptions.apply();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user