@@ -136,7 +136,7 @@ def run_once(self):
136
136
expired_batches = self ._accumulator .abort_expired_batches (
137
137
self .config ['request_timeout_ms' ], self ._metadata )
138
138
139
- # Reset the PID if an expired batch has previously been sent to the broker.
139
+ # Reset the producer_id if an expired batch has previously been sent to the broker.
140
140
# See the documentation of `TransactionState.reset_producer_id` to understand why
141
141
# we need to reset the producer id here.
142
142
if self ._transaction_state and any ([batch .in_retry () for batch in expired_batches ]):
@@ -201,9 +201,7 @@ def add_topic(self, topic):
201
201
self .wakeup ()
202
202
203
203
def _maybe_wait_for_producer_id (self ):
204
- log .debug ("_maybe_wait_for_producer_id" )
205
204
if not self ._transaction_state :
206
- log .debug ("_maybe_wait_for_producer_id: no transaction_state..." )
207
205
return
208
206
209
207
while not self ._transaction_state .has_pid ():
@@ -236,7 +234,6 @@ def _maybe_wait_for_producer_id(self):
236
234
except Errors .RequestTimedOutError :
237
235
log .debug ("InitProducerId request to node %s timed out" , node_id )
238
236
time .sleep (self .config ['retry_backoff_ms' ] / 1000 )
239
- log .debug ("_maybe_wait_for_producer_id: ok: %s" , self ._transaction_state .producer_id_and_epoch )
240
237
241
238
def _failed_produce (self , batches , node_id , error ):
242
239
log .error ("Error sending produce request to node %d: %s" , node_id , error ) # trace
0 commit comments