Skip to content

Commit 0565a0a

Browse files
authored
feat: Fetch credentials on token expiry/401 (#604)
1 parent 38d36e2 commit 0565a0a

File tree

7 files changed

+114
-11
lines changed

7 files changed

+114
-11
lines changed

.changeset/cool-yaks-allow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/web': minor
3+
---
4+
5+
To support the upstream credentials management changes from `@powersync/common`, the sync worker now communicates credentials invalidation to tabs.

.changeset/shiny-rules-invent.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Improved credentials management and error handling. Credentials are invalidated when they expire or become invalid based on responses from the PowerSync service. The frequency of credential fetching has been reduced as a result of this work.

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 83 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,12 @@ export type BSONImplementation = typeof BSON;
1515

1616
export type RemoteConnector = {
1717
fetchCredentials: () => Promise<PowerSyncCredentials | null>;
18+
invalidateCredentials?: () => void;
1819
};
1920

2021
const POWERSYNC_TRAILING_SLASH_MATCH = /\/+$/;
2122
const POWERSYNC_JS_VERSION = PACKAGE.version;
2223

23-
// Refresh at least 30 sec before it expires
24-
const REFRESH_CREDENTIALS_SAFETY_PERIOD_MS = 30_000;
2524
const SYNC_QUEUE_REQUEST_LOW_WATER = 5;
2625

2726
// Keep alive message is sent every period
@@ -130,18 +129,59 @@ export abstract class AbstractRemote {
130129
: fetchImplementation;
131130
}
132131

132+
/**
133+
* Get credentials currently cached, or fetch new credentials if none are
134+
* available.
135+
*
136+
* These credentials may have expired already.
137+
*/
133138
async getCredentials(): Promise<PowerSyncCredentials | null> {
134-
const { expiresAt } = this.credentials ?? {};
135-
if (expiresAt && expiresAt > new Date(new Date().valueOf() + REFRESH_CREDENTIALS_SAFETY_PERIOD_MS)) {
136-
return this.credentials!;
139+
if (this.credentials) {
140+
return this.credentials;
137141
}
138-
this.credentials = await this.connector.fetchCredentials();
139-
if (this.credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) {
142+
143+
return this.prefetchCredentials();
144+
}
145+
146+
/**
147+
* Fetch a new set of credentials and cache it.
148+
*
149+
* Until this call succeeds, `getCredentials` will still return the
150+
* old credentials.
151+
*
152+
* This may be called before the current credentials have expired.
153+
*/
154+
async prefetchCredentials() {
155+
this.credentials = await this.fetchCredentials();
156+
157+
return this.credentials;
158+
}
159+
160+
/**
161+
* Get credentials for PowerSync.
162+
*
163+
* This should always fetch a fresh set of credentials - don't use cached
164+
* values.
165+
*/
166+
async fetchCredentials() {
167+
const credentials = await this.connector.fetchCredentials();
168+
if (credentials?.endpoint.match(POWERSYNC_TRAILING_SLASH_MATCH)) {
140169
throw new Error(
141-
`A trailing forward slash "/" was found in the fetchCredentials endpoint: "${this.credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.`
170+
`A trailing forward slash "/" was found in the fetchCredentials endpoint: "${credentials.endpoint}". Remove the trailing forward slash "/" to fix this error.`
142171
);
143172
}
144-
return this.credentials;
173+
174+
return credentials;
175+
}
176+
177+
/***
178+
* Immediately invalidate credentials.
179+
*
180+
* This may be called when the current credentials have expired.
181+
*/
182+
invalidateCredentials() {
183+
this.credentials = null;
184+
this.connector.invalidateCredentials?.();
145185
}
146186

147187
getUserAgent() {
@@ -181,6 +221,10 @@ export abstract class AbstractRemote {
181221
body: JSON.stringify(data)
182222
});
183223

224+
if (res.status === 401) {
225+
this.invalidateCredentials();
226+
}
227+
184228
if (!res.ok) {
185229
throw new Error(`Received ${res.status} - ${res.statusText} when posting to ${path}: ${await res.text()}}`);
186230
}
@@ -198,6 +242,10 @@ export abstract class AbstractRemote {
198242
}
199243
});
200244

245+
if (res.status === 401) {
246+
this.invalidateCredentials();
247+
}
248+
201249
if (!res.ok) {
202250
throw new Error(`Received ${res.status} - ${res.statusText} when getting from ${path}: ${await res.text()}}`);
203251
}
@@ -224,6 +272,10 @@ export abstract class AbstractRemote {
224272
throw ex;
225273
});
226274

