Skip to content

Commit 7882f83

Browse files
committed
can get abort marker for producer_id not in abort_transactions
1 parent b833764 commit 7882f83

File tree

1 file changed

+4
-1
lines changed

1 file changed

+4
-1
lines changed

kafka/consumer/fetcher.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -942,7 +942,10 @@ def _unpack_records(self, tp, records, key_deserializer, value_deserializer):
942942

943943
producer_id = batch.producer_id
944944
if self._contains_abort_marker(batch):
945-
self.aborted_producer_ids.remove(producer_id)
945+
try:
946+
self.aborted_producer_ids.remove(producer_id)
947+
except KeyError:
948+
pass
946949
elif self._is_batch_aborted(batch):
947950
log.debug("Skipping aborted record batch from partition %s with producer_id %s and"
948951
" offsets %s to %s",

0 commit comments

Comments
 (0)