Skip to content

[TEMP] Test subscribe to all with truncated stream #419

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 137 additions & 0 deletions packages/test/src/streams/subscribeToAll.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ import {
jsonEvent,
ResolvedEvent,
END,
streamNameFilter,
START,
FORWARDS,
AllStreamResolvedEvent,
} from "@kurrent/kurrentdb-client";

const asyncPipeline = promisify(pipeline);
Expand Down Expand Up @@ -465,4 +469,137 @@ describe("subscribeToAll", () => {
expect(writeStream.ids).toHaveLength(9);
});
});

describe("should handle stream truncation properly", () => {
test.only("caught up received when no events pass the filter", async () => {
const STREAM_NAME = "json_stream_name";
const defer = new Defer();
let caughtUpReceived = false;
let eventCount = 0;

await client.appendToStream(STREAM_NAME, jsonTestEvents(10));

const subscribe = client.subscribeToAll({
fromPosition: START,
filter: streamNameFilter({ prefixes: ["passthrough-filter"] }),
});

subscribe
.on("data", (resolvedEvent) => {
eventCount++;
})
.on("caughtUp", () => {
caughtUpReceived = true;
subscribe.unsubscribe();
defer.resolve();
})
.on("error", (error) => {
defer.reject(error);
});

try {
await defer.promise;
expect(caughtUpReceived).toBe(true);
expect(eventCount).toBe(0);
} catch (error) {
throw error;
}
})

test.only("subscribeToAll with filter on truncated stream", async () => {
const TRUNCATED_STREAM = "truncated_stream_test";
const TRUNCATE_BEFORE = 20;
const CREATED_EVENTS = 25;
const deferAll = new Defer();
const deferStream = new Defer();
let caughtUpReceivedAll = false;
let caughtUpReceivedStream = false;

var collected_subscribe_to_all: AllStreamResolvedEvent[] = []
var collected_subscribe_to_stream: ResolvedEvent[] = []
var collected_read_all: AllStreamResolvedEvent[] = []
var collected_read_stream: ResolvedEvent[] = []

await client.appendToStream(TRUNCATED_STREAM, jsonTestEvents(CREATED_EVENTS));

await client.setStreamMetadata(TRUNCATED_STREAM, { truncateBefore: TRUNCATE_BEFORE });

const subscription_all = client.subscribeToAll({
fromPosition: START,
filter: streamNameFilter({ prefixes: [TRUNCATED_STREAM] }),
});

subscription_all
.on("data", (resolvedEvent) => {
collected_subscribe_to_all.push(resolvedEvent);
})
.on("caughtUp", () => {
caughtUpReceivedAll = true;
subscription_all.unsubscribe();
deferAll.resolve();
})
.on("error", (error) => {
deferAll.reject(error);
});

const subscription_stream = client.subscribeToStream(TRUNCATED_STREAM, {
fromRevision: START,
});

subscription_stream
.on("data", (resolvedEvent) => {
collected_subscribe_to_stream.push(resolvedEvent);
})
.on("caughtUp", () => {
caughtUpReceivedStream = true;
subscription_stream.unsubscribe();
deferStream.resolve();
})
.on("error", (error) => {
deferStream.reject(error);
});

const read_all_events = client.readAll({
direction: FORWARDS,
fromPosition: START,
filter: streamNameFilter({ prefixes: [TRUNCATED_STREAM] }),
});

for await (const resolvedEvent of read_all_events) {
collected_read_all.push(resolvedEvent);
}

const read_stream_events = client.readStream(TRUNCATED_STREAM, {
direction: FORWARDS,
fromRevision: START,
});

for await (const resolvedEvent of read_stream_events) {
collected_read_stream.push(resolvedEvent);
}

try {
await Promise.all([deferAll.promise, deferStream.promise]);

expect(caughtUpReceivedAll).toBe(true);
expect(caughtUpReceivedStream).toBe(true);

expect(collected_subscribe_to_all.length).toBe(25);
expect(collected_read_all.length).toBe(25);

// After truncation, the counts will be different - the truncation is stored in the stream metadata,
// and the metadata is not applied when reading/subscribing to $all
// so your $all reads and subscriptions will still include all the events until they are scavenged
expect(collected_subscribe_to_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE);
expect(collected_read_stream.length).toBe(CREATED_EVENTS - TRUNCATE_BEFORE);

expect(collected_subscribe_to_all.at(-1)?.event?.revision).toBe(24n);
expect(collected_read_all.at(-1)?.event?.revision).toBe(24);
expect(collected_subscribe_to_stream.at(-1)?.event?.revision).toBe(24n);
expect(collected_read_stream.at(-1)?.event?.revision).toBe(24);
} catch (error) {
throw error;
}
});
});
});
Loading