1
1
package com.powersync.bucket
2
2
3
- import app.cash.sqldelight.async.coroutines.awaitAsOneOrNull
4
- import co.touchlab.kermit.Logger
5
- import com.powersync.db.internal.PsInternalDatabase
3
+ import com.powersync.db.crud.CrudEntry
6
4
import com.powersync.sync.SyncDataBatch
7
5
import com.powersync.sync.SyncLocalDatabaseResult
8
- import co.touchlab.stately.concurrency.AtomicBoolean
9
- import kotlinx.serialization.encodeToString
10
- import com.powersync.db.internal.InternalTable
11
- import com.powersync.utils.JsonUtil
12
6
13
- internal class BucketStorage (
14
- private val db : PsInternalDatabase ,
15
- private val logger : Logger
16
- ) {
17
- private val tableNames: MutableSet <String > = mutableSetOf ()
18
- private var hasCompletedSync = AtomicBoolean (false )
19
- private var pendingBucketDeletes = AtomicBoolean (false )
20
-
21
- /* *
22
- * Count up, and do a compact on startup.
23
- */
24
- private var compactCounter = COMPACT_OPERATION_INTERVAL
25
-
26
- companion object {
27
- const val MAX_OP_ID = " 9223372036854775807"
28
- const val COMPACT_OPERATION_INTERVAL = 1_000
29
- }
30
-
31
- init {
32
- readTableNames()
33
- }
34
-
35
- private fun readTableNames () {
36
- tableNames.clear()
37
- // Query to get existing table names
38
- val names = db.getExistingTableNames(" ps_data_*" )
39
-
40
- tableNames.addAll(names)
41
- }
42
-
43
- fun getMaxOpId (): String {
44
- return MAX_OP_ID
45
- }
46
-
47
- suspend fun getClientId (): String {
48
- val id = db.getOptional(" SELECT powersync_client_id() as client_id" ) {
49
- it.getString(0 )!!
50
- }
51
- return id ? : throw IllegalStateException (" Client ID not found" )
52
- }
53
-
54
- suspend fun hasCrud (): Boolean {
55
- return db.queries.hasCrud().awaitAsOneOrNull() == 1L
56
- }
57
-
58
- suspend fun updateLocalTarget (checkpointCallback : suspend () -> String ): Boolean {
59
- db.getOptional(
60
- " SELECT target_op FROM ${InternalTable .BUCKETS } WHERE name = '\$ local' AND target_op = ?" ,
61
- parameters = listOf (MAX_OP_ID ),
62
- mapper = { cursor -> cursor.getLong(0 )!! }
63
- )
64
- ? : // Nothing to update
65
- return false
66
-
67
- val seqBefore =
68
- db.getOptional(" SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable .CRUD } '" ) {
69
- it.getLong(0 )!!
70
- } ? : // Nothing to update
71
- return false
72
-
73
- val opId = checkpointCallback()
74
-
75
- logger.i { " [updateLocalTarget] Updating target to checkpoint $opId " }
76
-
77
- return db.writeTransaction {
78
- if (hasCrud()) {
79
- logger.w { " [updateLocalTarget] ps crud is not empty" }
80
- return @writeTransaction false
81
- }
82
-
83
- val seqAfter =
84
- db.getOptional(" SELECT seq FROM sqlite_sequence WHERE name = '${InternalTable .CRUD } '" ) {
85
- it.getLong(0 )!!
86
- }
87
- ? : // assert isNotEmpty
88
- throw AssertionError (" Sqlite Sequence should not be empty" )
89
-
90
- if (seqAfter != seqBefore) {
91
- logger.d(" seqAfter != seqBefore seqAfter: $seqAfter seqBefore: $seqBefore " )
92
- // New crud data may have been uploaded since we got the checkpoint. Abort.
93
- return @writeTransaction false
94
- }
95
-
96
- db.execute(
97
- " UPDATE ${InternalTable .BUCKETS } SET target_op = CAST(? as INTEGER) WHERE name='\$ local'" ,
98
- listOf (opId)
99
- )
100
-
101
- return @writeTransaction true
102
- }
103
- }
104
-
105
- suspend fun saveSyncData (syncDataBatch : SyncDataBatch ) {
106
- db.writeTransaction { tx ->
107
- val jsonString = JsonUtil .json.encodeToString(syncDataBatch)
108
- tx.execute(
109
- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
110
- listOf (" save" , jsonString)
111
- )
112
- }
113
- this .compactCounter + = syncDataBatch.buckets.sumOf { it.data.size }
114
- }
115
-
116
- suspend fun getBucketStates (): List <BucketState > {
117
- return db.getAll(
118
- " SELECT name AS bucket, CAST(last_op AS TEXT) AS op_id FROM ${InternalTable .BUCKETS } WHERE pending_delete = 0" ,
119
- mapper = { cursor ->
120
- BucketState (
121
- bucket = cursor.getString(0 )!! ,
122
- opId = cursor.getString(1 )!!
123
- )
124
- })
125
- }
126
-
127
- suspend fun removeBuckets (bucketsToDelete : List <String >) {
128
- bucketsToDelete.forEach { bucketName ->
129
- deleteBucket(bucketName)
130
- }
131
- }
132
-
133
-
134
- private suspend fun deleteBucket (bucketName : String ) {
135
-
136
- db.writeTransaction{ tx ->
137
- tx.execute(
138
- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
139
- listOf (" delete_bucket" , bucketName)
140
- )
141
- }
142
-
143
- Logger .d(" [deleteBucket] Done deleting" )
144
-
145
- this .pendingBucketDeletes.value = true
146
- }
147
-
148
- suspend fun hasCompletedSync (): Boolean {
149
- if (hasCompletedSync.value) {
150
- return true
151
- }
152
-
153
- val completedSync = db.getOptional(
154
- " SELECT powersync_last_synced_at()" ,
155
- mapper = { cursor ->
156
- cursor.getString(0 )!!
157
- })
158
-
159
- return if (completedSync != null ) {
160
- hasCompletedSync.value = true
161
- true
162
- } else {
163
- false
164
- }
165
- }
166
-
167
- suspend fun syncLocalDatabase (targetCheckpoint : Checkpoint ): SyncLocalDatabaseResult {
168
- val result = validateChecksums(targetCheckpoint)
169
-
170
- if (! result.checkpointValid) {
171
- logger.w { " [SyncLocalDatabase] Checksums failed for ${result.checkpointFailures} " }
172
- result.checkpointFailures?.forEach { bucketName ->
173
- deleteBucket(bucketName)
174
- }
175
- result.ready = false
176
- return result
177
- }
178
-
179
- val bucketNames = targetCheckpoint.checksums.map { it.bucket }
180
-
181
- db.writeTransaction { tx ->
182
- tx.execute(
183
- " UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))" ,
184
- listOf (targetCheckpoint.lastOpId, JsonUtil .json.encodeToString(bucketNames))
185
- )
186
-
187
- if (targetCheckpoint.writeCheckpoint != null ) {
188
- tx.execute(
189
- " UPDATE ps_buckets SET last_op = ? WHERE name = '\$ local'" ,
190
- listOf (targetCheckpoint.writeCheckpoint),
191
- )
192
- }
193
- }
194
-
195
- val valid = updateObjectsFromBuckets()
196
-
197
- if (! valid) {
198
- return SyncLocalDatabaseResult (
199
- ready = false ,
200
- checkpointValid = true ,
201
- )
202
- }
203
-
204
- this .forceCompact()
205
-
206
- return SyncLocalDatabaseResult (
207
- ready = true ,
208
- )
209
- }
210
-
211
- private suspend fun validateChecksums (checkpoint : Checkpoint ): SyncLocalDatabaseResult {
212
- val res = db.getOptional(
213
- " SELECT powersync_validate_checkpoint(?) AS result" ,
214
- parameters = listOf (JsonUtil .json.encodeToString(checkpoint)),
215
- mapper = { cursor ->
216
- cursor.getString(0 )!!
217
- })
218
- ? : // no result
219
- return SyncLocalDatabaseResult (
220
- ready = false ,
221
- checkpointValid = false ,
222
- )
223
-
224
- return JsonUtil .json.decodeFromString<SyncLocalDatabaseResult >(res)
225
- }
226
-
227
- /* *
228
- * Atomically update the local state.
229
- *
230
- * This includes creating new tables, dropping old tables, and copying data over from the oplog.
231
- */
232
- private suspend fun updateObjectsFromBuckets (): Boolean {
233
- return db.writeTransaction { tx ->
234
-
235
- tx.execute(
236
- " INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
237
- listOf (" sync_local" , " " )
238
- )
239
-
240
- val res = tx.get(" select last_insert_rowid()" ) { cursor ->
241
- cursor.getLong(0 )!!
242
- }
243
-
244
- return @writeTransaction res == 1L
245
- }
246
- }
247
-
248
- private suspend fun forceCompact () {
249
- // Reset counter
250
- this .compactCounter = COMPACT_OPERATION_INTERVAL
251
- this .pendingBucketDeletes.value = true
252
-
253
- this .autoCompact()
254
- }
255
-
256
-
257
- private suspend fun autoCompact () {
258
- // 1. Delete buckets
259
- deletePendingBuckets()
260
-
261
- // 2. Clear REMOVE operations, only keeping PUT ones
262
- clearRemoveOps()
263
- }
264
-
265
- private suspend fun deletePendingBuckets () {
266
- if (! this .pendingBucketDeletes.value) {
267
- return
268
- }
269
-
270
- db.writeTransaction { tx ->
271
- tx.execute(
272
- " INSERT INTO powersync_operations(op, data) VALUES (?, ?)" , listOf (" delete_pending_buckets" ," " )
273
- )
274
-
275
- // Executed once after start-up, and again when there are pending deletes.
276
- pendingBucketDeletes.value = false
277
- }
278
- }
279
-
280
- private suspend fun clearRemoveOps () {
281
- if (this .compactCounter < COMPACT_OPERATION_INTERVAL ) {
282
- return
283
- }
284
-
285
- db.writeTransaction { tx ->
286
- tx.execute(
287
- " INSERT INTO powersync_operations(op, data) VALUES (?, ?)" ,
288
- listOf (" clear_remove_ops" , " " )
289
- )
290
- }
291
- this .compactCounter = 0
292
- }
293
-
294
- @Suppress(" UNUSED_PARAMETER" )
295
- fun setTargetCheckpoint (checkpoint : Checkpoint ) {
296
- // No-op for now
297
- }
298
- }
7
+ internal interface BucketStorage {
8
+ fun getMaxOpId (): String
9
+ suspend fun getClientId (): String
10
+ suspend fun nextCrudItem (): CrudEntry ?
11
+ suspend fun hasCrud (): Boolean
12
+ suspend fun updateLocalTarget (checkpointCallback : suspend () -> String ): Boolean
13
+ suspend fun saveSyncData (syncDataBatch : SyncDataBatch )
14
+ suspend fun getBucketStates (): List <BucketState >
15
+ suspend fun removeBuckets (bucketsToDelete : List <String >)
16
+ suspend fun hasCompletedSync (): Boolean
17
+ suspend fun syncLocalDatabase (targetCheckpoint : Checkpoint ): SyncLocalDatabaseResult
18
+ fun setTargetCheckpoint (checkpoint : Checkpoint )
19
+ }
0 commit comments