Skip to content

Commit a6982e7

Browse files
committed
Fix some errors when closing client while connecting.
1 parent 8576e28 commit a6982e7

File tree

2 files changed

+43
-10
lines changed

2 files changed

+43
-10
lines changed

packages/powersync/lib/src/powersync_database.dart

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -191,13 +191,24 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
191191
/// Throttle time between CRUD operations
192192
/// Defaults to 10 milliseconds.
193193
Duration crudThrottleTime = const Duration(milliseconds: 10)}) async {
194-
_connectMutex.lock(() =>
195-
_connect(connector: connector, crudThrottleTime: crudThrottleTime));
194+
Zone current = Zone.current;
195+
196+
Future<void> reconnect() {
197+
return _connectMutex.lock(() => _connect(
198+
connector: connector,
199+
crudThrottleTime: crudThrottleTime,
200+
// The reconnect function needs to run in the original zone,
201+
// to avoid recursive lock errors.
202+
reconnect: current.bindCallback(reconnect)));
203+
}
204+
205+
await reconnect();
196206
}
197207

198208
Future<void> _connect(
199209
{required PowerSyncBackendConnector connector,
200-
required Duration crudThrottleTime}) async {
210+
required Duration crudThrottleTime,
211+
required Future<void> Function() reconnect}) async {
201212
await initialize();
202213

203214
// Disconnect if connected
@@ -266,7 +277,9 @@ class PowerSyncDatabase with SqliteQueries implements SqliteConnection {
266277
logger.severe('Sync Isolate error', message);
267278

268279
// Reconnect
269-
connect(connector: connector);
280+
// Use the param like this instead of directly calling connect(), to avoid recursive
281+
// locks in some edge cases.
282+
reconnect();
270283
});
271284

272285
disconnected() {
@@ -532,7 +545,7 @@ Future<void> _powerSyncDatabaseIsolate(
532545

533546
CommonDatabase? db;
534547
final mutex = args.dbRef.mutex.open();
535-
StreamingSyncImplementation? _sync;
548+
StreamingSyncImplementation? openedStreamingSync;
536549

537550
rPort.listen((message) async {
538551
if (message is List) {
@@ -548,7 +561,7 @@ Future<void> _powerSyncDatabaseIsolate(
548561
updateController.close();
549562
upstreamDbClient.close();
550563
// Abort any open http requests, and wait for it to be closed properly
551-
await _sync?.abort();
564+
await openedStreamingSync?.abort();
552565
// No kill the Isolate
553566
Isolate.current.kill();
554567
}
@@ -596,7 +609,7 @@ Future<void> _powerSyncDatabaseIsolate(
596609
uploadCrud: uploadCrud,
597610
updateStream: updateController.stream,
598611
retryDelay: args.retryDelay);
599-
_sync = sync;
612+
openedStreamingSync = sync;
600613
sync.streamingSync();
601614
sync.statusStream.listen((event) {
602615
sPort.send(['status', event]);

packages/powersync/lib/src/streaming_sync.dart

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ class StreamingSyncImplementation {
4242

4343
AbortController? _abort;
4444

45+
bool _safeToClose = true;
46+
4547
StreamingSyncImplementation(
4648
{required this.adapter,
4749
required this.credentialsCallback,
@@ -65,10 +67,17 @@ class StreamingSyncImplementation {
6567
// According to the documentation, the behavior is undefined when calling
6668
// close() while requests are pending. However, this is no other
6769
// known way to cancel open streams, and this appears to end the stream with
68-
// a consistent ClientException.
69-
_client.close();
70+
// a consistent ClientException if a request is open.
71+
// We avoid closing the client while opening a request, as that does cause
72+
// unpredicable uncaught errors.
73+
if (_safeToClose) {
74+
_client.close();
75+
}
7076
// wait for completeAbort() to be called
7177
await future;
78+
79+
// Now close the client in all cases not covered above
80+
_client.close();
7281
}
7382

7483
bool get aborted {
@@ -375,7 +384,18 @@ class StreamingSyncImplementation {
375384
request.headers['Authorization'] = "Token ${credentials.token}";
376385
request.body = convert.jsonEncode(data);
377386

378-
final res = await _client.send(request);
387+
http.StreamedResponse res;
388+
try {
389+
// Do not close the client during the request phase - this causes uncaught errors.
390+
_safeToClose = false;
391+
res = await _client.send(request);
392+
} finally {
393+
_safeToClose = true;
394+
}
395+
if (aborted) {
396+
return;
397+
}
398+
379399
if (res.statusCode == 401) {
380400
if (invalidCredentialsCallback != null) {
381401
await invalidCredentialsCallback!();

0 commit comments

Comments
 (0)