|
2 | 2 |
|
3 | 3 | import collections |
4 | 4 | import copy |
| 5 | +import itertools |
5 | 6 | import logging |
6 | 7 | import random |
7 | 8 | import sys |
@@ -378,88 +379,35 @@ def _append(self, drained, part, max_records, update_offsets): |
378 | 379 | # note that the position should always be available |
379 | 380 | # as long as the partition is still assigned |
380 | 381 | position = self._subscriptions.assignment[tp].position |
381 | | - if part.fetch_offset == position.offset: |
| 382 | + if part.next_fetch_offset == position.offset: |
382 | 383 | part_records = part.take(max_records) |
383 | | - next_offset = part_records[-1].offset + 1 |
384 | | - leader_epoch = part_records[-1].leader_epoch |
385 | | - |
386 | 384 | log.debug("Returning fetched records at offset %d for assigned" |
387 | 385 | " partition %s", position.offset, tp) |
388 | 386 | drained[tp].extend(part_records) |
389 | | - if update_offsets: |
| 387 | + # We want to increment subscription position if (1) we're using consumer.poll(), |
| 388 | + # or (2) we didn't return any records (consumer iterator will update position |
| 389 | + # when each message is yielded). There may be edge cases where we re-fetch records |
| 390 | + # that we'll end up skipping, but for now we'll live with that. |
| 391 | + highwater = self._subscriptions.assignment[tp].highwater |
| 392 | + if highwater is not None: |
| 393 | + self._sensors.records_fetch_lag.record(highwater - part.next_fetch_offset) |
| 394 | + if update_offsets or not part_records: |
390 | 395 | # TODO: save leader_epoch |
391 | 396 | log.debug("Updating fetch position for assigned partition %s to %s (leader epoch %s)", |
392 | | - tp, next_offset, leader_epoch) |
393 | | - self._subscriptions.assignment[tp].position = OffsetAndMetadata(next_offset, '', -1) |
| 397 | + tp, part.next_fetch_offset, part.leader_epoch) |
| 398 | + self._subscriptions.assignment[tp].position = OffsetAndMetadata(part.next_fetch_offset, '', -1) |
394 | 399 | return len(part_records) |
395 | 400 |
|
396 | 401 | else: |
397 | 402 | # these records aren't next in line based on the last consumed |
398 | 403 | # position; ignore them, they must be from an obsolete request |
399 | 404 | log.debug("Ignoring fetched records for %s at offset %s since" |
400 | | - " the current position is %d", tp, part.fetch_offset, |
| 405 | + " the current position is %d", tp, part.next_fetch_offset, |
401 | 406 | position.offset) |
402 | 407 |
403 | | - part.discard() |
| 408 | + part.drain() |
404 | 409 | return 0 |
405 | 410 |
406 | | - def _unpack_records(self, tp, records): |
407 | | - try: |
408 | | - batch = records.next_batch() |
409 | | - while batch is not None: |
410 | | - |
411 | | - # Try DefaultsRecordBatch / message log format v2 |
412 | | - # base_offset, last_offset_delta, and control batches |
413 | | - try: |
414 | | - batch_offset = batch.base_offset + batch.last_offset_delta |
415 | | - leader_epoch = batch.leader_epoch |
416 | | - # Control batches have a single record indicating whether a transaction |
417 | | - # was aborted or committed. |
418 | | - # When isolation_level is READ_COMMITTED (currently unsupported) |
419 | | - # we should also skip all messages from aborted transactions |
420 | | - # For now we only support READ_UNCOMMITTED and so we ignore the |
421 | | - # abort/commit signal. |
422 | | - if batch.is_control_batch: |
423 | | - batch = records.next_batch() |
424 | | - continue |
425 | | - except AttributeError: |
426 | | - leader_epoch = -1 |
427 | | - pass |
428 | | - |
429 | | - for record in batch: |
430 | | - key_size = len(record.key) if record.key is not None else -1 |
431 | | - value_size = len(record.value) if record.value is not None else -1 |
432 | | - key = self._deserialize( |
433 | | - self.config['key_deserializer'], |
434 | | - tp.topic, record.key) |
435 | | - value = self._deserialize( |
436 | | - self.config['value_deserializer'], |
437 | | - tp.topic, record.value) |
438 | | - headers = record.headers |
439 | | - header_size = sum( |
440 | | - len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in |
441 | | - headers) if headers else -1 |
442 | | - yield ConsumerRecord( |
443 | | - tp.topic, tp.partition, leader_epoch, record.offset, record.timestamp, |
444 | | - record.timestamp_type, key, value, headers, record.checksum, |
445 | | - key_size, value_size, header_size) |
446 | | - |
447 | | - batch = records.next_batch() |
448 | | - |
449 | | - # If unpacking raises StopIteration, it is erroneously |
450 | | - # caught by the generator. We want all exceptions to be raised |
451 | | - # back to the user. See Issue 545 |
452 | | - except StopIteration: |
453 | | - log.exception('StopIteration raised unpacking messageset') |
454 | | - raise RuntimeError('StopIteration raised unpacking messageset') |
455 | | - |
456 | | - def _deserialize(self, f, topic, bytes_): |
457 | | - if not f: |
458 | | - return bytes_ |
459 | | - if isinstance(f, Deserializer): |
460 | | - return f.deserialize(topic, bytes_) |
461 | | - return f(bytes_) |
462 | | - |
463 | 411 | def _send_list_offsets_requests(self, timestamps): |
464 | 412 | """Fetch offsets for each partition in timestamps dict. This may send |
465 | 413 | request to multiple nodes, based on who is Leader for partition. |
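
The hunk above changes _append to advance the subscription position from part.next_fetch_offset, to record fetch lag against the partition highwater mark, and to update the position eagerly only for poll()-style consumption (the consumer iterator updates the position as each message is yielded). Below is a rough usage-level sketch of that poll-vs-iterator distinction using the public kafka-python consumer API; the topic name and broker address are placeholders, and the positions shown assume records were actually returned.

    from kafka import KafkaConsumer, TopicPartition

    # Placeholder topic and broker, for illustration only.
    consumer = KafkaConsumer('example-topic',
                             bootstrap_servers='localhost:9092',
                             enable_auto_commit=False)

    # poll() drains buffered records and bumps the fetch position past
    # everything it returned (the update_offsets=True path above).
    batch = consumer.poll(timeout_ms=1000, max_records=10)
    for tp, records in batch.items():
        print(tp, records[-1].offset, consumer.position(tp))

    # The iterator interface advances the position one message at a time,
    # which is why _append skips the eager update in that case.
    for message in consumer:
        tp = TopicPartition(message.topic, message.partition)
        print(message.offset, consumer.position(tp))
        break
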
@@ -773,12 +721,9 @@ def _handle_fetch_error(self, node_id, exception): |
773 | 721 | def _parse_fetched_data(self, completed_fetch): |
774 | 722 | tp = completed_fetch.topic_partition |
775 | 723 | fetch_offset = completed_fetch.fetched_offset |
776 | | - num_bytes = 0 |
777 | | - records_count = 0 |
778 | | - parsed_records = None |
779 | | - |
780 | 724 | error_code, highwater = completed_fetch.partition_data[:2] |
781 | 725 | error_type = Errors.for_code(error_code) |
| 726 | + parsed_records = None |
782 | 727 |
783 | 728 | try: |
784 | 729 | if not self._subscriptions.is_fetchable(tp): |
@@ -807,13 +752,12 @@ def _parse_fetched_data(self, completed_fetch): |
807 | 752 | log.debug("Adding fetched record for partition %s with" |
808 | 753 | " offset %d to buffered record list", tp, |
809 | 754 | position.offset) |
810 | | - unpacked = list(self._unpack_records(tp, records)) |
811 | | - parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked) |
812 | | - if unpacked: |
813 | | - last_offset = unpacked[-1].offset |
814 | | - self._sensors.records_fetch_lag.record(highwater - last_offset) |
815 | | - num_bytes = records.valid_bytes() |
816 | | - records_count = len(unpacked) |
| 755 | + parsed_records = self.PartitionRecords(fetch_offset, tp, records, |
| 756 | + self.config['key_deserializer'], |
| 757 | + self.config['value_deserializer'], |
| 758 | + self.config['check_crcs'], |
| 759 | + completed_fetch.metric_aggregator) |
| 760 | + return parsed_records |
817 | 761 | elif records.size_in_bytes() > 0: |
818 | 762 | # we did not read a single message from a non-empty |
819 | 763 | # buffer because that message's size is larger than |
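
The replacement above stops unpacking records eagerly in _parse_fetched_data: the raw record set, the deserializers, and the metric aggregator are handed to PartitionRecords, which counts bytes and records as they are actually consumed and reports them once when drained. The following is a minimal sketch of that deferred-deserialization pattern; the class, parameter, and callback names are illustrative, not kafka-python's API.

    import itertools

    class LazyPartitionBuffer(object):
        """Toy stand-in: deferred deserialization plus one-shot metric reporting."""

        def __init__(self, raw_records, deserialize, report_metrics):
            self.bytes_read = 0
            self.records_read = 0
            self._deserialize = deserialize
            self._report = report_metrics
            self._it = iter(raw_records)

        def take(self, n=None):
            if self._it is None:
                return []
            out = []
            for raw in itertools.islice(self._it, n):
                self.records_read += 1
                self.bytes_read += len(raw)
                out.append(self._deserialize(raw))
            return out

        def drain(self):
            # Report usage exactly once, whether records were consumed or discarded.
            if self._it is not None:
                self._it = None
                self._report(self.bytes_read, self.records_read)

    buf = LazyPartitionBuffer([b'a', b'bb'], bytes.decode, print)
    print(buf.take(1))   # ['a'] -- deserialized only when taken
    buf.drain()          # prints "1 1": one byte, one record actually read
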
@@ -858,52 +802,116 @@ def _parse_fetched_data(self, completed_fetch): |
858 | 802 | raise error_type('Unexpected error while fetching data') |
859 | 803 |
860 | 804 | finally: |
861 | | - completed_fetch.metric_aggregator.record(tp, num_bytes, records_count) |
| 805 | + if parsed_records is None: |
| 806 | + completed_fetch.metric_aggregator.record(tp, 0, 0) |
862 | 807 |
863 | | - return parsed_records |
| 808 | + return None |
| 809 | + |
| 810 | + def close(self): |
| 811 | + if self._next_partition_records is not None: |
| 812 | + self._next_partition_records.drain() |
864 | 813 |
865 | 814 | class PartitionRecords(object): |
866 | | - def __init__(self, fetch_offset, tp, messages): |
| 815 | + def __init__(self, fetch_offset, tp, records, key_deserializer, value_deserializer, check_crcs, metric_aggregator): |
867 | 816 | self.fetch_offset = fetch_offset |
868 | 817 | self.topic_partition = tp |
869 | | - self.messages = messages |
| 818 | + self.leader_epoch = -1 |
| 819 | + self.next_fetch_offset = fetch_offset |
| 820 | + self.bytes_read = 0 |
| 821 | + self.records_read = 0 |
| 822 | + self.metric_aggregator = metric_aggregator |
| 823 | + self.check_crcs = check_crcs |
| 824 | + self.record_iterator = itertools.dropwhile( |
| 825 | + self._maybe_skip_record, |
| 826 | + self._unpack_records(tp, records, key_deserializer, value_deserializer)) |
| 827 | + |
| 828 | + def _maybe_skip_record(self, record): |
870 | 829 | # When fetching an offset that is in the middle of a |
871 | 830 | # compressed batch, we will get all messages in the batch. |
872 | 831 | # But we want to start 'take' at the fetch_offset |
873 | 832 | # (or the next highest offset in case the message was compacted) |
874 | | - for i, msg in enumerate(messages): |
875 | | - if msg.offset < fetch_offset: |
876 | | - log.debug("Skipping message offset: %s (expecting %s)", |
877 | | - msg.offset, fetch_offset) |
878 | | - else: |
879 | | - self.message_idx = i |
880 | | - break |
881 | | - |
| 833 | + if record.offset < self.fetch_offset: |
| 834 | + log.debug("Skipping message offset: %s (expecting %s)", |
| 835 | + record.offset, self.fetch_offset) |
| 836 | + return True |
882 | 837 | else: |
883 | | - self.message_idx = 0 |
884 | | - self.messages = None |
| 838 | + return False |
885 | 839 |
886 | | - # For truthiness evaluation we need to define __len__ or __nonzero__ |
887 | | - def __len__(self): |
888 | | - if self.messages is None or self.message_idx >= len(self.messages): |
889 | | - return 0 |
890 | | - return len(self.messages) - self.message_idx |
| 840 | + # For truthiness evaluation |
| 841 | + def __bool__(self): |
| 842 | + return self.record_iterator is not None |
891 | 843 |
892 | | - def discard(self): |
893 | | - self.messages = None |
| 844 | + def drain(self): |
| 845 | + if self.record_iterator is not None: |
| 846 | + self.record_iterator = None |
| 847 | + self.metric_aggregator.record(self.topic_partition, self.bytes_read, self.records_read) |
894 | 848 |
895 | 849 | def take(self, n=None): |
896 | | - if not len(self): |
897 | | - return [] |
898 | | - if n is None or n > len(self): |
899 | | - n = len(self) |
900 | | - next_idx = self.message_idx + n |
901 | | - res = self.messages[self.message_idx:next_idx] |
902 | | - self.message_idx = next_idx |
903 | | - # fetch_offset should be incremented by 1 to parallel the |
904 | | - # subscription position (also incremented by 1) |
905 | | - self.fetch_offset = max(self.fetch_offset, res[-1].offset + 1) |
906 | | - return res |
| 850 | + return list(itertools.islice(self.record_iterator, 0, n)) |
| 851 | + |
| 852 | + def _unpack_records(self, tp, records, key_deserializer, value_deserializer): |
| 853 | + try: |
| 854 | + batch = records.next_batch() |
| 855 | + last_batch = None |
| 856 | + while batch is not None: |
| 857 | + last_batch = batch |
| 858 | + |
| 859 | + # Try DefaultRecordBatch / message log format v2 |
| 860 | + # base_offset, last_offset_delta, and control batches |
| 861 | + if batch.magic == 2: |
| 862 | + self.leader_epoch = batch.leader_epoch |
| 863 | + # Control batches have a single record indicating whether a transaction |
| 864 | + # was aborted or committed. |
| 865 | + # When isolation_level is READ_COMMITTED (currently unsupported) |
| 866 | + # we should also skip all messages from aborted transactions |
| 867 | + # For now we only support READ_UNCOMMITTED and so we ignore the |
| 868 | + # abort/commit signal. |
| 869 | + if batch.is_control_batch: |
| 870 | + self.next_fetch_offset = next(batch).offset + 1 |
| 871 | + batch = records.next_batch() |
| 872 | + continue |
| 873 | + |
| 874 | + for record in batch: |
| 875 | + key_size = len(record.key) if record.key is not None else -1 |
| 876 | + value_size = len(record.value) if record.value is not None else -1 |
| 877 | + key = self._deserialize(key_deserializer, tp.topic, record.key) |
| 878 | + value = self._deserialize(value_deserializer, tp.topic, record.value) |
| 879 | + headers = record.headers |
| 880 | + header_size = sum( |
| 881 | + len(h_key.encode("utf-8")) + (len(h_val) if h_val is not None else 0) for h_key, h_val in |
| 882 | + headers) if headers else -1 |
| 883 | + self.records_read += 1 |
| 884 | + self.bytes_read += record.size_in_bytes |
| 885 | + self.next_fetch_offset = record.offset + 1 |
| 886 | + yield ConsumerRecord( |
| 887 | + tp.topic, tp.partition, self.leader_epoch, record.offset, record.timestamp, |
| 888 | + record.timestamp_type, key, value, headers, record.checksum, |
| 889 | + key_size, value_size, header_size) |
| 890 | + |
| 891 | + batch = records.next_batch() |
| 892 | + else: |
| 893 | + # Message format v2 preserves the last offset in a batch even if the last record is removed |
| 894 | + # through compaction. By using the next offset computed from the last offset in the batch, |
| 895 | + # we ensure that the offset of the next fetch will point to the next batch, which avoids |
| 896 | + # unnecessary re-fetching of the same batch (in the worst case, the consumer could get stuck |
| 897 | + # fetching the same batch repeatedly). |
| 898 | + if last_batch and last_batch.magic == 2: |
| 899 | + self.next_fetch_offset = last_batch.base_offset + last_batch.last_offset_delta + 1 |
| 900 | + self.drain() |
| 901 | + |
| 902 | + # If unpacking raises StopIteration, it is erroneously |
| 903 | + # caught by the generator. We want all exceptions to be raised |
| 904 | + # back to the user. See Issue 545 |
| 905 | + except StopIteration: |
| 906 | + log.exception('StopIteration raised unpacking messageset') |
| 907 | + raise RuntimeError('StopIteration raised unpacking messageset') |
| 908 | + |
| 909 | + def _deserialize(self, f, topic, bytes_): |
| 910 | + if not f: |
| 911 | + return bytes_ |
| 912 | + if isinstance(f, Deserializer): |
| 913 | + return f.deserialize(topic, bytes_) |
| 914 | + return f(bytes_) |
907 | 915 |
908 | 916 |
909 | 917 | class FetchSessionHandler(object): |
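
The new PartitionRecords.take() relies on two itertools behaviors: dropwhile discards only the leading records whose offset is below fetch_offset (the compacted/compressed-batch case handled by _maybe_skip_record), and islice(iterator, 0, None) drains whatever remains when n is None. A small standalone demonstration with toy offsets, independent of any Kafka objects:

    import collections
    import itertools

    Rec = collections.namedtuple('Rec', ['offset'])

    fetch_offset = 5
    records = iter([Rec(3), Rec(4), Rec(5), Rec(6), Rec(7)])

    # Skip only the leading records below fetch_offset; once a record passes,
    # the predicate is never evaluated again for later records.
    it = itertools.dropwhile(lambda r: r.offset < fetch_offset, records)

    print(list(itertools.islice(it, 0, 2)))     # [Rec(offset=5), Rec(offset=6)]
    print(list(itertools.islice(it, 0, None)))  # [Rec(offset=7)] -- n=None takes the rest
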
|