@@ -9,13 +9,15 @@ import {
9
9
UpdateNotification ,
10
10
isBatchedUpdateNotification
11
11
} from '../db/DBAdapter.js' ;
12
+ import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js' ;
12
13
import { SyncPriorityStatus , SyncStatus } from '../db/crud/SyncStatus.js' ;
13
14
import { UploadQueueStats } from '../db/crud/UploadQueueStatus.js' ;
14
15
import { Schema } from '../db/schema/Schema.js' ;
15
16
import { BaseObserver } from '../utils/BaseObserver.js' ;
16
17
import { ControlledExecutor } from '../utils/ControlledExecutor.js' ;
17
- import { mutexRunExclusive } from '../utils/mutex.js' ;
18
18
import { throttleTrailing } from '../utils/async.js' ;
19
+ import { mutexRunExclusive } from '../utils/mutex.js' ;
20
+ import { ConnectionManager } from './ConnectionManager.js' ;
19
21
import { SQLOpenFactory , SQLOpenOptions , isDBAdapter , isSQLOpenFactory , isSQLOpenOptions } from './SQLOpenFactory.js' ;
20
22
import { PowerSyncBackendConnector } from './connection/PowerSyncBackendConnector.js' ;
21
23
import { runOnSchemaChange } from './runOnSchemaChange.js' ;
@@ -32,7 +34,6 @@ import {
32
34
type PowerSyncConnectionOptions ,
33
35
type RequiredAdditionalConnectionOptions
34
36
} from './sync/stream/AbstractStreamingSyncImplementation.js' ;
35
- import { FULL_SYNC_PRIORITY } from '../db/crud/SyncProgress.js' ;
36
37
37
38
export interface DisconnectAndClearOptions {
38
39
/** When set to false, data in local-only tables is preserved. */
@@ -165,17 +166,22 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
165
166
*/
166
167
currentStatus : SyncStatus ;
167
168
168
- syncStreamImplementation ?: StreamingSyncImplementation ;
169
169
sdkVersion : string ;
170
170
171
171
protected bucketStorageAdapter : BucketStorageAdapter ;
172
- private syncStatusListenerDisposer ?: ( ) => void ;
173
172
protected _isReadyPromise : Promise < void > ;
173
+ protected connectionManager : ConnectionManager ;
174
+
175
+ get syncStreamImplementation ( ) {
176
+ return this . connectionManager . syncStreamImplementation ;
177
+ }
174
178
175
179
protected _schema : Schema ;
176
180
177
181
private _database : DBAdapter ;
178
182
183
+ protected runExclusiveMutex : Mutex ;
184
+
179
185
constructor ( options : PowerSyncDatabaseOptionsWithDBAdapter ) ;
180
186
constructor ( options : PowerSyncDatabaseOptionsWithOpenFactory ) ;
181
187
constructor ( options : PowerSyncDatabaseOptionsWithSettings ) ;
@@ -206,7 +212,33 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
206
212
this . _schema = schema ;
207
213
this . ready = false ;
208
214
this . sdkVersion = '' ;
215
+ this . runExclusiveMutex = new Mutex ( ) ;
209
216
// Start async init
217
+ this . connectionManager = new ConnectionManager ( {
218
+ createSyncImplementation : async ( connector , options ) => {
219
+ await this . waitForReady ( ) ;
220
+
221
+ return this . runExclusive ( async ( ) => {
222
+ const sync = this . generateSyncStreamImplementation ( connector , this . resolvedConnectionOptions ( options ) ) ;
223
+ const onDispose = sync . registerListener ( {
224
+ statusChanged : ( status ) => {
225
+ this . currentStatus = new SyncStatus ( {
226
+ ...status . toJSON ( ) ,
227
+ hasSynced : this . currentStatus ?. hasSynced || ! ! status . lastSyncedAt
228
+ } ) ;
229
+ this . iterateListeners ( ( cb ) => cb . statusChanged ?.( this . currentStatus ) ) ;
230
+ }
231
+ } ) ;
232
+ await sync . waitForReady ( ) ;
233
+
234
+ return {
235
+ sync,
236
+ onDispose
237
+ } ;
238
+ } ) ;
239
+ } ,
240
+ logger : this . logger
241
+ } ) ;
210
242
this . _isReadyPromise = this . initialize ( ) ;
211
243
}
212
244
@@ -425,34 +457,19 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
425
457
} ;
426
458
}
427
459
460
+ /**
461
+ * Locking mechanism for exclusively running critical portions of connect/disconnect operations.
462
+ * Locking here is mostly only important on web for multiple tab scenarios.
463
+ */
464
+ protected runExclusive < T > ( callback : ( ) => Promise < T > ) : Promise < T > {
465
+ return this . runExclusiveMutex . runExclusive ( callback ) ;
466
+ }
467
+
428
468
/**
429
469
* Connects to stream of events from the PowerSync instance.
430
470
*/
431
471
async connect ( connector : PowerSyncBackendConnector , options ?: PowerSyncConnectionOptions ) {
432
- await this . waitForReady ( ) ;
433
-
434
- // close connection if one is open
435
- await this . disconnect ( ) ;
436
- if ( this . closed ) {
437
- throw new Error ( 'Cannot connect using a closed client' ) ;
438
- }
439
-
440
- const resolvedConnectOptions = this . resolvedConnectionOptions ( options ) ;
441
-
442
- this . syncStreamImplementation = this . generateSyncStreamImplementation ( connector , resolvedConnectOptions ) ;
443
- this . syncStatusListenerDisposer = this . syncStreamImplementation . registerListener ( {
444
- statusChanged : ( status ) => {
445
- this . currentStatus = new SyncStatus ( {
446
- ...status . toJSON ( ) ,
447
- hasSynced : this . currentStatus ?. hasSynced || ! ! status . lastSyncedAt
448
- } ) ;
449
- this . iterateListeners ( ( cb ) => cb . statusChanged ?.( this . currentStatus ) ) ;
450
- }
451
- } ) ;
452
-
453
- await this . syncStreamImplementation . waitForReady ( ) ;
454
- this . syncStreamImplementation . triggerCrudUpload ( ) ;
455
- await this . syncStreamImplementation . connect ( options ) ;
472
+ return this . connectionManager . connect ( connector , options ) ;
456
473
}
457
474
458
475
/**
@@ -461,11 +478,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
461
478
* Use {@link connect} to connect again.
462
479
*/
463
480
async disconnect ( ) {
464
- await this . waitForReady ( ) ;
465
- await this . syncStreamImplementation ?. disconnect ( ) ;
466
- this . syncStatusListenerDisposer ?.( ) ;
467
- await this . syncStreamImplementation ?. dispose ( ) ;
468
- this . syncStreamImplementation = undefined ;
481
+ return this . connectionManager . disconnect ( ) ;
469
482
}
470
483
471
484
/**
@@ -512,7 +525,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
512
525
await this . disconnect ( ) ;
513
526
}
514
527
515
- await this . syncStreamImplementation ?. dispose ( ) ;
528
+ await this . connectionManager . close ( ) ;
516
529
await this . database . close ( ) ;
517
530
this . closed = true ;
518
531
}
0 commit comments