Skip to content

Commit 36f7a0c

Browse files
committed
Debug log fetch records return; separate offsets update log
1 parent cab08df commit 36f7a0c

File tree

1 file changed

+12
-16
lines changed

1 file changed

+12
-16
lines changed

kafka/consumer/fetcher.py

Lines changed: 12 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -363,37 +363,33 @@ def _append(self, drained, part, max_records, update_offsets):
363363
return 0
364364

365365
tp = part.topic_partition
366-
fetch_offset = part.fetch_offset
367366
if not self._subscriptions.is_assigned(tp):
368367
# this can happen when a rebalance happened before
369368
# fetched records are returned to the consumer's poll call
370369
log.debug("Not returning fetched records for partition %s"
371370
" since it is no longer assigned", tp)
371+
elif not self._subscriptions.is_fetchable(tp):
372+
# this can happen when a partition is paused before
373+
# fetched records are returned to the consumer's poll call
374+
log.debug("Not returning fetched records for assigned partition"
375+
" %s since it is no longer fetchable", tp)
376+
372377
else:
373378
# note that the position should always be available
374379
# as long as the partition is still assigned
375380
position = self._subscriptions.assignment[tp].position
376-
if not self._subscriptions.is_fetchable(tp):
377-
# this can happen when a partition is paused before
378-
# fetched records are returned to the consumer's poll call
379-
log.debug("Not returning fetched records for assigned partition"
380-
" %s since it is no longer fetchable", tp)
381-
382-
elif fetch_offset == position.offset:
383-
# we are ensured to have at least one record since we already checked for emptiness
381+
if part.fetch_offset == position.offset:
384382
part_records = part.take(max_records)
385383
next_offset = part_records[-1].offset + 1
386384
leader_epoch = part_records[-1].leader_epoch
387385

388-
log.log(0, "Returning fetched records at offset %d for assigned"
389-
" partition %s and update position to %s (leader epoch %s)", position.offset,
390-
tp, next_offset, leader_epoch)
391-
392-
for record in part_records:
393-
drained[tp].append(record)
394-
386+
log.debug("Returning fetched records at offset %d for assigned"
387+
" partition %s", position.offset, tp)
388+
drained[tp].extend(part_records)
395389
if update_offsets:
396390
# TODO: save leader_epoch
391+
log.debug("Updating fetch position for assigned partition %s to %s (leader epoch %s)",
392+
tp, next_offset, leader_epoch)
397393
self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1)
398394
return len(part_records)
399395

0 commit comments

Comments
 (0)