275+
if (res.status === 401) {
276+
this.invalidateCredentials();
277+
}
278+
227279
if (!res.ok) {
228280
const text = await res.text();
229281
this.logger.error(`Could not POST streaming to ${path} - ${res.status} - ${res.statusText}: ${text}`);
@@ -260,10 +312,19 @@ export abstract class AbstractRemote {
260312
// automatically as a header.
261313
const userAgent = this.getUserAgent();
262314

315+
let socketCreationError: Error | undefined;
316+
263317
const connector = new RSocketConnector({
264318
transport: new WebsocketClientTransport({
265319
url: this.options.socketUrlTransformer(request.url),
266-
wsCreator: (url) => this.createSocket(url)
320+
wsCreator: (url) => {
321+
const s = this.createSocket(url);
322+
s.addEventListener('error', (e: Event) => {
323+
socketCreationError = new Error('Failed to create connection to websocket: ', (e.target as any).url ?? '');
324+
this.logger.warn('Socket error', e);
325+
});
326+
return s;
327+
}
267328
}),
268329
setup: {
269330
keepAlive: KEEP_ALIVE_MS,
@@ -290,7 +351,7 @@ export abstract class AbstractRemote {
290351
* On React native the connection exception can be `undefined` this causes issues
291352
* with detecting the exception inside async-mutex
292353
*/
293-
throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex)}`);
354+
throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex ?? socketCreationError)}`);
294355
}
295356

296357
const stream = new DataStream({
@@ -335,6 +396,17 @@ export abstract class AbstractRemote {
335396
syncQueueRequestSize, // The initial N amount
336397
{
337398
onError: (e) => {
399+
if (e.message.includes('PSYNC_')) {
400+
if (e.message.includes('PSYNC_S21')) {
401+
this.invalidateCredentials();
402+
}
403+
} else {
404+
// Possible that connection is with an older service, always invalidate to be safe
405+
if (e.message !== 'Closed. ') {
406+
this.invalidateCredentials();
407+
}
408+
}
409+
338410
// Don't log closed as an error
339411
if (e.message !== 'Closed. ') {
340412
this.logger.error(e);

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -683,12 +683,19 @@ The next upload iteration will be delayed.`);
683683
if (remaining_seconds == 0) {
684684
// Connection would be closed automatically right after this
685685
this.logger.debug('Token expiring; reconnect');
686+
this.options.remote.invalidateCredentials();
687+
686688
/**
687689
* For a rare case where the backend connector does not update the token
688690
* (uses the same one), this should have some delay.
689691
*/
690692
await this.delayRetry();
691693
return;
694+
} else if (remaining_seconds < 30) {
695+
this.logger.debug('Token will expire soon; reconnect');
696+
// Pre-emptively refresh the token
697+
this.options.remote.invalidateCredentials();
698+
return;
692699
}
693700
this.triggerCrudUpload();
694701
} else {

packages/web/src/db/sync/SharedWebStreamingSyncImplementation.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ class SharedSyncClientProvider extends AbstractSharedSyncClientProvider {
3131
return Comlink.transfer(port, [port]);
3232
}
3333

34+
invalidateCredentials() {
35+
this.options.remote.invalidateCredentials();
36+
}
37+
3438
async fetchCredentials(): Promise<PowerSyncCredentials | null> {
3539
const credentials = await this.options.remote.getCredentials();
3640
if (credentials == null) {

packages/web/src/worker/sync/AbstractSharedSyncClientProvider.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import type { PowerSyncCredentials, SyncStatusOptions } from '@powersync/common'
55
*/
66
export abstract class AbstractSharedSyncClientProvider {
77
abstract fetchCredentials(): Promise<PowerSyncCredentials | null>;
8+
abstract invalidateCredentials(): void;
89
abstract uploadCrud(): Promise<void>;
910
abstract statusChanged(status: SyncStatusOptions): void;
1011
abstract getDBWorkerPort(): Promise<MessagePort>;

packages/web/src/worker/sync/SharedSyncImplementation.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -308,6 +308,15 @@ export class SharedSyncImplementation
308308
adapter: new SqliteBucketStorage(this.dbAdapter!, new Mutex(), this.logger),
309309
remote: new WebRemote(
310310
{
311+
invalidateCredentials: async () => {
312+
const lastPort = this.ports[this.ports.length - 1];
313+
try {
314+
this.logger.log('calling the last port client provider to invalidate credentials');
315+
lastPort.clientProvider.invalidateCredentials();
316+
} catch (ex) {
317+
this.logger.error('error invalidating credentials', ex);
318+
}
319+
},
311320
fetchCredentials: async () => {
312321
const lastPort = this.ports[this.ports.length - 1];
313322
return new Promise(async (resolve, reject) => {

0 commit comments

Comments
 (0)