Skip to content

Commit 042589c

Browse files
[Feature] Add warning if crud transactions are not completed (#254)
1 parent 8fe44fb commit 042589c

File tree

6 files changed

+103
-61
lines changed

6 files changed

+103
-61
lines changed

.changeset/mean-carrots-relax.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
'@powersync/common': minor
3+
'@powersync/web': minor
4+
'@powersync/react-native': minor
5+
---
6+
7+
Added a warning if connector `uploadData` functions don't process CRUD items completely.

demos/react-native-supabase-todolist/ios/Podfile.lock

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ PODS:
2121
- ExpoModulesCore
2222
- ExpoKeepAwake (13.0.2):
2323
- ExpoModulesCore
24-
- ExpoModulesCore (1.12.13):
24+
- ExpoModulesCore (1.12.19):
2525
- DoubleConversion
2626
- glog
2727
- hermes-engine
@@ -1340,15 +1340,15 @@ DEPENDENCIES:
13401340
- boost (from `../../../node_modules/react-native/third-party-podspecs/boost.podspec`)
13411341
- DoubleConversion (from `../../../node_modules/react-native/third-party-podspecs/DoubleConversion.podspec`)
13421342
- EXConstants (from `../../../node_modules/expo-constants/ios`)
1343-
- Expo (from `../node_modules/expo`)
1343+
- Expo (from `../../../node_modules/expo`)
13441344
- ExpoAsset (from `../../../node_modules/expo-asset/ios`)
13451345
- ExpoCamera (from `../../../node_modules/expo-camera/ios`)
13461346
- ExpoCrypto (from `../../../node_modules/expo-crypto/ios`)
13471347
- ExpoFileSystem (from `../../../node_modules/expo-file-system/ios`)
13481348
- ExpoFont (from `../../../node_modules/expo-font/ios`)
13491349
- ExpoHead (from `../../../node_modules/expo-router/ios`)
13501350
- ExpoKeepAwake (from `../../../node_modules/expo-keep-awake/ios`)
1351-
- ExpoModulesCore (from `../node_modules/expo-modules-core`)
1351+
- ExpoModulesCore (from `../../../node_modules/expo-modules-core`)
13521352
- ExpoSecureStore (from `../../../node_modules/expo-secure-store/ios`)
13531353
- EXSplashScreen (from `../../../node_modules/expo-splash-screen/ios`)
13541354
- FBLazyVector (from `../../../node_modules/react-native/Libraries/FBLazyVector`)
@@ -1428,7 +1428,7 @@ EXTERNAL SOURCES:
14281428
EXConstants:
14291429
:path: "../../../node_modules/expo-constants/ios"
14301430
Expo:
1431-
:path: "../node_modules/expo"
1431+
:path: "../../../node_modules/expo"
14321432
ExpoAsset:
14331433
:path: "../../../node_modules/expo-asset/ios"
14341434
ExpoCamera:
@@ -1444,7 +1444,7 @@ EXTERNAL SOURCES:
14441444
ExpoKeepAwake:
14451445
:path: "../../../node_modules/expo-keep-awake/ios"
14461446
ExpoModulesCore:
1447-
:path: "../node_modules/expo-modules-core"
1447+
:path: "../../../node_modules/expo-modules-core"
14481448
ExpoSecureStore:
14491449
:path: "../../../node_modules/expo-secure-store/ios"
14501450
EXSplashScreen:
@@ -1583,7 +1583,7 @@ SPEC CHECKSUMS:
15831583
ExpoFont: e7f2275c10ca8573c991e007329ad6bf98086485
15841584
ExpoHead: 8eb4deb289c2fdd8bb624f996cd31414cd07f38a
15851585
ExpoKeepAwake: 3b8815d9dd1d419ee474df004021c69fdd316d08
1586-
ExpoModulesCore: a4b45b5f081f5fe9b8e87667906d180cd52f32d7
1586+
ExpoModulesCore: 734c1802786b23c9598f4d15273753a779969368
15871587
ExpoSecureStore: 060cebcb956b80ddae09821610ac1aa9e1ac74cd
15881588
EXSplashScreen: fbf0ec78e9cee911df188bf17b4fe51d15a84b87
15891589
FBLazyVector: 898d14d17bf19e2435cafd9ea2a1033efe445709

packages/common/src/client/sync/bucket/BucketStorageAdapter.ts

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
import { OpId } from './CrudEntry';
1+
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
22
import { CrudBatch } from './CrudBatch';
3+
import { CrudEntry, OpId } from './CrudEntry';
34
import { SyncDataBatch } from './SyncDataBatch';
4-
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
55

66
export interface Checkpoint {
77
last_op_id: OpId;
@@ -62,6 +62,7 @@ export interface BucketStorageAdapter extends BaseObserver<BucketStorageListener
6262

6363
syncLocalDatabase(checkpoint: Checkpoint): Promise<{ checkpointValid: boolean; ready: boolean; failures?: any[] }>;
6464

65+
nextCrudItem(): Promise<CrudEntry | undefined>;
6566
hasCrud(): Promise<boolean>;
6667
getCrudBatch(limit?: number): Promise<CrudBatch | null>;
6768

packages/common/src/client/sync/bucket/SqliteBucketStorage.ts

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
import { Mutex } from 'async-mutex';
2+
import Logger, { ILogger } from 'js-logger';
23
import { DBAdapter, Transaction, extractTableUpdates } from '../../../db/DBAdapter';
4+
import { BaseObserver } from '../../../utils/BaseObserver';
35
import {
46
BucketState,
57
BucketStorageAdapter,
@@ -8,12 +10,10 @@ import {
810
PSInternalTable,
911
SyncLocalDatabaseResult
1012
} from './BucketStorageAdapter';
11-
import { OpTypeEnum } from './OpType';
1213
import { CrudBatch } from './CrudBatch';
13-
import { CrudEntry } from './CrudEntry';
14+
import { CrudEntry, CrudEntryJSON } from './CrudEntry';
15+
import { OpTypeEnum } from './OpType';
1416
import { SyncDataBatch } from './SyncDataBatch';
15-
import Logger, { ILogger } from 'js-logger';
16-
import { BaseObserver } from '../../../utils/BaseObserver';
1717

1818
const COMPACT_OPERATION_INTERVAL = 1_000;
1919

@@ -51,10 +51,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
5151

5252
async init() {
5353
this._hasCompletedSync = false;
54-
const existingTableRows = await this.db.execute(
54+
const existingTableRows = await this.db.getAll<{ name: string }>(
5555
`SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'`
5656
);
57-
for (const row of existingTableRows.rows?._array ?? []) {
57+
for (const row of existingTableRows ?? []) {
5858
this.tableNames.add(row.name);
5959
}
6060
}
@@ -72,10 +72,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
7272
startSession(): void {}
7373

7474
async getBucketStates(): Promise<BucketState[]> {
75-
const result = await this.db.execute(
75+
const result = await this.db.getAll<BucketState>(
7676
'SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0'
7777
);
78-
return result.rows?._array ?? [];
78+
return result;
7979
}
8080

8181
async saveSyncData(batch: SyncDataBatch) {
@@ -258,19 +258,20 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
258258
}
259259

260260
async updateLocalTarget(cb: () => Promise<string>): Promise<boolean> {
261-
const rs1 = await this.db.execute("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
261+
const rs1 = await this.db.getAll("SELECT target_op FROM ps_buckets WHERE name = '$local' AND target_op = ?", [
262262
SqliteBucketStorage.MAX_OP_ID
263263
]);
264-
if (!rs1.rows?.length) {
264+
if (!rs1.length) {
265265
// Nothing to update
266266
return false;
267267
}
268-
const rs = await this.db.execute("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
269-
if (!rs.rows?.length) {
268+
const rs = await this.db.getAll<{ seq: number }>("SELECT seq FROM sqlite_sequence WHERE name = 'ps_crud'");
269+
if (!rs.length) {
270270
// Nothing to update
271271
return false;
272272
}
273-
const seqBefore: number = rs.rows?.item(0)['seq'];
273+
274+
const seqBefore: number = rs[0]['seq'];
274275

275276
const opId = await cb();
276277

@@ -304,9 +305,17 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
304305
});
305306
}
306307

308+
async nextCrudItem(): Promise<CrudEntry | undefined> {
309+
const next = await this.db.getOptional<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT 1');
310+
if (!next) {
311+
return;
312+
}
313+
return CrudEntry.fromRow(next);
314+
}
315+
307316
async hasCrud(): Promise<boolean> {
308-
const anyData = await this.db.execute('SELECT 1 FROM ps_crud LIMIT 1');
309-
return !!anyData.rows?.length;
317+
const anyData = await this.db.getOptional('SELECT 1 FROM ps_crud LIMIT 1');
318+
return !!anyData;
310319
}
311320

312321
/**
@@ -318,10 +327,10 @@ export class SqliteBucketStorage extends BaseObserver<BucketStorageListener> imp
318327
return null;
319328
}
320329

321-
const crudResult = await this.db.execute('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);
330+
const crudResult = await this.db.getAll<CrudEntryJSON>('SELECT * FROM ps_crud ORDER BY id ASC LIMIT ?', [limit]);
322331

323332
const all: CrudEntry[] = [];
324-
for (const row of crudResult.rows?._array ?? []) {
333+
for (const row of crudResult) {
325334
all.push(CrudEntry.fromRow(row));
326335
}
327336

packages/common/src/client/sync/stream/AbstractStreamingSyncImplementation.ts

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,13 @@ import throttle from 'lodash/throttle';
22

33
import Logger, { ILogger } from 'js-logger';
44

5+
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
6+
import { AbortOperation } from '../../../utils/AbortOperation';
7+
import { BaseListener, BaseObserver, Disposable } from '../../../utils/BaseObserver';
8+
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
9+
import { CrudEntry } from '../bucket/CrudEntry';
10+
import { SyncDataBucket } from '../bucket/SyncDataBucket';
11+
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
512
import {
613
BucketRequest,
714
StreamingSyncRequestParameterType,
@@ -11,12 +18,6 @@ import {
1118
isStreamingSyncCheckpointDiff,
1219
isStreamingSyncData
1320
} from './streaming-sync-types';
14-
import { AbstractRemote, SyncStreamOptions } from './AbstractRemote';
15-
import { BucketChecksum, BucketStorageAdapter, Checkpoint } from '../bucket/BucketStorageAdapter';
16-
import { SyncStatus, SyncStatusOptions } from '../../../db/crud/SyncStatus';
17-
import { SyncDataBucket } from '../bucket/SyncDataBucket';
18-
import { BaseObserver, BaseListener, Disposable } from '../../../utils/BaseObserver';
19-
import { AbortOperation } from '../../../utils/AbortOperation';
2021

2122
export enum LockType {
2223
CRUD = 'crud',
@@ -215,18 +216,40 @@ export abstract class AbstractStreamingSyncImplementation
215216
return this.obtainLock({
216217
type: LockType.CRUD,
217218
callback: async () => {
218-
this.updateSyncStatus({
219-
dataFlow: {
220-
uploading: true
221-
}
222-
});
219+
/**
220+
* Keep track of the first item in the CRUD queue for the last `uploadCrud` iteration.
221+
*/
222+
let checkedCrudItem: CrudEntry | undefined;
223+
223224
while (true) {
225+
this.updateSyncStatus({
226+
dataFlow: {
227+
uploading: true
228+
}
229+
});
224230
try {
225-
const done = await this.uploadCrudBatch();
226-
if (done) {
231+
/**
232+
* This is the first item in the FIFO CRUD queue.
233+
*/
234+
const nextCrudItem = await this.options.adapter.nextCrudItem();
235+
if (nextCrudItem) {
236+
if (nextCrudItem.id == checkedCrudItem?.id) {
237+
// This will force a higher log level than exceptions which are caught here.
238+
this.logger.warn(`Potentially previously uploaded CRUD entries are still present in the upload queue.
239+
Make sure to handle uploads and complete CRUD transactions or batches by calling and awaiting their [.complete()] method.
240+
The next upload iteration will be delayed.`);
241+
throw new Error('Delaying due to previously encountered CRUD item.');
242+
}
243+
244+
checkedCrudItem = nextCrudItem;
245+
await this.options.uploadCrud();
246+
} else {
247+
// Uploading is completed
248+
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
227249
break;
228250
}
229251
} catch (ex) {
252+
checkedCrudItem = undefined;
230253
this.updateSyncStatus({
231254
dataFlow: {
232255
uploading: false
@@ -252,17 +275,6 @@ export abstract class AbstractStreamingSyncImplementation
252275
});
253276
}
254277

255-
protected async uploadCrudBatch(): Promise<boolean> {
256-
const hasCrud = await this.options.adapter.hasCrud();
257-
if (hasCrud) {
258-
await this.options.uploadCrud();
259-
return false;
260-
} else {
261-
await this.options.adapter.updateLocalTarget(() => this.getWriteCheckpoint());
262-
return true;
263-
}
264-
}
265-
266278
async connect(options?: PowerSyncConnectionOptions) {
267279
if (this.abortController) {
268280
await this.disconnect();

packages/web/tests/multiple_instances.test.ts

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,15 @@
1-
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
21
import { AbstractPowerSyncDatabase, SqliteBucketStorage, SyncStatus } from '@powersync/common';
32
import {
43
PowerSyncDatabase,
54
SharedWebStreamingSyncImplementation,
65
WebRemote,
76
WebStreamingSyncImplementationOptions
87
} from '@powersync/web';
9-
import { testSchema } from './utils/testDb';
10-
import { TestConnector } from './utils/MockStreamOpenFactory';
118
import { Mutex } from 'async-mutex';
129
import Logger from 'js-logger';
10+
import { afterEach, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest';
11+
import { TestConnector } from './utils/MockStreamOpenFactory';
12+
import { testSchema } from './utils/testDb';
1313

1414
describe('Multiple Instances', () => {
1515
const dbFilename = 'test-multiple-instances.db';
@@ -23,6 +23,8 @@ describe('Multiple Instances', () => {
2323
schema: testSchema
2424
});
2525

26+
beforeAll(() => Logger.useDefaults());
27+
2628
beforeEach(() => {
2729
db = openDatabase();
2830
});
@@ -184,34 +186,44 @@ describe('Multiple Instances', () => {
184186
});
185187

186188
// Create the first streaming client
187-
const syncOptions1: WebStreamingSyncImplementationOptions = {
189+
const stream1 = new SharedWebStreamingSyncImplementation({
188190
adapter: new SqliteBucketStorage(db.database, new Mutex()),
189191
remote: new WebRemote(connector1),
190192
uploadCrud: async () => {
191193
triggerUpload1();
192194
connector1.uploadData(db);
193195
},
194-
identifier
195-
};
196-
const stream1 = new SharedWebStreamingSyncImplementation(syncOptions1);
196+
identifier,
197+
retryDelayMs: 100,
198+
flags: {
199+
broadcastLogs: true
200+
}
201+
});
197202

198203
// Generate the second streaming sync implementation
199204
const connector2 = new TestConnector();
200-
const spy2 = vi.spyOn(connector2, 'uploadData');
205+
// The second connector will be called first to upload, we don't want it to actually upload
206+
// This will cause the sync uploads to be delayed as the CRUD queue did not change
207+
const spy2 = vi.spyOn(connector2, 'uploadData').mockImplementation(async () => {});
208+
201209
let triggerUpload2: () => void;
202210
const upload2TriggeredPromise = new Promise<void>((resolve) => {
203211
triggerUpload2 = resolve;
204212
});
205-
const syncOptions2: WebStreamingSyncImplementationOptions = {
213+
214+
const stream2 = new SharedWebStreamingSyncImplementation({
206215
adapter: new SqliteBucketStorage(db.database, new Mutex()),
207216
remote: new WebRemote(connector1),
208217
uploadCrud: async () => {
209218
triggerUpload2();
210219
connector2.uploadData(db);
211220
},
212-
identifier
213-
};
214-
const stream2 = new SharedWebStreamingSyncImplementation(syncOptions2);
221+
identifier,
222+
retryDelayMs: 100,
223+
flags: {
224+
broadcastLogs: true
225+
}
226+
});
215227

216228
// Waits for the stream to be marked as connected
217229
const stream2UpdatedPromise = new Promise<void>((resolve, reject) => {
@@ -230,6 +242,7 @@ describe('Multiple Instances', () => {
230242

231243
// The status in the second stream client should be updated
232244
await stream2UpdatedPromise;
245+
233246
expect(stream2.isConnected).true;
234247

235248
// Create something with CRUD in it.

0 commit comments

Comments
 (0)