mirror of
https://github.com/immich-app/immich.git
synced 2025-12-14 01:10:38 +03:00
Compare commits
1 Commits
main
...
feat/mobil
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0fcc3a0e3 |
223
mobile/lib/domain/jobs/job.dart
Normal file
223
mobile/lib/domain/jobs/job.dart
Normal file
@@ -0,0 +1,223 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:isolate';
|
||||||
|
import 'dart:ui';
|
||||||
|
|
||||||
|
import 'package:flutter/services.dart';
|
||||||
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_context.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_messages.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_status.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||||
|
import 'package:immich_mobile/domain/services/log.service.dart';
|
||||||
|
import 'package:immich_mobile/entities/store.entity.dart';
|
||||||
|
import 'package:immich_mobile/extensions/completer_extensions.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/db.repository.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/logger_db.repository.dart';
|
||||||
|
import 'package:immich_mobile/providers/db.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/db.provider.dart';
|
||||||
|
import 'package:immich_mobile/utils/bootstrap.dart';
|
||||||
|
import 'package:immich_mobile/utils/debug_print.dart';
|
||||||
|
import 'package:isar/isar.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
|
||||||
|
part 'job_entrypoint.dart';
|
||||||
|
|
||||||
|
final _jobLogger = Logger('Job');
|
||||||
|
|
||||||
|
abstract class Job<Input, Output> {
|
||||||
|
Isolate? _isolate;
|
||||||
|
SendPort? _jobPort;
|
||||||
|
ReceivePort? _receivePort;
|
||||||
|
final Map<String, StreamController<JobProgress<Output>>> _jobProgressControllers = {};
|
||||||
|
bool get isRunning => _isolate != null;
|
||||||
|
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
// ABSTRACT METHODS / FIELDS - Concrete implementations override these
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
JobType get type;
|
||||||
|
|
||||||
|
Duration get idleInterval;
|
||||||
|
Future<void> onIdle();
|
||||||
|
|
||||||
|
Duration get shutdownTimeout => const Duration(seconds: 5);
|
||||||
|
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
// CONCRETE METHODS
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
Timer? _idleTimer;
|
||||||
|
|
||||||
|
void _resetIdleTimer() {
|
||||||
|
_idleTimer?.cancel();
|
||||||
|
_idleTimer = Timer.periodic(idleInterval, (_) async {
|
||||||
|
await onIdle();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
void _cancelIdleTimer() {
|
||||||
|
_idleTimer?.cancel();
|
||||||
|
_idleTimer = null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> start() async {
|
||||||
|
if (_isolate != null) return;
|
||||||
|
|
||||||
|
final rootToken = RootIsolateToken.instance;
|
||||||
|
if (rootToken == null) {
|
||||||
|
throw StateError('RootIsolateToken is not available. Ensure that the job is started from the main isolate.');
|
||||||
|
}
|
||||||
|
|
||||||
|
_receivePort = ReceivePort();
|
||||||
|
_receivePort!.listen(_handleMessage);
|
||||||
|
|
||||||
|
_isolate = await Isolate.spawn(_isolateEntryPoint, (
|
||||||
|
type: type,
|
||||||
|
token: rootToken,
|
||||||
|
mainPort: _receivePort!.sendPort,
|
||||||
|
), debugName: type.id);
|
||||||
|
|
||||||
|
_resetIdleTimer();
|
||||||
|
}
|
||||||
|
|
||||||
|
String trigger(Input input) {
|
||||||
|
if (_jobPort == null) {
|
||||||
|
throw StateError('Job not started. Call start() before triggering jobs.');
|
||||||
|
}
|
||||||
|
|
||||||
|
final jobId = _generateJobId();
|
||||||
|
_jobPort!.send(JobTriggerRequest<Input>(jobId: jobId, input: input));
|
||||||
|
_jobProgressControllers[jobId] = StreamController<JobProgress<Output>>.broadcast();
|
||||||
|
_resetIdleTimer();
|
||||||
|
return jobId;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<Output?> run(Input input) async {
|
||||||
|
if (_jobPort == null) {
|
||||||
|
throw StateError('Job not started. Call start() before running jobs.');
|
||||||
|
}
|
||||||
|
|
||||||
|
final jobId = _generateJobId();
|
||||||
|
_jobPort!.send(JobTriggerRequest<Input>(jobId: jobId, input: input));
|
||||||
|
_jobProgressControllers[jobId] = StreamController<JobProgress<Output>>.broadcast();
|
||||||
|
_resetIdleTimer();
|
||||||
|
final progress = await waitFor(jobId);
|
||||||
|
return progress.result;
|
||||||
|
}
|
||||||
|
|
||||||
|
int _jobCounter = 0;
|
||||||
|
String _generateJobId() => '${type.id}_${DateTime.now().millisecondsSinceEpoch}_${_jobCounter++}';
|
||||||
|
|
||||||
|
void cancel({required String jobId}) {
|
||||||
|
if (_jobPort == null) {
|
||||||
|
throw StateError('Job not started. Call start() before cancelling jobs.');
|
||||||
|
}
|
||||||
|
|
||||||
|
_jobPort!.send(JobCancelRequest(jobId: jobId));
|
||||||
|
}
|
||||||
|
|
||||||
|
void cancelAll() {
|
||||||
|
if (_jobPort == null) {
|
||||||
|
throw StateError('Job not started. Call start() before cancelling jobs.');
|
||||||
|
}
|
||||||
|
|
||||||
|
_cancelIdleTimer();
|
||||||
|
_jobPort!.send(const IsolateCancelAllRequest());
|
||||||
|
}
|
||||||
|
|
||||||
|
Stream<JobProgress> watch(String jobId) {
|
||||||
|
if (!_jobProgressControllers.containsKey(jobId)) {
|
||||||
|
throw StateError('No job found with ID: $jobId. Trigger the job before watching its progress.');
|
||||||
|
}
|
||||||
|
|
||||||
|
return _jobProgressControllers[jobId]!.stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<JobProgress> waitFor(String jobId) {
|
||||||
|
return watch(jobId).firstWhere((p) => p.isComplete || p.isError || p.isCancelled);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> stop() async {
|
||||||
|
if (_isolate == null) return;
|
||||||
|
|
||||||
|
_cancelIdleTimer();
|
||||||
|
|
||||||
|
if (_jobPort == null) {
|
||||||
|
_jobLogger.warning('Job isolate is running but job port is null for type: $this.id.');
|
||||||
|
}
|
||||||
|
|
||||||
|
final jobPort = _jobPort;
|
||||||
|
final isolate = _isolate;
|
||||||
|
_jobPort = null;
|
||||||
|
_isolate = null;
|
||||||
|
|
||||||
|
jobPort?.send(const IsolateShutdownRequest());
|
||||||
|
|
||||||
|
// Wait for all jobs to complete before killing the isolate
|
||||||
|
if (_jobProgressControllers.isNotEmpty) {
|
||||||
|
await Future.wait(_jobProgressControllers.keys.map((id) => waitFor(id))).timeout(
|
||||||
|
shutdownTimeout,
|
||||||
|
onTimeout: () {
|
||||||
|
_jobLogger.warning(
|
||||||
|
'Timeout waiting for jobs to complete during shutdown of job type: $this.id. Force killing isolate.',
|
||||||
|
);
|
||||||
|
return <JobProgress>[];
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
isolate?.kill(priority: Isolate.immediate);
|
||||||
|
_receivePort?.close();
|
||||||
|
_receivePort = null;
|
||||||
|
|
||||||
|
for (final controller in _jobProgressControllers.values) {
|
||||||
|
await controller.close();
|
||||||
|
}
|
||||||
|
_jobProgressControllers.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
void _handleMessage(dynamic message) {
|
||||||
|
switch (message) {
|
||||||
|
case IsolateReadyResponse msg:
|
||||||
|
_jobPort = msg.requestPort;
|
||||||
|
|
||||||
|
case JobProgressResponse msg:
|
||||||
|
final progress = JobProgress<Output>.running(progress: msg.progress, current: msg.current, total: msg.total);
|
||||||
|
_addJobProgress(msg.jobId, progress, close: false);
|
||||||
|
|
||||||
|
case JobCompleteResponse msg:
|
||||||
|
final progress = JobProgress<Output>.completed(result: msg.result);
|
||||||
|
_addJobProgress(msg.jobId, progress);
|
||||||
|
|
||||||
|
case JobErrorResponse msg:
|
||||||
|
final progress = JobProgress<Output>.error(msg.error);
|
||||||
|
_addJobProgress(msg.jobId, progress);
|
||||||
|
|
||||||
|
case JobCancelledResponse msg:
|
||||||
|
final progress = JobProgress<Output>.cancelled();
|
||||||
|
_addJobProgress(msg.jobId, progress);
|
||||||
|
|
||||||
|
case JobSkippedResponse msg:
|
||||||
|
final progress = JobProgress<Output>.skipped();
|
||||||
|
_addJobProgress(msg.jobId, progress);
|
||||||
|
|
||||||
|
case IsolateErrorResponse msg:
|
||||||
|
_jobLogger.severe('Isolate error: ${msg.error}\nStackTrace: ${msg.stackTrace}');
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void _addJobProgress(String jobId, JobProgress<Output> progress, {bool close = true}) {
|
||||||
|
final controller = _jobProgressControllers[jobId];
|
||||||
|
if (controller == null) {
|
||||||
|
_jobLogger.warning('Received progress update for unknown job: $jobId');
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
controller.add(progress);
|
||||||
|
|
||||||
|
if (close) {
|
||||||
|
controller.close();
|
||||||
|
_jobProgressControllers.remove(jobId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
38
mobile/lib/domain/jobs/job_context.dart
Normal file
38
mobile/lib/domain/jobs/job_context.dart
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
import 'dart:isolate';
|
||||||
|
|
||||||
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_messages.dart';
|
||||||
|
|
||||||
|
class JobContext {
|
||||||
|
final String id;
|
||||||
|
final ProviderContainer ref;
|
||||||
|
final SendPort _mainPort;
|
||||||
|
|
||||||
|
const JobContext({required this.id, required this.ref, required SendPort mainPort}) : _mainPort = mainPort;
|
||||||
|
|
||||||
|
ProviderResult read<ProviderResult>(ProviderListenable<ProviderResult> provider) => provider.read(ref);
|
||||||
|
|
||||||
|
void reportJobProgress({required String jobId, double? progress, int? current, int? total}) {
|
||||||
|
_mainPort.send(JobProgressResponse(jobId: jobId, progress: progress, current: current, total: total));
|
||||||
|
}
|
||||||
|
|
||||||
|
void reportJobComplete<Result>({required String jobId, Result? result}) {
|
||||||
|
_mainPort.send(JobCompleteResponse(jobId: jobId, result: result));
|
||||||
|
}
|
||||||
|
|
||||||
|
void reportJobError({required String jobId, required String error, String? stackTrace}) {
|
||||||
|
_mainPort.send(JobErrorResponse(jobId: jobId, error: error, stackTrace: stackTrace));
|
||||||
|
}
|
||||||
|
|
||||||
|
void reportIsolateError({required String error, String? stackTrace}) {
|
||||||
|
_mainPort.send(IsolateErrorResponse(error: error, stackTrace: stackTrace));
|
||||||
|
}
|
||||||
|
|
||||||
|
void reportJobCancelled({required String jobId}) {
|
||||||
|
_mainPort.send(JobCancelledResponse(jobId: jobId));
|
||||||
|
}
|
||||||
|
|
||||||
|
void reportJobSkipped({required String jobId}) {
|
||||||
|
_mainPort.send(JobSkippedResponse(jobId: jobId));
|
||||||
|
}
|
||||||
|
}
|
||||||
76
mobile/lib/domain/jobs/job_entrypoint.dart
Normal file
76
mobile/lib/domain/jobs/job_entrypoint.dart
Normal file
@@ -0,0 +1,76 @@
|
|||||||
|
part of 'job.dart';
|
||||||
|
|
||||||
|
typedef _IsolateStartMessage = ({RootIsolateToken token, JobType type, SendPort mainPort});
|
||||||
|
|
||||||
|
Future<void> _isolateEntryPoint(_IsolateStartMessage message) async {
|
||||||
|
final _IsolateStartMessage(:token, :mainPort, :type) = message;
|
||||||
|
|
||||||
|
final logger = Logger(type.id);
|
||||||
|
final isolateCompleter = Completer<void>();
|
||||||
|
|
||||||
|
try {
|
||||||
|
BackgroundIsolateBinaryMessenger.ensureInitialized(token);
|
||||||
|
DartPluginRegistrant.ensureInitialized();
|
||||||
|
|
||||||
|
final (isar, drift, logDb) = await Bootstrap.initDB();
|
||||||
|
await Bootstrap.initDomain(isar, drift, logDb, shouldBufferLogs: false, listenStoreUpdates: false);
|
||||||
|
|
||||||
|
final container = ProviderContainer(
|
||||||
|
overrides: [
|
||||||
|
dbProvider.overrideWithValue(isar),
|
||||||
|
isarProvider.overrideWithValue(isar),
|
||||||
|
driftProvider.overrideWith(driftOverride(drift)),
|
||||||
|
],
|
||||||
|
);
|
||||||
|
|
||||||
|
final context = JobContext(id: type.id, ref: container, mainPort: mainPort);
|
||||||
|
final receivePort = ReceivePort();
|
||||||
|
mainPort.send(IsolateReadyResponse(receivePort.sendPort));
|
||||||
|
|
||||||
|
final handler = getJobHandler(type, context);
|
||||||
|
|
||||||
|
receivePort.listen((message) async {
|
||||||
|
try {
|
||||||
|
switch (message) {
|
||||||
|
case JobTriggerRequest msg:
|
||||||
|
handler.queue(msg.jobId, msg.input);
|
||||||
|
|
||||||
|
case JobCancelRequest msg:
|
||||||
|
await handler.cancel(msg.jobId);
|
||||||
|
|
||||||
|
case IsolateCancelAllRequest _:
|
||||||
|
await handler.cancelAll();
|
||||||
|
|
||||||
|
case IsolateShutdownRequest _:
|
||||||
|
receivePort.close();
|
||||||
|
await handler.shutdown();
|
||||||
|
await _cleanup(container, isar, drift, logDb);
|
||||||
|
isolateCompleter.completeOnce();
|
||||||
|
}
|
||||||
|
} catch (e, stack) {
|
||||||
|
logger.severe('Error handling command', e, stack);
|
||||||
|
context.reportIsolateError(error: 'Command error: $e', stackTrace: stack.toString());
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} catch (e, stack) {
|
||||||
|
logger.severe('Job fatal error', e, stack);
|
||||||
|
mainPort.send(IsolateErrorResponse(error: 'Job crashed: $e', stackTrace: stack.toString()));
|
||||||
|
isolateCompleter.completeOnce();
|
||||||
|
}
|
||||||
|
await isolateCompleter.future;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _cleanup(ProviderContainer container, Isar isar, Drift drift, DriftLogger logDb) async {
|
||||||
|
try {
|
||||||
|
container.dispose();
|
||||||
|
await Store.dispose();
|
||||||
|
await LogService.I.dispose();
|
||||||
|
await logDb.close();
|
||||||
|
await drift.close();
|
||||||
|
if (isar.isOpen) {
|
||||||
|
await isar.close();
|
||||||
|
}
|
||||||
|
} catch (error, stack) {
|
||||||
|
dPrint(() => 'Job Cleanup error: $error, stack: $stack');
|
||||||
|
}
|
||||||
|
}
|
||||||
161
mobile/lib/domain/jobs/job_handler.dart
Normal file
161
mobile/lib/domain/jobs/job_handler.dart
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:collection';
|
||||||
|
|
||||||
|
import 'package:cancellation_token_http/http.dart';
|
||||||
|
import 'package:flutter/foundation.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_context.dart';
|
||||||
|
import 'package:immich_mobile/extensions/completer_extensions.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
|
||||||
|
abstract class JobHandler<Input> {
|
||||||
|
late final Logger logger;
|
||||||
|
final JobContext context;
|
||||||
|
|
||||||
|
final Queue<String> _jobQueue = Queue();
|
||||||
|
final Map<String, _JobState<Input>> _jobs = {};
|
||||||
|
int get runningCount => _jobs.values.where((s) => s.isRunning).length;
|
||||||
|
bool _isProcessing = false;
|
||||||
|
|
||||||
|
JobHandler({required this.context}) {
|
||||||
|
logger = Logger(context.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
// ABSTRACT METHODS / FIELDS - Concrete implementations override these
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
Future<void> processJob(String jobId, Input input, CancellationToken token);
|
||||||
|
|
||||||
|
int get maxConcurrentJobs => 1;
|
||||||
|
|
||||||
|
Duration get cancellationTimeout => const Duration(seconds: 1);
|
||||||
|
|
||||||
|
bool shouldSkipInput(Input newInput, Input existingInput) => false;
|
||||||
|
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
// CONCRETE METHODS
|
||||||
|
// ═══════════════════════════════════════════════════════════
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
void queue(String jobId, Input input) async {
|
||||||
|
if (_shouldSkipInput(input)) {
|
||||||
|
context.reportJobSkipped(jobId: jobId);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final token = CancellationToken();
|
||||||
|
_jobs[jobId] = _JobState(jobId: jobId, input: input, token: token, completer: Completer<void>(), isRunning: false);
|
||||||
|
_jobQueue.add(jobId);
|
||||||
|
_processQueue();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _shouldSkipInput(Input input) {
|
||||||
|
for (final job in _jobs.values) {
|
||||||
|
if (shouldSkipInput(input, job.input)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _processQueue() async {
|
||||||
|
if (_isProcessing) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
_isProcessing = true;
|
||||||
|
|
||||||
|
while (_jobQueue.isNotEmpty && runningCount < maxConcurrentJobs) {
|
||||||
|
final jobId = _jobQueue.removeFirst();
|
||||||
|
final jobState = _jobs[jobId];
|
||||||
|
|
||||||
|
if (jobState == null) {
|
||||||
|
logger.warning('Job state not found for jobId: $jobId');
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (jobState.token.isCancelled) {
|
||||||
|
context.reportJobCancelled(jobId: jobId);
|
||||||
|
jobState.completer.completeOnce();
|
||||||
|
_jobs.remove(jobId);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
jobState.isRunning = true;
|
||||||
|
|
||||||
|
_runJob(jobState).then((_) {
|
||||||
|
_jobs[jobId]?.completer.completeOnce();
|
||||||
|
_jobs.remove(jobId);
|
||||||
|
_processQueue();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
_isProcessing = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _runJob(_JobState<Input> job) async {
|
||||||
|
try {
|
||||||
|
await processJob(job.jobId, job.input, job.token);
|
||||||
|
|
||||||
|
if (!job.token.isCancelled) {
|
||||||
|
context.reportJobComplete(jobId: job.jobId);
|
||||||
|
}
|
||||||
|
} on CancelledException {
|
||||||
|
context.reportJobCancelled(jobId: job.jobId);
|
||||||
|
} catch (e, stack) {
|
||||||
|
context.reportJobError(jobId: job.jobId, error: '$e', stackTrace: stack.toString());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
Future<void> cancel(String jobId) async {
|
||||||
|
final jobState = _jobs[jobId];
|
||||||
|
if (jobState == null) return;
|
||||||
|
|
||||||
|
jobState.token.cancel();
|
||||||
|
|
||||||
|
if (!jobState.isRunning) {
|
||||||
|
_jobQueue.removeWhere((id) => id == jobId);
|
||||||
|
}
|
||||||
|
|
||||||
|
await jobState.completer.future.timeout(cancellationTimeout, onTimeout: () {});
|
||||||
|
}
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
Future<void> cancelAll() async {
|
||||||
|
for (final state in _jobs.values) {
|
||||||
|
state.token.cancel();
|
||||||
|
}
|
||||||
|
|
||||||
|
_jobQueue.clear();
|
||||||
|
|
||||||
|
if (_jobs.isNotEmpty) {
|
||||||
|
await Future.wait(_jobs.values.map((s) => s.completer.future)).timeout(
|
||||||
|
cancellationTimeout * 2,
|
||||||
|
onTimeout: () {
|
||||||
|
return [];
|
||||||
|
},
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@mustCallSuper
|
||||||
|
Future<void> shutdown() async {
|
||||||
|
await cancelAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class _JobState<Input> {
|
||||||
|
final String jobId;
|
||||||
|
final Input input;
|
||||||
|
final CancellationToken token;
|
||||||
|
final Completer<void> completer;
|
||||||
|
bool isRunning;
|
||||||
|
|
||||||
|
_JobState({
|
||||||
|
required this.jobId,
|
||||||
|
required this.input,
|
||||||
|
required this.token,
|
||||||
|
required this.completer,
|
||||||
|
this.isRunning = false,
|
||||||
|
});
|
||||||
|
}
|
||||||
66
mobile/lib/domain/jobs/job_manager.dart
Normal file
66
mobile/lib/domain/jobs/job_manager.dart
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
import 'package:immich_mobile/domain/jobs/job.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.job.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.job.dart';
|
||||||
|
|
||||||
|
class _JobRegistry {
|
||||||
|
final Map<String, Job> _jobs = {};
|
||||||
|
|
||||||
|
void register<I, O>(Job<I, O> job) {
|
||||||
|
_jobs[job.type.id] = job;
|
||||||
|
}
|
||||||
|
|
||||||
|
Job<I, O>? get<I, O>(String id) {
|
||||||
|
return _jobs[id] as Job<I, O>?;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<Job> get all => _jobs.values.toList();
|
||||||
|
|
||||||
|
Future<void> startAll() async {
|
||||||
|
await Future.wait(_jobs.values.map((j) => j.start()));
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> stopAll() async {
|
||||||
|
await Future.wait(_jobs.values.map((j) => j.stop()));
|
||||||
|
}
|
||||||
|
|
||||||
|
void cancelAll() {
|
||||||
|
for (final job in _jobs.values) {
|
||||||
|
job.cancelAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobManager {
|
||||||
|
final _JobRegistry _registry = _JobRegistry();
|
||||||
|
|
||||||
|
bool _isInitialized = false;
|
||||||
|
static final JobManager I = JobManager._();
|
||||||
|
JobManager._();
|
||||||
|
|
||||||
|
LocalSyncJob? get localSyncJob => _registry.get<LocalSyncInput, void>(JobType.localSync.id) as LocalSyncJob?;
|
||||||
|
|
||||||
|
RemoteSyncJob? get remoteSyncJob => _registry.get<void, void>(JobType.remoteSync.id) as RemoteSyncJob?;
|
||||||
|
|
||||||
|
void init() {
|
||||||
|
if (_isInitialized) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_registry.register(LocalSyncJob());
|
||||||
|
_registry.register(RemoteSyncJob());
|
||||||
|
_isInitialized = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> start() => _registry.startAll();
|
||||||
|
|
||||||
|
Future<void> stop() => _registry.stopAll();
|
||||||
|
|
||||||
|
void triggerLocalSync({bool fullSync = false}) {
|
||||||
|
localSyncJob?.trigger(LocalSyncInput(fullSync: fullSync));
|
||||||
|
}
|
||||||
|
|
||||||
|
void triggerRemoteSync({bool syncReset = false}) {
|
||||||
|
remoteSyncJob?.trigger(RemoteSyncInput(syncReset: syncReset));
|
||||||
|
}
|
||||||
|
}
|
||||||
79
mobile/lib/domain/jobs/job_messages.dart
Normal file
79
mobile/lib/domain/jobs/job_messages.dart
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import 'dart:isolate';
|
||||||
|
|
||||||
|
sealed class IsolateRequest {
|
||||||
|
const IsolateRequest();
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobTriggerRequest<Input> extends IsolateRequest {
|
||||||
|
final String jobId;
|
||||||
|
final Input? input;
|
||||||
|
|
||||||
|
const JobTriggerRequest({required this.jobId, this.input});
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobCancelRequest extends IsolateRequest {
|
||||||
|
final String jobId;
|
||||||
|
|
||||||
|
const JobCancelRequest({required this.jobId});
|
||||||
|
}
|
||||||
|
|
||||||
|
class IsolateCancelAllRequest extends IsolateRequest {
|
||||||
|
const IsolateCancelAllRequest();
|
||||||
|
}
|
||||||
|
|
||||||
|
class IsolateShutdownRequest extends IsolateRequest {
|
||||||
|
const IsolateShutdownRequest();
|
||||||
|
}
|
||||||
|
|
||||||
|
sealed class IsolateResponse {
|
||||||
|
const IsolateResponse();
|
||||||
|
}
|
||||||
|
|
||||||
|
class IsolateReadyResponse extends IsolateResponse {
|
||||||
|
final SendPort requestPort;
|
||||||
|
|
||||||
|
const IsolateReadyResponse(this.requestPort);
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobProgressResponse extends IsolateResponse {
|
||||||
|
final String jobId;
|
||||||
|
final double? progress;
|
||||||
|
final int? current;
|
||||||
|
final int? total;
|
||||||
|
|
||||||
|
const JobProgressResponse({required this.jobId, this.progress, this.current, this.total});
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobCompleteResponse<Output> extends IsolateResponse {
|
||||||
|
final String jobId;
|
||||||
|
final Output? result;
|
||||||
|
|
||||||
|
const JobCompleteResponse({required this.jobId, this.result});
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobErrorResponse extends IsolateResponse {
|
||||||
|
final String jobId;
|
||||||
|
final String error;
|
||||||
|
final String? stackTrace;
|
||||||
|
|
||||||
|
const JobErrorResponse({required this.jobId, required this.error, this.stackTrace});
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobCancelledResponse extends IsolateResponse {
|
||||||
|
final String jobId;
|
||||||
|
|
||||||
|
const JobCancelledResponse({required this.jobId});
|
||||||
|
}
|
||||||
|
|
||||||
|
class JobSkippedResponse extends IsolateResponse {
|
||||||
|
final String jobId;
|
||||||
|
|
||||||
|
const JobSkippedResponse({required this.jobId});
|
||||||
|
}
|
||||||
|
|
||||||
|
class IsolateErrorResponse extends IsolateResponse {
|
||||||
|
final String error;
|
||||||
|
final String? stackTrace;
|
||||||
|
|
||||||
|
const IsolateErrorResponse({required this.error, this.stackTrace});
|
||||||
|
}
|
||||||
62
mobile/lib/domain/jobs/job_status.dart
Normal file
62
mobile/lib/domain/jobs/job_status.dart
Normal file
@@ -0,0 +1,62 @@
|
|||||||
|
enum JobStatus { running, completed, error, cancelled, skipped }
|
||||||
|
|
||||||
|
class JobProgress<T> {
|
||||||
|
final JobStatus status;
|
||||||
|
|
||||||
|
// JobStatus.completed
|
||||||
|
final T? result;
|
||||||
|
|
||||||
|
// JobStatus.running
|
||||||
|
final double? progress;
|
||||||
|
final int? current;
|
||||||
|
final int? total;
|
||||||
|
|
||||||
|
// JobStatus.error
|
||||||
|
final String? error;
|
||||||
|
|
||||||
|
const JobProgress({required this.status, this.result, this.progress, this.current, this.total, this.error});
|
||||||
|
|
||||||
|
bool get isRunning => status == JobStatus.running;
|
||||||
|
bool get isComplete => status == JobStatus.completed;
|
||||||
|
bool get isError => status == JobStatus.error;
|
||||||
|
bool get isCancelled => status == JobStatus.cancelled;
|
||||||
|
|
||||||
|
factory JobProgress.running({double? progress, int? current, int? total}) {
|
||||||
|
assert(progress == null || (progress >= 0 && progress <= 1), 'Progress must be 0-1');
|
||||||
|
assert(current == null || total == null || current <= total, 'Current must be <= total');
|
||||||
|
return JobProgress(status: JobStatus.running, progress: progress, current: current, total: total);
|
||||||
|
}
|
||||||
|
|
||||||
|
factory JobProgress.completed({T? result}) => JobProgress(status: JobStatus.completed, result: result);
|
||||||
|
|
||||||
|
factory JobProgress.error(String error) => JobProgress(status: JobStatus.error, error: error);
|
||||||
|
|
||||||
|
factory JobProgress.cancelled() => const JobProgress(status: JobStatus.cancelled);
|
||||||
|
|
||||||
|
factory JobProgress.skipped() => const JobProgress(status: JobStatus.skipped);
|
||||||
|
|
||||||
|
@override
|
||||||
|
String toString() => switch (status) {
|
||||||
|
JobStatus.running => 'JobProgress(running: progress=$progress, current=$current, total=$total)',
|
||||||
|
JobStatus.completed => 'JobProgress(completed: result=$result)',
|
||||||
|
JobStatus.error => 'JobProgress(error: error=$error)',
|
||||||
|
JobStatus.cancelled => 'JobProgress(cancelled)',
|
||||||
|
JobStatus.skipped => 'JobProgress(skipped)',
|
||||||
|
};
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool operator ==(Object other) =>
|
||||||
|
identical(this, other) ||
|
||||||
|
other is JobProgress<T> &&
|
||||||
|
runtimeType == other.runtimeType &&
|
||||||
|
status == other.status &&
|
||||||
|
result == other.result &&
|
||||||
|
progress == other.progress &&
|
||||||
|
current == other.current &&
|
||||||
|
total == other.total &&
|
||||||
|
error == other.error;
|
||||||
|
|
||||||
|
@override
|
||||||
|
int get hashCode =>
|
||||||
|
status.hashCode ^ result.hashCode ^ progress.hashCode ^ current.hashCode ^ total.hashCode ^ error.hashCode;
|
||||||
|
}
|
||||||
17
mobile/lib/domain/jobs/job_types.dart
Normal file
17
mobile/lib/domain/jobs/job_types.dart
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
import 'package:immich_mobile/domain/jobs/job_context.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_handler.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.job.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.job.dart';
|
||||||
|
|
||||||
|
enum JobType {
|
||||||
|
localSync._('local_sync'),
|
||||||
|
remoteSync._('remote_sync');
|
||||||
|
|
||||||
|
final String id;
|
||||||
|
const JobType._(this.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
JobHandler getJobHandler(JobType type, JobContext context) => switch (type) {
|
||||||
|
JobType.localSync => LocalSyncJobHandler(context: context),
|
||||||
|
JobType.remoteSync => RemoteSyncJobHandler(context: context),
|
||||||
|
};
|
||||||
52
mobile/lib/domain/jobs/local_sync/local_sync.job.dart
Normal file
52
mobile/lib/domain/jobs/local_sync/local_sync.job.dart
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
import 'package:cancellation_token_http/http.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_handler.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/local_sync/local_sync.service.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/album.provider.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/platform.provider.dart';
|
||||||
|
|
||||||
|
class LocalSyncInput {
|
||||||
|
final bool fullSync;
|
||||||
|
|
||||||
|
const LocalSyncInput({this.fullSync = false});
|
||||||
|
}
|
||||||
|
|
||||||
|
class LocalSyncJob extends Job<LocalSyncInput, void> {
|
||||||
|
@override
|
||||||
|
JobType get type => JobType.localSync;
|
||||||
|
|
||||||
|
@override
|
||||||
|
Duration get idleInterval => const Duration(minutes: 3);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> onIdle() async {
|
||||||
|
// Trigger a partial sync on idle
|
||||||
|
trigger(const LocalSyncInput(fullSync: false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class LocalSyncJobHandler extends JobHandler<LocalSyncInput> {
|
||||||
|
late final LocalSyncService _localSyncService;
|
||||||
|
|
||||||
|
LocalSyncJobHandler({required super.context}) {
|
||||||
|
LocalSyncService(
|
||||||
|
localAlbumRepository: context.read(localAlbumRepository),
|
||||||
|
nativeSyncApi: context.read(nativeSyncApiProvider),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
bool shouldSkipInput(LocalSyncInput newInput, LocalSyncInput existingInput) {
|
||||||
|
// Skip if a full sync is already scheduled/running
|
||||||
|
if (existingInput.fullSync) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> processJob(String jobId, LocalSyncInput input, CancellationToken token) =>
|
||||||
|
_localSyncService.sync(full: input.fullSync, token: token);
|
||||||
|
}
|
||||||
308
mobile/lib/domain/jobs/local_sync/local_sync.service.dart
Normal file
308
mobile/lib/domain/jobs/local_sync/local_sync.service.dart
Normal file
@@ -0,0 +1,308 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:cancellation_token_http/http.dart';
|
||||||
|
import 'package:collection/collection.dart';
|
||||||
|
import 'package:flutter/foundation.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/album/local_album.model.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/asset/base_asset.model.dart';
|
||||||
|
import 'package:immich_mobile/extensions/platform_extensions.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/local_album.repository.dart';
|
||||||
|
import 'package:immich_mobile/platform/native_sync_api.g.dart';
|
||||||
|
import 'package:immich_mobile/utils/datetime_helpers.dart';
|
||||||
|
import 'package:immich_mobile/utils/diff.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
|
||||||
|
class LocalSyncService {
|
||||||
|
final DriftLocalAlbumRepository _localAlbumRepository;
|
||||||
|
final NativeSyncApi _nativeSyncApi;
|
||||||
|
|
||||||
|
final Logger _log = Logger("DeviceSyncService");
|
||||||
|
|
||||||
|
LocalSyncService({required DriftLocalAlbumRepository localAlbumRepository, required NativeSyncApi nativeSyncApi})
|
||||||
|
: _localAlbumRepository = localAlbumRepository,
|
||||||
|
_nativeSyncApi = nativeSyncApi;
|
||||||
|
|
||||||
|
Future<void> sync({bool full = false, CancellationToken? token}) async {
|
||||||
|
final Stopwatch stopwatch = Stopwatch()..start();
|
||||||
|
|
||||||
|
if (full || await _nativeSyncApi.shouldFullSync()) {
|
||||||
|
_log.fine("Full sync request from ${full ? "user" : "native"}");
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
return await fullSync(token: token);
|
||||||
|
}
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
final delta = await _nativeSyncApi.getMediaChanges();
|
||||||
|
if (!delta.hasChanges) {
|
||||||
|
_log.fine("No media changes detected. Skipping sync");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
_log.fine("Delta updated: ${delta.updates.length}");
|
||||||
|
_log.fine("Delta deleted: ${delta.deletes.length}");
|
||||||
|
|
||||||
|
final deviceAlbums = await _nativeSyncApi.getAlbums();
|
||||||
|
await _localAlbumRepository.updateAll(deviceAlbums.toLocalAlbums());
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
await _localAlbumRepository.processDelta(
|
||||||
|
updates: delta.updates.toLocalAssets(),
|
||||||
|
deletes: delta.deletes,
|
||||||
|
assetAlbums: delta.assetAlbums,
|
||||||
|
);
|
||||||
|
|
||||||
|
final dbAlbums = await _localAlbumRepository.getAll();
|
||||||
|
// On Android, we need to sync all albums since it is not possible to
|
||||||
|
// detect album deletions from the native side
|
||||||
|
if (CurrentPlatform.isAndroid) {
|
||||||
|
for (final album in dbAlbums) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
final deviceIds = await _nativeSyncApi.getAssetIdsForAlbum(album.id);
|
||||||
|
await _localAlbumRepository.syncDeletes(album.id, deviceIds);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (CurrentPlatform.isIOS) {
|
||||||
|
// On iOS, we need to full sync albums that are marked as cloud as the delta sync
|
||||||
|
// does not include changes for cloud albums. If ignoreIcloudAssets is enabled,
|
||||||
|
// remove the albums from the local database from the previous sync
|
||||||
|
final cloudAlbums = deviceAlbums.where((a) => a.isCloud).toLocalAlbums();
|
||||||
|
for (final album in cloudAlbums) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
final dbAlbum = dbAlbums.firstWhereOrNull((a) => a.id == album.id);
|
||||||
|
if (dbAlbum == null) {
|
||||||
|
_log.warning("Cloud album ${album.name} not found in local database. Skipping sync.");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
await updateAlbum(dbAlbum, album, token: token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
await _nativeSyncApi.checkpointSync();
|
||||||
|
|
||||||
|
stopwatch.stop();
|
||||||
|
_log.info("Device sync took - ${stopwatch.elapsedMilliseconds}ms");
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> fullSync({CancellationToken? token}) async {
|
||||||
|
final Stopwatch stopwatch = Stopwatch()..start();
|
||||||
|
|
||||||
|
final deviceAlbums = await _nativeSyncApi.getAlbums();
|
||||||
|
final dbAlbums = await _localAlbumRepository.getAll(sortBy: {SortLocalAlbumsBy.id});
|
||||||
|
|
||||||
|
await diffSortedLists(
|
||||||
|
dbAlbums,
|
||||||
|
deviceAlbums.toLocalAlbums(),
|
||||||
|
compare: (a, b) => a.id.compareTo(b.id),
|
||||||
|
both: (LocalAlbum oldAlbum, LocalAlbum newAlbum) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
return updateAlbum(oldAlbum, newAlbum, token: token);
|
||||||
|
},
|
||||||
|
onlyFirst: (album) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
return removeAlbum(album);
|
||||||
|
},
|
||||||
|
onlySecond: (album) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
return addAlbum(album, token: token);
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
await _nativeSyncApi.checkpointSync();
|
||||||
|
stopwatch.stop();
|
||||||
|
_log.info("Full device sync took - ${stopwatch.elapsedMilliseconds}ms");
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> addAlbum(LocalAlbum album, {CancellationToken? token}) async {
|
||||||
|
_log.fine("Adding device album ${album.name}");
|
||||||
|
|
||||||
|
final assets = album.assetCount > 0 ? await _nativeSyncApi.getAssetsForAlbum(album.id) : <PlatformAsset>[];
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
await _localAlbumRepository.upsert(album, toUpsert: assets.toLocalAssets());
|
||||||
|
_log.fine("Successfully added device album ${album.name}");
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> removeAlbum(LocalAlbum a) async {
|
||||||
|
_log.fine("Removing device album ${a.name}");
|
||||||
|
// Asset deletion is handled in the repository
|
||||||
|
await _localAlbumRepository.delete(a.id);
|
||||||
|
}
|
||||||
|
|
||||||
|
// The deviceAlbum is ignored since we are going to refresh it anyways
|
||||||
|
FutureOr<bool> updateAlbum(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
|
||||||
|
_log.fine("Syncing device album ${dbAlbum.name}");
|
||||||
|
|
||||||
|
if (_albumsEqual(deviceAlbum, dbAlbum)) {
|
||||||
|
_log.fine("Device album ${dbAlbum.name} has not changed. Skipping sync.");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
_log.fine("Device album ${dbAlbum.name} has changed. Syncing...");
|
||||||
|
|
||||||
|
// Faster path - only new assets added
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
if (await checkAddition(dbAlbum, deviceAlbum, token: token)) {
|
||||||
|
_log.fine("Fast synced device album ${dbAlbum.name}");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Slower path - full sync
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
return await fullDiff(dbAlbum, deviceAlbum, token: token);
|
||||||
|
}
|
||||||
|
|
||||||
|
@visibleForTesting
|
||||||
|
// The [deviceAlbum] is expected to be refreshed before calling this method
|
||||||
|
// with modified time and asset count
|
||||||
|
Future<bool> checkAddition(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
|
||||||
|
_log.fine("Fast syncing device album ${dbAlbum.name}");
|
||||||
|
// Assets has been modified
|
||||||
|
if (deviceAlbum.assetCount <= dbAlbum.assetCount) {
|
||||||
|
_log.fine("Local album has modifications. Proceeding to full sync");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
final updatedTime = (dbAlbum.updatedAt.millisecondsSinceEpoch ~/ 1000) + 1;
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
final newAssetsCount = await _nativeSyncApi.getAssetsCountSince(deviceAlbum.id, updatedTime);
|
||||||
|
|
||||||
|
// Early return if no new assets were found
|
||||||
|
if (newAssetsCount == 0) {
|
||||||
|
_log.fine("No new assets found despite album having changes. Proceeding to full sync for ${dbAlbum.name}");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check whether there is only addition or if there has been deletions
|
||||||
|
if (deviceAlbum.assetCount != dbAlbum.assetCount + newAssetsCount) {
|
||||||
|
_log.fine("Local album has modifications. Proceeding to full sync");
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
final newAssets = await _nativeSyncApi.getAssetsForAlbum(deviceAlbum.id, updatedTimeCond: updatedTime);
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
await _localAlbumRepository.upsert(
|
||||||
|
deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection),
|
||||||
|
toUpsert: newAssets.toLocalAssets(),
|
||||||
|
);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@visibleForTesting
|
||||||
|
// The [deviceAlbum] is expected to be refreshed before calling this method
|
||||||
|
// with modified time and asset count
|
||||||
|
Future<bool> fullDiff(LocalAlbum dbAlbum, LocalAlbum deviceAlbum, {CancellationToken? token}) async {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
final assetsInDevice = deviceAlbum.assetCount > 0
|
||||||
|
? await _nativeSyncApi.getAssetsForAlbum(deviceAlbum.id).then((a) => a.toLocalAssets())
|
||||||
|
: <LocalAsset>[];
|
||||||
|
final assetsInDb = dbAlbum.assetCount > 0 ? await _localAlbumRepository.getAssets(dbAlbum.id) : <LocalAsset>[];
|
||||||
|
|
||||||
|
if (deviceAlbum.assetCount == 0) {
|
||||||
|
_log.fine("Device album ${deviceAlbum.name} is empty. Removing assets from DB.");
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
await _localAlbumRepository.upsert(
|
||||||
|
deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection),
|
||||||
|
toDelete: assetsInDb.map((a) => a.id),
|
||||||
|
);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
final updatedDeviceAlbum = deviceAlbum.copyWith(backupSelection: dbAlbum.backupSelection);
|
||||||
|
|
||||||
|
if (dbAlbum.assetCount == 0) {
|
||||||
|
_log.fine("Device album ${deviceAlbum.name} is empty. Adding assets to DB.");
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
await _localAlbumRepository.upsert(updatedDeviceAlbum, toUpsert: assetsInDevice);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
assert(assetsInDb.isSortedBy((a) => a.id));
|
||||||
|
assetsInDevice.sort((a, b) => a.id.compareTo(b.id));
|
||||||
|
|
||||||
|
final assetsToUpsert = <LocalAsset>[];
|
||||||
|
final assetsToDelete = <String>[];
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
diffSortedListsSync(
|
||||||
|
assetsInDb,
|
||||||
|
assetsInDevice,
|
||||||
|
compare: (a, b) => a.id.compareTo(b.id),
|
||||||
|
both: (dbAsset, deviceAsset) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
// Custom comparison to check if the asset has been modified without
|
||||||
|
// comparing the checksum
|
||||||
|
if (!_assetsEqual(dbAsset, deviceAsset)) {
|
||||||
|
assetsToUpsert.add(deviceAsset);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
},
|
||||||
|
onlyFirst: (dbAsset) => assetsToDelete.add(dbAsset.id),
|
||||||
|
onlySecond: (deviceAsset) => assetsToUpsert.add(deviceAsset),
|
||||||
|
);
|
||||||
|
|
||||||
|
_log.fine(
|
||||||
|
"Syncing ${deviceAlbum.name}. ${assetsToUpsert.length} assets to add/update and ${assetsToDelete.length} assets to delete",
|
||||||
|
);
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
if (assetsToUpsert.isEmpty && assetsToDelete.isEmpty) {
|
||||||
|
_log.fine("No asset changes detected in album ${deviceAlbum.name}. Updating metadata.");
|
||||||
|
_localAlbumRepository.upsert(updatedDeviceAlbum);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
await _localAlbumRepository.upsert(updatedDeviceAlbum, toUpsert: assetsToUpsert, toDelete: assetsToDelete);
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _assetsEqual(LocalAsset a, LocalAsset b) {
|
||||||
|
return a.updatedAt.isAtSameMomentAs(b.updatedAt) &&
|
||||||
|
a.createdAt.isAtSameMomentAs(b.createdAt) &&
|
||||||
|
a.width == b.width &&
|
||||||
|
a.height == b.height &&
|
||||||
|
a.durationInSeconds == b.durationInSeconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool _albumsEqual(LocalAlbum a, LocalAlbum b) {
|
||||||
|
return a.name == b.name && a.assetCount == b.assetCount && a.updatedAt.isAtSameMomentAs(b.updatedAt);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension on Iterable<PlatformAlbum> {
|
||||||
|
List<LocalAlbum> toLocalAlbums() {
|
||||||
|
return map(
|
||||||
|
(e) => LocalAlbum(
|
||||||
|
id: e.id,
|
||||||
|
name: e.name,
|
||||||
|
updatedAt: tryFromSecondsSinceEpoch(e.updatedAt, isUtc: true) ?? DateTime.timestamp(),
|
||||||
|
assetCount: e.assetCount,
|
||||||
|
),
|
||||||
|
).toList();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
extension on Iterable<PlatformAsset> {
|
||||||
|
List<LocalAsset> toLocalAssets() {
|
||||||
|
return map(
|
||||||
|
(e) => LocalAsset(
|
||||||
|
id: e.id,
|
||||||
|
name: e.name,
|
||||||
|
checksum: null,
|
||||||
|
type: AssetType.values.elementAtOrNull(e.type) ?? AssetType.other,
|
||||||
|
createdAt: tryFromSecondsSinceEpoch(e.createdAt, isUtc: true) ?? DateTime.timestamp(),
|
||||||
|
updatedAt: tryFromSecondsSinceEpoch(e.updatedAt, isUtc: true) ?? DateTime.timestamp(),
|
||||||
|
width: e.width,
|
||||||
|
height: e.height,
|
||||||
|
durationInSeconds: e.durationInSeconds,
|
||||||
|
orientation: e.orientation,
|
||||||
|
isFavorite: e.isFavorite,
|
||||||
|
),
|
||||||
|
).toList();
|
||||||
|
}
|
||||||
|
}
|
||||||
46
mobile/lib/domain/jobs/remote_sync/remote_sync.job.dart
Normal file
46
mobile/lib/domain/jobs/remote_sync/remote_sync.job.dart
Normal file
@@ -0,0 +1,46 @@
|
|||||||
|
import 'package:cancellation_token_http/http.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_handler.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_types.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.service.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync_api.repository.dart';
|
||||||
|
import 'package:immich_mobile/providers/infrastructure/sync.provider.dart';
|
||||||
|
|
||||||
|
class RemoteSyncInput {
|
||||||
|
final bool syncReset;
|
||||||
|
|
||||||
|
const RemoteSyncInput({this.syncReset = false});
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteSyncJob extends Job<RemoteSyncInput, RemoteSyncStatus> {
|
||||||
|
@override
|
||||||
|
JobType get type => JobType.remoteSync;
|
||||||
|
|
||||||
|
@override
|
||||||
|
Duration get idleInterval => const Duration(seconds: 30);
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<void> onIdle() async {
|
||||||
|
trigger(const RemoteSyncInput(syncReset: false));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class RemoteSyncJobHandler extends JobHandler<RemoteSyncInput> {
|
||||||
|
late final RemoteSyncService _remoteSyncService;
|
||||||
|
|
||||||
|
RemoteSyncJobHandler({required super.context}) {
|
||||||
|
_remoteSyncService = RemoteSyncService(
|
||||||
|
syncApiRepository: context.read(remoteSyncApiRepositoryProvider),
|
||||||
|
syncStreamRepository: context.read(syncStreamRepositoryProvider),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
@override
|
||||||
|
Future<RemoteSyncStatus> processJob(String jobId, RemoteSyncInput input, CancellationToken token) async {
|
||||||
|
final status = await _remoteSyncService.sync(syncReset: input.syncReset, token: token);
|
||||||
|
if (status != RemoteSyncStatus.reset) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
return _remoteSyncService.sync(syncReset: false, token: token);
|
||||||
|
}
|
||||||
|
}
|
||||||
161
mobile/lib/domain/jobs/remote_sync/remote_sync.service.dart
Normal file
161
mobile/lib/domain/jobs/remote_sync/remote_sync.service.dart
Normal file
@@ -0,0 +1,161 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
import 'package:cancellation_token_http/http.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync_api.repository.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/sync_event.model.dart';
|
||||||
|
import 'package:immich_mobile/infrastructure/repositories/sync_stream.repository.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
import 'package:openapi/api.dart';
|
||||||
|
|
||||||
|
enum RemoteSyncStatus { success, failed, reset }
|
||||||
|
|
||||||
|
class RemoteSyncService {
|
||||||
|
final Logger _logger = Logger('RemoteSyncService');
|
||||||
|
|
||||||
|
final RemoteSyncApiRepository _syncApiRepository;
|
||||||
|
final SyncStreamRepository _syncStreamRepository;
|
||||||
|
|
||||||
|
RemoteSyncService({
|
||||||
|
required RemoteSyncApiRepository syncApiRepository,
|
||||||
|
required SyncStreamRepository syncStreamRepository,
|
||||||
|
}) : _syncApiRepository = syncApiRepository,
|
||||||
|
_syncStreamRepository = syncStreamRepository;
|
||||||
|
|
||||||
|
Future<RemoteSyncStatus> sync({syncReset = false, CancellationToken? token}) async {
|
||||||
|
_logger.info("Remote sync request for user");
|
||||||
|
return await _syncApiRepository.streamChanges(_handleEvents, token: token, syncReset: syncReset);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<RemoteSyncStatus> _handleEvents(List<SyncEvent> events, {CancellationToken? token}) async {
|
||||||
|
List<SyncEvent> items = [];
|
||||||
|
for (final event in events) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
|
||||||
|
if (event.type != items.firstOrNull?.type) {
|
||||||
|
await _processBatch(items);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (event.type == SyncEntityType.syncResetV1) {
|
||||||
|
return RemoteSyncStatus.reset;
|
||||||
|
}
|
||||||
|
|
||||||
|
items.add(event);
|
||||||
|
}
|
||||||
|
|
||||||
|
await _processBatch(items);
|
||||||
|
return RemoteSyncStatus.success;
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _processBatch(List<SyncEvent> batch) async {
|
||||||
|
if (batch.isEmpty) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
final type = batch.first.type;
|
||||||
|
await _handleSyncData(type, batch.map((e) => e.data));
|
||||||
|
await _syncApiRepository.ack([batch.last.ack]);
|
||||||
|
batch.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<void> _handleSyncData(SyncEntityType type, Iterable<Object> data) async {
|
||||||
|
_logger.fine("Processing sync data for $type of length ${data.length}");
|
||||||
|
switch (type) {
|
||||||
|
case SyncEntityType.authUserV1:
|
||||||
|
return _syncStreamRepository.updateAuthUsersV1(data.cast());
|
||||||
|
case SyncEntityType.userV1:
|
||||||
|
return _syncStreamRepository.updateUsersV1(data.cast());
|
||||||
|
case SyncEntityType.userDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteUsersV1(data.cast());
|
||||||
|
case SyncEntityType.partnerV1:
|
||||||
|
return _syncStreamRepository.updatePartnerV1(data.cast());
|
||||||
|
case SyncEntityType.partnerDeleteV1:
|
||||||
|
return _syncStreamRepository.deletePartnerV1(data.cast());
|
||||||
|
case SyncEntityType.assetV1:
|
||||||
|
return _syncStreamRepository.updateAssetsV1(data.cast());
|
||||||
|
case SyncEntityType.assetDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteAssetsV1(data.cast());
|
||||||
|
case SyncEntityType.assetExifV1:
|
||||||
|
return _syncStreamRepository.updateAssetsExifV1(data.cast());
|
||||||
|
case SyncEntityType.partnerAssetV1:
|
||||||
|
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'partner');
|
||||||
|
case SyncEntityType.partnerAssetBackfillV1:
|
||||||
|
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'partner backfill');
|
||||||
|
case SyncEntityType.partnerAssetDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteAssetsV1(data.cast(), debugLabel: "partner");
|
||||||
|
case SyncEntityType.partnerAssetExifV1:
|
||||||
|
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'partner');
|
||||||
|
case SyncEntityType.partnerAssetExifBackfillV1:
|
||||||
|
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'partner backfill');
|
||||||
|
case SyncEntityType.albumV1:
|
||||||
|
return _syncStreamRepository.updateAlbumsV1(data.cast());
|
||||||
|
case SyncEntityType.albumDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteAlbumsV1(data.cast());
|
||||||
|
case SyncEntityType.albumUserV1:
|
||||||
|
return _syncStreamRepository.updateAlbumUsersV1(data.cast());
|
||||||
|
case SyncEntityType.albumUserBackfillV1:
|
||||||
|
return _syncStreamRepository.updateAlbumUsersV1(data.cast(), debugLabel: 'backfill');
|
||||||
|
case SyncEntityType.albumUserDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteAlbumUsersV1(data.cast());
|
||||||
|
case SyncEntityType.albumAssetCreateV1:
|
||||||
|
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset create');
|
||||||
|
case SyncEntityType.albumAssetUpdateV1:
|
||||||
|
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset update');
|
||||||
|
case SyncEntityType.albumAssetBackfillV1:
|
||||||
|
return _syncStreamRepository.updateAssetsV1(data.cast(), debugLabel: 'album asset backfill');
|
||||||
|
case SyncEntityType.albumAssetExifCreateV1:
|
||||||
|
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif create');
|
||||||
|
case SyncEntityType.albumAssetExifUpdateV1:
|
||||||
|
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif update');
|
||||||
|
case SyncEntityType.albumAssetExifBackfillV1:
|
||||||
|
return _syncStreamRepository.updateAssetsExifV1(data.cast(), debugLabel: 'album asset exif backfill');
|
||||||
|
case SyncEntityType.albumToAssetV1:
|
||||||
|
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast());
|
||||||
|
case SyncEntityType.albumToAssetBackfillV1:
|
||||||
|
return _syncStreamRepository.updateAlbumToAssetsV1(data.cast(), debugLabel: 'backfill');
|
||||||
|
case SyncEntityType.albumToAssetDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteAlbumToAssetsV1(data.cast());
|
||||||
|
// No-op. SyncAckV1 entities are checkpoints in the sync stream
|
||||||
|
// to acknowledge that the client has processed all the backfill events
|
||||||
|
case SyncEntityType.syncAckV1:
|
||||||
|
return;
|
||||||
|
// No-op. SyncCompleteV1 is used to signal the completion of the sync process
|
||||||
|
case SyncEntityType.syncCompleteV1:
|
||||||
|
return;
|
||||||
|
// Request to reset the client state. Clear everything related to remote entities
|
||||||
|
case SyncEntityType.syncResetV1:
|
||||||
|
return _syncStreamRepository.reset();
|
||||||
|
case SyncEntityType.memoryV1:
|
||||||
|
return _syncStreamRepository.updateMemoriesV1(data.cast());
|
||||||
|
case SyncEntityType.memoryDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteMemoriesV1(data.cast());
|
||||||
|
case SyncEntityType.memoryToAssetV1:
|
||||||
|
return _syncStreamRepository.updateMemoryAssetsV1(data.cast());
|
||||||
|
case SyncEntityType.memoryToAssetDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteMemoryAssetsV1(data.cast());
|
||||||
|
case SyncEntityType.stackV1:
|
||||||
|
return _syncStreamRepository.updateStacksV1(data.cast());
|
||||||
|
case SyncEntityType.stackDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteStacksV1(data.cast());
|
||||||
|
case SyncEntityType.partnerStackV1:
|
||||||
|
return _syncStreamRepository.updateStacksV1(data.cast(), debugLabel: 'partner');
|
||||||
|
case SyncEntityType.partnerStackBackfillV1:
|
||||||
|
return _syncStreamRepository.updateStacksV1(data.cast(), debugLabel: 'partner backfill');
|
||||||
|
case SyncEntityType.partnerStackDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteStacksV1(data.cast(), debugLabel: 'partner');
|
||||||
|
case SyncEntityType.userMetadataV1:
|
||||||
|
return _syncStreamRepository.updateUserMetadatasV1(data.cast());
|
||||||
|
case SyncEntityType.userMetadataDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteUserMetadatasV1(data.cast());
|
||||||
|
case SyncEntityType.personV1:
|
||||||
|
return _syncStreamRepository.updatePeopleV1(data.cast());
|
||||||
|
case SyncEntityType.personDeleteV1:
|
||||||
|
return _syncStreamRepository.deletePeopleV1(data.cast());
|
||||||
|
case SyncEntityType.assetFaceV1:
|
||||||
|
return _syncStreamRepository.updateAssetFacesV1(data.cast());
|
||||||
|
case SyncEntityType.assetFaceDeleteV1:
|
||||||
|
return _syncStreamRepository.deleteAssetFacesV1(data.cast());
|
||||||
|
default:
|
||||||
|
_logger.warning("Unknown sync data type: $type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,192 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
import 'dart:convert';
|
||||||
|
|
||||||
|
import 'package:cancellation_token_http/http.dart';
|
||||||
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
|
import 'package:http/http.dart' as http;
|
||||||
|
import 'package:immich_mobile/constants/constants.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/remote_sync/remote_sync.service.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/store.model.dart';
|
||||||
|
import 'package:immich_mobile/domain/models/sync_event.model.dart';
|
||||||
|
import 'package:immich_mobile/entities/store.entity.dart';
|
||||||
|
import 'package:immich_mobile/providers/api.provider.dart';
|
||||||
|
import 'package:immich_mobile/services/api.service.dart';
|
||||||
|
import 'package:logging/logging.dart';
|
||||||
|
import 'package:openapi/api.dart';
|
||||||
|
|
||||||
|
final remoteSyncApiRepositoryProvider = Provider<RemoteSyncApiRepository>((ref) {
|
||||||
|
return RemoteSyncApiRepository(ref.watch(apiServiceProvider));
|
||||||
|
});
|
||||||
|
|
||||||
|
class RemoteSyncApiRepository {
|
||||||
|
final Logger _logger = Logger('RemoteSyncApiRepository');
|
||||||
|
final ApiService _api;
|
||||||
|
RemoteSyncApiRepository(this._api);
|
||||||
|
|
||||||
|
Future<void> ack(List<String> data) {
|
||||||
|
return _api.syncApi.sendSyncAck(SyncAckSetDto(acks: data));
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<RemoteSyncStatus> streamChanges(
|
||||||
|
Future<RemoteSyncStatus> Function(List<SyncEvent>, {CancellationToken? token}) onData, {
|
||||||
|
bool syncReset = false,
|
||||||
|
CancellationToken? token,
|
||||||
|
int batchSize = kSyncEventBatchSize,
|
||||||
|
http.Client? httpClient,
|
||||||
|
}) async {
|
||||||
|
final stopwatch = Stopwatch()..start();
|
||||||
|
final client = httpClient ?? http.Client();
|
||||||
|
final endpoint = "${_api.apiClient.basePath}/sync/stream";
|
||||||
|
|
||||||
|
final headers = {'Content-Type': 'application/json', 'Accept': 'application/jsonlines+json'};
|
||||||
|
|
||||||
|
final headerParams = <String, String>{};
|
||||||
|
await _api.applyToParams([], headerParams);
|
||||||
|
headers.addAll(headerParams);
|
||||||
|
|
||||||
|
final request = http.Request('POST', Uri.parse(endpoint));
|
||||||
|
request.headers.addAll(headers);
|
||||||
|
request.body = jsonEncode(
|
||||||
|
SyncStreamDto(
|
||||||
|
types: [
|
||||||
|
SyncRequestType.authUsersV1,
|
||||||
|
SyncRequestType.usersV1,
|
||||||
|
SyncRequestType.assetsV1,
|
||||||
|
SyncRequestType.assetExifsV1,
|
||||||
|
SyncRequestType.partnersV1,
|
||||||
|
SyncRequestType.partnerAssetsV1,
|
||||||
|
SyncRequestType.partnerAssetExifsV1,
|
||||||
|
SyncRequestType.albumsV1,
|
||||||
|
SyncRequestType.albumUsersV1,
|
||||||
|
SyncRequestType.albumAssetsV1,
|
||||||
|
SyncRequestType.albumAssetExifsV1,
|
||||||
|
SyncRequestType.albumToAssetsV1,
|
||||||
|
SyncRequestType.memoriesV1,
|
||||||
|
SyncRequestType.memoryToAssetsV1,
|
||||||
|
SyncRequestType.stacksV1,
|
||||||
|
SyncRequestType.partnerStacksV1,
|
||||||
|
SyncRequestType.userMetadataV1,
|
||||||
|
SyncRequestType.peopleV1,
|
||||||
|
SyncRequestType.assetFacesV1,
|
||||||
|
],
|
||||||
|
reset: syncReset,
|
||||||
|
).toJson(),
|
||||||
|
);
|
||||||
|
|
||||||
|
String previousChunk = '';
|
||||||
|
List<String> lines = [];
|
||||||
|
|
||||||
|
RemoteSyncStatus status = RemoteSyncStatus.failed;
|
||||||
|
try {
|
||||||
|
final response = await client.send(request);
|
||||||
|
|
||||||
|
if (response.statusCode != 200) {
|
||||||
|
final errorBody = await response.stream.bytesToString();
|
||||||
|
throw ApiException(response.statusCode, 'Failed to get sync stream: $errorBody');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reset after successful stream start
|
||||||
|
await Store.put(StoreKey.shouldResetSync, false);
|
||||||
|
|
||||||
|
await for (final chunk in response.stream.transform(utf8.decoder)) {
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
|
||||||
|
previousChunk += chunk;
|
||||||
|
final parts = previousChunk.toString().split('\n');
|
||||||
|
previousChunk = parts.removeLast();
|
||||||
|
lines.addAll(parts);
|
||||||
|
|
||||||
|
if (lines.length < batchSize) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
status = await onData(_parseLines(lines), token: token);
|
||||||
|
lines.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
token?.throwIfCancelled();
|
||||||
|
if (lines.isNotEmpty) {
|
||||||
|
status = await onData(_parseLines(lines), token: token);
|
||||||
|
}
|
||||||
|
} catch (error, stack) {
|
||||||
|
return Future.error(error, stack);
|
||||||
|
} finally {
|
||||||
|
client.close();
|
||||||
|
}
|
||||||
|
stopwatch.stop();
|
||||||
|
_logger.info("Remote Sync completed in ${stopwatch.elapsed.inMilliseconds}ms");
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
|
||||||
|
List<SyncEvent> _parseLines(List<String> lines) {
|
||||||
|
final List<SyncEvent> data = [];
|
||||||
|
|
||||||
|
for (final line in lines) {
|
||||||
|
final jsonData = jsonDecode(line);
|
||||||
|
final type = SyncEntityType.fromJson(jsonData['type'])!;
|
||||||
|
final dataJson = jsonData['data'];
|
||||||
|
final ack = jsonData['ack'];
|
||||||
|
final converter = _kResponseMap[type];
|
||||||
|
if (converter == null) {
|
||||||
|
_logger.warning("Unknown type $type");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
data.add(SyncEvent(type: type, data: converter(dataJson), ack: ack));
|
||||||
|
}
|
||||||
|
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const _kResponseMap = <SyncEntityType, Function(Object)>{
|
||||||
|
SyncEntityType.authUserV1: SyncAuthUserV1.fromJson,
|
||||||
|
SyncEntityType.userV1: SyncUserV1.fromJson,
|
||||||
|
SyncEntityType.userDeleteV1: SyncUserDeleteV1.fromJson,
|
||||||
|
SyncEntityType.partnerV1: SyncPartnerV1.fromJson,
|
||||||
|
SyncEntityType.partnerDeleteV1: SyncPartnerDeleteV1.fromJson,
|
||||||
|
SyncEntityType.assetV1: SyncAssetV1.fromJson,
|
||||||
|
SyncEntityType.assetDeleteV1: SyncAssetDeleteV1.fromJson,
|
||||||
|
SyncEntityType.assetExifV1: SyncAssetExifV1.fromJson,
|
||||||
|
SyncEntityType.partnerAssetV1: SyncAssetV1.fromJson,
|
||||||
|
SyncEntityType.partnerAssetBackfillV1: SyncAssetV1.fromJson,
|
||||||
|
SyncEntityType.partnerAssetDeleteV1: SyncAssetDeleteV1.fromJson,
|
||||||
|
SyncEntityType.partnerAssetExifV1: SyncAssetExifV1.fromJson,
|
||||||
|
SyncEntityType.partnerAssetExifBackfillV1: SyncAssetExifV1.fromJson,
|
||||||
|
SyncEntityType.albumV1: SyncAlbumV1.fromJson,
|
||||||
|
SyncEntityType.albumDeleteV1: SyncAlbumDeleteV1.fromJson,
|
||||||
|
SyncEntityType.albumUserV1: SyncAlbumUserV1.fromJson,
|
||||||
|
SyncEntityType.albumUserBackfillV1: SyncAlbumUserV1.fromJson,
|
||||||
|
SyncEntityType.albumUserDeleteV1: SyncAlbumUserDeleteV1.fromJson,
|
||||||
|
SyncEntityType.albumAssetCreateV1: SyncAssetV1.fromJson,
|
||||||
|
SyncEntityType.albumAssetUpdateV1: SyncAssetV1.fromJson,
|
||||||
|
SyncEntityType.albumAssetBackfillV1: SyncAssetV1.fromJson,
|
||||||
|
SyncEntityType.albumAssetExifCreateV1: SyncAssetExifV1.fromJson,
|
||||||
|
SyncEntityType.albumAssetExifUpdateV1: SyncAssetExifV1.fromJson,
|
||||||
|
SyncEntityType.albumAssetExifBackfillV1: SyncAssetExifV1.fromJson,
|
||||||
|
SyncEntityType.albumToAssetV1: SyncAlbumToAssetV1.fromJson,
|
||||||
|
SyncEntityType.albumToAssetBackfillV1: SyncAlbumToAssetV1.fromJson,
|
||||||
|
SyncEntityType.albumToAssetDeleteV1: SyncAlbumToAssetDeleteV1.fromJson,
|
||||||
|
SyncEntityType.syncAckV1: _SyncEmptyDto.fromJson,
|
||||||
|
SyncEntityType.syncResetV1: _SyncEmptyDto.fromJson,
|
||||||
|
SyncEntityType.memoryV1: SyncMemoryV1.fromJson,
|
||||||
|
SyncEntityType.memoryDeleteV1: SyncMemoryDeleteV1.fromJson,
|
||||||
|
SyncEntityType.memoryToAssetV1: SyncMemoryAssetV1.fromJson,
|
||||||
|
SyncEntityType.memoryToAssetDeleteV1: SyncMemoryAssetDeleteV1.fromJson,
|
||||||
|
SyncEntityType.stackV1: SyncStackV1.fromJson,
|
||||||
|
SyncEntityType.stackDeleteV1: SyncStackDeleteV1.fromJson,
|
||||||
|
SyncEntityType.partnerStackV1: SyncStackV1.fromJson,
|
||||||
|
SyncEntityType.partnerStackBackfillV1: SyncStackV1.fromJson,
|
||||||
|
SyncEntityType.partnerStackDeleteV1: SyncStackDeleteV1.fromJson,
|
||||||
|
SyncEntityType.userMetadataV1: SyncUserMetadataV1.fromJson,
|
||||||
|
SyncEntityType.userMetadataDeleteV1: SyncUserMetadataDeleteV1.fromJson,
|
||||||
|
SyncEntityType.personV1: SyncPersonV1.fromJson,
|
||||||
|
SyncEntityType.personDeleteV1: SyncPersonDeleteV1.fromJson,
|
||||||
|
SyncEntityType.assetFaceV1: SyncAssetFaceV1.fromJson,
|
||||||
|
SyncEntityType.assetFaceDeleteV1: SyncAssetFaceDeleteV1.fromJson,
|
||||||
|
SyncEntityType.syncCompleteV1: _SyncEmptyDto.fromJson,
|
||||||
|
};
|
||||||
|
|
||||||
|
class _SyncEmptyDto {
|
||||||
|
static _SyncEmptyDto? fromJson(dynamic _) => _SyncEmptyDto();
|
||||||
|
}
|
||||||
9
mobile/lib/extensions/completer_extensions.dart
Normal file
9
mobile/lib/extensions/completer_extensions.dart
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
import 'dart:async';
|
||||||
|
|
||||||
|
extension CompleterExtensions<T> on Completer<T> {
|
||||||
|
void completeOnce([FutureOr<T>? value]) {
|
||||||
|
if (!isCompleted) {
|
||||||
|
complete(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -12,6 +12,7 @@ import 'package:flutter_displaymode/flutter_displaymode.dart';
|
|||||||
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
import 'package:hooks_riverpod/hooks_riverpod.dart';
|
||||||
import 'package:immich_mobile/constants/constants.dart';
|
import 'package:immich_mobile/constants/constants.dart';
|
||||||
import 'package:immich_mobile/constants/locales.dart';
|
import 'package:immich_mobile/constants/locales.dart';
|
||||||
|
import 'package:immich_mobile/domain/jobs/job_manager.dart';
|
||||||
import 'package:immich_mobile/domain/services/background_worker.service.dart';
|
import 'package:immich_mobile/domain/services/background_worker.service.dart';
|
||||||
import 'package:immich_mobile/entities/store.entity.dart';
|
import 'package:immich_mobile/entities/store.entity.dart';
|
||||||
import 'package:immich_mobile/extensions/build_context_extensions.dart';
|
import 'package:immich_mobile/extensions/build_context_extensions.dart';
|
||||||
@@ -53,6 +54,8 @@ void main() async {
|
|||||||
await initApp();
|
await initApp();
|
||||||
// Warm-up isolate pool for worker manager
|
// Warm-up isolate pool for worker manager
|
||||||
await workerManager.init(dynamicSpawning: true);
|
await workerManager.init(dynamicSpawning: true);
|
||||||
|
JobManager.I.init();
|
||||||
|
await JobManager.I.start();
|
||||||
await migrateDatabaseIfNeeded(isar, drift);
|
await migrateDatabaseIfNeeded(isar, drift);
|
||||||
HttpSSLOptions.apply();
|
HttpSSLOptions.apply();
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user