Skip to content

Commit 9b7af7d

Browse files
authored
chore: implement Erase for a range (#4106)
chore: implement Erase with a range Also migrate more unit tests from valkey repo. Finally, fix OpTrim All tests `list_family_test --list_experimental_v2` pass. Signed-off-by: Roman Gershman <roman@dragonflydb.io> chore: implement OpTrim with QList
1 parent 503bb4e commit 9b7af7d

File tree

4 files changed

+292
-8
lines changed

4 files changed

+292
-8
lines changed

src/core/qlist.cc

Lines changed: 75 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -758,7 +758,7 @@ void QList::Compress(quicklistNode* node) {
758758
reverse = reverse->prev;
759759
}
760760

761-
if (!in_depth)
761+
if (!in_depth && node)
762762
CompressNodeIfNeeded(node);
763763

764764
/* At this point, forward and reverse are one node beyond depth */
@@ -1022,6 +1022,80 @@ auto QList::Erase(Iterator it) -> Iterator {
10221022
return it;
10231023
}
10241024

1025+
bool QList::Erase(const long start, unsigned count) {
1026+
if (count == 0)
1027+
return false;
1028+
1029+
unsigned extent = count; /* range is inclusive of start position */
1030+
1031+
if (start >= 0 && extent > (count_ - start)) {
1032+
/* if requesting delete more elements than exist, limit to list size. */
1033+
extent = count_ - start;
1034+
} else if (start < 0 && extent > (unsigned long)(-start)) {
1035+
/* else, if at negative offset, limit max size to rest of list. */
1036+
extent = -start; /* c.f. LREM -29 29; just delete until end. */
1037+
}
1038+
1039+
Iterator it = GetIterator(start);
1040+
quicklistNode* node = it.current_;
1041+
long offset = it.offset_;
1042+
1043+
/* iterate over next nodes until everything is deleted. */
1044+
while (extent) {
1045+
quicklistNode* next = node->next;
1046+
1047+
unsigned long del;
1048+
int delete_entire_node = 0;
1049+
if (offset == 0 && extent >= node->count) {
1050+
/* If we are deleting more than the count of this node, we
1051+
* can just delete the entire node without listpack math. */
1052+
delete_entire_node = 1;
1053+
del = node->count;
1054+
} else if (offset >= 0 && extent + offset >= node->count) {
1055+
/* If deleting more nodes after this one, calculate delete based
1056+
* on size of current node. */
1057+
del = node->count - offset;
1058+
} else if (offset < 0) {
1059+
/* If offset is negative, we are in the first run of this loop
1060+
* and we are deleting the entire range
1061+
* from this start offset to end of list. Since the Negative
1062+
* offset is the number of elements until the tail of the list,
1063+
* just use it directly as the deletion count. */
1064+
del = -offset;
1065+
1066+
/* If the positive offset is greater than the remaining extent,
1067+
* we only delete the remaining extent, not the entire offset.
1068+
*/
1069+
if (del > extent)
1070+
del = extent;
1071+
} else {
1072+
/* else, we are deleting less than the extent of this node, so
1073+
* use extent directly. */
1074+
del = extent;
1075+
}
1076+
1077+
if (delete_entire_node || QL_NODE_IS_PLAIN(node)) {
1078+
DelNode(node);
1079+
} else {
1080+
DecompressNodeIfNeeded(true, node);
1081+
node->entry = lpDeleteRange(node->entry, offset, del);
1082+
NodeUpdateSz(node);
1083+
node->count -= del;
1084+
count_ -= del;
1085+
if (node->count == 0) {
1086+
DelNode(node);
1087+
} else {
1088+
RecompressOnly(node);
1089+
}
1090+
}
1091+
1092+
extent -= del;
1093+
node = next;
1094+
offset = 0;
1095+
}
1096+
return true;
1097+
}
1098+
10251099
bool QList::Entry::operator==(std::string_view sv) const {
10261100
if (std::holds_alternative<int64_t>(value_)) {
10271101
char buf[absl::numbers_internal::kFastToBufferSize];

src/core/qlist.h

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,25 @@ class QList {
122122
return len_;
123123
}
124124

125+
unsigned compress_param() const {
126+
return compress_;
127+
}
128+
125129
Iterator Erase(Iterator it);
126130

131+
// Returns true if elements were deleted, false if list has not changed.
132+
// Negative start index is allowed.
133+
bool Erase(const long start, unsigned count);
134+
135+
// Needed by tests and the rdb code.
127136
const quicklistNode* Head() const {
128137
return head_;
129138
}
130139

140+
const quicklistNode* Tail() const {
141+
return tail_;
142+
}
143+
131144
private:
132145
bool AllowCompression() const {
133146
return compress_ != 0;

src/core/qlist_test.cc

Lines changed: 192 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
#include "core/qlist.h"
66

77
#include <absl/strings/str_cat.h>
8+
#include <absl/strings/str_format.h>
89
#include <gmock/gmock.h>
910

1011
#include "base/gtest.h"
@@ -21,6 +22,100 @@ namespace dfly {
2122
using namespace std;
2223
using namespace testing;
2324

25+
static int _ql_verify_compress(const QList& ql) {
26+
int errors = 0;
27+
unsigned compress_param = ql.compress_param();
28+
if (compress_param > 0) {
29+
const quicklistNode* node = ql.Head();
30+
unsigned int low_raw = compress_param;
31+
unsigned int high_raw = ql.node_count() - compress_param;
32+
33+
for (unsigned int at = 0; at < ql.node_count(); at++, node = node->next) {
34+
if (node && (at < low_raw || at >= high_raw)) {
35+
if (node->encoding != QUICKLIST_NODE_ENCODING_RAW) {
36+
LOG(ERROR) << "Incorrect compression: node " << at << " is compressed at depth "
37+
<< compress_param << " ((" << low_raw << "," << high_raw
38+
<< " total nodes: " << ql.node_count() << "; size: " << node->sz
39+
<< "; recompress: " << node->recompress;
40+
errors++;
41+
}
42+
} else {
43+
if (node->encoding != QUICKLIST_NODE_ENCODING_LZF && !node->attempted_compress) {
44+
LOG(ERROR) << absl::StrFormat(
45+
"Incorrect non-compression: node %d is NOT "
46+
"compressed at depth %d ((%u, %u); total "
47+
"nodes: %lu; size: %zu; recompress: %d; attempted: %d)",
48+
at, compress_param, low_raw, high_raw, ql.node_count(), node->sz, node->recompress,
49+
node->attempted_compress);
50+
errors++;
51+
}
52+
}
53+
}
54+
}
55+
return errors;
56+
}
57+
58+
/* Verify list metadata matches physical list contents. */
59+
static int ql_verify(const QList& ql, uint32_t nc, uint32_t count, uint32_t head_count,
60+
uint32_t tail_count) {
61+
int errors = 0;
62+
63+
if (nc != ql.node_count()) {
64+
LOG(ERROR) << "quicklist length wrong: expected " << nc << " got " << ql.node_count();
65+
errors++;
66+
}
67+
68+
if (count != ql.Size()) {
69+
LOG(ERROR) << "quicklist count wrong: expected " << count << " got " << ql.Size();
70+
errors++;
71+
}
72+
73+
auto* node = ql.Head();
74+
size_t node_size = 0;
75+
while (node) {
76+
node_size += node->count;
77+
node = node->next;
78+
}
79+
80+
if (node_size != ql.Size()) {
81+
LOG(ERROR) << "quicklist cached count not match actual count: expected " << ql.Size() << " got "
82+
<< node_size;
83+
errors++;
84+
}
85+
86+
node = ql.Tail();
87+
node_size = 0;
88+
while (node) {
89+
node_size += node->count;
90+
node = node->prev;
91+
}
92+
if (node_size != ql.Size()) {
93+
LOG(ERROR) << "has different forward count than reverse count! "
94+
"Forward count is "
95+
<< ql.Size() << ", reverse count is " << node_size;
96+
errors++;
97+
}
98+
99+
if (ql.node_count() == 0 && errors == 0) {
100+
return 0;
101+
}
102+
103+
if (ql.Head() && head_count != ql.Head()->count && head_count != lpLength(ql.Head()->entry)) {
104+
LOG(ERROR) << absl::StrFormat("head count wrong: expected %u got cached %u vs. actual %lu",
105+
head_count, ql.Head()->count, lpLength(ql.Head()->entry));
106+
errors++;
107+
}
108+
109+
if (ql.Tail() && tail_count != ql.Tail()->count && tail_count != lpLength(ql.Tail()->entry)) {
110+
LOG(ERROR) << "tail count wrong: expected " << tail_count << "got cached " << ql.Tail()->count
111+
<< " vs. actual " << lpLength(ql.Tail()->entry);
112+
errors++;
113+
}
114+
115+
errors += _ql_verify_compress(ql);
116+
return errors;
117+
}
118+
24119
class QListTest : public ::testing::Test {
25120
protected:
26121
QListTest() : mr_(mi_heap_get_backing()) {
@@ -183,4 +278,100 @@ TEST_P(OptionsTest, Numbers) {
183278
EXPECT_EQ("xxxxxxxxxxxxxxxxxxxx", it.Get().view());
184279
}
185280

186-
}; // namespace dfly
281+
TEST_P(OptionsTest, DelRangeA) {
282+
auto [fill, compress] = GetParam();
283+
ql_ = QList(fill, compress);
284+
long long nums[5000];
285+
for (int i = 0; i < 33; i++) {
286+
nums[i] = -5157318210846258176 + i;
287+
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
288+
}
289+
if (fill == 32)
290+
ql_verify(ql_, 2, 33, 32, 1);
291+
292+
/* ltrim 3 3 (keep [3,3] inclusive = 1 remaining) */
293+
ql_.Erase(0, 3);
294+
ql_.Erase(-29, 4000); /* make sure not loop forever */
295+
if (fill == 32)
296+
ql_verify(ql_, 1, 1, 1, 1);
297+
298+
auto it = ql_.GetIterator(0);
299+
ASSERT_TRUE(it.Next());
300+
EXPECT_EQ(-5157318210846258173, it.Get().ival());
301+
}
302+
303+
TEST_P(OptionsTest, DelRangeB) {
304+
auto [fill, _] = GetParam();
305+
ql_ = QList(fill, QUICKLIST_NOCOMPRESS); // ignore compress parameter
306+
307+
long long nums[5000];
308+
for (int i = 0; i < 33; i++) {
309+
nums[i] = i;
310+
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
311+
}
312+
if (fill == 32)
313+
ql_verify(ql_, 2, 33, 32, 1);
314+
315+
/* ltrim 5 16 (keep [5,16] inclusive = 12 remaining) */
316+
ql_.Erase(0, 5);
317+
ql_.Erase(-16, 16);
318+
if (fill == 32)
319+
ql_verify(ql_, 1, 12, 12, 12);
320+
321+
auto it = ql_.GetIterator(0);
322+
ASSERT_TRUE(it.Next());
323+
EXPECT_EQ(5, it.Get().ival());
324+
325+
it = ql_.GetIterator(-1);
326+
ASSERT_TRUE(it.Next());
327+
EXPECT_EQ(16, it.Get().ival());
328+
329+
ql_.Push("bobobob", QList::TAIL);
330+
it = ql_.GetIterator(-1);
331+
ASSERT_TRUE(it.Next());
332+
EXPECT_EQ("bobobob", it.Get().view());
333+
334+
for (int i = 0; i < 12; i++) {
335+
it = ql_.GetIterator(i);
336+
ASSERT_TRUE(it.Next());
337+
EXPECT_EQ(i + 5, it.Get().ival());
338+
}
339+
}
340+
341+
TEST_P(OptionsTest, DelRangeC) {
342+
auto [fill, compress] = GetParam();
343+
ql_ = QList(fill, compress);
344+
long long nums[5000];
345+
for (int i = 0; i < 33; i++) {
346+
nums[i] = -5157318210846258176 + i;
347+
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
348+
}
349+
if (fill == 32)
350+
ql_verify(ql_, 2, 33, 32, 1);
351+
352+
/* ltrim 3 3 (keep [3,3] inclusive = 1 remaining) */
353+
ql_.Erase(0, 3);
354+
ql_.Erase(-29, 4000); /* make sure not loop forever */
355+
if (fill == 32)
356+
ql_verify(ql_, 1, 1, 1, 1);
357+
auto it = ql_.GetIterator(0);
358+
ASSERT_TRUE(it.Next());
359+
ASSERT_EQ(-5157318210846258173, it.Get().ival());
360+
}
361+
362+
TEST_P(OptionsTest, DelRangeD) {
363+
auto [fill, compress] = GetParam();
364+
ql_ = QList(fill, compress);
365+
long long nums[5000];
366+
for (int i = 0; i < 33; i++) {
367+
nums[i] = -5157318210846258176 + i;
368+
ql_.Push(absl::StrCat(nums[i]), QList::TAIL);
369+
}
370+
if (fill == 32)
371+
ql_verify(ql_, 2, 33, 32, 1);
372+
ql_.Erase(-12, 3);
373+
374+
ASSERT_EQ(30, ql_.Size());
375+
}
376+
377+
} // namespace dfly

src/server/list_family.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -755,8 +755,8 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
755755
return it_res.status();
756756

757757
auto it = it_res->it;
758-
quicklist* ql = GetQL(it->second);
759-
long llen = quicklistCount(ql);
758+
759+
long llen = it->second.Size();
760760

761761
/* convert negative indexes */
762762
if (start < 0)
@@ -781,12 +781,18 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
781781
rtrim = llen - end - 1;
782782
}
783783

784-
quicklistDelRange(ql, 0, ltrim);
785-
quicklistDelRange(ql, -rtrim, rtrim);
786-
784+
if (it->second.Encoding() == kEncodingQL2) {
785+
QList* ql = GetQLV2(it->second);
786+
ql->Erase(0, ltrim);
787+
ql->Erase(-rtrim, rtrim);
788+
} else {
789+
quicklist* ql = GetQL(it->second);
790+
quicklistDelRange(ql, 0, ltrim);
791+
quicklistDelRange(ql, -rtrim, rtrim);
792+
}
787793
it_res->post_updater.Run();
788794

789-
if (quicklistCount(ql) == 0) {
795+
if (it->second.Size() == 0) {
790796
CHECK(db_slice.Del(op_args.db_cntx, it));
791797
}
792798
return OpStatus::OK;

0 commit comments

Comments
 (0)