@@ -54,6 +54,7 @@ class _ConnectedClient {
54
54
final _SyncWorker _worker;
55
55
56
56
_SyncRunner ? _runner;
57
+ StreamSubscription ? _logSubscription;
57
58
58
59
_ConnectedClient (MessagePort port, this ._worker) {
59
60
channel = WorkerCommunicationChannel (
@@ -73,6 +74,30 @@ class _ConnectedClient {
73
74
}
74
75
},
75
76
);
77
+
78
+ _logSubscription = _logger.onRecord.listen ((record) {
79
+ final msg = StringBuffer (
80
+ '[${record .loggerName }] ${record .level .name }: ${record .time }: ${record .message }' );
81
+
82
+ if (record.error != null ) {
83
+ msg
84
+ ..writeln ()
85
+ ..write (record.error);
86
+ }
87
+ if (record.stackTrace != null ) {
88
+ msg
89
+ ..writeln ()
90
+ ..write (record.stackTrace);
91
+ }
92
+
93
+ channel.notify (SyncWorkerMessageType .logEvent, msg.toString ().toJS);
94
+ });
95
+ }
96
+
97
+ void markClosed () {
98
+ _logSubscription? .cancel ();
99
+ _runner? .unregisterClient (this );
100
+ _runner = null ;
76
101
}
77
102
}
78
103
@@ -82,62 +107,39 @@ class _SyncRunner {
82
107
final StreamGroup <_RunnerEvent > _group = StreamGroup ();
83
108
final StreamController <_RunnerEvent > _mainEvents = StreamController ();
84
109
110
+ StreamingSync ? sync ;
111
+ _ConnectedClient ? databaseHost;
112
+ final connections = < _ConnectedClient > [];
113
+
85
114
_SyncRunner (this .identifier) {
86
115
_group.add (_mainEvents.stream);
87
116
88
117
Future (() async {
89
- final connections = < _ConnectedClient > [];
90
- StreamingSync ? sync ;
91
-
92
118
await for (final event in _group.stream) {
93
119
try {
94
120
switch (event) {
95
121
case _AddConnection (: final client):
96
122
connections.add (client);
97
123
if (sync == null ) {
98
- _logger.info ('Sync setup: Requesting database' );
99
-
100
- // This is the first client, ask for a database connection
101
- final connection = await client.channel.requestDatabase ();
102
- _logger.info ('Sync setup: Connecting to endpoint' );
103
- final database = await WebSqliteConnection .connectToEndpoint ((
104
- connectPort: connection.databasePort,
105
- connectName: connection.databaseName,
106
- lockName: connection.lockName,
107
- ));
108
- _logger.info ('Sync setup: Has database, starting sync!' );
109
-
110
- // todo: Detect client going down (sqlite_web exposes this), fall
111
- // back to other connection in that case.
112
-
113
- sync = StreamingSyncImplementation (
114
- adapter: BucketStorage (database),
115
- credentialsCallback: client.channel.credentialsCallback,
116
- invalidCredentialsCallback:
117
- client.channel.invalidCredentialsCallback,
118
- uploadCrud: client.channel.uploadCrud,
119
- updateStream: powerSyncUpdateNotifications (
120
- database.updates ?? const Stream .empty ()),
121
- retryDelay: Duration (seconds: 3 ),
122
- client: FetchClient (mode: RequestMode .cors),
123
- identifier: identifier,
124
- );
125
- sync .statusStream.listen ((event) {
126
- _logger.fine ('Broadcasting sync event: $event ' );
127
- for (final client in connections) {
128
- client.channel.notify (
129
- SyncWorkerMessageType .notifySyncStatus,
130
- SerializedSyncStatus .from (event));
131
- }
132
- });
133
- sync .streamingSync ();
124
+ await _requestDatabase (client);
134
125
}
135
126
case _RemoveConnection (: final client):
136
127
connections.remove (client);
137
128
if (connections.isEmpty) {
138
129
await sync ? .abort ();
139
130
sync = null ;
140
131
}
132
+ case _ActiveDatabaseClosed ():
133
+ _logger.info ('Remote database closed, finding a new client' );
134
+ sync ? .abort ();
135
+ sync = null ;
136
+
137
+ final newHost = await _collectActiveClients ();
138
+ if (newHost == null ) {
139
+ _logger.info ('No client remains' );
140
+ } else {
141
+ await _requestDatabase (newHost);
142
+ }
141
143
}
142
144
} catch (e, s) {
143
145
_logger.warning ('Error handling $event ' , e, s);
@@ -146,6 +148,84 @@ class _SyncRunner {
146
148
});
147
149
}
148
150
151
+ /// Pings all current [connections] , removing those that don't answer in 5s
152
+ /// (as they are likely closed tabs as well).
153
+ ///
154
+ /// Returns the first client that responds (without waiting for others).
155
+ Future <_ConnectedClient ?> _collectActiveClients () async {
156
+ final candidates = connections.toList ();
157
+ if (candidates.isEmpty) {
158
+ return null ;
159
+ }
160
+
161
+ final firstResponder = Completer <_ConnectedClient ?>();
162
+ var pendingRequests = candidates.length;
163
+
164
+ for (final candidate in candidates) {
165
+ candidate.channel.ping ().then ((_) {
166
+ pendingRequests-- ;
167
+ if (! firstResponder.isCompleted) {
168
+ firstResponder.complete (candidate);
169
+ }
170
+ }).timeout (const Duration (seconds: 5 ), onTimeout: () {
171
+ pendingRequests-- ;
172
+ candidate.markClosed ();
173
+ if (pendingRequests == 0 && ! firstResponder.isCompleted) {
174
+ // All requests have timed out, no connection remains
175
+ firstResponder.complete (null );
176
+ }
177
+ });
178
+ }
179
+
180
+ return firstResponder.future;
181
+ }
182
+
183
+ Future <void > _requestDatabase (_ConnectedClient client) async {
184
+ _logger.info ('Sync setup: Requesting database' );
185
+
186
+ // This is the first client, ask for a database connection
187
+ final connection = await client.channel.requestDatabase ();
188
+ _logger.info ('Sync setup: Connecting to endpoint' );
189
+ final database = await WebSqliteConnection .connectToEndpoint ((
190
+ connectPort: connection.databasePort,
191
+ connectName: connection.databaseName,
192
+ lockName: connection.lockName,
193
+ ));
194
+ _logger.info ('Sync setup: Has database, starting sync!' );
195
+ databaseHost = client;
196
+
197
+ database.closedFuture.then ((_) {
198
+ _logger.fine ('Detected closed client' );
199
+ client.markClosed ();
200
+
201
+ if (client == databaseHost) {
202
+ _logger
203
+ .info ('Tab providing sync database has gone down, reconnecting...' );
204
+ _mainEvents.add (const _ActiveDatabaseClosed ());
205
+ }
206
+ });
207
+
208
+ sync = StreamingSyncImplementation (
209
+ adapter: BucketStorage (database),
210
+ credentialsCallback: client.channel.credentialsCallback,
211
+ invalidCredentialsCallback: client.channel.invalidCredentialsCallback,
212
+ uploadCrud: client.channel.uploadCrud,
213
+ updateStream: powerSyncUpdateNotifications (
214
+ database.updates ?? const Stream .empty ()),
215
+ retryDelay: Duration (seconds: 3 ),
216
+ client: FetchClient (mode: RequestMode .cors),
217
+ identifier: identifier,
218
+ );
219
+ sync ! .statusStream.listen ((event) {
220
+ _logger.fine ('Broadcasting sync event: $event ' );
221
+ for (final client in connections) {
222
+ client.channel.notify (SyncWorkerMessageType .notifySyncStatus,
223
+ SerializedSyncStatus .from (event));
224
+ }
225
+ });
226
+ sync ! .streamingSync ();
227
+ }
228
+
149
229
void registerClient (_ConnectedClient client) {
150
230
_mainEvents.add (_AddConnection (client));
151
231
}
@@ -168,3 +248,7 @@ final class _RemoveConnection implements _RunnerEvent {
168
248
169
249
_RemoveConnection (this .client);
170
250
}
251
+
252
+ final class _ActiveDatabaseClosed implements _RunnerEvent {
253
+ const _ActiveDatabaseClosed ();
254
+ }
0 commit comments