Producer concurrency issues - TakeAsync method in AsyncCollection class #60

Closed
@marekstachura

We observed that the TakeAsync method (AsyncCollection class) does not take the timeout parameter into account while TryTake keeps returning data.
In our environment the producer kept building up the batch collection and did not return it, even after the timeout task had completed. It returned the batch only once it reached the batch size (default 100 messages).
For some reason, in our case it took quite a long time to reach the batch size (over 60 seconds).

It looks like adding a condition that also checks whether timeoutTask has completed would make it work as expected.

Please see line:
https://github.com/Jroland/kafka-net/blob/master/src/kafka-net/Common/AsyncCollection.cs#L72

The proposed change could be:

// Current version, line 72
//[...]
if (--count <= 0) return batch;
//[...]

// Proposed change
//[...]
if (--count <= 0 || timeoutTask.IsCompleted) return batch;
//[...]
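
To illustrate the difference, here is a minimal, self-contained sketch. It is not the kafka-net source: `BatchDrainSketch`, `TryTakeFunc`, `Drain`, `EndlessSource` and the `honorTimeout` flag are hypothetical names made up for this example, and the loop is only an assumed simplification of the drain loop around line 72. It models a source that never runs dry, which is the situation where the timeout is never consulted.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the inner drain loop of TakeAsync (assumption, not the library code).
public static class BatchDrainSketch
{
    // Same shape as a TryTake(out T) call on the underlying collection.
    public delegate bool TryTakeFunc<T>(out T item);

    // Drains items into a batch until the source is empty or the batch is full.
    // With honorTimeout == true it also stops as soon as timeoutTask has completed,
    // which mirrors the proposed extra condition.
    public static List<T> Drain<T>(TryTakeFunc<T> tryTake, int count, Task timeoutTask, bool honorTimeout)
    {
        var batch = new List<T>();
        while (tryTake(out T item))
        {
            batch.Add(item);
            if (--count <= 0) return batch;                              // current check
            if (honorTimeout && timeoutTask.IsCompleted) return batch;   // proposed check
        }
        return batch;
    }

    // A source that always has data available, so the loop above never exits on its own.
    public static bool EndlessSource(out int item)
    {
        item = 42;
        return true;
    }
}
```

Called with an already completed timeout task (`Task.CompletedTask`) and a batch size of 100, `Drain<int>(BatchDrainSketch.EndlessSource, 100, Task.CompletedTask, honorTimeout: false)` keeps going until the batch holds 100 items, while the same call with `honorTimeout: true` returns after the first item. One trade-off of the extra check is that a batch can now be returned partially filled even though more data is immediately available.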

Looking forward to your opinion about that.
