Skip to content

Commit 8576e28

Browse files
committed
Close http connections immediately on disconnect.
1 parent b7f4511 commit 8576e28

File tree

2 files changed

+71
-24
lines changed

2 files changed

+71
-24
lines changed

packages/powersync/lib/src/powersync_database.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,7 @@ Future<void> _powerSyncDatabaseIsolate(
532532

533533
CommonDatabase? db;
534534
final mutex = args.dbRef.mutex.open();
535+
StreamingSyncImplementation? _sync;
535536

536537
rPort.listen((message) async {
537538
if (message is List) {
@@ -546,6 +547,9 @@ Future<void> _powerSyncDatabaseIsolate(
546547
db = null;
547548
updateController.close();
548549
upstreamDbClient.close();
550+
// Abort any open http requests, and wait for it to be closed properly
551+
await _sync?.abort();
552+
// No kill the Isolate
549553
Isolate.current.kill();
550554
}
551555
}
@@ -592,6 +596,7 @@ Future<void> _powerSyncDatabaseIsolate(
592596
uploadCrud: uploadCrud,
593597
updateStream: updateController.stream,
594598
retryDelay: args.retryDelay);
599+
_sync = sync;
595600
sync.streamingSync();
596601
sync.statusStream.listen((event) {
597602
sPort.send(['status', event]);

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 66 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import 'dart:convert' as convert;
33
import 'dart:io';
44

55
import 'package:http/http.dart' as http;
6+
import 'package:powersync/src/abort_controller.dart';
67
import 'package:powersync/src/exceptions.dart';
78
import 'package:powersync/src/log_internal.dart';
89

@@ -39,6 +40,8 @@ class StreamingSyncImplementation {
3940

4041
SyncStatus lastStatus = const SyncStatus();
4142

43+
AbortController? _abort;
44+
4245
StreamingSyncImplementation(
4346
{required this.adapter,
4447
required this.credentialsCallback,
@@ -50,34 +53,66 @@ class StreamingSyncImplementation {
5053
statusStream = _statusStreamController.stream;
5154
}
5255

56+
/// Close any active streams.
57+
Future<void> abort() async {
58+
// If streamingSync() hasn't been called yet, _abort will be null.
59+
var future = _abort?.abort();
60+
// This immediately triggers a new iteration in the merged stream, allowing us
61+
// to break immediately.
62+
// However, we still need to close the underlying stream explicitly, otherwise
63+
// the break will wait for the next line of data received on the stream.
64+
_localPingController.add(null);
65+
// According to the documentation, the behavior is undefined when calling
66+
// close() while requests are pending. However, this is no other
67+
// known way to cancel open streams, and this appears to end the stream with
68+
// a consistent ClientException.
69+
_client.close();
70+
// wait for completeAbort() to be called
71+
await future;
72+
}
73+
74+
bool get aborted {
75+
return _abort?.aborted ?? false;
76+
}
77+
5378
Future<void> streamingSync() async {
54-
crudLoop();
55-
var invalidCredentials = false;
56-
while (true) {
57-
_updateStatus(connecting: true);
58-
try {
59-
if (invalidCredentials && invalidCredentialsCallback != null) {
60-
// This may error. In that case it will be retried again on the next
61-
// iteration.
62-
await invalidCredentialsCallback!();
63-
invalidCredentials = false;
64-
}
65-
await streamingSyncIteration();
66-
// Continue immediately
67-
} catch (e, stacktrace) {
68-
final message = _syncErrorMessage(e);
69-
isolateLogger.warning('Sync error: $message', e, stacktrace);
70-
invalidCredentials = true;
79+
try {
80+
_abort = AbortController();
81+
crudLoop();
82+
var invalidCredentials = false;
83+
while (!aborted) {
84+
_updateStatus(connecting: true);
85+
try {
86+
if (invalidCredentials && invalidCredentialsCallback != null) {
87+
// This may error. In that case it will be retried again on the next
88+
// iteration.
89+
await invalidCredentialsCallback!();
90+
invalidCredentials = false;
91+
}
92+
await streamingSyncIteration();
93+
// Continue immediately
94+
} catch (e, stacktrace) {
95+
if (aborted && e is http.ClientException) {
96+
// Explicit abort requested - ignore. Example error:
97+
// ClientException: Connection closed while receiving data, uri=http://localhost:8080/sync/stream
98+
return;
99+
}
100+
final message = _syncErrorMessage(e);
101+
isolateLogger.warning('Sync error: $message', e, stacktrace);
102+
invalidCredentials = true;
71103

72-
_updateStatus(
73-
connected: false,
74-
connecting: true,
75-
downloading: false,
76-
downloadError: e);
104+
_updateStatus(
105+
connected: false,
106+
connecting: true,
107+
downloading: false,
108+
downloadError: e);
77109

78-
// On error, wait a little before retrying
79-
await Future.delayed(retryDelay);
110+
// On error, wait a little before retrying
111+
await Future.delayed(retryDelay);
112+
}
80113
}
114+
} finally {
115+
_abort!.completeAbort();
81116
}
82117
}
83118

@@ -204,6 +239,10 @@ class StreamingSyncImplementation {
204239
bool haveInvalidated = false;
205240

206241
await for (var line in merged) {
242+
if (aborted) {
243+
break;
244+
}
245+
207246
_updateStatus(connected: true, connecting: false);
208247
if (line is Checkpoint) {
209248
targetCheckpoint = line;
@@ -348,6 +387,9 @@ class StreamingSyncImplementation {
348387

349388
// Note: The response stream is automatically closed when this loop errors
350389
await for (var line in ndjson(res.stream)) {
390+
if (aborted) {
391+
break;
392+
}
351393
yield parseStreamingSyncLine(line as Map<String, dynamic>);
352394
}
353395
}

0 commit comments

Comments
 (0)