Skip to content

Commit cab08df

Browse files
committed
revert last_offset_from_record_batch
1 parent b75248e commit cab08df

File tree

2 files changed

+0
-16
lines changed

2 files changed

+0
-16
lines changed

kafka/consumer/fetcher.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -417,7 +417,6 @@ def _unpack_records(self, tp, records):
417417
try:
418418
batch_offset = batch.base_offset + batch.last_offset_delta
419419
leader_epoch = batch.leader_epoch
420-
self._subscriptions.assignment[tp].last_offset_from_record_batch = batch_offset
421420
# Control batches have a single record indicating whether a transaction
422421
# was aborted or committed.
423422
# When isolation_level is READ_COMMITTED (currently unsupported)
@@ -643,16 +642,6 @@ def _create_fetch_requests(self):
643642
for partition in self._fetchable_partitions():
644643
node_id = self._client.cluster.leader_for_partition(partition)
645644

646-
# advance position for any deleted compacted messages if required
647-
if self._subscriptions.assignment[partition].last_offset_from_record_batch:
648-
next_offset_from_batch_header = self._subscriptions.assignment[partition].last_offset_from_record_batch + 1
649-
if next_offset_from_batch_header > self._subscriptions.assignment[partition].position.offset:
650-
log.debug(
651-
"Advance position for partition %s from %s to %s (last record batch location plus one)"
652-
" to correct for deleted compacted messages and/or transactional control records",
653-
partition, self._subscriptions.assignment[partition].position.offset, next_offset_from_batch_header)
654-
self._subscriptions.assignment[partition].position = OffsetAndMetadata(next_offset_from_batch_header, '', -1)
655-
656645
position = self._subscriptions.assignment[partition].position
657646

658647
# fetch if there is a leader and no in-flight requests

kafka/consumer/subscription_state.py

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -382,9 +382,6 @@ def __init__(self):
382382
self._position = None # OffsetAndMetadata exposed to the user
383383
self.highwater = None
384384
self.drop_pending_record_batch = False
385-
# The last message offset hint available from a record batch with
386-
# magic=2 which includes deleted compacted messages
387-
self.last_offset_from_record_batch = None
388385

389386
def _set_position(self, offset):
390387
assert self.has_valid_position, 'Valid position required'
@@ -400,7 +397,6 @@ def await_reset(self, strategy):
400397
self.awaiting_reset = True
401398
self.reset_strategy = strategy
402399
self._position = None
403-
self.last_offset_from_record_batch = None
404400
self.has_valid_position = False
405401

406402
def seek(self, offset):
@@ -409,7 +405,6 @@ def seek(self, offset):
409405
self.reset_strategy = None
410406
self.has_valid_position = True
411407
self.drop_pending_record_batch = True
412-
self.last_offset_from_record_batch = None
413408

414409
def pause(self):
415410
self.paused = True

0 commit comments

Comments
 (0)