Conversation


@wallyqs wallyqs commented Oct 14, 2025

Rework the implementation of async for msg in sub.messages and sub.next_msg for better performance.

Fixes #703


@caspervonb caspervonb left a comment


Better than earlier results discussed in #732, which peaked at 20k per second.

uv run ../nats-client/tools/bench.py --client=aio --msgs=1000000

Starting pub/sub benchmark with nats.aio [msgs=1,000,000, size=128 B]

Publisher results:
Test completed: 1,000,000 messages, 128,000,000 bytes, 3.84 seconds
  Throughput: 260,387 msgs/sec, 31.79 MB/sec
  Latency: (min/avg/max/std) = 0.00/0.00/52.37/0.36 ms

Subscriber results:
Test completed: 1,000,000 messages, 128,000,000 bytes, 3.88 seconds
  Throughput: 257,908 msgs/sec, 31.48 MB/sec
  Latency: (min/avg/max/std) = 99.45/2014.35/3976.80/1104.45 ms


@caspervonb caspervonb left a comment


  • Tests were relaxed, so potential timing issues?
  • Technical debt for the whole compatibility thing.
  • Why a generator, instead of just calling queue.get() etc. in __anext__?


wallyqs commented Oct 14, 2025

Removed the compatibility wrapper for the previous iterator since it is no longer used by the object store. Had to tweak the defaults in test_pull_subscribe_limits since it now processes more messages than before. About __anext__, I would have to take a closer look at whether it can be used instead.

@wallyqs wallyqs force-pushed the sub-next-msg-perf branch 2 times, most recently from cecb078 to ce7dd49 on October 17, 2025 09:45
Signed-off-by: Waldemar Quevedo <wally@nats.io>

@caspervonb caspervonb left a comment


Would be good if next_msg and __anext__ had the same call path, no?

# Requires tracking the next_msg calls in flight for cancelling.
if cb is None:
self._pending_next_msgs_calls = {}
self._pending_next_msgs_calls: Optional[Dict[str, asyncio.Task]] = {}
Collaborator


So ideally, both next_msg and anext are basically the same call, aside from options like timeout etc.

Member Author


I think it could do sub.next_msg(timeout=0), though the queue peeking is the only part they have in common; it should probably be fine.
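As a rough sketch of the shared call path being discussed (hypothetical, simplified names; not the actual nats-py code), __anext__ can delegate to next_msg so that both iteration and explicit calls read from the same pending queue:

```python
import asyncio
from typing import Optional


class Subscription:
    """Toy subscription: __anext__ and next_msg share one call path."""

    def __init__(self) -> None:
        self._pending_queue: asyncio.Queue = asyncio.Queue()

    async def next_msg(self, timeout: Optional[float] = 1.0):
        # Both explicit calls and iteration funnel through this queue read;
        # timeout=None waits until a message arrives.
        if timeout is None:
            return await self._pending_queue.get()
        return await asyncio.wait_for(self._pending_queue.get(), timeout)

    def __aiter__(self):
        return self

    async def __anext__(self):
        # Iteration waits indefinitely, matching `async for` semantics.
        return await self.next_msg(timeout=None)


async def main():
    sub = Subscription()
    sub._pending_queue.put_nowait(b"hello")
    return await sub.__anext__()


result = asyncio.run(main())
print(result)  # b'hello'
```

The only difference between the two entry points is then the timeout option, which is what the comment above suggests.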

Signed-off-by: Waldemar Quevedo <wally@nats.io>
@caspervonb caspervonb requested a review from Copilot October 30, 2025 21:13

Copilot AI left a comment


Pull Request Overview

This PR refactors the NATS subscription message iteration mechanism to use a more efficient async generator pattern. The changes replace the custom _SubscriptionMessageIterator class with a simpler generator-based approach while maintaining compatibility across Python 3.8-3.13.

Key changes:

  • Replaces _SubscriptionMessageIterator with _message_generator() for cleaner async iteration
  • Adds Python 3.13+ QueueShutDown support with backwards compatibility for older versions
  • Implements sentinel-based queue termination for Python < 3.13
  • Adds comprehensive test coverage for concurrent consumption patterns and edge cases
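The version-specific termination described above could be sketched like this (illustrative only, not the actual nats-py implementation): Python 3.13+ uses asyncio.Queue.shutdown() and the QueueShutDown exception, while older versions push a None sentinel into the pending queue.

```python
import asyncio

# Python 3.13+ ships asyncio.QueueShutDown; older versions fall back
# to a None sentinel pushed into the pending queue.
_HAS_QUEUE_SHUTDOWN = hasattr(asyncio, "QueueShutDown")


async def message_generator(queue: asyncio.Queue):
    """Yield messages until the queue is shut down or a sentinel arrives."""
    while True:
        if _HAS_QUEUE_SHUTDOWN:
            try:
                msg = await queue.get()
            except asyncio.QueueShutDown:
                return
        else:
            msg = await queue.get()
            if msg is None:
                return  # sentinel-based termination for Python < 3.13
        yield msg


async def main():
    q: asyncio.Queue = asyncio.Queue()
    for i in range(3):
        q.put_nowait(f"msg-{i}")
    # Signal termination with whichever mechanism is available.
    if _HAS_QUEUE_SHUTDOWN:
        q.shutdown()  # queued items are still drained before QueueShutDown is raised
    else:
        q.put_nowait(None)
    return [m async for m in message_generator(q)]


result = asyncio.run(main())
print(result)  # ['msg-0', 'msg-1', 'msg-2']
```

Either path drains the messages already queued before stopping, which is why the sentinel fallback behaves like a non-immediate shutdown.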

Reviewed Changes

Copilot reviewed 10 out of 10 changed files in this pull request and generated 17 comments.

Show a summary per file
File Description
nats/src/nats/aio/subscription.py Refactored message iteration to use async generators with version-specific queue shutdown handling
nats/src/nats/aio/client.py Updated client cleanup to use new sentinel-based termination approach
nats/src/nats/js/client.py Updated JetStream subscription to reference new _active_consumers field
nats/src/nats/js/object_store.py Changed from _message_iterator to messages property
nats/tests/test_client.py Added extensive test coverage for async generators, concurrent consumption, and edge cases
nats/tests/test_js.py Adjusted assertions to handle timing variability in tests
nats/benchmark/*.py Added new benchmark scripts for performance testing
.github/workflows/test.yml Added additional NATS server versions to test matrix


Comment on lines +162 to +163
if self._max_msgs > 0 and yielded_count >= self._max_msgs:
break

Copilot AI Oct 30, 2025


This check is redundant as the same condition is checked again at lines 184-185 after yielding the message. Consider removing this pre-check since the subscription should still process any messages already in the queue before stopping, and the post-yield check is sufficient to enforce the limit.

Suggested change
if self._max_msgs > 0 and yielded_count >= self._max_msgs:
break
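As a rough illustration of the point (a hypothetical, simplified generator, not the PR's actual code), a single post-yield check is enough to enforce the max_msgs limit while still delivering messages already in the queue:

```python
import asyncio


async def message_generator(queue: asyncio.Queue, max_msgs: int = 0):
    """Yield queued messages; stop after max_msgs when the limit is positive."""
    yielded = 0
    while True:
        msg = await queue.get()
        if msg is None:  # sentinel marks end of subscription
            return
        yield msg
        yielded += 1
        # The post-yield check alone enforces the limit: queued messages
        # are still delivered up to the limit, then iteration stops.
        if max_msgs > 0 and yielded >= max_msgs:
            return


async def main():
    q: asyncio.Queue = asyncio.Queue()
    for i in range(5):
        q.put_nowait(i)
    q.put_nowait(None)
    return [m async for m in message_generator(q, max_msgs=3)]


result = asyncio.run(main())
print(result)  # [0, 1, 2]
```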

Comment on lines +757 to +758
print("WRN", e)
pass

Copilot AI Oct 30, 2025


This debug print statement should be removed or replaced with proper logging. Using print() in tests can clutter test output and the exception is being silently swallowed after printing.

Suggested change
print("WRN", e)
pass
raise

Comment on lines +99 to +101
# For Python < 3.13, we need to track active consumers for sentinel-based termination
# For Python 3.13+, we use QueueShutDown which doesn't require tracking.
if not _HAS_QUEUE_SHUTDOWN and cb is None:

Copilot AI Oct 30, 2025


The condition not _HAS_QUEUE_SHUTDOWN and cb is None means that for Python 3.13+ with callback subscriptions, _active_consumers is set to None. However, this is inconsistent with the check at line 886 in js/client.py which copies this field. Consider documenting why callback-based subscriptions don't need active consumer tracking, or simplify the logic to only check cb is None.

Suggested change
# For Python < 3.13, we need to track active consumers for sentinel-based termination
# For Python 3.13+, we use QueueShutDown which doesn't require tracking.
if not _HAS_QUEUE_SHUTDOWN and cb is None:
# Only synchronous (non-callback) subscriptions need active consumer tracking,
# regardless of Python version. Callback-based subscriptions do not require it.
if cb is None:


async def consumer_task(consumer_id: str, max_messages: int = None):
"""Consumer task that processes messages"""
import random

Copilot AI Oct 30, 2025


The random module should be imported at the top of the file rather than inside a function. Move this import to the file-level imports section for better code organization and to avoid repeated imports in concurrent tasks.


async def consumer_task(consumer_id: str, max_messages: int = None):
"""Consumer task that processes messages"""
import random

Copilot AI Oct 30, 2025


The random module should be imported at the top of the file rather than inside a function. Move this import to the file-level imports section for better code organization and to avoid repeated imports in concurrent tasks.

sentinels_to_send = max(1, self._active_consumers)
for _ in range(sentinels_to_send):
self._pending_queue.put_nowait(None)
except Exception:

Copilot AI Oct 30, 2025


'except' clause does nothing but pass and there is no explanatory comment.


# Wait for iterator to complete after drain
await asyncio.wait_for(fut, 1)
await iterator_task # Ensure task cleanup

Copilot AI Oct 30, 2025


This statement has no effect.

self.assertEqual(msg2.data, b"timeout_zero_msg")

# Clean up
await publish_task

Copilot AI Oct 30, 2025


This statement has no effect.


# Clean up
await publish_task
await publish_task2

Copilot AI Oct 30, 2025


This statement has no effect.

await asyncio.gather(*tasks)
except Exception as e:
print("WRN", e)
pass

Copilot AI Oct 30, 2025


Unnecessary 'pass' statement.

Suggested change
pass


Development

Successfully merging this pull request may close these issues.

Get from Object Stores are extremely slow

3 participants