Skip to content

[Fix] CRUD Upload on Reconnect #203

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 16 commits into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,37 @@
All notable changes to this project will be documented in this file.
See [Conventional Commits](https://conventionalcommits.org) for commit guidelines.

## 2024-10-31

### Changes

---

Packages with breaking changes:

- There are no breaking changes in this release.

Packages with other changes:

- [`powersync` - `v1.8.9`](#powersync---v189)
- [`powersync_attachments_helper` - `v0.6.13`](#powersync_attachments_helper---v0613)
- [`powersync_flutter_libs` - `v0.4.2`](#powersync_flutter_libs---v042)

---

#### `powersync` - `v1.8.9`

- **FIX**: Issue where CRUD uploads were not triggered when the SDK reconnected to the PowerSync service after being offline.

#### `powersync_attachments_helper` - `v0.6.13`

- Update a dependency to the latest release.

#### `powersync_flutter_libs` - `v0.4.2`

- Update a dependency to the latest release.


## 2024-10-21

### Changes
Expand Down
2 changes: 1 addition & 1 deletion demos/django-todolist/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync: ^1.8.8
powersync: ^1.8.9
path_provider: ^2.1.1
path: ^1.8.3
logging: ^1.2.0
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-anonymous-auth/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies:
flutter:
sdk: flutter

powersync: ^1.8.8
powersync: ^1.8.9
path_provider: ^2.1.1
supabase_flutter: ^2.0.2
path: ^1.8.3
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-edge-function-auth/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ dependencies:
flutter:
sdk: flutter

powersync: ^1.8.8
powersync: ^1.8.9
path_provider: ^2.1.1
supabase_flutter: ^2.0.2
path: ^1.8.3
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-simple-chat/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies:

supabase_flutter: ^2.0.2
timeago: ^3.6.0
powersync: ^1.8.8
powersync: ^1.8.9
path_provider: ^2.1.1
path: ^1.8.3
logging: ^1.2.0
Expand Down
4 changes: 2 additions & 2 deletions demos/supabase-todolist-drift/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync_attachments_helper: ^0.6.12
powersync: ^1.8.8
powersync_attachments_helper: ^0.6.13
powersync: ^1.8.9
path_provider: ^2.1.1
supabase_flutter: ^2.0.1
path: ^1.8.3
Expand Down
2 changes: 1 addition & 1 deletion demos/supabase-todolist-optional-sync/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync: ^1.8.8
powersync: ^1.8.9
path_provider: ^2.1.1
supabase_flutter: ^2.0.1
path: ^1.8.3
Expand Down
4 changes: 2 additions & 2 deletions demos/supabase-todolist/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ environment:
dependencies:
flutter:
sdk: flutter
powersync_attachments_helper: ^0.6.12
powersync: ^1.8.8
powersync_attachments_helper: ^0.6.13
powersync: ^1.8.9
path_provider: ^2.1.1
supabase_flutter: ^2.0.1
path: ^1.8.3
Expand Down
4 changes: 4 additions & 0 deletions packages/powersync/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 1.8.9

- **FIX**: issue where CRUD uploads were not triggered when the SDK reconnected to the PowerSync service after being offline.

## 1.8.8

- Update dependency `powersync_flutter_libs`
Expand Down
42 changes: 36 additions & 6 deletions packages/powersync/lib/src/streaming_sync.dart
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,13 @@ class StreamingSyncImplementation {

final Future<void> Function() uploadCrud;

// An internal controller which is used to trigger CRUD uploads internally
// e.g. when reconnecting.
// This is only a broadcast controller since the `crudLoop` method is public
// and could potentially be called multiple times externally.
final StreamController<Null> _internalCrudTriggerController =
StreamController<Null>.broadcast();

final Stream crudUpdateTriggerStream;

final StreamController<SyncStatus> _statusStreamController =
Expand Down Expand Up @@ -92,6 +99,9 @@ class StreamingSyncImplementation {
if (_safeToClose) {
_client.close();
}

await _internalCrudTriggerController.close();

// wait for completeAbort() to be called
await future;

Expand Down Expand Up @@ -144,7 +154,7 @@ class StreamingSyncImplementation {

// On error, wait a little before retrying
// When aborting, don't wait
await Future.any([Future.delayed(retryDelay), _abort!.onAbort]);
await _delayRetry();
}
}
} finally {
Expand All @@ -155,10 +165,14 @@ class StreamingSyncImplementation {
Future<void> crudLoop() async {
await uploadAllCrud();

await for (var _ in crudUpdateTriggerStream) {
if (_abort?.aborted == true) {
break;
}
// Trigger a CRUD upload whenever the upstream trigger fires
// as-well-as whenever the sync stream reconnects.
// This has the potential (in rare cases) to affect the crudThrottleTime,
// but it should not result in excessive uploads since the
// sync reconnects are also throttled.
// The stream here is closed on abort.
await for (var _ in mergeStreams(
[crudUpdateTriggerStream, _internalCrudTriggerController.stream])) {
await uploadAllCrud();
}
}
Expand All @@ -170,6 +184,13 @@ class StreamingSyncImplementation {

while (true) {
try {
// It's possible that an abort or disconnect operation could
// be followed by a `close` operation. The close would cause these
// operations, which use the DB, to throw an exception. Breaking the loop
// here prevents unnecessary potential (caught) exceptions.
if (aborted) {
break;
}
// This is the first item in the FIFO CRUD queue.
CrudEntry? nextCrudItem = await adapter.nextCrudItem();
if (nextCrudItem != null) {
Expand All @@ -196,7 +217,7 @@ class StreamingSyncImplementation {
checkedCrudItem = null;
isolateLogger.warning('Data upload error', e, stacktrace);
_updateStatus(uploading: false, uploadError: e);
await Future.delayed(retryDelay);
await _delayRetry();
if (!isConnected) {
// Exit the upload loop if the sync stream is no longer connected
break;
Expand Down Expand Up @@ -298,6 +319,9 @@ class StreamingSyncImplementation {
Future<void>? credentialsInvalidation;
bool haveInvalidated = false;

// Trigger a CRUD upload on reconnect
_internalCrudTriggerController.add(null);

await for (var line in merged) {
if (aborted) {
break;
Expand Down Expand Up @@ -465,6 +489,12 @@ class StreamingSyncImplementation {
yield parseStreamingSyncLine(line as Map<String, dynamic>);
}
}

/// Delays the standard `retryDelay` Duration, but exits early if
/// an abort has been requested.
Future<void> _delayRetry() async {
await Future.any([Future.delayed(retryDelay), _abort!.onAbort]);
}
}

/// Attempt to give a basic summary of the error for cases where the full error
Expand Down
2 changes: 1 addition & 1 deletion packages/powersync/lib/src/version.dart
Original file line number Diff line number Diff line change
@@ -1 +1 @@
const String libraryVersion = '1.8.8';
const String libraryVersion = '1.8.9';
4 changes: 2 additions & 2 deletions packages/powersync/pubspec.yaml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name: powersync
version: 1.8.8
version: 1.8.9
homepage: https://powersync.com
repository: https://github.com/powersync-ja/powersync.dart
description: PowerSync Flutter SDK - sync engine for building local-first apps.
Expand All @@ -16,7 +16,7 @@ dependencies:
sqlite3: ^2.4.6
universal_io: ^2.0.0
sqlite3_flutter_libs: ^0.5.23
powersync_flutter_libs: ^0.4.1
powersync_flutter_libs: ^0.4.2
meta: ^1.0.0
http: ^1.1.0
uuid: ^4.2.0
Expand Down
136 changes: 136 additions & 0 deletions packages/powersync/test/connected_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
@TestOn('!browser')
// This test uses a local server which is possible to control in Web via hybrid main,
// but this makes the test significantly more complex.
import 'dart:async';

import 'package:powersync/powersync.dart';
import 'package:test/test.dart';

import 'server/sync_server/mock_sync_server.dart';
import 'streaming_sync_test.dart';
import 'utils/abstract_test_utils.dart';
import 'utils/test_utils_impl.dart';

final testUtils = TestUtils();

void main() {
group('connected tests', () {
late String path;
setUp(() async {
path = testUtils.dbPath();
});

tearDown(() async {
await testUtils.cleanDb(path: path);
});

createTestServer() async {
final testServer = TestHttpServerHelper();
await testServer.start();
addTearDown(() => testServer.stop());
return testServer;
}

test('should connect to mock PowerSync instance', () async {
final testServer = await createTestServer();
final connector = TestConnector(() async {
return PowerSyncCredentials(
endpoint: testServer.uri.toString(),
token: 'token not used here',
expiresAt: DateTime.now());
});

final db = PowerSyncDatabase.withFactory(
await testUtils.testFactory(path: path),
schema: defaultSchema,
maxReaders: 3);
await db.initialize();

final connectedCompleter = Completer();

db.statusStream.listen((status) {
if (status.connected) {
connectedCompleter.complete();
}
});

// Add a basic command for the test server to send
testServer.addEvent('{"token_expires_in": 3600}\n');

await db.connect(connector: connector);
await connectedCompleter.future;

expect(db.connected, isTrue);
await db.disconnect();
});

test('should trigger uploads when connection is re-established', () async {
int uploadCounter = 0;
Completer uploadTriggeredCompleter = Completer();
final testServer = await createTestServer();
final connector = TestConnector(() async {
return PowerSyncCredentials(
endpoint: testServer.uri.toString(),
token: 'token not used here',
expiresAt: DateTime.now());
}, uploadData: (database) async {
uploadCounter++;
uploadTriggeredCompleter.complete();
throw Exception('No uploads occur here');
});

final db = PowerSyncDatabase.withFactory(
await testUtils.testFactory(path: path),
schema: defaultSchema,
maxReaders: 3);
await db.initialize();

// Create an item which should trigger an upload.
await db.execute(
'INSERT INTO customers (id, name) VALUES (uuid(), ?)', ['steven']);

// Create a new completer to await the next upload
uploadTriggeredCompleter = Completer();

// Connect the PowerSync instance
final connectedCompleter = Completer();
// The first connection attempt will fail
final connectedErroredCompleter = Completer();

db.statusStream.listen((status) {
if (status.connected && !connectedCompleter.isCompleted) {
connectedCompleter.complete();
}
if (status.downloadError != null &&
!connectedErroredCompleter.isCompleted) {
connectedErroredCompleter.complete();
}
});

// The first command will not be valid, this simulates a failed connection
testServer.addEvent('asdf\n');
await db.connect(connector: connector);

// The connect operation should have triggered an upload (even though it fails to connect)
await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(1));
// Create a new completer for the next iteration
uploadTriggeredCompleter = Completer();

// Connection attempt should initially fail
await connectedErroredCompleter.future;
expect(db.currentStatus.anyError, isNotNull);

// Now send a valid command. Which will result in successful connection
await testServer.clearEvents();
testServer.addEvent('{"token_expires_in": 3600}\n');
await connectedCompleter.future;
expect(db.connected, isTrue);

await uploadTriggeredCompleter.future;
expect(uploadCounter, equals(2));

await db.disconnect();
});
});
}
Loading
Loading