Skip to content

Commit 90cff10

Browse files
committed
Improve streaming_sync_tests.
1 parent 21a28e9 commit 90cff10

File tree

2 files changed

+60
-40
lines changed

2 files changed

+60
-40
lines changed

packages/powersync/test/streaming_sync_test.dart

Lines changed: 6 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -44,12 +44,8 @@ void main() {
4444
var server = await createServer();
4545

4646
credentialsCallback() async {
47-
final endpoint = 'http://${server.address.host}:${server.port}';
4847
return PowerSyncCredentials(
49-
endpoint: endpoint,
50-
token: 'token',
51-
userId: 'u1',
52-
expiresAt: DateTime.now());
48+
endpoint: server.endpoint, token: 'token');
5349
}
5450

5551
final pdb = await setupPowerSync(path: path);
@@ -59,12 +55,12 @@ void main() {
5955

6056
await Future.delayed(Duration(milliseconds: random.nextInt(100)));
6157
if (random.nextBool()) {
62-
server.close(force: true).ignore();
58+
server.close();
6359
}
6460

6561
await pdb.close();
6662

67-
server.close(force: true).ignore();
63+
server.close();
6864
}
6965
});
7066

@@ -81,18 +77,13 @@ void main() {
8177
// [PowerSync] WARNING: 2023-06-29 16:10:17.667537: Sync Isolate error
8278
// [Connection closed while receiving data, #0 IOClient.send.<anonymous closure> (package:http/src/io_client.dart:76:13)
8379

84-
HttpServer? server;
80+
TestServer? server;
8581

8682
credentialsCallback() async {
8783
if (server == null) {
8884
throw AssertionError('No active server');
8985
}
90-
final endpoint = 'http://${server.address.host}:${server.port}';
91-
return PowerSyncCredentials(
92-
endpoint: endpoint,
93-
token: 'token',
94-
userId: 'u1',
95-
expiresAt: DateTime.now());
86+
return PowerSyncCredentials(endpoint: server.endpoint, token: 'token');
9687
}
9788

9889
final pdb = await setupPowerSync(path: path);
@@ -107,7 +98,7 @@ void main() {
10798
// 2ms: HttpException: HttpServer is not bound to a socket
10899
// 20ms: Connection closed while receiving data
109100
await Future.delayed(Duration(milliseconds: 20));
110-
server.close(force: true).ignore();
101+
server.close();
111102
}
112103
await pdb.close();
113104
});

packages/powersync/test/test_server.dart

Lines changed: 54 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,67 @@
11
import 'dart:async';
22
import 'dart:convert' as convert;
33
import 'dart:io';
4+
import 'dart:math';
45

56
import 'package:http/http.dart' show ByteStream;
67
import 'package:shelf/shelf.dart';
78
import 'package:shelf/shelf_io.dart' as shelf_io;
89
import 'package:shelf_router/shelf_router.dart';
910

10-
Future<HttpServer> createServer() async {
11-
var app = Router();
11+
class TestServer {
12+
late HttpServer server;
13+
Router app = Router();
14+
int connectionCount = 0;
15+
int maxConnectionCount = 0;
16+
int tokenExpiresIn;
1217

13-
app.post('/sync/stream', handleSyncStream);
14-
// Open on an arbitrary open port
15-
var server = await shelf_io.serve(app.call, 'localhost', 0);
18+
TestServer({this.tokenExpiresIn = 65});
19+
20+
Future<void> init() async {
21+
app.post('/sync/stream', handleSyncStream);
22+
// Open on an arbitrary open port
23+
server = await shelf_io.serve(app.call, 'localhost', 0);
24+
}
25+
26+
String get endpoint {
27+
return 'http://${server.address.host}:${server.port}';
28+
}
29+
30+
Future<Response> handleSyncStream(Request request) async {
31+
connectionCount += 1;
32+
maxConnectionCount = max(connectionCount, maxConnectionCount);
33+
34+
stream() async* {
35+
try {
36+
var blob = "*" * 5000;
37+
for (var i = 0; i < 50; i++) {
38+
yield {"token_expires_in": tokenExpiresIn, "blob": blob};
39+
await Future.delayed(Duration(microseconds: 1));
40+
}
41+
} finally {
42+
connectionCount -= 1;
43+
}
44+
}
45+
46+
return Response.ok(
47+
encodeNdjson(stream()),
48+
headers: {
49+
'Content-Type': 'application/x-ndjson',
50+
},
51+
context: {
52+
'shelf.io.buffer_output': false,
53+
},
54+
);
55+
}
56+
57+
void close() {
58+
server.close(force: true).ignore();
59+
}
60+
}
61+
62+
Future<TestServer> createServer() async {
63+
var server = TestServer();
64+
await server.init();
1665
return server;
1766
}
1867

@@ -22,23 +71,3 @@ ByteStream encodeNdjson(Stream<Object> jsonInput) {
2271
final byteInput = stringInput.transform(convert.utf8.encoder);
2372
return ByteStream(byteInput);
2473
}
25-
26-
Future<Response> handleSyncStream(Request request) async {
27-
stream() async* {
28-
var blob = "*" * 5000;
29-
for (var i = 0; i < 50; i++) {
30-
yield {"token_expires_in": 5, "blob": blob};
31-
await Future.delayed(Duration(microseconds: 1));
32-
}
33-
}
34-
35-
return Response.ok(
36-
encodeNdjson(stream()),
37-
headers: {
38-
'Content-Type': 'application/x-ndjson',
39-
},
40-
context: {
41-
'shelf.io.buffer_output': false,
42-
},
43-
);
44-
}

0 commit comments

Comments
 (0)