Skip to content

Commit 087b5d5

Browse files
committed
Share crud and sync mutexes
1 parent 87759eb commit 087b5d5

File tree

4 files changed

+59
-42
lines changed

4 files changed

+59
-42
lines changed

packages/powersync_core/lib/src/database/active_instances.dart

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,14 +18,23 @@ final class ActiveDatabaseGroup {
1818

1919
/// Use to prevent multiple connections from being opened concurrently
2020
final Mutex syncConnectMutex = Mutex();
21+
final Mutex syncMutex;
22+
final Mutex crudMutex;
23+
2124
final String identifier;
2225

23-
ActiveDatabaseGroup._(this.identifier);
26+
ActiveDatabaseGroup._(this.identifier)
27+
: syncMutex = Mutex(identifier: '$identifier-sync'),
28+
crudMutex = Mutex(identifier: '$identifier-crud');
2429

25-
void close() {
30+
Future<void> close() async {
2631
if (--refCount == 0) {
2732
final removedGroup = _activeGroups.remove(identifier);
2833
assert(removedGroup == this);
34+
35+
await syncConnectMutex.close();
36+
await syncMutex.close();
37+
await crudMutex.close();
2938
}
3039
}
3140

packages/powersync_core/lib/src/database/native/native_powersync_database.dart

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,6 @@ class PowerSyncDatabaseImpl
4646
@protected
4747
late Future<void> isInitialized;
4848

49-
final SimpleMutex _syncMutex = SimpleMutex(), _crudMutex = SimpleMutex();
50-
5149
@override
5250

5351
/// The Logger used by this [PowerSyncDatabase].
@@ -223,6 +221,9 @@ class PowerSyncDatabaseImpl
223221
receivedIsolateExit.complete();
224222
});
225223

224+
final crudMutex = group.crudMutex as SimpleMutex;
225+
final syncMutex = group.syncMutex as SimpleMutex;
226+
226227
// Spawning the isolate can't be interrupted
227228
triedSpawningIsolate = true;
228229
await Isolate.spawn(
@@ -232,8 +233,8 @@ class PowerSyncDatabaseImpl
232233
dbRef,
233234
retryDelay,
234235
clientParams,
235-
_crudMutex.shared,
236-
_syncMutex.shared,
236+
crudMutex.shared,
237+
syncMutex.shared,
237238
),
238239
debugName: 'Sync ${database.openFactory.path}',
239240
onError: receiveUnhandledErrors.sendPort,
@@ -269,14 +270,6 @@ class PowerSyncDatabaseImpl
269270
return database.writeLock(callback,
270271
debugContext: debugContext, lockTimeout: lockTimeout);
271272
}
272-
273-
@override
274-
Future<void> close() async {
275-
await super.close();
276-
277-
await _crudMutex.close();
278-
await _crudMutex.close();
279-
}
280273
}
281274

282275
class _PowerSyncDatabaseIsolateArgs {
@@ -311,24 +304,32 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
311304
StreamingSyncImplementation? openedStreamingSync;
312305
StreamSubscription<void>? localUpdatesSubscription;
313306

314-
Future<void> shutdown() async {
315-
await openedStreamingSync?.abort();
316-
317-
localUpdatesSubscription?.cancel();
318-
db?.dispose();
319-
crudUpdateController.close();
320-
upstreamDbClient.close();
321-
322-
// The SyncSqliteConnection uses this mutex
323-
// It needs to be closed before killing the isolate
324-
// in order to free the mutex for other operations.
325-
await mutex.close();
326-
await crudMutex.close();
327-
await syncMutex.close();
328-
rPort.close();
307+
Completer<void> shutdownCompleter = Completer();
308+
309+
Future<void> shutdown() {
310+
if (!shutdownCompleter.isCompleted) {
311+
shutdownCompleter.complete(Future(() async {
312+
await openedStreamingSync?.abort();
313+
314+
localUpdatesSubscription?.cancel();
315+
db?.dispose();
316+
crudUpdateController.close();
317+
upstreamDbClient.close();
318+
319+
// The SyncSqliteConnection uses this mutex
320+
// It needs to be closed before killing the isolate
321+
// in order to free the mutex for other operations.
322+
await mutex.close();
323+
await crudMutex.close();
324+
await syncMutex.close();
325+
rPort.close();
326+
327+
// TODO: If we closed our resources properly, this wouldn't be necessary...
328+
Isolate.current.kill();
329+
}));
330+
}
329331

330-
// TODO: If we closed our resources properly, this wouldn't be necessary...
331-
Isolate.current.kill();
332+
return shutdownCompleter.future;
332333
}
333334

334335
rPort.listen((message) async {
@@ -415,12 +416,12 @@ Future<void> _syncIsolate(_PowerSyncDatabaseIsolateArgs args) async {
415416
updateDebouncer ??=
416417
Timer(const Duration(milliseconds: 1), maybeFireUpdates);
417418
});
418-
}, (error, stack) {
419+
}, (error, stack) async {
419420
// Properly dispose the database if an uncaught error occurs.
420421
// Unfortunately, this does not handle disposing while the database is opening.
421422
// This should be rare - any uncaught error is a bug. And in most cases,
422423
// it should occur after the database is already open.
423-
shutdown();
424+
await shutdown();
424425
throw error;
425426
});
426427
}

