fix: flaky mobile sync api tests (#23324)

This commit is contained in:
Brandon Wees
2025-10-28 12:16:36 -05:00
committed by GitHub
parent 9f0b5790af
commit 3edcb180eb

View File

@@ -80,6 +80,7 @@ void main() {
int onDataCallCount = 0;
bool abortWasCalledInCallback = false;
List<SyncEvent> receivedEventsBatch1 = [];
final Completer<void> firstBatchReceived = Completer<void>();
Future<void> onDataCallback(List<SyncEvent> events, Function() abort, Function() _) async {
onDataCallCount++;
@@ -87,6 +88,7 @@ void main() {
receivedEventsBatch1 = events;
abort();
abortWasCalledInCallback = true;
firstBatchReceived.complete();
} else {
fail("onData called more than once after abort was invoked");
}
@@ -94,7 +96,8 @@ void main() {
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
// Give the stream subscription time to start (longer delay to account for mock delay)
await Future.delayed(const Duration(milliseconds: 50));
for (int i = 0; i < testBatchSize; i++) {
responseStreamController.add(
@@ -104,6 +107,11 @@ void main() {
);
}
await firstBatchReceived.future.timeout(
const Duration(seconds: 5),
onTimeout: () => fail('First batch was not processed within timeout'),
);
for (int i = testBatchSize; i < testBatchSize * 2; i++) {
responseStreamController.add(
utf8.encode(
@@ -124,12 +132,14 @@ void main() {
test('streamChanges does not process remaining lines in finally block if aborted', () async {
int onDataCallCount = 0;
bool abortWasCalledInCallback = false;
final Completer<void> firstBatchReceived = Completer<void>();
Future<void> onDataCallback(List<SyncEvent> events, Function() abort, Function() _) async {
onDataCallCount++;
if (onDataCallCount == 1) {
abort();
abortWasCalledInCallback = true;
firstBatchReceived.complete();
} else {
fail("onData called more than once after abort was invoked");
}
@@ -137,7 +147,7 @@ void main() {
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
await Future.delayed(const Duration(milliseconds: 50));
for (int i = 0; i < testBatchSize; i++) {
responseStreamController.add(
@@ -147,6 +157,11 @@ void main() {
);
}
await firstBatchReceived.future.timeout(
const Duration(seconds: 5),
onTimeout: () => fail('First batch was not processed within timeout'),
);
// emit a single event to skip batching and trigger finally
responseStreamController.add(
utf8.encode(
@@ -166,13 +181,17 @@ void main() {
int onDataCallCount = 0;
List<SyncEvent> receivedEventsBatch1 = [];
List<SyncEvent> receivedEventsBatch2 = [];
final Completer<void> firstBatchReceived = Completer<void>();
final Completer<void> secondBatchReceived = Completer<void>();
Future<void> onDataCallback(List<SyncEvent> events, Function() _, Function() __) async {
onDataCallCount++;
if (onDataCallCount == 1) {
receivedEventsBatch1 = events;
firstBatchReceived.complete();
} else if (onDataCallCount == 2) {
receivedEventsBatch2 = events;
secondBatchReceived.complete();
} else {
fail("onData called more than expected");
}
@@ -180,7 +199,7 @@ void main() {
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
await Future.delayed(const Duration(milliseconds: 50));
// Batch 1
for (int i = 0; i < testBatchSize; i++) {
@@ -191,7 +210,11 @@ void main() {
);
}
// Partial Batch 2
await firstBatchReceived.future.timeout(
const Duration(seconds: 5),
onTimeout: () => fail('First batch was not processed within timeout'),
);
responseStreamController.add(
utf8.encode(
_createJsonLine(SyncEntityType.userDeleteV1.toString(), SyncUserDeleteV1(userId: "user100").toJson(), 'ack100'),
@@ -199,6 +222,12 @@ void main() {
);
await responseStreamController.close();
await secondBatchReceived.future.timeout(
const Duration(seconds: 5),
onTimeout: () => fail('Second batch was not processed within timeout'),
);
await expectLater(streamChangesFuture, completes);
expect(onDataCallCount, 2);
@@ -217,7 +246,7 @@ void main() {
final streamChangesFuture = streamChanges(onDataCallback);
await pumpEventQueue();
await Future.delayed(const Duration(milliseconds: 50));
responseStreamController.add(
utf8.encode(