Add MaxReconnectAttemptsExceededError (#735) #737
Conversation
Pull Request Overview
Adds a dedicated MaxReconnectAttemptsExceededError to signal when the client has exhausted all reconnection attempts across all servers, enabling applications to differentiate permanent failures from transient disconnects.
- Introduces MaxReconnectAttemptsExceededError in nats.errors
- Raises this error from client._select_next_server when all servers exceed max_reconnect_attempts
- Adds a test validating the new error propagation and state
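As a rough sketch of what this enables on the application side (the error class below is a local stand-in mirroring the one added in nats/errors.py, and `flaky_connect` is a hypothetical helper used only for illustration):

```python
class MaxReconnectAttemptsExceededError(Exception):
    """Stand-in for nats.errors.MaxReconnectAttemptsExceededError."""

    def __init__(self, max_attempts=None):
        self.max_attempts = max_attempts

def flaky_connect():
    # Simulate a client that has exhausted its reconnect budget.
    raise MaxReconnectAttemptsExceededError(max_attempts=5)

def connect_with_fallback():
    try:
        flaky_connect()
    except MaxReconnectAttemptsExceededError as e:
        # Permanent failure: every server exceeded max_reconnect_attempts,
        # so fall back instead of retrying forever.
        return f"gave up after {e.max_attempts} attempts"
    return "connected"

print(connect_with_fallback())  # gave up after 5 attempts
```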
Reviewed Changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 5 comments.
| File | Description | 
|---|---|
| tests/test_client.py | New test to validate error raised and callbacks/state when all reconnect attempts across servers are exhausted | 
| nats/errors.py | Adds MaxReconnectAttemptsExceededError with a descriptive str and max_attempts attribute | 
| nats/aio/client.py | Emits MaxReconnectAttemptsExceededError when no servers remain within allowed reconnect attempts | 
```python
def __init__(self, max_attempts: int | None = None) -> None:
    self.max_attempts = max_attempts
```

Copilot AI · Oct 15, 2025
[nitpick] Use `Optional[int]` instead of the PEP 604 union (`int | None`) for broader Python compatibility and to align with typical typing style in mixed-version codebases. Suggested change: `def __init__(self, max_attempts: Optional[int] = None) -> None` (ensure `Optional` is imported from `typing`).
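The suggestion written out as a runnable sketch (with a stand-in `Error` base class rather than the real `nats.errors.Error`):

```python
from typing import Optional

class Error(Exception):
    """Stand-in for nats.errors.Error."""

class MaxReconnectAttemptsExceededError(Error):

    # typing.Optional[int] is equivalent to int | None but also parses
    # on Python versions older than 3.10.
    def __init__(self, max_attempts: Optional[int] = None) -> None:
        self.max_attempts = max_attempts

err = MaxReconnectAttemptsExceededError(3)
print(err.max_attempts)  # 3
```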
```python
class MaxReconnectAttemptsExceededError(Error):

    def __init__(self, max_attempts: int | None = None) -> None:
        self.max_attempts = max_attempts

    def __str__(self) -> str:
        if self.max_attempts is not None:
            return f"nats: maximum reconnection attempts exceeded: {self.max_attempts}"
        return "nats: maximum reconnection attempts exceeded"
```

Copilot AI · Oct 15, 2025
[nitpick] Add a short class docstring describing when this error is raised and the meaning of `max_attempts`. This improves discoverability and clarifies the API contract for consumers.
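One possible shape for such a docstring (illustrative wording only, with a stand-in base class rather than the real `nats.errors.Error`):

```python
class Error(Exception):
    """Stand-in for nats.errors.Error."""

class MaxReconnectAttemptsExceededError(Error):
    """Raised when the client has exhausted its reconnection attempts to
    every server in the pool, indicating a permanent connection failure.

    Attributes:
        max_attempts: The ``max_reconnect_attempts`` limit that was
            exceeded, or ``None`` if it was not recorded.
    """

    def __init__(self, max_attempts=None):
        self.max_attempts = max_attempts

    def __str__(self):
        if self.max_attempts is not None:
            return f"nats: maximum reconnection attempts exceeded: {self.max_attempts}"
        return "nats: maximum reconnection attempts exceeded"
```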
```python
for server in self.server_pool:
    asyncio.get_running_loop().run_in_executor(None, server.stop)
```

Copilot AI · Oct 15, 2025
The executor futures are not awaited, which can lead to nondeterministic behavior and test flakiness. Collect and await the futures to ensure all servers are stopped before proceeding, e.g., `stops = [asyncio.get_running_loop().run_in_executor(None, s.stop) for s in self.server_pool]; await asyncio.gather(*stops)`.
Suggested change:

```diff
-for server in self.server_pool:
-    asyncio.get_running_loop().run_in_executor(None, server.stop)
+stops = [asyncio.get_running_loop().run_in_executor(None, server.stop) for server in self.server_pool]
+await asyncio.gather(*stops)
```
```python
# Wait for the client to exhaust all reconnection attempts
await asyncio.sleep(1.0)
```

Copilot AI · Oct 15, 2025
Using a fixed sleep makes the test timing-dependent and potentially flaky. Replace it with a condition-based wait (polling `nc.is_closed` or an error callback capturing `MaxReconnectAttemptsExceededError`, with a timeout) to deterministically wait until the client has exhausted its reconnects.
Suggested change:

```diff
-# Wait for the client to exhaust all reconnection attempts
-await asyncio.sleep(1.0)
+# Wait for the client to exhaust all reconnection attempts (deterministically)
+timeout = 2.0  # seconds
+interval = 0.05  # seconds
+waited = 0.0
+while not nc.is_closed and not any(isinstance(e, nats.errors.MaxReconnectAttemptsExceededError) for e in errors):
+    await asyncio.sleep(interval)
+    waited += interval
+    if waited >= timeout:
+        break
```
```python
if s.reconnects > self.options["max_reconnect_attempts"]:
    # Discard server since already tried to reconnect too many times
    # Check if all remaining servers have also exceeded max reconnect attempts
    if len(self._server_pool) == 0 or all(
```

Copilot AI · Oct 15, 2025
[nitpick] The explicit `len(self._server_pool) == 0` check is redundant since `all([])` evaluates to `True`; the condition can be simplified to just `all(...)`. This reduces duplication and keeps the intent clear.
Suggested change:

```diff
-if len(self._server_pool) == 0 or all(
+if all(
```
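The simplification relies on Python's vacuous-truth semantics: `all()` over an empty iterable returns `True`, so the length guard adds nothing. A quick check:

```python
empty_pool = []

# all() over an empty iterable is vacuously True ...
print(all(s > 3 for s in empty_pool))  # True

# ... so the guarded and unguarded forms are equivalent.
print(len(empty_pool) == 0 or all(s > 3 for s in empty_pool))  # True
```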
Introduces a new MaxReconnectAttemptsExceededError that is raised when the NATS client exhausts all reconnection attempts across all servers in the pool. This allows applications to distinguish between temporary connection issues and permanent failures where all retry limits have been exceeded.