packages/powersync_core/lib/src/database/powersync_db_mixin.dart

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,14 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
5252

5353
late final ActiveDatabaseGroup _activeGroup;
5454

55+
/// An [ActiveDatabaseGroup] sharing mutexes for the sync client.
56+
///
57+
/// This is used to ensure that, even if two databases to the same file are
58+
/// open concurrently, they won't both open a sync stream. Doing so would
59+
/// waste resources.
60+
@internal
61+
ActiveDatabaseGroup get group => _activeGroup;
62+
5563
@override
5664

5765
/// Broadcast stream that is notified of any table updates.
@@ -242,7 +250,7 @@ mixin PowerSyncDatabaseMixin implements SqliteConnection {
242250
// If there are paused subscriptionso n the status stream, don't delay
243251
// closing the database because of that.
244252
unawaited(statusStreamController.close());
245-
_activeGroup.close();
253+
await _activeGroup.close();
246254
}
247255
}
248256

packages/powersync_core/lib/src/streaming_sync.dart

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -150,8 +150,7 @@ class StreamingSyncImplementation implements StreamingSync {
150150
invalidCredentials = false;
151151
}
152152
// Protect sync iterations with exclusivity (if a valid Mutex is provided)
153-
await syncMutex.lock(
154-
() => streamingSyncIteration(abortController: _abort),
153+
await syncMutex.lock(() => streamingSyncIteration(),
155154
timeout: retryDelay);
156155
} catch (e, stacktrace) {
157156
if (aborted && e is http.ClientException) {
@@ -345,11 +344,13 @@ class StreamingSyncImplementation implements StreamingSync {
345344
return (initialRequests, localDescriptions);
346345
}
347346

348-
Future<void> streamingSyncIteration(
349-
{AbortController? abortController}) async {
347+
Future<void> streamingSyncIteration() async {
350348
adapter.startSession();
351349

352350
var (bucketRequests, bucketMap) = await _collectLocalBucketState();
351+
if (aborted) {
352+
return;
353+
}
353354

354355
Checkpoint? targetCheckpoint;
355356
Checkpoint? validatedCheckpoint;
@@ -388,8 +389,7 @@ class StreamingSyncImplementation implements StreamingSync {
388389
await adapter.removeBuckets([...bucketsToDelete]);
389390
_updateStatus(downloading: true);
390391
case StreamingSyncCheckpointComplete():
391-
final result =
392-
await _applyCheckpoint(targetCheckpoint!, abortController);
392+
final result = await _applyCheckpoint(targetCheckpoint!, _abort);
393393
if (result.abort) {
394394
return;
395395
}
@@ -479,8 +479,7 @@ class StreamingSyncImplementation implements StreamingSync {
479479
downloadError: _noError,
480480
lastSyncedAt: DateTime.now());
481481
} else if (validatedCheckpoint == targetCheckpoint) {
482-
final result =
483-
await _applyCheckpoint(targetCheckpoint!, abortController);
482+
final result = await _applyCheckpoint(targetCheckpoint!, _abort);
484483
if (result.abort) {
485484
return;
486485
}

0 commit comments

Comments
 (0)