Skip to content

Commit efc8ba9

Browse files
authored
[Node] Undici WebSocket & Diagnostics (#621)
1 parent b0c6015 commit efc8ba9

File tree

15 files changed

+364
-147
lines changed

15 files changed

+364
-147
lines changed

.changeset/nasty-steaks-yell.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/node': minor
3+
---
4+
5+
Switch to undici WebSocket for Dispatcher and diagnostics_channel support. This now adds support for the `ALL_PROXY` environment variable by default, as well as `WSS_PROXY` for websocket connections.

.changeset/swift-waves-tease.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@powersync/common': minor
3+
---
4+
5+
Preserve more details on websocket errors.

demos/example-node/.env

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,4 @@
11
BACKEND=http://localhost:6060
22
SYNC_SERVICE=http://localhost:8080
3+
POWERSYNC_TOKEN=
4+
POWERSYNC_DEBUG=1

demos/example-node/package.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,12 @@
77
"scripts": {
88
"build": "tsc -b",
99
"watch": "tsc -b -w",
10-
"start": "node --loader ts-node/esm -r dotenv/config src/main.ts"
10+
"start": "node --import ./register.mjs src/main.ts"
1111
},
1212
"dependencies": {
1313
"@powersync/node": "workspace:*",
14-
"dotenv": "^16.4.7"
14+
"dotenv": "^16.4.7",
15+
"undici": "^7.10.0"
1516
},
1617
"devDependencies": {
1718
"ts-node": "^10.9.2",

demos/example-node/register.mjs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
// For cli usage: node --import ./register.mjs src/main.ts
2+
import { register } from 'node:module';
3+
import { pathToFileURL } from 'node:url';
4+
import 'dotenv/config';
5+
6+
register('ts-node/esm', pathToFileURL('./'));
Lines changed: 151 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
import * as diagnostics_channel from 'node:diagnostics_channel';
2+
import type { DiagnosticsChannel } from 'undici';
3+
4+
/**
5+
* Enable Undici diagnostics channel instrumentation for detailed connection and request logging.
6+
*
7+
* This includes fetch requests and websocket connections.
8+
*
9+
* Usage: enableUncidiDiagnostics();
10+
*/
11+
export function enableUncidiDiagnostics() {
12+
new UndiciDiagnostics().enable();
13+
}
14+
15+
class UndiciDiagnostics {
16+
private requestCounter: number = 0;
17+
private activeRequests: WeakMap<any, number> = new WeakMap();
18+
19+
enable() {
20+
// Available events are documented here:
21+
// https://github.com/nodejs/undici/blob/main/docs/docs/api/DiagnosticsChannel.md
22+
23+
diagnostics_channel.subscribe('undici:request:create', (message: DiagnosticsChannel.RequestCreateMessage) => {
24+
const requestId = ++this.requestCounter;
25+
const request = message.request;
26+
this.activeRequests.set(message.request, requestId);
27+
28+
console.log(`🔄 [DIAG-${requestId}] REQUEST CREATE:`, {
29+
host: request.origin,
30+
path: request.path,
31+
method: request.method,
32+
headers: formatHeaders(request.headers),
33+
contentType: (request as any).contentType,
34+
contentLength: (request as any).contentLength
35+
});
36+
});
37+
38+
diagnostics_channel.subscribe('undici:request:bodySent', (message: DiagnosticsChannel.RequestBodySentMessage) => {
39+
const requestId = this.activeRequests.get(message.request);
40+
console.log(`📤 [DIAG-${requestId}] REQUEST BODY SENT`);
41+
});
42+
43+
diagnostics_channel.subscribe('undici:request:headers', (message: DiagnosticsChannel.RequestHeadersMessage) => {
44+
const requestId = this.activeRequests.get(message.request);
45+
console.log(`📥 [DIAG-${requestId}] RESPONSE HEADERS:`, {
46+
statusCode: message.response.statusCode,
47+
statusText: message.response.statusText,
48+
headers: formatHeaders(message.response.headers)
49+
});
50+
});
51+
52+
diagnostics_channel.subscribe('undici:request:trailers', (message: DiagnosticsChannel.RequestTrailersMessage) => {
53+
const requestId = this.activeRequests.get(message.request);
54+
console.log(`🏁 [DIAG-${requestId}] REQUEST TRAILERS:`, {
55+
trailers: message.trailers
56+
});
57+
});
58+
59+
diagnostics_channel.subscribe('undici:request:error', (message: DiagnosticsChannel.RequestErrorMessage) => {
60+
const requestId = this.activeRequests.get(message.request);
61+
console.log(`❌ [DIAG-${requestId}] REQUEST ERROR:`, {
62+
error: message.error
63+
});
64+
65+
// Clean up tracking
66+
this.activeRequests.delete(message.request);
67+
});
68+
69+
// Client connection events
70+
diagnostics_channel.subscribe(
71+
'undici:client:sendHeaders',
72+
(message: DiagnosticsChannel.ClientSendHeadersMessage) => {
73+
console.log(`📡 [DIAG] CLIENT SEND HEADERS:`, {
74+
headers: formatHeaders(message.headers)
75+
});
76+
}
77+
);
78+
79+
diagnostics_channel.subscribe(
80+
'undici:client:beforeConnect',
81+
(message: DiagnosticsChannel.ClientBeforeConnectMessage) => {
82+
console.log(`🔌 [DIAG] CLIENT BEFORE CONNECT:`, {
83+
connectParams: message.connectParams
84+
});
85+
}
86+
);
87+
88+
diagnostics_channel.subscribe('undici:client:connected', (message: DiagnosticsChannel.ClientConnectedMessage) => {
89+
console.log(`✅ [DIAG] CLIENT CONNECTED:`, {
90+
connectParams: message.connectParams,
91+
connector: message.connector?.name,
92+
socket: {
93+
localAddress: message.socket?.localAddress,
94+
localPort: message.socket?.localPort,
95+
remoteAddress: message.socket?.remoteAddress,
96+
remotePort: message.socket?.remotePort
97+
}
98+
});
99+
});
100+
101+
diagnostics_channel.subscribe(
102+
'undici:client:connectError',
103+
(message: DiagnosticsChannel.ClientConnectErrorMessage) => {
104+
console.log(`❌ [DIAG] CLIENT CONNECT ERROR:`, {
105+
connectParams: message.connectParams,
106+
error: message.error
107+
});
108+
}
109+
);
110+
111+
// WebSocket events
112+
diagnostics_channel.subscribe('undici:websocket:open', (message: any) => {
113+
console.log(`🌐 [DIAG] WEBSOCKET OPEN:`, {
114+
address: message.address,
115+
protocol: message.protocol,
116+
extensions: message.extensions
117+
});
118+
});
119+
120+
diagnostics_channel.subscribe('undici:websocket:close', (message: any) => {
121+
console.log(`🌐 [DIAG] WEBSOCKET CLOSE:`, {
122+
websocket: message.websocket?.url,
123+
code: message.code,
124+
reason: message.reason
125+
});
126+
});
127+
128+
diagnostics_channel.subscribe('undici:websocket:socket_error', (message: any) => {
129+
console.log(`❌ [DIAG] WEBSOCKET SOCKET ERROR:`, {
130+
websocket: message.websocket?.url,
131+
error: message.error
132+
});
133+
});
134+
}
135+
}
136+
137+
function formatHeaders(headers: any[] | string | undefined) {
138+
if (typeof headers === 'string') {
139+
return headers;
140+
}
141+
142+
return headers?.map((header) => {
143+
if (typeof header == 'string') {
144+
return header;
145+
} else if (Buffer.isBuffer(header)) {
146+
return header.toString('utf-8');
147+
} else {
148+
return header;
149+
}
150+
});
151+
}

demos/example-node/src/main.ts

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,20 @@ import repl_factory from 'node:repl';
44
import { createBaseLogger, createLogger, PowerSyncDatabase, SyncStreamConnectionMethod } from '@powersync/node';
55
import { exit } from 'node:process';
66
import { AppSchema, DemoConnector } from './powersync.js';
7+
import { enableUncidiDiagnostics } from './UndiciDiagnostics.js';
78

89
const main = async () => {
910
const baseLogger = createBaseLogger();
1011
const logger = createLogger('PowerSyncDemo');
11-
baseLogger.useDefaults({ defaultLevel: logger.WARN });
12+
const debug = process.env.POWERSYNC_DEBUG == '1';
13+
baseLogger.useDefaults({ defaultLevel: debug ? logger.TRACE : logger.WARN });
1214

13-
if (!('BACKEND' in process.env) || !('SYNC_SERVICE' in process.env)) {
15+
// Enable detailed request/response logging for debugging purposes.
16+
if (debug) {
17+
enableUncidiDiagnostics();
18+
}
19+
20+
if (!('SYNC_SERVICE' in process.env)) {
1421
console.warn(
1522
'Set the BACKEND and SYNC_SERVICE environment variables to point to a sync service and a running demo backend.'
1623
);
@@ -26,7 +33,24 @@ const main = async () => {
2633
});
2734
console.log(await db.get('SELECT powersync_rs_version();'));
2835

29-
await db.connect(new DemoConnector(), { connectionMethod: SyncStreamConnectionMethod.HTTP });
36+
await db.connect(new DemoConnector(), {
37+
connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET
38+
});
39+
// Example using a proxy agent for more control over the connection:
40+
// const proxyAgent = new (await import('undici')).ProxyAgent({
41+
// uri: 'http://localhost:8080',
42+
// requestTls: {
43+
// ca: '<CA for the service>'
44+
// },
45+
// proxyTls: {
46+
// ca: '<CA for the proxy>'
47+
// }
48+
// });
49+
// await db.connect(new DemoConnector(), {
50+
// connectionMethod: SyncStreamConnectionMethod.WEB_SOCKET,
51+
// dispatcher: proxyAgent
52+
// });
53+
3054
await db.waitForFirstSync();
3155
console.log('First sync complete!');
3256

demos/example-node/src/powersync.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@ import { AbstractPowerSyncDatabase, column, PowerSyncBackendConnector, Schema, T
22

33
export class DemoConnector implements PowerSyncBackendConnector {
44
async fetchCredentials() {
5+
if (process.env.POWERSYNC_TOKEN) {
6+
return {
7+
endpoint: process.env.SYNC_SERVICE!,
8+
token: process.env.POWERSYNC_TOKEN
9+
};
10+
}
11+
512
const response = await fetch(`${process.env.BACKEND}/api/auth/token`);
613
if (response.status != 200) {
714
throw 'Could not fetch token';

packages/common/src/client/sync/stream/AbstractRemote.ts

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -312,18 +312,12 @@ export abstract class AbstractRemote {
312312
// automatically as a header.
313313
const userAgent = this.getUserAgent();
314314

315-
let socketCreationError: Error | undefined;
316-
315+
const url = this.options.socketUrlTransformer(request.url);
317316
const connector = new RSocketConnector({
318317
transport: new WebsocketClientTransport({
319-
url: this.options.socketUrlTransformer(request.url),
318+
url,
320319
wsCreator: (url) => {
321-
const s = this.createSocket(url);
322-
s.addEventListener('error', (e: Event) => {
323-
socketCreationError = new Error('Failed to create connection to websocket: ', (e.target as any).url ?? '');
324-
this.logger.warn('Socket error', e);
325-
});
326-
return s;
320+
return this.createSocket(url);
327321
}
328322
}),
329323
setup: {
@@ -347,11 +341,8 @@ export abstract class AbstractRemote {
347341
try {
348342
rsocket = await connector.connect();
349343
} catch (ex) {
350-
/**
351-
* On React native the connection exception can be `undefined` this causes issues
352-
* with detecting the exception inside async-mutex
353-
*/
354-
throw new Error(`Could not connect to PowerSync instance: ${JSON.stringify(ex ?? socketCreationError)}`);
344+
this.logger.error(`Failed to connect WebSocket`, ex);
345+
throw ex;
355346
}
356347

357348
const stream = new DataStream({

packages/common/src/client/sync/stream/WebsocketClientTransport.ts

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,17 @@ export class WebsocketClientTransport implements ClientTransport {
4444

4545
const errorListener = (ev: ErrorEvent) => {
4646
removeListeners();
47-
reject(ev.error);
47+
// We add a default error in that case.
48+
if (ev.error != null) {
49+
// undici typically provides an error object
50+
reject(ev.error);
51+
} else if (ev.message != null) {
52+
// React Native typically does not provide an error object, but does provide a message
53+
reject(new Error(`Failed to create websocket connection: ${ev.message}`));
54+
} else {
55+
// Browsers often provide no details at all
56+
reject(new Error(`Failed to create websocket connection to ${this.url}`));
57+
}
4858
};
4959

5060
/**

packages/node/package.json

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,7 @@
5353
"async-lock": "^1.4.0",
5454
"bson": "^6.6.0",
5555
"comlink": "^4.4.2",
56-
"proxy-agent": "^6.5.0",
57-
"undici": "^7.8.0",
58-
"ws": "^8.18.1"
56+
"undici": "^7.10.0"
5957
},
6058
"devDependencies": {
6159
"@powersync/drizzle-driver": "workspace:*",

packages/node/src/db/PowerSyncDatabase.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,9 @@ import {
1313
SQLOpenFactory
1414
} from '@powersync/common';
1515

16-
import { NodeRemote } from '../sync/stream/NodeRemote.js';
16+
import { NodeCustomConnectionOptions, NodeRemote } from '../sync/stream/NodeRemote.js';
1717
import { NodeStreamingSyncImplementation } from '../sync/stream/NodeStreamingSyncImplementation.js';
1818

19-
import { Dispatcher } from 'undici';
2019
import { BetterSQLite3DBAdapter } from './BetterSQLite3DBAdapter.js';
2120
import { NodeSQLOpenOptions } from './options.js';
2221

@@ -30,13 +29,7 @@ export type NodePowerSyncDatabaseOptions = PowerSyncDatabaseOptions & {
3029
remoteOptions?: Partial<AbstractRemoteOptions>;
3130
};
3231

33-
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & {
34-
/**
35-
* Optional custom dispatcher for HTTP connections (e.g. using undici).
36-
* Only used when the connection method is SyncStreamConnectionMethod.HTTP
37-
*/
38-
dispatcher?: Dispatcher;
39-
};
32+
export type NodeAdditionalConnectionOptions = AdditionalConnectionOptions & NodeCustomConnectionOptions;
4033

4134
export type NodePowerSyncConnectionOptions = PowerSyncConnectionOptions & NodeAdditionalConnectionOptions;
4235

@@ -76,7 +69,7 @@ export class PowerSyncDatabase extends AbstractPowerSyncDatabase {
7669

7770
connect(
7871
connector: PowerSyncBackendConnector,
79-
options?: PowerSyncConnectionOptions & { dispatcher?: Dispatcher }
72+
options?: PowerSyncConnectionOptions & NodeCustomConnectionOptions
8073
): Promise<void> {
8174
return super.connect(connector, options);
8275
}

0 commit comments

Comments
 (0)